LLM API calls are expensive. A single rogue user or a misconfigured script can burn through thousands of dollars in minutes. You need rate limiting that goes beyond simple request counting – you need token-aware cost controls and abuse detection baked into every request.

Here’s the minimal setup: a token-bucket rate limiter backed by Redis, wired into FastAPI middleware, with per-user daily token budgets and basic abuse pattern detection.

pip install fastapi uvicorn redis tiktoken pydantic

Token-Bucket Rate Limiter with Redis

The token-bucket algorithm is a natural fit for LLM APIs. Each user gets a bucket that refills at a steady rate. Requests drain tokens from the bucket. When the bucket is empty, requests get rejected. Redis handles the state so this works across multiple API instances.

import time
import redis.asyncio as redis


class TokenBucketLimiter:
    """Per-key token-bucket rate limiter backed by Redis."""

    def __init__(self, redis_client: redis.Redis, max_tokens: int, refill_rate: float):
        self.redis = redis_client
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate  # tokens per second

    async def allow_request(self, key: str, cost: int = 1) -> dict:
        now = time.time()
        bucket_key = f"ratelimit:{key}"

        stored = await self.redis.hmget(bucket_key, "tokens", "last_refill")

        if stored[0] is None:
            tokens = float(self.max_tokens)
            last_refill = now
        else:
            tokens = float(stored[0])
            last_refill = float(stored[1])

        # Refill tokens based on elapsed time
        elapsed = now - last_refill
        tokens = min(self.max_tokens, tokens + elapsed * self.refill_rate)

        if tokens >= cost:
            tokens -= cost
            allowed = True
        else:
            allowed = False

        # Update bucket state
        pipe = self.redis.pipeline()
        pipe.hset(bucket_key, mapping={"tokens": tokens, "last_refill": now})
        pipe.expire(bucket_key, 3600)  # auto-cleanup after 1 hour of inactivity
        await pipe.execute()

        return {
            "allowed": allowed,
            "remaining": max(0, int(tokens)),
            "reset_in": int((self.max_tokens - tokens) / self.refill_rate),
        }

The cost parameter is key. Instead of treating every request as equal, you can charge based on the actual token count of the LLM call. A 50-token request costs less than a 4000-token request.
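One way to derive that cost is to charge one bucket unit per fixed slice of prompt tokens. A minimal sketch; the 100-tokens-per-unit ratio is an arbitrary assumption, not something the limiter prescribes:

```python
import math


def request_cost(input_tokens: int, tokens_per_unit: int = 100) -> int:
    """Map an LLM token count to bucket units: one unit per slice of
    prompt tokens, with a floor of one so empty prompts still count."""
    return max(1, math.ceil(input_tokens / tokens_per_unit))
```

A 50-token request then drains 1 unit while a 4000-token request drains 40, so heavy prompts exhaust the bucket proportionally faster.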

FastAPI Middleware with Lifespan

Wire the limiter into FastAPI using the lifespan context manager for Redis connection management and middleware for request interception.

from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import tiktoken


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: connect to Redis
    app.state.redis = redis.Redis(host="localhost", port=6379, decode_responses=True)
    app.state.limiter = TokenBucketLimiter(
        redis_client=app.state.redis,
        max_tokens=100,       # 100 requests per bucket
        refill_rate=2.0,      # 2 tokens/second = ~120 requests/minute
    )
    app.state.encoding = tiktoken.encoding_for_model("gpt-4")
    yield
    # Shutdown: close Redis
    await app.state.redis.close()


app = FastAPI(lifespan=lifespan)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Extract API key from header
    api_key = request.headers.get("X-API-Key")
    if not api_key:
        return JSONResponse(status_code=401, content={"error": "Missing X-API-Key header"})

    # Check rate limit
    result = await request.app.state.limiter.allow_request(key=api_key, cost=1)

    if not result["allowed"]:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded", "retry_after": result["reset_in"]},
            headers={
                "Retry-After": str(result["reset_in"]),
                "X-RateLimit-Remaining": str(result["remaining"]),
            },
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(result["remaining"])
    return response


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    body = await request.json()
    messages = body.get("messages", [])

    # Count input tokens
    enc = request.app.state.encoding
    input_tokens = sum(len(enc.encode(m.get("content", ""))) for m in messages)

    # Your LLM call goes here
    # response = await call_llm(messages)

    return {"message": "ok", "input_tokens": input_tokens}

The middleware runs before every request. It pulls the API key from the header, checks the bucket, and either lets the request through or returns a 429 with proper Retry-After headers. Clients that respect those headers will back off automatically.
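On the client side, honoring those headers can be a small retry loop. A sketch written against a generic `send` callable returning `(status, headers, body)`, rather than any particular HTTP library; the retry counts and cap are assumptions:

```python
import time


def call_with_backoff(send, max_retries: int = 3, max_delay: float = 30.0):
    """Invoke send() -> (status, headers, body); on a 429, sleep for the
    Retry-After value (capped at max_delay) and try again."""
    for attempt in range(max_retries + 1):
        status, headers, body = send()
        if status != 429:
            return status, body
        if attempt < max_retries:
            # Fall back to exponential backoff if the header is missing
            delay = min(max_delay, float(headers.get("Retry-After", 2 ** attempt)))
            time.sleep(delay)
    return status, body
```

Wrap your actual HTTP call in the `send` callable; the loop itself stays library-agnostic and easy to test.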

Token-Based Cost Tracking

Request-count limits are a blunt instrument. A user sending 10 requests with 100 tokens each costs way less than one sending 10 requests with 8000 tokens each. Track actual token usage with daily budgets per user.

from datetime import datetime, timezone


class DailyTokenBudget:
    """Track per-user daily token consumption against a budget."""

    def __init__(self, redis_client: redis.Redis, daily_limit: int = 100_000):
        self.redis = redis_client
        self.daily_limit = daily_limit

    def _day_key(self, user_id: str) -> str:
        # datetime.utcnow() is deprecated; use an explicit UTC timezone
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        return f"token_usage:{user_id}:{today}"

    async def check_and_deduct(self, user_id: str, token_count: int) -> dict:
        key = self._day_key(user_id)

        current_usage = await self.redis.get(key)
        current_usage = int(current_usage) if current_usage else 0

        if current_usage + token_count > self.daily_limit:
            return {
                "allowed": False,
                "used": current_usage,
                "limit": self.daily_limit,
                "remaining": max(0, self.daily_limit - current_usage),
            }

        # Atomically increment usage
        new_usage = await self.redis.incrby(key, token_count)
        # The key is date-stamped, so this TTL just garbage-collects old keys
        if current_usage == 0:
            await self.redis.expire(key, 86400)

        return {
            "allowed": True,
            "used": new_usage,
            "limit": self.daily_limit,
            "remaining": max(0, self.daily_limit - new_usage),
        }

    async def get_usage(self, user_id: str) -> dict:
        key = self._day_key(user_id)
        current = await self.redis.get(key)
        used = int(current) if current else 0
        return {"used": used, "limit": self.daily_limit, "remaining": max(0, self.daily_limit - used)}

Plug this into your endpoint to enforce token budgets alongside request rate limits:

@app.post("/v1/chat/completions/metered")
async def metered_chat(request: Request):
    api_key = request.headers.get("X-API-Key", "anonymous")
    body = await request.json()
    messages = body.get("messages", [])

    enc = request.app.state.encoding
    input_tokens = sum(len(enc.encode(m.get("content", ""))) for m in messages)

    budget = DailyTokenBudget(request.app.state.redis, daily_limit=100_000)
    result = await budget.check_and_deduct(user_id=api_key, token_count=input_tokens)

    if not result["allowed"]:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Daily token budget exceeded",
                "used": result["used"],
                "limit": result["limit"],
            },
        )

    # Process the LLM request here
    return {
        "message": "ok",
        "tokens_used_today": result["used"],
        "tokens_remaining": result["remaining"],
    }

This gives you two layers of protection: the token-bucket limiter caps burst traffic, while the daily budget caps total cost.
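To see why the burst layer matters, here is a pure-Python simulation of the same refill arithmetic `allow_request` uses (no Redis involved; the timestamps and costs below are made up for illustration):

```python
def simulate_bucket(max_tokens: float, refill_rate: float, arrivals):
    """arrivals: list of (timestamp, cost) pairs in time order.
    Returns one allowed/denied flag per arrival."""
    tokens = float(max_tokens)
    last = arrivals[0][0] if arrivals else 0.0
    decisions = []
    for t, cost in arrivals:
        # Same refill rule as the Redis limiter: elapsed time * rate, capped
        tokens = min(max_tokens, tokens + (t - last) * refill_rate)
        last = t
        if tokens >= cost:
            tokens -= cost
            decisions.append(True)
        else:
            decisions.append(False)
    return decisions
```

A burst of six unit-cost requests against a 5-token bucket gets five through and rejects the sixth; two seconds later (at 1 token/second) the bucket has refilled enough to admit more. The daily budget, meanwhile, keeps absorbing these small costs until the 100k cap.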

Abuse Pattern Detection

Rate limiting stops volume abuse but not content abuse. Catch prompt injection attempts, suspicious retry patterns, and other red flags before they hit your model.

import re
from collections import defaultdict


class AbuseDetector:
    """Detect common abuse patterns in LLM API requests."""

    INJECTION_PATTERNS = [
        r"ignore\s+(all\s+)?previous\s+instructions",
        r"you\s+are\s+now\s+(?:a|an)\s+\w+",
        r"system\s*:\s*you\s+are",
        r"<\|im_start\|>",
        r"\[INST\]",
        r"### (?:Human|System|Assistant):",
        r"OVERRIDE\s+SAFETY",
    ]

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.patterns = [re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS]

    def check_prompt_injection(self, text: str) -> dict:
        """Check for known prompt injection patterns."""
        for i, pattern in enumerate(self.patterns):
            match = pattern.search(text)
            if match:
                return {
                    "flagged": True,
                    "reason": "prompt_injection",
                    "pattern": self.INJECTION_PATTERNS[i],
                    "matched_text": match.group()[:50],
                }
        return {"flagged": False}

    async def track_failed_requests(self, user_id: str, window_seconds: int = 300) -> dict:
        """Track rapid failed requests -- a sign of automated abuse."""
        key = f"abuse:failures:{user_id}"
        now = time.time()

        pipe = self.redis.pipeline()
        pipe.zadd(key, {str(now): now})
        pipe.zremrangebyscore(key, 0, now - window_seconds)
        pipe.zcard(key)
        pipe.expire(key, window_seconds)
        results = await pipe.execute()

        failure_count = results[2]
        threshold = 20  # 20 failures in 5 minutes = suspicious

        return {
            "flagged": failure_count >= threshold,
            "reason": "rapid_failures",
            "failure_count": failure_count,
            "window_seconds": window_seconds,
            "threshold": threshold,
        }

    async def check_request(self, user_id: str, prompt_text: str) -> dict:
        """Run all abuse checks on a request."""
        injection_result = self.check_prompt_injection(prompt_text)
        if injection_result["flagged"]:
            # Log the attempt for review
            await self.redis.lpush(
                f"abuse:log:{user_id}",
                f"{time.time()}|injection|{injection_result['matched_text']}"
            )
            return injection_result

        # Note: this counts every checked request toward the failure window;
        # in production, record entries only when a request actually fails
        failure_result = await self.track_failed_requests(user_id)
        if failure_result["flagged"]:
            return failure_result

        return {"flagged": False}

Wire it into the endpoint:

@app.post("/v1/chat/completions/protected")
async def protected_chat(request: Request):
    api_key = request.headers.get("X-API-Key", "anonymous")
    body = await request.json()
    messages = body.get("messages", [])
    prompt_text = " ".join(m.get("content", "") for m in messages)

    detector = AbuseDetector(request.app.state.redis)
    abuse_check = await detector.check_request(user_id=api_key, prompt_text=prompt_text)

    if abuse_check["flagged"]:
        raise HTTPException(
            status_code=403,
            detail={"error": "Request blocked", "reason": abuse_check.get("reason", "unspecified")},
        )

    # Proceed with rate limiting and LLM call
    return {"message": "ok"}

You can extend INJECTION_PATTERNS with your own patterns as you discover new attack vectors in production logs.
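Keeping the extra patterns in a separate list (loaded from config, a file, wherever suits your deployment) means new patterns don't require code changes. A sketch; the two example patterns here are made up, not a vetted blocklist:

```python
import re

# Hypothetical additions discovered in production logs
EXTRA_PATTERNS = [
    r"repeat\s+your\s+system\s+prompt",
    r"pretend\s+you\s+have\s+no\s+restrictions",
]


def compile_patterns(base: list, extra: list) -> list:
    """Compile built-in and operator-supplied patterns into one list."""
    return [re.compile(p, re.IGNORECASE) for p in (*base, *extra)]
```

Pass the combined list into AbuseDetector (or swap it in for `self.patterns`) at startup.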

Common Errors and Fixes

redis.exceptions.ConnectionError: Error connecting to localhost:6379

Redis isn’t running. Start it with redis-server or docker run -d -p 6379:6379 redis:7-alpine. In production, use a managed Redis instance (AWS ElastiCache, GCP Memorystore) and pass the connection string via environment variables.
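For the environment-variable wiring, something like this works (the `REDIS_URL` name is a convention, not required by redis-py; `redis.asyncio.from_url` accepts the URL string directly):

```python
import os

# Falls back to a local instance for development
DEFAULT_REDIS_URL = "redis://localhost:6379/0"
redis_url = os.environ.get("REDIS_URL", DEFAULT_REDIS_URL)
# In lifespan(): app.state.redis = redis.from_url(redis_url, decode_responses=True)
```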

429 responses even with low traffic

Your bucket might be too small or the refill rate too low. Debug by checking the bucket state directly:

# Check a user's current bucket state
import asyncio
import redis.asyncio as redis

async def check_bucket(api_key: str):
    r = redis.Redis(host="localhost", port=6379, decode_responses=True)
    data = await r.hgetall(f"ratelimit:{api_key}")
    print(f"Bucket state: {data}")
    await r.close()

asyncio.run(check_bucket("user-123"))

Token count mismatch between your counter and the LLM provider

tiktoken counts tokens for OpenAI models specifically. If you’re using Anthropic, Llama, or another model family, the tokenizer is different. For Anthropic, use their anthropic SDK’s count_tokens method. For open-source models, use the matching tokenizer from transformers:

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B")
token_count = len(tokenizer.encode("Your prompt text here"))

Race condition in check_and_deduct

The read-then-increment in DailyTokenBudget.check_and_deduct has a small race window. For strict enforcement, use a Lua script to make the check-and-increment atomic:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
ATOMIC_DEDUCT_SCRIPT = """
local key = KEYS[1]
local cost = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current = tonumber(redis.call('GET', key) or '0')
if current + cost > limit then
    return -1
end
local new_val = redis.call('INCRBY', key, cost)
if current == 0 then
    redis.call('EXPIRE', key, 86400)
end
return new_val
"""

Register it with redis_client.register_script(ATOMIC_DEDUCT_SCRIPT) and call it instead of the two-step check.
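A sketch of that wiring, assuming the `redis.asyncio` client from the earlier sections (`register_script` is a real redis-py method; the wrapper's shape and return dict are illustrative):

```python
async def atomic_check_and_deduct(redis_client, script_text: str,
                                  key: str, cost: int, limit: int) -> dict:
    """Run the Lua check-and-increment; -1 from the script means the
    budget is exhausted and nothing was deducted."""
    script = redis_client.register_script(script_text)
    new_usage = await script(keys=[key], args=[cost, limit])
    if new_usage == -1:
        return {"allowed": False, "limit": limit}
    return {"allowed": True, "used": int(new_usage),
            "remaining": max(0, limit - int(new_usage))}
```

Call it as `await atomic_check_and_deduct(app.state.redis, ATOMIC_DEDUCT_SCRIPT, key, input_tokens, 100_000)` in place of the read-then-increment pair.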

Abuse detector has false positives

Regex-based injection detection will flag legitimate content that happens to contain phrases like “ignore previous instructions” in a quoted context. Add a confidence scoring layer or use a small classifier model to reduce false positives. At minimum, log flagged requests for human review instead of hard-blocking them during the initial rollout.
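A log-only rollout can be a single flag around the blocking decision. Sketch; the `enforce` flag and action names are assumptions, not part of the AbuseDetector above:

```python
def abuse_action(abuse_check: dict, enforce: bool = False) -> str:
    """Decide what to do with a request: 'allow' when clean, 'log_only'
    while tuning patterns, 'block' once enforcement is switched on."""
    if not abuse_check.get("flagged"):
        return "allow"
    return "block" if enforce else "log_only"
```

Ship with `enforce=False`, review the abuse log for a week or two, and flip it to `True` once the false-positive rate is acceptable.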