Your system prompt says “never give medical advice.” Your LLM gives medical advice anyway. System prompts are suggestions, not guarantees – the model will violate them under the right combination of user input and token probabilities. You need a post-generation safety layer that classifies every output before it leaves your API.

The approach is straightforward: run every LLM response through a toxicity classifier and a set of rule-based policy filters, then block or flag anything that trips a threshold. Here’s what the ML piece looks like in five lines:

from transformers import pipeline

safety_clf = pipeline("text-classification", model="martin-ha/toxic-comment-model")
result = safety_clf("You're worthless and everyone hates you")
print(result)
# [{'label': 'toxic', 'score': 0.9991}]

That single classifier already catches overtly harmful outputs. But real-world safety needs go beyond toxicity – you also need policy-specific rules that no off-the-shelf model will cover. This guide builds both layers and combines them into a production pipeline.

Toxicity Classification with Transformers

The martin-ha/toxic-comment-model is a fine-tuned DistilBERT that gives you a binary toxic/non-toxic score. It runs locally, needs no API key, and handles inference in under 50ms on a CPU for typical LLM outputs.

For more granular results, unitary/toxic-bert breaks toxicity into six categories: toxic, severe toxic, obscene, threat, insult, and identity hate. That granularity matters when you want to block threats immediately but only flag mild insults for human review.

from transformers import pipeline
from dataclasses import dataclass

# Load multi-label toxicity classifier
toxicity_clf = pipeline(
    "text-classification",
    model="unitary/toxic-bert",
    top_k=None,
)

# Define per-category thresholds -- lower means stricter
TOXICITY_THRESHOLDS = {
    "toxic": 0.7,
    "severe_toxic": 0.4,
    "threat": 0.4,
    "insult": 0.7,
    "obscene": 0.7,
    "identity_hate": 0.4,
}


@dataclass
class ToxicityResult:
    is_toxic: bool
    triggered_categories: list
    scores: dict


def classify_toxicity(text: str) -> ToxicityResult:
    """Run toxicity classification and check against thresholds."""
    raw_scores = toxicity_clf(text)[0]
    scores = {item["label"]: item["score"] for item in raw_scores}

    triggered = []
    for category, threshold in TOXICITY_THRESHOLDS.items():
        if scores.get(category, 0.0) >= threshold:
            triggered.append(category)

    return ToxicityResult(
        is_toxic=len(triggered) > 0,
        triggered_categories=triggered,
        scores=scores,
    )


# Test it
result = classify_toxicity("I will find you and destroy everything you care about")
print(f"Toxic: {result.is_toxic}")
print(f"Triggered: {result.triggered_categories}")
print(f"Scores: {result.scores}")

Set severe_toxic, threat, and identity_hate thresholds lower than the others. These categories represent serious harm and you want to catch borderline cases. For toxic and insult, a higher threshold avoids flagging blunt-but-harmless language.

Rule-Based Policy Filters

ML classifiers handle general toxicity, but they know nothing about your specific business rules. If your app shouldn’t give medical diagnoses, recommend specific stocks, or output email addresses, you need explicit policy filters.

Build each filter as a function that returns a list of violations. Chain them together so every output runs through all of them.

import re
from dataclasses import dataclass


@dataclass
class PolicyViolation:
    rule: str
    description: str
    matched_text: str


def check_medical_advice(text: str) -> list[PolicyViolation]:
    """Flag text that looks like medical diagnosis or prescription advice."""
    patterns = [
        (r"\byou (?:have|likely have|probably have|might have)\b\s+\w+", "Appears to diagnose a condition"),
        (r"\b(?:take|prescribe|recommend)\b\s+\d+\s*(?:mg|ml|mcg)\b", "Suggests specific medication dosage"),
        (r"\byou should (?:stop|start) taking\b", "Advises on medication changes"),
    ]
    violations = []
    for pattern, desc in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            violations.append(PolicyViolation(
                rule="no_medical_advice",
                description=desc,
                matched_text=match,
            ))
    return violations


def check_financial_advice(text: str) -> list[PolicyViolation]:
    """Flag text that recommends specific financial actions."""
    patterns = [
        (r"\b(?:buy|sell|invest in)\b\s+(?:\$?\w+\s+)?(?:stock|shares|crypto|bitcoin|ETF)\b", "Recommends specific investment"),
        (r"\byou should (?:buy|sell|invest)\b", "Gives direct financial advice"),
        (r"\bguaranteed (?:return|profit|gain)\b", "Claims guaranteed financial returns"),
    ]
    violations = []
    for pattern, desc in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        for match in matches:
            violations.append(PolicyViolation(
                rule="no_financial_advice",
                description=desc,
                matched_text=match,
            ))
    return violations


def check_pii_leakage(text: str) -> list[PolicyViolation]:
    """Flag outputs that contain PII patterns."""
    patterns = [
        (r"\b\d{3}-\d{2}-\d{4}\b", "Contains SSN pattern"),
        (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", "Contains email address"),
        (r"\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b", "Contains phone number"),
    ]
    violations = []
    for pattern, desc in patterns:
        matches = re.findall(pattern, text)
        for match in matches:
            violations.append(PolicyViolation(
                rule="no_pii_leakage",
                description=desc,
                matched_text=match,
            ))
    return violations


# All filters in a chain
POLICY_FILTERS = [
    check_medical_advice,
    check_financial_advice,
    check_pii_leakage,
]


def run_policy_filters(text: str) -> list[PolicyViolation]:
    """Run all policy filters and collect violations."""
    all_violations = []
    for filter_fn in POLICY_FILTERS:
        all_violations.extend(filter_fn(text))
    return all_violations


# Test it
test_output = "You probably have diabetes. Take 500mg metformin twice daily. Contact me at jane.doe@example.com"
violations = run_policy_filters(test_output)
for v in violations:
    print(f"[{v.rule}] {v.description}: '{v.matched_text}'")

Adding a new policy is just writing another function that returns PolicyViolation objects and appending it to POLICY_FILTERS. No config files, no YAML schemas – just Python.
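For instance, a legal-advice filter could follow the same shape. This is a sketch with illustrative patterns (the `PolicyViolation` dataclass is repeated here so the snippet runs standalone):

```python
import re
from dataclasses import dataclass


@dataclass
class PolicyViolation:  # same shape as the dataclass defined above
    rule: str
    description: str
    matched_text: str


def check_legal_advice(text: str) -> list[PolicyViolation]:
    """Flag text that reads like specific legal advice."""
    patterns = [
        (r"\byou should (?:sue|file a lawsuit|press charges)\b", "Advises specific legal action"),
        (r"\byou (?:are|aren't|are not) legally (?:liable|required|obligated)\b", "Makes a liability determination"),
    ]
    violations = []
    for pattern, desc in patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            violations.append(PolicyViolation(
                rule="no_legal_advice",
                description=desc,
                matched_text=match,
            ))
    return violations
```

Append `check_legal_advice` to `POLICY_FILTERS` and it runs on every output alongside the others.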

Combined Safety Pipeline

Now wire the ML classifier and rule-based filters into a single pipeline that makes a block/allow/flag decision for every LLM output. The pipeline logs every check for audit trails – you will want this when debugging false positives or proving compliance.

import logging
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("safety_pipeline")


class SafetyDecision(Enum):
    ALLOW = "allow"
    FLAG = "flag"
    BLOCK = "block"


@dataclass
class SafetyCheckResult:
    decision: SafetyDecision
    toxicity: ToxicityResult | None = None
    policy_violations: list = field(default_factory=list)
    timestamp: str = ""

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()


# Categories that trigger an immediate block
BLOCK_CATEGORIES = {"severe_toxic", "threat", "identity_hate"}


class OutputSafetyPipeline:
    def __init__(self, enable_toxicity: bool = True, enable_policy: bool = True):
        self.enable_toxicity = enable_toxicity
        self.enable_policy = enable_policy

    def check(self, text: str) -> SafetyCheckResult:
        """Run all safety checks and return a decision."""
        toxicity_result = None
        policy_violations = []

        # Step 1: ML toxicity classification
        if self.enable_toxicity:
            toxicity_result = classify_toxicity(text)
            if toxicity_result.is_toxic:
                logger.warning(
                    "Toxicity triggered: categories=%s",
                    toxicity_result.triggered_categories,
                )

        # Step 2: Rule-based policy filters
        if self.enable_policy:
            policy_violations = run_policy_filters(text)
            if policy_violations:
                logger.warning(
                    "Policy violations: %s",
                    [v.rule for v in policy_violations],
                )

        # Step 3: Decision logic
        decision = SafetyDecision.ALLOW

        # Any policy violation -> at least flag
        if policy_violations:
            decision = SafetyDecision.FLAG

        # Toxicity in severe categories -> block
        if toxicity_result and toxicity_result.is_toxic:
            blocked_cats = set(toxicity_result.triggered_categories) & BLOCK_CATEGORIES
            if blocked_cats:
                decision = SafetyDecision.BLOCK
            elif decision != SafetyDecision.BLOCK:
                decision = SafetyDecision.FLAG

        # Multiple policy violations -> escalate to block
        if len(policy_violations) >= 3:
            decision = SafetyDecision.BLOCK

        logger.info("Safety decision: %s", decision.value)
        return SafetyCheckResult(
            decision=decision,
            toxicity=toxicity_result,
            policy_violations=policy_violations,
        )


# Usage
pipeline_instance = OutputSafetyPipeline()

safe_text = "Python lists are ordered collections that support indexing and slicing."
result = pipeline_instance.check(safe_text)
print(f"Decision: {result.decision.value}")  # allow

risky_text = "You likely have a vitamin D deficiency. Take 5000mg daily. Email me at dr.smith@example.com for follow-up."
result = pipeline_instance.check(risky_text)
print(f"Decision: {result.decision.value}")  # flag or block
print(f"Policy violations: {len(result.policy_violations)}")

The decision hierarchy is simple: policy violations flag, severe toxicity blocks, and three or more policy violations escalate to a block. Adjust these thresholds based on your risk tolerance – a children’s app should block on any toxicity flag, while a research tool might only block severe categories.
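One way to support both deployments without editing the pipeline class is to express the hierarchy as a pure function parameterized by a strictness profile. This is a sketch, not part of the pipeline above; the profile names and limits are illustrative:

```python
# Decision logic as a pure function parameterized by a strictness profile.
# "strict" blocks on any toxicity category; "lenient" blocks only severe ones.
PROFILES = {
    "strict": {
        "block_categories": {"toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate"},
        "block_after_violations": 1,
    },
    "lenient": {
        "block_categories": {"severe_toxic", "threat", "identity_hate"},
        "block_after_violations": 3,
    },
}


def decide(triggered_categories: set[str], violation_count: int, profile: str = "lenient") -> str:
    """Map toxicity categories and policy-violation count to a decision string."""
    cfg = PROFILES[profile]
    if triggered_categories & cfg["block_categories"]:
        return "block"
    if violation_count >= cfg["block_after_violations"]:
        return "block"
    if triggered_categories or violation_count:
        return "flag"
    return "allow"
```

The same insult that a research tool merely flags (`decide({"insult"}, 0, "lenient")`) gets blocked under the strict profile.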

Integrating with Your API

Drop the pipeline into your request handler so every response gets checked before it’s returned:

from openai import OpenAI

api_client = OpenAI()


def generate_response(user_message: str) -> dict:
    """Generate LLM response with safety checking."""
    completion = api_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
    )
    llm_output = completion.choices[0].message.content

    # Safety check
    safety = OutputSafetyPipeline()
    result = safety.check(llm_output)

    if result.decision == SafetyDecision.BLOCK:
        return {
            "response": "I can't provide that information. Please rephrase your question.",
            "safety_blocked": True,
        }
    elif result.decision == SafetyDecision.FLAG:
        # Return response but log for human review
        logger.warning("Flagged response for review: %s", llm_output[:200])
        return {"response": llm_output, "safety_flagged": True}
    else:
        return {"response": llm_output}

Common Errors and Fixes

Classifier false positives on legitimate content. A toxicity model trained on social media comments will sometimes flag clinical or educational text. The fix is threshold tuning – bump the toxic and insult thresholds up to 0.8 or 0.85 if you’re seeing too many false positives. Track your false positive rate over a sample of 500+ real outputs before adjusting.
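Measuring that rate is a one-liner once humans have labeled a sample. A sketch, assuming you collect (model_flagged, human_labeled_toxic) pairs:

```python
def false_positive_rate(samples: list[tuple[bool, bool]]) -> float:
    """samples: (model_flagged, human_labeled_toxic) pairs from a review queue."""
    false_positives = sum(1 for flagged, is_toxic in samples if flagged and not is_toxic)
    negatives = sum(1 for _, is_toxic in samples if not is_toxic)
    return false_positives / negatives if negatives else 0.0


# Hypothetical review of 500 outputs: 40 flagged, 15 of those actually benign
labeled = [(True, True)] * 25 + [(True, False)] * 15 + [(False, False)] * 445 + [(False, True)] * 15
print(f"FPR: {false_positive_rate(labeled):.1%}")
```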

Model loading OOM on small instances. unitary/toxic-bert needs around 400MB of RAM. If you’re on a 512MB container, switch to martin-ha/toxic-comment-model (DistilBERT, ~260MB) or export to ONNX for a smaller memory footprint. You can also load the model once at startup and share it across requests instead of loading per-request.

Regex backtracking on long texts. Complex regex patterns with nested quantifiers can hang on long LLM outputs. If you’re processing outputs over 10K characters, either truncate before checking, use Google’s re2 library (pip install google-re2) which guarantees linear-time matching, or set a timeout with signal.alarm on Unix systems.
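If truncation is acceptable for your policies, a minimal guard caps the text before any regex runs. One detail worth handling: cut at a whitespace boundary so a half-split email or SSN doesn't evade a pattern:

```python
MAX_SCAN_CHARS = 10_000


def truncate_for_scan(text: str) -> str:
    """Cap input length so regex backtracking stays bounded.

    Cuts at the last whitespace before the limit to avoid splitting a token
    that a PII pattern would otherwise have matched in full.
    """
    if len(text) <= MAX_SCAN_CHARS:
        return text
    truncated = text[:MAX_SCAN_CHARS]
    last_space = truncated.rfind(" ")
    return truncated[:last_space] if last_space > 0 else truncated
```

Call it once at the top of `run_policy_filters` and every downstream pattern inherits the bound.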

Handling multilingual content. The English-only toxicity models miss harmful content in other languages entirely. For multilingual support, use unitary/multilingual-toxic-xlm-roberta which covers dozens of languages. The tradeoff is slower inference – roughly 2x compared to the English-only DistilBERT model. For the rule-based filters, your regex patterns probably only cover English, so build separate pattern sets for each supported language.
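Per-language pattern sets can live in one dict keyed by language code. A sketch; the Spanish patterns here are illustrative placeholders, not a vetted translation of the English rules:

```python
import re

# Per-language financial-advice patterns. Fall back to English when the
# detected language has no dedicated set.
FINANCIAL_PATTERNS = {
    "en": [r"\byou should (?:buy|sell|invest)\b"],
    "es": [r"\bdeber[ií]as (?:comprar|vender|invertir)\b"],  # illustrative only
}


def check_financial_advice_multilang(text: str, lang: str) -> list[str]:
    """Run the pattern set for the given language code against the text."""
    matches = []
    for pattern in FINANCIAL_PATTERNS.get(lang, FINANCIAL_PATTERNS["en"]):
        matches.extend(re.findall(pattern, text, re.IGNORECASE))
    return matches
```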

Pipeline adds latency to every request. The ML classifier adds 30-80ms on CPU. If that’s too much, run the rule-based filters first (they take under 1ms) and only invoke the ML classifier when the output looks borderline – for instance, when it mentions sensitive topics. You can also run the classifier asynchronously and serve the response immediately while flagging it retroactively.
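The gating step can be as simple as a keyword screen over the output. A sketch, assuming a hypothetical `SENSITIVE_TERMS` list tuned to your policies:

```python
# Hypothetical trigger terms -- tune to the topics your policies care about.
SENSITIVE_TERMS = ("diagnos", "mg", "invest", "stock", "kill", "hate")


def needs_ml_check(text: str, rule_violation_count: int) -> bool:
    """Cheap gate: invoke the ML classifier only when the rule filters fired
    or the output touches a sensitive topic."""
    lowered = text.lower()
    return rule_violation_count > 0 or any(term in lowered for term in SENSITIVE_TERMS)
```

Most benign outputs skip the classifier entirely, so the 30-80ms cost is paid only on the small fraction of borderline responses.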