ML datasets change shape constantly. A new feature gets added, an old column gets renamed, a float field switches to an integer. If your pipeline can’t handle those changes gracefully, you end up with silent data corruption or crashes at 2 AM.

This guide shows you how to build a schema evolution pipeline that versions your dataset schemas, validates DataFrames on the fly, and migrates old data forward automatically.

Define Versioned Schemas with Pydantic v2

Start by defining your dataset schema as a Pydantic model. Each version gets its own class, tagged with a version number. This gives you a clear record of every schema change.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pydantic import BaseModel, Field
from enum import Enum
from typing import Optional


class SchemaVersion(str, Enum):
    V1 = "1.0"
    V2 = "2.0"
    V3 = "3.0"


class DatasetSchemaV1(BaseModel):
    """Original schema: basic user features."""
    schema_version: str = SchemaVersion.V1
    user_id: int
    age: int
    income: float
    city: str


class DatasetSchemaV2(BaseModel):
    """V2: renamed 'city' to 'location', added 'signup_year'."""
    schema_version: str = SchemaVersion.V2
    user_id: int
    age: int
    income: float
    location: str
    signup_year: int


class DatasetSchemaV3(BaseModel):
    """V3: changed 'age' from int to float, added 'churn_risk' score."""
    schema_version: str = SchemaVersion.V3
    user_id: int
    age: float
    income: float
    location: str
    signup_year: int
    churn_risk: Optional[float] = None

Each schema is a snapshot. You can validate a single row by passing a dict to the model:

1
2
3
4
row = {"user_id": 1, "age": 34, "income": 55000.0, "city": "Denver"}
validated = DatasetSchemaV1.model_validate(row)
print(validated.model_dump())
# {'schema_version': '1.0', 'user_id': 1, 'age': 34, 'income': 55000.0, 'city': 'Denver'}

Validate DataFrames with Pandera

Pydantic handles row-level validation. For full DataFrame validation, use Pandera. Define a schema that matches each version and run it against your pandas DataFrame.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import pandera as pa
from pandera import Column, DataFrameSchema, Check
import pandas as pd


schema_v1 = DataFrameSchema(
    {
        "user_id": Column(int, Check.greater_than(0), unique=True),
        "age": Column(int, Check.in_range(0, 150)),
        "income": Column(float, Check.greater_than_or_equal_to(0)),
        "city": Column(str, Check.str_length(min_value=1)),
    },
    strict=True,  # reject unexpected columns
)

schema_v2 = DataFrameSchema(
    {
        "user_id": Column(int, Check.greater_than(0), unique=True),
        "age": Column(int, Check.in_range(0, 150)),
        "income": Column(float, Check.greater_than_or_equal_to(0)),
        "location": Column(str, Check.str_length(min_value=1)),
        "signup_year": Column(int, Check.in_range(2000, 2030)),
    },
    strict=True,
)

schema_v3 = DataFrameSchema(
    {
        "user_id": Column(int, Check.greater_than(0), unique=True),
        "age": Column(float, Check.in_range(0, 150)),
        "income": Column(float, Check.greater_than_or_equal_to(0)),
        "location": Column(str, Check.str_length(min_value=1)),
        "signup_year": Column(int, Check.in_range(2000, 2030)),
        "churn_risk": Column(float, Check.in_range(0, 1), nullable=True),
    },
    strict=True,
)

# Map version strings to Pandera schemas
PANDERA_SCHEMAS = {
    "1.0": schema_v1,
    "2.0": schema_v2,
    "3.0": schema_v3,
}

Now validate a DataFrame against the right version:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
df_v1 = pd.DataFrame({
    "user_id": [1, 2, 3],
    "age": [29, 45, 33],
    "income": [60000.0, 85000.0, 72000.0],
    "city": ["Denver", "Austin", "Portland"],
})

validated_df = PANDERA_SCHEMAS["1.0"].validate(df_v1)
print(f"Validated {len(validated_df)} rows against schema v1.0")
# Validated 3 rows against schema v1.0

If you pass a v1 DataFrame to the v2 schema, Pandera catches it immediately – city doesn’t exist in v2, and location is missing.

Build Migration Functions

Each migration function transforms a DataFrame from one version to the next. Keep them small and focused on a single version bump.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from typing import Callable


def migrate_v1_to_v2(df: pd.DataFrame) -> pd.DataFrame:
    """Rename 'city' to 'location', add 'signup_year' with default."""
    df = df.rename(columns={"city": "location"})
    df["signup_year"] = 2025  # default for backfill
    return df


def migrate_v2_to_v3(df: pd.DataFrame) -> pd.DataFrame:
    """Cast 'age' to float, add 'churn_risk' as nullable."""
    df["age"] = df["age"].astype(float)
    df["churn_risk"] = None
    return df


# Ordered migration chain
MIGRATIONS: list[tuple[str, str, Callable]] = [
    ("1.0", "2.0", migrate_v1_to_v2),
    ("2.0", "3.0", migrate_v2_to_v3),
]

Chain Migrations Automatically

The real power is chaining these migrations. Give the pipeline a source version and a target version, and it walks the chain forward, validating at each step.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def migrate_dataframe(
    df: pd.DataFrame,
    from_version: str,
    to_version: str,
) -> pd.DataFrame:
    """Migrate a DataFrame from one schema version to another."""
    if from_version == to_version:
        return PANDERA_SCHEMAS[to_version].validate(df)

    current_version = from_version
    current_df = df.copy()

    for src, dst, fn in MIGRATIONS:
        if src == current_version:
            print(f"Migrating {src} -> {dst}")
            current_df = fn(current_df)
            current_df = PANDERA_SCHEMAS[dst].validate(current_df)
            current_version = dst

            if current_version == to_version:
                return current_df

    raise ValueError(
        f"No migration path from {from_version} to {to_version}. "
        f"Stopped at {current_version}."
    )

Run the full pipeline:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
df_old = pd.DataFrame({
    "user_id": [1, 2, 3],
    "age": [29, 45, 33],
    "income": [60000.0, 85000.0, 72000.0],
    "city": ["Denver", "Austin", "Portland"],
})

df_latest = migrate_dataframe(df_old, from_version="1.0", to_version="3.0")
print(df_latest.dtypes)
# user_id         int64
# age           float64
# income        float64
# location       object
# signup_year     int64
# churn_risk    float64
# dtype: object

print(df_latest.head())
#    user_id   age   income  location  signup_year  churn_risk
# 0        1  29.0  60000.0    Denver         2025         NaN
# 1        2  45.0  85000.0    Austin         2025         NaN
# 2        3  33.0  72000.0  Portland         2025         NaN

The DataFrame went from v1 (with city as a string and age as int) all the way to v3 (with location, signup_year, age as float, and churn_risk). Each intermediate step was validated by Pandera.

Detect Schema Version Automatically

You don’t always know what version a CSV file is. Write a detector that checks columns against known schemas.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def detect_schema_version(df: pd.DataFrame) -> str:
    """Detect which schema version a DataFrame matches."""
    columns = set(df.columns)

    version_columns = {
        "1.0": {"user_id", "age", "income", "city"},
        "2.0": {"user_id", "age", "income", "location", "signup_year"},
        "3.0": {"user_id", "age", "income", "location", "signup_year", "churn_risk"},
    }

    for version, expected_cols in version_columns.items():
        if columns == expected_cols:
            return version

    raise ValueError(
        f"Unknown schema. Columns found: {columns}. "
        f"Known schemas: {list(version_columns.keys())}"
    )


# Auto-detect and migrate
raw_df = pd.read_csv("legacy_users.csv")
detected = detect_schema_version(raw_df)
print(f"Detected schema version: {detected}")

if detected != "3.0":
    migrated_df = migrate_dataframe(raw_df, from_version=detected, to_version="3.0")
else:
    migrated_df = PANDERA_SCHEMAS["3.0"].validate(raw_df)

This pattern works well in batch pipelines. Load the file, detect the version, migrate if needed, validate, then pass downstream.

Backward Compatibility Checks

Before deploying a new schema version, verify that old data can still be migrated cleanly. Run this as part of your CI pipeline.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def check_backward_compatibility(
    sample_data: dict[str, pd.DataFrame],
    target_version: str,
) -> dict[str, bool]:
    """Test that all sample datasets migrate to the target version."""
    results = {}
    for version, df in sample_data.items():
        try:
            migrate_dataframe(df, from_version=version, to_version=target_version)
            results[version] = True
        except Exception as e:
            print(f"Migration from {version} failed: {e}")
            results[version] = False
    return results


# Build sample DataFrames for each version
samples = {
    "1.0": pd.DataFrame({
        "user_id": [99], "age": [30], "income": [50000.0], "city": ["Seattle"],
    }),
    "2.0": pd.DataFrame({
        "user_id": [99], "age": [30], "income": [50000.0],
        "location": ["Seattle"], "signup_year": [2024],
    }),
}

compat = check_backward_compatibility(samples, target_version="3.0")
print(compat)
# {'1.0': True, '2.0': True}

If any version fails, your new migration function has a bug. Fix it before merging.

Common Errors and Fixes

SchemaError: column 'city' not in dataframe – You’re validating against the wrong schema version. Use detect_schema_version() first, or check that your migration actually renamed the column.

TypeError: Cannot convert non-finite values (NA or inf) to integer – Happens when you add a nullable column and then try to cast it to int. Use pd.Int64Dtype() for nullable integer columns:

1
df["signup_year"] = df["signup_year"].astype(pd.Int64Dtype())

SchemaError: expected series 'age' to have type float64, got int64 – Your migration didn’t cast the column. Make sure migrate_v2_to_v3 explicitly runs df["age"] = df["age"].astype(float) before validation.

ValidationError: Extra inputs are not permitted (Pydantic strict mode) – Your row dict has extra keys that aren’t in the model. Either remove them before validation or add model_config = ConfigDict(extra="ignore") to your Pydantic model.

Migrations run out of order – The MIGRATIONS list must be sorted by version. If you insert a migration for 2.5 between 2.0 and 3.0, update the chain accordingly. A missing link means migrate_dataframe raises a ValueError.

Pandera strict=True rejects extra columns – If your DataFrame has metadata columns (like _loaded_at or _source_file), either drop them before validation or set strict=False and add only the columns you care about.