Your model is trained and ready to serve predictions. But at inference time, it needs features – user history, aggregated stats, embeddings – and fetching them from a SQL database adds 50-200ms of latency per request. That kills your SLA.

Redis solves this. It holds your features in memory, and reads come back in under a millisecond. Pair it with FastAPI and you get a feature server that your model serving layer can call over HTTP with minimal overhead.

Install what you need:

pip install fastapi uvicorn redis pydantic pandas

Make sure you have a Redis instance running. Locally, redis-server on the default port 6379 works fine. In production, use a managed service like Amazon ElastiCache or Redis Cloud.

Designing the Feature Schema

Before writing anything to Redis, define what your features look like. Pydantic models give you validation for free and make your API self-documenting.

from pydantic import BaseModel
from datetime import datetime


class UserFeatures(BaseModel):
    user_id: str
    avg_session_duration: float
    total_purchases: int
    days_since_last_login: int
    lifetime_value: float
    churn_score: float
    preferred_category: str
    updated_at: datetime


class FeatureRequest(BaseModel):
    entity_key: str


class BatchFeatureRequest(BaseModel):
    entity_keys: list[str]

The user_id field is your entity key – the thing you look up features by. Every feature set maps to exactly one entity key. The updated_at timestamp lets you track feature freshness, which matters when your batch pipeline runs on a schedule and you need to know if features are stale.

Ingesting Features into Redis

Your offline pipeline computes features in batch (think Spark, dbt, or plain pandas). The last step pushes those features into Redis so they are available at serving time. Redis hashes are the right data structure here – one hash per entity, with each field being a feature name.

import redis
import pandas as pd
from datetime import datetime

# Sample feature data -- in production this comes from your batch pipeline
feature_data = pd.DataFrame({
    "user_id": ["user_001", "user_002", "user_003"],
    "avg_session_duration": [12.5, 8.3, 22.1],
    "total_purchases": [15, 3, 42],
    "days_since_last_login": [2, 14, 0],
    "lifetime_value": [1250.00, 180.50, 5400.75],
    "churn_score": [0.12, 0.78, 0.05],
    "preferred_category": ["electronics", "books", "clothing"],
})

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

FEATURE_PREFIX = "features:user:"
TTL_SECONDS = 86400  # 24 hours -- forces re-ingestion daily


def ingest_features(df: pd.DataFrame) -> int:
    """Push a DataFrame of features into Redis hashes."""
    pipe = r.pipeline()
    count = 0

    for _, row in df.iterrows():
        key = f"{FEATURE_PREFIX}{row['user_id']}"
        feature_map = {
            "avg_session_duration": str(row["avg_session_duration"]),
            "total_purchases": str(row["total_purchases"]),
            "days_since_last_login": str(row["days_since_last_login"]),
            "lifetime_value": str(row["lifetime_value"]),
            "churn_score": str(row["churn_score"]),
            "preferred_category": row["preferred_category"],
            "updated_at": datetime.now().isoformat(),
        }
        pipe.hset(key, mapping=feature_map)
        pipe.expire(key, TTL_SECONDS)
        count += 1

    pipe.execute()
    print(f"Ingested {count} feature sets into Redis")
    return count


ingested = ingest_features(feature_data)
# Output: Ingested 3 feature sets into Redis

A few things to note. We convert numeric values to strings because Redis hash values are stored as strings. We set a TTL so stale features expire automatically if the batch pipeline fails to run. And we batch the writes through a Redis pipeline even during ingestion – with an HSET and an EXPIRE per entity, that cuts the round-trips from 2N to one.

Building the FastAPI Feature Server

Now build the HTTP layer. Your model serving infrastructure calls this endpoint to get features before running inference. We use FastAPI’s lifespan context manager to manage the Redis connection lifecycle.

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis.asyncio as aioredis

FEATURE_PREFIX = "features:user:"


class FeatureRequest(BaseModel):
    entity_key: str


class FeatureResponse(BaseModel):
    entity_key: str
    features: dict[str, str]
    found: bool


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create async Redis connection pool
    app.state.redis = aioredis.Redis(
        host="localhost",
        port=6379,
        db=0,
        decode_responses=True,
        max_connections=50,
    )
    # Verify the connection works
    await app.state.redis.ping()
    print("Connected to Redis")
    yield
    # Shutdown: close the connection pool
    await app.state.redis.aclose()
    print("Redis connection closed")


app = FastAPI(title="Feature Store API", lifespan=lifespan)


@app.post("/features", response_model=FeatureResponse)
async def get_features(request: FeatureRequest):
    r = app.state.redis
    key = f"{FEATURE_PREFIX}{request.entity_key}"
    features = await r.hgetall(key)

    if not features:
        raise HTTPException(
            status_code=404,
            detail=f"No features found for entity: {request.entity_key}",
        )

    return FeatureResponse(
        entity_key=request.entity_key,
        features=features,
        found=True,
    )


@app.get("/health")
async def health():
    try:
        await app.state.redis.ping()
        return {"status": "healthy", "redis": "connected"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Redis unavailable: {str(e)}")

Run it with:

uvicorn feature_server:app --host 0.0.0.0 --port 8000

Then test it:

curl -X POST http://localhost:8000/features \
  -H "Content-Type: application/json" \
  -d '{"entity_key": "user_001"}'

The response comes back with all features for that user as a flat dictionary. Your model serving code parses the values back into the types it expects.
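One way to do that parsing, sketched against the UserFeatures schema from earlier (the function name is just illustrative):

```python
from datetime import datetime


def parse_user_features(raw: dict[str, str]) -> dict:
    """Convert the string values Redis returns back into typed features."""
    return {
        "avg_session_duration": float(raw["avg_session_duration"]),
        "total_purchases": int(raw["total_purchases"]),
        "days_since_last_login": int(raw["days_since_last_login"]),
        "lifetime_value": float(raw["lifetime_value"]),
        "churn_score": float(raw["churn_score"]),
        "preferred_category": raw["preferred_category"],
        "updated_at": datetime.fromisoformat(raw["updated_at"]),
    }
```

You could also pass the raw dict straight into the UserFeatures Pydantic model and let it coerce the types for you.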

Batch Feature Retrieval with Pipelines

When your model needs to score multiple users at once – a batch prediction job, a ranking request, a recommendation list – fetching features one at a time wastes round-trips. Redis pipelines let you send all requests in a single network call.

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis.asyncio as aioredis
import time

FEATURE_PREFIX = "features:user:"


class BatchFeatureRequest(BaseModel):
    entity_keys: list[str]


class BatchFeatureResponse(BaseModel):
    results: dict[str, dict[str, str]]
    missing: list[str]
    latency_ms: float


@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = aioredis.Redis(
        host="localhost", port=6379, db=0, decode_responses=True
    )
    await app.state.redis.ping()
    yield
    await app.state.redis.aclose()


app = FastAPI(title="Feature Store API", lifespan=lifespan)


@app.post("/features/batch", response_model=BatchFeatureResponse)
async def get_features_batch(request: BatchFeatureRequest):
    r = app.state.redis
    start = time.perf_counter()

    # Use a pipeline -- one round-trip for all keys
    pipe = r.pipeline()
    for entity_key in request.entity_keys:
        pipe.hgetall(f"{FEATURE_PREFIX}{entity_key}")

    raw_results = await pipe.execute()

    results = {}
    missing = []
    for entity_key, features in zip(request.entity_keys, raw_results):
        if features:
            results[entity_key] = features
        else:
            missing.append(entity_key)

    elapsed_ms = (time.perf_counter() - start) * 1000

    return BatchFeatureResponse(
        results=results,
        missing=missing,
        latency_ms=round(elapsed_ms, 3),
    )

The performance difference is significant. Fetching 100 entities individually takes 100 round-trips – roughly 10-20ms on a local network, much worse across availability zones. With a pipeline, the same 100 entities come back in a single round-trip, typically under 2ms. That is a 5-10x speedup with zero extra complexity.

Test the batch endpoint:

curl -X POST http://localhost:8000/features/batch \
  -H "Content-Type: application/json" \
  -d '{"entity_keys": ["user_001", "user_002", "user_003", "user_999"]}'

The response includes a missing array so your calling code knows which entities had no features, and a latency_ms field for monitoring.
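One way the caller might handle that missing list is to backfill cold-start defaults before scoring. A minimal sketch – the default values below are made up for illustration, not anything the feature server provides:

```python
# Hypothetical cold-start defaults for entities with no features yet.
DEFAULT_FEATURES = {
    "avg_session_duration": "0.0",
    "total_purchases": "0",
    "days_since_last_login": "0",
    "lifetime_value": "0.0",
    "churn_score": "0.5",
    "preferred_category": "unknown",
}


def fill_missing(results: dict[str, dict[str, str]],
                 missing: list[str]) -> dict[str, dict[str, str]]:
    """Backfill entities the feature server could not find with defaults."""
    for entity_key in missing:
        results[entity_key] = dict(DEFAULT_FEATURES)
    return results
```

Whether defaults, a fallback store, or an outright error is the right move depends on how your model degrades with placeholder features.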

Common Errors and Fixes

Connection refused on startup

redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.

Redis is not running. Start it with redis-server or check your service manager. If you are using Docker, make sure the container is up and the port mapping is correct:

docker run -d --name redis -p 6379:6379 redis:7-alpine

Hash values come back as bytes instead of strings

{b'avg_session_duration': b'12.5', b'total_purchases': b'15'}

You forgot to set decode_responses=True when creating the Redis client. Without it, redis-py returns raw bytes. Fix it in your client constructor:

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

Feature TTL expired and keys are missing

{"detail": "No features found for entity: user_001"}

Your batch ingestion pipeline did not run before the TTL expired. Check your scheduler (Airflow, cron, whatever runs the pipeline). You can verify directly in Redis:

redis-cli TTL "features:user:user_001"
# Returns -2 if the key does not exist (expired or never set)
# Returns -1 if the key exists but has no TTL
# Returns a positive integer for remaining seconds

Increase your TTL to give yourself more buffer, or add alerting when the ingestion job fails so you catch it before features go stale.
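If you run that TTL check from Python instead of redis-cli, the return codes can be mapped to readable statuses – a small sketch, where the status labels are our own invention:

```python
def ttl_status(ttl: int) -> str:
    """Map Redis TTL return codes to a human-readable status."""
    if ttl == -2:
        return "missing"      # key expired or was never set
    if ttl == -1:
        return "no-expiry"    # key exists but has no TTL
    return "alive"            # positive: seconds remaining


# With a live client you would call: ttl_status(r.ttl("features:user:user_001"))
```

A "missing" or "no-expiry" status on keys your pipeline should have refreshed is a good trigger for that alerting.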