A three-step prompt chain that takes 4 seconds per call costs you 12 seconds sequentially. If two of those steps are independent, you can run them concurrently and cut total time to 8 seconds. Add batching for fan-out patterns and you’re looking at 10x throughput improvements on real workloads.

Python’s asyncio pairs perfectly with the OpenAI async client. You get concurrent API calls with clean syntax, proper error handling, and rate limit control through semaphores.

Async LLM Calls with asyncio

The OpenAI SDK ships with AsyncOpenAI, a drop-in async version of the standard client. It returns the same response objects but works with await and asyncio.gather.

Here’s the basic pattern for running multiple prompts concurrently:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def complete(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=256,
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "Explain gradient descent in two sentences.",
        "What is backpropagation? Keep it brief.",
        "Define a learning rate in machine learning.",
    ]

    # Run all three calls concurrently
    results = await asyncio.gather(*[complete(p) for p in prompts])

    for prompt, result in zip(prompts, results):
        print(f"Q: {prompt}")
        print(f"A: {result}\n")

asyncio.run(main())

Three API calls fire at the same time instead of one after another. Wall-clock time equals the slowest single call, not the sum of all three.

asyncio.gather takes any number of coroutines and returns their results in the same order you passed them. If one call fails, the whole gather raises that exception by default. Pass return_exceptions=True to get the exception object in the results list instead.
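You can see the timing effect without an API key by swapping the network call for asyncio.sleep. In this toy model (fake_call and the delays are illustrative), three concurrent "calls" finish in roughly the time of the slowest one:

```python
import asyncio
import time

async def fake_call(delay: float) -> float:
    # Stand-in for an API call with the given latency
    await asyncio.sleep(delay)
    return delay

async def main() -> float:
    start = time.perf_counter()
    # Three "calls" with different latencies, running concurrently
    results = await asyncio.gather(fake_call(0.1), fake_call(0.2), fake_call(0.3))
    assert results == [0.1, 0.2, 0.3]  # results come back in argument order
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # close to 0.3s (the slowest call), not 0.6s (the sum)
```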

Building Async Prompt Chains

Real pipelines have dependencies. Step 2 needs step 1’s output. But often step 2 fans out into multiple independent branches that can run concurrently, then a final step merges everything back.

Here’s a concrete example: generate an article outline, expand each section concurrently, then combine the results.

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def call_llm(system: str, user: str, max_tokens: int = 512) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        max_tokens=max_tokens,
    )
    return response.choices[0].message.content

async def generate_article(topic: str) -> str:
    # Step 1: Generate outline (must complete first)
    outline = await call_llm(
        system="You are a technical writer. Return a numbered list of 3 section titles.",
        user=f"Create an outline for a short article about: {topic}",
        max_tokens=128,
    )
    print(f"Outline:\n{outline}\n")

    # Parse section titles from the outline, stripping numbering like "1." or "2)"
    sections = [
        line.strip().lstrip("0123456789.) ")
        for line in outline.strip().split("\n")
        if line.strip()
    ]

    # Step 2: Expand each section concurrently
    expand_tasks = [
        call_llm(
            system="Write one detailed paragraph for the given section of a technical article.",
            user=f"Article topic: {topic}\nSection title: {section}\nWrite the section content.",
            max_tokens=256,
        )
        for section in sections
    ]
    expanded = await asyncio.gather(*expand_tasks)

    # Step 3: Combine into final article
    body = "\n\n".join(
        f"## {title}\n\n{content}"
        for title, content in zip(sections, expanded)
    )

    final = await call_llm(
        system="You are an editor. Write a two-sentence intro for this article.",
        user=f"Topic: {topic}\n\nArticle body:\n{body}",
        max_tokens=128,
    )

    return f"{final}\n\n{body}"

async def main():
    article = await generate_article("async programming in Python")
    print(article)

asyncio.run(main())

The execution flow looks like this:

  1. Sequential: Generate outline (one call, must finish first)
  2. Concurrent: Expand all three sections at the same time via asyncio.gather
  3. Sequential: Write the intro (depends on all sections being done)

Total API calls: 5. Total round-trips: 3. You saved two full round-trip times compared to running everything sequentially.

This pattern works for any fan-out/fan-in workflow. Code review across multiple files, translating to several languages at once, generating test cases for different edge cases – anywhere you have independent branches.
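The skeleton is the same whatever the branches do. Here's the bare fan-out/fan-in shape with a stubbed call_llm (pure string manipulation standing in for the real API call; the names are illustrative):

```python
import asyncio

async def call_llm(prompt: str) -> str:
    # Stub standing in for a real API call
    await asyncio.sleep(0)
    return f"result({prompt})"

async def fan_out_fan_in(task: str, branches: list[str]) -> str:
    # Fan out: one independent call per branch, all concurrent
    partials = await asyncio.gather(*[call_llm(f"{task}: {b}") for b in branches])
    # Fan in: merge the branch outputs with one final call
    return await call_llm("merge: " + ", ".join(partials))

merged = asyncio.run(fan_out_fan_in("review", ["a.py", "b.py"]))
print(merged)
```

Swap the stub for a real completion call and the branch list for your files, target languages, or edge cases, and the structure carries over unchanged.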

Batch Processing with Rate Limiting

When you’re processing hundreds of prompts, you’ll hit rate limits fast. A semaphore caps how many requests fly concurrently.

import asyncio
from openai import AsyncOpenAI, RateLimitError

client = AsyncOpenAI()

async def process_prompt(
    semaphore: asyncio.Semaphore,
    prompt: str,
    index: int,
    max_retries: int = 3,
) -> dict:
    for attempt in range(max_retries):
        try:
            async with semaphore:
                response = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=256,
                )
            return {
                "index": index,
                "prompt": prompt,
                "result": response.choices[0].message.content,
                "error": None,
            }
        except RateLimitError:
            # Back off outside the semaphore so the slot frees up for other tasks
            wait_time = 2 ** attempt
            print(f"Rate limited on prompt {index}, retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)
        except Exception as e:
            return {
                "index": index,
                "prompt": prompt,
                "result": None,
                "error": str(e),
            }

    return {
        "index": index,
        "prompt": prompt,
        "result": None,
        "error": "Max retries exceeded due to rate limiting",
    }

async def batch_process(prompts: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [
        process_prompt(semaphore, prompt, i)
        for i, prompt in enumerate(prompts)
    ]
    return await asyncio.gather(*tasks)

async def main():
    # Generate 50 prompts as an example batch
    prompts = [
        f"Give a one-sentence definition of: concept #{i}"
        for i in range(50)
    ]

    results = await batch_process(prompts, max_concurrent=10)

    succeeded = sum(1 for r in results if r["error"] is None)
    failed = sum(1 for r in results if r["error"] is not None)
    print(f"Completed: {succeeded} succeeded, {failed} failed")

    # Print the first 3 successful results
    for r in [r for r in results if r["error"] is None][:3]:
        print(f"  [{r['index']}] {r['result'][:80]}...")

asyncio.run(main())

The asyncio.Semaphore(10) ensures at most 10 requests run at the same time. The async with semaphore block acquires a slot before making the API call and releases it when the block exits – even if the call raises an exception.
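The cap is easy to verify without touching the API. In this sketch (worker and the state counters are illustrative), twenty tasks compete for three slots, and the peak number of simultaneous holders never exceeds the semaphore's limit:

```python
import asyncio

async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        # Track how many workers hold a slot right now
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for an API call
        state["active"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(3)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*[worker(sem, state) for _ in range(20)])
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # 3 – never more than the semaphore's limit
```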

Set max_concurrent based on your rate limit tier. OpenAI’s Tier 1 allows 500 RPM for gpt-4o-mini, so 10 concurrent requests is conservative. Tier 3+ users can push to 50 or higher.

The exponential backoff on RateLimitError (1s, 2s, 4s across the three attempts) handles burst throttling. For production workloads, you'd pair this with a token bucket or leaky bucket rate limiter for tighter control.
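A minimal token bucket fits in a few lines. This TokenBucket (an illustrative sketch, not from any library) refills continuously at rate tokens per second and allows bursts up to capacity:

```python
import asyncio
import time

class TokenBucket:
    """Allow roughly `rate` requests per second with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill based on elapsed time, capped at capacity
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Wait just long enough for the next token to accrue
                await asyncio.sleep((1 - self.tokens) / self.rate)

async def main() -> float:
    bucket = TokenBucket(rate=50, capacity=5)  # ~50 requests/sec, bursts of 5
    start = time.monotonic()
    for _ in range(10):
        await bucket.acquire()  # call this before each API request
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # first 5 pass instantly, the rest are paced at 50/s
```

Holding the lock while sleeping serializes waiters, which is acceptable here: once the bucket is empty, every caller has to wait for tokens anyway.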

Common Errors and Fixes

Rate limit errors (HTTP 429)

openai.RateLimitError: Error code: 429 - Rate limit reached for gpt-4o-mini

Lower your max_concurrent value or add longer backoff delays. Check your usage tier at platform.openai.com/account/limits.

Timeout errors

httpx.ReadTimeout: timed out

The default timeout is 10 minutes, but individual calls can hang on overloaded endpoints. Set an explicit timeout on the client:

client = AsyncOpenAI(timeout=30.0)  # 30-second timeout per request

You can also wrap calls with asyncio.wait_for:

result = await asyncio.wait_for(complete("Your prompt"), timeout=30.0)

Event loop already running (Jupyter notebooks)

RuntimeError: asyncio.run() cannot be called from a running event loop

Jupyter already runs an event loop. Use await directly in a cell instead of asyncio.run():

# In a Jupyter cell, just await directly
results = await batch_process(prompts, max_concurrent=5)

Or install nest_asyncio as a workaround:

import nest_asyncio
nest_asyncio.apply()
asyncio.run(main())  # Now works inside Jupyter

One failure aborting the whole gather

By default, asyncio.gather raises the first exception to reach it and discards the other results; the remaining tasks are not cancelled, they just keep running in the background. If you want to collect every task's outcome regardless of individual failures:

results = await asyncio.gather(*tasks, return_exceptions=True)

for r in results:
    if isinstance(r, Exception):
        print(f"Task failed: {r}")
    else:
        print(f"Success: {r[:50]}...")

This returns exception objects in the results list instead of raising them, so you can handle failures per-task.