# The Plan-and-Execute Pattern
Planning agents solve complex problems by breaking them down into manageable subtasks. Instead of asking an LLM to do everything in one shot, you decompose the goal, build a dependency graph, and execute tasks in the correct order.
Here’s a minimal planning agent that decomposes a task and executes it:
```python
from openai import OpenAI
import json

client = OpenAI()

def decompose_task(goal: str) -> list[dict]:
    """Ask the LLM to break down a goal into subtasks with dependencies."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "system",
            "content": """Break down the user's goal into concrete subtasks.
Return a JSON object of the form {"tasks": [...]} where each task has:
- id: unique identifier (1, 2, 3...)
- description: what to do
- depends_on: list of task IDs that must complete first (empty if none)

Example:
{"tasks": [
  {"id": 1, "description": "Research topic", "depends_on": []},
  {"id": 2, "description": "Write outline", "depends_on": [1]},
  {"id": 3, "description": "Write content", "depends_on": [2]}
]}"""
        }, {
            "role": "user",
            "content": goal
        }],
        response_format={"type": "json_object"}  # JSON mode returns an object, not a bare array
    )
    content = json.loads(response.choices[0].message.content)
    return content.get("tasks", [])

def execute_task(task: dict) -> str:
    """Execute a single task using the LLM."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"Complete this task: {task['description']}"
        }]
    )
    return response.choices[0].message.content

def run_planning_agent(goal: str):
    """Main planning loop: decompose, sort by dependencies, execute."""
    print(f"Goal: {goal}\n")

    # Step 1: Decompose
    tasks = decompose_task(goal)
    print(f"Plan ({len(tasks)} tasks):")
    for task in tasks:
        deps = f" (depends on: {task['depends_on']})" if task['depends_on'] else ""
        print(f"  {task['id']}. {task['description']}{deps}")
    print()

    # Step 2: Topological sort (simple version - assumes valid DAG)
    completed = set()
    results = {}
    while len(completed) < len(tasks):
        for task in tasks:
            task_id = task['id']
            if task_id in completed:
                continue
            # Check if dependencies are met
            if all(dep in completed for dep in task['depends_on']):
                print(f"Executing task {task_id}: {task['description']}")
                results[task_id] = execute_task(task)
                completed.add(task_id)
                print("  ✓ Completed\n")
    return results

# Example usage
if __name__ == "__main__":
    goal = "Write a blog post about Python async/await"
    results = run_planning_agent(goal)
```
This gives you the core loop: decompose → sort → execute. The LLM handles both planning and execution.
## Building a Dependency Graph
For more complex workflows, you need proper dependency resolution. A topological sort ensures tasks run in the right order and catches circular dependencies early.
```python
from collections import defaultdict, deque
from openai import OpenAI
import json

class DependencyGraph:
    """Manages task dependencies and provides topological ordering."""

    def __init__(self, tasks: list[dict]):
        self.tasks = {task['id']: task for task in tasks}
        self.graph = defaultdict(list)     # task_id -> [dependent_task_ids]
        self.in_degree = defaultdict(int)  # task_id -> count of dependencies

        # Build adjacency list and in-degree counts
        for task in tasks:
            task_id = task['id']
            if task_id not in self.in_degree:
                self.in_degree[task_id] = 0
            for dep_id in task.get('depends_on', []):
                self.graph[dep_id].append(task_id)
                self.in_degree[task_id] += 1

    def topological_sort(self) -> list[dict]:
        """Return tasks in execution order using Kahn's algorithm."""
        in_degree = dict(self.in_degree)  # work on a copy so the graph stays reusable
        queue = deque([task_id for task_id, degree in in_degree.items() if degree == 0])
        sorted_tasks = []
        while queue:
            task_id = queue.popleft()
            sorted_tasks.append(self.tasks[task_id])
            # Reduce in-degree for dependent tasks
            for dependent_id in self.graph[task_id]:
                in_degree[dependent_id] -= 1
                if in_degree[dependent_id] == 0:
                    queue.append(dependent_id)
        # Check for cycles
        if len(sorted_tasks) != len(self.tasks):
            raise ValueError("Circular dependency detected in task graph")
        return sorted_tasks

    def get_ready_tasks(self, completed: set) -> list[dict]:
        """Return tasks that can execute now (all dependencies met)."""
        ready = []
        for task_id, task in self.tasks.items():
            if task_id in completed:
                continue
            if all(dep in completed for dep in task.get('depends_on', [])):
                ready.append(task)
        return ready

# Enhanced planning agent with proper dependency resolution
class PlanningAgent:
    def __init__(self, model: str = "gpt-4o"):
        self.client = OpenAI()
        self.model = model
        self.task_history = []

    def decompose(self, goal: str) -> list[dict]:
        """Decompose goal into subtasks with dependencies."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "system",
                "content": """Break down the goal into 3-7 concrete subtasks.
Return JSON with this structure:
{
  "tasks": [
    {"id": 1, "description": "task description", "depends_on": []},
    ...
  ]
}
Guidelines:
- Keep tasks specific and executable
- Use depends_on to specify which task IDs must complete first
- Avoid circular dependencies
- Number tasks sequentially starting from 1"""
            }, {
                "role": "user",
                "content": goal
            }],
            response_format={"type": "json_object"}
        )
        content = json.loads(response.choices[0].message.content)
        return content.get("tasks", [])

    def execute(self, task: dict, context: dict = None) -> dict:
        """Execute a task and return the result."""
        messages = [{
            "role": "system",
            "content": "You are executing a subtask as part of a larger plan. Be concise and focused."
        }]
        # Add context from previous tasks
        if context:
            context_str = "\n".join(f"Task {tid}: {result}" for tid, result in context.items())
            messages.append({
                "role": "user",
                "content": f"Previous task results:\n{context_str}"
            })
        messages.append({
            "role": "user",
            "content": f"Execute this task: {task['description']}"
        })
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        return {
            "task_id": task['id'],
            "description": task['description'],
            "result": response.choices[0].message.content,
            "status": "completed"
        }

    def replan(self, original_goal: str, completed_tasks: list[dict], failed_task: dict, error: str) -> list[dict]:
        """Generate a new plan when a task fails."""
        completed_str = "\n".join(f"✓ {t['description']}" for t in completed_tasks)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "system",
                "content": """The original plan hit a failure. Generate a new plan to recover.
Return the same JSON format with a tasks array.
You can:
- Retry the failed task with modifications
- Add new tasks to work around the failure
- Skip the failed task if possible
- Adjust remaining tasks based on what's already done"""
            }, {
                "role": "user",
                "content": f"""Original goal: {original_goal}

Completed tasks:
{completed_str}

Failed task: {failed_task['description']}
Error: {error}

Generate a recovery plan."""
            }],
            response_format={"type": "json_object"}
        )
        content = json.loads(response.choices[0].message.content)
        return content.get("tasks", [])

    def run(self, goal: str, max_retries: int = 2):
        """Main planning loop with failure recovery."""
        print(f"Goal: {goal}\n")
        tasks = self.decompose(goal)
        graph = DependencyGraph(tasks)
        pending = graph.topological_sort()

        print(f"Plan ({len(pending)} tasks):")
        for task in pending:
            deps = f" ← {task['depends_on']}" if task['depends_on'] else ""
            print(f"  {task['id']}. {task['description']}{deps}")
        print()

        completed = set()
        results = {}
        retry_count = 0
        # Work through a mutable queue so a replan can swap in new tasks mid-run
        while pending:
            task = pending.pop(0)
            task_id = task['id']
            try:
                print(f"[{task_id}] {task['description']}")
                result = self.execute(task, context=results)
                results[task_id] = result['result']
                completed.add(task_id)
                self.task_history.append(result)
                print("  ✓ Done\n")
            except Exception as e:
                print(f"  ✗ Failed: {e}\n")
                if retry_count >= max_retries:
                    print("Max retries reached. Aborting.")
                    break
                print("Replanning...\n")
                retry_count += 1
                # Build the recovery prompt from what has actually finished
                completed_tasks = [h for h in self.task_history if h['status'] == 'completed']
                new_tasks = self.replan(goal, completed_tasks, task, str(e))
                # Rebuild the graph and replace the queue, minus finished work
                graph = DependencyGraph(new_tasks)
                pending = [t for t in graph.topological_sort() if t['id'] not in completed]
                print(f"New plan ({len(pending)} remaining tasks):")
                for t in pending:
                    print(f"  {t['id']}. {t['description']}")
                print()
        return self.task_history

# Usage
agent = PlanningAgent()
agent.run("Create a data analysis pipeline for customer churn prediction")
```
The `DependencyGraph` class handles topological sorting and detects cycles. The `PlanningAgent` adds replanning on failure.
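The cycle check is worth exercising on its own before wiring it into an agent. Here is a standalone sketch of the same Kahn's-algorithm logic over plain task dicts (`topo_order` is an illustrative helper, not part of the classes above):

```python
from collections import defaultdict, deque

def topo_order(tasks: list[dict]) -> list:
    """Kahn's algorithm over {'id', 'depends_on'} dicts; raises on cycles."""
    in_degree = {t['id']: 0 for t in tasks}
    children = defaultdict(list)
    for t in tasks:
        for dep in t['depends_on']:
            children[dep].append(t['id'])
            in_degree[t['id']] += 1
    queue = deque(tid for tid, d in in_degree.items() if d == 0)
    order = []
    while queue:
        tid = queue.popleft()
        order.append(tid)
        for child in children[tid]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)
    if len(order) != len(tasks):
        raise ValueError("Circular dependency detected")
    return order

# A valid chain sorts cleanly...
print(topo_order([
    {"id": 1, "depends_on": []},
    {"id": 2, "depends_on": [1]},
    {"id": 3, "depends_on": [1, 2]},
]))  # → [1, 2, 3]

# ...while a cycle raises before any task runs
try:
    topo_order([{"id": 1, "depends_on": [2]}, {"id": 2, "depends_on": [1]}])
except ValueError as e:
    print(e)  # → Circular dependency detected
```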
## Replanning Strategies
When a task fails, you have three options:
- **Retry with modifications**: Ask the LLM to adjust the task based on the error
- **Skip and continue**: Mark as failed but keep going with remaining tasks
- **Full replan**: Generate an entirely new plan from the current state
The `replan` method uses the full context: what’s done, what failed, and why. This lets the LLM make intelligent recovery decisions.
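One way to wire those three options together is a small dispatcher that picks a strategy per failure. A sketch, assuming a hypothetical `criticality` field on each task dict (not something the decomposition prompts above produce):

```python
def handle_failure(task: dict, error: str, attempt: int, max_attempts: int = 3) -> str:
    """Pick a recovery strategy for a failed task.

    `criticality` is an assumed task field, not part of the plans
    generated earlier in this article.
    """
    if attempt < max_attempts and task.get("criticality") == "high":
        return "retry"   # retry with modifications
    if task.get("criticality") == "low":
        return "skip"    # mark failed, keep going
    return "replan"      # regenerate the plan from the current state

print(handle_failure({"criticality": "high"}, "timeout", 1))  # → retry
print(handle_failure({"criticality": "low"}, "timeout", 1))   # → skip
print(handle_failure({}, "timeout", 3))                       # → replan
```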
For critical tasks, implement exponential backoff:
```python
import time

def execute_with_retry(self, task: dict, context: dict = None, max_attempts: int = 3) -> dict:
    """Execute task with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return self.execute(task, context)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            wait_time = 2 ** attempt  # 1s, then 2s before the final attempt
            print(f"  Retry {attempt + 1}/{max_attempts} after {wait_time}s...")
            time.sleep(wait_time)
```
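One refinement worth considering: deterministic 2^attempt delays make many agents retry in lockstep against the same rate-limited API. Full jitter (a uniformly random delay up to the exponential cap) spreads retries out. A minimal sketch:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter backoff: uniform in [0, min(cap, base * 2^attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Each attempt widens the window until the cap kicks in
for attempt in range(6):
    print(f"attempt {attempt}: up to {min(30.0, 2.0 ** attempt):.0f}s")
```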
## Parallel Execution
If tasks don’t depend on each other, run them in parallel. Use the dependency graph to find tasks ready to execute:
```python
import asyncio
from openai import AsyncOpenAI

async def execute_parallel(agent: PlanningAgent, goal: str):
    """Execute independent tasks in parallel."""
    tasks = agent.decompose(goal)
    graph = DependencyGraph(tasks)
    completed = set()
    results = {}
    client = AsyncOpenAI()

    async def run_task(task):
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": task['description']}]
        )
        return task['id'], response.choices[0].message.content

    while len(completed) < len(tasks):
        # Get tasks ready to run
        ready = graph.get_ready_tasks(completed)
        if not ready:
            break  # Deadlock or done
        # Execute all ready tasks concurrently and wait for the batch
        task_results = await asyncio.gather(*(run_task(t) for t in ready))
        for task_id, result in task_results:
            results[task_id] = result
            completed.add(task_id)
            print(f"✓ Completed task {task_id}")
    return results

# Run it
agent = PlanningAgent()
asyncio.run(execute_parallel(agent, "Build a web scraper"))
```
This cuts execution time significantly when you have many independent tasks.
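A quick way to estimate that speedup is to group the graph into "waves" of mutually independent tasks: wall-clock time then scales with the number of waves rather than the number of tasks. A sketch, with `execution_waves` as an illustrative helper:

```python
def execution_waves(tasks: list[dict]) -> list[list]:
    """Group task IDs into waves; each wave can run fully in parallel."""
    remaining = {t['id']: set(t['depends_on']) for t in tasks}
    waves, done = [], set()
    while remaining:
        # A task is ready once all its dependencies are done
        wave = [tid for tid, deps in remaining.items() if deps <= done]
        if not wave:
            raise ValueError("Circular dependency")
        waves.append(sorted(wave))
        done.update(wave)
        for tid in wave:
            del remaining[tid]
    return waves

# Tasks 2 and 3 both depend only on 1, so they share a wave:
print(execution_waves([
    {"id": 1, "depends_on": []},
    {"id": 2, "depends_on": [1]},
    {"id": 3, "depends_on": [1]},
    {"id": 4, "depends_on": [2, 3]},
]))  # → [[1], [2, 3], [4]]
```

Four tasks collapse into three waves here; with wider fan-out the gap grows.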
## Common Errors and Fixes
**Circular dependencies:** The LLM sometimes creates invalid dependency chains. Catch this early with topological sort validation. If detected, ask the LLM to regenerate the plan with explicit instructions to avoid cycles.
**Over-decomposition:** LLMs can break tasks into 20+ tiny subtasks. Add constraints in your prompt: “Break into 3-7 tasks” works better than open-ended decomposition.
**Missing context:** Later tasks fail because they lack results from earlier ones. Always pass completed task results as context when executing dependent tasks. The `context` parameter in the `execute` method handles this.
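The `execute` method shown earlier forwards every prior result. A leaner variant passes only the results of a task's declared dependencies, which keeps prompts small as plans grow. A sketch, with `build_context` as a hypothetical helper:

```python
def build_context(results: dict, depends_on: list) -> str:
    """Render only the dependency results a task actually needs."""
    lines = [f"Task {tid}: {results[tid]}" for tid in depends_on if tid in results]
    return "\n".join(lines)

results = {1: "outline done", 2: "draft done"}
print(build_context(results, [1]))        # → Task 1: outline done
print(repr(build_context(results, [])))   # → ''
```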
**JSON parsing failures:** The LLM sometimes returns invalid JSON. Use OpenAI’s `response_format={"type": "json_object"}` parameter to enforce valid JSON output. Wrap all JSON parsing in try-except blocks.
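Even with JSON mode, validate the shape before trusting it. A defensive parse sketch (`parse_plan` is illustrative; it tolerates both a `{"tasks": [...]}` wrapper and a bare array):

```python
import json

def parse_plan(raw: str) -> list[dict]:
    """Defensively parse a model-generated plan; fall back to an empty plan."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return []
    # Accept either {"tasks": [...]} or a bare array
    tasks = data.get("tasks", []) if isinstance(data, dict) else data
    # Drop entries missing required fields rather than crashing later
    return [t for t in tasks if isinstance(t, dict) and "id" in t and "description" in t]

print(parse_plan('{"tasks": [{"id": 1, "description": "a", "depends_on": []}]}'))
print(parse_plan("not json"))  # → []
```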
**Task ID mismatches:** If you replan, task IDs might conflict with completed tasks. Use a separate namespace for replanned tasks (e.g., “2a”, “2b” for retries of task 2) or reset the entire graph with new IDs.
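The suffix idea fits in a small helper; `namespace_retry_ids` below is hypothetical, remapping both the IDs and the `depends_on` references that point at them:

```python
def namespace_retry_ids(tasks: list[dict], attempt: int) -> list[dict]:
    """Suffix replanned task IDs (2 -> '2a', '2b', ...) to avoid collisions."""
    suffix = chr(ord('a') + attempt)  # attempt 0 -> 'a', 1 -> 'b', ...
    renamed = {t['id']: f"{t['id']}{suffix}" for t in tasks}
    return [
        {**t,
         'id': renamed[t['id']],
         # Dependencies on completed (unrenamed) tasks pass through unchanged
         'depends_on': [renamed.get(d, d) for d in t['depends_on']]}
        for t in tasks
    ]

plan = [{"id": 2, "depends_on": []}, {"id": 3, "depends_on": [2]}]
print(namespace_retry_ids(plan, 0))
```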
**Infinite replanning:** Set a hard limit on replan attempts (`max_retries`). After 2-3 replans, escalate to a human or fail gracefully.
## When to Use Planning Agents
Planning agents shine when you have:
- Multi-step workflows with clear dependencies (data pipelines, content creation, code generation)
- Tasks that can fail and need recovery (API calls, file operations, external tool usage)
- Problems where order matters (setup → configure → test → deploy)
Don’t use them for simple queries where a single LLM call would work. The overhead isn’t worth it for straightforward tasks.
The plan-and-execute pattern also works well with ReAct agents. Use planning to determine what to do, and ReAct loops to handle the how.