Showing a blank screen while Claude generates a 2000-token response is terrible UX. Your users will think the app is frozen. Streaming fixes this – tokens appear as they’re generated, and time-to-first-token typically drops below a second.

Here’s the fastest way to get streaming working with the Anthropic Python SDK.

Basic Streaming in 10 Lines

from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain how garbage collection works in Python."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
print()

The messages.stream() method returns a context manager. Inside it, stream.text_stream yields each text chunk as it arrives. The flush=True matters – stdout is line-buffered in a terminal and block-buffered when piped, so chunks that don’t end in a newline would otherwise sit in the buffer and you lose the real-time effect.

After the stream finishes, you can grab the full message object:

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "What is 2+2?"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After iteration, the final message is available
    final_message = stream.get_final_message()
    print(f"\nTokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out")

This gives you token counts, stop reason, and the full content – all without a second API call.
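
One practical use of the final message is spotting truncated replies. The sketch below assumes only the attributes used above (usage.input_tokens, usage.output_tokens) plus stop_reason. summarize_message is a hypothetical helper, not part of the SDK, and the SimpleNamespace object stands in for a real message so the snippet runs offline.

```python
from types import SimpleNamespace


def summarize_message(message) -> str:
    """Build a one-line usage summary from a final message object.

    Hypothetical helper: it only reads .usage and .stop_reason.
    """
    usage = message.usage
    summary = (
        f"{usage.input_tokens} in, {usage.output_tokens} out, "
        f"stop_reason={message.stop_reason}"
    )
    # A stop_reason of "max_tokens" means the reply hit the max_tokens limit.
    if message.stop_reason == "max_tokens":
        summary += " (truncated: consider raising max_tokens)"
    return summary


# Stand-in for the object returned by stream.get_final_message().
fake_message = SimpleNamespace(
    usage=SimpleNamespace(input_tokens=12, output_tokens=1024),
    stop_reason="max_tokens",
)
print(summarize_message(fake_message))
```

With a real stream you would pass the result of stream.get_final_message() instead of the stand-in.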

Understanding Stream Events

If you need more control than text_stream provides, iterate over raw events instead. Each event has a type field telling you what happened.

from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about Python."}],
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"Model: {event.message.model}")
            print(f"Input tokens: {event.message.usage.input_tokens}")
        elif event.type == "content_block_start":
            print(f"\n--- Content block {event.index} started ---")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "content_block_stop":
            print(f"\n--- Content block {event.index} stopped ---")
        elif event.type == "message_delta":
            print(f"\nStop reason: {event.delta.stop_reason}")
            print(f"Output tokens: {event.usage.output_tokens}")
        elif event.type == "message_stop":
            print("Stream complete.")

The event flow follows the same order every time, with steps 2–4 repeating once per content block:

  1. message_start – contains the message ID, model name, and input token count
  2. content_block_start – marks the beginning of a content block (text or tool use)
  3. content_block_delta – the actual content chunks (text deltas or tool input JSON deltas)
  4. content_block_stop – the block is finished
  5. message_delta – carries the stop reason and output token count
  6. message_stop – the stream is done

You’ll typically see one text content block per response, but tool use responses can have multiple blocks.
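
To make the multi-block case concrete, here is a rough sketch that groups deltas by block index. The events are simulated with SimpleNamespace objects that mimic only the fields used in this article, so they are stand-ins rather than real SDK objects, and collect_blocks is a hypothetical helper name.

```python
from types import SimpleNamespace as Event


def collect_blocks(events):
    """Group streamed deltas by content block index.

    Returns {index: {"type": ..., "content": ...}} where "content" holds
    the joined text deltas or the joined partial JSON of a tool call.
    """
    blocks = {}
    for event in events:
        if event.type == "content_block_start":
            blocks[event.index] = {"type": event.content_block.type, "content": ""}
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                blocks[event.index]["content"] += event.delta.text
            elif event.delta.type == "input_json_delta":
                blocks[event.index]["content"] += event.delta.partial_json
    return blocks


# Simulated stream: one text block followed by one tool_use block.
simulated = [
    Event(type="message_start"),
    Event(type="content_block_start", index=0, content_block=Event(type="text")),
    Event(type="content_block_delta", index=0, delta=Event(type="text_delta", text="Checking ")),
    Event(type="content_block_delta", index=0, delta=Event(type="text_delta", text="the weather.")),
    Event(type="content_block_stop", index=0),
    Event(type="content_block_start", index=1, content_block=Event(type="tool_use")),
    Event(type="content_block_delta", index=1, delta=Event(type="input_json_delta", partial_json='{"city": ')),
    Event(type="content_block_delta", index=1, delta=Event(type="input_json_delta", partial_json='"Tokyo"}')),
    Event(type="content_block_stop", index=1),
    Event(type="message_delta"),
    Event(type="message_stop"),
]

blocks = collect_blocks(simulated)
for index, block in blocks.items():
    print(index, block["type"], repr(block["content"]))
```

Keying on event.index rather than on arrival order keeps the text and the tool input separate even when a response carries several blocks.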

Async Streaming

For web servers, you want the async variant. It uses AsyncAnthropic and async for instead of blocking iteration.

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def stream_response(user_message: str) -> str:
    collected_text = []

    async with async_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for text in stream.text_stream:
            collected_text.append(text)
            print(text, end="", flush=True)

    return "".join(collected_text)

result = asyncio.run(stream_response("What are Python descriptors?"))

The API surface mirrors the sync version. Same event types, same text_stream property, same get_final_message(). The differences are async with, async for, AsyncAnthropic instead of Anthropic, and the fact that get_final_message() is a coroutine, so you must await it.

Use async streaming whenever you’re inside an async framework like FastAPI, Starlette, or aiohttp. It lets your server handle other requests while waiting for Anthropic’s API to send the next token.
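
The interleaving is easy to see without touching the network. In this sketch, fake_stream is a stand-in for stream.text_stream (an assumption for illustration), and two consumers run concurrently on one event loop.

```python
import asyncio


async def fake_stream(label: str, chunks: int):
    """Stand-in for stream.text_stream: pauses before each chunk."""
    for i in range(chunks):
        await asyncio.sleep(0)  # simulates waiting for the next token
        yield f"{label}{i}"


async def consume(label: str, chunks: int, log: list) -> str:
    """Collect one stream, recording the global arrival order in log."""
    collected = []
    async for text in fake_stream(label, chunks):
        log.append(text)
        collected.append(text)
    return "".join(collected)


async def main():
    log = []
    # Both consumers share the event loop; neither blocks the other.
    results = await asyncio.gather(consume("a", 3, log), consume("b", 3, log))
    return results, log


results, log = asyncio.run(main())
print(results)
print(log)
```

Each consumer still gets its own chunks in order, while the shared log shows chunks from both streams mixed together. That is the behaviour a web server relies on when it serves several clients at once.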

Building a FastAPI SSE Endpoint

This is where streaming gets practical. You build a FastAPI endpoint that streams Claude’s response to a browser using Server-Sent Events.

Install dependencies first:

pip install fastapi uvicorn anthropic sse-starlette

Here’s the full server:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
from anthropic import AsyncAnthropic

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async_client = AsyncAnthropic()

class ChatRequest(BaseModel):
    message: str
    system: str = "You are a helpful assistant."

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        async with async_client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=request.system,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            async for text in stream.text_stream:
                yield {"data": text}

            final = await stream.get_final_message()
            yield {
                "event": "done",
                "data": f'{{"input_tokens": {final.usage.input_tokens}, "output_tokens": {final.usage.output_tokens}}}',
            }

    return EventSourceResponse(event_generator())

Save the code as server.py and run it:

uvicorn server:app --host 0.0.0.0 --port 8000

On the frontend, consume the stream with fetch and a stream reader. The native EventSource API only issues GET requests, so it can’t call this POST endpoint:

const response = await fetch("http://localhost:8000/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message: "Explain async/await in Python" }),
});

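const reader = response.body.getReader();
const decoder = new TextDecoder();
const output = document.getElementById("output");
let buffer = "";

while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A network chunk can end mid-event, so buffer until a blank line closes the event
    buffer = (buffer + decoder.decode(value, { stream: true })).replace(/\r\n/g, "\n");
    const events = buffer.split("\n\n");
    buffer = events.pop(); // keep the incomplete tail for the next read

    for (const rawEvent of events) {
        let eventName = "message";
        const dataLines = [];
        for (const line of rawEvent.split("\n")) {
            if (line.startsWith("event:")) {
                eventName = line.slice(6).trim();
            } else if (line.startsWith("data:")) {
                // Strip only the single optional space after the colon;
                // trim() would swallow the spaces between words
                dataLines.push(line.slice(5).replace(/^ /, ""));
            }
        }
        if (eventName === "done") {
            console.log("Usage:", JSON.parse(dataLines.join("\n")));
        } else if (dataLines.length > 0) {
            output.textContent += dataLines.join("\n");
        }
    }
}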

The sse-starlette library handles the SSE framing (the data: prefixes, newline separators, keep-alive pings). You just yield dictionaries.
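
To see what that framing looks like on the wire, here is a hand-rolled sketch of the SSE text format. It illustrates the format itself and is not sse-starlette’s implementation. format_sse and parse_sse are hypothetical helper names, and the sketch uses "\n" line endings for simplicity, whereas a real server may send "\r\n".

```python
from typing import List, Optional, Tuple


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame one Server-Sent Event by hand.

    Each line of data gets its own "data:" field, and a blank line
    terminates the event.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"


def parse_sse(raw: str) -> List[Tuple[str, str]]:
    """Parse framed events back into (event_name, data) tuples."""
    events = []
    for block in raw.split("\n\n"):
        name, data_lines = "message", []
        for line in block.split("\n"):
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # only one leading space is dropped
            if field == "event":
                name = value
            elif field == "data":
                data_lines.append(value)
        if data_lines:  # comment-only blocks (such as pings) carry no data
            events.append((name, "\n".join(data_lines)))
    return events


wire = (
    format_sse("Hello")
    + format_sse(" world")
    + ": ping\n\n"
    + format_sse('{"output_tokens": 5}', event="done")
)
print(wire)
print(parse_sse(wire))
```

Note that the second chunk keeps its leading space after the round trip. A client that trims each data value would glue words together.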

Handling Tool Use Events in Streams

When Claude calls a tool, the stream events change. The response contains a tool_use content block, and instead of text_delta deltas you get input_json_delta deltas carrying incremental JSON for the tool’s input.

import json
from anthropic import Anthropic

client = Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
            },
            "required": ["city"],
        },
    }
]

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
) as stream:
    current_tool_name = None
    tool_input_json = ""

    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                current_tool_name = event.content_block.name
                tool_input_json = ""
                print(f"Tool call: {current_tool_name}")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                tool_input_json += event.delta.partial_json
        elif event.type == "content_block_stop":
            if current_tool_name:
                # Guard against an empty buffer (a tool called without arguments)
                parsed_input = json.loads(tool_input_json or "{}")
                print(f"Tool input: {parsed_input}")
                current_tool_name = None

The tool input arrives in fragments. You accumulate the partial_json strings and parse the full JSON only after content_block_stop. Don’t try to parse intermediate chunks – they’re not valid JSON on their own.
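
A quick offline illustration of why intermediate parsing fails: the fragments below are made-up partial_json values (an assumption for the demo), and accumulate is a hypothetical helper that tries to parse after every fragment.

```python
import json


def accumulate(fragments):
    """Append fragments one by one and record whether the buffer parses.

    Returns (statuses, parsed), where parsed is the final decoded input.
    """
    buffer = ""
    statuses = []
    for fragment in fragments:
        buffer += fragment
        try:
            json.loads(buffer)
            statuses.append("valid")
        except json.JSONDecodeError:
            statuses.append("incomplete")
    # Only the fully accumulated buffer is parsed for real use.
    return statuses, json.loads(buffer or "{}")


# Simulated partial_json values for a single tool call.
fragments = ['{"ci', 'ty": "To', 'kyo"}']
statuses, tool_input = accumulate(fragments)
print(statuses)
print(tool_input)
```

Only the last buffer state decodes, which is why the parse belongs in the content_block_stop branch.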

Common Errors and Fixes

anthropic.APIConnectionError: Connection error

Your network can’t reach Anthropic’s API. Check your firewall, proxy settings, or VPN. If you’re behind a corporate proxy:

import httpx
from anthropic import Anthropic

client = Anthropic(
    http_client=httpx.Client(proxy="http://your-proxy:8080")
)

anthropic.AuthenticationError: 401

Your API key is missing or wrong. Make sure ANTHROPIC_API_KEY is set:

export ANTHROPIC_API_KEY="sk-ant-api03-..."

The SDK reads this automatically. Don’t hardcode keys in source files.
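
If you would rather fail at startup than on the first request, a small check can help. This is a sketch: require_api_key is a hypothetical helper, not something the SDK provides, and it accepts a mapping so it can be exercised without touching the real environment.

```python
import os
from typing import Mapping


def require_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return the API key from env, or raise with a clear message."""
    key = env.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "ANTHROPIC_API_KEY is not set; export it before starting the app."
        )
    return key


# Demo with an explicit mapping instead of the real environment.
try:
    require_api_key({})
except RuntimeError as exc:
    print(f"Startup check failed: {exc}")
```

Calling require_api_key() with no arguments at import time checks the real environment.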

Stream disconnects mid-response

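This usually happens with long responses on flaky connections. Wrap your stream in a retry. Keep in mind that a retry starts the generation over, so any text printed before the disconnect will be printed again: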

import time
from anthropic import Anthropic, APITimeoutError, APIConnectionError

client = Anthropic()

def stream_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                full_text = []
                for text in stream.text_stream:
                    full_text.append(text)
                    print(text, end="", flush=True)
                return "".join(full_text)
        except (APITimeoutError, APIConnectionError) as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"\nRetrying in {wait}s... ({e})")
                time.sleep(wait)
            else:
                raise

result = stream_with_retry([{"role": "user", "content": "Write a long essay about async programming."}])
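
If replayed partial output is a problem, one option is to buffer each attempt and hand back text only once an attempt completes. The sketch below shows that variant with stand-ins so it runs offline: collect_with_retry and flaky_stream are hypothetical names, the built-in ConnectionError and TimeoutError replace the SDK exceptions, and the sleep function is injectable so the demo does not actually wait.

```python
import time
from typing import Callable, Iterable, Tuple, Type


def collect_with_retry(
    open_stream: Callable[[], Iterable[str]],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> str:
    """Call open_stream() until one attempt finishes, then return its text."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        chunks = []  # fresh buffer, so a failed attempt leaves nothing behind
        try:
            for text in open_stream():
                chunks.append(text)
            return "".join(chunks)
        except retry_on:
            if attempt == max_retries - 1:
                raise
            sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
    raise AssertionError("unreachable")


# Simulated stream that drops the connection on the first two attempts.
attempts = {"count": 0}


def flaky_stream():
    attempts["count"] += 1
    yield "Hello"
    if attempts["count"] < 3:
        raise ConnectionError("simulated drop")
    yield ", world"


waits = []
result = collect_with_retry(flaky_stream, sleep=waits.append)
print(result, waits)
```

The trade-off is that the user sees nothing until an attempt succeeds, so this suits background jobs better than interactive output. With the SDK, retry_on would be the APITimeoutError and APIConnectionError classes used above.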

TypeError: 'async_generator' object is not iterable

You iterated a stream from AsyncAnthropic with a plain for, or otherwise mixed sync-style code with the async client. Match your iteration style to your client type: AsyncAnthropic needs async with and async for, while Anthropic uses regular with and for.
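
The error is easy to reproduce without the SDK. Here text_stream is a plain async generator standing in for an async text stream (an assumption for the demo).

```python
import asyncio


async def text_stream():
    """Stand-in for an async text stream."""
    for chunk in ("Hello", ", ", "world"):
        yield chunk


def wrong_way() -> str:
    """Iterate an async generator with a plain for and capture the error."""
    try:
        for _chunk in text_stream():
            pass
    except TypeError as exc:
        return str(exc)
    return "no error"


async def right_way() -> str:
    """Iterate the same generator with async for."""
    return "".join([chunk async for chunk in text_stream()])


print(wrong_way())
print(asyncio.run(right_way()))
```

The first call returns the TypeError message and the second returns the joined text.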

SSE endpoint returns the full response at once instead of streaming

Make sure your ASGI server supports streaming. Gunicorn with Uvicorn workers works. Plain Gunicorn with sync workers does not. Run with:

uvicorn server:app --host 0.0.0.0 --port 8000

Don’t use --workers with gunicorn unless you’re using uvicorn.workers.UvicornWorker as the worker class.

anthropic.RateLimitError: 429

You’ve hit the rate limit. The SDK has built-in retry with exponential backoff for 429s, but if you’re doing many concurrent streams, you’ll need to throttle on your side. Use an asyncio.Semaphore:

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()
semaphore = asyncio.Semaphore(5)  # max 5 concurrent streams

async def limited_stream(message: str):
    async with semaphore:
        async with async_client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        ) as stream:
            async for text in stream.text_stream:
                print(text, end="", flush=True)
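
To convince yourself that the semaphore really caps concurrency, you can measure the peak number of active streams with a simulated workload. In this sketch, fake_stream and run_limited are hypothetical names, and no API calls are made.

```python
import asyncio


async def fake_stream(chunks: int):
    """Stand-in for a text stream that yields control between chunks."""
    for i in range(chunks):
        await asyncio.sleep(0)
        yield str(i)


async def run_limited(jobs: int, limit: int) -> int:
    """Run `jobs` simulated streams and return the peak number active at once."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def one_stream() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            async for _ in fake_stream(3):
                pass
            active -= 1

    await asyncio.gather(*(one_stream() for _ in range(jobs)))
    return peak


peak = asyncio.run(run_limited(jobs=20, limit=5))
print(f"Peak concurrent streams: {peak}")
```

The sketch creates the semaphore inside the running coroutine. On Python 3.9 and earlier, asyncio primitives created at module level can end up bound to a different event loop than the one asyncio.run() starts, so creating them inside the loop is the safer habit.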