Every time you retrain a model, you pull a fresh snapshot of your training data. But how do you know what actually changed between v1 and v2? A row got dropped, a column shifted distribution, a categorical field gained new values — any of these can silently wreck model performance. A reconciliation pipeline catches these problems before they reach your training loop.
Here’s the core idea: load two dataset versions, compare them at the row level, column level, and distribution level, then generate a report with pass/fail thresholds.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| import pandas as pd
import numpy as np
from scipy import stats
from dataclasses import dataclass, field
@dataclass
class ReconciliationReport:
    """Aggregated results of comparing two dataset versions."""

    # Added/removed columns and dtype changes (from check_schema).
    schema_changes: dict = field(default_factory=dict)
    # Row counts plus added/removed/modified tallies (from check_rows).
    row_diff: dict = field(default_factory=dict)
    # Per-column summary stats: means, null rates, category churn.
    column_stats: list = field(default_factory=list)
    # Per-column KS / chi-square drift test results.
    distribution_tests: list = field(default_factory=list)
    # Threshold-driven CRITICAL/WARNING messages derived from the above.
    alerts: list = field(default_factory=list)
def reconcile(df_old: pd.DataFrame, df_new: pd.DataFrame, thresholds: dict, id_col: str = None) -> ReconciliationReport:
    """Compare two dataset versions end to end and return a full report.

    Runs schema, row-level, column-stat, and distribution checks, then
    derives alerts from the collected findings using `thresholds`.
    An optional `id_col` enables precise row-level matching.
    """
    report = ReconciliationReport(
        schema_changes=check_schema(df_old, df_new),
        row_diff=check_rows(df_old, df_new, id_col=id_col),
        column_stats=compare_column_stats(df_old, df_new),
        distribution_tests=run_distribution_tests(df_old, df_new, thresholds),
    )
    # Alerts are computed last because they read every other report section.
    report.alerts = generate_alerts(report, thresholds)
    return report
|
That’s the skeleton. Now let’s build each piece.
Schema Comparison#
Schema changes are the loudest signal that something went wrong upstream. A renamed column, a new field, or a dtype change can all break feature engineering code.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def check_schema(df_old: pd.DataFrame, df_new: pd.DataFrame) -> dict:
    """Detect column additions/removals and dtype changes between versions.

    Returns a dict with 'added_columns', 'removed_columns', and
    'dtype_changes' ({col: {"old": ..., "new": ...}} for shared columns
    whose dtype differs).
    """
    old_cols, new_cols = set(df_old.columns), set(df_new.columns)
    # Only columns present in both versions can have a dtype change.
    dtype_changes = {
        col: {"old": str(df_old[col].dtype), "new": str(df_new[col].dtype)}
        for col in old_cols & new_cols
        if df_old[col].dtype != df_new[col].dtype
    }
    return {
        "added_columns": list(new_cols - old_cols),
        "removed_columns": list(old_cols - new_cols),
        "dtype_changes": dtype_changes,
    }
|
This catches the obvious stuff. If removed_columns is non-empty, that’s almost always worth a hard stop — your downstream code probably references those columns.
Row-Level Diff#
You need to know how many rows were added, removed, or modified between versions. If you have a unique ID column, this is straightforward. If you don’t, you compare on the full row hash.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def check_rows(df_old: pd.DataFrame, df_new: pd.DataFrame, id_col: str = None) -> dict:
    """Compute row-count deltas and classify rows as added/removed/modified.

    When `id_col` is present in both frames, rows are matched by ID and
    compared cell-by-cell; otherwise whole-row hashes are compared.
    NOTE: assumes `id_col` values are unique within each frame — deduplicate
    first (drop_duplicates) or the aligned comparison will raise.
    """
    result = {
        "old_row_count": len(df_old),
        "new_row_count": len(df_new),
        "count_delta": len(df_new) - len(df_old),
        # max(..., 1) guards against division by zero on an empty old frame.
        "count_delta_pct": round((len(df_new) - len(df_old)) / max(len(df_old), 1) * 100, 2),
    }
    if id_col and id_col in df_old.columns and id_col in df_new.columns:
        old_ids = set(df_old[id_col])
        new_ids = set(df_new[id_col])
        common = old_ids & new_ids
        result["added_rows"] = len(new_ids - old_ids)
        result["removed_rows"] = len(old_ids - new_ids)
        result["common_rows"] = len(common)
        # Align common rows on the ID so cells line up for comparison.
        old_common = df_old[df_old[id_col].isin(common)].set_index(id_col).sort_index()
        new_common = df_new[df_new[id_col].isin(common)].set_index(id_col).sort_index()
        # Only compare columns present in both versions; sort for determinism.
        shared_cols = sorted(set(old_common.columns) & set(new_common.columns))
        old_vals = old_common[shared_cols]
        new_vals = new_common[shared_cols]
        # Bug fix: NaN != NaN evaluates True, so rows whose only "difference"
        # was a shared missing value were counted as modified. Treat a cell
        # as unchanged when it is missing on both sides.
        both_missing = old_vals.isna() & new_vals.isna()
        cell_changed = old_vals.ne(new_vals) & ~both_missing
        result["modified_rows"] = int(cell_changed.any(axis=1).sum())
    else:
        # Hash-based comparison when no ID column exists.
        # Bug fix: index=False — the default hashes the index too, so
        # identical rows at shifted positions (e.g. after one insert on a
        # RangeIndex) all looked "unique".
        old_hashes = set(pd.util.hash_pandas_object(df_old, index=False))
        new_hashes = set(pd.util.hash_pandas_object(df_new, index=False))
        result["unique_to_old"] = len(old_hashes - new_hashes)
        result["unique_to_new"] = len(new_hashes - old_hashes)
    return result
|
The count_delta_pct field is what you’ll threshold on. A 5% swing in row count might be normal for weekly refreshes. A 40% drop means your ETL pipeline probably broke.
Column Statistics and Distribution Tests#
Row counts tell you the “what.” Distribution tests tell you the “how.” A column could have the same number of rows but a completely different value distribution — that’s drift.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def compare_column_stats(df_old: pd.DataFrame, df_new: pd.DataFrame) -> list:
    """Summarize per-column changes between versions for shared columns.

    Numeric columns report old/new means and the relative mean shift;
    other columns report category churn. All columns report null rates.
    Returns a list of dicts sorted by column name.
    """
    common_cols = set(df_old.columns) & set(df_new.columns)
    results = []
    for col in sorted(common_cols):
        stat = {"column": col}
        if pd.api.types.is_numeric_dtype(df_old[col]) and pd.api.types.is_numeric_dtype(df_new[col]):
            old_mean = df_old[col].mean()
            new_mean = df_new[col].mean()
            stat["type"] = "numeric"
            stat["old_mean"] = round(old_mean, 4)
            stat["new_mean"] = round(new_mean, 4)
            # Bug fix: compute the delta from the raw means — rounding first
            # distorts the percentage for small-magnitude columns.
            # 1e-9 floor avoids division by zero when the old mean is ~0.
            stat["mean_delta_pct"] = round(
                abs(new_mean - old_mean) / max(abs(old_mean), 1e-9) * 100, 2
            )
        else:
            stat["type"] = "categorical"
            old_unique = set(df_old[col].dropna().unique())
            new_unique = set(df_new[col].dropna().unique())
            # Sort for deterministic report output (set order is arbitrary);
            # key=str tolerates mixed-type category values.
            stat["new_categories"] = sorted(new_unique - old_unique, key=str)
            stat["removed_categories"] = sorted(old_unique - new_unique, key=str)
        # Null rates apply to every column type; hoisted out of the branches.
        stat["old_null_pct"] = round(df_old[col].isna().mean() * 100, 2)
        stat["new_null_pct"] = round(df_new[col].isna().mean() * 100, 2)
        results.append(stat)
    return results
def run_distribution_tests(
    df_old: pd.DataFrame, df_new: pd.DataFrame, thresholds: dict
) -> list:
    """Run per-column drift tests on columns shared by both versions.

    Numeric columns get a two-sample Kolmogorov-Smirnov test; other columns
    get a chi-square test on category frequencies. A column is flagged as
    drifted when its p-value falls below thresholds["alpha"] (default 0.05).
    """
    common_cols = set(df_old.columns) & set(df_new.columns)
    alpha = thresholds.get("alpha", 0.05)
    results = []
    for col in sorted(common_cols):
        if pd.api.types.is_numeric_dtype(df_old[col]) and pd.api.types.is_numeric_dtype(df_new[col]):
            old_vals = df_old[col].dropna().values
            new_vals = df_new[col].dropna().values
            # KS is meaningless on tiny samples; skip rather than mislead.
            if len(old_vals) < 5 or len(new_vals) < 5:
                continue
            ks_stat, ks_p = stats.ks_2samp(old_vals, new_vals)
            results.append({
                "column": col,
                "test": "ks_2samp",
                # Cast numpy scalars to plain Python types so the report
                # serializes cleanly to JSON.
                "statistic": round(float(ks_stat), 4),
                "p_value": round(float(ks_p), 6),
                "drifted": bool(ks_p < alpha),
            })
        else:
            old_counts = df_old[col].value_counts()
            new_counts = df_new[col].value_counts()
            # Align categories so both series share the same index.
            all_cats = sorted(set(old_counts.index) | set(new_counts.index))
            old_aligned = old_counts.reindex(all_cats, fill_value=0)
            new_aligned = new_counts.reindex(all_cats, fill_value=0)
            # Chi-square needs at least two categories to be defined.
            if len(all_cats) < 2:
                continue
            # +1 smoothing avoids zero expected frequencies for brand-new
            # categories; rescale so expected totals match observed totals
            # (required by scipy.stats.chisquare).
            old_smoothed = old_aligned + 1
            old_normalized = old_smoothed / old_smoothed.sum() * new_aligned.sum()
            chi2, chi_p = stats.chisquare(new_aligned, f_exp=old_normalized)
            results.append({
                "column": col,
                "test": "chi_square",
                "statistic": round(float(chi2), 4),
                "p_value": round(float(chi_p), 6),
                # Bug fix: chi_p is np.float64, so chi_p < alpha yields
                # np.bool_, which json.dump cannot encode natively.
                "drifted": bool(chi_p < alpha),
            })
    return results
|
The Kolmogorov-Smirnov test works well for continuous numeric columns — it compares the full distribution shape, not just means. For categorical columns, chi-square checks whether the frequency distribution shifted. We add +1 to expected values to avoid division by zero when a category didn’t exist in the old data, then normalize so the expected sum matches the observed sum (required by scipy.stats.chisquare).
Generating Alerts and Running the Pipeline#
Wire everything together with threshold-based alerting. You want hard failures for schema breaks and soft warnings for distribution shifts.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def generate_alerts(report: ReconciliationReport, thresholds: dict) -> list:
    """Turn report findings into CRITICAL/WARNING alert dicts.

    Schema removals and large row-count swings are CRITICAL; dtype changes,
    widespread drift, and null-rate spikes are WARNING. Thresholds used:
    max_row_delta_pct, max_drifted_columns, max_null_increase_pct.
    """
    alerts = []

    def add(severity, message):
        alerts.append({"severity": severity, "message": message})

    # Schema alerts — removed columns are almost always a hard stop.
    removed = report.schema_changes.get("removed_columns")
    if removed:
        add("CRITICAL", f"Columns removed: {removed}")
    dtypes = report.schema_changes.get("dtype_changes")
    if dtypes:
        add("WARNING", f"Dtype changes detected: {dtypes}")

    # Row-count swing beyond the configured tolerance.
    max_row_delta = thresholds.get("max_row_delta_pct", 10)
    delta_pct = abs(report.row_diff.get("count_delta_pct", 0))
    if delta_pct > max_row_delta:
        add("CRITICAL", f"Row count changed by {delta_pct}% (threshold: {max_row_delta}%)")

    # Too many columns flagged by the distribution tests.
    drifted_cols = [t["column"] for t in report.distribution_tests if t.get("drifted")]
    if len(drifted_cols) > thresholds.get("max_drifted_columns", 3):
        add("WARNING", f"{len(drifted_cols)} columns show distribution drift: {drifted_cols}")

    # Per-column null-rate spikes.
    max_null_increase = thresholds.get("max_null_increase_pct", 5)
    for col_stat in report.column_stats:
        null_increase = col_stat.get("new_null_pct", 0) - col_stat.get("old_null_pct", 0)
        if null_increase > max_null_increase:
            add("WARNING", f"Column '{col_stat['column']}' null rate increased by {null_increase:.1f}%")

    return alerts
# --- Run the full pipeline ---
# Create sample datasets representing two versions
np.random.seed(42)
df_v1 = pd.DataFrame({
    "id": range(1000),
    "age": np.random.normal(35, 10, 1000).astype(int),
    "income": np.random.lognormal(10, 1, 1000),
    "category": np.random.choice(["A", "B", "C"], 1000),
    "score": np.random.uniform(0, 1, 1000),
})
# v2: slight drift in age, new category, some rows dropped
df_v2 = pd.DataFrame({
    "id": range(50, 1100),  # ids 0-49 dropped, 1000-1099 added
    "age": np.random.normal(38, 12, 1050).astype(int),  # mean and spread shifted
    "income": np.random.lognormal(10, 1, 1050),
    "category": np.random.choice(["A", "B", "C", "D"], 1050),  # new category "D"
    "score": np.random.uniform(0, 1, 1050),
})
# Thresholds consumed by run_distribution_tests and generate_alerts.
thresholds = {
    "alpha": 0.05,
    "max_row_delta_pct": 10,
    "max_drifted_columns": 3,
    "max_null_increase_pct": 5,
}
report = reconcile(df_v1, df_v2, thresholds, id_col="id")
# Pretty-print each section of the report.
print("=== Schema Changes ===")
print(report.schema_changes)
print("\n=== Row Diff ===")
for k, v in report.row_diff.items():
    print(f" {k}: {v}")
print("\n=== Distribution Tests ===")
for t in report.distribution_tests:
    flag = "DRIFT" if t["drifted"] else "OK"
    print(f" {t['column']} ({t['test']}): p={t['p_value']} [{flag}]")
print("\n=== Alerts ===")
for a in report.alerts:
    print(f" [{a['severity']}] {a['message']}")
|
Running this produces output like:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| === Schema Changes ===
{'added_columns': [], 'removed_columns': [], 'dtype_changes': {}}
=== Row Diff ===
old_row_count: 1000
new_row_count: 1050
count_delta: 50
count_delta_pct: 5.0
added_rows: 100
removed_rows: 50
common_rows: 950
modified_rows: 950
=== Distribution Tests ===
age (ks_2samp): p=0.000134 [DRIFT]
category (chi_square): p=0.0 [DRIFT]
income (ks_2samp): p=0.581729 [OK]
score (ks_2samp): p=0.63418 [OK]
=== Alerts ===
  (no alerts — only 2 columns drifted, which is within the max_drifted_columns threshold of 3)
|
The pipeline caught the age distribution shift and the new category value — exactly the kind of silent changes that cause model degradation.
Scheduling and Automation#
Wrap the reconciliation in a function your CI or Airflow DAG can call. Fail the pipeline on CRITICAL alerts, log warnings for review.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| import sys
import json
def run_reconciliation(old_path: str, new_path: str, config_path: str = None, id_col: str = None) -> bool:
    """Returns True if reconciliation passes, False on critical alerts.

    Parameters:
        old_path: parquet file holding the previous dataset version.
        new_path: parquet file holding the new dataset version.
        config_path: optional JSON file whose keys override default thresholds.
        id_col: optional unique-ID column forwarded to reconcile() so rows can
            be matched precisely; without it row diffing falls back to hashing.

    Side effects: writes reconciliation_report.json to the working directory
    and prints a PASSED/FAILED summary (failures go to stderr).
    """
    df_old = pd.read_parquet(old_path)
    df_new = pd.read_parquet(new_path)
    # Defaults; any subset can be overridden via the JSON config file.
    thresholds = {
        "alpha": 0.05,
        "max_row_delta_pct": 10,
        "max_drifted_columns": 3,
        "max_null_increase_pct": 5,
    }
    if config_path:
        with open(config_path) as f:
            thresholds.update(json.load(f))
    report = reconcile(df_old, df_new, thresholds, id_col=id_col)
    # Write report to JSON for downstream consumption
    report_dict = {
        "schema_changes": report.schema_changes,
        "row_diff": report.row_diff,
        "column_stats": report.column_stats,
        "distribution_tests": report.distribution_tests,
        "alerts": report.alerts,
    }
    with open("reconciliation_report.json", "w") as f:
        # default=str stringifies any numpy scalars json can't encode natively.
        json.dump(report_dict, f, indent=2, default=str)
    critical_alerts = [a for a in report.alerts if a["severity"] == "CRITICAL"]
    if critical_alerts:
        print(f"FAILED: {len(critical_alerts)} critical alert(s) found", file=sys.stderr)
        for a in critical_alerts:
            print(f" {a['message']}", file=sys.stderr)
        return False
    print(f"PASSED: {len(report.alerts)} warning(s)")
    return True
# Usage from command line or Airflow
if __name__ == "__main__":
passed = run_reconciliation(
old_path="data/training_v1.parquet",
new_path="data/training_v2.parquet",
)
sys.exit(0 if passed else 1)
|
This gives you a non-zero exit code on failure, which CI systems pick up automatically.
Common Errors and Fixes#
ValueError: Can only compare identically-labeled DataFrame objects — This happens when your ID column has duplicates. Deduplicate before reconciliation:
1
2
# Keep one row per ID (the latest) so set_index yields a unique, comparable index.
df_old = df_old.drop_duplicates(subset=["id"], keep="last")
df_new = df_new.drop_duplicates(subset=["id"], keep="last")
|
Chi-square test returns nan for p-value — You likely have a category with zero expected frequency. The +1 smoothing in run_distribution_tests handles this, but if you’re seeing it, check that your old and new DataFrames aren’t empty for that column.
KS test says everything drifted — With very large datasets (100k+ rows), the KS test becomes overly sensitive. Either subsample before testing or lower the alpha threshold so only strongly significant shifts are flagged:
1
| thresholds = {"alpha": 0.001} # Stricter p-value for large datasets
|
MemoryError on large Parquet files — Load only the columns you need:
1
2
# Column pruning at read time keeps memory proportional to the checked columns only.
cols_to_check = ["id", "age", "income", "category", "score"]
df_old = pd.read_parquet("data/v1.parquet", columns=cols_to_check)
|
Schema comparison misses renamed columns — This pipeline detects added/removed columns but can’t tell if user_age was renamed to age. Track column renames in your schema registry or version control system separately.