Showing a blank screen while Claude generates a 2000-token response is a terrible UX. Your users will think the app is frozen. Streaming fixes this – tokens appear as they’re generated, and time-to-first-token drops below a second.
Here’s the fastest way to get streaming working with the Anthropic Python SDK.
## Basic Streaming in 10 Lines
```python
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain how garbage collection works in Python."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()
```
The `messages.stream()` method returns a context manager. Inside it, `stream.text_stream` yields each text chunk as it arrives. The `flush=True` is critical – without it, Python buffers stdout and you lose the real-time effect in terminals.
After the stream finishes, you can grab the full message object:
```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "What is 2+2?"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After iteration, the final message is available
    final_message = stream.get_final_message()

print(f"\nTokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out")
```
This gives you token counts, stop reason, and the full content – all without a second API call.
## Understanding Stream Events
If you need more control than `text_stream` provides, iterate over raw events instead. Each event has a `type` field telling you what happened.
```python
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about Python."}],
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"Model: {event.message.model}")
            print(f"Input tokens: {event.message.usage.input_tokens}")
        elif event.type == "content_block_start":
            print(f"\n--- Content block {event.index} started ---")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "content_block_stop":
            print(f"\n--- Content block {event.index} stopped ---")
        elif event.type == "message_delta":
            print(f"\nStop reason: {event.delta.stop_reason}")
            print(f"Output tokens: {event.usage.output_tokens}")
        elif event.type == "message_stop":
            print("Stream complete.")
```
The event flow is always the same:
- `message_start` – contains the message ID, model name, and input token count
- `content_block_start` – marks the beginning of a content block (text or tool use)
- `content_block_delta` – the actual content chunks (text deltas or tool input JSON deltas)
- `content_block_stop` – the block is finished
- `message_delta` – carries the stop reason and output token count
- `message_stop` – the stream is done
You’ll typically see one text content block per response, but tool use responses can have multiple blocks.
## Async Streaming
For web servers, you want the async variant. It uses `AsyncAnthropic` and `async for` instead of blocking iteration.
```python
import asyncio

from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def stream_response(user_message: str) -> str:
    collected_text = []
    async with async_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text in stream.text_stream:
            collected_text.append(text)
            print(text, end="", flush=True)
    return "".join(collected_text)

result = asyncio.run(stream_response("What are Python descriptors?"))
```
The API surface is identical to the sync version. Same event types, same `text_stream` property, same `get_final_message()`. The only differences are `async with`, `async for`, and using `AsyncAnthropic` instead of `Anthropic`.
Use async streaming whenever you’re inside an async framework like FastAPI, Starlette, or aiohttp. It lets your server handle other requests while waiting for Anthropic’s API to send the next token.
## Building a FastAPI SSE Endpoint
This is where streaming gets practical. You build a FastAPI endpoint that streams Claude’s response to a browser using Server-Sent Events.
Install dependencies first:
```shell
pip install fastapi uvicorn anthropic sse-starlette
```
Here’s the full server:
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
from anthropic import AsyncAnthropic

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async_client = AsyncAnthropic()

class ChatRequest(BaseModel):
    message: str
    system: str = "You are a helpful assistant."

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        async with async_client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=request.system,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            async for text in stream.text_stream:
                yield {"data": text}
            # On the async client, get_final_message() is a coroutine
            final = await stream.get_final_message()
        yield {
            "event": "done",
            "data": f'{{"input_tokens": {final.usage.input_tokens}, "output_tokens": {final.usage.output_tokens}}}',
        }
    return EventSourceResponse(event_generator())
```
Run it:
```shell
uvicorn server:app --host 0.0.0.0 --port 8000
```
On the frontend, consume the stream with `fetch` and a stream reader. (The native `EventSource` API only supports GET requests, so it can’t call this POST endpoint.)
```javascript
const response = await fetch("http://localhost:8000/chat/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Explain async/await in Python" }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const chunk = decoder.decode(value, { stream: true });
  // Parse SSE lines (naive: assumes each read contains whole lines)
  const lines = chunk.split("\n");
  for (const line of lines) {
    if (line.startsWith("data:")) {
      // Strip only the single space after "data:" – trim() would eat
      // meaningful whitespace inside the token
      const text = line.slice(5).replace(/^ /, "");
      document.getElementById("output").textContent += text;
    }
  }
}
```
The sse-starlette library handles the SSE framing (the `data:` prefixes, newline separators, keep-alive pings). You just yield dictionaries.
## Streaming Tool Use

When Claude calls a tool, the stream events change. Instead of `text_delta` deltas, the tool use content block emits `input_json_delta` deltas containing incremental JSON for the tool’s input.
```python
import json

from anthropic import Anthropic

client = Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
            },
            "required": ["city"],
        },
    }
]

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
) as stream:
    current_tool_name = None
    tool_input_json = ""
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                current_tool_name = event.content_block.name
                tool_input_json = ""
                print(f"Tool call: {current_tool_name}")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                tool_input_json += event.delta.partial_json
        elif event.type == "content_block_stop":
            if current_tool_name:
                # Tolerate a tool call that streamed no input fragments
                parsed_input = json.loads(tool_input_json or "{}")
                print(f"Tool input: {parsed_input}")
                current_tool_name = None
```
The tool input arrives in fragments. You accumulate the `partial_json` strings and parse the full JSON only after `content_block_stop`. Don’t try to parse intermediate chunks – they’re not valid JSON on their own.
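To see why, here is the accumulation with some made-up fragments for the Tokyo example – each chunk alone fails to parse, but the concatenation succeeds:

```python
import json

# Made-up fragments of the kind input_json_delta events carry
fragments = ['{"ci', 'ty": "To', 'kyo"}']

buffer = ""
for fragment in fragments:
    try:
        json.loads(fragment)
    except json.JSONDecodeError:
        pass  # individual chunks are not valid JSON on their own
    buffer += fragment

print(json.loads(buffer))  # the full accumulation parses fine
```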
## Common Errors and Fixes
**`anthropic.APIConnectionError: Connection error`**
Your network can’t reach Anthropic’s API. Check your firewall, proxy settings, or VPN. If you’re behind a corporate proxy:
```python
import httpx
from anthropic import Anthropic

client = Anthropic(
    http_client=httpx.Client(proxy="http://your-proxy:8080")
)
```
**`anthropic.AuthenticationError: 401`**

Your API key is missing or wrong. Make sure `ANTHROPIC_API_KEY` is set:
```shell
export ANTHROPIC_API_KEY="sk-ant-api03-..."
```
The SDK reads this automatically. Don’t hardcode keys in source files.
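A quick sanity check at startup catches a missing or mangled key before the first request fails. `check_api_key` is a hypothetical helper, not part of the SDK:

```python
import os

def check_api_key(env=os.environ):
    # Hypothetical startup check, not part of the SDK
    key = env.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        return "ANTHROPIC_API_KEY is not set"
    if not key.startswith("sk-ant-"):
        return "value does not look like an Anthropic API key"
    return "ok"

print(check_api_key({"ANTHROPIC_API_KEY": "sk-ant-api03-..."}))  # ok
```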
**Stream disconnects mid-response**
This usually happens with long responses on flaky connections. Wrap your stream in a retry:
```python
import time

from anthropic import Anthropic, APITimeoutError, APIConnectionError

client = Anthropic()

def stream_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                full_text = []
                for text in stream.text_stream:
                    full_text.append(text)
                    print(text, end="", flush=True)
                return "".join(full_text)
        except (APITimeoutError, APIConnectionError) as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                # Note: a retry restarts generation from scratch, so
                # already-printed text will repeat on screen
                print(f"\nRetrying in {wait}s... ({e})")
                time.sleep(wait)
            else:
                raise

result = stream_with_retry([{"role": "user", "content": "Write a long essay about async programming."}])
```
**`TypeError: 'async_generator' object is not iterable`**

You used `for` instead of `async for` with the async client, or called the sync `messages.stream()` on `AsyncAnthropic`. Match your client type to your iteration style: `AsyncAnthropic` needs `async with` and `async for`; `Anthropic` uses regular `with` and `for`.
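A minimal reproduction, with a toy async generator standing in for `stream.text_stream`:

```python
import asyncio

async def fake_text_stream():
    # Stands in for stream.text_stream on the async client
    yield "Hello, "
    yield "world"

# Wrong: a plain for-loop over an async generator raises TypeError
try:
    for chunk in fake_text_stream():
        pass
except TypeError as e:
    print(e)  # 'async_generator' object is not iterable

# Right: async for, inside a coroutine
async def consume():
    return [chunk async for chunk in fake_text_stream()]

print(asyncio.run(consume()))  # ['Hello, ', 'world']
```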
**SSE endpoint returns the full response at once instead of streaming**

Make sure your ASGI server supports streaming. Gunicorn with Uvicorn workers works; plain Gunicorn with sync workers does not. Run with:
```shell
uvicorn server:app --host 0.0.0.0 --port 8000
```
Don’t use `--workers` with gunicorn unless you’re using `uvicorn.workers.UvicornWorker` as the worker class.
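If you do want gunicorn managing multiple processes, the standard pattern is uvicorn workers (worker count here is illustrative; tune it for your hardware):

```shell
gunicorn server:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
```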
**`anthropic.RateLimitError: 429`**

You’ve hit the rate limit. The SDK has built-in retry with exponential backoff for 429s, but if you’re doing many concurrent streams, you’ll need to throttle on your side. Use an `asyncio.Semaphore`:
```python
import asyncio

from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()
semaphore = asyncio.Semaphore(5)  # max 5 concurrent streams

async def limited_stream(message: str):
    async with semaphore:
        async with async_client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                print(text, end="", flush=True)
```