If your AI system trains on user data and you don’t have a consent management layer, you’re one regulatory inquiry away from a very bad week. GDPR Article 17 gives users the right to erasure. CCPA lets California residents opt out of data sales. The EU AI Act adds transparency requirements on top. You need actual infrastructure to track who said yes, who said no, and what happens when someone changes their mind.

Here’s a minimal consent record using Pydantic, which we’ll expand into a full system:

from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
from uuid import uuid4


class ConsentPurpose(str, Enum):
    TRAINING = "model_training"
    INFERENCE = "inference"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"


class ConsentStatus(str, Enum):
    GRANTED = "granted"
    REVOKED = "revoked"
    PENDING = "pending"


class ConsentRecord(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid4()))
    user_id: str
    purpose: ConsentPurpose
    status: ConsentStatus
    granted_at: datetime | None = None
    revoked_at: datetime | None = None
    ip_address: str | None = None
    user_agent: str | None = None
    legal_basis: str = "consent"  # consent, legitimate_interest, contract

This gives you a typed, validated consent record with audit-relevant fields. Every consent change gets its own record – you never overwrite history.

Setting Up the Database Layer

SQLAlchemy handles persistence. We store consent records and audit logs in SQLite for simplicity, but swap in PostgreSQL for production. The key design choice: consent records are append-only. When a user revokes consent, you insert a new record with status=revoked rather than updating the old one. This preserves the full timeline for audits.
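To see what append-only buys you on the read side, here is the idea in miniature before we wire up real tables; plain dicts stand in for rows, and the latest record per purpose wins:

```python
# Plain dicts stand in for consent rows; "seq" plays the role of created_at.
history = [
    {"purpose": "model_training", "status": "granted", "seq": 1},
    {"purpose": "analytics", "status": "granted", "seq": 2},
    {"purpose": "model_training", "status": "revoked", "seq": 3},
]

# Replay the history in order: the latest record per purpose wins.
current: dict[str, str] = {}
for record in sorted(history, key=lambda r: r["seq"]):
    current[record["purpose"]] = record["status"]

# current == {"model_training": "revoked", "analytics": "granted"}
```

The status-lookup endpoint later in this article applies exactly this reduction to real rows.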

from sqlalchemy import create_engine, Column, String, DateTime, Text
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from datetime import datetime, timezone
from uuid import uuid4


class Base(DeclarativeBase):
    pass


class ConsentEntry(Base):
    __tablename__ = "consent_records"

    id = Column(String, primary_key=True)
    user_id = Column(String, index=True, nullable=False)
    purpose = Column(String, nullable=False)
    status = Column(String, nullable=False)
    granted_at = Column(DateTime, nullable=True)
    revoked_at = Column(DateTime, nullable=True)
    ip_address = Column(String, nullable=True)
    legal_basis = Column(String, default="consent")
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))


class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(String, primary_key=True, default=lambda: str(uuid4()))
    timestamp = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    user_id = Column(String, index=True, nullable=False)
    action = Column(String, nullable=False)  # consent_granted, consent_revoked, data_deleted, data_exported
    details = Column(Text, nullable=True)
    performed_by = Column(String, nullable=False)  # system, user, admin


engine = create_engine("sqlite:///consent.db")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

The AuditLog table is critical. Regulators don’t just want to see current consent status – they want to see every action taken, when, and by whom. Every endpoint in your API should write an audit log entry.

Building the Data Rights API

FastAPI gives you automatic OpenAPI docs, which is useful when your legal team asks “what endpoints exist for data rights?” Use the lifespan context manager for startup/shutdown logic – the old @app.on_event("startup") pattern is deprecated.

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from datetime import datetime, timezone
from uuid import uuid4
import json


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: ensure tables exist
    Base.metadata.create_all(engine)
    yield
    # Shutdown: clean up connections
    engine.dispose()


app = FastAPI(title="Data Rights API", lifespan=lifespan)


class GrantConsentRequest(BaseModel):
    user_id: str
    purpose: ConsentPurpose
    legal_basis: str = "consent"


class RevokeConsentRequest(BaseModel):
    user_id: str
    purpose: ConsentPurpose


def log_audit(db: Session, user_id: str, action: str, details: str, performed_by: str = "system"):
    entry = AuditLog(
        id=str(uuid4()),
        user_id=user_id,
        action=action,
        details=details,
        performed_by=performed_by,
    )
    # No commit here: the caller commits, so the audit entry and the change
    # it describes land in a single transaction.
    db.add(entry)


@app.post("/consent/grant")
def grant_consent(req: GrantConsentRequest, request: Request):
    now = datetime.now(timezone.utc)
    db = SessionLocal()
    try:
        entry = ConsentEntry(
            id=str(uuid4()),
            user_id=req.user_id,
            purpose=req.purpose.value,
            status="granted",
            granted_at=now,
            ip_address=request.client.host if request.client else None,
            legal_basis=req.legal_basis,
            created_at=now,
        )
        db.add(entry)
        log_audit(db, req.user_id, "consent_granted", f"purpose={req.purpose.value}")
        db.commit()
        return {"status": "granted", "id": entry.id, "timestamp": now.isoformat()}
    finally:
        db.close()


@app.post("/consent/revoke")
def revoke_consent(req: RevokeConsentRequest):
    now = datetime.now(timezone.utc)
    db = SessionLocal()
    try:
        entry = ConsentEntry(
            id=str(uuid4()),
            user_id=req.user_id,
            purpose=req.purpose.value,
            status="revoked",
            revoked_at=now,
            created_at=now,
        )
        db.add(entry)
        log_audit(db, req.user_id, "consent_revoked", f"purpose={req.purpose.value}")
        db.commit()
        return {"status": "revoked", "timestamp": now.isoformat()}
    finally:
        db.close()


@app.get("/consent/{user_id}")
def get_consent_status(user_id: str):
    db = SessionLocal()
    try:
        records = (
            db.query(ConsentEntry)
            .filter(ConsentEntry.user_id == user_id)
            .order_by(ConsentEntry.created_at.desc())
            .all()
        )
        if not records:
            raise HTTPException(status_code=404, detail="No consent records found")

        # Latest record per purpose determines current status
        current = {}
        for r in records:
            if r.purpose not in current:
                current[r.purpose] = {"status": r.status, "updated_at": r.created_at.isoformat()}

        return {"user_id": user_id, "consent": current}
    finally:
        db.close()


@app.post("/data/delete/{user_id}")
def request_data_deletion(user_id: str):
    db = SessionLocal()
    try:
        # Check if user has any records
        records = db.query(ConsentEntry).filter(ConsentEntry.user_id == user_id).all()
        if not records:
            raise HTTPException(status_code=404, detail="User not found")

        # Trigger the deletion pipeline (defined in the next section)
        deletion_result = execute_deletion_pipeline(user_id)

        log_audit(db, user_id, "data_deleted", json.dumps(deletion_result), performed_by="user")
        db.commit()
        return {"status": "deletion_complete", "details": deletion_result}
    finally:
        db.close()


@app.get("/data/export/{user_id}")
def export_user_data(user_id: str):
    db = SessionLocal()
    try:
        consent_records = (
            db.query(ConsentEntry).filter(ConsentEntry.user_id == user_id).all()
        )
        audit_records = (
            db.query(AuditLog).filter(AuditLog.user_id == user_id).all()
        )

        export = {
            "user_id": user_id,
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "consent_history": [
                {
                    "purpose": r.purpose,
                    "status": r.status,
                    "granted_at": r.granted_at.isoformat() if r.granted_at else None,
                    "revoked_at": r.revoked_at.isoformat() if r.revoked_at else None,
                    "legal_basis": r.legal_basis,
                }
                for r in consent_records
            ],
            "audit_trail": [
                {
                    "action": r.action,
                    "timestamp": r.timestamp.isoformat(),
                    "details": r.details,
                }
                for r in audit_records
            ],
        }
        log_audit(db, user_id, "data_exported", "Full data export requested", performed_by="user")
        db.commit()
        return export
    finally:
        db.close()

The /data/export/{user_id} endpoint satisfies GDPR Article 20 (right to data portability). Return everything you have about the user in a machine-readable format.

Data Deletion Pipeline and Opt-Out Filtering

When a user requests deletion, you need to purge their data from everywhere it lives: training dataset files, embedding caches, and vector stores. Derived models are the hard case; scrubbing stored data doesn't remove what a model has already learned, so exclude those users from the next retraining run. This function orchestrates the storage-level cleanup:

import shutil
from pathlib import Path


def execute_deletion_pipeline(user_id: str) -> dict:
    """Remove user data from all storage locations."""
    results = {}

    # 1. Remove from training dataset files
    dataset_dir = Path("./datasets/training")
    if dataset_dir.exists():
        for file in dataset_dir.glob("*.jsonl"):
            original_lines = file.read_text().splitlines()
            filtered_lines = [
                line for line in original_lines
                if f'"user_id": "{user_id}"' not in line
            ]
            removed_count = len(original_lines) - len(filtered_lines)
            if removed_count > 0:
                file.write_text("\n".join(filtered_lines) + "\n")
                results[str(file)] = f"removed {removed_count} records"

    # 2. Remove cached embeddings
    embedding_cache = Path(f"./cache/embeddings/{user_id}")
    if embedding_cache.exists():
        shutil.rmtree(embedding_cache)
        results["embedding_cache"] = "deleted"

    # 3. Remove from vector store (using ChromaDB as example)
    try:
        import chromadb

        client = chromadb.PersistentClient(path="./vectorstore")
        collection = client.get_or_create_collection("user_data")
        # Delete all documents belonging to this user
        collection.delete(where={"user_id": user_id})
        results["vector_store"] = "purged"
    except ImportError:
        results["vector_store"] = "chromadb not installed, skipped"
    except Exception as e:
        results["vector_store"] = f"error: {str(e)}"

    # 4. Record the outcome. Keep consent/audit records for legal compliance;
    #    only the user-generated data above is actually purged.
    db = SessionLocal()
    try:
        log_audit(db, user_id, "data_deleted", f"Pipeline results: {results}", performed_by="system")
        db.commit()
    finally:
        db.close()

    return results

For training pipelines, you need a filter that checks consent before including any data point. Wrap your data loader with this:

from typing import Iterator


def consent_filtered_loader(data_source: Iterator[dict], purpose: str = "model_training") -> Iterator[dict]:
    """Only yield data points from users who have granted consent for the given purpose."""
    db = SessionLocal()
    consent_cache: dict[str, bool] = {}

    try:
        for record in data_source:
            user_id = record.get("user_id")
            if not user_id:
                continue

            if user_id not in consent_cache:
                # Get the latest consent record for this purpose
                latest = (
                    db.query(ConsentEntry)
                    .filter(
                        ConsentEntry.user_id == user_id,
                        ConsentEntry.purpose == purpose,
                    )
                    .order_by(ConsentEntry.created_at.desc())
                    .first()
                )
                consent_cache[user_id] = latest is not None and latest.status == "granted"

            if consent_cache[user_id]:
                yield record
    finally:
        db.close()


# Usage in a training pipeline:
def load_training_data():
    """Load JSONL training data with consent filtering."""
    import json
    from pathlib import Path

    raw_data = Path("./datasets/training/data.jsonl")
    if not raw_data.exists():
        return

    def raw_records():
        for line in raw_data.read_text().splitlines():
            if line.strip():
                yield json.loads(line)

    for record in consent_filtered_loader(raw_records(), purpose="model_training"):
        # Only consented data reaches this point
        yield record

The consent_cache dictionary prevents hammering the database on every record. For datasets with millions of rows, this makes the difference between a 10-minute and a 10-hour training prep step.

Common Errors and Fixes

“No consent records found” on every request

You’re querying before any consent has been granted. Seed initial consent records during user signup, or return a default “no consent” status instead of a 404:

# Instead of raising 404, return default state
if not records:
    return {
        "user_id": user_id,
        "consent": {p.value: {"status": "pending"} for p in ConsentPurpose},
    }

SQLAlchemy DetachedInstanceError when accessing attributes after session close

This happens when you close the session and then try to read lazy-loaded attributes. Fix it by eagerly loading everything you need before closing, or by converting to dictionaries inside the try block. The code above does this correctly – all attribute access happens before db.close().

Stale consent cache during long training runs

The consent_cache in consent_filtered_loader doesn’t refresh if a user revokes consent mid-run. For large-scale training, add a TTL to the cache or re-query every N records:

import time

CACHE_TTL_SECONDS = 300  # Re-check every 5 minutes
cache_timestamps: dict[str, float] = {}

def is_cache_valid(user_id: str) -> bool:
    if user_id not in cache_timestamps:
        return False
    return (time.time() - cache_timestamps[user_id]) < CACHE_TTL_SECONDS

GDPR requires you keep audit logs even after deletion

Don’t delete the AuditLog and ConsentEntry records when processing a deletion request. You need proof that you processed the deletion. Only purge the actual user-generated data (training samples, embeddings, cached outputs). The consent and audit trail stays.

FastAPI returns 422 for consent requests

Your request body doesn’t match the Pydantic model. The purpose field expects one of the ConsentPurpose enum values: model_training, inference, analytics, or personalization. Sending "purpose": "training" instead of "purpose": "model_training" triggers a validation error.
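You can reproduce the failure without running the server by validating the request model directly (this assumes Pydantic v2's model_validate; the enum and model are repeated in trimmed form so the snippet stands alone):

```python
from enum import Enum

from pydantic import BaseModel, ValidationError


class ConsentPurpose(str, Enum):  # repeated from the first code block
    TRAINING = "model_training"
    INFERENCE = "inference"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"


class GrantConsentRequest(BaseModel):
    user_id: str
    purpose: ConsentPurpose
    legal_basis: str = "consent"


# The exact enum value validates cleanly.
ok = GrantConsentRequest.model_validate(
    {"user_id": "u-123", "purpose": "model_training"}
)

# "training" is not a ConsentPurpose value; FastAPI turns this
# ValidationError into the 422 response you're seeing.
try:
    GrantConsentRequest.model_validate({"user_id": "u-123", "purpose": "training"})
    rejected = False
except ValidationError:
    rejected = True
```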