Manual retraining is a ticking time bomb. Someone forgets to kick off the script, the model silently degrades, and you find out from a Slack message that says “predictions look weird.” Prefect lets you build retraining pipelines that run on a schedule, retry on failure, skip deployment when metrics regress, and ping you when something breaks – all without writing your own scheduler or cron wrapper.
Here’s a complete retraining pipeline you can steal and adapt.
The Full Retraining Flow
This pipeline handles the entire lifecycle: pull fresh data, preprocess, train, evaluate against the current production model, and only promote if metrics actually improve. Every step is a Prefect task with its own retry policy.
```python
import json
from datetime import datetime
from pathlib import Path

import joblib
import pandas as pd
from prefect import flow, task, get_run_logger
from prefect.cache_policies import INPUTS
from prefect.blocks.notifications import SlackWebhook
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

MODELS_DIR = Path("models")
METRICS_FILE = MODELS_DIR / "production_metrics.json"


@task(retries=3, retry_delay_seconds=30, cache_policy=INPUTS)
def fetch_training_data(source: str) -> pd.DataFrame:
    """Pull the latest labeled data from your warehouse or lake."""
    logger = get_run_logger()
    # Replace this with your actual data source -- BigQuery, Snowflake, S3, etc.
    df = pd.read_parquet(source)
    logger.info(f"Fetched {len(df)} rows from {source}")
    return df


@task(retries=1)
def preprocess(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.Series]:
    """Clean and split features from target."""
    logger = get_run_logger()
    df = df.dropna(subset=["target"])
    # Drop columns that leak or aren't useful
    drop_cols = [c for c in ["id", "created_at", "updated_at"] if c in df.columns]
    X = df.drop(columns=["target"] + drop_cols)
    y = df["target"]
    logger.info(f"Preprocessed: {X.shape[1]} features, {len(y)} samples")
    return X, y


@task
def split_data(
    X: pd.DataFrame, y: pd.Series
) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Split into train and test sets once to avoid data leakage."""
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    return X_train, X_test, y_train, y_test


@task(retries=2, retry_delay_seconds=10)
def train_model(
    X_train: pd.DataFrame, y_train: pd.Series
) -> GradientBoostingClassifier:
    """Train a new candidate model."""
    logger = get_run_logger()
    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.1,
        subsample=0.8,
    )
    model.fit(X_train, y_train)
    logger.info("Model training complete")
    return model


@task
def evaluate_model(
    model: GradientBoostingClassifier,
    X_test: pd.DataFrame,
    y_test: pd.Series,
) -> float:
    """Evaluate on the holdout test set and return F1 score."""
    preds = model.predict(X_test)
    score = f1_score(y_test, preds, average="weighted")
    logger = get_run_logger()
    logger.info(f"Candidate model F1: {score:.4f}")
    return score


@task
def load_production_metrics() -> float:
    """Load the current production model's benchmark score."""
    if METRICS_FILE.exists():
        with open(METRICS_FILE) as f:
            return json.load(f)["f1_score"]
    return 0.0  # no production model yet -- any score wins


@task
def promote_model(
    model: GradientBoostingClassifier,
    score: float,
) -> str:
    """Save the new model as the production artifact."""
    logger = get_run_logger()
    MODELS_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_path = MODELS_DIR / f"model_{timestamp}.joblib"
    joblib.dump(model, model_path)
    # Update production metrics
    with open(METRICS_FILE, "w") as f:
        json.dump({"f1_score": score, "model_path": str(model_path), "promoted_at": timestamp}, f)
    logger.info(f"Promoted model to {model_path} with F1={score:.4f}")
    return str(model_path)


@task
def notify_failure(message: str):
    """Send a Slack notification on pipeline failure."""
    logger = get_run_logger()
    try:
        slack = SlackWebhook.load("retraining-alerts")
        slack.notify(message)
    except ValueError:
        logger.warning("Slack webhook block 'retraining-alerts' not configured, skipping notification")
        logger.warning(f"Would have sent: {message}")


@flow(name="retrain-model", log_prints=True)
def retrain_pipeline(data_source: str = "s3://my-bucket/training/latest.parquet"):
    """End-to-end retraining: fetch, preprocess, train, evaluate, conditionally promote."""
    logger = get_run_logger()
    try:
        df = fetch_training_data(data_source)
        X, y = preprocess(df)
        X_train, X_test, y_train, y_test = split_data(X, y)
        candidate = train_model(X_train, y_train)
        candidate_score = evaluate_model(candidate, X_test, y_test)
        prod_score = load_production_metrics()
        improvement = candidate_score - prod_score
        logger.info(f"Production F1: {prod_score:.4f} | Candidate F1: {candidate_score:.4f} | Delta: {improvement:+.4f}")
        if candidate_score > prod_score:
            model_path = promote_model(candidate, candidate_score)
            print(f"New model promoted: {model_path}")
        else:
            print(f"Candidate did not beat production ({candidate_score:.4f} <= {prod_score:.4f}). Skipping promotion.")
    except Exception as e:
        notify_failure(f"Retraining pipeline failed: {e}")
        raise


if __name__ == "__main__":
    retrain_pipeline()
```
Save this as retrain_flow.py and run it with python retrain_flow.py for a one-off execution. But the real value comes from scheduling.
Scheduling Retraining on a Cron
Prefect separates flow code from scheduling through deployments. You define when and how a flow runs without touching the flow logic.
```python
from retrain_flow import retrain_pipeline

if __name__ == "__main__":
    # Flow.serve() creates the deployment and keeps this process polling the schedule
    retrain_pipeline.serve(
        name="nightly-retrain",
        cron="0 2 * * *",  # every day at 2 AM UTC
        parameters={"data_source": "s3://my-bucket/training/latest.parquet"},
        tags=["retraining", "production"],
    )
```
Save this as deploy.py and run python deploy.py. The flow then shows up in the Prefect UI, scheduled to fire at 2 AM UTC every day. The .serve() method starts a lightweight, long-running process that polls the schedule and runs the flow locally. For production, use prefect deploy with a prefect.yaml to run flows on remote infrastructure:
```bash
prefect deploy retrain_flow.py:retrain_pipeline \
  --name nightly-retrain \
  --cron "0 2 * * *" \
  --pool ml-training
```
The .serve() approach is simpler for single-machine setups, while prefect deploy with work pools gives you remote execution and scaling.
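Because the cron expression above is evaluated in UTC, it helps to check when the next run lands relative to your own clock. The helper below is a standard-library sketch only, not part of Prefect: next_nightly_run is a hypothetical name, and it handles just the fixed "every day at a given hour" case rather than parsing cron syntax.

```python
from datetime import datetime, time, timedelta, timezone


def next_nightly_run(now: datetime, hour: int = 2) -> datetime:
    """Return the next daily run at hour:00 UTC strictly after `now`.

    `now` must be timezone-aware so the conversion to UTC is unambiguous.
    """
    now_utc = now.astimezone(timezone.utc)
    candidate = datetime.combine(now_utc.date(), time(hour=hour), tzinfo=timezone.utc)
    if candidate <= now_utc:
        # Today's slot has already passed, so the next run is tomorrow
        candidate += timedelta(days=1)
    return candidate


# 8 PM on Jan 1 at UTC-8 is already 4 AM UTC on Jan 2, so the next run is Jan 3
pacific = timezone(timedelta(hours=-8))
print(next_nightly_run(datetime(2024, 1, 1, 20, 0, tzinfo=pacific)))
```

If the nightly run needs to follow local time instead, that is a scheduling decision to make in the deployment itself rather than in a helper like this.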
How Retries and Failure Handling Work
Each @task has its own retry policy. The fetch_training_data task retries 3 times with 30-second delays because network calls to data warehouses are flaky. The train_model task retries twice because OOM errors sometimes resolve on a second attempt with different memory conditions.
When a task exhausts all retries, the exception propagates to the flow. The try/except in retrain_pipeline catches it and calls notify_failure before re-raising, so you get a Slack alert and the flow still shows as failed in the Prefect UI.
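To make the retry semantics concrete, here is a plain-Python model of the behavior described above: one initial attempt, then up to retries further attempts with a fixed delay, and the last exception re-raised once they are exhausted. It is a sketch for intuition, not Prefect's implementation; run_with_retries and flaky_fetch are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int,
    retry_delay_seconds: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry up to `retries` more times on any exception."""
    total_attempts = retries + 1
    for attempt in range(1, total_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == total_attempts:
                raise  # retries exhausted: the error reaches the caller
            sleep(retry_delay_seconds)
    raise AssertionError("unreachable")


calls = {"count": 0}


def flaky_fetch() -> str:
    """Fail twice with a simulated network error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("warehouse timeout")
    return "rows"


result = run_with_retries(flaky_fetch, retries=3, retry_delay_seconds=0)
print(result, calls["count"])
```

With retries=3, a task can therefore run up to four times in total before the flow's except block sees the failure.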
To set up the Slack notification block:
```bash
prefect block register -m prefect.blocks.notifications
# Prints a link to the block creation form in the Prefect UI
prefect block create slack-webhook
```
Open the link, name the block retraining-alerts, and paste your Slack webhook URL into the form. The notify_failure task gracefully handles the case where the block isn’t configured yet, so you won’t get a crash on top of a crash.
Conditional Deployment Logic
The pipeline only promotes a new model when candidate_score > prod_score. This prevents a bad data batch or a hyperparameter regression from pushing a worse model to production. You can tighten this further:
```python
# Require at least a 0.01 absolute F1 gain to justify the deployment overhead
MIN_IMPROVEMENT = 0.01

if candidate_score > prod_score + MIN_IMPROVEMENT:
    model_path = promote_model(candidate, candidate_score)
elif candidate_score > prod_score:
    print(f"Marginal improvement ({improvement:+.4f}), skipping promotion")
else:
    print(f"Regression detected ({improvement:+.4f}), skipping promotion")
```
This avoids deploying for noise-level improvements that aren’t worth the operational churn.
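One way to keep this gate easy to unit test is to pull the comparison into a pure function that the flow calls. The sketch below uses the same thresholds as the snippet above; promotion_decision and its return labels are hypothetical names, not part of the original pipeline.

```python
def promotion_decision(
    candidate_score: float,
    prod_score: float,
    min_improvement: float = 0.01,
) -> str:
    """Classify a candidate against production using an absolute F1 margin."""
    if candidate_score > prod_score + min_improvement:
        return "promote"
    if candidate_score > prod_score:
        return "skip_marginal"  # better, but within the noise margin
    return "skip_no_gain"  # equal or worse than production


# First run: no production model yet, so prod_score is 0.0 and any real score wins
print(promotion_decision(0.72, 0.0))
print(promotion_decision(0.805, 0.80))
print(promotion_decision(0.79, 0.80))
```

The flow would then call promote_model only when the function returns "promote", and the thresholds can be tested without training anything.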
Artifact Tracking
The promote_model task writes a production_metrics.json file that acts as a lightweight model registry. The file is overwritten on each promotion, so it always records the path, evaluation score, and promotion timestamp of the current production model, while earlier models stay on disk under their timestamped filenames. For production systems, swap this out for MLflow or Weights & Biases:
```python
import mlflow
import mlflow.sklearn
from prefect import task


@task
def promote_model_mlflow(model, score: float) -> str:
    # Log the model and its score under a new MLflow run
    with mlflow.start_run() as run:
        mlflow.sklearn.log_model(model, "model")
        mlflow.log_metric("f1_score", score)
        run_id = run.info.run_id
    # Register the logged model as a new version of "my-classifier"
    mlflow.register_model(f"runs:/{run_id}/model", "my-classifier")
    return run_id
```
This gives you model versioning, comparison dashboards, and a proper lineage trail – things the JSON file approach doesn’t scale to.
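If you stay with the JSON file for a while, note that promote_model rewrites production_metrics.json each time, so only the latest promotion is recorded. A small append-only log alongside it keeps the history. The sketch below assumes a hypothetical promotion_history.jsonl file and helper names, and it stamps records in UTC, unlike the local-time timestamp in promote_model.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def append_promotion_record(history_file: Path, model_path: str, score: float) -> dict:
    """Append one promotion as a JSON line and return the record written."""
    record = {
        "f1_score": score,
        "model_path": model_path,
        "promoted_at": datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S"),
    }
    history_file.parent.mkdir(parents=True, exist_ok=True)
    with open(history_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return record


def read_promotion_history(history_file: Path) -> list[dict]:
    """Return all recorded promotions, oldest first."""
    if not history_file.exists():
        return []
    with open(history_file, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Calling append_promotion_record from promote_model, right after the metrics file is written, would be enough to answer "what was in production last month" without a full registry.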
Common Errors and Fixes
prefect.exceptions.MissingResult after task completes – This usually means Prefect tried to read a task result that was never persisted and is no longer held in memory, which is more likely when tasks return large objects such as models. Add persist_result=True and configure a result storage block, or return a file path instead of the raw model object.
TypeError: cannot pickle '_thread.lock' object – Some ML models hold thread locks that can’t be serialized between tasks. Save the model inside the task, using the library’s own save method or joblib.dump() where the model supports it, and pass the file path downstream instead of the model object.
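The path-passing pattern can be shown with a toy example. LockedModel below is a hypothetical stand-in for a model that holds a thread lock, and it saves its weights as JSON purely for illustration; a real model would use its library's own save and load functions.

```python
import json
import pickle
import threading
from pathlib import Path


class LockedModel:
    """Hypothetical model whose thread lock makes the object unpicklable."""

    def __init__(self, weights: list[float]):
        self.weights = weights
        self._lock = threading.Lock()  # this attribute is what breaks pickling

    def save(self, path: Path) -> None:
        # Persist only the plain data, never the lock
        path.write_text(json.dumps({"weights": self.weights}))

    @classmethod
    def load(cls, path: Path) -> "LockedModel":
        return cls(json.loads(path.read_text())["weights"])


def train_and_save(out_dir: Path) -> str:
    """Train, save inside the step, and hand back only the file path."""
    model = LockedModel(weights=[0.1, 0.2, 0.3])
    path = out_dir / "candidate.json"
    model.save(path)
    return str(path)


def evaluate_from_path(model_path: str) -> float:
    """Reload the model downstream instead of receiving the object itself."""
    model = LockedModel.load(Path(model_path))
    return sum(model.weights)


# Passing the object itself fails as soon as something tries to pickle it
try:
    pickle.dumps(LockedModel([1.0]))
except TypeError as exc:
    print(f"pickling failed: {exc}")
```

In the pipeline, train_model would return the saved path and evaluate_model would load from it, so only a string crosses the task boundary.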
ConnectionRefusedError when running flows – You need a running Prefect server or Prefect Cloud connection. For local development, start the server with prefect server start in a separate terminal. For production, point to Prefect Cloud with prefect cloud login.
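Tasks running sequentially despite no dependencies – Calling a task directly blocks until it finishes, so tasks run one after another by default. To get concurrency, start tasks with task.submit() instead of calling them directly, and set a task runner such as ThreadPoolTaskRunner (called ConcurrentTaskRunner in Prefect 2.x) on the flow:
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner


@flow(task_runner=ThreadPoolTaskRunner())
def retrain_pipeline():
    # task.submit() returns a future; independent submitted tasks run concurrently.
    # Call .result() on the future when you need the value.
    ...
```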
Schedule not triggering – If using prefect deploy, make sure a worker is running and polling the correct work pool: prefect worker start --pool ml-training. No worker means no execution, even if the schedule is active. If using .serve(), the process must stay running.
Stale cache returning old data – The cache_policy=INPUTS on fetch_training_data caches results based on input parameters. If your data source URL stays the same but the underlying data changes, remove the cache policy or add a timestamp parameter to bust it.
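The timestamp trick works because an input-based cache key changes whenever any argument changes. The sketch below uses functools.lru_cache purely as a stand-in for input-keyed caching, not Prefect's cache; cached_fetch and the as_of parameter are hypothetical names.

```python
from datetime import date
from functools import lru_cache


@lru_cache(maxsize=None)
def cached_fetch(source: str, as_of: str) -> str:
    """Pretend to read data; the cache key is the (source, as_of) pair."""
    return f"{source}@{as_of}"


source = "s3://my-bucket/training/latest.parquet"
cached_fetch(source, "2024-01-01")  # cache miss: data is read
cached_fetch(source, "2024-01-01")  # same inputs: served from cache
cached_fetch(source, "2024-01-02")  # new date: cache miss, data is re-read

# In a nightly run, today's date is a natural cache-busting key
todays_key = date.today().isoformat()
print(cached_fetch.cache_info())
```

In the pipeline, the equivalent change would be adding a date parameter to fetch_training_data and passing the run date from the flow, so each night's run gets a fresh key while retries within the same run can still reuse the cached result.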