Google’s Agent2Agent (A2A) protocol gives you a standard wire format for agents to discover each other, exchange tasks, and stream results, regardless of what framework each agent runs on. Instead of gluing agents together with custom REST endpoints and ad-hoc message schemas, A2A defines an open protocol with JSON-RPC 2.0 over HTTP as the transport, agent cards for capability discovery, and a task lifecycle that covers every step from submission to streaming partial results.

This guide walks you through building a multi-agent orchestrator in Python where a coordinator agent discovers specialist agents via their agent cards, delegates subtasks over A2A, and streams progress back to the caller using Server-Sent Events (SSE).

Install the SDK

The official a2a-sdk package from Google requires Python 3.10+. Install it with the HTTP server extras so you get Starlette and Uvicorn support out of the box:

pip install "a2a-sdk[http-server]"

This pulls in the core types (AgentCard, AgentSkill, AgentCapabilities), the server framework (A2AStarletteApplication, DefaultRequestHandler), the client (A2AClient, A2ACardResolver), and utility functions for building messages and artifacts.

Agent Cards: Declaring What Your Agent Can Do

Every A2A agent publishes an Agent Card – a JSON document that describes its name, version, skills, supported input/output modes, and endpoint URL. Clients fetch this card from a well-known path (/.well-known/agent.json) before sending any tasks.

Here is a card for a research agent that can summarize topics:

from a2a.types import AgentCard, AgentSkill, AgentCapabilities

research_skill = AgentSkill(
    id="research_summarize",
    name="Research and Summarize",
    description="Searches the web and returns a concise summary of a given topic.",
    tags=["research", "summarization", "web search"],
    examples=["summarize recent advances in protein folding"],
)

research_agent_card = AgentCard(
    name="Research Agent",
    description="Summarizes topics by searching and synthesizing web sources.",
    url="http://localhost:8001/",
    version="1.0.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=AgentCapabilities(streaming=True),
    skills=[research_skill],
)

The capabilities field tells clients this agent supports streaming responses. The skills list is how the orchestrator decides which agent to route a given subtask to – match the task description against skill tags and descriptions.
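To make the discovery step concrete, here is a minimal sketch of what a client does with a card once it knows an agent's base URL: build the well-known URL, then read the skill tags out of the card document. The sample JSON is hand-written to mirror the fields described above, not captured from a running agent, and card_url and skill_tags are hypothetical helper names, not part of the SDK.

```python
import json

WELL_KNOWN_PATH = "/.well-known/agent.json"


def card_url(base_url: str) -> str:
    """Join a base URL and the well-known card path without doubling slashes."""
    return base_url.rstrip("/") + WELL_KNOWN_PATH


def skill_tags(card: dict) -> list[str]:
    """Collect the tags of every skill declared in a card document."""
    return [tag for skill in card.get("skills", []) for tag in skill.get("tags", [])]


# Hand-written sample card (assumed shape, for illustration only)
sample_card = json.loads("""
{
  "name": "Research Agent",
  "url": "http://localhost:8001/",
  "version": "1.0.0",
  "skills": [
    {"id": "research_summarize", "name": "Research and Summarize",
     "tags": ["research", "summarization", "web search"]}
  ]
}
""")

print(card_url(sample_card["url"]))
print(skill_tags(sample_card))
```

In the real client, A2ACardResolver performs the fetch and parsing for you; the sketch only shows which pieces of the card matter for routing.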

Implement a Specialist Agent

Each specialist agent needs an AgentExecutor subclass. This is where you put the actual logic – call an LLM, run a tool, query a database. The executor receives a RequestContext with the incoming message and an EventQueue where you push status updates and final results.

Here is a full research agent that calls OpenAI to generate a summary:

import asyncio
from openai import OpenAI
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import InvalidParamsError, Part, TextPart, UnsupportedOperationError
from a2a.utils.errors import ServerError

openai_client = OpenAI()


class ResearchAgentExecutor(AgentExecutor):
    """Summarizes a topic using OpenAI."""

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        # Extract the user's query from the incoming message
        user_message = context.message
        if not user_message or not user_message.parts:
            raise ServerError(error=InvalidParamsError(message="Empty message"))

        # Assumes the first part of the message is a text part
        query = user_message.parts[0].root.text
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)

        # Signal that work has started
        await updater.start_work()

        # Call OpenAI in a thread so we don't block the event loop
        response = await asyncio.to_thread(
            lambda: openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are a research assistant. Provide a concise, factual summary."},
                    {"role": "user", "content": query},
                ],
            )
        )

        summary = response.choices[0].message.content

        # Publish the result as a typed text part and mark the task as complete
        await updater.add_artifact(
            parts=[Part(root=TextPart(text=summary))],
            last_chunk=True,
        )
        await updater.complete()

    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        raise ServerError(error=UnsupportedOperationError())

The TaskUpdater is a helper that wraps EventQueue and handles the boilerplate of creating TaskStatusUpdateEvent and TaskArtifactUpdateEvent objects. You call start_work() to signal the “working” state, add_artifact() to push results, and complete() to mark the task as done.
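The call pattern is easier to see in a toy model. The sketch below is not the SDK's implementation: ToyUpdater, StatusEvent, and ArtifactEvent are hypothetical stand-ins built on asyncio.Queue, and they only mimic the order in which events reach the queue when an executor calls start_work(), add_artifact(), and complete().

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StatusEvent:  # stand-in for TaskStatusUpdateEvent
    task_id: str
    state: str
    final: bool = False


@dataclass
class ArtifactEvent:  # stand-in for TaskArtifactUpdateEvent
    task_id: str
    text: str
    last_chunk: bool = False


class ToyUpdater:
    """Hypothetical stand-in that mimics the updater's call pattern."""

    def __init__(self, queue: asyncio.Queue, task_id: str) -> None:
        self.queue = queue
        self.task_id = task_id

    async def start_work(self) -> None:
        await self.queue.put(StatusEvent(self.task_id, "working"))

    async def add_artifact(self, text: str, last_chunk: bool = False) -> None:
        await self.queue.put(ArtifactEvent(self.task_id, text, last_chunk))

    async def complete(self) -> None:
        await self.queue.put(StatusEvent(self.task_id, "completed", final=True))


async def run_toy_task() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    updater = ToyUpdater(queue, task_id="task-1")
    await updater.start_work()
    await updater.add_artifact("summary text", last_chunk=True)
    await updater.complete()
    # Drain the queue in the order the events were pushed
    return [queue.get_nowait() for _ in range(queue.qsize())]


events = asyncio.run(run_toy_task())
print([type(e).__name__ for e in events])
```

The point of the pattern is that the executor never builds event objects by hand; it only states what happened, and the helper turns that into queued events.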

Run the Agent as an A2A Server

Wrap the executor in a Starlette app and serve it with Uvicorn:

import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

request_handler = DefaultRequestHandler(
    agent_executor=ResearchAgentExecutor(),
    task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
    agent_card=research_agent_card,
    http_handler=request_handler,
)

uvicorn.run(server.build(), host="0.0.0.0", port=8001)

The A2AStarletteApplication exposes the JSON-RPC endpoint and serves the agent card at /.well-known/agent.json. The DefaultRequestHandler sits behind it and routes message/send and message/stream calls to your executor. The InMemoryTaskStore tracks task state so clients can poll for status on long-running work; because it lives in process memory, that state is lost when the server restarts.

You can spin up a second agent (say a code-review agent on port 8002) by writing another executor and card with different skills.
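For reference, a message/send call is an ordinary JSON-RPC 2.0 request. The sketch below builds one by hand so you can see the envelope the server receives; build_send_request is a hypothetical helper, and the message body simply mirrors the payload shape used in this guide's client code.

```python
import json
from uuid import uuid4


def build_send_request(text: str, method: str = "message/send") -> dict:
    """Build a JSON-RPC 2.0 envelope around an A2A message."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),  # request ID, echoed back in the response
        "method": method,  # "message/send" or "message/stream"
        "params": {
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
                "messageId": uuid4().hex,
            }
        },
    }


request = build_send_request("summarize recent advances in protein folding")
print(json.dumps(request, indent=2))
```

Printing a request like this is a quick way to debug with curl or a proxy before involving the client classes.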

Build the Orchestrator Client

The orchestrator is itself an agent that discovers other agents and delegates work to them. Here is the core delegation function:

import httpx
from uuid import uuid4
from a2a.client import A2AClient, A2ACardResolver
from a2a.types import (
    MessageSendParams,
    SendMessageRequest,
    SendStreamingMessageRequest,
)


async def discover_agent(http: httpx.AsyncClient, base_url: str) -> A2AClient:
    """Fetch an agent card and return a ready-to-use client.

    The caller owns `http` and must keep it open for as long as the returned
    client is in use; closing it earlier makes every later request fail.
    """
    resolver = A2ACardResolver(http, base_url)
    card = await resolver.get_agent_card()
    return A2AClient(httpx_client=http, agent_card=card)


async def delegate_task(client: A2AClient, query: str) -> dict:
    """Send a task to a remote agent and return the response as a plain dict."""
    payload = {
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": query}],
            "messageId": uuid4().hex,
        },
    }
    request = SendMessageRequest(
        id=str(uuid4()),
        params=MessageSendParams(**payload),
    )
    response = await client.send_message(request)
    # Serialize the full JSON-RPC response; callers pull the text they need out of it
    return response.model_dump(mode="json", exclude_none=True)

The A2ACardResolver hits /.well-known/agent.json on the target agent, parses the card, and gives you a typed AgentCard object. Then A2AClient uses that card’s URL to send tasks via JSON-RPC.

Streaming Updates with SSE

For long-running tasks, you want partial progress instead of waiting for the final answer. Switch from send_message to send_message_streaming and iterate over the SSE chunks:

async def delegate_task_streaming(client: A2AClient, query: str):
    """Send a task and stream back status updates and artifacts."""
    payload = {
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": query}],
            "messageId": uuid4().hex,
        },
    }
    request = SendStreamingMessageRequest(
        id=str(uuid4()),
        params=MessageSendParams(**payload),
    )

    async for event in client.send_message_streaming(request):
        chunk = event.model_dump(mode="json", exclude_none=True)
        # Each chunk is either a status update or an artifact chunk
        print(chunk)

On the server side, the agent executor pushes events to the EventQueue as they happen. The DefaultRequestHandler serializes these into SSE frames over the HTTP response. Status updates arrive as TaskStatusUpdateEvent (with state transitions like working or input-required), and results arrive as TaskArtifactUpdateEvent objects that can be chunked for large payloads.
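If you ever need to inspect the stream without the client class, the framing itself is simple: each SSE frame is one or more data: lines followed by a blank line. The sketch below parses a hand-written sample stream; parse_sse is a hypothetical helper, and the payload fields are simplified assumptions, not the exact event schema.

```python
import json


def parse_sse(stream: str) -> list[dict]:
    """Split an SSE body into frames and decode each frame's data lines as JSON."""
    events = []
    for frame in stream.split("\n\n"):
        data_lines = [
            line[len("data:"):].strip()
            for line in frame.splitlines()
            if line.startswith("data:")
        ]
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


# Hand-written sample stream; the payload fields are simplified assumptions
sample_stream = (
    'data: {"event": "status", "state": "working"}\n\n'
    'data: {"event": "artifact", "text": "partial summary"}\n\n'
    'data: {"event": "status", "state": "completed", "final": true}\n\n'
)

for event in parse_sse(sample_stream):
    print(event)
```

A real consumer would read the response incrementally instead of splitting a complete string, but the frame boundaries work the same way.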

Multi-Agent Orchestration

Now put it all together. The orchestrator maintains a registry of specialist agents, matches incoming requests to the right agent based on skill tags, and fans out subtasks:

import asyncio
import httpx
from uuid import uuid4
from a2a.client import A2AClient, A2ACardResolver
from a2a.types import MessageSendParams, SendMessageRequest


AGENT_REGISTRY = {
    "research": "http://localhost:8001",
    "code_review": "http://localhost:8002",
    "data_analysis": "http://localhost:8003",
}


async def resolve_agents(
    http: httpx.AsyncClient,
    registry: dict[str, str],
) -> dict[str, tuple[A2AClient, list[str]]]:
    """Discover all agents and index them by skill tags.

    The caller owns `http`; the returned clients share it, so it must stay
    open until the last request has finished.
    """
    agents = {}
    for name, url in registry.items():
        resolver = A2ACardResolver(http, url)
        card = await resolver.get_agent_card()
        client = A2AClient(httpx_client=http, agent_card=card)
        tags = []
        for skill in card.skills:
            tags.extend(skill.tags)
        agents[name] = (client, tags)
    return agents


async def orchestrate(user_request: str, agents: dict):
    """Route a user request to the best-matching specialist agent."""
    # Simple keyword matching -- in production, use an LLM to pick the agent.
    # Multi-word tags such as "web search" never match a single request word.
    best_agent = None
    best_score = 0
    request_words = set(user_request.lower().split())

    for name, (client, tags) in agents.items():
        tag_set = set(t.lower() for t in tags)
        overlap = len(request_words & tag_set)
        if overlap > best_score:
            best_score = overlap
            best_agent = (name, client)

    if not best_agent:
        # No tag matched: default to the first agent in the registry
        name, (client, _) = next(iter(agents.items()))
        best_agent = (name, client)

    agent_name, client = best_agent
    print(f"Routing to: {agent_name}")

    payload = {
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": user_request}],
            "messageId": uuid4().hex,
        },
    }
    request = SendMessageRequest(
        id=str(uuid4()),
        params=MessageSendParams(**payload),
    )
    response = await client.send_message(request)
    return response.model_dump(mode="json", exclude_none=True)


async def main():
    # Keep one HTTP client open for discovery and for every delegated call
    async with httpx.AsyncClient() as http:
        agents = await resolve_agents(http, AGENT_REGISTRY)
        result = await orchestrate("summarize recent advances in protein folding", agents)
        print(result)

if __name__ == "__main__":
    asyncio.run(main())

This is a minimal orchestrator. A production version would use an LLM to decompose complex requests into multiple subtasks, fan them out to different agents in parallel with asyncio.gather, and merge the results before returning to the caller.
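The fan-out step can be sketched without any A2A machinery. In the example below, fake_delegate is a hypothetical stand-in for a remote agent call and the subtask plan is hard-coded rather than produced by an LLM; only the asyncio.gather pattern is the point.

```python
import asyncio


async def fake_delegate(agent: str, subtask: str) -> str:
    """Stand-in for a remote A2A call; sleeps briefly, then echoes a result."""
    await asyncio.sleep(0.01)
    return f"[{agent}] {subtask}"


async def fan_out(subtasks: dict[str, str]) -> dict[str, str]:
    """Run one subtask per agent concurrently and collect results by agent name."""
    names = list(subtasks)
    results = await asyncio.gather(
        *(fake_delegate(name, subtasks[name]) for name in names),
        return_exceptions=True,  # one failing agent should not discard the rest
    )
    merged = {}
    for name, result in zip(names, results):
        merged[name] = f"error: {result}" if isinstance(result, Exception) else result
    return merged


plan = {
    "research": "summarize recent advances in protein folding",
    "code_review": "review the folding simulation script",
}
print(asyncio.run(fan_out(plan)))
```

asyncio.gather returns results in the order the awaitables were passed, which is why zipping them back against the agent names is safe.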

Task Lifecycle

Every task in A2A goes through a state machine:

  • submitted – the client sent the request, the server acknowledged it
  • working – the agent executor is processing the task
  • input-required – the agent needs clarification from the user (enables multi-turn)
  • completed – the agent produced a final artifact
  • failed – something went wrong
  • canceled – the client or server aborted the task
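One way to reason about these states is as a small transition table. The sketch below is an illustration only: the ALLOWED table is an assumption inferred from the descriptions above, not taken from the protocol specification, and State and can_transition are hypothetical names.

```python
from enum import Enum


class State(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


# Terminal states end the task; no further transitions are expected
TERMINAL = {State.COMPLETED, State.FAILED, State.CANCELED}

# Assumed transition table for illustration; the protocol spec is authoritative
ALLOWED = {
    State.SUBMITTED: {State.WORKING, State.FAILED, State.CANCELED},
    State.WORKING: {State.INPUT_REQUIRED, State.COMPLETED, State.FAILED, State.CANCELED},
    State.INPUT_REQUIRED: {State.WORKING, State.FAILED, State.CANCELED},
}


def can_transition(current: State, new: State) -> bool:
    """Return True if the assumed table permits moving from current to new."""
    return new in ALLOWED.get(current, set())


print(can_transition(State.WORKING, State.COMPLETED))
print(can_transition(State.COMPLETED, State.WORKING))
```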

Your executor controls these transitions through the TaskUpdater:

# Signal you need more info from the user
await updater.requires_input()

# Mark the task as failed (a terminal state)
await updater.failed()

# Mark as complete with a final artifact
await updater.add_artifact(
    parts=[{"type": "text", "text": "Final answer here."}],
    last_chunk=True,
)
await updater.complete()

Clients can also cancel a running task, which triggers the cancel() method on your executor.

Common Errors and Fixes

“Connection refused” when the client calls an agent

The agent server is not running or is on the wrong port. Double-check that the url field in your AgentCard matches the host/port you passed to uvicorn.run(). Also verify that no firewall rules are blocking the port.

“Agent card not found” (404 on /.well-known/agent.json)

You are hitting the wrong base URL. The A2AStarletteApplication serves the card automatically, but only if you use server.build() to create the ASGI app. If you wrote a custom Starlette app, you need to mount the well-known route yourself.

InvalidParamsError when sending a message

The message payload is malformed. Every message needs a role (either "user" or "agent"), a non-empty parts array, and a unique messageId. Check that you are wrapping text in the correct structure:

# Correct structure
{"type": "text", "text": "your query here"}

# Wrong -- missing 'type' field
{"text": "your query here"}

Streaming hangs and never completes

Your executor is not calling updater.complete() (or updater.failed()) after the last artifact. The SSE stream stays open until a terminal state event arrives. Always pair add_artifact(last_chunk=True) with a complete() or failed() call.
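A defensive pattern is to route every exit path through a terminal call. The sketch below uses RecordingUpdater, a hypothetical stub that records calls instead of emitting A2A events, to show the control flow; in a real executor the same try/except would wrap your LLM or tool call and use the actual TaskUpdater.

```python
import asyncio


class RecordingUpdater:
    """Hypothetical stub that records calls instead of emitting A2A events."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def add_artifact(self, text: str, last_chunk: bool = False) -> None:
        self.calls.append("add_artifact")

    async def complete(self) -> None:
        self.calls.append("complete")

    async def failed(self) -> None:
        self.calls.append("failed")


async def run_with_terminal_state(updater: RecordingUpdater, work) -> None:
    """Run `work` and always finish with complete() or failed()."""
    try:
        result = await work()
    except Exception:
        await updater.failed()  # terminal state on error, so the stream can close
        return
    await updater.add_artifact(result, last_chunk=True)
    await updater.complete()


async def ok() -> str:
    return "summary"


async def boom() -> str:
    raise RuntimeError("LLM call failed")


good, bad = RecordingUpdater(), RecordingUpdater()
asyncio.run(run_with_terminal_state(good, ok))
asyncio.run(run_with_terminal_state(bad, boom))
print(good.calls, bad.calls)
```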

UnsupportedOperationError on cancel

The cancel() method in the executor shown in this guide raises this error on purpose. If you need cancellation support, replace that body with actual cleanup logic (like canceling an in-flight LLM call) and then mark the task as canceled.
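One possible shape for that cleanup logic is to track in-flight work by task ID so that cancel() has something to act on. The sketch below is plain asyncio with hypothetical names (CancellableRunner, slow_llm_call) and does not touch the SDK; wiring it into execute() and cancel() is left to your executor.

```python
import asyncio


class CancellableRunner:
    """Hypothetical helper that tracks in-flight work by task ID."""

    def __init__(self) -> None:
        self._running: dict[str, asyncio.Task] = {}

    async def run(self, task_id: str, coro) -> str:
        task = asyncio.ensure_future(coro)
        self._running[task_id] = task
        try:
            return await task
        except asyncio.CancelledError:
            return "canceled"
        finally:
            self._running.pop(task_id, None)

    def cancel(self, task_id: str) -> bool:
        """Request cancellation; returns False if the task is not running."""
        task = self._running.get(task_id)
        if task is None:
            return False
        task.cancel()
        return True


async def slow_llm_call() -> str:
    await asyncio.sleep(10)  # stands in for a long-running model call
    return "summary"


async def demo() -> tuple[bool, str]:
    runner = CancellableRunner()
    job = asyncio.create_task(runner.run("task-1", slow_llm_call()))
    await asyncio.sleep(0.01)  # let the work start before canceling it
    accepted = runner.cancel("task-1")
    return accepted, await job


print(asyncio.run(demo()))
```

Note that this only cancels awaitable work. A blocking call handed to asyncio.to_thread keeps running in its thread even after the awaiting task is canceled.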

Import errors after installing a2a-sdk

Make sure you installed the extras: pip install "a2a-sdk[http-server]". The base package does not include Starlette, Uvicorn, or the HTTP server components. If you need gRPC support, add that extra too: pip install "a2a-sdk[http-server,grpc]".