Multi-agent systems split complex work across specialized agents, each with isolated context and targeted tools. An orchestrator agent handles planning and delegation; subagents do the actual work and report back summaries. The Model Context Protocol (MCP) is the standard way to give those subagents access to external tools without hard-coding integrations into every agent.

This guide builds a complete pipeline: an MCP tool server, an orchestrator that plans and delegates, and two specialized subagents that run in parallel and return aggregated results.

The Architecture

The setup has three layers:

  1. MCP tool server — exposes domain tools (web search, data lookup, file I/O) over HTTP
  2. Subagents — each calls Claude with a narrow scope, uses the MCP tools they need, returns a structured result
  3. Orchestrator — breaks the task into subtasks, invokes subagents (sequentially or in parallel), and synthesizes the final answer

This pattern follows Anthropic's guidance on subagents: they enable parallelization and keep context manageable, since each subagent works in its own isolated context window and only a summarized result travels back to the orchestrator.

Build the MCP Tool Server

Install the dependencies first:

pip install anthropic fastmcp httpx

The MCP server exposes plain Python functions as tools. fastmcp handles the protocol, transport, and schema generation automatically.

# mcp_server.py
import httpx
from fastmcp import FastMCP

mcp = FastMCP(name="research-tools")


@mcp.tool
def web_search(query: str, max_results: int = 5) -> list[dict]:
    """Search the web and return a list of result snippets.

    Args:
        query: The search query string.
        max_results: Maximum number of results to return (1-10).

    Returns:
        List of dicts with 'title', 'url', and 'snippet' keys.
    """
    # In production, integrate a real search API (Brave, Serper, etc.)
    # This stub returns realistic-looking placeholder results for demonstration.
    return [
        {
            "title": f"Result {i + 1} for: {query}",
            "url": f"https://example.com/result-{i + 1}",
            "snippet": f"Relevant content about {query} from source {i + 1}.",
        }
        for i in range(min(max_results, 10))
    ]


@mcp.tool
def fetch_page(url: str) -> str:
    """Fetch the text content of a web page.

    Args:
        url: The full URL to fetch.

    Returns:
        The page content as plain text, truncated to 10,000 characters.
    """
    try:
        response = httpx.get(url, timeout=10, follow_redirects=True)
        response.raise_for_status()
        # In production, use a proper HTML-to-text library like trafilatura
        return response.text[:10_000]
    except httpx.HTTPError as e:
        return f"Error fetching {url}: {e}"


@mcp.tool
def summarize_findings(findings: list[str], topic: str) -> str:
    """Aggregate and deduplicate a list of research findings.

    Args:
        findings: List of raw finding strings.
        topic: The research topic for context.

    Returns:
        A deduplicated, sorted summary string.
    """
    unique = sorted(set(f.strip() for f in findings if f.strip()))
    joined = "\n- ".join(unique)
    return f"Findings on '{topic}':\n- {joined}"


if __name__ == "__main__":
    mcp.run(transport="http", port=8000)

Run the server in a separate terminal:

python mcp_server.py

The server listens at http://localhost:8000/mcp/. For the Anthropic API to reach it during development, you need a public URL — use ngrok:

1
2
ngrok http 8000
# Copy the https URL it prints, e.g. https://abc123.ngrok.io

Build the Subagents

Each subagent is a function that calls Claude with a focused system prompt and the MCP server attached. The subagent returns a plain-text summary — not the full conversation history — so the orchestrator’s context stays clean.

# agents.py
import os
from anthropic import Anthropic

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

MCP_SERVER_URL = os.environ.get("MCP_SERVER_URL", "http://localhost:8000")
MCP_BETA_HEADER = "mcp-client-2025-04-04"


def run_research_agent(topic: str, depth: str = "overview") -> str:
    """
    Subagent 1: Research agent.
    Searches the web and returns a structured summary of findings.

    Args:
        topic: The research topic.
        depth: 'overview' for a broad summary, 'deep' for detailed analysis.

    Returns:
        A text summary of research findings.
    """
    depth_instruction = (
        "Give a broad overview with 3-5 key points."
        if depth == "overview"
        else "Go deep: cover technical details, edge cases, and recent developments."
    )

    response = client.beta.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=(
            "You are a research specialist. Your job is to search the web, "
            "gather relevant information, and return a structured summary. "
            "Always use the web_search tool to find current information. "
            "Cite sources by URL. Be concise and factual."
        ),
        messages=[
            {
                "role": "user",
                "content": f"Research the following topic: {topic}\n\n{depth_instruction}",
            }
        ],
        mcp_servers=[
            {
                "type": "url",
                "url": f"{MCP_SERVER_URL}/mcp/",
                "name": "research-tools",
            }
        ],
        extra_headers={"anthropic-beta": MCP_BETA_HEADER},
    )

    # After tool use, the answer may span multiple text blocks; join them all
    return "".join(block.text for block in response.content if block.type == "text")


def run_analysis_agent(data: str, question: str) -> str:
    """
    Subagent 2: Analysis agent.
    Takes raw research data and answers a specific analytical question.

    Args:
        data: Raw research content to analyze.
        question: The specific question to answer.

    Returns:
        A structured analytical response.
    """
    response = client.beta.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=(
            "You are a data analyst. You receive raw research and answer specific "
            "questions with structured, evidence-based responses. "
            "Format your output as: "
            "ANSWER: <direct answer> | CONFIDENCE: <high/medium/low> | "
            "KEY_EVIDENCE: <bullet list of supporting evidence>"
        ),
        messages=[
            {
                "role": "user",
                "content": (
                    f"Research data:\n{data}\n\n"
                    f"Analytical question: {question}"
                ),
            }
        ],
        mcp_servers=[
            {
                "type": "url",
                "url": f"{MCP_SERVER_URL}/mcp/",
                "name": "research-tools",
            }
        ],
        extra_headers={"anthropic-beta": MCP_BETA_HEADER},
    )

    # After tool use, the answer may span multiple text blocks; join them all
    return "".join(block.text for block in response.content if block.type == "text")

Build the Orchestrator

The orchestrator breaks the task into subtasks, runs subagents (in parallel using asyncio when tasks are independent), and synthesizes results.
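The key concurrency detail: the `anthropic` client used here is synchronous, so blocking agent calls get pushed into the default thread pool via `run_in_executor`, letting `asyncio.gather` overlap them. Here is the pattern in isolation, with `time.sleep` standing in for a blocking API call:

```python
import asyncio
import time

def slow_agent(name: str) -> str:
    time.sleep(0.1)  # stand-in for a blocking API call
    return f"{name}: done"

async def main() -> tuple[list[str], float]:
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    # Each blocking call runs in the default thread pool; gather overlaps them
    results = await asyncio.gather(
        *(loop.run_in_executor(None, slow_agent, n) for n in ("a", "b", "c"))
    )
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
print(results)        # ['a: done', 'b: done', 'c: done']
print(elapsed < 0.3)  # overlapped, not 0.3 s sequential
```

Three 0.1-second calls complete in roughly 0.1 seconds of wall time instead of 0.3, which is exactly the speedup the parallel research phase below relies on.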

# orchestrator.py
import asyncio
import os
import time
from anthropic import Anthropic
from agents import run_research_agent, run_analysis_agent

client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


def plan_subtasks(main_task: str) -> list[dict]:
    """
    Use Claude to decompose the main task into a list of subtasks.
    Returns a list of dicts with 'type', 'topic', and 'question' keys.
    """
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=(
            "You are a task planner. Decompose the given task into 2-4 subtasks. "
            "Output valid JSON only — a list of objects with keys: "
            "'type' (either 'research' or 'analysis'), "
            "'topic' (for research tasks), "
            "'question' (for analysis tasks). "
            "No explanation, no markdown, just the JSON array."
        ),
        messages=[
            {
                "role": "user",
                "content": f"Decompose this task into subtasks:\n\n{main_task}",
            }
        ],
    )

    import json
    raw = response.content[0].text.strip()
    # Strip markdown code fences if the model adds them despite instructions
    if raw.startswith("```"):
        raw = "\n".join(raw.split("\n")[1:-1])
    return json.loads(raw)


async def run_agent_async(agent_fn, *args) -> tuple[str, float]:
    """Run a synchronous agent function in a thread pool, return (result, elapsed)."""
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    result = await loop.run_in_executor(None, agent_fn, *args)
    elapsed = time.monotonic() - start
    return result, elapsed


async def run_pipeline(main_task: str) -> str:
    """
    Full orchestration pipeline:
    1. Plan subtasks
    2. Run research subtasks in parallel
    3. Feed research results to analysis subtasks
    4. Synthesize final answer
    """
    print(f"[orchestrator] Planning subtasks for: {main_task[:80]}...")
    subtasks = plan_subtasks(main_task)
    print(f"[orchestrator] {len(subtasks)} subtasks planned.")

    # Separate research and analysis subtasks
    research_tasks = [t for t in subtasks if t.get("type") == "research"]
    analysis_tasks = [t for t in subtasks if t.get("type") == "analysis"]

    # Run all research tasks in parallel
    research_results = {}
    if research_tasks:
        print(f"[orchestrator] Running {len(research_tasks)} research agents in parallel...")
        coros = [
            run_agent_async(run_research_agent, task["topic"])
            for task in research_tasks
        ]
        raw_results = await asyncio.gather(*coros, return_exceptions=True)

        for task, outcome in zip(research_tasks, raw_results):
            if isinstance(outcome, Exception):
                # gather returned the exception itself, not a (result, elapsed) tuple
                research_results[task["topic"]] = f"[ERROR] {outcome}"
                print(f"  [research:{task['topic']}] FAILED: {outcome}")
            else:
                result, elapsed = outcome
                research_results[task["topic"]] = result
                print(f"  [research:{task['topic']}] Done in {elapsed:.1f}s")

    # Run analysis tasks sequentially (they may depend on research results)
    analysis_results = []
    combined_research = "\n\n---\n\n".join(
        f"Topic: {topic}\n{content}"
        for topic, content in research_results.items()
    )

    for task in analysis_tasks:
        print(f"[orchestrator] Running analysis agent: {task.get('question', '')[:60]}...")
        try:
            result, elapsed = await run_agent_async(
                run_analysis_agent,
                combined_research,
                task["question"],
            )
            analysis_results.append(result)
            print(f"  [analysis] Done in {elapsed:.1f}s")
        except Exception as e:
            analysis_results.append(f"[ERROR] Analysis failed: {e}")
            print(f"  [analysis] FAILED: {e}")

    # Synthesize
    print("[orchestrator] Synthesizing final answer...")
    all_results = list(research_results.values()) + analysis_results
    synthesis_input = "\n\n===\n\n".join(all_results)

    final = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=(
            "You are a senior analyst. You receive outputs from multiple specialized agents. "
            "Synthesize them into a single, coherent, well-structured final answer. "
            "Remove redundancy. Highlight the most important insights. "
            "Use clear headings."
        ),
        messages=[
            {
                "role": "user",
                "content": (
                    f"Original task: {main_task}\n\n"
                    f"Agent outputs:\n\n{synthesis_input}\n\n"
                    "Write the final synthesized answer."
                ),
            }
        ],
    )

    return final.content[0].text


def run(task: str) -> str:
    return asyncio.run(run_pipeline(task))


if __name__ == "__main__":
    task = (
        "What are the current state-of-the-art approaches to reducing hallucinations "
        "in large language models, and which techniques are most practical to deploy "
        "in production API pipelines today?"
    )
    result = run(task)
    print("\n" + "=" * 60)
    print("FINAL ANSWER")
    print("=" * 60)
    print(result)

Run it end to end:

# Terminal 1: MCP server (MCP_SERVER_URL is read by agents.py, not the server)
python mcp_server.py

# Terminal 2: ngrok (if testing with remote Claude API)
ngrok http 8000

# Terminal 3: run the pipeline
MCP_SERVER_URL=https://abc123.ngrok.io python orchestrator.py

Error Handling and Monitoring

The pipeline uses return_exceptions=True in asyncio.gather so one failing subagent doesn’t abort the whole run. Failed agents return error strings that flow into the synthesis step, where the orchestrator can acknowledge partial data.
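The mechanics in isolation: with return_exceptions=True, a failing coroutine delivers its exception as a value in the results list instead of cancelling the whole batch.

```python
import asyncio

async def ok_agent() -> str:
    return "summary"

async def failing_agent() -> str:
    raise RuntimeError("subagent crashed")

async def main() -> list:
    # One failure does not abort the batch; it arrives as a value
    return await asyncio.gather(
        ok_agent(), failing_agent(), return_exceptions=True
    )

results = asyncio.run(main())
print(results[0])                            # summary
print(isinstance(results[1], RuntimeError))  # True
```

This is why the orchestrator must check isinstance(outcome, Exception) before unpacking each result.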

For production, add structured logging and a retry wrapper:

import time
import logging
from functools import wraps

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("agent-pipeline")


def with_retry(max_attempts: int = 3, backoff_base: float = 2.0):
    """Decorator that retries on Anthropic API errors with exponential backoff."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    last_exc = e
                    if attempt < max_attempts:
                        wait = backoff_base ** attempt
                        logger.warning(
                            f"{fn.__name__} attempt {attempt} failed: {e}. "
                            f"Retrying in {wait:.0f}s..."
                        )
                        time.sleep(wait)
                    else:
                        logger.error(f"{fn.__name__} failed after {max_attempts} attempts: {e}")
            raise last_exc
        return wrapper
    return decorator


# Usage on agent functions:
@with_retry(max_attempts=3)
def run_research_agent_with_retry(topic: str) -> str:
    return run_research_agent(topic)

Track token usage across the pipeline by accumulating response.usage from each agent call:

from dataclasses import dataclass, field

@dataclass
class PipelineUsage:
    input_tokens: int = 0
    output_tokens: int = 0
    calls: int = 0

    def add(self, usage) -> None:
        self.input_tokens += usage.input_tokens
        self.output_tokens += usage.output_tokens
        self.calls += 1

    def cost_estimate(self, input_rate: float = 3.0, output_rate: float = 15.0) -> float:
        """Estimate cost in USD. Rates in $/MTok."""
        return (
            (self.input_tokens / 1_000_000) * input_rate
            + (self.output_tokens / 1_000_000) * output_rate
        )

    def report(self) -> str:
        return (
            f"Pipeline usage: {self.calls} calls, "
            f"{self.input_tokens:,} input tokens, "
            f"{self.output_tokens:,} output tokens, "
            f"~${self.cost_estimate():.4f}"
        )

Common Pitfalls

The MCP server isn’t reachable by the API. The Anthropic API calls your MCP server over HTTPS from Anthropic’s servers. localhost won’t work — you need ngrok, a cloud deployment, or a VPN-accessible URL.

Subagents ignore MCP tools and answer from training data. This happens when the system prompt doesn’t explicitly instruct the model to use tools. Add “Always use the [tool_name] tool to get current information” to the system prompt. Don’t assume the model will reach for tools unprompted.

The orchestrator’s context fills up with raw agent output. Pass only summaries back to the orchestrator, not the full subagent conversation. The synthesis prompt should receive dense, structured text, not 10K-token verbose responses. Size your subagent max_tokens to match what you want to pipe upstream.

Parallel agents hit rate limits. asyncio.gather fires all tasks simultaneously. If you have 10 parallel research agents, that’s 10 concurrent API calls. Add a semaphore to cap concurrency:

semaphore = asyncio.Semaphore(4)  # max 4 concurrent agent calls

async def run_agent_gated(agent_fn, *args):
    async with semaphore:
        return await run_agent_async(agent_fn, *args)
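A quick standalone check that the gate actually caps concurrency, with short sleeps standing in for agent API calls:

```python
import asyncio

async def demo() -> int:
    semaphore = asyncio.Semaphore(4)  # same cap as above
    active = 0
    peak = 0

    async def gated_agent(i: int) -> int:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.05)  # stand-in for an agent API call
            active -= 1
            return i

    # 10 tasks fire at once, but at most 4 ever hold the semaphore
    await asyncio.gather(*(gated_agent(i) for i in range(10)))
    return peak

peak = asyncio.run(demo())
print(peak)  # never more than 4
```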