Shadow deployment (also called dark launching) is one of the lowest-risk ways to test a new model in production. Both your current model and the new candidate receive every request, but only the primary model’s response goes back to the user. The shadow model’s predictions get logged for offline comparison. If the shadow performs well, you promote it. If it doesn’t, nothing bad happened: users never saw its output.

This is stricter than canary deployments. In a canary, some users get the new model’s predictions directly. In shadow mode, zero users are affected by the new model. The tradeoff: you can’t measure user-facing metrics like click-through or conversion, only prediction agreement and latency.
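To make the difference concrete, here is a minimal sketch of the two routing strategies. The helper names (`in_canary`, `serve_canary`, `serve_shadow`) and the 5% default are illustrative assumptions, not part of the service built below, and the shadow call runs inline here only to keep the example short.

```python
import hashlib


def in_canary(request_id: str, canary_fraction: float) -> bool:
    """Deterministically assign a request to the canary group by hashing its id."""
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000  # integer in 0..9999
    return bucket < canary_fraction * 10_000


def serve_canary(request_id, text, current_model, candidate_model, canary_fraction=0.05):
    """Canary: a fraction of users receive the candidate's output directly."""
    model = candidate_model if in_canary(request_id, canary_fraction) else current_model
    return model(text)


def serve_shadow(text, current_model, candidate_model, log):
    """Shadow: every user receives the current model's output; the candidate is only logged."""
    served = current_model(text)
    log.append({"served": served, "shadow": candidate_model(text)})
    return served
```

In the canary path the return value can come from either model. In the shadow path it always comes from the current model, which is why shadow mode cannot tell you anything about user-facing metrics.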

pip install fastapi uvicorn pydantic aiosqlite

The Shadow Service

The core idea: every request is evaluated by both models. The primary model runs first and its response goes to the user. The shadow model then runs in a background task after the response has been sent, so it stays off the request’s critical path.

# shadow_service.py
import asyncio
import time
import uuid
from contextlib import asynccontextmanager

import aiosqlite
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

DB_PATH = "shadow_log.db"


async def init_db():
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS shadow_log (
                id TEXT PRIMARY KEY,
                timestamp REAL,
                input_text TEXT,
                primary_prediction TEXT,
                primary_confidence REAL,
                primary_latency_ms REAL,
                shadow_prediction TEXT,
                shadow_confidence REAL,
                shadow_latency_ms REAL,
                agreement INTEGER
            )
        """)
        await db.commit()


@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_db()
    yield


app = FastAPI(lifespan=lifespan)


# Simulate two model versions. Replace these with your actual model inference.
def primary_model(text: str) -> dict:
    """Current production model (v1)."""
    return {"prediction": "positive", "confidence": 0.88}


def shadow_model(text: str) -> dict:
    """New candidate model (v2) running in shadow mode."""
    return {"prediction": "positive", "confidence": 0.93}


class PredictRequest(BaseModel):
    text: str


class PredictResponse(BaseModel):
    prediction: str
    confidence: float
    request_id: str


async def run_shadow_and_log(
    request_id: str,
    text: str,
    primary_result: dict,
    primary_latency: float,
):
    """Run the shadow model and log both results to SQLite."""
    start = time.perf_counter()
    shadow_result = await asyncio.to_thread(shadow_model, text)
    shadow_latency = (time.perf_counter() - start) * 1000

    agreement = int(primary_result["prediction"] == shadow_result["prediction"])

    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            """INSERT INTO shadow_log
               (id, timestamp, input_text, primary_prediction, primary_confidence,
                primary_latency_ms, shadow_prediction, shadow_confidence,
                shadow_latency_ms, agreement)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                request_id,
                time.time(),
                text,
                primary_result["prediction"],
                primary_result["confidence"],
                primary_latency,
                shadow_result["prediction"],
                shadow_result["confidence"],
                shadow_latency,
                agreement,
            ),
        )
        await db.commit()


@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest, background_tasks: BackgroundTasks):
    request_id = str(uuid.uuid4())

    # Run the primary model in a worker thread and wait for it -- this is the response the user gets
    start = time.perf_counter()
    primary_result = await asyncio.to_thread(primary_model, req.text)
    primary_latency = (time.perf_counter() - start) * 1000

    # Schedule shadow model to run in background -- does NOT block the response
    background_tasks.add_task(
        run_shadow_and_log, request_id, req.text, primary_result, primary_latency
    )

    return PredictResponse(
        prediction=primary_result["prediction"],
        confidence=primary_result["confidence"],
        request_id=request_id,
    )
uvicorn shadow_service:app --host 0.0.0.0 --port 8000

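The key detail: BackgroundTasks runs after the response is sent, so the shadow model adds no work to the request path itself. The shadow model runs, its result gets logged, and the user never sees it. Both models still share the same machine, so watch for resource contention under load (see Common Errors and Fixes).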

Logging to the Comparison Store

The SQLite approach above works for single-instance deployments and local testing. Every prediction pair lands in shadow_log.db with timestamps, both predictions, confidence scores, latencies, and whether they agreed.

For multi-instance production setups, swap SQLite for a shared store. Here’s a file-based JSON logger that appends to a directory, which you can point at a shared volume:

# file_logger.py
import json
import time
from pathlib import Path

LOG_DIR = Path("shadow_logs")
LOG_DIR.mkdir(exist_ok=True)


def log_comparison(
    request_id: str,
    input_text: str,
    primary_result: dict,
    primary_latency_ms: float,
    shadow_result: dict,
    shadow_latency_ms: float,
):
    """Append a comparison record to a daily JSONL file."""
    date_str = time.strftime("%Y-%m-%d")
    log_file = LOG_DIR / f"shadow_{date_str}.jsonl"

    record = {
        "request_id": request_id,
        "timestamp": time.time(),
        "input": input_text,
        "primary": {
            "prediction": primary_result["prediction"],
            "confidence": primary_result["confidence"],
            "latency_ms": primary_latency_ms,
        },
        "shadow": {
            "prediction": shadow_result["prediction"],
            "confidence": shadow_result["confidence"],
            "latency_ms": shadow_latency_ms,
        },
        "agreement": primary_result["prediction"] == shadow_result["prediction"],
    }

    with open(log_file, "a") as f:
        f.write(json.dumps(record) + "\n")

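JSONL files are easy to ingest into any analytics tool. One file per day keeps things manageable. In production, you’d point LOG_DIR at a shared NFS mount or ship records to a message queue like Kafka. Object stores such as S3 generally don’t support efficient appends to an existing object, so for S3 it is usually simpler to write files locally and upload them in batches.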
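As a rough sketch of what reading those logs back could look like, the function below walks the daily files and computes the agreement rate plus the most common disagreement pairs. `summarize_jsonl` is a hypothetical helper. It assumes the record layout written by log_comparison above and the `shadow_*.jsonl` naming scheme.

```python
import json
from collections import Counter
from pathlib import Path


def summarize_jsonl(log_dir: Path) -> dict:
    """Aggregate agreement statistics across all shadow_*.jsonl files in log_dir."""
    total = 0
    agreed = 0
    disagreements = Counter()

    for log_file in sorted(log_dir.glob("shadow_*.jsonl")):
        with open(log_file, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # skip blank lines
                record = json.loads(line)
                total += 1
                if record["agreement"]:
                    agreed += 1
                else:
                    pair = (record["primary"]["prediction"], record["shadow"]["prediction"])
                    disagreements[pair] += 1

    return {
        "total": total,
        "agreed": agreed,
        # None signals "no data yet" instead of dividing by zero
        "agreement_rate": agreed / total * 100 if total else None,
        "top_disagreements": disagreements.most_common(10),
    }
```

Calling `summarize_jsonl(Path("shadow_logs"))` returns a plain dict, so the same numbers can feed a dashboard or a promotion check.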

Analyzing Shadow Results

Once you’ve collected enough shadow data, you need to compare the two models. This script reads from the SQLite log and produces the metrics that matter for a promotion decision.

# analyze_shadow.py
import sqlite3
import sys

DB_PATH = "shadow_log.db"


def analyze(min_samples: int = 100):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    cursor.execute("SELECT COUNT(*) FROM shadow_log")
    total = cursor.fetchone()[0]

    if total < min_samples:
        print(f"Only {total} samples collected. Need at least {min_samples}. Wait for more traffic.")
        conn.close()  # release the connection before exiting
        sys.exit(1)

    cursor.execute("SELECT SUM(agreement) FROM shadow_log")
    agreed = cursor.fetchone()[0]
    agreement_rate = agreed / total * 100

    cursor.execute("SELECT AVG(primary_latency_ms), AVG(shadow_latency_ms) FROM shadow_log")
    avg_primary_lat, avg_shadow_lat = cursor.fetchone()

    cursor.execute("""
        SELECT AVG(primary_confidence), AVG(shadow_confidence) FROM shadow_log
    """)
    avg_primary_conf, avg_shadow_conf = cursor.fetchone()

    # Disagreement breakdown
    cursor.execute("""
        SELECT primary_prediction, shadow_prediction, COUNT(*) as cnt
        FROM shadow_log
        WHERE agreement = 0
        GROUP BY primary_prediction, shadow_prediction
        ORDER BY cnt DESC
        LIMIT 10
    """)
    disagreements = cursor.fetchall()

    conn.close()

    print(f"Total predictions:     {total}")
    print(f"Agreement rate:        {agreement_rate:.1f}%")
    print(f"Avg primary latency:   {avg_primary_lat:.1f} ms")
    print(f"Avg shadow latency:    {avg_shadow_lat:.1f} ms")
    print(f"Avg primary confidence: {avg_primary_conf:.3f}")
    print(f"Avg shadow confidence:  {avg_shadow_conf:.3f}")

    if disagreements:
        print("\nTop disagreements:")
        for primary_pred, shadow_pred, count in disagreements:
            print(f"  Primary={primary_pred} vs Shadow={shadow_pred}: {count} times")

    # Promotion recommendation
    latency_ok = avg_shadow_lat < avg_primary_lat * 1.2  # shadow within 20% of primary
    agreement_ok = agreement_rate >= 95.0
    confidence_ok = avg_shadow_conf >= avg_primary_conf

    print("\n--- Promotion Check ---")
    print(f"Latency within 20%:   {'PASS' if latency_ok else 'FAIL'}")
    print(f"Agreement >= 95%:      {'PASS' if agreement_ok else 'FAIL'}")
    print(f"Confidence >= primary: {'PASS' if confidence_ok else 'FAIL'}")

    if latency_ok and agreement_ok and confidence_ok:
        print("\nRECOMMENDATION: Shadow model is ready for promotion.")
    else:
        print("\nRECOMMENDATION: Shadow model is NOT ready. Investigate failures above.")


if __name__ == "__main__":
    analyze()
python analyze_shadow.py

Sample output:

Total predictions:     5432
Agreement rate:        97.2%
Avg primary latency:   45.3 ms
Avg shadow latency:    52.1 ms
Avg primary confidence: 0.881
Avg shadow confidence:  0.923

--- Promotion Check ---
Latency within 20%:   PASS
Agreement >= 95%:      PASS
Confidence >= primary: PASS

RECOMMENDATION: Shadow model is ready for promotion.

The three checks here are a reasonable starting point. Adjust the thresholds based on your domain. For high-stakes predictions (medical, financial), you might want 99%+ agreement and manual review of every disagreement. For recommendations or content ranking, 90% agreement might be fine. Treat the confidence check with care: a higher average confidence only indicates a better model if the confidence scores are well calibrated.
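One way to make those thresholds adjustable is to pull them out of the script into a small config object. The sketch below is an assumption about how that might look: `PromotionThresholds`, the two presets, and `promotion_checks` are hypothetical names, and the stats keys mirror the variables in analyze_shadow.py.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PromotionThresholds:
    min_agreement_pct: float = 95.0       # same default as analyze_shadow.py
    max_latency_ratio: float = 1.2        # shadow must be within 20% of primary
    require_confidence_gain: bool = True  # only meaningful for calibrated models


# Illustrative presets based on the guidance above
HIGH_STAKES = PromotionThresholds(min_agreement_pct=99.0)
RANKING = PromotionThresholds(min_agreement_pct=90.0)


def promotion_checks(stats: dict, thresholds: PromotionThresholds) -> dict:
    """Return a pass/fail flag per check for the given aggregate stats."""
    latency_ok = stats["avg_shadow_lat"] < stats["avg_primary_lat"] * thresholds.max_latency_ratio
    agreement_ok = stats["agreement_rate"] >= thresholds.min_agreement_pct
    confidence_ok = (
        not thresholds.require_confidence_gain
        or stats["avg_shadow_conf"] >= stats["avg_primary_conf"]
    )
    return {"latency": latency_ok, "agreement": agreement_ok, "confidence": confidence_ok}
```

With the sample numbers above (97.2% agreement, 45.3 ms vs 52.1 ms), the default and `RANKING` presets pass every check, while `HIGH_STAKES` fails on agreement.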

Promotion Logic

When the analysis script says the shadow is ready, you need a mechanism to swap it to primary. The simplest approach: an admin endpoint that flips a flag.

# Add to shadow_service.py

from fastapi import Header, HTTPException

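# Placeholder only. In a real deployment, load the token from an environment
# variable or a secret manager instead of hard-coding it.
ADMIN_TOKEN = "your-secret-admin-token"

# Registry of available models plus the name of the one currently serving users.
# Note: /predict must look up model_config["active"] for a promotion to take
# effect; the handler shown earlier calls primary_model directly.
model_config = {
    "active": "v1",
    "models": {
        "v1": primary_model,
        "v2": shadow_model,
    },
}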


@app.post("/admin/promote")
async def promote_shadow(
    new_primary: str,
    authorization: str = Header(...),
):
    if authorization != f"Bearer {ADMIN_TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    if new_primary not in model_config["models"]:
        raise HTTPException(status_code=400, detail=f"Unknown model: {new_primary}")

    old = model_config["active"]
    model_config["active"] = new_primary
    return {
        "message": f"Promoted {new_primary} to primary",
        "previous": old,
    }


@app.get("/admin/status")
async def model_status(authorization: str = Header(...)):
    if authorization != f"Bearer {ADMIN_TOKEN}":
        raise HTTPException(status_code=401, detail="Unauthorized")

    return {
        "active_model": model_config["active"],
        "available_models": list(model_config["models"].keys()),
    }
# Check current status
curl -H "Authorization: Bearer your-secret-admin-token" http://localhost:8000/admin/status

# Promote shadow to primary
curl -X POST "http://localhost:8000/admin/promote?new_primary=v2" \
  -H "Authorization: Bearer your-secret-admin-token"
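Flipping the flag only matters if the prediction path reads it. The /predict handler shown earlier calls primary_model directly, so one possible way to connect the two is a small lookup helper. `resolve_models` is a hypothetical name, and the two stub models simply repeat the simulated ones from shadow_service.py.

```python
def primary_model(text: str) -> dict:
    """Simulated v1 model, as in shadow_service.py."""
    return {"prediction": "positive", "confidence": 0.88}


def shadow_model(text: str) -> dict:
    """Simulated v2 model, as in shadow_service.py."""
    return {"prediction": "positive", "confidence": 0.93}


model_config = {
    "active": "v1",
    "models": {"v1": primary_model, "v2": shadow_model},
}


def resolve_models(config: dict):
    """Return (name, fn) for the active model and a list of (name, fn) for the others."""
    active = config["active"]
    primary = (active, config["models"][active])
    shadows = [(name, fn) for name, fn in config["models"].items() if name != active]
    return primary, shadows
```

Inside the handler, you would call `resolve_models(model_config)` per request, serve the result of the primary function, and pass the remaining functions to the background task. Under this scheme the old model keeps running as the shadow after a promotion until you remove it from the registry, which can be useful for a rollback comparison.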

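For automated promotion, wire the analysis script into your CI/CD pipeline or run it on a cron schedule. If all checks pass, call the promote endpoint. If any check fails, page the on-call engineer instead. Note that analyze_shadow.py as written only exits non-zero when there are too few samples, so an automated gate also needs it to signal failed checks, for example through its exit code.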
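A minimal sketch of such a gate, using only the standard library, might look like the following. `build_promote_request` and `promote_if_ready` are hypothetical helpers. The URL and header format follow the curl examples above, and the `checks` dict is assumed to hold one boolean per promotion check.

```python
import urllib.parse
import urllib.request


def build_promote_request(base_url: str, token: str, new_primary: str) -> urllib.request.Request:
    """Build the POST request for the /admin/promote endpoint."""
    query = urllib.parse.urlencode({"new_primary": new_primary})
    return urllib.request.Request(
        f"{base_url}/admin/promote?{query}",
        method="POST",
        headers={"Authorization": f"Bearer {token}"},
    )


def _send(request: urllib.request.Request) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


def promote_if_ready(checks: dict, base_url: str, token: str, new_primary: str, send=_send) -> int:
    """Call the promote endpoint only if every check passed. Returns a process exit code."""
    failed = [name for name, ok in checks.items() if not ok]
    if failed:
        print(f"Not promoting; failed checks: {failed}")
        return 1  # non-zero exit lets cron or CI trigger an alert
    send(build_promote_request(base_url, token, new_primary))
    return 0
```

A cron job or pipeline step could then end with `sys.exit(promote_if_ready(...))`. The injectable `send` argument is there so the gate can be tested without a running service.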

A more robust promotion path: use the shadow analysis results to trigger a canary deployment of the shadow model. Shadow mode proves the model doesn’t break things; canary mode proves it improves user-facing metrics.

Common Errors and Fixes

Shadow model adds latency despite background tasks

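If you’re seeing increased p99 latency, the shadow model might be competing for CPU/GPU with the primary. BackgroundTasks runs on the same event loop as request handling, so CPU-heavy shadow inference called directly in the task can block other requests. asyncio.to_thread (as shown above) moves the call off the event loop thread, but pure-Python CPU work still contends for the GIL, so threads help most when the model’s native code releases it. For stronger isolation, run the shadow model in a separate process or on a separate instance, or offload to a task queue like Celery.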
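As one illustration of the separate-process option, the sketch below hands the shadow call to an executor through `loop.run_in_executor`. The names `cpu_heavy_shadow_model`, `run_shadow_in_executor`, and `make_shadow_pool` are hypothetical, and the model function is a stand-in for real inference.

```python
import asyncio
import time
from concurrent.futures import Executor, ProcessPoolExecutor


def cpu_heavy_shadow_model(text: str) -> dict:
    """Hypothetical stand-in for CPU-bound inference."""
    checksum = sum(ord(ch) for ch in text * 1000)  # busy work to simulate load
    return {"prediction": "positive", "confidence": 0.93, "checksum": checksum}


def make_shadow_pool(workers: int = 1) -> ProcessPoolExecutor:
    """Create the pool once at startup; worker processes do not share the server's GIL."""
    return ProcessPoolExecutor(max_workers=workers)


async def run_shadow_in_executor(executor: Executor, model_fn, text: str):
    """Run model_fn(text) in the given executor and measure wall-clock latency in ms."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    result = await loop.run_in_executor(executor, model_fn, text)
    latency_ms = (time.perf_counter() - start) * 1000
    return result, latency_ms
```

In the service, the pool would typically be created inside lifespan and shut down on exit. With a process pool, the model function and its arguments must be picklable, and each worker process loads its own copy of the model.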

SQLite “database is locked” errors under load

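SQLite allows only one writer at a time. If you’re running multiple workers with uvicorn --workers 4, each worker fights for the write lock. Fix: switch to WAL mode at startup. WAL lets readers proceed while a write is in progress, but writes are still serialized, so it reduces these errors rather than eliminating them.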

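async def init_db():
    async with aiosqlite.connect(DB_PATH) as db:
        # WAL mode is stored in the database file, so setting it once at startup is enough
        await db.execute("PRAGMA journal_mode=WAL")
        # Same column list as in the original init_db
        await db.execute("""
            CREATE TABLE IF NOT EXISTS shadow_log (...)
        """)
        await db.commit()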

For higher throughput, swap SQLite for PostgreSQL or write to JSONL files (one per worker PID).
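For the per-worker JSONL option, a sketch could look like this. `worker_log_path` and `append_record` are hypothetical helpers, and the `_pid<N>` suffix is just one possible naming scheme.

```python
import json
import os
import time
from pathlib import Path
from typing import Optional


def worker_log_path(log_dir: Path, pid: Optional[int] = None) -> Path:
    """Build a per-day, per-process log path so workers never write to the same file."""
    pid = os.getpid() if pid is None else pid
    date_str = time.strftime("%Y-%m-%d")
    return log_dir / f"shadow_{date_str}_pid{pid}.jsonl"


def append_record(log_dir: Path, record: dict) -> Path:
    """Append one JSON record to this worker's log file and return the file path."""
    log_dir.mkdir(parents=True, exist_ok=True)
    path = worker_log_path(log_dir)
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return path
```

Because each uvicorn worker is its own process, every worker gets its own file and no cross-process locking is needed. The analysis step then has to read all files for a given day instead of one.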

Shadow predictions don’t match when models are on different hardware

Floating-point results can differ between CPU and GPU, or between GPU architectures. If you’re comparing exact confidence scores, use a tolerance threshold (e.g., abs(primary - shadow) < 0.01) instead of exact equality. Compare prediction labels, not raw logits.
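A small sketch of that comparison, keeping label agreement and confidence drift as separate signals, could look like this. `compare_predictions` is a hypothetical helper, and the 0.01 tolerance is the example value from above, not a recommendation.

```python
def compare_predictions(primary: dict, shadow: dict, tol: float = 0.01) -> dict:
    """Compare labels exactly and confidences within an absolute tolerance."""
    return {
        # Labels are discrete, so exact equality is the right test
        "label_agreement": primary["prediction"] == shadow["prediction"],
        # Confidences are floats, so small hardware-dependent drift is tolerated
        "confidence_close": abs(primary["confidence"] - shadow["confidence"]) < tol,
    }
```

Logging both flags lets you separate real behavioral disagreements from numeric noise when you review the results.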

Background tasks silently fail

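If the shadow model crashes, the client never finds out, because the response has already been sent. Depending on your server and logging setup, the exception may only appear as a traceback in the server log with no request context. Add error handling inside the background task to log failures with the request ID.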

import logging

logger = logging.getLogger(__name__)


async def run_shadow_and_log(request_id, text, primary_result, primary_latency):
    try:
        start = time.perf_counter()
        shadow_result = await asyncio.to_thread(shadow_model, text)
        shadow_latency = (time.perf_counter() - start) * 1000
        # ... log to DB
    except Exception:
        # logger.exception records the full traceback along with the request ID
        logger.exception("Shadow model failed for %s", request_id)

Memory usage doubles with two models loaded

Yes, that’s the cost of shadow deployment. You’re running two models simultaneously. Options: use quantized versions for the shadow, run the shadow on a separate instance, or use model offloading to keep only one model in GPU memory and swap as needed.