Models degrade silently in production. Data distributions shift, upstream pipelines break, and suddenly your model is serving garbage predictions while every dashboard stays green. By the time someone notices, you’ve already lost revenue or trust. You need automated alerts that fire the moment accuracy drops or latency spikes — not a human checking charts once a week.

Here’s how to build a lightweight monitoring service with FastAPI that tracks model metrics over sliding windows and sends Slack alerts when thresholds are breached.

Track Model Metrics

Start with a simple in-memory metrics store. Every time your model serves a prediction, log the result with its latency. When ground truth arrives later, record that too.

import time
from collections import deque
from dataclasses import dataclass, field
from threading import Lock


@dataclass
class PredictionRecord:
    prediction: float
    ground_truth: float | None
    latency_ms: float
    timestamp: float


class MetricsStore:
    def __init__(self, window_seconds: int = 3600):
        self.window_seconds = window_seconds
        self.records: deque[PredictionRecord] = deque()
        self._lock = Lock()

    def log_prediction(self, prediction: float, latency_ms: float) -> None:
        with self._lock:
            self.records.append(
                PredictionRecord(
                    prediction=prediction,
                    ground_truth=None,
                    latency_ms=latency_ms,
                    timestamp=time.time(),
                )
            )

    def log_ground_truth(self, index: int, ground_truth: float) -> None:
        # Caveat: indices shift whenever _evict_old() drops records, so
        # attach labels promptly, or key records by a stable ID instead.
        with self._lock:
            if 0 <= index < len(self.records):
                self.records[index].ground_truth = ground_truth

    def _evict_old(self) -> None:
        cutoff = time.time() - self.window_seconds
        while self.records and self.records[0].timestamp < cutoff:
            self.records.popleft()

    def compute_accuracy(self) -> float | None:
        with self._lock:
            self._evict_old()
            labeled = [
                r for r in self.records if r.ground_truth is not None
            ]
            if len(labeled) < 10:
                return None
            correct = sum(
                1 for r in labeled if round(r.prediction) == round(r.ground_truth)
            )
            return correct / len(labeled)

    def compute_p95_latency(self) -> float | None:
        with self._lock:
            self._evict_old()
            if len(self.records) < 10:
                return None
            latencies = sorted(r.latency_ms for r in self.records)
            idx = int(len(latencies) * 0.95)
            return latencies[idx]

    def compute_error_rate(self) -> float | None:
        with self._lock:
            self._evict_old()
            labeled = [
                r for r in self.records if r.ground_truth is not None
            ]
            if len(labeled) < 10:
                return None
            errors = sum(
                1 for r in labeled if round(r.prediction) != round(r.ground_truth)
            )
            return errors / len(labeled)

The deque with time-based eviction gives you a sliding window. The _lock keeps reads and writes consistent when the store is shared between request handlers and the background monitoring loop. Each compute method also refuses to return a value until it has at least 10 samples: alerting on noise is worse than not alerting at all.

Define Alert Rules

Alert rules map metric names to thresholds and severity levels. Keep them as plain data so you can load them from a config file later.

from dataclasses import dataclass
from enum import Enum


class Severity(str, Enum):
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AlertRule:
    metric: str
    threshold: float
    direction: str  # "above" or "below"
    severity: Severity
    message: str


DEFAULT_RULES = [
    AlertRule(
        metric="accuracy",
        threshold=0.85,
        direction="below",
        severity=Severity.CRITICAL,
        message="Model accuracy dropped below 85%",
    ),
    AlertRule(
        metric="p95_latency",
        threshold=500.0,
        direction="above",
        severity=Severity.WARNING,
        message="P95 latency exceeded 500ms",
    ),
    AlertRule(
        metric="error_rate",
        threshold=0.20,
        direction="above",
        severity=Severity.CRITICAL,
        message="Error rate exceeded 20%",
    ),
]


def evaluate_rules(
    metrics: dict[str, float | None], rules: list[AlertRule]
) -> list[AlertRule]:
    triggered = []
    for rule in rules:
        value = metrics.get(rule.metric)
        if value is None:
            continue
        if rule.direction == "below" and value < rule.threshold:
            triggered.append(rule)
        elif rule.direction == "above" and value > rule.threshold:
            triggered.append(rule)
    return triggered

The direction field matters. Accuracy fires when it drops below a threshold. Latency fires when it goes above. This lets you express both kinds of degradation with the same data structure.
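Because the rules are plain data, loading them from a config file is a few lines. A sketch, assuming a JSON array whose keys mirror the `AlertRule` fields (the `load_rules` helper is not part of the code above; `Severity` and `AlertRule` are repeated here so the snippet stands alone):

```python
import json
from dataclasses import dataclass
from enum import Enum


class Severity(str, Enum):
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AlertRule:
    metric: str
    threshold: float
    direction: str  # "above" or "below"
    severity: Severity
    message: str


def load_rules(raw: str) -> list[AlertRule]:
    """Parse a JSON array of rule objects into AlertRule instances."""
    return [
        AlertRule(
            metric=item["metric"],
            threshold=float(item["threshold"]),
            direction=item["direction"],
            severity=Severity(item["severity"]),
            message=item["message"],
        )
        for item in json.loads(raw)
    ]


rules = load_rules(
    '[{"metric": "accuracy", "threshold": 0.85, "direction": "below",'
    ' "severity": "critical", "message": "Accuracy below 85%"}]'
)
print(rules[0].severity.value)  # critical
```

In practice you would read the JSON from a file path set by an environment variable and fall back to `DEFAULT_RULES` when it is absent.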

You can add cooldown logic too — you don’t want 500 Slack messages in an hour. A simple approach is tracking the last alert time per rule and skipping if it fired within the last 15 minutes.

Send Alerts to Slack

Slack incoming webhooks accept a JSON payload. A bare text field is enough for a plain message, but attachments with Block Kit blocks get you colors and rich formatting. Use httpx for async HTTP calls since you’re already in an async FastAPI world.

import httpx


SEVERITY_COLORS = {
    Severity.WARNING: "#f59e0b",
    Severity.CRITICAL: "#ef4444",
}


async def send_slack_alert(
    webhook_url: str,
    rule: AlertRule,
    current_value: float,
) -> bool:
    color = SEVERITY_COLORS[rule.severity]
    payload = {
        "attachments": [
            {
                "color": color,
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": (
                                f"*[{rule.severity.value.upper()}] {rule.message}*\n"
                                f"Metric: `{rule.metric}` | "
                                f"Current: `{current_value:.4f}` | "
                                f"Threshold: `{rule.threshold}`"
                            ),
                        },
                    }
                ],
            }
        ]
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(webhook_url, json=payload, timeout=10.0)
        return resp.status_code == 200

The attachments field with a color gives you a colored sidebar in Slack — amber for warnings, red for critical. Block Kit formatting with mrkdwn lets you bold the alert message and use inline code for metric values.
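If you want to unit-test the formatting without a real webhook, you can factor the payload construction into a pure function. A sketch (`build_alert_payload` is not part of the code above; it builds the same Block Kit structure):

```python
def build_alert_payload(
    severity: str,
    message: str,
    metric: str,
    current_value: float,
    threshold: float,
    color: str,
) -> dict:
    """Build the Block Kit payload used by send_slack_alert, minus the HTTP call."""
    return {
        "attachments": [
            {
                "color": color,
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": (
                                f"*[{severity.upper()}] {message}*\n"
                                f"Metric: `{metric}` | "
                                f"Current: `{current_value:.4f}` | "
                                f"Threshold: `{threshold}`"
                            ),
                        },
                    }
                ],
            }
        ]
    }


payload = build_alert_payload(
    "critical", "Model accuracy dropped below 85%",
    "accuracy", 0.8123, 0.85, "#ef4444",
)
print(payload["attachments"][0]["color"])  # #ef4444
```

`send_slack_alert` then shrinks to building this dict and POSTing it, and the formatting logic gets ordinary synchronous tests.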

Build the Monitoring Service

Wire everything together with FastAPI. Use the lifespan context manager to start and stop a background task that runs metric checks on a schedule.

import asyncio
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from pydantic import BaseModel

SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
CHECK_INTERVAL_SECONDS = 60

store = MetricsStore(window_seconds=3600)


async def monitoring_loop():
    while True:
        metrics = {
            "accuracy": store.compute_accuracy(),
            "p95_latency": store.compute_p95_latency(),
            "error_rate": store.compute_error_rate(),
        }
        triggered = evaluate_rules(metrics, DEFAULT_RULES)
        for rule in triggered:
            value = metrics[rule.metric]
            if value is not None and SLACK_WEBHOOK_URL:
                await send_slack_alert(SLACK_WEBHOOK_URL, rule, value)
        await asyncio.sleep(CHECK_INTERVAL_SECONDS)


@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(monitoring_loop())
    yield
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


app = FastAPI(lifespan=lifespan)


class PredictionRequest(BaseModel):
    prediction: float
    latency_ms: float


class GroundTruthRequest(BaseModel):
    index: int
    ground_truth: float


@app.post("/log-prediction")
async def log_prediction(req: PredictionRequest):
    store.log_prediction(req.prediction, req.latency_ms)
    return {"status": "ok"}


@app.post("/log-ground-truth")
async def log_ground_truth(req: GroundTruthRequest):
    store.log_ground_truth(req.index, req.ground_truth)
    return {"status": "ok"}


@app.get("/metrics")
async def get_metrics():
    return {
        "accuracy": store.compute_accuracy(),
        "p95_latency": store.compute_p95_latency(),
        "error_rate": store.compute_error_rate(),
    }

Run it with:

export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
uvicorn main:app --host 0.0.0.0 --port 8000

The lifespan context manager starts the monitoring loop when the app boots and cancels it cleanly on shutdown. The loop checks metrics every 60 seconds. Your model serving code calls /log-prediction after every inference, and your labeling pipeline calls /log-ground-truth when annotations come in.

Hit /metrics any time to see current values without waiting for an alert cycle.

Common Errors and Fixes

httpx.ConnectError: All connection attempts failed when sending to Slack

Your Slack webhook URL is wrong or the network blocks outbound HTTPS. Verify the URL by sending a test curl:

curl -X POST -H 'Content-Type: application/json' \
  -d '{"text":"test alert"}' \
  "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"

If that works but httpx fails, check whether your container has DNS resolution or a proxy configured. If you need to go through a proxy, pass it to httpx.AsyncClient (the proxy argument in recent httpx releases; older versions spell it proxies).

RuntimeError: no running event loop in the background task

This happens if you try to use asyncio.run() inside a task that’s already running in an event loop. The lifespan pattern handles this correctly — the asyncio.create_task() call runs within FastAPI’s existing loop. Don’t wrap the monitoring loop in asyncio.run(). Use create_task() directly.
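The same pattern, stripped down to plain asyncio so you can see why it works (a standalone sketch; the tick-counting loop stands in for the real monitoring loop):

```python
import asyncio


async def monitoring_loop(ticks: list[int]) -> None:
    # Stand-in for the real loop: record a tick each interval.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def main() -> list[int]:
    ticks: list[int] = []
    # Correct: schedule the loop on the already-running event loop.
    task = asyncio.create_task(monitoring_loop(ticks))
    await asyncio.sleep(0.05)  # the app "runs" here
    task.cancel()              # shutdown: cancel, then await the task
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


ticks = asyncio.run(main())  # exactly one asyncio.run(), at the top level
print(len(ticks) > 0)  # True
```

Inside `main` (or inside FastAPI's lifespan), the loop is already running, so a nested `asyncio.run()` would raise; `create_task()` is the right call there.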

Metrics return null even though you’re logging predictions

The compute_accuracy() method requires at least 10 records with ground truth labels. If you’re only logging predictions without calling /log-ground-truth, accuracy and error rate will stay None. P95 latency only needs prediction records, so check that one first to verify logging works. Also confirm your sliding window hasn’t evicted everything — if window_seconds is too short and you’re logging slowly, records expire before you check.

422 Unprocessable Entity on prediction logging

FastAPI’s Pydantic validation rejected your request body. Make sure you’re sending JSON with the exact field names: prediction and latency_ms, both as numbers. A common mistake is sending latency instead of latency_ms.