Cron expressions are one of those things every developer has written and nobody enjoys reading. 0 9 * * 1-5 means “every weekday at 9am” but you had to think about it for a second. Now imagine an agent that takes “run the database backup every weekday at 9am” and does the right thing – parses the schedule, registers the job, monitors execution, and tells you when something breaks.

Here’s the minimal setup to get a scheduled task running with APScheduler:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime

scheduler = BackgroundScheduler()
scheduler.start()

def db_backup():
    print(f"[{datetime.now()}] Running database backup...")

# Every weekday at 9:00 AM
scheduler.add_job(
    db_backup,
    trigger=CronTrigger(day_of_week="mon-fri", hour=9, minute=0),
    id="db_backup",
    name="Database Backup",
    replace_existing=True,
)

print("Scheduled jobs:", [job.id for job in scheduler.get_jobs()])

That handles scheduling, but the interesting part is putting an LLM in front of it so users can manage jobs with plain English instead of memorizing cron syntax.

Parsing Natural Language into Cron Schedules

The first piece you need is a function that converts human-readable schedules into APScheduler trigger parameters. You could write a regex parser, but an LLM handles the ambiguity much better – “every other Tuesday” or “twice a day during business hours” are hard to capture with rules.

import json
from openai import OpenAI

client = OpenAI()

def parse_schedule(natural_language: str) -> dict:
    """Convert natural language schedule to APScheduler CronTrigger kwargs."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You convert natural language schedules into APScheduler "
                    "CronTrigger keyword arguments. Return ONLY valid JSON with "
                    "keys from: year, month, day, week, day_of_week, hour, minute, second. "
                    "Use cron-style values. Examples:\n"
                    "- 'every weekday at 9am' -> {\"day_of_week\": \"mon-fri\", \"hour\": 9, \"minute\": 0}\n"
                    "- 'every Sunday at midnight' -> {\"day_of_week\": \"sun\", \"hour\": 0, \"minute\": 0}\n"
                    "- 'every 6 hours' -> {\"hour\": \"*/6\", \"minute\": 0}\n"
                    "- 'first of every month at 3pm' -> {\"day\": 1, \"hour\": 15, \"minute\": 0}"
                ),
            },
            {"role": "user", "content": natural_language},
        ],
        temperature=0,
    )
    raw = response.choices[0].message.content.strip()
    # Strip markdown fences if present
    if raw.startswith("```"):
        raw = raw.split("\n", 1)[1].rsplit("```", 1)[0]
    return json.loads(raw)


# Test it
schedule = parse_schedule("every weekday at 9am")
print(schedule)
# {"day_of_week": "mon-fri", "hour": 9, "minute": 0}

The temperature=0 keeps outputs as close to deterministic as the API allows (sampling at temperature 0 still isn't strictly guaranteed to be stable). You get back a dictionary that plugs directly into CronTrigger(**schedule).
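Before registering a job, it helps to echo the schedule back to the user in familiar cron notation so they can sanity-check what the LLM understood. Here's a small illustrative helper (to_cron_string is not part of APScheduler, just a display convenience) that renders the parsed kwargs as a classic five-field cron line:

```python
def to_cron_string(cron_kwargs: dict) -> str:
    """Render CronTrigger-style kwargs as a classic five-field cron line.

    Missing fields default to "*", matching cron's wildcard semantics.
    """
    fields = ["minute", "hour", "day", "month", "day_of_week"]
    return " ".join(str(cron_kwargs.get(f, "*")) for f in fields)

print(to_cron_string({"day_of_week": "mon-fri", "hour": 9, "minute": 0}))
# 0 9 * * mon-fri
```

One caveat: APScheduler's numeric day_of_week values differ from classic cron (Monday is 0 in APScheduler, Sunday is 0 in cron), so named days like mon-fri are the safer thing to display.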

Building the Agent with Tool Calling

Now wire this into an agent loop. The agent needs four tools: add a job, list jobs, remove a job, and check job status. Define them as OpenAI function-calling tools and dispatch based on what the model picks.

import json
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from openai import OpenAI

client = OpenAI()
scheduler = BackgroundScheduler()
scheduler.start()

# Registry of task functions the agent can schedule
TASK_REGISTRY = {
    "db_backup": lambda: print(f"[{datetime.now()}] Database backup completed"),
    "send_report": lambda: print(f"[{datetime.now()}] Report sent"),
    "cleanup_logs": lambda: print(f"[{datetime.now()}] Logs cleaned up"),
    "health_check": lambda: print(f"[{datetime.now()}] Health check passed"),
}

# Tool definitions for OpenAI function calling
tools = [
    {
        "type": "function",
        "function": {
            "name": "add_job",
            "description": "Schedule a new recurring job",
            "parameters": {
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "Unique identifier for the job",
                    },
                    "task_name": {
                        "type": "string",
                        "enum": list(TASK_REGISTRY.keys()),
                        "description": "The task function to run",
                    },
                    "schedule_description": {
                        "type": "string",
                        "description": "Natural language schedule, e.g. 'every weekday at 9am'",
                    },
                },
                "required": ["job_id", "task_name", "schedule_description"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "list_jobs",
            "description": "List all currently scheduled jobs",
            "parameters": {"type": "object", "properties": {}},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "remove_job",
            "description": "Remove a scheduled job by its ID",
            "parameters": {
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "The job ID to remove",
                    }
                },
                "required": ["job_id"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_job_status",
            "description": "Get details about a specific scheduled job",
            "parameters": {
                "type": "object",
                "properties": {
                    "job_id": {
                        "type": "string",
                        "description": "The job ID to check",
                    }
                },
                "required": ["job_id"],
            },
        },
    },
]


def handle_tool_call(name: str, args: dict) -> str:
    """Execute a tool call and return the result as a string."""
    if name == "add_job":
        task_fn = TASK_REGISTRY.get(args["task_name"])
        if not task_fn:
            return json.dumps({"error": f"Unknown task: {args['task_name']}"})
        cron_kwargs = parse_schedule(args["schedule_description"])
        scheduler.add_job(
            task_fn,
            trigger=CronTrigger(**cron_kwargs),
            id=args["job_id"],
            name=args["task_name"],
            replace_existing=True,
        )
        return json.dumps({
            "status": "scheduled",
            "job_id": args["job_id"],
            "cron_kwargs": cron_kwargs,
            "next_run": str(scheduler.get_job(args["job_id"]).next_run_time),
        })

    elif name == "list_jobs":
        jobs = []
        for job in scheduler.get_jobs():
            jobs.append({
                "id": job.id,
                "name": job.name,
                "next_run": str(job.next_run_time),
                "trigger": str(job.trigger),
            })
        return json.dumps({"jobs": jobs, "total": len(jobs)})

    elif name == "remove_job":
        try:
            scheduler.remove_job(args["job_id"])
            return json.dumps({"status": "removed", "job_id": args["job_id"]})
        except Exception as e:
            return json.dumps({"error": str(e)})

    elif name == "get_job_status":
        job = scheduler.get_job(args["job_id"])
        if not job:
            return json.dumps({"error": f"Job '{args['job_id']}' not found"})
        return json.dumps({
            "id": job.id,
            "name": job.name,
            "next_run": str(job.next_run_time),
            "trigger": str(job.trigger),
            "pending": job.pending,
        })

    return json.dumps({"error": f"Unknown tool: {name}"})


def run_agent(user_message: str) -> str:
    """Run the cron agent with a user message and return the response."""
    messages = [
        {
            "role": "system",
            "content": (
                "You are a cron job management agent. You help users schedule, "
                "list, remove, and monitor recurring tasks. Available tasks: "
                f"{list(TASK_REGISTRY.keys())}. Always confirm what you did after "
                "executing a tool call."
            ),
        },
        {"role": "user", "content": user_message},
    ]

    while True:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
        )
        choice = response.choices[0]

        if choice.finish_reason == "stop":
            return choice.message.content

        if choice.finish_reason == "tool_calls":
            messages.append(choice.message)
            for tool_call in choice.message.tool_calls:
                args = json.loads(tool_call.function.arguments)
                result = handle_tool_call(tool_call.function.name, args)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result,
                })

Use it like this:

print(run_agent("Schedule a database backup every weekday at 9am"))
# "I've scheduled the database backup (job ID: db_backup) to run
#  every weekday at 9:00 AM. The next run is Monday at 09:00."

print(run_agent("What jobs are running right now?"))
# Lists all scheduled jobs with their next run times

print(run_agent("Cancel the database backup"))
# "I've removed the db_backup job. It will no longer run."

The agent loop keeps calling tools until the model returns a plain text response. This handles multi-step requests too – “schedule a health check every 6 hours and show me all jobs” triggers two tool calls in sequence.

Handling Failures and Retries

Scheduled tasks fail. Networks go down, APIs hit rate limits, disks fill up. APScheduler won't retry a failed job on its own, but its event listeners and job configuration give you the hooks to detect failures and build retries yourself.

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cron_agent")

# Track job execution history
job_history = []

def job_listener(event):
    """Log job outcomes and track history."""
    job_id = event.job_id
    timestamp = datetime.now().isoformat()

    if event.exception:
        logger.error(f"Job {job_id} failed: {event.exception}")
        job_history.append({
            "job_id": job_id,
            "status": "failed",
            "error": str(event.exception),
            "timestamp": timestamp,
        })
    else:
        logger.info(f"Job {job_id} completed successfully")
        job_history.append({
            "job_id": job_id,
            "status": "success",
            "timestamp": timestamp,
        })

def missed_job_listener(event):
    """Handle jobs that missed their scheduled time."""
    logger.warning(f"Job {event.job_id} missed its scheduled run time")
    job_history.append({
        "job_id": event.job_id,
        "status": "missed",
        "timestamp": datetime.now().isoformat(),
    })

scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_listener(missed_job_listener, EVENT_JOB_MISSED)

For automatic retries, wrap the task function with backoff logic; misfire_grace_time is a separate knob that covers runs the scheduler missed entirely:

import time

def with_retry(fn, max_retries=3, backoff_base=2):
    """Wrap a task function with exponential backoff retry logic."""
    def wrapper():
        for attempt in range(max_retries):
            try:
                return fn()
            except Exception as e:
                if attempt == max_retries - 1:
                    raise  # out of attempts; surface the error to the listener
                wait = backoff_base ** attempt
                logger.warning(
                    f"Attempt {attempt + 1}/{max_retries} failed: {e}. "
                    f"Retrying in {wait}s..."
                )
                time.sleep(wait)
    return wrapper

# Schedule with retry wrapper and a 60-second grace time for misfires
scheduler.add_job(
    with_retry(TASK_REGISTRY["db_backup"], max_retries=3),
    trigger=CronTrigger(day_of_week="mon-fri", hour=9, minute=0),
    id="db_backup_with_retry",
    name="Database Backup (with retry)",
    misfire_grace_time=60,
    replace_existing=True,
)

The misfire_grace_time=60 means that if the scheduler was down and missed the 9:00 AM window, it will still fire the job as long as it's within 60 seconds of the scheduled time. The with_retry wrapper gives you exponential backoff – waits of 1s and then 2s between the three attempts – and re-raises the last error so the failure listener still sees it.

Common Errors and Fixes

ConflictingIdError: Job identifier 'x' conflicts with an existing job

You tried to add a job with an ID that already exists. Pass replace_existing=True to add_job(), or remove the old job first with scheduler.remove_job("x").

KeyError when the LLM returns unexpected cron keys

The LLM sometimes returns keys like "weekday" instead of "day_of_week". Add validation before passing to CronTrigger:

VALID_CRON_KEYS = {"year", "month", "day", "week", "day_of_week", "hour", "minute", "second"}

def validate_cron_kwargs(kwargs: dict) -> dict:
    """Strip keys that CronTrigger doesn't accept."""
    cleaned = {k: v for k, v in kwargs.items() if k in VALID_CRON_KEYS}
    if not cleaned:
        raise ValueError(f"No valid cron keys found in: {kwargs}")
    return cleaned

json.JSONDecodeError from LLM schedule parsing

The model sometimes wraps JSON in markdown fences or adds explanatory text. The parse_schedule function above strips fences, but you should also wrap the parse call in a try/except and retry once:

def safe_parse_schedule(text: str) -> dict:
    try:
        return parse_schedule(text)
    except json.JSONDecodeError:
        # Retry with an explicit "JSON only" reminder
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Return ONLY raw JSON, no markdown, no explanation.",
                },
                {
                    "role": "user",
                    "content": f"Convert to APScheduler CronTrigger kwargs: {text}",
                },
            ],
            temperature=0,
        )
        return json.loads(response.choices[0].message.content.strip())

Scheduler stops after the main thread exits

BackgroundScheduler runs in a daemon thread, so it dies when your main script ends. For a long-running agent, either use BlockingScheduler or add a keep-alive loop:

import signal
import sys
import time

def shutdown(signum, frame):
    scheduler.shutdown(wait=False)
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

# Keep the main thread alive
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown(wait=False)

Jobs silently fail with no error output

APScheduler reports job exceptions through its own logger and error events, which are easy to miss if logging isn't configured. Always attach the event listener from the retry section above. You can also set jitter on the trigger to prevent thundering herd problems when multiple jobs share the same schedule:

scheduler.add_job(
    task_fn,
    trigger=CronTrigger(hour="*/6", minute=0, jitter=120),  # random delay up to 120 seconds
    id="health_check",
)

This gives you a production-ready pattern: an LLM that understands natural language schedules, APScheduler handling the execution timing, tool-calling for management operations, and proper error handling with retries. The agent loop is stateless between requests, so you can drop it behind a REST API or a chat interface without any changes to the core logic.