The Quick Version#
Apache Kafka is a distributed event streaming platform. For ML, it solves a specific problem: getting fresh data to your models continuously instead of running batch jobs. You produce events (user actions, sensor readings, transactions) into Kafka topics, then consume them for real-time feature computation, online inference, or training data collection.
```shell
pip install confluent-kafka
```
```python
from confluent_kafka import Producer, Consumer
import json

# Produce events
producer = Producer({"bootstrap.servers": "localhost:9092"})

event = {
    "user_id": "u_12345",
    "action": "purchase",
    "amount": 49.99,
    "timestamp": "2026-02-15T10:30:00Z",
    "item_category": "electronics",
}
producer.produce("user-events", key="u_12345", value=json.dumps(event))
producer.flush()

# Consume events
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "ml-feature-pipeline",
    "auto.offset.reset": "latest",
})
consumer.subscribe(["user-events"])

msg = consumer.poll(timeout=5.0)
if msg and not msg.error():
    event = json.loads(msg.value())
    print(f"User {event['user_id']} made a {event['action']}: ${event['amount']}")
```
That’s the basic pattern. Producers push data in, consumers pull it out. Everything in between is Kafka handling ordering, durability, and distribution.
Real-Time Feature Computation#
The most common ML use case for Kafka is computing features from live data. Instead of running a nightly SQL query to calculate “purchases in the last 24 hours,” you maintain a running count that updates with each event.
```python
from confluent_kafka import Consumer
from collections import defaultdict
import json
import time
import redis

# Redis as the feature store for online serving
r = redis.Redis(host="localhost", port=6379)

# Sliding window counters
class FeatureComputer:
    def __init__(self):
        self.windows = defaultdict(list)  # user_id -> [(timestamp, amount)]

    def update(self, event: dict):
        user_id = event["user_id"]
        ts = time.time()
        amount = event.get("amount", 0)
        self.windows[user_id].append((ts, amount))

        # Prune events older than 24 hours
        cutoff = ts - 86400
        self.windows[user_id] = [
            (t, a) for t, a in self.windows[user_id] if t > cutoff
        ]

        # Compute features
        recent = self.windows[user_id]
        features = {
            "purchase_count_24h": len(recent),
            "total_spend_24h": sum(a for _, a in recent),
            "avg_purchase_amount": sum(a for _, a in recent) / max(len(recent), 1),
            "last_purchase_ts": ts,
        }

        # Push to feature store
        r.hset(f"features:{user_id}", mapping={
            k: str(v) for k, v in features.items()
        })
        r.expire(f"features:{user_id}", 172800)  # TTL: 48 hours
        return features

computer = FeatureComputer()

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "feature-compute",
    "auto.offset.reset": "latest",
})
consumer.subscribe(["user-events"])

print("Computing features from stream...")
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Error: {msg.error()}")
        continue
    event = json.loads(msg.value())
    features = computer.update(event)
    print(f"Updated features for {event['user_id']}: {features}")
```
Your inference service reads these features from Redis at prediction time. The features are always fresh — within seconds of the latest event, not hours behind like batch-computed features.
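On the serving side, the lookup is a single Redis hash read. A minimal sketch of the parsing step, assuming the string-valued hash layout written by the pipeline above (parse_feature_hash is a hypothetical helper; the commented hgetall call requires a live Redis):

```python
def parse_feature_hash(raw: dict) -> dict:
    # redis-py returns bytes for field names and values; the pipeline
    # stored every feature as a string, so decode and cast back to float.
    return {k.decode(): float(v) for k, v in raw.items()}

# At prediction time (needs a running Redis):
# raw = r.hgetall("features:u_12345")
# features = parse_feature_hash(raw)

# Works the same on a literal hash:
features = parse_feature_hash({b"purchase_count_24h": b"3", b"total_spend_24h": b"149.97"})
```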
Online Inference Pipeline#
For real-time predictions (fraud detection, recommendations, content ranking), consume events from Kafka, compute features, run inference, and produce results back to another topic.
```python
import torch
from confluent_kafka import Consumer, Producer
import json

# Load your model
model = torch.jit.load("fraud_detector.pt")
model.eval()

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "fraud-detection",
    "auto.offset.reset": "latest",
})
consumer.subscribe(["transactions"])

producer = Producer({"bootstrap.servers": "localhost:9092"})

def predict(event: dict) -> dict:
    """Run fraud detection on a transaction."""
    features = torch.tensor([
        event["amount"],
        event["hour_of_day"],
        event["is_international"],
        event["merchant_risk_score"],
    ]).unsqueeze(0)
    with torch.no_grad():
        score = model(features).item()
    return {
        "transaction_id": event["transaction_id"],
        "fraud_score": round(score, 4),
        "is_fraud": score > 0.85,
    }

print("Fraud detection pipeline running...")
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value())
    result = predict(event)

    # Produce prediction to results topic
    producer.produce(
        "fraud-predictions",
        key=event["transaction_id"],
        value=json.dumps(result),
    )
    producer.poll(0)

    if result["is_fraud"]:
        print(f"FRAUD ALERT: {result['transaction_id']} (score: {result['fraud_score']})")
```
Downstream services consume from fraud-predictions to block transactions, send alerts, or update dashboards. With a lightweight model, each event can move through the whole path (consume, featurize, predict, produce) in well under 100 ms.
Training Data Collection#
Kafka is also excellent for collecting labeled training data continuously. Log predictions alongside outcomes, then replay the topic to build training datasets.
```python
from confluent_kafka import Consumer
import json
import pyarrow as pa
import pyarrow.parquet as pq

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "training-data-collector",
    "auto.offset.reset": "earliest",  # replay from beginning
})
consumer.subscribe(["labeled-events"])

batch = []
batch_size = 10000
file_count = 0

print("Collecting training data...")
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        continue
    record = json.loads(msg.value())
    batch.append(record)

    if len(batch) >= batch_size:
        # Write batch to Parquet
        table = pa.Table.from_pylist(batch)
        pq.write_table(table, f"training_data/batch_{file_count:05d}.parquet")
        print(f"Wrote batch {file_count}: {len(batch)} records")
        file_count += 1
        batch = []
```
Set auto.offset.reset to earliest and you replay the entire history. Kafka retains messages according to your retention policy (7 days by default; set retention.ms to -1 to keep messages forever). This means you can always regenerate training datasets by replaying topics with new feature logic.
Running Kafka Locally for Development#
You don’t need a production Kafka cluster to develop. Use Docker Compose for a single-node setup:
```yaml
# docker-compose.yaml
services:
  kafka:
    image: confluentinc/confluent-local:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
```
That gives you a Kafka broker with KRaft (no ZooKeeper needed) at localhost:9092. Create topics with the Kafka CLI or let them auto-create on first produce.
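For example, creating the user-events topic from the examples above might look like this (the topic name and partition count are illustrative; this assumes the kafka-topics tool is on the container's PATH, and it needs the broker running, so treat it as an ops fragment rather than something to copy blindly):

```shell
docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic user-events --partitions 3
```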
Common Errors and Fixes#
KafkaException: No brokers available
The bootstrap server is unreachable. Check that Kafka is running and the hostname/port are correct. For Docker, make sure the advertised listener matches what your code connects to.
Consumer doesn’t receive messages
Check auto.offset.reset. If set to latest (default), you only see messages produced after the consumer starts. Use earliest to read from the beginning. Also verify the group.id — a new group starts from the reset position, but an existing group resumes from its last committed offset.
Messages arrive out of order
Kafka guarantees ordering within a partition, not across partitions. Use the same key for events that must stay ordered (e.g., use user_id as the key so all events for a user go to the same partition).
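The mechanics are easy to picture. This is not Kafka's actual partitioner (the default client hashes keys with murmur2), just an illustration of why a stable key-to-partition mapping preserves per-key order:

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    # Deterministic hash: the same key always maps to the same partition.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every event keyed by u_12345 lands in one partition, so that user's
# events are consumed in the order they were produced.
p = pick_partition("u_12345", 6)
assert pick_partition("u_12345", 6) == p
```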
Consumer falls behind (lag grows)
Your consumer is slower than the producer. Options: increase partitions and add more consumer instances (up to one per partition), batch process messages instead of one at a time, or optimize your processing logic.
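One cheap win is batching: confluent-kafka's consume() returns up to num_messages at once, and grouping the work amortizes per-message overhead. A sketch of the chunking half (micro_batches is a hypothetical helper, not part of the library):

```python
def micro_batches(messages, batch_size=500):
    # Yield fixed-size chunks so downstream work (feature-store writes,
    # model calls) happens once per chunk instead of once per message.
    for i in range(0, len(messages), batch_size):
        yield messages[i:i + batch_size]

# In a real consumer loop you would feed it from, e.g.:
# msgs = consumer.consume(num_messages=500, timeout=1.0)
# for chunk in micro_batches(msgs, 100): process(chunk)
chunks = list(micro_batches(list(range(5)), batch_size=2))
```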
Memory issues with large messages
Kafka’s default max message size is 1MB. For large payloads (images, embeddings), store the data in object storage (S3) and put only a reference URL in the Kafka message.
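This is the classic claim-check pattern. A minimal sketch, with a made-up bucket path; the actual upload to object storage happens out of band:

```python
import json

def make_claim_check(user_id: str, payload_url: str) -> bytes:
    # Ship a pointer to the heavy payload, not the payload itself.
    event = {"user_id": user_id, "embedding_url": payload_url}
    return json.dumps(event).encode()

msg = make_claim_check("u_12345", "s3://example-bucket/embeddings/u_12345.npy")
# The message stays tiny regardless of how large the embedding is.
assert len(msg) < 1024
```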