If you’re running AI systems that process user data, you need automated deletion. Not “we’ll get to it eventually” deletion — scheduled, auditable, policy-driven cleanup that runs without human intervention. GDPR Article 17 doesn’t care about your backlog.
Here’s how to build a data retention system that handles training data, inference logs, and model artifacts with configurable TTLs, dry-run support, and full audit trails.
Define the Data Model
Start with SQLAlchemy 2.0 models that track what data exists and when it should expire. Every record gets a created_at timestamp and a retention_days field.
```python
from datetime import datetime, timedelta, timezone
from pathlib import Path

from sqlalchemy import String, Integer, DateTime, Boolean, Text, create_engine, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session


def _utc(dt: datetime) -> datetime:
    """SQLite drops tzinfo on read; treat stored naive datetimes as UTC."""
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)


class Base(DeclarativeBase):
    pass


class TrainingRecord(Base):
    __tablename__ = "training_records"

    id: Mapped[int] = mapped_column(primary_key=True)
    dataset_name: Mapped[str] = mapped_column(String(255))
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    file_path: Mapped[str] = mapped_column(String(1024))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    retention_days: Mapped[int] = mapped_column(Integer, default=90)

    @property
    def expires_at(self) -> datetime:
        return _utc(self.created_at) + timedelta(days=self.retention_days)


class InferenceLog(Base):
    __tablename__ = "inference_logs"

    id: Mapped[int] = mapped_column(primary_key=True)
    model_name: Mapped[str] = mapped_column(String(255))
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    prompt_text: Mapped[str] = mapped_column(Text)
    response_text: Mapped[str] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    retention_days: Mapped[int] = mapped_column(Integer, default=30)

    @property
    def expires_at(self) -> datetime:
        # Same expiration rule as TrainingRecord
        return _utc(self.created_at) + timedelta(days=self.retention_days)


class DeletionAuditLog(Base):
    __tablename__ = "deletion_audit_log"

    id: Mapped[int] = mapped_column(primary_key=True)
    table_name: Mapped[str] = mapped_column(String(255))
    record_id: Mapped[int] = mapped_column(Integer)
    user_id: Mapped[str] = mapped_column(String(255), index=True)
    reason: Mapped[str] = mapped_column(String(255))
    deleted_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    dry_run: Mapped[bool] = mapped_column(Boolean, default=False)


engine = create_engine("sqlite:///ai_data.db")
Base.metadata.create_all(engine)
```
The DeletionAuditLog is non-negotiable. When an auditor asks “what happened to record X?”, you need a concrete answer with timestamps.
Build the Retention Engine
The core logic scans for expired records, optionally deletes associated files, and logs every action. The dry_run flag lets you preview what would be deleted without touching anything.
```python
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("retention")


@dataclass
class DeletionResult:
    table: str
    records_found: int
    records_deleted: int
    files_removed: int
    dry_run: bool


def delete_expired_records(
    engine,
    model_class,
    reason: str = "retention_policy",
    dry_run: bool = False,
) -> DeletionResult:
    now = datetime.now(timezone.utc)
    files_removed = 0
    with Session(engine) as session:
        # Check expires_at in Python so the expiration rule lives in one
        # place: the model property (see the note below for large tables).
        expired = [
            record
            for record in session.scalars(select(model_class))
            if record.expires_at < now
        ]
        logger.info(
            "Found %d expired records in %s (dry_run=%s)",
            len(expired), model_class.__tablename__, dry_run,
        )
        for record in expired:
            # Remove the associated file if the model tracks one
            if hasattr(record, "file_path") and record.file_path:
                artifact_path = Path(record.file_path)
                if artifact_path.exists():
                    if not dry_run:
                        artifact_path.unlink()
                    files_removed += 1
                    logger.info(
                        "File %s: %s",
                        "would delete" if dry_run else "deleted", artifact_path,
                    )
            # Log the action to the audit trail
            audit_entry = DeletionAuditLog(
                table_name=model_class.__tablename__,
                record_id=record.id,
                user_id=record.user_id,
                reason=reason,
                dry_run=dry_run,
            )
            session.add(audit_entry)
            if not dry_run:
                session.delete(record)
        session.commit()
    return DeletionResult(
        table=model_class.__tablename__,
        records_found=len(expired),
        records_deleted=0 if dry_run else len(expired),
        files_removed=files_removed,
        dry_run=dry_run,
    )
```
Notice we iterate all records and check expires_at in Python. This keeps the expiration logic in one place — the model property — rather than duplicating it in SQL. For tables with millions of rows, you’d push that filter into the query and use database-native date arithmetic instead.
Schedule Automated Cleanup with APScheduler
Wire the retention engine into a scheduler so it runs nightly. APScheduler 3.x handles this cleanly.
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger


def nightly_cleanup():
    logger.info("Starting scheduled retention cleanup")
    for model_class in [TrainingRecord, InferenceLog]:
        result = delete_expired_records(engine, model_class, dry_run=False)
        logger.info(
            "Cleanup complete: %s — deleted %d/%d records, %d files removed",
            result.table, result.records_deleted, result.records_found, result.files_removed,
        )


scheduler = BlockingScheduler()
scheduler.add_job(
    nightly_cleanup,
    # Pin the timezone explicitly; CronTrigger defaults to the host's local
    # timezone, not UTC
    trigger=CronTrigger(hour=2, minute=0, timezone="UTC"),
    id="retention_cleanup",
    name="Nightly data retention cleanup",
    misfire_grace_time=3600,
)

if __name__ == "__main__":
    logger.info("Retention scheduler started. Cleanup runs at 02:00 UTC.")
    scheduler.start()
```
Set misfire_grace_time to something reasonable. If your server was down at 2 AM, you still want the job to fire when it comes back — within an hour is a good window.
Handle GDPR Right-to-Deletion Requests
Article 17 requests need a different path. Instead of waiting for TTL expiration, you delete everything for a specific user immediately.
```python
def handle_deletion_request(engine, user_id: str, dry_run: bool = False) -> dict:
    """Process a GDPR Article 17 right-to-erasure request for a user."""
    results = {}
    for model_class in [TrainingRecord, InferenceLog]:
        with Session(engine) as session:
            stmt = select(model_class).where(model_class.user_id == user_id)
            records = list(session.scalars(stmt))
            files_removed = 0
            for record in records:
                if hasattr(record, "file_path") and record.file_path:
                    artifact_path = Path(record.file_path)
                    if artifact_path.exists():
                        if not dry_run:
                            artifact_path.unlink()
                        # Count in dry runs too, so previews are accurate
                        files_removed += 1
                audit_entry = DeletionAuditLog(
                    table_name=model_class.__tablename__,
                    record_id=record.id,
                    user_id=user_id,
                    reason="gdpr_article_17_request",
                    dry_run=dry_run,
                )
                session.add(audit_entry)
                if not dry_run:
                    session.delete(record)
            session.commit()
        results[model_class.__tablename__] = {
            "records_found": len(records),
            "records_deleted": 0 if dry_run else len(records),
            "files_removed": files_removed,
        }
    logger.info("GDPR deletion for user %s: %s (dry_run=%s)", user_id, results, dry_run)
    return results
```
Always run this with dry_run=True first. You’d be surprised how often someone submits a deletion request for the wrong user ID.
Clean Up Model Artifacts by Age
Training runs produce checkpoints, ONNX exports, and TensorBoard logs that pile up fast. Use pathlib to sweep a directory tree and remove anything past its retention window.
```python
def cleanup_model_artifacts(
    artifacts_dir: Path,
    max_age_days: int = 60,
    dry_run: bool = False,
) -> list[Path]:
    """Remove model artifacts older than max_age_days."""
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    removed = []
    if not artifacts_dir.exists():
        logger.warning("Artifacts directory does not exist: %s", artifacts_dir)
        return removed
    suffixes_to_clean = {".pt", ".pth", ".onnx", ".safetensors", ".bin", ".pkl"}
    for file_path in artifacts_dir.rglob("*"):
        if not file_path.is_file():
            continue
        if file_path.suffix not in suffixes_to_clean:
            continue
        mtime = datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc)
        if mtime < cutoff:
            size_mb = file_path.stat().st_size / (1024 * 1024)
            if dry_run:
                logger.info("Would delete: %s (%.1f MB, modified %s)", file_path, size_mb, mtime.date())
            else:
                file_path.unlink()
                logger.info("Deleted: %s (%.1f MB, modified %s)", file_path, size_mb, mtime.date())
            removed.append(file_path)
    logger.info(
        "Artifact cleanup: %d files %s (dry_run=%s)",
        len(removed), "would be removed" if dry_run else "removed", dry_run,
    )
    return removed


# Usage
stale_files = cleanup_model_artifacts(
    artifacts_dir=Path("/data/models/checkpoints"),
    max_age_days=60,
    dry_run=True,
)
```
The suffix allowlist prevents you from accidentally nuking config files or READMEs that live alongside model weights.
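Before pointing the sweep at real storage, it's worth proving that behavior in a throwaway directory. A stdlib-only sketch (a simplified, delete-nothing variant of `cleanup_model_artifacts`; the `find_stale` name is my own) that backdates files with `os.utime` and checks both the age cutoff and the allowlist:

```python
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path


def find_stale(artifacts_dir: Path, max_age_days: int, suffixes: set) -> list:
    """Simplified age check: return files past the cutoff, delete nothing."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=max_age_days)
    stale = []
    for p in artifacts_dir.rglob("*"):
        if not p.is_file() or p.suffix not in suffixes:
            continue
        mtime = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc)
        if mtime < cutoff:
            stale.append(p)
    return stale


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    old_ckpt = root / "old.pt"
    new_ckpt = root / "new.pt"
    readme = root / "README.md"
    for p in (old_ckpt, new_ckpt, readme):
        p.write_bytes(b"x")

    # Backdate old.pt and README.md by 90 days
    past = (datetime.now(timezone.utc) - timedelta(days=90)).timestamp()
    os.utime(old_ckpt, (past, past))
    os.utime(readme, (past, past))

    stale = find_stale(root, max_age_days=60, suffixes={".pt", ".pth", ".onnx"})
    # Only old.pt qualifies: new.pt is fresh, README.md fails the allowlist
```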
Putting It All Together
Here’s how you’d wire the full system for a production deployment:
```python
# run_retention.py
# Assumes engine, logger, the models, delete_expired_records, and
# cleanup_model_artifacts from the code above are importable here.
from pathlib import Path


def run_full_retention(dry_run: bool = True):
    """Execute all retention policies."""
    logger.info("=== Retention sweep started (dry_run=%s) ===", dry_run)

    # 1. Database records
    for model_class in [TrainingRecord, InferenceLog]:
        result = delete_expired_records(engine, model_class, dry_run=dry_run)
        logger.info(
            "%s: %d expired, %d deleted",
            result.table, result.records_found, result.records_deleted,
        )

    # 2. Model artifacts on disk
    artifact_dirs = [
        Path("/data/models/checkpoints"),
        Path("/data/models/exports"),
    ]
    for d in artifact_dirs:
        cleanup_model_artifacts(d, max_age_days=60, dry_run=dry_run)

    logger.info("=== Retention sweep complete ===")


# First run: always dry-run
run_full_retention(dry_run=True)

# After reviewing logs, run for real
# run_full_retention(dry_run=False)
```
Start every deployment with dry_run=True. Review the logs. Then flip the flag. This two-step pattern has saved me from deleting production data more than once.
Common Errors and Fixes
sqlalchemy.exc.OperationalError: no such table
You forgot to call Base.metadata.create_all(engine) before running queries. This creates all tables defined in your models. In production, use Alembic migrations instead of create_all.
PermissionError: [Errno 13] Permission denied when deleting files
Your process doesn’t have write access to the artifacts directory. Run with the correct user or fix directory permissions:
```shell
# Check ownership
ls -la /data/models/checkpoints/

# Fix if needed (match your service user)
sudo chown -R appuser:appuser /data/models/checkpoints/
```
TypeError: can't subtract offset-naive and offset-aware datetimes
You’re mixing timezone-aware and naive datetimes. The fix: always use timezone.utc. Replace datetime.utcnow() (which returns a naive datetime) with datetime.now(timezone.utc) everywhere. Also watch for databases that drop timezone info on read: SQLite, for example, returns naive datetimes even for DateTime(timezone=True) columns, so normalize values after loading them.
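A quick stdlib-only illustration of the failure and the fix:

```python
from datetime import datetime, timezone

aware = datetime.now(timezone.utc)  # tz-aware: tzinfo is set
naive = datetime.utcnow()           # naive: tzinfo is None, avoid this

try:
    _ = aware - naive
    mixing_failed = False
except TypeError:
    mixing_failed = True            # can't mix aware and naive datetimes

# The fix: make every timestamp aware from the start
also_aware = datetime.now(timezone.utc)
age = also_aware - aware            # fine: both operands are aware
```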
APScheduler job silently doesn’t run
Check that you’re using BlockingScheduler for standalone scripts or BackgroundScheduler if running inside a web app. BlockingScheduler.start() blocks the main thread — that’s by design. If you need it alongside Flask or FastAPI, switch to:
```python
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(nightly_cleanup, trigger=CronTrigger(hour=2, minute=0))
scheduler.start()
# Your web app continues running here
```
GDPR deletion misses records in other tables
The handle_deletion_request function only covers the tables you explicitly list. If you add a new table that stores user data, add it to the loop. A good practice: maintain a registry of all user-data models.
```python
USER_DATA_MODELS = [TrainingRecord, InferenceLog]  # Add new models here
```
Then reference USER_DATA_MODELS in both the retention engine and the GDPR handler so they stay in sync.
Audit log grows forever
The DeletionAuditLog table itself isn’t subject to retention by default. Depending on your compliance requirements, you may need to keep audit logs for years. But if you do want to trim them, apply the same pattern — just with a much longer TTL (e.g., 2555 days for 7 years).