If you’re running AI systems that process user data, you need automated deletion. Not “we’ll get to it eventually” deletion — scheduled, auditable, policy-driven cleanup that runs without human intervention. GDPR Article 17 doesn’t care about your backlog.

Here’s how to build a data retention system that handles training data, inference logs, and model artifacts with configurable TTLs, dry-run support, and full audit trails.

Define the Data Model

Start with SQLAlchemy 2.0 models that track what data exists and when it should expire. Every record gets a created_at timestamp and a retention_days field.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from datetime import datetime, timedelta, timezone
from pathlib import Path
from sqlalchemy import String, Integer, DateTime, Boolean, Text, create_engine, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    pass

class TrainingRecord(Base):
    __tablename__ = "training_records"

    id: Mapped[int] = mapped_column(primary_key=True)
    dataset_name: Mapped[str] = mapped_column(String(255))
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    file_path: Mapped[str] = mapped_column(String(1024))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    retention_days: Mapped[int] = mapped_column(Integer, default=90)

    @property
    def expires_at(self) -> datetime:
        return self.created_at + timedelta(days=self.retention_days)

class InferenceLog(Base):
    __tablename__ = "inference_logs"

    id: Mapped[int] = mapped_column(primary_key=True)
    model_name: Mapped[str] = mapped_column(String(255))
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    prompt_text: Mapped[str] = mapped_column(Text)
    response_text: Mapped[str] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    retention_days: Mapped[int] = mapped_column(Integer, default=30)

class DeletionAuditLog(Base):
    __tablename__ = "deletion_audit_log"

    id: Mapped[int] = mapped_column(primary_key=True)
    table_name: Mapped[str] = mapped_column(String(255))
    record_id: Mapped[int] = mapped_column(Integer)
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    reason: Mapped[str] = mapped_column(String(255))
    deleted_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    dry_run: Mapped[bool] = mapped_column(Boolean, default=False)

engine = create_engine("sqlite:///ai_data.db")
Base.metadata.create_all(engine)

The DeletionAuditLog is non-negotiable. When an auditor asks “what happened to record X?”, you need a concrete answer with timestamps.

Build the Retention Engine

The core logic scans for expired records, optionally deletes associated files, and logs every action. The dry_run flag lets you preview what would be deleted without touching anything.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("retention")

@dataclass
class DeletionResult:
    table: str
    records_found: int
    records_deleted: int
    files_removed: int
    dry_run: bool

def delete_expired_records(
    engine,
    model_class,
    reason: str = "retention_policy",
    dry_run: bool = False,
) -> DeletionResult:
    now = datetime.now(timezone.utc)
    files_removed = 0

    with Session(engine) as session:
        stmt = select(model_class).where(
            model_class.created_at + func.cast(
                model_class.retention_days * 86400, Integer
            ) < func.extract("epoch", now)
        )
        # For SQLite compatibility, use a simpler approach:
        expired = []
        all_stmt = select(model_class)
        for record in session.scalars(all_stmt):
            if record.expires_at < now:
                expired.append(record)

        logger.info(
            "Found %d expired records in %s (dry_run=%s)",
            len(expired), model_class.__tablename__, dry_run,
        )

        for record in expired:
            # Remove associated file if it exists
            if hasattr(record, "file_path") and record.file_path:
                artifact_path = Path(record.file_path)
                if artifact_path.exists():
                    if not dry_run:
                        artifact_path.unlink()
                        files_removed += 1
                    logger.info("File %s: %s", "would delete" if dry_run else "deleted", artifact_path)

            # Log the deletion
            audit_entry = DeletionAuditLog(
                table_name=model_class.__tablename__,
                record_id=record.id,
                user_id=record.user_id,
                reason=reason,
                dry_run=dry_run,
            )
            session.add(audit_entry)

            if not dry_run:
                session.delete(record)

        session.commit()

    return DeletionResult(
        table=model_class.__tablename__,
        records_found=len(expired),
        records_deleted=0 if dry_run else len(expired),
        files_removed=files_removed,
        dry_run=dry_run,
    )

Notice we iterate all records and check expires_at in Python. This keeps the expiration logic in one place — the model property — rather than duplicating it in SQL. For tables with millions of rows, you’d push that filter into the query and use database-native date arithmetic instead.

Schedule Automated Cleanup with APScheduler

Wire the retention engine into a scheduler so it runs nightly. APScheduler 3.x handles this cleanly.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

def nightly_cleanup():
    logger.info("Starting scheduled retention cleanup")
    for model_class in [TrainingRecord, InferenceLog]:
        result = delete_expired_records(engine, model_class, dry_run=False)
        logger.info(
            "Cleanup complete: %s — deleted %d/%d records, %d files removed",
            result.table, result.records_deleted, result.records_found, result.files_removed,
        )

scheduler = BlockingScheduler()
scheduler.add_job(
    nightly_cleanup,
    trigger=CronTrigger(hour=2, minute=0),  # Run at 2 AM UTC
    id="retention_cleanup",
    name="Nightly data retention cleanup",
    misfire_grace_time=3600,
)

if __name__ == "__main__":
    logger.info("Retention scheduler started. Cleanup runs at 02:00 UTC.")
    scheduler.start()

Set misfire_grace_time to something reasonable. If your server was down at 2 AM, you still want the job to fire when it comes back — within an hour is a good window.

Handle GDPR Right-to-Deletion Requests

Article 17 requests need a different path. Instead of waiting for TTL expiration, you delete everything for a specific user immediately.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def handle_deletion_request(engine, user_id: str, dry_run: bool = False) -> dict:
    """Process a GDPR Article 17 right-to-erasure request for a user."""
    results = {}

    for model_class in [TrainingRecord, InferenceLog]:
        with Session(engine) as session:
            stmt = select(model_class).where(model_class.user_id == user_id)
            records = list(session.scalars(stmt))
            files_removed = 0

            for record in records:
                if hasattr(record, "file_path") and record.file_path:
                    artifact_path = Path(record.file_path)
                    if artifact_path.exists() and not dry_run:
                        artifact_path.unlink()
                        files_removed += 1

                audit_entry = DeletionAuditLog(
                    table_name=model_class.__tablename__,
                    record_id=record.id,
                    user_id=user_id,
                    reason="gdpr_article_17_request",
                    dry_run=dry_run,
                )
                session.add(audit_entry)

                if not dry_run:
                    session.delete(record)

            session.commit()

            results[model_class.__tablename__] = {
                "records_found": len(records),
                "records_deleted": 0 if dry_run else len(records),
                "files_removed": files_removed,
            }

    logger.info("GDPR deletion for user %s: %s (dry_run=%s)", user_id, results, dry_run)
    return results

Always run this with dry_run=True first. You’d be surprised how often someone submits a deletion request for the wrong user ID.

Clean Up Model Artifacts by Age

Training runs produce checkpoints, ONNX exports, and TensorBoard logs that pile up fast. Use pathlib to sweep a directory tree and remove anything past its retention window.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def cleanup_model_artifacts(
    artifacts_dir: Path,
    max_age_days: int = 60,
    dry_run: bool = False,
) -> list[Path]:
    """Remove model artifacts older than max_age_days."""
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    removed = []

    if not artifacts_dir.exists():
        logger.warning("Artifacts directory does not exist: %s", artifacts_dir)
        return removed

    suffixes_to_clean = {".pt", ".pth", ".onnx", ".safetensors", ".bin", ".pkl"}

    for file_path in artifacts_dir.rglob("*"):
        if not file_path.is_file():
            continue
        if file_path.suffix not in suffixes_to_clean:
            continue

        mtime = datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc)
        if mtime < cutoff:
            size_mb = file_path.stat().st_size / (1024 * 1024)
            if dry_run:
                logger.info("Would delete: %s (%.1f MB, modified %s)", file_path, size_mb, mtime.date())
            else:
                file_path.unlink()
                logger.info("Deleted: %s (%.1f MB, modified %s)", file_path, size_mb, mtime.date())
            removed.append(file_path)

    logger.info(
        "Artifact cleanup: %d files %s (dry_run=%s)",
        len(removed), "would be removed" if dry_run else "removed", dry_run,
    )
    return removed

# Usage
stale_files = cleanup_model_artifacts(
    artifacts_dir=Path("/data/models/checkpoints"),
    max_age_days=60,
    dry_run=True,
)

The suffix allowlist prevents you from accidentally nuking config files or READMEs that live alongside model weights.

Putting It All Together

Here’s how you’d wire the full system for a production deployment:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# run_retention.py
from pathlib import Path

def run_full_retention(dry_run: bool = True):
    """Execute all retention policies."""
    logger.info("=== Retention sweep started (dry_run=%s) ===", dry_run)

    # 1. Database records
    for model_class in [TrainingRecord, InferenceLog]:
        result = delete_expired_records(engine, model_class, dry_run=dry_run)
        logger.info("%s: %d expired, %d deleted", result.table, result.records_found, result.records_deleted)

    # 2. Model artifacts on disk
    artifact_dirs = [
        Path("/data/models/checkpoints"),
        Path("/data/models/exports"),
    ]
    for d in artifact_dirs:
        cleanup_model_artifacts(d, max_age_days=60, dry_run=dry_run)

    logger.info("=== Retention sweep complete ===")

# First run: always dry-run
run_full_retention(dry_run=True)

# After reviewing logs, run for real
# run_full_retention(dry_run=False)

Start every deployment with dry_run=True. Review the logs. Then flip the flag. This two-step pattern has saved me from deleting production data more than once.

Common Errors and Fixes

sqlalchemy.exc.OperationalError: no such table

You forgot to call Base.metadata.create_all(engine) before running queries. This creates all tables defined in your models. In production, use Alembic migrations instead of create_all.

PermissionError: [Errno 13] Permission denied when deleting files

Your process doesn’t have write access to the artifacts directory. Run with the correct user or fix directory permissions:

1
2
3
4
5
# Check ownership
ls -la /data/models/checkpoints/

# Fix if needed (match your service user)
sudo chown -R appuser:appuser /data/models/checkpoints/

TypeError: can't subtract offset-naive and offset-aware datetimes

You’re mixing timezone-aware and naive datetimes. The fix: always use timezone.utc. Replace datetime.utcnow() (which returns naive) with datetime.now(timezone.utc) everywhere. The code above already does this correctly.

APScheduler job silently doesn’t run

Check that you’re using BlockingScheduler for standalone scripts or BackgroundScheduler if running inside a web app. BlockingScheduler.start() blocks the main thread — that’s by design. If you need it alongside Flask or FastAPI, switch to:

1
2
3
4
5
6
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(nightly_cleanup, trigger=CronTrigger(hour=2, minute=0))
scheduler.start()
# Your web app continues running here

GDPR deletion misses records in other tables

The handle_deletion_request function only covers the tables you explicitly list. If you add a new table that stores user data, add it to the loop. A good practice: maintain a registry of all user-data models.

1
USER_DATA_MODELS = [TrainingRecord, InferenceLog]  # Add new models here

Then reference USER_DATA_MODELS in both the retention engine and the GDPR handler so they stay in sync.

Audit log grows forever

The DeletionAuditLog table itself isn’t subject to retention by default. Depending on your compliance requirements, you may need to keep audit logs for years. But if you do want to trim them, apply the same pattern — just with a much longer TTL (e.g., 2555 days for 7 years).