A model that silently degrades is worse than a model that crashes loudly. Crashes get fixed in hours. Drift goes unnoticed for weeks while your predictions quietly rot. The fix is a pipeline that runs drift checks on a schedule, exposes results through an API, and sends alerts the moment distributions shift beyond your thresholds.

This guide wires together Evidently AI for statistical drift detection and FastAPI for serving results and triggering alerts. You’ll get a self-contained service that compares incoming production data against a reference dataset, stores drift history, and fires Slack webhooks when things go sideways.

Generating Drift Reports with Evidently

Install the dependencies first. The Report and metric-preset imports used below are Evidently's legacy API, which was removed in newer releases, so pin the version:

pip install "evidently<0.7" fastapi uvicorn apscheduler httpx pandas numpy

Start by creating realistic reference and production datasets, then running Evidently’s drift presets against them. This is the core detection logic that everything else builds on.

# drift_checker.py
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

def generate_reference_data(n: int = 2000) -> pd.DataFrame:
    """Simulate training data for a loan approval model."""
    np.random.seed(42)
    return pd.DataFrame({
        "age": np.random.normal(35, 8, n).astype(int),
        "income": np.random.normal(60000, 15000, n),
        "debt_ratio": np.random.uniform(0.05, 0.6, n),
        "credit_score": np.random.normal(710, 40, n).astype(int),
        "prediction": np.random.choice([0, 1], n, p=[0.35, 0.65]),
    })

def generate_current_data(n: int = 500, drifted: bool = False) -> pd.DataFrame:
    """Simulate production data, optionally with drift injected."""
    np.random.seed(99)
    if drifted:
        # Shift income down and age up to simulate demographic change
        return pd.DataFrame({
            "age": np.random.normal(42, 10, n).astype(int),
            "income": np.random.normal(45000, 20000, n),
            "debt_ratio": np.random.uniform(0.15, 0.8, n),
            "credit_score": np.random.normal(670, 55, n).astype(int),
            "prediction": np.random.choice([0, 1], n, p=[0.55, 0.45]),
        })
    return pd.DataFrame({
        "age": np.random.normal(36, 9, n).astype(int),
        "income": np.random.normal(59000, 16000, n),
        "debt_ratio": np.random.uniform(0.06, 0.62, n),
        "credit_score": np.random.normal(705, 42, n).astype(int),
        "prediction": np.random.choice([0, 1], n, p=[0.37, 0.63]),
    })

def run_drift_check(
    reference: pd.DataFrame,
    current: pd.DataFrame,
) -> dict:
    """Run data drift and target drift presets, return structured results."""
    report = Report(metrics=[
        DataDriftPreset(),
        TargetDriftPreset(),
    ])
    report.run(reference_data=reference, current_data=current)
    raw = report.as_dict()

    # Look up results by metric name rather than list position – the presets
    # expand into multiple metrics, and DataDriftTable (not the first entry)
    # carries the per-column details
    metric_results = {m["metric"]: m["result"] for m in raw["metrics"]}
    data_drift_result = metric_results["DataDriftTable"]
    target_drift_result = metric_results.get("ColumnDriftMetric", {})

    # Extract per-column drift details
    column_drifts = {}
    for col_name, col_data in data_drift_result["drift_by_columns"].items():
        column_drifts[col_name] = {
            "drifted": col_data["drift_detected"],
            "statistic": round(col_data["stattest_value"], 4),
            "threshold": col_data["stattest_threshold"],
            "test_name": col_data["stattest_name"],
        }

    return {
        "dataset_drift": data_drift_result["dataset_drift"],
        "share_of_drifted_columns": round(
            data_drift_result["share_of_drifted_columns"], 3
        ),
        "number_of_drifted_columns": data_drift_result["number_of_drifted_columns"],
        "total_columns": data_drift_result["number_of_columns"],
        "column_drifts": column_drifts,
        "target_drift_detected": target_drift_result.get("drift_detected", False),
    }

if __name__ == "__main__":
    ref = generate_reference_data()
    cur = generate_current_data(drifted=True)
    result = run_drift_check(ref, cur)

    print(f"Dataset drift: {result['dataset_drift']}")
    print(f"Drifted columns: {result['number_of_drifted_columns']}/{result['total_columns']}")
    for col, info in result["column_drifts"].items():
        status = "DRIFT" if info["drifted"] else "ok"
        print(f"  {col}: {status} (score={info['statistic']}, test={info['test_name']})")

Run this standalone to verify detection works before wiring it into an API:

python drift_checker.py

You should see drift detected on most columns when drifted=True – income, debt_ratio, and credit_score shift significantly.

Serving Drift Results via FastAPI

Wrap the drift checker in a FastAPI app that loads reference data at startup and runs checks on demand. This uses the lifespan context manager pattern – the current recommended approach that replaced the deprecated @app.on_event decorators.

# app.py
from contextlib import asynccontextmanager
from datetime import datetime, timezone

import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from drift_checker import generate_reference_data, run_drift_check

# In-memory store for drift history
drift_history: list[dict] = []
reference_data: pd.DataFrame | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global reference_data
    # Load reference data once at startup
    # In production, load from a parquet file or feature store:
    #   reference_data = pd.read_parquet("s3://bucket/reference_data.parquet")
    reference_data = generate_reference_data(n=2000)
    print(f"Loaded reference data: {reference_data.shape}")
    yield
    print("Shutting down drift monitor")


app = FastAPI(title="Drift Alert Pipeline", lifespan=lifespan)


class DriftCheckRequest(BaseModel):
    data: list[dict]


class DriftSummary(BaseModel):
    timestamp: str
    dataset_drift: bool
    share_of_drifted_columns: float
    number_of_drifted_columns: int
    total_columns: int
    target_drift_detected: bool
    column_drifts: dict


@app.post("/check-drift", response_model=DriftSummary)
def check_drift(request: DriftCheckRequest):
    if reference_data is None:
        raise HTTPException(status_code=503, detail="Reference data not loaded")

    current = pd.DataFrame(request.data)

    if len(current) < 30:
        raise HTTPException(
            status_code=400,
            detail="Need at least 30 samples for reliable drift detection",
        )

    # Validate columns match reference
    missing = set(reference_data.columns) - set(current.columns)
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing columns in request: {missing}",
        )

    result = run_drift_check(reference_data, current[reference_data.columns])
    timestamp = datetime.now(timezone.utc).isoformat()
    result["timestamp"] = timestamp

    drift_history.append(result)

    return DriftSummary(**result)


@app.get("/drift-history")
def get_drift_history(limit: int = 50):
    return drift_history[-limit:]


@app.get("/health")
def health():
    return {
        "status": "ok",
        "reference_shape": list(reference_data.shape) if reference_data is not None else None,
        "checks_performed": len(drift_history),
    }

Start the server and test it:

uvicorn app:app --host 0.0.0.0 --port 8000

Hit the endpoint with production data using curl or any HTTP client:

curl -X POST http://localhost:8000/check-drift \
  -H "Content-Type: application/json" \
  -d '{"data": [{"age": 45, "income": 40000, "debt_ratio": 0.5, "credit_score": 650, "prediction": 0}]}'

That will return a 400 because you need at least 30 samples. In practice, you’d batch your production data and send windows of 100+ rows at a time.
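As a sketch of that batching (the module name, the 100-row window, and the localhost URL are assumptions to adapt):

```python
# client.py – hypothetical batching client for the /check-drift endpoint
def batch_windows(n_rows: int, batch_size: int = 100, min_rows: int = 30) -> list[tuple[int, int]]:
    """Split n_rows into (start, end) windows, dropping a trailing window
    smaller than min_rows (the API rejects fewer than 30 samples)."""
    windows = []
    for start in range(0, n_rows, batch_size):
        end = min(start + batch_size, n_rows)
        if end - start >= min_rows:
            windows.append((start, end))
    return windows

def send_in_batches(df, url: str = "http://localhost:8000/check-drift",
                    batch_size: int = 100) -> list[dict]:
    """POST a DataFrame of production rows in fixed-size windows."""
    import httpx  # deferred so batch_windows stays dependency-free
    results = []
    for start, end in batch_windows(len(df), batch_size):
        payload = {"data": df.iloc[start:end].to_dict(orient="records")}
        resp = httpx.post(url, json=payload, timeout=30)
        resp.raise_for_status()
        results.append(resp.json())
    return results
```

Dropping an undersized trailing window keeps each check statistically comparable; the leftover rows simply ride along in the next batch.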

Automating Drift Checks with a Scheduler

Running drift checks manually defeats the purpose. Use APScheduler to trigger checks on a fixed interval. This version loads data from a CSV that your prediction service appends to, but you could swap in a database query or message queue consumer.

# scheduled_app.py
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI

from drift_checker import generate_reference_data, run_drift_check

drift_history: list[dict] = []
reference_data: pd.DataFrame | None = None
scheduler = AsyncIOScheduler()

PRODUCTION_DATA_PATH = Path("production_logs.csv")
ALERT_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"


async def scheduled_drift_check():
    """Run drift detection against the latest production data."""
    if reference_data is None:
        print("Reference data not loaded, skipping check")
        return

    if not PRODUCTION_DATA_PATH.exists():
        print(f"No production data at {PRODUCTION_DATA_PATH}, skipping")
        return

    current = pd.read_csv(PRODUCTION_DATA_PATH)
    if len(current) < 30:
        print(f"Only {len(current)} rows, need 30+ for reliable detection")
        return

    result = run_drift_check(reference_data, current[reference_data.columns])
    result["timestamp"] = datetime.now(timezone.utc).isoformat()
    drift_history.append(result)

    print(
        f"[{result['timestamp']}] Drift check complete: "
        f"drift={result['dataset_drift']}, "
        f"drifted_cols={result['number_of_drifted_columns']}/{result['total_columns']}"
    )

    if result["dataset_drift"]:
        await send_drift_alert(result)


async def send_drift_alert(result: dict):
    """Send alert to Slack when drift is detected."""
    import httpx

    drifted_cols = [
        col for col, info in result["column_drifts"].items() if info["drifted"]
    ]

    message = {
        "text": (
            f":warning: *Model Drift Detected*\n"
            f"Time: {result['timestamp']}\n"
            f"Drifted columns: {', '.join(drifted_cols)} "
            f"({result['number_of_drifted_columns']}/{result['total_columns']})\n"
            f"Target drift: {result['target_drift_detected']}\n"
            f"Share drifted: {result['share_of_drifted_columns']:.1%}"
        )
    }

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(ALERT_WEBHOOK_URL, json=message, timeout=10)
            resp.raise_for_status()
            print("Alert sent to Slack")
    except httpx.HTTPError as e:
        print(f"Failed to send Slack alert: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    global reference_data
    reference_data = generate_reference_data(n=2000)
    print(f"Reference data loaded: {reference_data.shape}")

    # Run drift check every 15 minutes
    scheduler.add_job(scheduled_drift_check, "interval", minutes=15)
    scheduler.start()
    print("Scheduler started: drift checks every 15 minutes")

    yield

    scheduler.shutdown()
    print("Scheduler stopped")


app = FastAPI(title="Scheduled Drift Monitor", lifespan=lifespan)


@app.get("/drift-history")
def get_drift_history(limit: int = 50):
    return drift_history[-limit:]


@app.get("/health")
def health():
    return {
        "status": "ok",
        "reference_loaded": reference_data is not None,
        "checks_performed": len(drift_history),
        "scheduler_running": scheduler.running,
    }


@app.post("/trigger-check")
async def trigger_check():
    """Manually trigger a drift check outside the schedule."""
    await scheduled_drift_check()
    if drift_history:
        return drift_history[-1]
    return {"status": "no data available for check"}

The scheduler fires scheduled_drift_check every 15 minutes. Adjust the interval based on your traffic volume – high-traffic models might check hourly with larger windows, while low-traffic models could check daily.
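For completeness, the producer side can be as simple as appending each scored request to that CSV. A hypothetical sketch (the module name and helper are illustrative; the field list mirrors the reference schema above):

```python
# log_predictions.py – hypothetical producer that feeds production_logs.csv
import csv
from pathlib import Path

FIELDS = ["age", "income", "debt_ratio", "credit_score", "prediction"]

def log_prediction(row: dict, path: Path = Path("production_logs.csv")) -> None:
    """Append one scored request; write the header only when creating the file."""
    write_header = not path.exists()
    with path.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        if write_header:
            writer.writeheader()
        writer.writerow({k: row[k] for k in FIELDS})
```

Rotate or truncate the file after each successful check so every window reflects only fresh traffic rather than an ever-growing history.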

Sending Alerts on Drift Detection

The send_drift_alert function in the scheduler above handles Slack notifications. Here’s a more complete version that supports multiple notification channels and includes severity levels based on how many columns drifted:

# alerts.py
import httpx
from enum import Enum


class Severity(str, Enum):
    LOW = "low"         # 25% or fewer columns drifted
    MEDIUM = "medium"   # 25-50% of columns drifted
    HIGH = "high"       # more than 50% of columns drifted
    CRITICAL = "critical"  # target drift detected


def classify_severity(result: dict) -> Severity:
    if result["target_drift_detected"]:
        return Severity.CRITICAL
    share = result["share_of_drifted_columns"]
    if share > 0.5:
        return Severity.HIGH
    if share > 0.25:
        return Severity.MEDIUM
    return Severity.LOW


async def send_slack_alert(webhook_url: str, result: dict) -> bool:
    severity = classify_severity(result)
    drifted_cols = [
        col for col, info in result["column_drifts"].items() if info["drifted"]
    ]

    color_map = {
        Severity.LOW: "#36a64f",
        Severity.MEDIUM: "#daa038",
        Severity.HIGH: "#e01e5a",
        Severity.CRITICAL: "#ff0000",
    }

    payload = {
        "attachments": [
            {
                "color": color_map[severity],
                "blocks": [
                    {
                        "type": "header",
                        "text": {
                            "type": "plain_text",
                            "text": f"Drift Alert [{severity.value.upper()}]",
                        },
                    },
                    {
                        "type": "section",
                        "fields": [
                            {
                                "type": "mrkdwn",
                                "text": f"*Drifted Columns:*\n{', '.join(drifted_cols)}",
                            },
                            {
                                "type": "mrkdwn",
                                "text": (
                                    f"*Share:* {result['share_of_drifted_columns']:.1%}\n"
                                    f"*Target Drift:* {result['target_drift_detected']}"
                                ),
                            },
                        ],
                    },
                ],
            }
        ]
    }

    async with httpx.AsyncClient() as client:
        resp = await client.post(webhook_url, json=payload, timeout=10)
        return resp.status_code == 200


async def send_generic_webhook(url: str, result: dict) -> bool:
    """Send drift results to any webhook endpoint (PagerDuty, custom, etc.)."""
    severity = classify_severity(result)
    payload = {
        "event_type": "model_drift_detected",
        "severity": severity.value,
        "timestamp": result["timestamp"],
        "dataset_drift": result["dataset_drift"],
        "drifted_columns": result["number_of_drifted_columns"],
        "total_columns": result["total_columns"],
        "target_drift": result["target_drift_detected"],
        "details": result["column_drifts"],
    }

    async with httpx.AsyncClient() as client:
        resp = await client.post(url, json=payload, timeout=10)
        return resp.status_code < 300

Wire this into the scheduled check by replacing the inline send_drift_alert call:

from alerts import send_slack_alert, send_generic_webhook, classify_severity, Severity

async def scheduled_drift_check():
    # ... same data loading and drift check logic ...

    if result["dataset_drift"]:
        severity = classify_severity(result)
        await send_slack_alert(ALERT_WEBHOOK_URL, result)

        # Only page on-call for critical issues
        if severity == Severity.CRITICAL:
            await send_generic_webhook(PAGERDUTY_WEBHOOK_URL, result)  # defined alongside ALERT_WEBHOOK_URL

This two-tier approach avoids alert fatigue. Low-severity drift goes to a Slack channel for the ML team to review during business hours. Critical drift – especially target drift – pages the on-call engineer immediately.

Common Errors and Fixes

ValueError: reference and current data must have the same columns – Your production data has extra or missing columns compared to the reference. Filter the current DataFrame to match: current[reference_data.columns]. The code above already handles this.

StatisticsError or NaN in test results – This happens when a feature has zero variance (all identical values) in either the reference or current set, which leaves the statistical tests nothing to compare. Filter out constant columns before running the report, or add a minimum variance check to your data loading.
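One way to apply that filter, as a sketch (drop_constant_columns is a hypothetical helper; nunique(dropna=False) also treats all-NaN columns as constant):

```python
import pandas as pd

def drop_constant_columns(reference: pd.DataFrame, current: pd.DataFrame):
    """Remove columns that are constant in either frame from both frames,
    so the two stay aligned for the drift report."""
    constant = [
        col for col in reference.columns
        if reference[col].nunique(dropna=False) <= 1
        or current[col].nunique(dropna=False) <= 1
    ]
    return reference.drop(columns=constant), current.drop(columns=constant)
```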

APScheduler job not firing – Make sure you call scheduler.start() inside the lifespan and that you’re using AsyncIOScheduler, not BackgroundScheduler. The background variant runs jobs in worker threads and cannot execute coroutine functions like scheduled_drift_check; AsyncIOScheduler schedules them on FastAPI’s running event loop.

Drift detected on every check (false positives) – Your reference dataset might be too small or too homogeneous. Use at least 1000 reference samples. You can also adjust the per-column thresholds by passing stattest_threshold to the DataDriftPreset:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Require stronger evidence before flagging drift. Note the threshold's
# direction depends on the test: for p-value tests like KS (default 0.05),
# lower is stricter, but for distance-based tests (e.g. Wasserstein, the
# default for large numerical samples) lower is *more* sensitive – so fix
# the stattest explicitly when tuning.
report = Report(metrics=[
    DataDriftPreset(stattest="ks", stattest_threshold=0.01),
])

httpx.ConnectError on Slack webhook – Verify your webhook URL is correct and that outbound HTTPS is not blocked by your network. Test the webhook independently with curl before blaming the Python code.