Your model is trained and ready to serve predictions. But at inference time, it needs features – user history, aggregated stats, embeddings – and fetching them from a SQL database adds 50-200ms of latency per request. That kills your SLA.
Redis solves this. It holds your features in memory, and reads come back in under a millisecond. Pair it with FastAPI and you get a feature server that your model serving layer can call over HTTP with minimal overhead.
Install what you need:
```shell
pip install fastapi uvicorn redis pydantic pandas
```
Make sure you have a Redis instance running. Locally, `redis-server` on the default port 6379 works fine. In production, use a managed service like Amazon ElastiCache or Redis Cloud.
## Designing the Feature Schema
Before writing anything to Redis, define what your features look like. Pydantic models give you validation for free and make your API self-documenting.
```python
from pydantic import BaseModel
from datetime import datetime

class UserFeatures(BaseModel):
    user_id: str
    avg_session_duration: float
    total_purchases: int
    days_since_last_login: int
    lifetime_value: float
    churn_score: float
    preferred_category: str
    updated_at: datetime

class FeatureRequest(BaseModel):
    entity_key: str

class BatchFeatureRequest(BaseModel):
    entity_keys: list[str]
```
The `user_id` field is your entity key – the thing you look up features by. Every feature set maps to exactly one entity key. The `updated_at` timestamp lets you track feature freshness, which matters when your batch pipeline runs on a schedule and you need to know if features are stale.
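For instance, a caller can turn `updated_at` into an explicit staleness check before trusting the features. This is a minimal sketch; the 24-hour threshold is an assumption, so match it to your own pipeline's cadence:

```python
from datetime import datetime
from typing import Optional

# Hypothetical freshness threshold -- match it to your pipeline's schedule
MAX_FEATURE_AGE_SECONDS = 86400  # 24 hours

def is_stale(updated_at_iso: str, now: Optional[datetime] = None) -> bool:
    """Return True when features are older than the allowed age."""
    updated_at = datetime.fromisoformat(updated_at_iso)
    now = now or datetime.now()
    return (now - updated_at).total_seconds() > MAX_FEATURE_AGE_SECONDS

# Written an hour ago: fresh; written two days ago: stale
fresh = is_stale("2024-06-01T12:00:00", now=datetime(2024, 6, 1, 13, 0))
stale = is_stale("2024-06-01T12:00:00", now=datetime(2024, 6, 3, 12, 0, 1))
print(fresh, stale)  # False True
```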
## Ingesting Features into Redis
Your offline pipeline computes features in batch (think Spark, dbt, or plain pandas). The last step pushes those features into Redis so they are available at serving time. Redis hashes are the right data structure here – one hash per entity, with each field being a feature name.
```python
import redis
import pandas as pd
from datetime import datetime

# Sample feature data -- in production this comes from your batch pipeline
feature_data = pd.DataFrame({
    "user_id": ["user_001", "user_002", "user_003"],
    "avg_session_duration": [12.5, 8.3, 22.1],
    "total_purchases": [15, 3, 42],
    "days_since_last_login": [2, 14, 0],
    "lifetime_value": [1250.00, 180.50, 5400.75],
    "churn_score": [0.12, 0.78, 0.05],
    "preferred_category": ["electronics", "books", "clothing"],
})

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

FEATURE_PREFIX = "features:user:"
TTL_SECONDS = 86400  # 24 hours -- forces re-ingestion daily

def ingest_features(df: pd.DataFrame) -> int:
    """Push a DataFrame of features into Redis hashes."""
    pipe = r.pipeline()
    count = 0
    for _, row in df.iterrows():
        key = f"{FEATURE_PREFIX}{row['user_id']}"
        feature_map = {
            "avg_session_duration": str(row["avg_session_duration"]),
            "total_purchases": str(row["total_purchases"]),
            "days_since_last_login": str(row["days_since_last_login"]),
            "lifetime_value": str(row["lifetime_value"]),
            "churn_score": str(row["churn_score"]),
            "preferred_category": row["preferred_category"],
            "updated_at": datetime.now().isoformat(),
        }
        pipe.hset(key, mapping=feature_map)
        pipe.expire(key, TTL_SECONDS)
        count += 1
    pipe.execute()
    print(f"Ingested {count} feature sets into Redis")
    return count

ingested = ingest_features(feature_data)
# Output: Ingested 3 feature sets into Redis
```
A few things to note. We convert numeric values to strings because Redis hashes store string values. We set a TTL so stale features expire automatically if the batch pipeline fails to run. And we use a Redis pipeline even during ingestion to batch the writes – instead of 2N round-trips (one `HSET` plus one `EXPIRE` per entity), every command goes out in a single round-trip.
## Building the FastAPI Feature Server
Now build the HTTP layer. Your model serving infrastructure calls this endpoint to get features before running inference. We use FastAPI’s lifespan context manager to manage the Redis connection lifecycle.
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis.asyncio as aioredis

FEATURE_PREFIX = "features:user:"

class FeatureRequest(BaseModel):
    entity_key: str

class FeatureResponse(BaseModel):
    entity_key: str
    features: dict[str, str]
    found: bool

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create async Redis connection pool
    app.state.redis = aioredis.Redis(
        host="localhost",
        port=6379,
        db=0,
        decode_responses=True,
        max_connections=50,
    )
    # Verify the connection works
    await app.state.redis.ping()
    print("Connected to Redis")
    yield
    # Shutdown: close the connection pool
    await app.state.redis.aclose()
    print("Redis connection closed")

app = FastAPI(title="Feature Store API", lifespan=lifespan)

@app.post("/features", response_model=FeatureResponse)
async def get_features(request: FeatureRequest):
    r = app.state.redis
    key = f"{FEATURE_PREFIX}{request.entity_key}"
    features = await r.hgetall(key)
    if not features:
        raise HTTPException(
            status_code=404,
            detail=f"No features found for entity: {request.entity_key}",
        )
    return FeatureResponse(
        entity_key=request.entity_key,
        features=features,
        found=True,
    )

@app.get("/health")
async def health():
    try:
        await app.state.redis.ping()
        return {"status": "healthy", "redis": "connected"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Redis unavailable: {str(e)}")
```
Run it with:
```shell
uvicorn feature_server:app --host 0.0.0.0 --port 8000
```
Then test it:
```shell
curl -X POST http://localhost:8000/features \
  -H "Content-Type: application/json" \
  -d '{"entity_key": "user_001"}'
```
The response comes back with all features for that user as a flat dictionary. Your model serving code parses the values back into the types it expects.
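One lightweight way to do that parsing on the client side is a type map mirroring the schema. `FEATURE_TYPES` and `parse_features` are hypothetical helpers, not part of the server; handing the dict to the `UserFeatures` Pydantic model would work too, since Pydantic coerces numeric strings for you:

```python
from datetime import datetime

# Expected types for each feature -- mirrors the UserFeatures schema above
FEATURE_TYPES = {
    "avg_session_duration": float,
    "total_purchases": int,
    "days_since_last_login": int,
    "lifetime_value": float,
    "churn_score": float,
    "preferred_category": str,
    "updated_at": datetime.fromisoformat,
}

def parse_features(raw: dict) -> dict:
    """Convert the flat string map from Redis back into typed values."""
    return {name: FEATURE_TYPES[name](value) for name, value in raw.items()}

# Simulated /features response body -- every value arrives as a string
raw = {
    "avg_session_duration": "12.5",
    "total_purchases": "15",
    "churn_score": "0.12",
    "preferred_category": "electronics",
    "updated_at": "2024-06-01T12:00:00",
}

typed = parse_features(raw)
print(typed["total_purchases"] + 1)  # ints are ints again: 16
```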
## Batch Feature Retrieval with Pipelines
When your model needs to score multiple users at once – a batch prediction job, a ranking request, a recommendation list – fetching features one at a time wastes round-trips. Redis pipelines let you send all requests in a single network call.
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis.asyncio as aioredis
import time

FEATURE_PREFIX = "features:user:"

class BatchFeatureRequest(BaseModel):
    entity_keys: list[str]

class BatchFeatureResponse(BaseModel):
    results: dict[str, dict[str, str]]
    missing: list[str]
    latency_ms: float

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = aioredis.Redis(
        host="localhost", port=6379, db=0, decode_responses=True
    )
    await app.state.redis.ping()
    yield
    await app.state.redis.aclose()

app = FastAPI(title="Feature Store API", lifespan=lifespan)

@app.post("/features/batch", response_model=BatchFeatureResponse)
async def get_features_batch(request: BatchFeatureRequest):
    r = app.state.redis
    start = time.perf_counter()
    # Use a pipeline -- one round-trip for all keys
    pipe = r.pipeline()
    for entity_key in request.entity_keys:
        pipe.hgetall(f"{FEATURE_PREFIX}{entity_key}")
    raw_results = await pipe.execute()
    results = {}
    missing = []
    for entity_key, features in zip(request.entity_keys, raw_results):
        if features:
            results[entity_key] = features
        else:
            missing.append(entity_key)
    elapsed_ms = (time.perf_counter() - start) * 1000
    return BatchFeatureResponse(
        results=results,
        missing=missing,
        latency_ms=round(elapsed_ms, 3),
    )
```
The performance difference is significant. Fetching 100 entities individually takes 100 round-trips – roughly 10-20ms on a local network, much worse across availability zones. With a pipeline, the same 100 entities come back in a single round-trip, typically under 2ms. That is a 5-10x speedup with zero extra complexity.
Test the batch endpoint:
```shell
curl -X POST http://localhost:8000/features/batch \
  -H "Content-Type: application/json" \
  -d '{"entity_keys": ["user_001", "user_002", "user_003", "user_999"]}'
```
The response includes a `missing` array so your calling code knows which entities had no features, and a `latency_ms` field for monitoring.
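A sketch of how the caller might use `missing`: fall back to neutral defaults instead of failing the whole batch. The default values and the sample response below are made up for illustration:

```python
# Hypothetical fallback features for entities not found in Redis
DEFAULT_FEATURES = {"churn_score": "0.5", "total_purchases": "0"}

def merge_with_defaults(results: dict, missing: list) -> dict:
    """Give every requested entity a feature dict, real or default."""
    merged = dict(results)
    for entity_key in missing:
        merged[entity_key] = dict(DEFAULT_FEATURES)
    return merged

# Simulated /features/batch response body
response = {
    "results": {"user_001": {"churn_score": "0.12", "total_purchases": "15"}},
    "missing": ["user_999"],
    "latency_ms": 1.4,
}

all_features = merge_with_defaults(response["results"], response["missing"])
print(sorted(all_features))  # ['user_001', 'user_999']
```

Whether defaulting is safe depends on the model; for some use cases it is better to skip the entity than score it on placeholder features.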
## Common Errors and Fixes
### Connection refused on startup
```
redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.
```
Redis is not running. Start it with `redis-server` or check your service manager. If you are using Docker, make sure the container is up and the port mapping is correct:
```shell
docker run -d --name redis -p 6379:6379 redis:7-alpine
```
### Hash values come back as bytes instead of strings
```
{b'avg_session_duration': b'12.5', b'total_purchases': b'15'}
```
You forgot to set `decode_responses=True` when creating the Redis client. Without it, redis-py returns raw bytes. Fix it in your client constructor:
```python
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
```
### Feature TTL expired and keys are missing
```json
{"detail": "No features found for entity: user_001"}
```
Your batch ingestion pipeline did not run before the TTL expired. Check your scheduler (Airflow, cron, whatever runs the pipeline). You can verify directly in Redis:
```shell
redis-cli TTL "features:user:user_001"
# Returns -2 if the key does not exist (expired or never set)
# Returns -1 if the key exists but has no TTL
# Returns a positive integer for remaining seconds
```
Increase your TTL to give yourself more buffer, or add alerting when the ingestion job fails so you catch it before features go stale.
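If a monitoring job collects those TTL values, a small check like this can drive that alerting. `ALERT_BUFFER_SECONDS` and the sample TTLs are assumptions for illustration:

```python
# Hypothetical guardrail: flag entities whose remaining TTL is below a
# safety buffer, so you can alert before features actually expire
ALERT_BUFFER_SECONDS = 6 * 3600  # assume the pipeline needs 6 hours of slack

def entities_at_risk(ttls: dict) -> list:
    """Return entity keys whose TTL is missing or about to expire.

    TTL semantics follow redis-cli: -2 means the key is gone, -1 means
    no TTL is set (treated as safe here), positive is seconds remaining.
    """
    at_risk = []
    for key, ttl in ttls.items():
        if ttl == -2 or 0 <= ttl < ALERT_BUFFER_SECONDS:
            at_risk.append(key)
    return at_risk

# Simulated TTL readings from redis-cli
ttls = {"user_001": 80000, "user_002": 1200, "user_003": -2, "user_004": -1}
print(entities_at_risk(ttls))  # ['user_002', 'user_003']
```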