Every turn in a multi-turn LLM conversation resends the entire message history. Turn 1 sends the system prompt. Turn 2 sends the system prompt plus turn 1. Turn 10 sends everything from turns 1 through 9. You’re paying for the same tokens over and over again.
Here’s what a typical multi-turn call looks like without caching:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from openai import OpenAI

client = OpenAI()

# Every request resends the ENTIRE conversation so far; each turn below is
# re-tokenized and billed again on every call.
conversation = [
    {"role": "system", "content": "You are a helpful coding assistant."},
    {"role": "user", "content": "What is a decorator in Python?"},
    {"role": "assistant", "content": "A decorator is a function that wraps another function..."},
    {"role": "user", "content": "Show me a caching decorator."},
    {"role": "assistant", "content": "Here's a simple memoization decorator..."},
    # Turn 6: all 5 previous messages get resent and re-tokenized
    {"role": "user", "content": "How do I add a TTL to that cache?"},
]

response = client.chat.completions.create(
    model="gpt-4o",
    messages=conversation,
)
print(response.choices[0].message.content)
|
By turn 10, you’re sending thousands of tokens that the model has already processed in previous requests. The fix is a caching layer that deduplicates these repeated prefixes.
Hash-Based Prompt Cache#
The simplest approach: hash the conversation prefix and cache the response. If the same prefix appears again (same messages in the same order), return the cached completion instead of hitting the API.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| import hashlib
import json
from openai import OpenAI
client = OpenAI()
class PromptCache:
    """In-memory exact-match cache for chat completions.

    Entries are keyed on the model name plus a SHA-256 hash of the full
    message list, so the same conversation sent to a different model is a
    cache miss rather than a silently wrong hit.
    """

    def __init__(self):
        self.cache = {}   # cache key -> completion text
        self.hits = 0     # lookups served from cache
        self.misses = 0   # lookups that had to call the API

    def _hash_messages(self, messages: list[dict]) -> str:
        """Create a deterministic hash of the message list."""
        # sort_keys normalizes dict key order so logically-equal
        # message lists always produce the same digest.
        serialized = json.dumps(messages, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(serialized.encode("utf-8")).hexdigest()

    def _cache_key(self, messages: list[dict], model: str) -> str:
        """Combine model and message hash; different models must never collide."""
        return f"{model}:{self._hash_messages(messages)}"

    def get_completion(self, messages: list[dict], model: str = "gpt-4o") -> str:
        """Return the completion for *messages*, serving from cache when possible."""
        cache_key = self._cache_key(messages, model)
        if cache_key in self.cache:
            self.hits += 1
            return self.cache[cache_key]
        self.misses += 1
        response = client.chat.completions.create(
            model=model,
            messages=messages,
        )
        content = response.choices[0].message.content
        self.cache[cache_key] = content
        return content

    def hit_rate(self) -> float:
        """Fraction of lookups served from cache; 0.0 before any traffic."""
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0
# Usage
cache = PromptCache()
messages = [
    {"role": "system", "content": "You are a helpful coding assistant."},
    {"role": "user", "content": "What is a decorator in Python?"},
]

# First call hits the API
answer = cache.get_completion(messages)
print(answer)

# Second call with identical messages returns cached result
# (one hit out of two lookups)
answer_again = cache.get_completion(messages)
print(f"Cache hit rate: {cache.hit_rate():.0%}")  # 50%
|
This works well when users retry a message or when you restart a conversation with the same opening. But it only catches exact matches. Change a single character and you get a cache miss.
When Exact Matching Falls Short#
The hash-based cache only helps with identical message sequences. In real conversations, each new user message creates a unique prefix. You need a smarter strategy for those cases.
Sliding Window with Cache Checkpoints#
Instead of sending the full history every time, keep only the last N messages and store a compressed summary of everything older. This bounds your token usage while preserving context.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
| import hashlib
import json
from openai import OpenAI
client = OpenAI()
class SlidingWindowCache:
    """Chat session with bounded context.

    Keeps the most recent ``window_size`` messages verbatim and replaces
    everything older with an LLM-generated summary. Summaries are cached by
    content hash, so a given stretch of history is only summarized once no
    matter how many later turns reuse it.
    """

    def __init__(self, window_size: int = 6, model: str = "gpt-4o"):
        self.window_size = window_size
        self.model = model
        self.full_history: list[dict] = []      # every turn, untruncated
        self.summary_cache: dict[str, str] = {} # message-hash -> summary text

    def _hash_messages(self, messages: list[dict]) -> str:
        # Normalize dict key order so logically-equal payloads hash the same.
        payload = json.dumps(messages, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def _summarize(self, messages: list[dict]) -> str:
        """Summarize older messages into a compact context block."""
        digest = self._hash_messages(messages)
        if digest in self.summary_cache:
            return self.summary_cache[digest]
        prompt = [
            {
                "role": "system",
                "content": (
                    "Summarize the following conversation in 2-3 sentences. "
                    "Preserve key facts, decisions, and code snippets mentioned."
                ),
            },
            {
                "role": "user",
                "content": json.dumps(messages, indent=2),
            },
        ]
        reply = client.chat.completions.create(
            model=self.model,
            messages=prompt,
            max_tokens=200,
        )
        summary = reply.choices[0].message.content
        self.summary_cache[digest] = summary
        return summary

    def add_message(self, role: str, content: str):
        """Append one turn to the untruncated transcript."""
        self.full_history.append({"role": role, "content": content})

    def get_context_messages(self, system_prompt: str) -> list[dict]:
        """Build the message list with summary + recent window."""
        context: list[dict] = [{"role": "system", "content": system_prompt}]
        if len(self.full_history) <= self.window_size:
            # Everything still fits: send the history as-is.
            context.extend(self.full_history)
            return context
        older = self.full_history[: -self.window_size]
        recent = self.full_history[-self.window_size :]
        context.append({
            "role": "system",
            "content": f"Summary of earlier conversation:\n{self._summarize(older)}",
        })
        context.extend(recent)
        return context

    def chat(self, user_message: str, system_prompt: str) -> str:
        """Record the user turn, call the model with bounded context, record the reply."""
        self.add_message("user", user_message)
        payload = self.get_context_messages(system_prompt)
        completion = client.chat.completions.create(
            model=self.model,
            messages=payload,
        )
        answer = completion.choices[0].message.content
        self.add_message("assistant", answer)
        return answer
# Usage
session = SlidingWindowCache(window_size=6)
system = "You are a Python expert."

reply = session.chat("What's the difference between a list and a tuple?", system)
print(reply)

reply = session.chat("When should I use each one?", system)
print(reply)

# After many turns, older messages get summarized automatically
# and the summary itself is cached for reuse
|
The key insight: the summary of turns 1-4 doesn’t change when turn 11 happens. Caching that summary means you only pay to generate it once.
Redis-Backed Cache for Production#
A dictionary cache dies when your process restarts. For production, use Redis with TTL expiration so stale entries clean themselves up.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
| import hashlib
import json
import redis
from openai import OpenAI
client = OpenAI()
# decode_responses=True makes redis-py return str instead of bytes,
# so json.loads can consume get() results without an explicit .decode()
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

DEFAULT_TTL = 3600  # 1 hour
class RedisPromptCache:
    """Redis-backed exact-match completion cache with TTL expiry.

    Keys include the messages, the model, AND the sampling temperature, so
    responses sampled at different temperatures never collide on one entry.
    """

    def __init__(self, ttl: "int | None" = None, prefix: str = "llm_cache"):
        # Resolve DEFAULT_TTL lazily so an explicit ttl never touches it.
        self.ttl = DEFAULT_TTL if ttl is None else ttl  # seconds before expiry
        self.prefix = prefix  # key namespace so flush_all() only touches ours

    def _cache_key(
        self,
        messages: list[dict],
        model: str,
        temperature: float = 0.0,
    ) -> str:
        """Deterministic Redis key for one (messages, model, temperature) request."""
        payload = json.dumps(
            {"messages": messages, "model": model, "temperature": temperature},
            sort_keys=True,
        )
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{self.prefix}:{digest}"

    def get_completion(
        self,
        messages: list[dict],
        model: str = "gpt-4o",
        temperature: float = 0.0,
    ) -> dict:
        """Return completion with cache metadata (content, usage, cache_hit)."""
        key = self._cache_key(messages, model, temperature)
        cached = redis_client.get(key)
        if cached is not None:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
        )
        result = {
            "content": response.choices[0].message.content,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            },
            "cache_hit": False,
        }
        # setex stores the value and its TTL in one atomic command.
        redis_client.setex(key, self.ttl, json.dumps(result))
        return result

    def invalidate(
        self,
        messages: list[dict],
        model: str = "gpt-4o",
        temperature: float = 0.0,
    ):
        """Remove a specific entry from cache."""
        redis_client.delete(self._cache_key(messages, model, temperature))

    def flush_all(self):
        """Clear all cached completions under this prefix (SCAN: non-blocking)."""
        cursor = 0
        while True:
            cursor, keys = redis_client.scan(cursor, match=f"{self.prefix}:*")
            if keys:
                redis_client.delete(*keys)
            if cursor == 0:  # SCAN returns cursor 0 when iteration completes
                break
# Usage
cache = RedisPromptCache(ttl=1800)  # 30-minute TTL
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain Python generators in 3 sentences."},
]

# First call misses and stores the result in Redis; identical requests
# within the TTL come back with cache_hit=True.
result = cache.get_completion(messages, temperature=0.0)
print(result["content"])
print(f"Cache hit: {result['cache_hit']}")
print(f"Tokens used: {result['usage']['total_tokens']}")
|
Set temperature=0.0 when caching. Non-zero temperatures produce different outputs for the same input, so a cache would freeze one arbitrary sample and replay it forever — defeating the variety you asked for.
TTL Strategy#
Short TTLs (5-15 minutes) work for conversational assistants where context goes stale fast. Longer TTLs (1-24 hours) suit FAQ-style queries where answers don’t change often. Match the TTL to how quickly your data changes.
Measuring Cache Hit Rates#
You need numbers to know if caching is actually saving money. Wrap your cache with metrics tracking.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| import time
from dataclasses import dataclass, field
@dataclass
class CacheMetrics:
    """Running counters for cache effectiveness.

    Tracks hit/miss counts, the tokens and latency that hits avoided, the
    observed API latency on misses, and a dollar estimate of the savings.
    """

    hits: int = 0
    misses: int = 0
    total_tokens_saved: int = 0
    total_latency_saved_ms: float = 0.0
    request_latencies: list[float] = field(default_factory=list)
    # Input price in dollars per 1M tokens. Defaults to GPT-4o pricing;
    # override for other models: CacheMetrics(price_per_million_tokens=...).
    # Appended last so existing positional construction keeps working.
    price_per_million_tokens: float = 2.50

    def record_hit(self, tokens_saved: int, latency_saved_ms: float):
        """Count one cache hit and the tokens/latency it avoided."""
        self.hits += 1
        self.total_tokens_saved += tokens_saved
        self.total_latency_saved_ms += latency_saved_ms

    def record_miss(self, latency_ms: float):
        """Count one cache miss and the API latency it actually paid."""
        self.misses += 1
        self.request_latencies.append(latency_ms)

    @property
    def hit_rate(self) -> float:
        """Hits as a fraction of all lookups; 0.0 before any traffic."""
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    @property
    def avg_api_latency_ms(self) -> float:
        """Mean latency of real API calls (misses only); 0.0 with no data."""
        if not self.request_latencies:
            return 0.0
        return sum(self.request_latencies) / len(self.request_latencies)

    @property
    def estimated_cost_saved(self) -> float:
        """Estimate savings at the configured input price (default: GPT-4o, $2.50 per 1M tokens)."""
        return (self.total_tokens_saved / 1_000_000) * self.price_per_million_tokens

    def report(self) -> str:
        """Human-readable multi-line summary of all metrics."""
        return (
            f"Cache Performance:\n"
            f" Hit rate: {self.hit_rate:.1%}\n"
            f" Hits: {self.hits}, Misses: {self.misses}\n"
            f" Tokens saved: {self.total_tokens_saved:,}\n"
            f" Estimated cost saved: ${self.estimated_cost_saved:.4f}\n"
            f" Latency saved: {self.total_latency_saved_ms:.0f}ms\n"
            f" Avg API latency: {self.avg_api_latency_ms:.0f}ms"
        )
# Integration with the cache
metrics = CacheMetrics()

# Simulating a cache hit
metrics.record_hit(tokens_saved=1500, latency_saved_ms=800.0)
metrics.record_hit(tokens_saved=1200, latency_saved_ms=750.0)

# Simulating cache misses
metrics.record_miss(latency_ms=850.0)
metrics.record_miss(latency_ms=920.0)
metrics.record_miss(latency_ms=780.0)

# 2 hits out of 5 lookups -> 40% hit rate in the report below
print(metrics.report())
|
Output:
1
2
3
4
5
6
7
| Cache Performance:
Hit rate: 40.0%
Hits: 2, Misses: 3
Tokens saved: 2,700
Estimated cost saved: $0.0068
Latency saved: 1550ms
Avg API latency: 850ms
|
A 40-60% hit rate is typical for multi-turn sessions where users often rephrase or revisit earlier topics. Even a 20% hit rate pays for itself on high-volume workloads.
Common Errors and Fixes#
Redis connection refused#
1
| redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.
|
Redis isn’t running. Start it:
1
2
3
4
5
6
7
8
# Start a local Redis server — pick the variant for your platform.

# Ubuntu/Debian
sudo systemctl start redis-server

# macOS with Homebrew
brew services start redis

# Docker (publishes Redis's default port 6379 on the host)
docker run -d --name redis -p 6379:6379 redis:7-alpine
|
JSON serialization fails on message content#
1
| TypeError: Object of type bytes is not JSON serializable
|
This happens when message content includes raw bytes (e.g., from file reads or image data). Decode bytes to strings before caching:
1
2
3
4
# Fix: ensure all message content is a string
# (bytes values break json.dumps, so decode them before hashing/caching;
# assumes the bytes are UTF-8 text — adjust the codec if yours differ)
for msg in messages:
    if isinstance(msg["content"], bytes):
        msg["content"] = msg["content"].decode("utf-8")
|
Cache returns stale results after prompt changes#
1
| # No error — just wrong answers after you update the system prompt
|
Your system prompt changed but the old cached responses still use the previous instructions. The fix is to include the system prompt in the cache key (which the implementations above already do) and invalidate stale entries:
1
2
3
4
5
6
| # If you changed your system prompt, flush relevant cache entries
cache.flush_all()
# Or better: version your system prompts
system_prompt = "v2: You are a Python expert who always includes type hints."
# The version prefix changes the hash, so old cache entries won't match
|
Hash collisions on very long conversations#
A collision is astronomically unlikely with SHA-256, but if you want belt-and-braces safety, store the original messages alongside the cached response and verify them on retrieval:
1
2
3
4
5
6
7
8
9
10
11
# Store both hash and original input for verification
# (fragment: key, content, self.ttl, messages come from the surrounding cache class)
cached_entry = {
    "messages_json": json.dumps(messages, sort_keys=True),
    "response": content,
}
redis_client.setex(key, self.ttl, json.dumps(cached_entry))

# On retrieval, verify the messages match — a mismatch would mean a hash
# collision, so fall through to a fresh API call instead of a wrong answer
cached = json.loads(redis_client.get(key))
if cached["messages_json"] == json.dumps(messages, sort_keys=True):
    return cached["response"]
|
Pick the caching strategy that matches your scale. Dict-based caches work fine for single-process scripts. Redis is the right call once you have multiple workers or need persistence across restarts. The sliding window approach gives you the best cost savings for long conversations where the early turns rarely change.