If your AI system trains on user data and you don’t have a consent management layer, you’re one regulatory inquiry away from a very bad week. GDPR Article 17 gives users the right to erasure. CCPA lets California residents opt out of data sales. The EU AI Act adds transparency requirements on top. You need actual infrastructure to track who said yes, who said no, and what happens when someone changes their mind.
Here’s a minimal consent record using Pydantic, which we’ll expand into a full system:
```python
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
from uuid import uuid4


class ConsentPurpose(str, Enum):
    TRAINING = "model_training"
    INFERENCE = "inference"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"


class ConsentStatus(str, Enum):
    GRANTED = "granted"
    REVOKED = "revoked"
    PENDING = "pending"


class ConsentRecord(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid4()))
    user_id: str
    purpose: ConsentPurpose
    status: ConsentStatus
    granted_at: datetime | None = None
    revoked_at: datetime | None = None
    ip_address: str | None = None
    user_agent: str | None = None
    legal_basis: str = "consent"  # consent, legitimate_interest, contract
```
This gives you a typed, validated consent record with audit-relevant fields. Every consent change gets its own record – you never overwrite history.
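Because every change is a new record, current status is always derived, never stored. A minimal sketch of that resolution logic, using plain dicts to stand in for stored consent records:

```python
def resolve_current_consent(records: list[dict]) -> dict[str, str]:
    """Reduce an append-only consent history to the current status per purpose.

    Expects records as dicts with "purpose", "status", and a sortable
    "created_at" key; the newest record for each purpose wins.
    """
    current: dict[str, str] = {}
    # Newest first, then keep only the first record seen per purpose
    for r in sorted(records, key=lambda r: r["created_at"], reverse=True):
        current.setdefault(r["purpose"], r["status"])
    return current


history = [
    {"purpose": "model_training", "status": "granted", "created_at": "2024-01-01"},
    {"purpose": "model_training", "status": "revoked", "created_at": "2024-03-01"},
    {"purpose": "analytics", "status": "granted", "created_at": "2024-02-01"},
]
print(resolve_current_consent(history))
# model_training resolves to "revoked", analytics to "granted"
```

The same latest-record-wins logic shows up again in the API's status endpoint below; keeping it this simple is the payoff of never mutating history.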
Setting Up the Database Layer
SQLAlchemy handles persistence. We store consent records and audit logs in SQLite for simplicity, but swap in PostgreSQL for production. The key design choice: consent records are append-only. When a user revokes consent, you insert a new record with status=revoked rather than updating the old one. This preserves the full timeline for audits.
```python
from uuid import uuid4
from sqlalchemy import create_engine, Column, String, DateTime, Text
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from datetime import datetime, timezone


class Base(DeclarativeBase):
    pass


class ConsentEntry(Base):
    __tablename__ = "consent_records"

    id = Column(String, primary_key=True)
    user_id = Column(String, index=True, nullable=False)
    purpose = Column(String, nullable=False)
    status = Column(String, nullable=False)
    granted_at = Column(DateTime, nullable=True)
    revoked_at = Column(DateTime, nullable=True)
    ip_address = Column(String, nullable=True)
    legal_basis = Column(String, default="consent")
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))


class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(String, primary_key=True, default=lambda: str(uuid4()))
    timestamp = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    user_id = Column(String, index=True, nullable=False)
    action = Column(String, nullable=False)  # consent_granted, consent_revoked, data_deleted, data_exported
    details = Column(Text, nullable=True)
    performed_by = Column(String, nullable=False)  # system, user, admin


engine = create_engine("sqlite:///consent.db")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)
```
The AuditLog table is critical. Regulators don’t just want to see current consent status – they want to see every action taken, when, and by whom. Every endpoint in your API should write an audit log entry.
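One way to make "every endpoint writes an audit entry" hard to forget is to route all state changes through a single wrapper. A stdlib-only sketch of the idea, where an in-memory list stands in for the AuditLog table:

```python
from datetime import datetime, timezone

audit_log: list[dict] = []  # stand-in for the AuditLog table


def audited(action: str, performed_by: str = "system"):
    """Decorator: record an audit entry for every call to the wrapped handler."""
    def wrap(fn):
        def inner(user_id: str, *args, **kwargs):
            result = fn(user_id, *args, **kwargs)
            audit_log.append({
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "user_id": user_id,
                "action": action,
                "performed_by": performed_by,
            })
            return result
        return inner
    return wrap


@audited("consent_granted", performed_by="user")
def grant(user_id: str, purpose: str) -> str:
    # Hypothetical handler body; the real one writes a ConsentEntry
    return f"granted {purpose} for {user_id}"


grant("u-123", "model_training")
# audit_log now holds one consent_granted entry for u-123
```

In the FastAPI app below the same effect is achieved more directly by calling a `log_audit` helper inside each endpoint, which keeps the audit write in the same database transaction as the change.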
Building the Data Rights API
FastAPI gives you automatic OpenAPI docs, which is useful when your legal team asks “what endpoints exist for data rights?” Use the lifespan context manager for startup/shutdown logic – the old @app.on_event("startup") pattern is deprecated.
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from datetime import datetime, timezone
from uuid import uuid4
import json


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: ensure tables exist
    Base.metadata.create_all(engine)
    yield
    # Shutdown: clean up connections
    engine.dispose()


app = FastAPI(title="Data Rights API", lifespan=lifespan)


class GrantConsentRequest(BaseModel):
    user_id: str
    purpose: ConsentPurpose
    legal_basis: str = "consent"


class RevokeConsentRequest(BaseModel):
    user_id: str
    purpose: ConsentPurpose


def log_audit(db: Session, user_id: str, action: str, details: str, performed_by: str = "system"):
    # Adds the entry without committing: the caller commits, so the audit
    # row lands in the same transaction as the change it records.
    entry = AuditLog(
        id=str(uuid4()),
        user_id=user_id,
        action=action,
        details=details,
        performed_by=performed_by,
    )
    db.add(entry)


@app.post("/consent/grant")
def grant_consent(req: GrantConsentRequest, request: Request):
    now = datetime.now(timezone.utc)
    db = SessionLocal()
    try:
        entry = ConsentEntry(
            id=str(uuid4()),
            user_id=req.user_id,
            purpose=req.purpose.value,
            status="granted",
            granted_at=now,
            ip_address=request.client.host if request.client else None,
            legal_basis=req.legal_basis,
            created_at=now,
        )
        db.add(entry)
        log_audit(db, req.user_id, "consent_granted", f"purpose={req.purpose.value}")
        db.commit()
        return {"status": "granted", "id": entry.id, "timestamp": now.isoformat()}
    finally:
        db.close()


@app.post("/consent/revoke")
def revoke_consent(req: RevokeConsentRequest):
    now = datetime.now(timezone.utc)
    db = SessionLocal()
    try:
        entry = ConsentEntry(
            id=str(uuid4()),
            user_id=req.user_id,
            purpose=req.purpose.value,
            status="revoked",
            revoked_at=now,
            created_at=now,
        )
        db.add(entry)
        log_audit(db, req.user_id, "consent_revoked", f"purpose={req.purpose.value}")
        db.commit()
        return {"status": "revoked", "timestamp": now.isoformat()}
    finally:
        db.close()


@app.get("/consent/{user_id}")
def get_consent_status(user_id: str):
    db = SessionLocal()
    try:
        records = (
            db.query(ConsentEntry)
            .filter(ConsentEntry.user_id == user_id)
            .order_by(ConsentEntry.created_at.desc())
            .all()
        )
        if not records:
            raise HTTPException(status_code=404, detail="No consent records found")
        # Latest record per purpose determines current status
        current = {}
        for r in records:
            if r.purpose not in current:
                current[r.purpose] = {"status": r.status, "updated_at": r.created_at.isoformat()}
        return {"user_id": user_id, "consent": current}
    finally:
        db.close()


@app.post("/data/delete/{user_id}")
def request_data_deletion(user_id: str):
    db = SessionLocal()
    try:
        # Check if user has any records
        records = db.query(ConsentEntry).filter(ConsentEntry.user_id == user_id).all()
        if not records:
            raise HTTPException(status_code=404, detail="User not found")
        # Trigger the deletion pipeline (defined in the next section)
        deletion_result = execute_deletion_pipeline(user_id)
        log_audit(db, user_id, "data_deleted", json.dumps(deletion_result), performed_by="user")
        db.commit()
        return {"status": "deletion_complete", "details": deletion_result}
    finally:
        db.close()


@app.get("/data/export/{user_id}")
def export_user_data(user_id: str):
    db = SessionLocal()
    try:
        consent_records = (
            db.query(ConsentEntry).filter(ConsentEntry.user_id == user_id).all()
        )
        audit_records = (
            db.query(AuditLog).filter(AuditLog.user_id == user_id).all()
        )
        export = {
            "user_id": user_id,
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "consent_history": [
                {
                    "purpose": r.purpose,
                    "status": r.status,
                    "granted_at": r.granted_at.isoformat() if r.granted_at else None,
                    "revoked_at": r.revoked_at.isoformat() if r.revoked_at else None,
                    "legal_basis": r.legal_basis,
                }
                for r in consent_records
            ],
            "audit_trail": [
                {
                    "action": r.action,
                    "timestamp": r.timestamp.isoformat(),
                    "details": r.details,
                }
                for r in audit_records
            ],
        }
        log_audit(db, user_id, "data_exported", "Full data export requested", performed_by="user")
        db.commit()
        return export
    finally:
        db.close()
```
The /data/export/{user_id} endpoint satisfies GDPR Article 20 (right to data portability). Return everything you have about the user in a machine-readable format.
Data Deletion Pipeline and Opt-Out Filtering
When a user requests deletion, you need to purge their data from everywhere: training datasets, vector stores, embedding caches, and any derived models. This function handles the orchestration:
```python
import shutil
from pathlib import Path


def execute_deletion_pipeline(user_id: str) -> dict:
    """Remove user data from all storage locations."""
    results = {}

    # 1. Remove from training dataset files
    dataset_dir = Path("./datasets/training")
    if dataset_dir.exists():
        for file in dataset_dir.glob("*.jsonl"):
            original_lines = file.read_text().splitlines()
            filtered_lines = [
                line for line in original_lines
                if f'"user_id": "{user_id}"' not in line
            ]
            removed_count = len(original_lines) - len(filtered_lines)
            if removed_count > 0:
                file.write_text("\n".join(filtered_lines) + "\n")
                results[str(file)] = f"removed {removed_count} records"

    # 2. Remove cached embeddings
    embedding_cache = Path(f"./cache/embeddings/{user_id}")
    if embedding_cache.exists():
        shutil.rmtree(embedding_cache)
        results["embedding_cache"] = "deleted"

    # 3. Remove from vector store (using ChromaDB as example)
    try:
        import chromadb
        client = chromadb.PersistentClient(path="./vectorstore")
        collection = client.get_or_create_collection("user_data")
        # Delete all documents belonging to this user
        collection.delete(where={"user_id": user_id})
        results["vector_store"] = "purged"
    except ImportError:
        results["vector_store"] = "chromadb not installed, skipped"
    except Exception as e:
        results["vector_store"] = f"error: {str(e)}"

    # 4. Database: keep consent/audit records for legal compliance,
    #    and record that the purge ran
    db = SessionLocal()
    try:
        log_audit(db, user_id, "data_deleted", f"Pipeline results: {results}", performed_by="system")
        db.commit()
    finally:
        db.close()

    return results
```
For training pipelines, you need a filter that checks consent before including any data point. Wrap your data loader with this:
```python
from typing import Iterator


def consent_filtered_loader(data_source: Iterator[dict], purpose: str = "model_training") -> Iterator[dict]:
    """Only yield data points from users who have granted consent for the given purpose."""
    db = SessionLocal()
    consent_cache: dict[str, bool] = {}
    try:
        for record in data_source:
            user_id = record.get("user_id")
            if not user_id:
                continue
            if user_id not in consent_cache:
                # Get the latest consent record for this purpose
                latest = (
                    db.query(ConsentEntry)
                    .filter(
                        ConsentEntry.user_id == user_id,
                        ConsentEntry.purpose == purpose,
                    )
                    .order_by(ConsentEntry.created_at.desc())
                    .first()
                )
                consent_cache[user_id] = latest is not None and latest.status == "granted"
            if consent_cache[user_id]:
                yield record
    finally:
        db.close()


# Usage in a training pipeline:
def load_training_data():
    """Load JSONL training data with consent filtering."""
    import json
    from pathlib import Path

    raw_data = Path("./datasets/training/data.jsonl")
    if not raw_data.exists():
        return

    def raw_records():
        for line in raw_data.read_text().splitlines():
            if line.strip():
                yield json.loads(line)

    for record in consent_filtered_loader(raw_records(), purpose="model_training"):
        # Only consented data reaches this point
        yield record
```
The consent_cache dictionary prevents hammering the database on every record. For datasets with millions of rows, this makes the difference between a 10-minute and a 10-hour training prep step.
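At larger scale, even one query per distinct user adds up; prefetching consent in batches cuts it to one query per chunk. A sketch of the batching logic, with a lookup callable standing in for a real `ConsentEntry` query using an `IN (...)` filter:

```python
from typing import Callable, Iterable


def prefetch_consent(
    user_ids: Iterable[str],
    lookup_batch: Callable[[list[str]], dict[str, bool]],
    chunk_size: int = 500,
) -> dict[str, bool]:
    """Resolve consent for many users with one batched lookup per chunk
    instead of one query per user."""
    cache: dict[str, bool] = {}
    chunk: list[str] = []
    for uid in dict.fromkeys(user_ids):  # de-duplicate, preserve order
        chunk.append(uid)
        if len(chunk) >= chunk_size:
            cache.update(lookup_batch(chunk))
            chunk = []
    if chunk:
        cache.update(lookup_batch(chunk))
    return cache


# Fake batched lookup for illustration; in the real pipeline this would be
# one ConsentEntry query per chunk.
consented = {"u1", "u3"}
cache = prefetch_consent(
    ["u1", "u2", "u3", "u1"],
    lambda ids: {uid: uid in consented for uid in ids},
    chunk_size=2,
)
# cache == {"u1": True, "u2": False, "u3": True}
```

The resulting dict can be handed straight to the loader in place of its lazily built `consent_cache`.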
Common Errors and Fixes
“No consent records found” on every request
You’re querying before any consent has been granted. Seed initial consent records during user signup, or return a default “no consent” status instead of a 404:
```python
# Instead of raising 404, return default state
if not records:
    return {
        "user_id": user_id,
        "consent": {p.value: {"status": "pending"} for p in ConsentPurpose},
    }
```
SQLAlchemy DetachedInstanceError when accessing attributes after session close
This happens when you close the session and then try to read lazy-loaded attributes. Fix it by eagerly loading everything you need before closing, or by converting to dictionaries inside the try block. The code above does this correctly – all attribute access happens before db.close().
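The same fix works as a small helper: snapshot the columns you need into a plain dict while the session is still open. A stdlib sketch, where the `Row` class stands in for an ORM instance:

```python
def snapshot(obj, fields: list[str]) -> dict:
    """Copy the named attributes into a plain dict so they stay readable
    after the originating session is closed."""
    return {f: getattr(obj, f) for f in fields}


class Row:  # stand-in for a ConsentEntry instance
    purpose = "model_training"
    status = "granted"


safe = snapshot(Row(), ["purpose", "status"])
# safe == {"purpose": "model_training", "status": "granted"}
```

Call `snapshot(...)` inside the try block, close the session, and return the dict; nothing lazy-loads after that point.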
Stale consent cache during long training runs
The consent_cache in consent_filtered_loader doesn’t refresh if a user revokes consent mid-run. For large-scale training, add a TTL to the cache or re-query every N records:
```python
import time

CACHE_TTL_SECONDS = 300  # Re-check every 5 minutes
cache_timestamps: dict[str, float] = {}


def is_cache_valid(user_id: str) -> bool:
    if user_id not in cache_timestamps:
        return False
    return (time.time() - cache_timestamps[user_id]) < CACHE_TTL_SECONDS
```
GDPR requires you to keep audit logs even after deletion
Don’t delete the AuditLog and ConsentEntry records when processing a deletion request. You need proof that you processed the deletion. Only purge the actual user-generated data (training samples, embeddings, cached outputs). The consent and audit trail stay.
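One way to keep this straight in code is an explicit retention policy, so the deletion pipeline can never touch compliance records by accident. A hypothetical policy map (the store names are illustrative):

```python
# What a deletion request may purge vs. must retain (hypothetical policy map)
RETENTION_POLICY = {
    "training_samples": "purge",
    "embeddings": "purge",
    "cached_outputs": "purge",
    "consent_records": "retain",  # proof of the user's consent timeline
    "audit_logs": "retain",       # proof the deletion itself happened
}


def purge_targets(policy: dict[str, str]) -> list[str]:
    """Return only the stores a deletion request is allowed to touch."""
    return [store for store, rule in policy.items() if rule == "purge"]


print(purge_targets(RETENTION_POLICY))
# ['training_samples', 'embeddings', 'cached_outputs']
```

The deletion pipeline then iterates over `purge_targets(...)` instead of a hardcoded list, and the retain rules double as documentation for auditors.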
FastAPI returns 422 for consent requests
Your request body doesn’t match the Pydantic model. The purpose field expects one of the ConsentPurpose enum values: model_training, inference, analytics, or personalization. Sending "purpose": "training" instead of "purpose": "model_training" triggers a validation error.
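You can see the mismatch without a running server by checking the value against the enum directly (the enum is redefined here so the snippet runs standalone):

```python
from enum import Enum


class ConsentPurpose(str, Enum):  # mirrors the enum from the consent model
    TRAINING = "model_training"
    INFERENCE = "inference"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"


valid_values = [p.value for p in ConsentPurpose]
print("model_training" in valid_values)  # True  -> passes validation
print("training" in valid_values)        # False -> FastAPI responds 422
```

The 422 response body from FastAPI lists the offending field and the permitted values, so reading it usually points straight at the typo.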