Most model monitoring setups involve Prometheus, Grafana, and a bunch of infrastructure you don’t need for a single model serving predictions behind an API. If you just want to track accuracy, latency, and prediction volume over time, SQLite and FastAPI get the job done in under 200 lines of Python.
This guide walks through building a self-hosted model health dashboard. You’ll store metrics in SQLite, expose logging and query endpoints, render a basic HTML chart page, and run periodic health checks as background tasks.
## Set Up the SQLite Schema
Start with a clean schema that tracks every prediction along with its latency and whether it was correct. A separate table stores periodic health snapshots.
```python
# schema.py
import sqlite3
from pathlib import Path

DB_PATH = Path("model_health.db")

SCHEMA = """
CREATE TABLE IF NOT EXISTS predictions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    model_name TEXT NOT NULL,
    input_hash TEXT,
    prediction TEXT NOT NULL,
    ground_truth TEXT,
    is_correct INTEGER,
    latency_ms REAL NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS health_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    model_name TEXT NOT NULL,
    total_predictions INTEGER NOT NULL,
    accuracy REAL,
    avg_latency_ms REAL NOT NULL,
    p95_latency_ms REAL NOT NULL,
    window_start TIMESTAMP NOT NULL,
    window_end TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_predictions_model_time
    ON predictions(model_name, created_at);
CREATE INDEX IF NOT EXISTS idx_snapshots_model_time
    ON health_snapshots(model_name, created_at);
"""

def init_db() -> sqlite3.Connection:
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)
    conn.commit()
    return conn
```
The `predictions` table captures raw data per request; `health_snapshots` stores rolled-up metrics computed by a background task. The `check_same_thread=False` flag is necessary because FastAPI runs sync endpoints on a thread pool, so the same connection gets touched from multiple threads.
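Allowing cross-thread access does not make SQLite writes concurrent, though: it permits only one writer at a time. A minimal sketch of funneling writes through a lock (the `insert_prediction` helper and its trimmed column list are illustrative, not part of the files above):

```python
import sqlite3
import threading

_write_lock = threading.Lock()

def insert_prediction(conn: sqlite3.Connection, model_name: str,
                      prediction: str, latency_ms: float) -> None:
    # Serialize writes across FastAPI's worker threads; reads stay lock-free.
    with _write_lock:
        conn.execute(
            "INSERT INTO predictions (model_name, prediction, latency_ms) "
            "VALUES (?, ?, ?)",
            (model_name, prediction, latency_ms),
        )
        conn.commit()
```

At the volumes a single-model dashboard sees, the lock costs nothing; it mostly buys you protection against "database is locked" errors under bursts.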
## Build the FastAPI Backend
Wire up the app using the lifespan context manager pattern, which replaced the deprecated `@app.on_event("startup")` decorator in newer FastAPI versions.
```python
# app.py
import sqlite3
import hashlib
from contextlib import asynccontextmanager
from datetime import datetime, timedelta

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, ConfigDict

from schema import init_db

db_conn: sqlite3.Connection | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global db_conn
    db_conn = init_db()
    print("Database initialized")
    yield
    if db_conn:
        db_conn.close()
        print("Database connection closed")

app = FastAPI(title="Model Health Dashboard", lifespan=lifespan)

class PredictionLog(BaseModel):
    # Pydantic v2 reserves the "model_" prefix; clear the protected
    # namespace so model_name does not trigger a warning.
    model_config = ConfigDict(protected_namespaces=())

    model_name: str
    input_data: str
    prediction: str
    ground_truth: str | None = None
    latency_ms: float

@app.post("/log")
def log_prediction(entry: PredictionLog):
    input_hash = hashlib.sha256(entry.input_data.encode()).hexdigest()[:16]
    is_correct = None
    if entry.ground_truth is not None:
        is_correct = 1 if entry.prediction == entry.ground_truth else 0
    db_conn.execute(
        """INSERT INTO predictions
           (model_name, input_hash, prediction, ground_truth, is_correct, latency_ms)
           VALUES (?, ?, ?, ?, ?, ?)""",
        (entry.model_name, input_hash, entry.prediction,
         entry.ground_truth, is_correct, entry.latency_ms),
    )
    db_conn.commit()
    return {"status": "logged"}

@app.get("/metrics/{model_name}")
def get_metrics(model_name: str, hours: int = 24):
    cutoff = datetime.utcnow() - timedelta(hours=hours)
    cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
    row = db_conn.execute(
        """SELECT
               COUNT(*) as total,
               AVG(CASE WHEN is_correct IS NOT NULL THEN is_correct END) as accuracy,
               AVG(latency_ms) as avg_latency,
               MAX(latency_ms) as max_latency
           FROM predictions
           WHERE model_name = ? AND created_at >= ?""",
        (model_name, cutoff_str),
    ).fetchone()
    if row["total"] == 0:
        raise HTTPException(status_code=404, detail="No predictions found for this model")
    return {
        "model_name": model_name,
        "window_hours": hours,
        "total_predictions": row["total"],
        # Compare against None, not truthiness: an accuracy of 0.0 is a real value.
        "accuracy": round(row["accuracy"], 4) if row["accuracy"] is not None else None,
        "avg_latency_ms": round(row["avg_latency"], 2),
        "max_latency_ms": round(row["max_latency"], 2),
    }
```
The `/log` endpoint accepts prediction results with timing data. The `/metrics/{model_name}` endpoint returns aggregated stats for a given time window. Both are straightforward SQL queries against the `predictions` table.
## Add an HTML Dashboard Endpoint
You don’t need a separate frontend framework. A single endpoint returning HTML with inline Chart.js gives you a visual dashboard that auto-refreshes.
```python
# Add this to app.py
@app.get("/dashboard/{model_name}", response_class=HTMLResponse)
def dashboard(model_name: str, hours: int = 24):
    cutoff = datetime.utcnow() - timedelta(hours=hours)
    cutoff_str = cutoff.strftime("%Y-%m-%d %H:%M:%S")
    rows = db_conn.execute(
        """SELECT
               strftime('%Y-%m-%d %H:00', created_at) as hour_bucket,
               COUNT(*) as count,
               AVG(latency_ms) as avg_latency,
               AVG(CASE WHEN is_correct IS NOT NULL THEN is_correct END) as accuracy
           FROM predictions
           WHERE model_name = ? AND created_at >= ?
           GROUP BY hour_bucket
           ORDER BY hour_bucket""",
        (model_name, cutoff_str),
    ).fetchall()
    labels = [r["hour_bucket"] for r in rows]
    latencies = [round(r["avg_latency"], 2) for r in rows]
    counts = [r["count"] for r in rows]
    # Compare against None so an hour where every prediction was wrong
    # renders as 0% rather than being treated as missing.
    accuracies = [round(r["accuracy"] * 100, 1) if r["accuracy"] is not None else 0
                  for r in rows]
    html = f"""
<!DOCTYPE html>
<html>
<head>
<title>{model_name} Health Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body {{ font-family: sans-serif; background: #1a1a2e; color: #eee; padding: 20px; }}
.chart-container {{ max-width: 800px; margin: 20px auto; }}
h1 {{ text-align: center; color: #00d4aa; }}
canvas {{ background: #16213e; border-radius: 8px; padding: 10px; }}
</style>
<meta http-equiv="refresh" content="30">
</head>
<body>
<h1>{model_name} - Model Health</h1>
<div class="chart-container">
  <canvas id="latencyChart"></canvas>
</div>
<div class="chart-container">
  <canvas id="volumeChart"></canvas>
</div>
<script>
const labels = {labels};
new Chart(document.getElementById('latencyChart'), {{
    type: 'line',
    data: {{
        labels: labels,
        datasets: [{{
            label: 'Avg Latency (ms)',
            data: {latencies},
            borderColor: '#00d4aa',
            tension: 0.3
        }}, {{
            label: 'Accuracy (%)',
            data: {accuracies},
            borderColor: '#ff6b6b',
            tension: 0.3,
            yAxisID: 'y1'
        }}]
    }},
    options: {{
        scales: {{
            y: {{ title: {{ display: true, text: 'Latency (ms)', color: '#aaa' }}, ticks: {{ color: '#aaa' }}, grid: {{ color: '#333' }} }},
            y1: {{ position: 'right', min: 0, max: 100, title: {{ display: true, text: 'Accuracy %', color: '#aaa' }}, ticks: {{ color: '#aaa' }}, grid: {{ drawOnChartArea: false }} }},
            x: {{ ticks: {{ color: '#aaa' }}, grid: {{ color: '#333' }} }}
        }}
    }}
}});
new Chart(document.getElementById('volumeChart'), {{
    type: 'bar',
    data: {{
        labels: labels,
        datasets: [{{ label: 'Predictions', data: {counts}, backgroundColor: '#00d4aa88' }}]
    }},
    options: {{
        scales: {{
            y: {{ title: {{ display: true, text: 'Count', color: '#aaa' }}, ticks: {{ color: '#aaa' }}, grid: {{ color: '#333' }} }},
            x: {{ ticks: {{ color: '#aaa' }}, grid: {{ color: '#333' }} }}
        }}
    }}
}});
</script>
</body>
</html>
"""
    return HTMLResponse(content=html)
```
The page auto-refreshes every 30 seconds via the `<meta http-equiv="refresh">` tag. The latency chart overlays accuracy percentage on a secondary axis so you can spot correlation between slowdowns and accuracy drops.
## Schedule Periodic Health Checks
Use FastAPI's `BackgroundTasks` to compute and store health snapshots on demand. You can also wire the same function into an `asyncio` loop that runs on a schedule.
```python
# Add this to app.py
import asyncio

from fastapi import BackgroundTasks

def compute_health_snapshot(model_name: str, window_hours: int = 1):
    now = datetime.utcnow()
    window_start = now - timedelta(hours=window_hours)
    start_str = window_start.strftime("%Y-%m-%d %H:%M:%S")
    end_str = now.strftime("%Y-%m-%d %H:%M:%S")
    rows = db_conn.execute(
        """SELECT latency_ms, is_correct
           FROM predictions
           WHERE model_name = ? AND created_at >= ? AND created_at <= ?
           ORDER BY latency_ms""",
        (model_name, start_str, end_str),
    ).fetchall()
    if not rows:
        return None
    latencies = [r["latency_ms"] for r in rows]
    correct_vals = [r["is_correct"] for r in rows if r["is_correct"] is not None]
    total = len(rows)
    p95_index = int(total * 0.95)
    p95_latency = latencies[min(p95_index, total - 1)]
    avg_latency = sum(latencies) / total
    accuracy = sum(correct_vals) / len(correct_vals) if correct_vals else None
    db_conn.execute(
        """INSERT INTO health_snapshots
           (model_name, total_predictions, accuracy, avg_latency_ms, p95_latency_ms,
            window_start, window_end)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (model_name, total, accuracy, avg_latency, p95_latency, start_str, end_str),
    )
    db_conn.commit()
    return {
        "model_name": model_name,
        "total": total,
        "accuracy": accuracy,
        "avg_latency_ms": avg_latency,
        "p95_latency_ms": p95_latency,
    }

@app.post("/snapshot/{model_name}")
def trigger_snapshot(model_name: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(compute_health_snapshot, model_name, 1)
    return {"status": "snapshot scheduled"}

async def periodic_snapshots(model_names: list[str], interval_seconds: int = 3600):
    while True:
        for name in model_names:
            compute_health_snapshot(name, window_hours=1)
        await asyncio.sleep(interval_seconds)
```
To start the periodic loop alongside the app, update the lifespan function:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    global db_conn
    db_conn = init_db()
    # Start periodic snapshots for known models
    task = asyncio.create_task(
        periodic_snapshots(["sentiment-v1", "classifier-v2"], interval_seconds=3600)
    )
    yield
    task.cancel()
    try:
        await task  # let the task observe cancellation before closing the DB
    except asyncio.CancelledError:
        pass
    if db_conn:
        db_conn.close()
```
The `periodic_snapshots` coroutine runs every hour (or whatever interval you set), computing rolled-up stats and writing them to the `health_snapshots` table. You can query those snapshots separately to build long-term trend charts without scanning the full `predictions` table.
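A long-term trend query can then read straight from the snapshot table. A sketch of the kind of helper you might wire behind a `/trends/{model_name}` endpoint (the `snapshot_trend` name and the one-week default limit are assumptions):

```python
import sqlite3

def snapshot_trend(conn: sqlite3.Connection, model_name: str, limit: int = 168):
    """Return recent rolled-up snapshots, newest first (168 = one week of hourly rows)."""
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        """SELECT window_start, window_end, total_predictions,
                  accuracy, avg_latency_ms, p95_latency_ms
           FROM health_snapshots
           WHERE model_name = ?
           ORDER BY window_end DESC
           LIMIT ?""",
        (model_name, limit),
    ).fetchall()
    return [dict(r) for r in rows]
```

Because each snapshot row summarizes a whole hour, this query touches at most a few hundred rows regardless of how many raw predictions have accumulated.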
## Run and Test It
Save both files, then start the server:
```bash
pip install fastapi uvicorn pydantic
uvicorn app:app --reload --port 8000
```
Log a few test predictions with curl:
```bash
curl -X POST http://localhost:8000/log \
  -H "Content-Type: application/json" \
  -d '{"model_name": "sentiment-v1", "input_data": "great product", "prediction": "positive", "ground_truth": "positive", "latency_ms": 45.2}'

curl -X POST http://localhost:8000/log \
  -H "Content-Type: application/json" \
  -d '{"model_name": "sentiment-v1", "input_data": "terrible service", "prediction": "negative", "ground_truth": "negative", "latency_ms": 52.8}'

curl -X POST http://localhost:8000/log \
  -H "Content-Type: application/json" \
  -d '{"model_name": "sentiment-v1", "input_data": "okay experience", "prediction": "positive", "ground_truth": "negative", "latency_ms": 120.5}'
```
Check metrics and open the dashboard:
```bash
# Quote the URL so the shell does not glob the "?"
curl "http://localhost:8000/metrics/sentiment-v1?hours=24"
# Open in browser: http://localhost:8000/dashboard/sentiment-v1
```
## Common Errors and Fixes
`sqlite3.OperationalError: database is locked` – This happens when multiple threads write simultaneously. SQLite handles concurrent reads fine but allows only one writer at a time. Add a write lock or switch to WAL mode:
```python
conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5s for a lock instead of failing fast
```
WAL (Write-Ahead Logging) allows concurrent reads while a write is in progress and fixes most locking issues.
`TypeError: 'NoneType' object is not subscriptable` on the metrics endpoint – This means `db_conn` is `None`, which happens if an endpoint runs before the lifespan context has initialized the connection. Make sure you're using `lifespan` and not trying to access the database at module import time.
Chart.js not rendering – If the dashboard shows a blank page, check your browser console. The most common cause is empty arrays being passed to Chart.js when there are no predictions in the time window: the query returns no rows, so `labels`, `latencies`, and `counts` are all empty lists. Add a check in the HTML template or log some predictions first.
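A minimal version of that check might short-circuit before building the charts (the `empty_dashboard` helper and its markup are illustrative):

```python
def empty_dashboard(model_name: str, hours: int) -> str:
    """Plain HTML notice returned when the query window contains no predictions."""
    return (
        f"<html><body><h1>{model_name}</h1>"
        f"<p>No predictions logged in the last {hours} hours.</p>"
        "</body></html>"
    )
```

Inside `dashboard()`, right after the query, you would add `if not rows: return HTMLResponse(empty_dashboard(model_name, hours))`.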
`ValidationError` on POST /log – If you're getting validation errors, check that `latency_ms` is a number (not a string) in your JSON payload; Pydantic v2 is stricter about type coercion than v1 (and note the v1 import path `pydantic.error_wrappers` no longer exists in v2, where the class lives at `pydantic.ValidationError`). Also verify that `model_name`, `input_data`, and `prediction` are all present, since they're required fields.
Background task silently fails – Exceptions raised inside a background task only surface in the server logs after the response has been sent, so they're easy to miss. Wrap the `compute_health_snapshot` body in a try/except and log errors explicitly:
```python
import logging

logger = logging.getLogger(__name__)

def compute_health_snapshot(model_name: str, window_hours: int = 1):
    try:
        # ... snapshot logic ...
        pass
    except Exception as e:
        logger.error(f"Snapshot failed for {model_name}: {e}")
```