ML datasets change shape constantly. A new feature gets added, an old column gets renamed, a float field switches to an integer. If your pipeline can’t handle those changes gracefully, you end up with silent data corruption or crashes at 2 AM.
This guide shows you how to build a schema evolution pipeline that versions your dataset schemas, validates DataFrames on the fly, and migrates old data forward automatically.
## Define Versioned Schemas with Pydantic v2
Start by defining your dataset schema as a Pydantic model. Each version gets its own class, tagged with a version number. This gives you a clear record of every schema change.
```python
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class SchemaVersion(str, Enum):
    V1 = "1.0"
    V2 = "2.0"
    V3 = "3.0"


class DatasetSchemaV1(BaseModel):
    """Original schema: basic user features."""
    schema_version: str = SchemaVersion.V1.value
    user_id: int
    age: int
    income: float
    city: str


class DatasetSchemaV2(BaseModel):
    """V2: renamed 'city' to 'location', added 'signup_year'."""
    schema_version: str = SchemaVersion.V2.value
    user_id: int
    age: int
    income: float
    location: str
    signup_year: int


class DatasetSchemaV3(BaseModel):
    """V3: changed 'age' from int to float, added 'churn_risk' score."""
    schema_version: str = SchemaVersion.V3.value
    user_id: int
    age: float
    income: float
    location: str
    signup_year: int
    churn_risk: Optional[float] = None
```
Each schema is a snapshot. You can validate a single row by passing a dict to the model:
```python
row = {"user_id": 1, "age": 34, "income": 55000.0, "city": "Denver"}
validated = DatasetSchemaV1.model_validate(row)
print(validated.model_dump())
# {'schema_version': '1.0', 'user_id': 1, 'age': 34, 'income': 55000.0, 'city': 'Denver'}
```
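If a field has the wrong type, `model_validate` raises a `ValidationError` instead of silently passing bad data through. A self-contained sketch of that failure mode (the v1 model is repeated here so the snippet runs on its own):

```python
from pydantic import BaseModel, ValidationError


class DatasetSchemaV1(BaseModel):
    schema_version: str = "1.0"
    user_id: int
    age: int
    income: float
    city: str


# 'user_id' cannot be coerced to int, so validation fails
bad_row = {"user_id": "not-an-int", "age": 34, "income": 55000.0, "city": "Denver"}
try:
    DatasetSchemaV1.model_validate(bad_row)
except ValidationError as exc:
    print(f"{exc.error_count()} field(s) failed validation")
```

Catching `ValidationError` per row lets you route bad records to a dead-letter queue instead of crashing the whole batch.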
## Validate DataFrames with Pandera
Pydantic handles row-level validation. For full DataFrame validation, use Pandera. Define a schema that matches each version and run it against your pandas DataFrame.
```python
import pandas as pd
import pandera as pa
from pandera import Check, Column, DataFrameSchema

schema_v1 = DataFrameSchema(
    {
        "user_id": Column(int, Check.greater_than(0), unique=True),
        "age": Column(int, Check.in_range(0, 150)),
        "income": Column(float, Check.greater_than_or_equal_to(0)),
        "city": Column(str, Check.str_length(min_value=1)),
    },
    strict=True,  # reject unexpected columns
)

schema_v2 = DataFrameSchema(
    {
        "user_id": Column(int, Check.greater_than(0), unique=True),
        "age": Column(int, Check.in_range(0, 150)),
        "income": Column(float, Check.greater_than_or_equal_to(0)),
        "location": Column(str, Check.str_length(min_value=1)),
        "signup_year": Column(int, Check.in_range(2000, 2030)),
    },
    strict=True,
)

schema_v3 = DataFrameSchema(
    {
        "user_id": Column(int, Check.greater_than(0), unique=True),
        "age": Column(float, Check.in_range(0, 150)),
        "income": Column(float, Check.greater_than_or_equal_to(0)),
        "location": Column(str, Check.str_length(min_value=1)),
        "signup_year": Column(int, Check.in_range(2000, 2030)),
        "churn_risk": Column(float, Check.in_range(0, 1), nullable=True),
    },
    strict=True,
)

# Map version strings to Pandera schemas
PANDERA_SCHEMAS = {
    "1.0": schema_v1,
    "2.0": schema_v2,
    "3.0": schema_v3,
}
```
Now validate a DataFrame against the right version:
```python
df_v1 = pd.DataFrame({
    "user_id": [1, 2, 3],
    "age": [29, 45, 33],
    "income": [60000.0, 85000.0, 72000.0],
    "city": ["Denver", "Austin", "Portland"],
})

validated_df = PANDERA_SCHEMAS["1.0"].validate(df_v1)
print(f"Validated {len(validated_df)} rows against schema v1.0")
# Validated 3 rows against schema v1.0
```
If you pass a v1 DataFrame to the v2 schema, Pandera catches it immediately: `city` doesn't exist in v2, and `location` is missing.
## Build Migration Functions
Each migration function transforms a DataFrame from one version to the next. Keep them small and focused on a single version bump.
```python
from typing import Callable


def migrate_v1_to_v2(df: pd.DataFrame) -> pd.DataFrame:
    """Rename 'city' to 'location', add 'signup_year' with a default."""
    df = df.rename(columns={"city": "location"})  # rename returns a new frame
    df["signup_year"] = 2025  # default for backfill
    return df


def migrate_v2_to_v3(df: pd.DataFrame) -> pd.DataFrame:
    """Cast 'age' to float, add 'churn_risk' as a nullable float."""
    df = df.copy()  # avoid mutating the caller's DataFrame
    df["age"] = df["age"].astype(float)
    # NaN keeps the column float64; assigning None would create an
    # object-dtype column and fail the strict v3 schema
    df["churn_risk"] = float("nan")
    return df


# Ordered migration chain
MIGRATIONS: list[tuple[str, str, Callable]] = [
    ("1.0", "2.0", migrate_v1_to_v2),
    ("2.0", "3.0", migrate_v2_to_v3),
]
```
## Chain Migrations Automatically
The real power is chaining these migrations. Give the pipeline a source version and a target version, and it walks the chain forward, validating at each step.
```python
def migrate_dataframe(
    df: pd.DataFrame,
    from_version: str,
    to_version: str,
) -> pd.DataFrame:
    """Migrate a DataFrame from one schema version to another."""
    if from_version == to_version:
        return PANDERA_SCHEMAS[to_version].validate(df)

    current_version = from_version
    current_df = df.copy()

    for src, dst, fn in MIGRATIONS:
        if src == current_version:
            print(f"Migrating {src} -> {dst}")
            current_df = fn(current_df)
            current_df = PANDERA_SCHEMAS[dst].validate(current_df)
            current_version = dst
            if current_version == to_version:
                return current_df

    raise ValueError(
        f"No migration path from {from_version} to {to_version}. "
        f"Stopped at {current_version}."
    )
```
Run the full pipeline:
```python
df_old = pd.DataFrame({
    "user_id": [1, 2, 3],
    "age": [29, 45, 33],
    "income": [60000.0, 85000.0, 72000.0],
    "city": ["Denver", "Austin", "Portland"],
})

df_latest = migrate_dataframe(df_old, from_version="1.0", to_version="3.0")
# Migrating 1.0 -> 2.0
# Migrating 2.0 -> 3.0

print(df_latest.dtypes)
# user_id          int64
# age            float64
# income         float64
# location        object
# signup_year      int64
# churn_risk     float64
# dtype: object

print(df_latest.head())
#    user_id   age   income  location  signup_year  churn_risk
# 0        1  29.0  60000.0    Denver         2025         NaN
# 1        2  45.0  85000.0    Austin         2025         NaN
# 2        3  33.0  72000.0  Portland         2025         NaN
```
The DataFrame went from v1 (with `city` as a string and `age` as an int) all the way to v3 (with `location`, `signup_year`, `age` as a float, and `churn_risk`). Each intermediate step was validated by Pandera.
## Detect Schema Version Automatically
You don’t always know what version a CSV file is. Write a detector that checks columns against known schemas.
```python
def detect_schema_version(df: pd.DataFrame) -> str:
    """Detect which schema version a DataFrame matches."""
    columns = set(df.columns)
    version_columns = {
        "1.0": {"user_id", "age", "income", "city"},
        "2.0": {"user_id", "age", "income", "location", "signup_year"},
        "3.0": {"user_id", "age", "income", "location", "signup_year", "churn_risk"},
    }
    for version, expected_cols in version_columns.items():
        if columns == expected_cols:
            return version
    raise ValueError(
        f"Unknown schema. Columns found: {columns}. "
        f"Known schemas: {list(version_columns.keys())}"
    )


# Auto-detect and migrate
raw_df = pd.read_csv("legacy_users.csv")
detected = detect_schema_version(raw_df)
print(f"Detected schema version: {detected}")

if detected != "3.0":
    migrated_df = migrate_dataframe(raw_df, from_version=detected, to_version="3.0")
else:
    migrated_df = PANDERA_SCHEMAS["3.0"].validate(raw_df)
```
This pattern works well in batch pipelines. Load the file, detect the version, migrate if needed, validate, then pass downstream.
## Backward Compatibility Checks
Before deploying a new schema version, verify that old data can still be migrated cleanly. Run this as part of your CI pipeline.
```python
def check_backward_compatibility(
    sample_data: dict[str, pd.DataFrame],
    target_version: str,
) -> dict[str, bool]:
    """Test that all sample datasets migrate to the target version."""
    results = {}
    for version, df in sample_data.items():
        try:
            migrate_dataframe(df, from_version=version, to_version=target_version)
            results[version] = True
        except Exception as e:
            print(f"Migration from {version} failed: {e}")
            results[version] = False
    return results


# Build sample DataFrames for each version
samples = {
    "1.0": pd.DataFrame({
        "user_id": [99], "age": [30], "income": [50000.0], "city": ["Seattle"],
    }),
    "2.0": pd.DataFrame({
        "user_id": [99], "age": [30], "income": [50000.0],
        "location": ["Seattle"], "signup_year": [2024],
    }),
}

compat = check_backward_compatibility(samples, target_version="3.0")
print(compat)
# {'1.0': True, '2.0': True}
```
If any version fails, your new migration function has a bug. Fix it before merging.
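One way to wire that into CI is a tiny assertion helper that fails the build when any version breaks (a sketch; the helper name is illustrative):

```python
def assert_all_compatible(results: dict[str, bool]) -> None:
    """Fail loudly if any schema version could not be migrated."""
    failed = [version for version, ok in results.items() if not ok]
    assert not failed, f"Migrations failed for versions: {failed}"


# Passes silently when every version migrated cleanly
assert_all_compatible({"1.0": True, "2.0": True})
```

Calling this at the end of the CI script turns a silent `False` in the result dict into a hard, greppable failure message.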
## Common Errors and Fixes
- `SchemaError: column 'city' not in dataframe`: you're validating against the wrong schema version. Use `detect_schema_version()` first, or check that your migration actually renamed the column.
- `TypeError: Cannot convert non-finite values (NA or inf) to integer`: happens when you add a nullable column and then try to cast it to a plain `int`. Use pandas' nullable integer dtype for such columns:

```python
df["signup_year"] = df["signup_year"].astype(pd.Int64Dtype())  # or .astype("Int64")
```
- `SchemaError: expected series 'age' to have type float64, got int64`: your migration didn't cast the column. Make sure `migrate_v2_to_v3` explicitly runs `df["age"] = df["age"].astype(float)` before validation.
- `ValidationError: Extra inputs are not permitted`: your row dict has extra keys and the Pydantic model forbids them (i.e., it sets `model_config = ConfigDict(extra="forbid")`). Either drop the extra keys before validation or relax the model with `model_config = ConfigDict(extra="ignore")`.
- Migrations run out of order: the `MIGRATIONS` list must be sorted by version. If you insert a migration for 2.5 between 2.0 and 3.0, update the chain accordingly. A missing link means `migrate_dataframe` raises a `ValueError`.
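To keep the chain ordered regardless of insertion order, you can sort it by a numeric version key at startup (a small sketch; the stub lambdas stand in for real migration functions):

```python
def version_key(version: str) -> tuple[int, ...]:
    """Turn '2.10' into (2, 10) so versions sort numerically, not lexically."""
    return tuple(int(part) for part in version.split("."))


# Deliberately out of order; stub callables in place of real migrations
migrations = [
    ("2.5", "3.0", lambda df: df),
    ("1.0", "2.0", lambda df: df),
    ("2.0", "2.5", lambda df: df),
]
migrations.sort(key=lambda step: version_key(step[0]))

print([(src, dst) for src, dst, _ in migrations])
# [('1.0', '2.0'), ('2.0', '2.5'), ('2.5', '3.0')]
```

A numeric key matters because string comparison would put "2.10" before "2.9".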
- Pandera `strict=True` rejects extra columns: if your DataFrame carries metadata columns (like `_loaded_at` or `_source_file`), either drop them before validation or set `strict=False` and define only the columns you care about.
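Dropping the metadata columns can be a one-liner helper run just before validation (a sketch; the column names here are illustrative):

```python
import pandas as pd

METADATA_COLS = ["_loaded_at", "_source_file"]  # illustrative pipeline metadata


def strip_metadata(df: pd.DataFrame) -> pd.DataFrame:
    """Drop known metadata columns so strict schemas still pass."""
    return df.drop(columns=[c for c in METADATA_COLS if c in df.columns])


df = pd.DataFrame({"user_id": [1], "_loaded_at": ["2025-01-01"]})
print(strip_metadata(df).columns.tolist())
# ['user_id']
```

Guarding with `if c in df.columns` keeps the helper safe on frames that never had the metadata in the first place.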