Stale data kills model accuracy silently. Your feature pipeline breaks at 2am, the daily training job ingests yesterday’s stale features, and your model quietly degrades. Nobody notices until a customer complains. By then you’ve been serving garbage predictions for hours.

The fix is a freshness monitoring layer that runs independently from your data pipelines. It checks when each data source was last updated, compares that against your expectations, and screams at you when something is late. Here’s how to build one from scratch in Python.

Defining Freshness Rules

Start by defining what “fresh” means for each data source. A user events table that updates hourly has different expectations than a monthly demographics export.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from typing import Optional


class SourceType(Enum):
    """Kind of backing store a monitored data source lives in.

    The value selects which checker the dispatcher routes the rule to.
    """
    FILE = "file"
    S3 = "s3"
    DATABASE = "database"


@dataclass
class FreshnessRule:
    """Freshness expectations for one monitored data source.

    A source is considered stale when its last update is older than
    ``max_age``.  ``min_row_count``, when set, additionally requires the
    source to contain at least that many rows.
    """
    name: str
    source_type: SourceType
    max_age: timedelta
    path: str  # file path, S3 key, or table name
    min_row_count: Optional[int] = None
    description: str = ""

    def __repr__(self) -> str:
        # Report the threshold in hours for compact log output.
        threshold_hours = self.max_age / timedelta(hours=1)
        return f"FreshnessRule({self.name}, max_age={threshold_hours}h)"


# Define your data sources and their freshness expectations.
# Each rule pairs a source with its maximum tolerated age; min_row_count
# (optional) additionally guards against empty or truncated output.
rules = [
    FreshnessRule(
        name="user_events",
        source_type=SourceType.DATABASE,
        max_age=timedelta(hours=2),  # hourly pipeline + 1h of slack
        path="user_events",
        min_row_count=1000,
        description="Clickstream events, updated every hour",
    ),
    FreshnessRule(
        name="training_features",
        source_type=SourceType.FILE,
        max_age=timedelta(hours=24),
        path="/data/features/training_features.parquet",
        description="Daily feature export for model training",
    ),
    FreshnessRule(
        name="product_embeddings",
        source_type=SourceType.S3,
        max_age=timedelta(hours=12),
        # Format is "<bucket>/<key>" for the S3 checker.
        path="ml-data-bucket/embeddings/product_embeddings.npy",
        description="Product embeddings refreshed twice daily",
    ),
]

The max_age field is the key constraint. If a source hasn’t been updated within that window, it’s stale. The min_row_count is optional but catches a different failure mode – a pipeline that runs but produces empty or truncated output.

Checking File and Database Freshness

Now build the checkers. Each one returns a FreshnessResult with the actual age and whether the source passed.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import os
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional


@dataclass
class FreshnessResult:
    """Outcome of a single freshness check.

    When the check could not be performed at all, ``error`` is set and
    ``actual_age`` is None; otherwise ``actual_age`` holds the measured age
    and ``is_fresh`` says whether the source passed.
    """
    rule_name: str
    is_fresh: bool
    actual_age: Optional[timedelta]
    row_count: Optional[int] = None
    error: Optional[str] = None
    # Bug fix: was annotated `datetime = None`, which is untrue for the
    # default and misleads type checkers. None is filled in __post_init__.
    checked_at: Optional[datetime] = None

    def __post_init__(self):
        # Default to "now" in UTC so results are comparable across hosts.
        if self.checked_at is None:
            self.checked_at = datetime.now(timezone.utc)


def check_file_freshness(rule: FreshnessRule) -> FreshnessResult:
    """Check freshness of a local file using its modification time."""
    target = Path(rule.path)
    try:
        if not target.exists():
            return FreshnessResult(
                rule_name=rule.name,
                is_fresh=False,
                actual_age=None,
                error=f"File not found: {rule.path}",
            )
        # mtime is an epoch float; convert to an aware UTC datetime so the
        # subtraction below is between two aware values.
        modified = datetime.fromtimestamp(target.stat().st_mtime, tz=timezone.utc)
        file_age = datetime.now(timezone.utc) - modified
    except OSError as exc:
        # stat() can fail even after exists() (permissions, races).
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=str(exc),
        )
    return FreshnessResult(
        rule_name=rule.name,
        is_fresh=file_age <= rule.max_age,
        actual_age=file_age,
    )


def check_s3_freshness(rule: FreshnessRule) -> FreshnessResult:
    """Check freshness of an S3 object. Requires boto3.

    ``rule.path`` must be "<bucket>/<key>". Uses head_object so the object
    body is never downloaded.
    """
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    # Bug fix: a path with no "/" previously raised an unhandled
    # ValueError from tuple unpacking and crashed the monitor.
    bucket, sep, key = rule.path.partition("/")
    if not sep or not key:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"Invalid S3 path (expected 'bucket/key'): {rule.path}",
        )

    try:
        s3 = boto3.client("s3")
        response = s3.head_object(Bucket=bucket, Key=key)
        last_modified = response["LastModified"]  # already timezone-aware
        age = datetime.now(timezone.utc) - last_modified

        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=age <= rule.max_age,
            actual_age=age,
        )
    except ClientError as e:
        # Missing key, access denied, etc. -- the service answered.
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"S3 error: {e.response['Error']['Message']}",
        )
    except BotoCoreError as e:
        # Bug fix: credential/endpoint failures (NoCredentialsError,
        # EndpointConnectionError, ...) are BotoCoreError, not ClientError;
        # previously they propagated and killed the monitoring job.
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"S3 error: {e}",
        )


def check_db_freshness(rule: FreshnessRule, db_path: str = "app.db") -> FreshnessResult:
    """Check freshness of a database table using its most recent timestamp.

    Reads MAX(updated_at) from the table named by ``rule.path`` and, when
    ``rule.min_row_count`` is set, also verifies the row count.

    NOTE(review): updated_at values are parsed with fromisoformat and
    assumed to be stored as naive UTC -- confirm against the writer.
    """
    conn = None
    try:
        # timeout avoids spurious "database is locked" errors while a
        # writer briefly holds the lock.
        conn = sqlite3.connect(db_path, timeout=30)
        cursor = conn.cursor()

        # Table names cannot be bound parameters; quote the identifier so a
        # malformed name from the rule config cannot break out of the SQL.
        table = rule.path.replace('"', '""')

        # Get the most recent row timestamp
        cursor.execute(f'SELECT MAX(updated_at) FROM "{table}"')
        result = cursor.fetchone()

        if result[0] is None:
            return FreshnessResult(
                rule_name=rule.name,
                is_fresh=False,
                actual_age=None,
                error=f"Table {rule.path} is empty",
            )

        last_update = datetime.fromisoformat(result[0]).replace(tzinfo=timezone.utc)
        age = datetime.now(timezone.utc) - last_update

        # Check row count if required
        row_count = None
        if rule.min_row_count is not None:
            cursor.execute(f'SELECT COUNT(*) FROM "{table}"')
            row_count = cursor.fetchone()[0]

        is_fresh = age <= rule.max_age
        if rule.min_row_count and row_count is not None:
            is_fresh = is_fresh and (row_count >= rule.min_row_count)

        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=is_fresh,
            actual_age=age,
            row_count=row_count,
        )
    except sqlite3.Error as e:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"DB error: {e}",
        )
    finally:
        # Bug fix: the original leaked the connection on the empty-table
        # early return and on any sqlite3.Error.
        if conn is not None:
            conn.close()

The pattern is the same for each source type: get the last modified time, compute the age, compare against the rule’s max_age. The S3 checker uses head_object instead of downloading the file – fast and cheap. The database checker queries the max updated_at timestamp from the table directly.

Dispatching Checks by Source Type

Wire the checkers together with a dispatcher:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def check_freshness(rule: FreshnessRule) -> FreshnessResult:
    """Route a freshness check to the correct checker."""
    # Dispatch table instead of an if/elif chain; one entry per SourceType.
    dispatch = {
        SourceType.FILE: check_file_freshness,
        SourceType.S3: check_s3_freshness,
        SourceType.DATABASE: check_db_freshness,
    }
    checker = dispatch.get(rule.source_type)
    if checker is None:
        return FreshnessResult(
            rule_name=rule.name,
            is_fresh=False,
            actual_age=None,
            error=f"Unknown source type: {rule.source_type}",
        )
    return checker(rule)

Building the Monitoring Loop

You want this running on a schedule, not as a one-off script. The schedule library is the simplest option for a standalone monitoring process.

1
pip install schedule
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import logging
import time

import schedule

# Root logger config: timestamps and level in every line make it easy to
# correlate stale/fresh transitions with pipeline runs.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("freshness_monitor")


def run_all_checks(rules: list[FreshnessRule]) -> list[FreshnessResult]:
    """Run freshness checks for all rules and return results.

    Logs one line per rule: PASS, ERROR (the check could not run at all),
    or STALE (the check ran but the source exceeded its max_age).
    """
    results = []
    for rule in rules:
        # Idiom fix: lazy %-style args so logging skips string formatting
        # entirely when the level is disabled (vs. eager f-strings).
        logger.info("Checking freshness: %s", rule.name)
        result = check_freshness(rule)

        if result.is_fresh:
            logger.info("  PASS: %s (age=%s)", rule.name, result.actual_age)
        elif result.error:
            logger.error("  ERROR: %s - %s", rule.name, result.error)
        else:
            logger.warning(
                "  STALE: %s (age=%s, max=%s)",
                rule.name, result.actual_age, rule.max_age,
            )

        results.append(result)
    return results


def monitoring_job():
    """Scheduled job that checks freshness and sends alerts."""
    stale = [res for res in run_all_checks(rules) if not res.is_fresh]

    # Early return on the happy path keeps the alerting branch flat.
    if not stale:
        logger.info("All data sources are fresh")
        return

    logger.warning(f"{len(stale)} stale data source(s) detected")
    send_slack_alert(stale)  # defined in the next section


# Run checks every 15 minutes.
schedule.every(15).minutes.do(monitoring_job)

# Run once immediately on startup so a restart surfaces problems right away
# instead of waiting for the first scheduled tick.
monitoring_job()

# Keep the process alive: run_pending() fires any due jobs, and the 10s
# sleep bounds scheduling latency without busy-waiting.
logger.info("Freshness monitor started. Checking every 15 minutes.")
while True:
    schedule.run_pending()
    time.sleep(10)

For production use, you would run this as a systemd service or a Kubernetes CronJob. The schedule library is fine for a single-process monitor. If you need something more resilient, swap it for an Airflow DAG or a Celery beat task.

Sending Alerts

When something goes stale, you need to know fast. Slack webhooks are the easiest way to get alerts in front of your team.

Slack Alerts

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import json
import os

import requests


SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")


def send_slack_alert(stale_results: list[FreshnessResult]) -> None:
    """Send a Slack Block Kit notification for stale data sources.

    No-op (with a warning) when SLACK_WEBHOOK_URL is unset. Network
    failures are logged, never raised, so the monitoring loop keeps running.
    """
    if not SLACK_WEBHOOK_URL:
        logger.warning("SLACK_WEBHOOK_URL not set, skipping Slack alert")
        return

    blocks = []
    for r in stale_results:
        if r.error:
            detail = f"Error: {r.error}"
        else:
            hours = r.actual_age.total_seconds() / 3600
            detail = f"Age: {hours:.1f}h"
            if r.row_count is not None:
                detail += f" | Rows: {r.row_count:,}"

        # Slack rejects section text over 3000 chars with invalid_payload;
        # truncate with headroom.
        detail = detail[:2900]

        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{r.rule_name}*\n{detail}",
            },
        })

    payload = {
        "text": f"Data Freshness Alert: {len(stale_results)} source(s) stale",
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"Stale Data Alert ({len(stale_results)} source(s))",
                },
            },
            *blocks,
        ],
    }

    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            data=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
    except requests.RequestException as e:
        # Bug fix: connection/timeout errors previously propagated and
        # killed the scheduled monitoring job.
        logger.error(f"Slack alert failed: {e}")
        return

    if response.status_code != 200:
        logger.error(f"Slack alert failed: {response.status_code} {response.text}")
    else:
        logger.info("Slack alert sent successfully")

Set the SLACK_WEBHOOK_URL environment variable to your incoming webhook URL from the Slack API console. The payload uses Block Kit for formatting – each stale source gets its own section with the age and optional row count.

Email Alerts

For teams that prefer email or need alerts outside of Slack:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import smtplib
from email.mime.text import MIMEText


def send_email_alert(
    stale_results: list[FreshnessResult],
    smtp_host: str = "smtp.gmail.com",
    smtp_port: int = 587,
    sender: str = "[email protected]",
    password: str = "",
    recipients: Optional[list[str]] = None,
) -> None:
    """Send a plain-text email alert for stale data sources.

    SMTP failures -- including connection errors -- are logged, never
    raised, so the monitoring loop keeps running.
    """
    if recipients is None:
        recipients = ["[email protected]"]

    lines = ["The following data sources are stale:\n"]
    for r in stale_results:
        if r.error:
            lines.append(f"  - {r.rule_name}: ERROR - {r.error}")
        else:
            hours = r.actual_age.total_seconds() / 3600
            # Bug fix: the original printed actual_age a second time and
            # labeled it "max:", so the threshold always "equaled" the age.
            # FreshnessResult does not carry the rule's max_age, so report
            # only the measured age.
            lines.append(f"  - {r.rule_name}: {hours:.1f}h old")

    body = "\n".join(lines)

    msg = MIMEText(body)
    msg["Subject"] = f"[DATA ALERT] {len(stale_results)} stale data source(s)"
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)

    try:
        with smtplib.SMTP(smtp_host, smtp_port) as server:
            server.starttls()
            server.login(sender, password)
            server.sendmail(sender, recipients, msg.as_string())
        logger.info(f"Email alert sent to {recipients}")
    except (smtplib.SMTPException, OSError) as e:
        # Bug fix: connect failures (ConnectionRefusedError, gaierror)
        # are OSError subclasses, not SMTPException, and previously
        # crashed the caller.
        logger.error(f"Email alert failed: {e}")

Swap the SMTP credentials for your actual mail server. For production, consider using SES or SendGrid instead of direct SMTP – they handle delivery reliability and bounce management for you.

Common Errors and Fixes

Real issues you will hit when deploying this.

S3 Access Denied

1
botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden

Your IAM role or credentials lack s3:GetObject permission on the bucket. Fix by adding the permission to your policy:

1
2
3
4
5
{
    "Effect": "Allow",
    "Action": ["s3:GetObject", "s3:HeadObject"],
    "Resource": "arn:aws:s3:::ml-data-bucket/*"
}

Note that there is no separate s3:HeadObject IAM action – the HeadObject API call is authorized by s3:GetObject. If you also want missing keys to surface as 404 (Not Found) instead of 403 (Forbidden), additionally grant s3:ListBucket on the bucket ARN itself.

SQLite Database Locked

1
sqlite3.OperationalError: database is locked

This happens when your monitoring process tries to read while another process is writing. Set a timeout on the connection:

1
conn = sqlite3.connect("app.db", timeout=30)

That gives the writer up to 30 seconds to release the lock before raising the error. If you are hitting this constantly, switch to PostgreSQL or add WAL mode:

1
conn.execute("PRAGMA journal_mode=WAL")

Slack Webhook Returns 400

1
Slack alert failed: 400 invalid_payload

The Block Kit payload has a formatting problem. The most common cause is a text field exceeding 3000 characters. Truncate long messages before sending:

1
detail = detail[:2900]  # slicing never raises, so no length check is needed

Also check that your webhook URL is still active. Slack deactivates webhooks after extended periods of inactivity.