Stale data kills model accuracy silently. Your feature pipeline breaks at 2am, the daily training job ingests yesterday’s stale features, and your model quietly degrades. Nobody notices until a customer complains. By then you’ve been serving garbage predictions for hours.
The fix is a freshness monitoring layer that runs independently from your data pipelines. It checks when each data source was last updated, compares that against your expectations, and screams at you when something is late. Here’s how to build one from scratch in Python.
Defining Freshness Rules#
Start by defining what “fresh” means for each data source. A user events table that updates hourly has different expectations than a monthly demographics export.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from typing import Optional
class SourceType(Enum):
    """Kinds of data source the monitor knows how to check."""

    FILE = "file"
    S3 = "s3"
    DATABASE = "database"


@dataclass
class FreshnessRule:
    """Freshness expectations for a single data source.

    Pairs a location (``path``, interpreted per ``source_type``) with
    what counts as fresh (``max_age``, plus an optional row-count floor
    for tables).
    """

    name: str
    source_type: SourceType
    max_age: timedelta
    path: str  # file path, S3 key, or table name
    min_row_count: Optional[int] = None
    description: str = ""

    def __repr__(self) -> str:
        # Report the freshness window in hours for compact log output.
        window_hours = self.max_age.total_seconds() / 3600
        return f"FreshnessRule({self.name}, max_age={window_hours}h)"
# Define your data sources and their freshness expectations
rules = [
    FreshnessRule(
        name="user_events",
        source_type=SourceType.DATABASE,
        max_age=timedelta(hours=2),
        path="user_events",
        # Floor on row count: catches a pipeline that runs but writes
        # empty or truncated output.
        min_row_count=1000,
        description="Clickstream events, updated every hour",
    ),
    FreshnessRule(
        name="training_features",
        source_type=SourceType.FILE,
        max_age=timedelta(hours=24),
        path="/data/features/training_features.parquet",
        description="Daily feature export for model training",
    ),
    FreshnessRule(
        name="product_embeddings",
        source_type=SourceType.S3,
        # Refreshed twice daily, so 12h is the tightest honest window.
        max_age=timedelta(hours=12),
        path="ml-data-bucket/embeddings/product_embeddings.npy",
        description="Product embeddings refreshed twice daily",
    ),
]
|
The max_age field is the key constraint. If a source hasn’t been updated within that window, it’s stale. The min_row_count is optional but catches a different failure mode – a pipeline that runs but produces empty or truncated output.
Checking File and Database Freshness#
Now build the checkers. Each one returns a FreshnessResult with the actual age and whether the source passed.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
| import os
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
@dataclass
class FreshnessResult:
    """Outcome of one freshness check against a FreshnessRule."""

    rule_name: str
    is_fresh: bool
    # None when the source could not be read at all (see `error`).
    actual_age: Optional[timedelta]
    row_count: Optional[int] = None
    error: Optional[str] = None
    # Fixed annotation: was `datetime = None`, which lies about the
    # accepted default. None means "stamp with the current UTC time".
    checked_at: Optional[datetime] = None

    def __post_init__(self):
        if self.checked_at is None:
            self.checked_at = datetime.now(timezone.utc)
def check_file_freshness(rule: FreshnessRule) -> FreshnessResult:
    """Check freshness of a local file from its mtime.

    Returns a failed result (with `error` set) when the file is missing
    or unreadable; otherwise compares the file age to rule.max_age.
    """
    try:
        # Stat directly (EAFP) instead of exists()-then-stat(): the
        # original two-step check raced with concurrent deletion.
        info = Path(rule.path).stat()
    except FileNotFoundError:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"File not found: {rule.path}",
        )
    except OSError as e:
        # Permission errors, dangling symlinks, etc.
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=str(e),
        )
    mtime = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
    age = datetime.now(timezone.utc) - mtime
    return FreshnessResult(
        rule_name=rule.name,
        is_fresh=age <= rule.max_age,
        actual_age=age,
    )
def check_s3_freshness(rule: FreshnessRule) -> FreshnessResult:
    """Check freshness of an S3 object via a HEAD request. Requires boto3.

    rule.path must be "bucket/key". head_object fetches metadata only,
    so no object bytes are downloaded.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    # Validate the path shape up front: the original split("/", 1)
    # raised an uncaught ValueError when the path had no "/".
    bucket, _, key = rule.path.partition("/")
    if not bucket or not key:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"Invalid S3 path (expected 'bucket/key'): {rule.path}",
        )
    try:
        s3 = boto3.client("s3")
        response = s3.head_object(Bucket=bucket, Key=key)
        last_modified = response["LastModified"]  # already timezone-aware
        age = datetime.now(timezone.utc) - last_modified
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=age <= rule.max_age,
            actual_age=age,
        )
    except ClientError as e:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"S3 error: {e.response['Error']['Message']}",
        )
    except BotoCoreError as e:
        # Credential/endpoint failures are not ClientError; without this
        # the monitoring loop would crash instead of reporting them.
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"S3 error: {e}",
        )
def check_db_freshness(rule: FreshnessRule, db_path: str = "app.db") -> FreshnessResult:
    """Check freshness of a database table using its most recent timestamp.

    Reads MAX(updated_at) from the table named by rule.path, compares its
    age against rule.max_age, and (when rule.min_row_count is set) also
    requires the row count to meet that floor.
    """
    # Table names cannot be bound as SQL parameters, so reject anything
    # that is not a plain identifier before interpolating it into SQL.
    if not rule.path.isidentifier():
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"Invalid table name: {rule.path}",
        )
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            # Get the most recent row timestamp
            cursor.execute(f"SELECT MAX(updated_at) FROM {rule.path}")
            result = cursor.fetchone()
            if result[0] is None:
                return FreshnessResult(
                    rule_name=rule.name,
                    is_fresh=False,
                    actual_age=None,
                    error=f"Table {rule.path} is empty",
                )
            # NOTE(review): assumes updated_at is stored as a naive
            # ISO-8601 string in UTC — confirm against the writer.
            last_update = datetime.fromisoformat(result[0]).replace(tzinfo=timezone.utc)
            age = datetime.now(timezone.utc) - last_update
            # Check row count if required
            row_count = None
            if rule.min_row_count is not None:
                cursor.execute(f"SELECT COUNT(*) FROM {rule.path}")
                row_count = cursor.fetchone()[0]
        finally:
            # Close on every path: the original leaked the connection on
            # the empty-table return and whenever an exception fired.
            conn.close()
        is_fresh = age <= rule.max_age
        # `is not None` (not truthiness) so min_row_count=0 is honored
        # consistently with the check above.
        if rule.min_row_count is not None and row_count is not None:
            is_fresh = is_fresh and (row_count >= rule.min_row_count)
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=is_fresh,
            actual_age=age,
            row_count=row_count,
        )
    except sqlite3.Error as e:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"DB error: {e}",
        )
|
The pattern is the same for each source type: get the last modified time, compute the age, compare against the rule’s max_age. The S3 checker uses head_object instead of downloading the file – fast and cheap. The database checker queries the max updated_at timestamp from the table directly.
Dispatching Checks by Source Type#
Wire the checkers together with a dispatcher:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def check_freshness(rule: FreshnessRule) -> FreshnessResult:
    """Route a freshness check to the correct checker."""
    # Dispatch table instead of an if/elif chain; unknown types fall
    # through to a failed result rather than raising.
    dispatch = {
        SourceType.FILE: check_file_freshness,
        SourceType.S3: check_s3_freshness,
        SourceType.DATABASE: check_db_freshness,
    }
    checker = dispatch.get(rule.source_type)
    if checker is not None:
        return checker(rule)
    return FreshnessResult(
        rule_name=rule.name,
        is_fresh=False,
        actual_age=None,
        error=f"Unknown source type: {rule.source_type}",
    )
|
Building the Monitoring Loop#
You want this running on a schedule, not as a one-off script. The schedule library is the simplest option for a standalone monitoring process.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| import logging
import time
import schedule
# Root logging config for the standalone monitor process: timestamped,
# level-tagged lines at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("freshness_monitor")
def run_all_checks(rules: list[FreshnessRule]) -> list[FreshnessResult]:
    """Run freshness checks for all rules and return results."""
    results: list[FreshnessResult] = []
    for rule in rules:
        logger.info(f"Checking freshness: {rule.name}")
        outcome = check_freshness(rule)
        results.append(outcome)
        # Log one line per source: pass, hard error, or stale.
        if outcome.is_fresh:
            logger.info(f"  PASS: {rule.name} (age={outcome.actual_age})")
            continue
        if outcome.error:
            logger.error(f"  ERROR: {rule.name} - {outcome.error}")
            continue
        logger.warning(
            f"  STALE: {rule.name} (age={outcome.actual_age}, max={rule.max_age})"
        )
    return results
def monitoring_job():
    """Scheduled job that checks freshness and sends alerts."""
    outcomes = run_all_checks(rules)
    failed = [r for r in outcomes if not r.is_fresh]
    if not failed:
        logger.info("All data sources are fresh")
        return
    logger.warning(f"{len(failed)} stale data source(s) detected")
    # Alert sender is defined in the next section.
    send_slack_alert(failed)
# Run checks every 15 minutes
schedule.every(15).minutes.do(monitoring_job)
# Run once immediately on startup
monitoring_job()
# Keep the process alive
logger.info("Freshness monitor started. Checking every 15 minutes.")
while True:
    # schedule only fires jobs when run_pending() is polled; the short
    # sleep keeps CPU usage near zero between polls.
    schedule.run_pending()
    time.sleep(10)
|
For production use, you would run this as a systemd service or a Kubernetes CronJob. The schedule library is fine for a single-process monitor. If you need something more resilient, swap it for an Airflow DAG or a Celery beat task.
Sending Alerts#
When something goes stale, you need to know fast. Slack webhooks are the easiest way to get alerts in front of your team.
Slack Alerts#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| import json
import os
import requests
# Incoming-webhook URL; alerts are skipped (with a warning) when unset.
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")


def send_slack_alert(stale_results: list[FreshnessResult]) -> None:
    """Send a Slack Block Kit notification for stale data sources.

    Skips (with a warning) when SLACK_WEBHOOK_URL is unset. Network
    failures are logged, never raised, so the monitoring loop that calls
    this keeps running.
    """
    if not SLACK_WEBHOOK_URL:
        logger.warning("SLACK_WEBHOOK_URL not set, skipping Slack alert")
        return
    blocks = []
    for r in stale_results:
        if r.error:
            detail = f"Error: {r.error}"
        else:
            hours = r.actual_age.total_seconds() / 3600
            detail = f"Age: {hours:.1f}h"
        if r.row_count is not None:
            detail += f" | Rows: {r.row_count:,}"
        # Slack rejects section text over 3000 chars with 400
        # invalid_payload, so truncate defensively.
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{r.rule_name}*\n{detail}"[:3000],
            },
        })
    payload = {
        "text": f"Data Freshness Alert: {len(stale_results)} source(s) stale",
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"Stale Data Alert ({len(stale_results)} source(s))",
                },
            },
            *blocks,
        ],
    }
    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            data=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
    except requests.RequestException as e:
        # A Slack outage or DNS failure must not crash the monitor;
        # the original let this exception propagate out of the job.
        logger.error(f"Slack alert failed: {e}")
        return
    if response.status_code != 200:
        logger.error(f"Slack alert failed: {response.status_code} {response.text}")
    else:
        logger.info("Slack alert sent successfully")
|
Set the SLACK_WEBHOOK_URL environment variable to your incoming webhook URL from the Slack API console. The payload uses Block Kit for formatting – each stale source gets its own section with the age and optional row count.
Email Alerts#
For teams that prefer email or need alerts outside of Slack:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| import smtplib
from email.mime.text import MIMEText
def send_email_alert(
stale_results: list[FreshnessResult],
smtp_host: str = "smtp.gmail.com",
smtp_port: int = 587,
sender: str = "[email protected]",
password: str = "",
recipients: list[str] = None,
) -> None:
"""Send an email alert for stale data sources."""
if recipients is None:
recipients = ["[email protected]"]
lines = ["The following data sources are stale:\n"]
for r in stale_results:
if r.error:
lines.append(f" - {r.rule_name}: ERROR - {r.error}")
else:
hours = r.actual_age.total_seconds() / 3600
lines.append(f" - {r.rule_name}: {hours:.1f}h old (max: "
f"{r.actual_age.total_seconds() / 3600:.1f}h)")
body = "\n".join(lines)
msg = MIMEText(body)
msg["Subject"] = f"[DATA ALERT] {len(stale_results)} stale data source(s)"
msg["From"] = sender
msg["To"] = ", ".join(recipients)
try:
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(sender, password)
server.sendmail(sender, recipients, msg.as_string())
logger.info(f"Email alert sent to {recipients}")
except smtplib.SMTPException as e:
logger.error(f"Email alert failed: {e}")
|
Swap the SMTP credentials for your actual mail server. For production, consider using SES or SendGrid instead of direct SMTP – they handle delivery reliability and bounce management for you.
Common Errors and Fixes#
Real issues you will hit when deploying this.
S3 Access Denied#
1
| botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden
|
Your IAM role or credentials lack s3:GetObject permission on the bucket. Fix by adding the permission to your policy:
1
2
3
4
5
| {
"Effect": "Allow",
    "Action": ["s3:GetObject"],
"Resource": "arn:aws:s3:::ml-data-bucket/*"
}
|
Note that there is no separate s3:HeadObject IAM action – head_object is authorized by s3:GetObject. One subtlety: without s3:ListBucket on the bucket, a missing object returns 403 instead of 404, so you cannot tell "denied" apart from "not found".
SQLite Database Locked#
1
| sqlite3.OperationalError: database is locked
|
This happens when your monitoring process tries to read while another process is writing. Set a timeout on the connection:
1
| conn = sqlite3.connect("app.db", timeout=30)
|
That gives the writer up to 30 seconds to release the lock before raising the error. If you are hitting this constantly, switch to PostgreSQL or add WAL mode:
1
| conn.execute("PRAGMA journal_mode=WAL")
|
Slack Webhook Returns 400#
1
| Slack alert failed: 400 invalid_payload
|
The Block Kit payload has a formatting problem. The most common cause is a text field exceeding 3000 characters. Truncate long messages before sending:
1
| detail = detail[:2900]
|
Also check that your webhook URL is still active. Slack deactivates webhooks after extended periods of inactivity.