Grepping through thousands of log lines at 2 AM is nobody’s idea of a good time. You chain together grep, awk, and sort commands, scrolling through walls of text trying to figure out why your service fell over. What if you could describe the problem in plain English and let an agent figure out which patterns to search for, cross-reference timestamps, and summarize what actually happened?

That’s what we’re building here: a log analysis agent that uses OpenAI’s function calling to invoke regex-based tools on log data. The LLM decides what to search for, calls the right tool, reads the results, and iterates until it has an answer. You bring the logs, the agent brings the reasoning.

Defining Log Analysis Tools

The agent needs a handful of focused tools. Each one does exactly one thing to the log data and returns a string result. They're plain Python built on the standard-library re module, so the only external dependency is the OpenAI SDK.

import re


def regex_search(log_text: str, pattern: str, context_lines: int = 0) -> str:
    """Search logs with a regex pattern. Returns matching lines with optional context."""
    lines = log_text.splitlines()
    matches = []
    for i, line in enumerate(lines):
        if re.search(pattern, line, re.IGNORECASE):
            start = max(0, i - context_lines)
            end = min(len(lines), i + context_lines + 1)
            block = lines[start:end]
            matches.append("\n".join(block))
    if not matches:
        return f"No matches found for pattern: {pattern}"
    result = f"Found {len(matches)} matches:\n\n"
    result += "\n---\n".join(matches[:50])  # cap output to avoid token explosion
    return result


def count_by_level(log_text: str) -> str:
    """Count log entries by severity level (ERROR, WARN, INFO, DEBUG)."""
    levels = {"ERROR": 0, "WARN": 0, "INFO": 0, "DEBUG": 0}
    for line in log_text.splitlines():
        for level in levels:
            if f" {level} " in line or f"[{level}]" in line:
                levels[level] += 1
                break
    total = sum(levels.values())
    result = f"Total parsed lines: {total}\n"
    for level, count in levels.items():
        pct = (count / total * 100) if total > 0 else 0
        result += f"  {level}: {count} ({pct:.1f}%)\n"
    return result


def filter_by_timerange(log_text: str, start_time: str, end_time: str) -> str:
    """Filter log lines to a specific time window. Expects HH:MM:SS format."""
    ts_pattern = re.compile(r"(\d{2}:\d{2}:\d{2})")
    filtered = []
    for line in log_text.splitlines():
        match = ts_pattern.search(line)
        if match:
            ts = match.group(1)
            if start_time <= ts <= end_time:
                filtered.append(line)
    if not filtered:
        return f"No log lines found between {start_time} and {end_time}"
    return f"Found {len(filtered)} lines in range:\n\n" + "\n".join(filtered[:100])


def extract_unique_errors(log_text: str) -> str:
    """Extract unique error messages, deduplicated and sorted by frequency."""
    error_lines = [
        line for line in log_text.splitlines()
        if " ERROR " in line or "[ERROR]" in line
    ]
    # Strip timestamps and thread info to group similar errors
    error_msgs = []
    for line in error_lines:
        # Remove leading timestamp/date portion
        cleaned = re.sub(r"^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}[\.,]\d+\s*", "", line)
        cleaned = re.sub(r"\[.*?\]\s*", "", cleaned)  # remove bracketed prefixes
        error_msgs.append(cleaned.strip())

    from collections import Counter
    counts = Counter(error_msgs)
    result = f"Found {len(error_lines)} total errors, {len(counts)} unique:\n\n"
    for msg, count in counts.most_common(20):
        result += f"  [{count}x] {msg}\n"
    return result

Four tools, each with a clear contract. The regex_search function handles arbitrary patterns with optional context lines. count_by_level gives a quick severity breakdown. filter_by_timerange narrows the window. extract_unique_errors deduplicates error messages and ranks them by frequency — this is usually the first thing you want when diagnosing an incident.
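One detail worth calling out: filter_by_timerange compares timestamps as plain strings. That works because zero-padded HH:MM:SS strings sort lexicographically in the same order as the times they encode. A self-contained check (the log lines are made up for illustration):

```python
import re

ts_pattern = re.compile(r"(\d{2}:\d{2}:\d{2})")

lines = [
    "2026-02-15 14:19:59.120 [INFO] [api-gateway] before the window",
    "2026-02-15 14:25:03.451 [ERROR] [order-service] inside the window",
    "2026-02-15 14:40:00.007 [INFO] [api-gateway] after the window",
]

# Zero-padded HH:MM:SS compares lexicographically in chronological order,
# so plain string comparison is enough to implement the range check
in_range = [
    line for line in lines
    if (m := ts_pattern.search(line)) and "14:20:00" <= m.group(1) <= "14:35:00"
]
print(in_range)  # only the 14:25:03 line survives
```

This is also why the docstring insists on HH:MM:SS: an unpadded time like 9:05:00 would break the ordering.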

Wiring Tools to the LLM

OpenAI’s tools parameter takes a list of JSON schemas describing each function. The LLM uses these schemas to decide which tool to call and what arguments to pass.

import json
from openai import OpenAI

client = OpenAI()

tools = [
    {
        "type": "function",
        "function": {
            "name": "regex_search",
            "description": "Search log text with a regex pattern. Use this to find specific errors, IPs, endpoints, or any text pattern in the logs.",
            "parameters": {
                "type": "object",
                "properties": {
                    "pattern": {
                        "type": "string",
                        "description": "A Python regex pattern to search for, e.g. 'ERROR.*timeout' or '5\\d{2}\\s'"
                    },
                    "context_lines": {
                        "type": "integer",
                        "description": "Number of lines to show before and after each match. Default 0.",
                        "default": 0
                    }
                },
                "required": ["pattern"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "count_by_level",
            "description": "Count log entries grouped by severity level (ERROR, WARN, INFO, DEBUG). Use for a quick health overview.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "filter_by_timerange",
            "description": "Filter log lines to a specific time window. Use when investigating incidents within a known timeframe.",
            "parameters": {
                "type": "object",
                "properties": {
                    "start_time": {
                        "type": "string",
                        "description": "Start time in HH:MM:SS format, e.g. '14:30:00'"
                    },
                    "end_time": {
                        "type": "string",
                        "description": "End time in HH:MM:SS format, e.g. '14:45:00'"
                    }
                },
                "required": ["start_time", "end_time"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "extract_unique_errors",
            "description": "Extract and deduplicate all ERROR-level messages, ranked by frequency. Use to identify the most common failure modes.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    }
]

# Map tool names to Python functions
TOOL_REGISTRY = {
    "regex_search": regex_search,
    "count_by_level": count_by_level,
    "filter_by_timerange": filter_by_timerange,
    "extract_unique_errors": extract_unique_errors,
}

Notice that count_by_level and extract_unique_errors take no parameters from the LLM — the log text is passed implicitly. The LLM just decides when to call them. The tool descriptions matter a lot here; vague descriptions lead to the model picking the wrong tool or hallucinating arguments.
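The agent loop below dispatches with an explicit if/elif chain, but if you prefer to keep the registry generic, one option is to bind the log text into each tool with functools.partial, so every registry entry takes only the LLM-supplied arguments. A sketch with stub tools standing in for the real ones:

```python
import json
from functools import partial

# Stub tools standing in for the real ones; each takes log_text first
def regex_search(log_text, pattern, context_lines=0):
    return f"searched {len(log_text)} chars for {pattern!r}"

def count_by_level(log_text):
    return f"counted levels in {len(log_text)} chars"

def build_registry(log_text):
    """Bind log_text so every entry is callable with just the LLM's JSON args."""
    return {
        "regex_search": partial(regex_search, log_text),
        "count_by_level": partial(count_by_level, log_text),
    }

registry = build_registry("line one\nline two")
# Simulate a tool call as it arrives from the API: name + JSON-encoded args
args = json.loads('{"pattern": "ERROR", "context_lines": 1}')
print(registry["regex_search"](**args))
```

With this shape, the dispatch step collapses to `registry[fn_name](**fn_args)` regardless of which tool was called.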

Building the Agent Loop

The core loop follows the standard ReAct pattern: send messages to the LLM, check if it wants to call a tool, execute it, append the result, and repeat. The loop exits when the model responds with plain text instead of a tool call.

def run_log_agent(log_text: str, question: str, max_turns: int = 10) -> str:
    """Run the log analysis agent on the given log text with a user question."""
    system_prompt = (
        "You are a log analysis agent. You have access to tools that search and "
        "analyze application log data. Use these tools to answer the user's question "
        "about the logs.\n\n"
        "Strategy:\n"
        "1. Start with count_by_level to get an overview if the question is broad.\n"
        "2. Use extract_unique_errors to find the top failure modes.\n"
        "3. Use regex_search to drill into specific patterns.\n"
        "4. Use filter_by_timerange to narrow your investigation window.\n"
        "5. Synthesize your findings into a clear, actionable answer.\n\n"
        "Be specific. Cite exact log lines and counts in your answer."
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Here are the logs to analyze:\n\n```\n{log_text[:15000]}\n```\n\nQuestion: {question}"},
    ]

    for turn in range(max_turns):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )

        message = response.choices[0].message
        messages.append(message)

        # If no tool calls, the agent is done — return the final answer
        if not message.tool_calls:
            return message.content

        # Execute each tool call and append results
        for tool_call in message.tool_calls:
            fn_name = tool_call.function.name
            fn_args = json.loads(tool_call.function.arguments)

            print(f"[Turn {turn + 1}] Calling {fn_name}({fn_args})")

            func = TOOL_REGISTRY.get(fn_name)
            if func is None:
                result = f"Error: unknown tool '{fn_name}'"
            else:
                # All tools receive log_text as first arg; LLM args come after
                if fn_name == "regex_search":
                    result = func(log_text, fn_args["pattern"], fn_args.get("context_lines", 0))
                elif fn_name == "filter_by_timerange":
                    result = func(log_text, fn_args["start_time"], fn_args["end_time"])
                else:
                    result = func(log_text)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })

    return "Agent reached maximum turns without a final answer."

A few things to notice. First, we cap the log text at 15,000 characters in the initial message — you don’t want to blow the context window on the first turn. For larger log files, you’d chunk them or only send the log data through tools. Second, the tool_call_id in the tool response message must match the ID from the request. OpenAI uses this to associate results with the correct call. Third, we print each tool invocation so you can watch the agent reason in real time.
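For reference, here is the shape of the message list after one tool round, with a placeholder call ID. The assistant entry is shown as a plain dict for readability; in the loop above it is the SDK message object appended directly:

```python
messages_after_one_round = [
    {"role": "system", "content": "You are a log analysis agent..."},
    {"role": "user", "content": "Here are the logs...\n\nQuestion: ..."},
    # Assistant turn requesting a tool call
    {
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_abc123",  # placeholder ID
            "type": "function",
            "function": {"name": "count_by_level", "arguments": "{}"},
        }],
    },
    # Tool result: tool_call_id must match the id above, and this entry
    # must immediately follow the assistant message that requested it
    {"role": "tool", "tool_call_id": "call_abc123", "content": "Total parsed lines: 200\n..."},
]

ids_match = (messages_after_one_round[2]["tool_calls"][0]["id"]
             == messages_after_one_round[3]["tool_call_id"])
print(ids_match)  # True
```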

Running the Agent on Real Logs

Let’s generate a realistic sample log file and run the agent against it.

import random
from datetime import datetime, timedelta

def generate_sample_logs(num_lines: int = 200) -> str:
    """Generate realistic application log data."""
    services = ["api-gateway", "auth-service", "order-service", "payment-service", "inventory-db"]
    endpoints = ["/api/v1/orders", "/api/v1/users/login", "/api/v1/payments/charge", "/health", "/api/v1/inventory"]
    error_messages = [
        "Connection refused to downstream service",
        "Request timeout after 30000ms",
        "NullPointerException in OrderProcessor.process()",
        "Database connection pool exhausted (max=50)",
        "Invalid JWT token: signature verification failed",
        "Rate limit exceeded for client_id=abc123",
    ]

    lines = []
    base_time = datetime(2026, 2, 15, 14, 0, 0)

    for i in range(num_lines):
        ts = base_time + timedelta(seconds=random.randint(0, 3600))
        ts_str = ts.strftime("%Y-%m-%d %H:%M:%S.") + f"{random.randint(0,999):03d}"
        service = random.choice(services)

        # Skew toward more errors in the 14:20-14:35 window (simulated incident)
        in_incident_window = 1200 <= (ts - base_time).seconds <= 2100
        if in_incident_window:
            level_weights = {"ERROR": 0.4, "WARN": 0.3, "INFO": 0.2, "DEBUG": 0.1}
        else:
            level_weights = {"ERROR": 0.05, "WARN": 0.1, "INFO": 0.7, "DEBUG": 0.15}

        level = random.choices(
            list(level_weights.keys()),
            weights=list(level_weights.values()),
            k=1
        )[0]

        if level == "ERROR":
            msg = random.choice(error_messages)
        elif level == "WARN":
            msg = f"Slow response on {random.choice(endpoints)}: {random.randint(2000,8000)}ms"
        elif level == "INFO":
            status = random.choice([200, 200, 200, 201, 304])
            msg = f"GET {random.choice(endpoints)} {status} {random.randint(10,500)}ms"
        else:
            msg = f"Processing request req-{random.randint(10000,99999)}"

        lines.append(f"{ts_str} [{level}] [{service}] {msg}")

    lines.sort()  # sort by timestamp
    return "\n".join(lines)


# Generate logs and run the agent
log_data = generate_sample_logs(200)

answer = run_log_agent(log_data, "What caused the spike in errors and which services were most affected?")
print("\n=== Agent Answer ===\n")
print(answer)

When you run this, you’ll see the agent’s tool calls printed in order. A typical session looks like:

[Turn 1] Calling count_by_level({})
[Turn 2] Calling extract_unique_errors({})
[Turn 3] Calling filter_by_timerange({'start_time': '14:20:00', 'end_time': '14:35:00'})
[Turn 4] Calling regex_search({'pattern': 'Connection refused|pool exhausted', 'context_lines': 1})

=== Agent Answer ===

The error spike occurred between 14:20 and 14:35, concentrated in two failure
modes: "Database connection pool exhausted" (12 occurrences) and "Connection
refused to downstream service" (9 occurrences). The payment-service and
order-service were the most affected. The pool exhaustion errors suggest the
database couldn't handle the load, causing cascading connection refusals
in dependent services.

The agent autonomously decided to get an overview first, then drill into errors, narrow the time window, and finally search for the specific patterns it identified. That’s the whole point — you asked a vague question and the agent figured out the investigation strategy.

Common Errors and Fixes

openai.BadRequestError: ... messages with role 'tool' must be a response to a preceding message with 'tool_calls'

This means your message list is out of order. Every tool-role message must immediately follow the assistant message that contained the corresponding tool_calls. Don’t insert other messages between them.

Token limit exceeded on large log files

If your logs are more than ~10K lines, don’t send them in the user message. Instead, keep the log data server-side and only pass it to the tool functions. The LLM never sees the raw logs directly — it only sees tool results. Modify the system prompt to explain that tools access the log data on its behalf.
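A minimal sketch of that approach: read the file once and return tools that close over its contents, so the user message carries only the question. The function and tool names here are illustrative, not part of the code above:

```python
from pathlib import Path

def load_log_tools(path: str):
    """Read the log file once; return tools that close over its contents."""
    log_text = Path(path).read_text(encoding="utf-8", errors="replace")

    def head(n: int = 20) -> str:
        """Let the model peek at the first few lines instead of the whole file."""
        return "\n".join(log_text.splitlines()[:n])

    def line_count() -> str:
        return f"{len(log_text.splitlines())} lines"

    return {"head": head, "line_count": line_count}

# Usage sketch: the tools see the full file, the LLM never does
# tools = load_log_tools("app.log")  # hypothetical path
# print(tools["line_count"]())
```

The same pattern extends to the four real tools: drop the log_text parameter and let each closure read the captured variable instead.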

Regex patterns from the LLM fail to compile

The model sometimes generates invalid regex (unbalanced parentheses, bad escape sequences). Compile the pattern once at the top of regex_search and catch re.error before the loop:

try:
    compiled = re.compile(pattern, re.IGNORECASE)
except re.error as e:
    return f"Invalid regex pattern '{pattern}': {e}"

for i, line in enumerate(lines):
    if compiled.search(line):
        # ... existing match-collection logic unchanged

The error string goes back to the model as a tool result, so it can correct the pattern and retry on the next turn.

The agent loops without converging

If the agent keeps calling tools without producing a final answer, your max_turns guard will catch it. But if it’s happening consistently, the system prompt probably needs stronger instructions about when to stop. Adding “After 3-4 tool calls, synthesize your findings and respond” helps.

Tool results are too large and eat up context

The 50-match cap in regex_search and the 100-line cap in filter_by_timerange are there for a reason. Without them, a broad pattern can return thousands of lines and burn through your entire context window in one turn. Adjust these limits based on your model’s context size.
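A belt-and-braces option is to also truncate every tool result centrally, right before appending it to the message list, rather than trusting each tool's own cap. A sketch; the 4,000-character default is an arbitrary choice:

```python
def truncate_result(result: str, max_chars: int = 4000) -> str:
    """Cap tool output, noting how much was dropped so the model knows to narrow its query."""
    if len(result) <= max_chars:
        return result
    dropped = len(result) - max_chars
    return result[:max_chars] + f"\n... [truncated {dropped} characters; refine your pattern]"

print(truncate_result("short result"))          # passes through unchanged
print(truncate_result("x" * 5000, max_chars=100)[-60:])  # ends with the truncation notice
```

Telling the model how much was cut is deliberate: it nudges the agent toward a narrower regex on the next turn instead of retrying the same broad pattern.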