Your system prompt says “never give medical advice.” Your LLM gives medical advice anyway. System prompts are suggestions, not guarantees – the model will violate them under the right combination of user input and token probabilities. You need a post-generation safety layer that classifies every output before it leaves your API.
The approach is straightforward: run every LLM response through a toxicity classifier and a set of rule-based policy filters, then block or flag anything that trips a threshold. Here’s what the ML piece looks like in five lines:
```python
from transformers import pipeline

safety_clf = pipeline("text-classification", model="martin-ha/toxic-comment-model")

result = safety_clf("You're worthless and everyone hates you")
print(result)
# [{'label': 'toxic', 'score': 0.9991}]
```
That single classifier already catches overtly harmful outputs. But real-world safety needs go beyond toxicity – you also need policy-specific rules that no off-the-shelf model will cover. This guide builds both layers and combines them into a production pipeline.
The martin-ha/toxic-comment-model is a fine-tuned DistilBERT that gives you a binary toxic/non-toxic score. It runs locally, needs no API key, and handles inference in under 50ms on a CPU for typical LLM outputs.
For more granular results, unitary/toxic-bert breaks toxicity into six categories: toxic, severe toxic, obscene, threat, insult, and identity hate. That granularity matters when you want to block threats immediately but only flag mild insults for human review.
```python
from transformers import pipeline
from dataclasses import dataclass

# Load multi-label toxicity classifier
toxicity_clf = pipeline(
    "text-classification",
    model="unitary/toxic-bert",
    top_k=None,
)

# Define per-category thresholds -- lower means stricter
TOXICITY_THRESHOLDS = {
    "toxic": 0.7,
    "severe_toxic": 0.4,
    "threat": 0.4,
    "insult": 0.7,
    "obscene": 0.7,
    "identity_hate": 0.4,
}

@dataclass
class ToxicityResult:
    is_toxic: bool
    triggered_categories: list
    scores: dict

def classify_toxicity(text: str) -> ToxicityResult:
    """Run toxicity classification and check against thresholds."""
    raw_scores = toxicity_clf(text)[0]
    scores = {item["label"]: item["score"] for item in raw_scores}
    triggered = []
    for category, threshold in TOXICITY_THRESHOLDS.items():
        if scores.get(category, 0.0) >= threshold:
            triggered.append(category)
    return ToxicityResult(
        is_toxic=len(triggered) > 0,
        triggered_categories=triggered,
        scores=scores,
    )

# Test it
result = classify_toxicity("I will find you and destroy everything you care about")
print(f"Toxic: {result.is_toxic}")
print(f"Triggered: {result.triggered_categories}")
print(f"Scores: {result.scores}")
```
Set severe_toxic, threat, and identity_hate thresholds lower than the others. These categories represent serious harm and you want to catch borderline cases. For toxic and insult, a higher threshold avoids flagging blunt-but-harmless language.
## Rule-Based Policy Filters
ML classifiers handle general toxicity, but they know nothing about your specific business rules. If your app shouldn’t give medical diagnoses, recommend specific stocks, or output email addresses, you need explicit policy filters.
Build each filter as a function that returns a list of violations. Chain them together so every output runs through all of them.
```python
import re
from dataclasses import dataclass

@dataclass
class PolicyViolation:
    rule: str
    description: str
    matched_text: str

def check_medical_advice(text: str) -> list[PolicyViolation]:
    """Flag text that looks like medical diagnosis or prescription advice."""
    patterns = [
        (r"\byou (?:have|likely have|probably have|might have)\b\s+\w+", "Appears to diagnose a condition"),
        (r"\b(?:take|prescribe|recommend)\b\s+\d+\s*(?:mg|ml|mcg)\b", "Suggests specific medication dosage"),
        (r"\byou should (?:stop|start) taking\b", "Advises on medication changes"),
    ]
    violations = []
    for pattern, desc in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            violations.append(PolicyViolation(
                rule="no_medical_advice",
                description=desc,
                matched_text=match,
            ))
    return violations

def check_financial_advice(text: str) -> list[PolicyViolation]:
    """Flag text that recommends specific financial actions."""
    patterns = [
        (r"\b(?:buy|sell|invest in)\b\s+(?:\$?\w+\s+)?(?:stock|shares|crypto|bitcoin|ETF)\b", "Recommends specific investment"),
        (r"\byou should (?:buy|sell|invest)\b", "Gives direct financial advice"),
        (r"\bguaranteed (?:return|profit|gain)\b", "Claims guaranteed financial returns"),
    ]
    violations = []
    for pattern, desc in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            violations.append(PolicyViolation(
                rule="no_financial_advice",
                description=desc,
                matched_text=match,
            ))
    return violations

def check_pii_leakage(text: str) -> list[PolicyViolation]:
    """Flag outputs that contain PII patterns."""
    patterns = [
        (r"\b\d{3}-\d{2}-\d{4}\b", "Contains SSN pattern"),
        (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "Contains email address"),
        (r"\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b", "Contains phone number"),
    ]
    violations = []
    for pattern, desc in patterns:
        matches = re.findall(pattern, text)
        for match in matches:
            violations.append(PolicyViolation(
                rule="no_pii_leakage",
                description=desc,
                matched_text=match,
            ))
    return violations

# All filters in a chain
POLICY_FILTERS = [
    check_medical_advice,
    check_financial_advice,
    check_pii_leakage,
]

def run_policy_filters(text: str) -> list[PolicyViolation]:
    """Run all policy filters and collect violations."""
    all_violations = []
    for filter_fn in POLICY_FILTERS:
        all_violations.extend(filter_fn(text))
    return all_violations

# Test it
test_output = "You probably have diabetes. Take 500mg metformin twice daily. Contact me at [email protected]"
violations = run_policy_filters(test_output)
for v in violations:
    print(f"[{v.rule}] {v.description}: '{v.matched_text}'")
```
Adding a new policy is just writing another function that returns PolicyViolation objects and appending it to POLICY_FILTERS. No config files, no YAML schemas – just Python.
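As a sketch of what that looks like, here is a hypothetical extra policy (the `check_legal_advice` name and its patterns are illustrative, not from the original pipeline) that follows the same function-returning-violations shape; the `PolicyViolation` dataclass is repeated here only so the snippet is self-contained:

```python
import re
from dataclasses import dataclass

@dataclass
class PolicyViolation:  # same shape as the pipeline's dataclass above
    rule: str
    description: str
    matched_text: str

def check_legal_advice(text: str) -> list[PolicyViolation]:
    """Hypothetical policy: flag text that gives direct legal advice."""
    patterns = [
        (r"\byou should (?:sue|file a lawsuit|plead)\b", "Gives direct legal advice"),
        (r"\byou (?:are|aren't) (?:liable|legally obligated)\b", "Makes a liability determination"),
    ]
    violations = []
    for pattern, desc in patterns:
        # findall returns whole-match strings because all groups are non-capturing
        for match in re.findall(pattern, text, re.IGNORECASE):
            violations.append(PolicyViolation(
                rule="no_legal_advice",
                description=desc,
                matched_text=match,
            ))
    return violations
```

Once written, appending `check_legal_advice` to `POLICY_FILTERS` is the entire integration step.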
## Combined Safety Pipeline
Now wire the ML classifier and rule-based filters into a single pipeline that makes a block/allow/flag decision for every LLM output. The pipeline logs every check for audit trails – you will want this when debugging false positives or proving compliance.
```python
import logging
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("safety_pipeline")

class SafetyDecision(Enum):
    ALLOW = "allow"
    FLAG = "flag"
    BLOCK = "block"

@dataclass
class SafetyCheckResult:
    decision: SafetyDecision
    toxicity: ToxicityResult | None = None
    policy_violations: list = field(default_factory=list)
    timestamp: str = ""

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()

# Categories that trigger an immediate block
BLOCK_CATEGORIES = {"severe_toxic", "threat", "identity_hate"}

class OutputSafetyPipeline:
    def __init__(self, enable_toxicity: bool = True, enable_policy: bool = True):
        self.enable_toxicity = enable_toxicity
        self.enable_policy = enable_policy

    def check(self, text: str) -> SafetyCheckResult:
        """Run all safety checks and return a decision."""
        toxicity_result = None
        policy_violations = []

        # Step 1: ML toxicity classification
        if self.enable_toxicity:
            toxicity_result = classify_toxicity(text)
            if toxicity_result.is_toxic:
                logger.warning(
                    "Toxicity triggered: categories=%s",
                    toxicity_result.triggered_categories,
                )

        # Step 2: Rule-based policy filters
        if self.enable_policy:
            policy_violations = run_policy_filters(text)
            if policy_violations:
                logger.warning(
                    "Policy violations: %s",
                    [v.rule for v in policy_violations],
                )

        # Step 3: Decision logic
        decision = SafetyDecision.ALLOW

        # Any policy violation -> at least flag
        if policy_violations:
            decision = SafetyDecision.FLAG

        # Toxicity in severe categories -> block
        if toxicity_result and toxicity_result.is_toxic:
            blocked_cats = set(toxicity_result.triggered_categories) & BLOCK_CATEGORIES
            if blocked_cats:
                decision = SafetyDecision.BLOCK
            elif decision != SafetyDecision.BLOCK:
                decision = SafetyDecision.FLAG

        # Multiple policy violations -> escalate to block
        if len(policy_violations) >= 3:
            decision = SafetyDecision.BLOCK

        logger.info("Safety decision: %s", decision.value)
        return SafetyCheckResult(
            decision=decision,
            toxicity=toxicity_result,
            policy_violations=policy_violations,
        )

# Usage
pipeline_instance = OutputSafetyPipeline()

safe_text = "Python lists are ordered collections that support indexing and slicing."
result = pipeline_instance.check(safe_text)
print(f"Decision: {result.decision.value}")  # allow

risky_text = "You likely have a vitamin D deficiency. Take 5000mg daily. Email me at [email protected] for follow-up."
result = pipeline_instance.check(risky_text)
print(f"Decision: {result.decision.value}")  # flag or block
print(f"Policy violations: {len(result.policy_violations)}")
```
The decision hierarchy is simple: policy violations flag, severe toxicity blocks, and three or more policy violations escalate to a block. Adjust these thresholds based on your risk tolerance – a children’s app should block on any toxicity flag, while a research tool might only block severe categories.
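To make that adjustment concrete, the hierarchy can be isolated into a small pure function with a strictness switch. This is a sketch, not part of the pipeline above: the `decide` helper and its `strict` parameter are hypothetical, but the thresholds mirror the decision logic already shown.

```python
from enum import Enum

class SafetyDecision(Enum):
    ALLOW = "allow"
    FLAG = "flag"
    BLOCK = "block"

def decide(triggered: set, n_violations: int, strict: bool = False) -> SafetyDecision:
    """Same hierarchy as the pipeline; strict mode blocks on any toxicity."""
    block_cats = {"severe_toxic", "threat", "identity_hate"}
    if strict and triggered:
        return SafetyDecision.BLOCK          # children's-app posture
    if triggered & block_cats or n_violations >= 3:
        return SafetyDecision.BLOCK          # severe toxicity or pile of violations
    if triggered or n_violations:
        return SafetyDecision.FLAG           # anything else suspicious
    return SafetyDecision.ALLOW
```

Keeping the decision logic in one function like this also makes it trivial to unit-test each posture without loading any model.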
## Integrating with Your API
Drop the pipeline into your request handler so every response gets checked before it’s returned:
```python
from openai import OpenAI

api_client = OpenAI()

def generate_response(user_message: str) -> dict:
    """Generate LLM response with safety checking."""
    completion = api_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
    )
    llm_output = completion.choices[0].message.content

    # Safety check
    safety = OutputSafetyPipeline()
    result = safety.check(llm_output)

    if result.decision == SafetyDecision.BLOCK:
        return {
            "response": "I can't provide that information. Please rephrase your question.",
            "safety_blocked": True,
        }
    elif result.decision == SafetyDecision.FLAG:
        # Return response but log for human review
        logger.warning("Flagged response for review: %s", llm_output[:200])
        return {"response": llm_output, "safety_flagged": True}
    else:
        return {"response": llm_output}
```
## Common Errors and Fixes
Classifier false positives on legitimate content. A toxicity model trained on social media comments will sometimes flag clinical or educational text. The fix is threshold tuning – bump the toxic and insult thresholds up to 0.8 or 0.85 if you’re seeing too many false positives. Track your false positive rate over a sample of 500+ real outputs before adjusting.
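Measuring that rate is a few lines. This helper is a sketch (the `false_positive_rate` name and the `(text, is_actually_harmful)` sample format are assumptions for illustration); it computes the fraction of human-labeled-clean outputs that the safety layer flags:

```python
def false_positive_rate(labeled_samples, is_flagged) -> float:
    """labeled_samples: (text, is_actually_harmful) pairs labeled by humans.
    is_flagged: callable returning True when the safety layer flags the text."""
    clean = [text for text, harmful in labeled_samples if not harmful]
    if not clean:
        return 0.0
    flagged = sum(1 for text in clean if is_flagged(text))
    return flagged / len(clean)
```

Run it against your real pipeline by passing `lambda t: pipeline_instance.check(t).decision != SafetyDecision.ALLOW` as `is_flagged`, then raise thresholds until the rate is acceptable.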
Model loading OOM on small instances. unitary/toxic-bert needs around 400MB of RAM. If you’re on a 512MB container, switch to martin-ha/toxic-comment-model (DistilBERT, ~260MB) or export to ONNX for a smaller memory footprint. You can also load the model once at startup and share it across requests instead of loading per-request.
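One way to get the load-once behavior is a cached factory; this sketch assumes the same `transformers` pipeline shown earlier, with the import deferred so startup stays fast:

```python
from functools import lru_cache

@lru_cache(maxsize=1)
def get_toxicity_classifier():
    # Deferred import: the model loads on the first call only, and every
    # later call returns the same cached pipeline object.
    from transformers import pipeline
    return pipeline("text-classification", model="martin-ha/toxic-comment-model")
```

Request handlers then call `get_toxicity_classifier()` instead of constructing a pipeline per request.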
Regex backtracking on long texts. Complex regex patterns with nested quantifiers can hang on long LLM outputs. If you’re processing outputs over 10K characters, either truncate before checking, use Google’s re2 library (pip install google-re2) which guarantees linear-time matching, or set a timeout with signal.alarm on Unix systems.
Handling multilingual content. The English-only toxicity models miss harmful content in other languages entirely. For multilingual support, use unitary/multilingual-toxic-xlm-roberta which covers dozens of languages. The tradeoff is slower inference – roughly 2x compared to the English-only DistilBERT model. For the rule-based filters, your regex patterns probably only cover English, so build separate pattern sets for each supported language.
Pipeline adds latency to every request. The ML classifier adds 30-80ms on CPU. If that’s too much, run the rule-based filters first (they take under 1ms) and only invoke the ML classifier when the output looks borderline – for instance, when it mentions sensitive topics. You can also run the classifier asynchronously and serve the response immediately while flagging it retroactively.
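The "borderline" gate can be as simple as a keyword pre-filter; this sketch is an assumption, not a fixed recipe, and the `SENSITIVE_MARKERS` list below is a placeholder you would tune to your own policies:

```python
# Hypothetical marker list -- expand to match your policy areas
SENSITIVE_MARKERS = ("diagnos", "dosage", " mg", "invest", "stock", "hate", "kill")

def needs_ml_check(text: str) -> bool:
    """Cheap gate: only pay the 30-80ms classifier cost for risky-looking text."""
    lowered = text.lower()
    return any(marker in lowered for marker in SENSITIVE_MARKERS)
```

In the pipeline, call `classify_toxicity` only when `needs_ml_check` returns True; everything else skips straight to the sub-millisecond rule filters. The obvious tradeoff is that harmful text containing none of the markers bypasses the classifier entirely, so keep the list generous.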