A data pipeline agent takes a request like “find all customers who spent more than $1000 last quarter and group by region” and turns it into working pandas code, runs it, and hands you the result. No manual DataFrame wrangling, no writing one-off scripts. The agent generates the code, executes it in a sandbox, retries on failure, and formats the output.
Here’s the full setup. Install the dependencies first:
```bash
pip install openai pandas
```
```python
import json

import pandas as pd
from openai import OpenAI

client = OpenAI()

# Sample data -- replace with your own CSV
data = {
    "customer_id": [101, 102, 103, 104, 105, 106, 107, 108],
    "name": ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank", "Grace", "Hank"],
    "region": ["West", "East", "West", "South", "East", "South", "West", "East"],
    "amount": [1200, 450, 980, 1500, 2100, 300, 1800, 670],
    "quarter": ["Q4", "Q4", "Q4", "Q4", "Q4", "Q3", "Q4", "Q4"],
}
df = pd.DataFrame(data)
print(df.to_string(index=False))
```
Output:
```text
customer_id  name region  amount quarter
        101 Alice   West    1200      Q4
        102   Bob   East     450      Q4
        103 Carol   West     980      Q4
        104  Dave  South    1500      Q4
        105   Eve   East    2100      Q4
        106 Frank  South     300      Q3
        107 Grace   West    1800      Q4
        108  Hank   East     670      Q4
```
The agent needs two tools: one to inspect the loaded DataFrame, and one to execute pandas code against it. The model writes the code itself and passes it as a tool argument through OpenAI’s tools parameter (function calling). Execution happens locally.
```python
tools = [
    {
        "type": "function",
        "function": {
            "name": "run_pandas_code",
            "description": (
                "Execute pandas code against the loaded DataFrame 'df'. "
                "The code must use the variable 'df' and assign the final "
                "result to a variable called 'result'. Only pandas and "
                "standard library operations are allowed."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python code using pandas to transform or query df. Must assign output to 'result'.",
                    },
                    "explanation": {
                        "type": "string",
                        "description": "Brief explanation of what this code does.",
                    },
                },
                "required": ["code", "explanation"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_dataframe_info",
            "description": "Get schema info about the current DataFrame including columns, dtypes, shape, and sample rows.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": [],
            },
        },
    },
]
```
Two tools keep things clean. get_dataframe_info lets the agent inspect the data before writing code, and run_pandas_code executes the generated transformation. The model decides which to call and when.
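When the model calls run_pandas_code, the arguments arrive as a JSON string that the agent has to parse before acting on it. Here is a minimal sketch of that step. The helper name `parse_run_pandas_args` and the sample payload are hypothetical, made up for illustration:

```python
import json

# Hypothetical payload, shaped like the arguments string the model would
# send for a run_pandas_code call (built with json.dumps to keep quoting sane).
raw_arguments = json.dumps({
    "code": "result = df[df['amount'] > 1000]",
    "explanation": "Keep rows where amount exceeds 1000.",
})


def parse_run_pandas_args(raw: str) -> dict:
    """Parse the arguments string and check the fields the schema marks as required."""
    args = json.loads(raw)
    missing = [key for key in ("code", "explanation") if key not in args]
    if missing:
        raise ValueError(f"Missing required arguments: {missing}")
    return args


args = parse_run_pandas_args(raw_arguments)
print(args["code"])
```

Checking the required fields yourself is optional, but it turns a malformed tool call into a clear error message you can send back to the model.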
Execute Code Safely
Running LLM-generated code without restrictions is asking for trouble. Restrict the execution namespace to pandas and the loaded DataFrame. Block imports of anything dangerous and catch exceptions so the agent can retry.
```python
import re

BLOCKED_MODULES = {"os", "sys", "subprocess", "shutil", "pathlib", "importlib", "socket", "http"}


def safe_exec(code: str, df: pd.DataFrame) -> dict:
    """Execute pandas code in a restricted namespace."""
    # Block dangerous imports
    import_matches = re.findall(r"import\s+(\w+)", code)
    from_matches = re.findall(r"from\s+(\w+)", code)
    all_imports = set(import_matches + from_matches)
    blocked = all_imports & BLOCKED_MODULES
    if blocked:
        return {"success": False, "error": f"Blocked imports: {blocked}"}

    namespace = {"df": df.copy(), "pd": pd}
    # Strip built-ins, and use one dict for both globals and locals so that
    # lambdas and comprehensions in the generated code can still see df and pd.
    namespace["__builtins__"] = {}
    try:
        exec(code, namespace)
    except Exception as e:
        return {"success": False, "error": f"{type(e).__name__}: {e}"}

    if "result" not in namespace:
        return {"success": False, "error": "Code did not assign to 'result'. Add result = ... at the end."}

    result = namespace["result"]
    if isinstance(result, pd.DataFrame):
        return {"success": True, "data": result.to_string(index=False), "shape": list(result.shape)}
    elif isinstance(result, pd.Series):
        return {"success": True, "data": result.to_string(), "shape": [len(result)]}
    else:
        return {"success": True, "data": str(result), "shape": None}
```
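The df.copy() inside safe_exec prevents the agent from mutating your original DataFrame. The restricted __builtins__ blocks access to open(), eval(), exec(), and other dangerous built-ins. It also removes harmless ones such as len() and print(), and because __import__ is gone, any import statement in the generated code fails. Treat this as a guard against accidents, not a hard security boundary: code can still reach Python internals through object attributes. If the code fails, the error message goes back to the model for a retry.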
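To see what an empty __builtins__ does and does not stop, here is a small standalone sketch. The helper `try_exec` is a hypothetical name used only for this demonstration and is separate from safe_exec:

```python
def try_exec(code: str) -> str:
    """Run code with no built-ins available and report what happened."""
    env = {"__builtins__": {}}
    try:
        exec(code, env)
    except Exception as e:
        return f"{type(e).__name__}: {e}"
    return "ok"


print(try_exec("open('secrets.txt')"))  # NameError (open is not defined)
print(try_exec("import os"))            # ImportError (__import__ is missing)
print(try_exec("result = 1 + 1"))       # ok

# Attribute access needs no built-ins, so introspection like this still runs.
print(try_exec("classes = ().__class__.__base__.__subclasses__()"))  # ok
```

The last call is why stripping built-ins alone should not be treated as isolation. For untrusted input, running the code in a separate process or container is the safer design.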
Build the Agent Loop
This is the core. Send the user’s request, let the model pick tools, execute them, feed results back, and repeat until the model responds with a final answer. Cap the iterations to avoid infinite loops.
```python
def handle_tool_call(tool_call, df):
    """Dispatch a tool call and return the result string."""
    name = tool_call.function.name
    # Tools without parameters may arrive with an empty arguments string
    args = json.loads(tool_call.function.arguments or "{}")

    if name == "get_dataframe_info":
        info = {
            "shape": list(df.shape),
            "columns": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "sample": df.head(3).to_dict(orient="records"),
        }
        return json.dumps(info)
    elif name == "run_pandas_code":
        result = safe_exec(args["code"], df)
        return json.dumps(result)

    return json.dumps({"error": f"Unknown tool: {name}"})


def run_agent(query: str, df: pd.DataFrame, max_turns: int = 6) -> str:
    """Run the data pipeline agent on a natural language query."""
    system_prompt = (
        "You are a data pipeline agent. You have a pandas DataFrame called 'df'. "
        "First inspect the data with get_dataframe_info, then write pandas code "
        "to answer the user's question using run_pandas_code. If code fails, fix "
        "and retry. Always assign the final output to 'result'."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]

    for turn in range(max_turns):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
        msg = response.choices[0].message
        messages.append(msg)

        # No tool calls means the model is done
        if not msg.tool_calls:
            return msg.content

        # Process each tool call
        for tool_call in msg.tool_calls:
            result_str = handle_tool_call(tool_call, df)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result_str,
            })

    return "Agent hit the maximum turn limit without producing a final answer."
```
A few things worth noting. tool_choice="auto" lets the model decide when to call tools and when to respond directly. The loop processes multiple tool calls per turn (the model can call get_dataframe_info and run_pandas_code in the same response). And the max_turns cap prevents runaway loops if the model keeps generating broken code.
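To watch the control flow without spending API calls, you can dry-run the same loop against a scripted stand-in. In this sketch `fake_model`, `fake_tool`, and `run_loop` are all hypothetical, and messages are plain dicts instead of SDK objects:

```python
import json


def fake_model(messages):
    """Scripted stand-in for the chat API: inspect, then run code, then answer."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if len(tool_results) == 0:
        call = {"id": "call_1", "name": "get_dataframe_info", "arguments": "{}"}
        return {"role": "assistant", "content": None, "tool_calls": [call]}
    if len(tool_results) == 1:
        call = {
            "id": "call_2",
            "name": "run_pandas_code",
            "arguments": json.dumps({"code": "result = 1"}),
        }
        return {"role": "assistant", "content": None, "tool_calls": [call]}
    final = "Done: " + tool_results[-1]["content"]
    return {"role": "assistant", "content": final, "tool_calls": None}


def fake_tool(name, args):
    """Pretend to run a tool and report which one was called."""
    return json.dumps({"tool": name, "ok": True})


def run_loop(query, max_turns=6):
    """Same shape as the agent loop: call model, run tools, feed results back."""
    messages = [{"role": "user", "content": query}]
    for _ in range(max_turns):
        msg = fake_model(messages)
        messages.append(msg)
        if not msg["tool_calls"]:
            return msg["content"], messages
        for call in msg["tool_calls"]:
            content = fake_tool(call["name"], json.loads(call["arguments"]))
            messages.append({"role": "tool", "tool_call_id": call["id"], "content": content})
    return "Agent hit the maximum turn limit without producing a final answer.", messages


answer, history = run_loop("total by region")
print([m["role"] for m in history])
# ['user', 'assistant', 'tool', 'assistant', 'tool', 'assistant']
```

Each tool result is appended right after the assistant message that requested it, and the loop ends as soon as a reply carries no tool calls. Calling run_loop with max_turns=1 exercises the turn cap instead.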
Run a Query End-to-End
Put it all together:
```python
query = "Find all customers who spent more than $1000 last quarter and group the total by region"
answer = run_agent(query, df)
print(answer)
```
The agent will first call get_dataframe_info to see the columns and types, then generate something like:
```python
result = (
    df[(df["amount"] > 1000) & (df["quarter"] == "Q4")]
    .groupby("region")["amount"]
    .sum()
    .reset_index()
    .rename(columns={"amount": "total_amount"})
)
```
And return a formatted answer:
```text
region  total_amount
  East          2100
 South          1500
  West          3000
```
You can swap the sample data for any CSV:
```python
df = pd.read_csv("sales_data.csv")
answer = run_agent("Show me month-over-month revenue growth as percentages", df)
print(answer)
```
Common Errors and Fixes
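"Code did not assign to 'result'" from safe_exec
The generated code ran but never assigned to result, so safe_exec returns this error instead of data. This happens when the model ends with a bare expression instead of result = .... A print(...) call fails earlier with a NameError, because print is not available in the restricted namespace. Either way, the error message goes back to the model and it usually self-corrects on the next turn. If it doesn’t, make the system prompt more explicit: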
```python
# Strengthen the instruction in your system prompt
"IMPORTANT: You MUST assign the final output to a variable called 'result'. Do not use print()."
```
NameError: name 'np' is not defined
The model generated numpy code but only pandas is in the execution namespace. Fix this by adding numpy to the sandbox:
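```python
# At module level, next to the pandas import (imports fail inside the sandbox)
import numpy as np

# Inside safe_exec, expose numpy to the generated code as np
namespace = {"df": df.copy(), "pd": pd, "np": np}
```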
TypeError: unhashable type: 'list' during groupby
This surfaces when a column contains lists or nested objects. The fix is to tell the agent about column types in the system prompt, or preprocess the DataFrame to flatten nested fields before handing it to the agent.
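One way to do that preprocessing is sketched below, assuming a hypothetical `tags` column that holds lists. It uses `DataFrame.explode` to give each list element its own row, which makes the column hashable and therefore groupable. The data is invented for illustration:

```python
import pandas as pd

# Hypothetical data: the 'tags' column holds Python lists, which cannot be
# used as groupby keys.
orders = pd.DataFrame({
    "customer_id": [101, 102, 103],
    "tags": [["vip", "west"], ["new"], ["vip"]],
    "amount": [1200, 450, 980],
})

# explode() repeats each row once per list element, leaving plain strings.
exploded = orders.explode("tags")
by_tag = exploded.groupby("tags")["amount"].sum()
print(by_tag.to_dict())
# {'new': 450, 'vip': 2180, 'west': 1200}
```

Note that exploding counts an order once per tag, so the per-tag totals can add up to more than the overall revenue. Whether that is acceptable depends on the question being asked.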
Rate limit errors from OpenAI
If the agent does many retries, you can hit rate limits. Add exponential backoff:
```python
import time

from openai import RateLimitError

for turn in range(max_turns):
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
    except RateLimitError:
        # Back off 1s, 2s, 4s, ... A rate-limited attempt still uses up one turn.
        time.sleep(2 ** turn)
        continue
    # ... rest of the loop
```
The model generates overly complex code
Sometimes the model writes 20-line transformations when a one-liner would work. Add a constraint to the system prompt: "Prefer simple, readable code. Avoid unnecessary intermediate variables." You can also limit the code length in safe_exec by rejecting anything over a threshold (say 50 lines).
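A minimal sketch of that length check follows. The function name `check_code_length` and the choice to count only non-blank lines are assumptions. The returned dict mirrors the error shape safe_exec already uses:

```python
from typing import Optional


def check_code_length(code: str, max_lines: int = 50) -> Optional[dict]:
    """Return an error dict if the code is too long, otherwise None."""
    # Count only non-blank lines so padding does not trip the limit.
    line_count = sum(1 for line in code.splitlines() if line.strip())
    if line_count > max_lines:
        return {
            "success": False,
            "error": f"Code has {line_count} lines; the limit is {max_lines}. Simplify it.",
        }
    return None


print(check_code_length("result = df.head()"))  # None
```

Call it at the top of safe_exec and return the dict when it is not None, so the model sees the limit as an ordinary error and retries with shorter code.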