You built a model API. It handles one request just fine. Now you need to know what happens at 100 concurrent users, 500, or 1,000. Guessing is not a strategy. You need a repeatable load testing pipeline that tells you exactly where your endpoint falls over, and Locust paired with FastAPI is the fastest way to get there.

Here’s the full setup – a FastAPI endpoint serving a real sentence-transformers model, a Locust test file with realistic traffic patterns, and a pass/fail gate you can plug into CI.

pip install fastapi uvicorn sentence-transformers locust

Setting Up the FastAPI Model Endpoint

Use FastAPI’s lifespan context manager to load the model once at startup and clean up on shutdown. This avoids the deprecated @app.on_event pattern and keeps the model in application state where all request handlers can access it.

# app.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
import time

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load model on startup
    ml_models["embedder"] = SentenceTransformer("all-MiniLM-L6-v2")
    yield
    # Cleanup on shutdown
    ml_models.clear()

app = FastAPI(lifespan=lifespan)

class EmbedRequest(BaseModel):
    texts: list[str]

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]
    latency_ms: float

@app.post("/embed", response_model=EmbedResponse)
def embed(request: EmbedRequest):
    # Plain def, not async def: FastAPI runs sync handlers in its threadpool,
    # so the CPU-bound encode() call can't block the event loop.
    start = time.perf_counter()
    embeddings = ml_models["embedder"].encode(request.texts).tolist()
    latency_ms = (time.perf_counter() - start) * 1000
    return EmbedResponse(embeddings=embeddings, latency_ms=latency_ms)

@app.get("/health")
async def health():
    model_loaded = "embedder" in ml_models
    return {"status": "ok" if model_loaded else "not_ready"}

Start it with uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1. One worker is intentional – establish a single-process baseline before you scale out.

Writing the Locust Test File

The key to useful load tests is realistic traffic. Don’t just hammer the endpoint with identical payloads. Vary the batch size, mix in health checks, and weight tasks by how often they actually happen in production.

# locustfile.py
import random
from locust import HttpUser, task, between

SAMPLE_TEXTS = [
    "How do I deploy a machine learning model to production?",
    "What is the difference between batch and real-time inference?",
    "Explain transformer attention mechanisms in simple terms.",
    "Best practices for model versioning and rollback strategies.",
    "Compare ONNX Runtime vs TensorRT for inference optimization.",
    "Steps to set up GPU autoscaling on Kubernetes.",
    "How does quantization affect model accuracy and latency?",
    "Monitoring ML model performance in production environments.",
]

class ModelUser(HttpUser):
    wait_time = between(0.5, 2.0)

    @task(8)
    def single_embed(self):
        """Most common request: embed 1-3 texts."""
        batch = random.sample(SAMPLE_TEXTS, k=random.randint(1, 3))
        self.client.post("/embed", json={"texts": batch})

    @task(2)
    def batch_embed(self):
        """Less common: larger batch of 5-8 texts."""
        batch = random.sample(SAMPLE_TEXTS, k=random.randint(5, 8))
        self.client.post("/embed", json={"texts": batch})

    @task(1)
    def health_check(self):
        """Simulates load balancer health probes."""
        self.client.get("/health")

The @task weights matter. Setting single_embed to weight 8 and batch_embed to weight 2 means roughly 73% of requests are small batches and 18% are larger ones. That matches a typical production traffic mix where most callers send one or two texts at a time.
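
Each task's share is just its weight divided by the sum of all weights; a quick sanity check of the percentages quoted above:

```python
# Expected request mix implied by the @task weights in locustfile.py.
weights = {"single_embed": 8, "batch_embed": 2, "health_check": 1}
total = sum(weights.values())
for name, w in weights.items():
    print(f"{name}: {w / total:.1%}")
# single_embed comes out to ~72.7%, batch_embed ~18.2%, health_check ~9.1%
```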

Configuring Load Profiles

Run Locust in headless mode for CI pipelines. Define three profiles that cover the scenarios you actually care about: gradual ramp-up, sustained load, and traffic spikes.

Ramp-up test – find your throughput ceiling:

locust -f locustfile.py \
    --host http://localhost:8000 \
    --headless \
    --users 100 \
    --spawn-rate 5 \
    --run-time 120s \
    --csv results/ramp_up \
    --html results/ramp_up_report.html

This starts at 0 users and adds 5 per second until it hits 100. The --csv flag writes results/ramp_up_stats.csv, results/ramp_up_stats_history.csv, results/ramp_up_failures.csv, and results/ramp_up_exceptions.csv. The --html flag generates a self-contained HTML report with charts.

Sustained load test – check stability over time:

locust -f locustfile.py \
    --host http://localhost:8000 \
    --headless \
    --users 50 \
    --spawn-rate 50 \
    --run-time 300s \
    --csv results/sustained \
    --html results/sustained_report.html

Setting --spawn-rate equal to --users means all 50 users start immediately. Run it for 5 minutes and look for latency drift – if p95 keeps climbing, you have a memory leak or resource exhaustion issue.
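
One way to spot that drift automatically is to compare early vs. late p95 in the stats-history CSV. A sketch – it assumes the Locust 2.x column names "Name" and "95%" in the history file; confirm against your version's output:

```python
# check_drift.py – flag upward p95 drift across a sustained-load run.
# Assumes Locust 2.x stats_history column names ("Name", "95%").
import csv

def p95_drift(history_csv: str, name: str = "Aggregated") -> tuple[float, float]:
    """Average p95 over the first and last quarter of the run's samples."""
    with open(history_csv) as f:
        rows = [r for r in csv.DictReader(f)
                if r["Name"] == name and r["95%"] not in ("", "N/A")]
    quarter = max(1, len(rows) // 4)
    early = sum(float(r["95%"]) for r in rows[:quarter]) / quarter
    late = sum(float(r["95%"]) for r in rows[-quarter:]) / quarter
    return early, late
```

Point it at results/sustained_stats_history.csv after the sustained run; if the late-window average sits well above the early window (say 50%+), treat the run as a failure and investigate before shipping.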

Spike test – simulate a traffic burst:

locust -f locustfile.py \
    --host http://localhost:8000 \
    --headless \
    --users 200 \
    --spawn-rate 200 \
    --run-time 60s \
    --csv results/spike \
    --html results/spike_report.html

All 200 users hit at once. This is where you find out if your endpoint queues gracefully or starts dropping requests.

Analyzing Results and Setting Pass/Fail Thresholds

The CSV files Locust generates have everything you need. Parse them to build an automated quality gate that blocks deployment when latency or error rate crosses your thresholds.

# check_results.py
import csv
import sys

THRESHOLDS = {
    "p50_ms": 100,       # 50th percentile must be under 100ms
    "p95_ms": 500,       # 95th percentile must be under 500ms
    "p99_ms": 1000,      # 99th percentile must be under 1 second
    "error_rate": 0.01,  # Less than 1% errors
    "min_rps": 20,       # At least 20 requests per second
}

def check_stats(csv_path: str) -> bool:
    passed = True

    with open(csv_path) as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    # The row named "Aggregated" is the whole-run summary
    aggregated = [r for r in rows if r["Name"] == "Aggregated"]
    if not aggregated:
        print("ERROR: No aggregated stats found in CSV.")
        return False

    stats = aggregated[0]

    total_requests = int(stats["Request Count"])
    total_failures = int(stats["Failure Count"])
    error_rate = total_failures / total_requests if total_requests > 0 else 1.0
    rps = float(stats["Requests/s"])
    p50 = float(stats["50%"])
    p95 = float(stats["95%"])
    p99 = float(stats["99%"])

    checks = [
        ("p50_ms", p50, THRESHOLDS["p50_ms"]),
        ("p95_ms", p95, THRESHOLDS["p95_ms"]),
        ("p99_ms", p99, THRESHOLDS["p99_ms"]),
        ("error_rate", error_rate, THRESHOLDS["error_rate"]),
    ]

    for name, actual, threshold in checks:
        if actual > threshold:
            print(f"FAIL: {name} = {actual:.2f} (threshold: {threshold})")
            passed = False
        else:
            print(f"PASS: {name} = {actual:.2f} (threshold: {threshold})")

    if rps < THRESHOLDS["min_rps"]:
        print(f"FAIL: rps = {rps:.1f} (minimum: {THRESHOLDS['min_rps']})")
        passed = False
    else:
        print(f"PASS: rps = {rps:.1f} (minimum: {THRESHOLDS['min_rps']})")

    return passed

if __name__ == "__main__":
    csv_path = sys.argv[1] if len(sys.argv) > 1 else "results/ramp_up_stats.csv"
    success = check_stats(csv_path)
    print(f"\nOverall: {'PASSED' if success else 'FAILED'}")
    sys.exit(0 if success else 1)

Run it after your load test. Note that Locust won't create the results/ directory for you, so make it before the first run:

mkdir -p results  # once, before the first locust invocation
python check_results.py results/ramp_up_stats.csv

If any metric crosses the threshold, the script exits with code 1. Wire this into your CI pipeline and bad deployments never make it past staging.

Common Errors and Fixes

ConnectionRefusedError: [Errno 111] Connection refused

Locust started before the FastAPI server was ready. The model takes a few seconds to load. Add a wait loop in your CI script:

for i in $(seq 1 30); do
    curl -s http://localhost:8000/health | grep -q '"ok"' && break
    sleep 1
done

locust: error: unrecognized arguments: --autostart

The --autostart flag wasn't removed – it was only added in the Locust 2.4 line, so older installs reject it. Either upgrade Locust, or use --headless for fully non-interactive runs (--autostart still serves the web UI).

High p99 latency with low p50 on CPU inference

This usually means garbage collection pauses or CPU thermal throttling under sustained load. Rerun a short test against a freshly restarted single-worker process to isolate the cause. If the tail latency disappears with a fresh process, GC is the likely culprit – call gc.freeze() after the model loads so its long-lived objects drop out of the collector's scans, and consider moving inference into a subprocess pool.
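
One GC-side mitigation worth trying (an assumption here, not part of the original setup) is gc.freeze(): after startup, it moves surviving objects – like the loaded model – into the permanent generation so the collector stops scanning them. A minimal sketch, with a stand-in for the real model load:

```python
# Sketch: shrink GC pause times by freezing long-lived startup objects.
# load_model() is a stand-in for the SentenceTransformer load in app.py.
import gc

def load_model():
    return {"weights": [0.0] * 1_000_000}  # long-lived, never collected

model = load_model()
gc.collect()  # sweep load-time garbage before traffic starts
gc.freeze()   # move survivors (the model) out of future GC scans
print(f"{gc.get_freeze_count()} objects excluded from collection")
```

In the FastAPI app, the collect/freeze pair would go right after the model load inside the lifespan handler, before yield.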

RuntimeError: No model loaded or KeyError on ml_models

Your Locust test started sending requests before the model finished loading. The /health endpoint returns "not_ready" until the lifespan context finishes. Gate your tests on that health check as shown above.
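
If your harness is Python rather than shell, the same readiness gate can be written with only the standard library (a sketch; assumes the /health endpoint from app.py on localhost:8000):

```python
# Poll /health until the model is loaded or the timeout expires.
import json
import time
import urllib.request

def wait_until_ready(url: str = "http://localhost:8000/health",
                     timeout: float = 30.0) -> bool:
    """Return True once /health reports "ok", False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                if json.load(resp).get("status") == "ok":
                    return True
        except OSError:
            pass  # connection refused or server not ready yet
        time.sleep(1)
    return False
```

Call it at the top of the test script and abort the run if it returns False.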

CSV files are empty or missing the Aggregated row

Locust only writes the final aggregated stats after the test completes. If you kill the process with Ctrl+C or SIGKILL, the CSV gets truncated. Always use --run-time to let Locust exit cleanly, or send SIGTERM and wait for graceful shutdown.