Every time you retrain a model, you pull a fresh snapshot of your training data. But how do you know what actually changed between v1 and v2? A row got dropped, a column shifted distribution, a categorical field gained new values — any of these can silently wreck model performance. A reconciliation pipeline catches these problems before they reach your training loop.

Here’s the core idea: load two dataset versions, compare them at the row level, column level, and distribution level, then generate a report with pass/fail thresholds.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import pandas as pd
import numpy as np
from scipy import stats
from dataclasses import dataclass, field

@dataclass
class ReconciliationReport:
    """Aggregated results of one dataset reconciliation run.

    Populated field-by-field by reconcile(); see check_schema, check_rows,
    compare_column_stats, run_distribution_tests, and generate_alerts for
    the exact shape each field takes.
    """

    # Added/removed columns and dtype changes between versions.
    schema_changes: dict = field(default_factory=dict)
    # Row counts, deltas, and added/removed/modified row tallies.
    row_diff: dict = field(default_factory=dict)
    # Per-column summary stats (means, null rates, category changes).
    column_stats: list = field(default_factory=list)
    # Per-column drift test results (KS / chi-square).
    distribution_tests: list = field(default_factory=list)
    # Severity-tagged alert dicts derived from the fields above.
    alerts: list = field(default_factory=list)

def reconcile(df_old: pd.DataFrame, df_new: pd.DataFrame, thresholds: dict, id_col: str = None) -> ReconciliationReport:
    """Run the full reconciliation pipeline over two dataset versions.

    Compares schema, row membership, column statistics, and value
    distributions, then derives threshold-based alerts from the results.

    Args:
        df_old: Previous dataset version.
        df_new: New dataset version.
        thresholds: Alerting thresholds (alpha, max_row_delta_pct, ...).
        id_col: Optional unique-ID column enabling row-level diffing.

    Returns:
        A fully populated ReconciliationReport.
    """
    report = ReconciliationReport(
        schema_changes=check_schema(df_old, df_new),
        row_diff=check_rows(df_old, df_new, id_col=id_col),
        column_stats=compare_column_stats(df_old, df_new),
        distribution_tests=run_distribution_tests(df_old, df_new, thresholds),
    )
    # Alerts are derived last: they read every other section of the report.
    report.alerts = generate_alerts(report, thresholds)
    return report

That’s the skeleton. Now let’s build each piece.

Schema Comparison

Schema changes are the loudest signal that something went wrong upstream. A renamed column, a new field, or a dtype change can all break feature engineering code.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def check_schema(df_old: pd.DataFrame, df_new: pd.DataFrame) -> dict:
    """Detect columns added or removed between versions, plus dtype changes.

    Only columns present in both frames are checked for dtype changes.

    Returns:
        dict with keys "added_columns", "removed_columns", "dtype_changes".
    """
    old_cols, new_cols = set(df_old.columns), set(df_new.columns)
    shared = old_cols & new_cols

    # Record before/after dtype (as strings) for every shared column whose
    # dtype differs between versions.
    dtype_changes = {
        col: {"old": str(df_old[col].dtype), "new": str(df_new[col].dtype)}
        for col in shared
        if df_old[col].dtype != df_new[col].dtype
    }

    return {
        "added_columns": list(new_cols - old_cols),
        "removed_columns": list(old_cols - new_cols),
        "dtype_changes": dtype_changes,
    }

This catches the obvious stuff. If removed_columns is non-empty, that’s almost always worth a hard stop — your downstream code probably references those columns.

Row-Level Diff

You need to know how many rows were added, removed, or modified between versions. If you have a unique ID column, this is straightforward. If you don’t, you compare on the full row hash.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def check_rows(df_old: pd.DataFrame, df_new: pd.DataFrame, id_col: str = None) -> dict:
    """Compare row counts and row identity between two dataset versions.

    Args:
        df_old: Previous dataset version.
        df_new: New dataset version.
        id_col: Optional unique-ID column. When present in both frames,
            enables added/removed/modified row detection; otherwise falls
            back to whole-row hashing.

    Returns:
        dict with row counts and deltas, plus either
        added/removed/common/modified counts (ID path) or
        unique_to_old/unique_to_new (hash path).
    """
    result = {
        "old_row_count": len(df_old),
        "new_row_count": len(df_new),
        "count_delta": len(df_new) - len(df_old),
        # max(..., 1) guards against division by zero on an empty old frame.
        "count_delta_pct": round((len(df_new) - len(df_old)) / max(len(df_old), 1) * 100, 2),
    }

    if id_col and id_col in df_old.columns and id_col in df_new.columns:
        old_ids = set(df_old[id_col])
        new_ids = set(df_new[id_col])
        result["added_rows"] = len(new_ids - old_ids)
        result["removed_rows"] = len(old_ids - new_ids)
        result["common_rows"] = len(old_ids & new_ids)

        # Check for modified rows among common IDs.
        common = old_ids & new_ids
        old_common = df_old[df_old[id_col].isin(common)].set_index(id_col).sort_index()
        new_common = df_new[df_new[id_col].isin(common)].set_index(id_col).sort_index()

        # Only compare columns that exist in both versions; sorted for a
        # deterministic comparison order.
        shared_cols = sorted(set(old_common.columns) & set(new_common.columns))
        old_vals = old_common[shared_cols]
        new_vals = new_common[shared_cols]

        # BUG FIX: plain `!=` treats NaN as unequal to NaN, so unchanged rows
        # containing nulls were counted as modified. Treat NaN == NaN as equal.
        cell_changed = (old_vals != new_vals) & ~(old_vals.isna() & new_vals.isna())
        result["modified_rows"] = int(cell_changed.any(axis=1).sum())
    else:
        # No usable ID column: compare whole-row hashes instead. This can
        # only report rows unique to each side, not "modified" rows.
        old_hashes = set(pd.util.hash_pandas_object(df_old))
        new_hashes = set(pd.util.hash_pandas_object(df_new))
        result["unique_to_old"] = len(old_hashes - new_hashes)
        result["unique_to_new"] = len(new_hashes - old_hashes)

    return result

The count_delta_pct field is what you’ll threshold on. A 5% swing in row count might be normal for weekly refreshes. A 40% drop means your ETL pipeline probably broke.

Column Statistics and Distribution Tests

Row counts tell you the “what.” Distribution tests tell you the “how.” A column could have the same number of rows but a completely different value distribution — that’s drift.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def compare_column_stats(df_old: pd.DataFrame, df_new: pd.DataFrame) -> list:
    """Compute per-column summary statistics for columns present in both frames.

    Numeric columns get old/new means and a mean-delta percentage; every
    other dtype is treated as categorical and gets added/removed category
    values. Null percentages are reported for all columns.

    Returns:
        list of per-column stat dicts, ordered by column name.
    """
    common_cols = set(df_old.columns) & set(df_new.columns)
    results = []

    for col in sorted(common_cols):
        stat = {"column": col}

        if pd.api.types.is_numeric_dtype(df_old[col]) and pd.api.types.is_numeric_dtype(df_new[col]):
            stat["type"] = "numeric"
            stat["old_mean"] = round(df_old[col].mean(), 4)
            stat["new_mean"] = round(df_new[col].mean(), 4)
            # 1e-9 floor avoids division by zero when the old mean is 0.
            stat["mean_delta_pct"] = round(
                abs(stat["new_mean"] - stat["old_mean"]) / max(abs(stat["old_mean"]), 1e-9) * 100, 2
            )
            stat["old_null_pct"] = round(df_old[col].isna().mean() * 100, 2)
            stat["new_null_pct"] = round(df_new[col].isna().mean() * 100, 2)
        else:
            stat["type"] = "categorical"
            old_unique = set(df_old[col].dropna().unique())
            new_unique = set(df_new[col].dropna().unique())
            # BUG FIX: list(set) ordering varies run-to-run under hash
            # randomization, making reports non-deterministic. Sort the
            # category lists (key=str tolerates mixed value types).
            stat["new_categories"] = sorted(new_unique - old_unique, key=str)
            stat["removed_categories"] = sorted(old_unique - new_unique, key=str)
            stat["old_null_pct"] = round(df_old[col].isna().mean() * 100, 2)
            stat["new_null_pct"] = round(df_new[col].isna().mean() * 100, 2)

        results.append(stat)

    return results


def run_distribution_tests(
    df_old: pd.DataFrame, df_new: pd.DataFrame, thresholds: dict
) -> list:
    """Run a per-column drift test on every column shared by both frames.

    Numeric columns use the two-sample Kolmogorov-Smirnov test; all other
    columns use a chi-square goodness-of-fit test over category frequencies.
    A column is flagged "drifted" when its p-value falls below
    thresholds["alpha"] (default 0.05). Columns with too little data
    (fewer than 5 non-null numeric values, or fewer than 2 categories)
    are skipped.

    Returns:
        list of dicts with column, test name, statistic, p_value, drifted.
    """
    alpha = thresholds.get("alpha", 0.05)
    shared = sorted(set(df_old.columns) & set(df_new.columns))
    results = []

    for col in shared:
        both_numeric = (
            pd.api.types.is_numeric_dtype(df_old[col])
            and pd.api.types.is_numeric_dtype(df_new[col])
        )

        if both_numeric:
            sample_old = df_old[col].dropna().values
            sample_new = df_new[col].dropna().values

            # Too few observations for a meaningful KS test.
            if min(len(sample_old), len(sample_new)) < 5:
                continue

            statistic, p_value = stats.ks_2samp(sample_old, sample_new)
            test_name = "ks_2samp"
        else:
            counts_old = df_old[col].value_counts()
            counts_new = df_new[col].value_counts()

            # Align both frequency series on the union of categories.
            categories = sorted(set(counts_old.index) | set(counts_new.index))
            if len(categories) < 2:
                continue
            aligned_old = counts_old.reindex(categories, fill_value=0)
            aligned_new = counts_new.reindex(categories, fill_value=0)

            # +1 smoothing keeps every expected frequency non-zero; rescale
            # so expected counts sum to the observed total (scipy requires it).
            expected = aligned_old + 1
            expected = expected / expected.sum() * aligned_new.sum()
            chi2, chi_p = stats.chisquare(aligned_new, f_exp=expected)
            statistic, p_value = float(chi2), float(chi_p)
            test_name = "chi_square"

        results.append({
            "column": col,
            "test": test_name,
            "statistic": round(statistic, 4),
            "p_value": round(p_value, 6),
            "drifted": p_value < alpha,
        })

    return results

The Kolmogorov-Smirnov test works well for continuous numeric columns — it compares the full distribution shape, not just means. For categorical columns, chi-square checks whether the frequency distribution shifted. We add +1 to the old category counts before normalizing, so a category that didn’t exist in the old data still gets a non-zero expected frequency, then rescale so the expected counts sum to the observed total (required by scipy.stats.chisquare).

Generating Alerts and Running the Pipeline

Wire everything together with threshold-based alerting. You want hard failures for schema breaks and soft warnings for distribution shifts.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def generate_alerts(report: "ReconciliationReport", thresholds: dict) -> list:
    """Derive severity-tagged alerts from a populated reconciliation report.

    Removed columns and large row-count swings are CRITICAL (worth failing
    a pipeline); dtype changes, widespread drift, and null-rate spikes are
    WARNINGs for human review.

    Returns:
        list of {"severity": ..., "message": ...} dicts.
    """
    alerts = []
    add = alerts.append

    # Schema alerts — removed columns almost always break downstream code.
    if report.schema_changes.get("removed_columns"):
        add({
            "severity": "CRITICAL",
            "message": f"Columns removed: {report.schema_changes['removed_columns']}",
        })
    if report.schema_changes.get("dtype_changes"):
        add({
            "severity": "WARNING",
            "message": f"Dtype changes detected: {report.schema_changes['dtype_changes']}",
        })

    # Row count alerts — compare the absolute swing against the threshold.
    max_row_delta = thresholds.get("max_row_delta_pct", 10)
    delta_pct = abs(report.row_diff.get("count_delta_pct", 0))
    if delta_pct > max_row_delta:
        add({
            "severity": "CRITICAL",
            "message": f"Row count changed by {delta_pct}% (threshold: {max_row_delta}%)",
        })

    # Distribution drift alerts — warn only when many columns drift at once.
    drifted_cols = [t["column"] for t in report.distribution_tests if t.get("drifted")]
    max_drifted = thresholds.get("max_drifted_columns", 3)
    if len(drifted_cols) > max_drifted:
        add({
            "severity": "WARNING",
            "message": f"{len(drifted_cols)} columns show distribution drift: {drifted_cols}",
        })

    # Null rate spike alerts — per-column increase in null percentage.
    max_null_increase = thresholds.get("max_null_increase_pct", 5)
    for col_stat in report.column_stats:
        null_increase = col_stat.get("new_null_pct", 0) - col_stat.get("old_null_pct", 0)
        if null_increase > max_null_increase:
            add({
                "severity": "WARNING",
                "message": f"Column '{col_stat['column']}' null rate increased by {null_increase:.1f}%",
            })

    return alerts


# --- Run the full pipeline ---

# Create sample datasets representing two versions
# Fix the seed so the demo output is reproducible.
np.random.seed(42)
df_v1 = pd.DataFrame({
    "id": range(1000),
    "age": np.random.normal(35, 10, 1000).astype(int),
    "income": np.random.lognormal(10, 1, 1000),
    "category": np.random.choice(["A", "B", "C"], 1000),
    "score": np.random.uniform(0, 1, 1000),
})

# v2: slight drift in age, new category, some rows dropped
# (ids 0-49 removed, ids 1000-1099 added; age shifts from N(35,10) to N(38,12);
# category gains a new value "D").
df_v2 = pd.DataFrame({
    "id": range(50, 1100),
    "age": np.random.normal(38, 12, 1050).astype(int),
    "income": np.random.lognormal(10, 1, 1050),
    "category": np.random.choice(["A", "B", "C", "D"], 1050),
    "score": np.random.uniform(0, 1, 1050),
})

# Alerting thresholds: p-value cutoff for drift tests, plus limits on
# row-count swing, simultaneously drifted columns, and null-rate increase.
thresholds = {
    "alpha": 0.05,
    "max_row_delta_pct": 10,
    "max_drifted_columns": 3,
    "max_null_increase_pct": 5,
}

report = reconcile(df_v1, df_v2, thresholds, id_col="id")

# Print a human-readable summary of each report section.
print("=== Schema Changes ===")
print(report.schema_changes)
print("\n=== Row Diff ===")
for k, v in report.row_diff.items():
    print(f"  {k}: {v}")
print("\n=== Distribution Tests ===")
for t in report.distribution_tests:
    flag = "DRIFT" if t["drifted"] else "OK"
    print(f"  {t['column']} ({t['test']}): p={t['p_value']} [{flag}]")
print("\n=== Alerts ===")
for a in report.alerts:
    print(f"  [{a['severity']}] {a['message']}")

Running this produces output like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
=== Schema Changes ===
{'added_columns': [], 'removed_columns': [], 'dtype_changes': {}}

=== Row Diff ===
  old_row_count: 1000
  new_row_count: 1050
  count_delta: 50
  count_delta_pct: 5.0
  added_rows: 100
  removed_rows: 50
  common_rows: 950
  modified_rows: 950

=== Distribution Tests ===
  age (ks_2samp): p=0.000134 [DRIFT]
  category (chi_square): p=0.0 [DRIFT]
  income (ks_2samp): p=0.581729 [OK]
  score (ks_2samp): p=0.63418 [OK]

=== Alerts ===
  (no alerts — only 2 columns drifted, which is within the max_drifted_columns threshold of 3, and the 5% row-count change is under the 10% limit)

The distribution tests caught the age distribution shift and the new category value — exactly the kind of silent changes that cause model degradation. No alert fired here because only two columns drifted, under the max_drifted_columns threshold; tighten the thresholds if you want changes like these to block the pipeline.

Scheduling and Automation

Wrap the reconciliation in a function your CI or Airflow DAG can call. Fail the pipeline on CRITICAL alerts, log warnings for review.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import sys
import json

def run_reconciliation(old_path: str, new_path: str, config_path: str = None) -> bool:
    """Returns True if reconciliation passes, False on critical alerts."""
    df_old = pd.read_parquet(old_path)
    df_new = pd.read_parquet(new_path)

    # Built-in defaults; a JSON config file can override any of them.
    thresholds = {
        "alpha": 0.05,
        "max_row_delta_pct": 10,
        "max_drifted_columns": 3,
        "max_null_increase_pct": 5,
    }
    if config_path:
        with open(config_path) as fh:
            thresholds.update(json.load(fh))

    report = reconcile(df_old, df_new, thresholds)

    # Persist the full report for downstream consumers (dashboards, audits).
    # default=str stringifies non-JSON-native values (e.g. numpy scalars).
    payload = {
        "schema_changes": report.schema_changes,
        "row_diff": report.row_diff,
        "column_stats": report.column_stats,
        "distribution_tests": report.distribution_tests,
        "alerts": report.alerts,
    }
    with open("reconciliation_report.json", "w") as fh:
        json.dump(payload, fh, indent=2, default=str)

    # Critical alerts fail the run; warnings are logged via the report only.
    critical_alerts = [a for a in report.alerts if a["severity"] == "CRITICAL"]
    if critical_alerts:
        print(f"FAILED: {len(critical_alerts)} critical alert(s) found", file=sys.stderr)
        for alert in critical_alerts:
            print(f"  {alert['message']}", file=sys.stderr)
        return False

    print(f"PASSED: {len(report.alerts)} warning(s)")
    return True


# Usage from command line or Airflow
if __name__ == "__main__":
    # Exit code 0 on pass, 1 on critical alerts — CI systems pick this up.
    passed = run_reconciliation(
        old_path="data/training_v1.parquet",
        new_path="data/training_v2.parquet",
    )
    sys.exit(0 if passed else 1)

This gives you a non-zero exit code on failure, which CI systems pick up automatically.

Common Errors and Fixes

ValueError: Can only compare identically-labeled DataFrame objects — This happens when your ID column has duplicates. Deduplicate before reconciliation:

1
2
df_old = df_old.drop_duplicates(subset=["id"], keep="last")
df_new = df_new.drop_duplicates(subset=["id"], keep="last")

Chi-square test returns nan for p-value — You likely have a category with zero expected frequency. The +1 smoothing in run_distribution_tests handles this, but if you’re seeing it, check that your old and new DataFrames aren’t empty for that column.

KS test says everything drifted — With very large datasets (100k+ rows), the KS test becomes overly sensitive. Either subsample before testing or lower the alpha threshold so that only stronger evidence counts as drift:

1
thresholds = {"alpha": 0.001}  # Stricter p-value for large datasets

MemoryError on large Parquet files — Load only the columns you need:

1
2
cols_to_check = ["id", "age", "income", "category", "score"]
df_old = pd.read_parquet("data/v1.parquet", columns=cols_to_check)

Schema comparison misses renamed columns — This pipeline detects added/removed columns but can’t tell if user_age was renamed to age. Track column renames in your schema registry or version control system separately.