A data pipeline agent takes a request like “find all customers who spent more than $1000 last quarter and group by region”, turns it into working pandas code, runs it, and hands you the result. No manual DataFrame wrangling and no one-off scripts: the agent generates the code, executes it in a restricted namespace, retries on failure, and formats the output.

Here’s the full setup. Install the dependencies first:

pip install openai pandas

Then create the client and load some sample data:

import json
import pandas as pd
from openai import OpenAI

client = OpenAI()

# Sample data -- replace with your own CSV
data = {
    "customer_id": [101, 102, 103, 104, 105, 106, 107, 108],
    "name": ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank", "Grace", "Hank"],
    "region": ["West", "East", "West", "South", "East", "South", "West", "East"],
    "amount": [1200, 450, 980, 1500, 2100, 300, 1800, 670],
    "quarter": ["Q4", "Q4", "Q4", "Q4", "Q4", "Q3", "Q4", "Q4"],
}
df = pd.DataFrame(data)
print(df.to_string(index=False))

Output:

 customer_id   name region  amount quarter
         101  Alice   West    1200      Q4
         102    Bob   East     450      Q4
         103  Carol   West     980      Q4
         104   Dave  South    1500      Q4
         105    Eve   East    2100      Q4
         106  Frank  South     300      Q3
         107  Grace   West    1800      Q4
         108   Hank   East     670      Q4

Define the Tools

The agent needs two tools: one to inspect the schema of the loaded DataFrame, and one to execute generated pandas code against it. OpenAI’s tools parameter with function calling lets the model request either tool; both run locally.

tools = [
    {
        "type": "function",
        "function": {
            "name": "run_pandas_code",
            "description": (
                "Execute pandas code against the loaded DataFrame 'df'. "
                "The code must use the variable 'df' and assign the final "
                "result to a variable called 'result'. Only pandas and "
                "standard library operations are allowed."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python code using pandas to transform or query df. Must assign output to 'result'.",
                    },
                    "explanation": {
                        "type": "string",
                        "description": "Brief explanation of what this code does.",
                    },
                },
                "required": ["code", "explanation"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_dataframe_info",
            "description": "Get schema info about the current DataFrame including columns, dtypes, shape, and sample rows.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": [],
            },
        },
    },
]

Two tools keep things clean. get_dataframe_info lets the agent inspect the data before writing code, and run_pandas_code executes the generated transformation. The model decides which to call and when.

Execute Code Safely

Running LLM-generated code without restrictions is asking for trouble. Restrict the execution namespace to pandas and the loaded DataFrame. Block imports of anything dangerous and catch exceptions so the agent can retry.

import re

BLOCKED_MODULES = {"os", "sys", "subprocess", "shutil", "pathlib", "importlib", "socket", "http"}

def safe_exec(code: str, df: pd.DataFrame) -> dict:
    """Execute pandas code in a restricted namespace."""
    # Block dangerous imports
    import_matches = re.findall(r"import\s+(\w+)", code)
    from_matches = re.findall(r"from\s+(\w+)", code)
    all_imports = set(import_matches + from_matches)
    blocked = all_imports & BLOCKED_MODULES
    if blocked:
        return {"success": False, "error": f"Blocked imports: {blocked}"}

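    namespace = {"df": df.copy(), "pd": pd}
    # Use a single dict as both globals and locals. With separate dicts,
    # lambdas and helper functions in the generated code cannot see df or pd,
    # because they resolve free names through globals only.
    exec_globals = {"__builtins__": {}, **namespace}
    try:
        exec(code, exec_globals)
    except Exception as e:
        return {"success": False, "error": f"{type(e).__name__}: {e}"}

    if "result" not in exec_globals:
        return {"success": False, "error": "Code did not assign to 'result'. Add result = ... at the end."}

    result = exec_globals["result"]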
    if isinstance(result, pd.DataFrame):
        return {"success": True, "data": result.to_string(index=False), "shape": list(result.shape)}
    elif isinstance(result, pd.Series):
        return {"success": True, "data": result.to_string(), "shape": [len(result)]}
    else:
        return {"success": True, "data": str(result), "shape": None}

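The df.copy() inside safe_exec prevents the generated code from mutating your original DataFrame. Emptying __builtins__ removes direct access to open(), eval(), exec(), and __import__, so every import statement in generated code fails, not only the blocked ones; the regex check mainly gives the model a clearer error message. It also removes harmless built-ins such as len() and print(), so expect an occasional NameError. Treat all of this as a guardrail rather than a security boundary: generated code can still reach the filesystem through pandas itself (pd.read_csv, df.to_csv) or through object introspection, so run untrusted workloads in a separate process or container. If the code fails, the error message goes back to the model for a retry.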
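
The regex check in safe_exec also matches the words "import" and "from" inside comments and string literals. A minimal sketch of a stricter variant, assuming you are willing to parse the code with the standard ast module first (find_blocked_imports is a hypothetical helper, not part of the original agent):

```python
import ast

BLOCKED_MODULES = {"os", "sys", "subprocess", "shutil", "pathlib", "importlib", "socket", "http"}


def find_blocked_imports(code: str) -> set[str]:
    """Return the blocked top-level modules that `code` imports.

    Hypothetical helper. It only sees real import statements, so it does not
    detect dynamic imports such as __import__("os").
    """
    tree = ast.parse(code)  # raises SyntaxError if the code is not valid Python
    imported = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # "import os.path, json" -> {"os", "json"}
            imported.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module:
            # "from http import client" -> {"http"}; relative imports have no module
            imported.add(node.module.split(".")[0])
    return imported & BLOCKED_MODULES
```

Because ast.parse raises SyntaxError on invalid code, a caller would probably want to catch that and return it to the model the same way safe_exec returns other errors.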

Build the Agent Loop

This is the core. Send the user’s request, let the model pick tools, execute them, feed results back, and repeat until the model responds with a final answer. Cap the iterations to avoid infinite loops.

def handle_tool_call(tool_call, df):
    """Dispatch a tool call and return the result string."""
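    name = tool_call.function.name
    # Arguments arrive as a JSON string, and the model can occasionally emit invalid JSON
    try:
        args = json.loads(tool_call.function.arguments or "{}")
    except json.JSONDecodeError as e:
        return json.dumps({"success": False, "error": f"Invalid tool arguments: {e}"})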

    if name == "get_dataframe_info":
        info = {
            "shape": list(df.shape),
            "columns": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "sample": df.head(3).to_dict(orient="records"),
        }
        # default=str keeps Timestamps and other non-JSON values from raising TypeError
        return json.dumps(info, default=str)

    elif name == "run_pandas_code":
        result = safe_exec(args["code"], df)
        return json.dumps(result)

    return json.dumps({"error": f"Unknown tool: {name}"})


def run_agent(query: str, df: pd.DataFrame, max_turns: int = 6) -> str:
    """Run the data pipeline agent on a natural language query."""
    system_prompt = (
        "You are a data pipeline agent. You have a pandas DataFrame called 'df'. "
        "First inspect the data with get_dataframe_info, then write pandas code "
        "to answer the user's question using run_pandas_code. If code fails, fix "
        "and retry. Always assign the final output to 'result'."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]

    for turn in range(max_turns):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
        msg = response.choices[0].message
        messages.append(msg)

        # No tool calls means the model is done
        if not msg.tool_calls:
            return msg.content

        # Process each tool call
        for tool_call in msg.tool_calls:
            result_str = handle_tool_call(tool_call, df)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result_str,
            })

    return "Agent hit the maximum turn limit without producing a final answer."

A few things worth noting. tool_choice="auto" lets the model decide when to call tools and when to respond directly. The loop processes multiple tool calls per turn (the model can call get_dataframe_info and run_pandas_code in the same response). And the max_turns cap prevents runaway loops if the model keeps generating broken code.
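
When the model returns several tool calls in one response, each call needs its own tool message carrying the matching tool_call_id before the next request is sent. Here is a minimal offline sketch of that pairing, which is also a cheap way to test tool handling without calling the API. The build_tool_messages helper, the SimpleNamespace stand-ins, and the stub handlers are all hypothetical; the stand-ins only assume the .id, .function.name, and .function.arguments attributes that the loop above already uses.

```python
import json
from types import SimpleNamespace


def build_tool_messages(tool_calls, dispatch):
    """Return one 'tool' message per tool call, in the order the calls arrived."""
    messages = []
    for call in tool_calls:
        handler = dispatch.get(call.function.name)
        if handler is None:
            content = json.dumps({"error": f"Unknown tool: {call.function.name}"})
        else:
            content = handler(json.loads(call.function.arguments or "{}"))
        messages.append({"role": "tool", "tool_call_id": call.id, "content": content})
    return messages


# Stand-ins shaped like the tool call objects used in the loop (assumption).
calls = [
    SimpleNamespace(
        id="call_1",
        function=SimpleNamespace(name="get_dataframe_info", arguments="{}"),
    ),
    SimpleNamespace(
        id="call_2",
        function=SimpleNamespace(
            name="run_pandas_code",
            arguments='{"code": "result = 1", "explanation": "demo"}',
        ),
    ),
]

# Stub handlers so the sketch runs without pandas or the API.
dispatch = {
    "get_dataframe_info": lambda args: json.dumps({"columns": ["region", "amount"]}),
    "run_pandas_code": lambda args: json.dumps({"success": True, "data": "1"}),
}

tool_messages = build_tool_messages(calls, dispatch)
```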

Run a Query End-to-End

Put it all together:

query = "Find all customers who spent more than $1000 last quarter and group the total by region"
answer = run_agent(query, df)
print(answer)

The agent will first call get_dataframe_info to see the columns and types, then generate something like:

result = (
    df[(df["amount"] > 1000) & (df["quarter"] == "Q4")]
    .groupby("region")["amount"]
    .sum()
    .reset_index()
    .rename(columns={"amount": "total_amount"})
)

And return a formatted answer:

region  total_amount
  East          2100
 South          1500
  West          3000

You can swap the sample data for any CSV:

df = pd.read_csv("sales_data.csv")
answer = run_agent("Show me month-over-month revenue growth as percentages", df)
print(answer)
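
With a real CSV, result.to_string() can run to thousands of rows, and the whole string goes back to the model as the tool result. One way to keep that bounded is sketched below, assuming a simple character cap is good enough for your data. The truncate_for_model name and the 4000-character default are hypothetical choices.

```python
def truncate_for_model(text: str, max_chars: int = 4000) -> str:
    """Cap tool output so a large result does not flood the conversation.

    Hypothetical helper; max_chars is an arbitrary default, not a model limit.
    """
    if len(text) <= max_chars:
        return text
    omitted = len(text) - max_chars
    return text[:max_chars] + f"\n... [truncated {omitted} characters]"
```

It could be applied to the "data" field before safe_exec returns, so the model still sees the shape and the first rows of the result.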

Common Errors and Fixes

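"Code did not assign to 'result'" from safe_exec

The generated code ran but never assigned to result, so safe_exec returns this error message rather than raising a KeyError. It typically happens when the model ends with a bare expression such as df.head() instead of result = .... (A print(...) call fails earlier with a NameError, because print is not available in the restricted built-ins.) The error message goes back to the model, which usually self-corrects on the next turn. If it doesn’t, make the system prompt more explicit: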

# Strengthen the instruction in your system prompt
"IMPORTANT: You MUST assign the final output to a variable called 'result'. Do not use print()."

NameError: name 'np' is not defined

The model generated numpy code but only pandas is in the execution namespace. Fix this by adding numpy to the sandbox:

import numpy as np

namespace = {"df": df.copy(), "pd": pd, "np": np}
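
A related NameError appears for built-ins such as len or sum, because safe_exec empties __builtins__ entirely. A sketch of one alternative is to pass a small allowlist instead of an empty dict. The names in SAFE_BUILTIN_NAMES and the run_with_allowlist helper are hypothetical, and this is still a guardrail, not a security boundary.

```python
import builtins

# Hypothetical allowlist: pure helpers that cannot open files, import modules,
# or evaluate strings as code.
SAFE_BUILTIN_NAMES = (
    "abs", "all", "any", "bool", "dict", "enumerate", "float", "int",
    "isinstance", "len", "list", "max", "min", "range", "round", "set",
    "sorted", "str", "sum", "tuple", "zip",
)
SAFE_BUILTINS = {name: getattr(builtins, name) for name in SAFE_BUILTIN_NAMES}


def run_with_allowlist(code: str, variables: dict) -> dict:
    """Execute code with only the allowlisted built-ins and the given variables."""
    scope = {"__builtins__": SAFE_BUILTINS, **variables}
    exec(code, scope)
    return scope


scope = run_with_allowlist(
    "result = sum(len(v) for v in values)",
    {"values": ["ab", "cde"]},
)
```

Anything left off the list, such as open or __import__, still raises NameError or ImportError inside the generated code.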

TypeError: unhashable type: 'list' during groupby

This surfaces when a column contains lists or nested objects. The fix is to tell the agent about column types in the system prompt, or preprocess the DataFrame to flatten nested fields before handing it to the agent.
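
The preprocessing route could look roughly like this. It is a sketch that assumes nested values are plain lists or dicts and that serializing them to JSON strings is acceptable for grouping; serialize_nested_columns and the orders data are hypothetical.

```python
import json

import pandas as pd


def serialize_nested_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy where list and dict values are replaced by JSON strings."""

    def to_hashable(value):
        if isinstance(value, (list, dict)):
            return json.dumps(value, sort_keys=True)
        return value

    out = df.copy()
    for col in out.columns:
        # Nested Python objects can only live in object-dtype columns
        if out[col].dtype == object:
            out[col] = out[col].map(to_hashable)
    return out


orders = pd.DataFrame({
    "tags": [["a", "b"], ["a", "b"], ["c"]],
    "amount": [10, 20, 30],
})
totals = serialize_nested_columns(orders).groupby("tags")["amount"].sum()
```

Grouping on orders directly raises the TypeError above; after serialization the keys are ordinary strings such as '["a", "b"]'.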

Rate limit errors from OpenAI

If the agent does many retries, you can hit rate limits. Add exponential backoff:

import time

from openai import RateLimitError

for turn in range(max_turns):
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
    except RateLimitError:
        # Back off 1s, 2s, 4s, ... A skipped turn still counts toward max_turns.
        time.sleep(2 ** turn)
        continue
    # ... rest of the loop
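
Because continue consumes one of the agent's turns, it can be cleaner to retry inside a helper and leave the turn counter alone. Below is a minimal sketch with exponential backoff plus jitter. The call_with_backoff helper, its parameters, and the injectable sleep argument (there so it can be tested without waiting) are hypothetical.

```python
import random
import time


def call_with_backoff(fn, is_retryable, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying with exponential backoff when is_retryable(exc) is true."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            # Give up on non-retryable errors or once attempts are exhausted
            if not is_retryable(exc) or attempt == max_attempts - 1:
                raise
            # 1s, 2s, 4s, ... plus up to base_delay of random jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)


# Usage inside the agent loop (assumes client, messages, and tools from above):
# response = call_with_backoff(
#     lambda: client.chat.completions.create(
#         model="gpt-4o", messages=messages, tools=tools, tool_choice="auto"
#     ),
#     is_retryable=lambda exc: isinstance(exc, RateLimitError),
# )
```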

The model generates overly complex code

Sometimes the model writes 20-line transformations when a one-liner would work. Add a constraint to the system prompt: "Prefer simple, readable code. Avoid unnecessary intermediate variables." You can also limit the code length in safe_exec by rejecting anything over a threshold (say 50 lines).
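
The length check could look roughly like this, assuming blank lines and comment-only lines should not count toward the limit. The check_code_length helper and MAX_CODE_LINES are hypothetical names. The error dict mirrors the shape safe_exec already returns, so the model gets actionable feedback.

```python
MAX_CODE_LINES = 50  # hypothetical threshold; tune it for your workloads


def check_code_length(code: str, max_lines: int = MAX_CODE_LINES):
    """Return an error dict if the code is too long, otherwise None."""
    # Count only lines that contain code, ignoring blanks and comment-only lines
    lines = [
        line for line in code.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]
    if len(lines) > max_lines:
        return {
            "success": False,
            "error": f"Code has {len(lines)} lines; the limit is {max_lines}. Simplify it.",
        }
    return None
```

Calling it at the top of safe_exec and returning early when it yields a dict would send the rejection back to the model like any other failure.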