Your model didn’t break. The data changed underneath it. Feature distributions shift, new categories appear, null rates creep up – and none of this triggers a deployment error. By the time prediction quality visibly degrades, you’ve been serving bad results for weeks.
whylogs solves this by generating lightweight statistical profiles of your data. You profile a reference dataset, profile incoming production data, compare the two, and fire alerts when the distributions diverge. The entire pipeline runs in pure Python, doesn’t require a database, and produces profiles that are small enough to store alongside your model artifacts.
Install whylogs with the viz extras for drift analysis:
```shell
pip install "whylogs[viz]"
```
Profile a Reference Dataset
A whylogs profile captures summary statistics – counts, distributions, cardinality, frequent items, data types – for every column in a DataFrame. It’s the statistical fingerprint of your data at a specific point in time.
Start by profiling your training data (or a known-good production snapshot) as the reference baseline:
```python
import pandas as pd
import whylogs as why

# Simulate a reference dataset (use your actual training data here)
reference_df = pd.DataFrame({
    "age": [25, 32, 47, 29, 51, 38, 44, 33, 27, 60],
    "income": [45000, 62000, 88000, 51000, 95000, 71000, 83000, 58000, 42000, 110000],
    "credit_score": [680, 720, 750, 690, 780, 710, 740, 700, 660, 800],
    "loan_approved": [0, 1, 1, 0, 1, 1, 1, 1, 0, 1],
})

# Profile the reference data
reference_result = why.log(pandas=reference_df)
reference_profile = reference_result.view()

# Save the profile to disk for later comparison
reference_result.writer("local").write(dest="profiles/reference")
```
That profiles/reference directory now contains a binary protobuf file with the full statistical profile. It’s typically a few KB, regardless of how large your DataFrame was. You can version this alongside your model checkpoint.
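Because profiles are that small, one lightweight way to version them alongside a checkpoint is a JSON manifest keyed by content hash. This is a plain-Python sketch, not part of whylogs; the `record_profile` helper and file layout are illustrative:

```python
import hashlib
import json
from pathlib import Path


def record_profile(manifest_path: str, profile_path: str, model_version: str) -> dict:
    """Append a profile's SHA-256 digest and model version to a JSON manifest."""
    digest = hashlib.sha256(Path(profile_path).read_bytes()).hexdigest()
    manifest_file = Path(manifest_path)
    manifest = json.loads(manifest_file.read_text()) if manifest_file.exists() else []
    entry = {"model_version": model_version, "profile": profile_path, "sha256": digest}
    manifest.append(entry)
    manifest_file.write_text(json.dumps(manifest, indent=2))
    return entry
```

Checking the recorded hash against the profile file later tells you whether the baseline you are comparing against is the one the model was actually trained with.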
To load a saved profile back:
```python
from whylogs.core import DatasetProfileView

reference_profile = DatasetProfileView.read("profiles/reference/profile.bin")
```
Profile Production Data and Compare
When new data arrives – a daily batch, an hourly partition, whatever your cadence – profile it and compare against the reference:
```python
import pandas as pd
import whylogs as why
from whylogs.viz.drift.column_drift_algorithms import calculate_drift_scores

# Simulate production data with some drift:
# income shifted higher, credit_score has a different spread
production_df = pd.DataFrame({
    "age": [28, 35, 50, 31, 55, 40, 46, 36, 30, 63],
    "income": [75000, 92000, 118000, 81000, 135000, 101000, 113000, 88000, 72000, 150000],
    "credit_score": [640, 670, 710, 650, 730, 680, 700, 665, 620, 760],
    "loan_approved": [1, 1, 1, 1, 1, 1, 1, 1, 0, 1],
})

# Profile production data
production_result = why.log(pandas=production_df)
production_profile = production_result.view()

# Calculate drift scores for every column
drift_scores = calculate_drift_scores(
    target_view=production_profile,
    reference_view=reference_profile,
    with_thresholds=True,
)

for column_name, score_info in drift_scores.items():
    algorithm = score_info.get("algorithm", "unknown")
    pvalue = score_info.get("pvalue")
    drift_category = score_info.get("drift_category", "NO_DRIFT")
    # pvalue can be None for columns that couldn't be scored, so guard the formatting
    pvalue_str = f"{pvalue:.4f}" if pvalue is not None else "n/a"
    print(f"{column_name}: algorithm={algorithm}, p-value={pvalue_str}, drift={drift_category}")
```
calculate_drift_scores picks the right statistical test automatically. Numerical columns get the Kolmogorov-Smirnov test. Categorical columns get the Chi-squared test. The returned dictionary includes the p-value, the algorithm used, and a drift category (NO_DRIFT, POSSIBLE_DRIFT, or DRIFT).
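For automation you will usually collapse the per-column results into a single decision. Here is a minimal sketch assuming the score-dictionary shape described above; `summarize_drift` and its policy (block on DRIFT, warn on POSSIBLE_DRIFT) are illustrative, not part of whylogs:

```python
def summarize_drift(drift_scores: dict) -> dict:
    """Collapse per-column drift categories into one alert decision.

    Treats DRIFT as blocking and POSSIBLE_DRIFT as a warning (an illustrative policy).
    """
    drifted = [c for c, s in drift_scores.items() if s.get("drift_category") == "DRIFT"]
    possible = [c for c, s in drift_scores.items() if s.get("drift_category") == "POSSIBLE_DRIFT"]
    return {
        "alert": bool(drifted),
        "drifted_columns": drifted,
        "warn_columns": possible,
    }


scores = {
    "income": {"algorithm": "ks", "pvalue": 0.001, "drift_category": "DRIFT"},
    "age": {"algorithm": "ks", "pvalue": 0.08, "drift_category": "POSSIBLE_DRIFT"},
    "loan_approved": {"algorithm": "chi2", "pvalue": 0.9, "drift_category": "NO_DRIFT"},
}
print(summarize_drift(scores))
# {'alert': True, 'drifted_columns': ['income'], 'warn_columns': ['age']}
```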
You can also generate a visual drift report in a Jupyter notebook:
```python
from whylogs.viz import NotebookProfileVisualizer

viz = NotebookProfileVisualizer()
viz.set_profiles(
    target_profile_view=production_profile,
    reference_profile_view=reference_profile,
)

# Full drift summary across all features
viz.summary_drift_report()

# Overlay histograms for a specific feature
viz.double_histogram(feature_name="income")
```
Set Up Drift Constraints and Validation Rules
Drift scores are useful for dashboards, but you need hard pass/fail gates for automation. whylogs constraints let you define rules that a profile must satisfy, then validate programmatically:
```python
import whylogs as why
from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector

# Profile the production data
production_result = why.log(pandas=production_df)
production_profile = production_result.view()

# Build constraints
builder = ConstraintsBuilder(dataset_profile_view=production_profile)

# Rule 1: credit_score mean must stay between 650 and 800
credit_mean_check = MetricConstraint(
    name="credit_score_mean_in_range",
    condition=lambda dist: 650 <= dist.mean <= 800,
    metric_selector=MetricsSelector(metric_name="distribution", column_name="credit_score"),
)
builder.add_constraint(credit_mean_check)

# Rule 2: age must have no negative values
age_non_negative = MetricConstraint(
    name="age_non_negative",
    condition=lambda dist: dist.min >= 0,
    metric_selector=MetricsSelector(metric_name="distribution", column_name="age"),
)
builder.add_constraint(age_non_negative)

# Rule 3: income stddev shouldn't explode (detect variance shift)
income_variance_check = MetricConstraint(
    name="income_stddev_below_50000",
    condition=lambda dist: dist.stddev < 50000,
    metric_selector=MetricsSelector(metric_name="distribution", column_name="income"),
)
builder.add_constraint(income_variance_check)

# Rule 4: null count for credit_score should be zero
null_check = MetricConstraint(
    name="credit_score_no_nulls",
    condition=lambda counts: counts.null.value == 0,
    metric_selector=MetricsSelector(metric_name="counts", column_name="credit_score"),
)
builder.add_constraint(null_check)

# Validate all constraints
constraints = builder.build()
is_valid = constraints.validate()
report = constraints.generate_constraints_report()

for entry in report:
    print(f"{entry.name}: {'PASS' if entry.passed else 'FAIL'} "
          f"(passed={entry.num_passed}, failed={entry.num_failed})")
```
When constraints.validate() returns False, you know something is off. The report tells you exactly which rules failed.
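If you want a validation failure to stop a pipeline rather than just print, you can raise from the report. This is a sketch assuming report entries expose `name` and `passed` attributes, as the loop above uses; `DriftGateError` is a name invented here:

```python
class DriftGateError(Exception):
    """Raised when one or more profile constraints fail."""


def enforce_constraints(report) -> None:
    """Raise DriftGateError listing every failed constraint by name."""
    failed = [entry.name for entry in report if not entry.passed]
    if failed:
        raise DriftGateError(f"{len(failed)} constraint(s) failed: {', '.join(failed)}")
```

Calling `enforce_constraints(report)` right after `generate_constraints_report()` turns the soft report into a hard gate that a scheduler or CI job will surface as a task failure.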
Automate Drift Checks in a Batch Pipeline
Here’s a complete drift detection function you can wire into a cron job, Prefect flow, or Airflow DAG. It loads the reference profile, profiles the new batch, runs drift scoring and constraints, and returns a structured result:
```python
import json
import logging
from datetime import datetime
from pathlib import Path

import pandas as pd
import whylogs as why
from whylogs.core import DatasetProfileView
from whylogs.core.constraints import ConstraintsBuilder, MetricConstraint, MetricsSelector
from whylogs.viz.drift.column_drift_algorithms import calculate_drift_scores

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("drift_pipeline")


def run_drift_check(
    new_data: pd.DataFrame,
    reference_profile_path: str,
    drift_threshold: float = 0.05,
    output_dir: str = "drift_reports",
) -> dict:
    """Run drift detection against a saved reference profile.

    Args:
        new_data: The incoming production DataFrame.
        reference_profile_path: Path to the saved reference profile .bin file.
        drift_threshold: p-value threshold below which drift is flagged.
        output_dir: Directory to save drift reports.

    Returns:
        Dictionary with drift results and constraint validation status.
    """
    # Load reference profile
    reference_profile = DatasetProfileView.read(reference_profile_path)

    # Profile new data
    new_result = why.log(pandas=new_data)
    new_profile = new_result.view()

    # Save the new profile for an audit trail
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    new_result.writer("local").write(dest=f"{output_dir}/{timestamp}")

    # Calculate drift scores
    drift_scores = calculate_drift_scores(
        target_view=new_profile,
        reference_view=reference_profile,
        with_thresholds=True,
    )

    # Flag columns with significant drift (pvalue can be None for unscored columns)
    drifted_columns = []
    for col, info in drift_scores.items():
        pvalue = info.get("pvalue")
        if pvalue is not None and pvalue < drift_threshold:
            drifted_columns.append({
                "column": col,
                "pvalue": pvalue,
                "algorithm": info.get("algorithm", "unknown"),
                "drift_category": info.get("drift_category", "DRIFT"),
            })

    # Run constraints: every numeric column must be non-negative
    builder = ConstraintsBuilder(dataset_profile_view=new_profile)
    for col in new_data.select_dtypes(include="number").columns:
        builder.add_constraint(MetricConstraint(
            name=f"{col}_non_negative",
            condition=lambda dist: dist.min >= 0,
            metric_selector=MetricsSelector(metric_name="distribution", column_name=col),
        ))
    constraints = builder.build()
    constraints_passed = constraints.validate()

    result = {
        "timestamp": timestamp,
        "total_columns": len(drift_scores),
        "drifted_columns": drifted_columns,
        "drift_detected": len(drifted_columns) > 0,
        "constraints_passed": constraints_passed,
    }

    # Save report
    report_path = f"{output_dir}/{timestamp}_report.json"
    with open(report_path, "w") as f:
        json.dump(result, f, indent=2, default=str)

    logger.info(f"Drift check complete. Drifted columns: {len(drifted_columns)}")
    return result
```
Call it from a scheduled job:
```python
# Example: daily batch drift check
import pandas as pd

daily_data = pd.read_parquet("data/production/2026-02-15.parquet")

result = run_drift_check(
    new_data=daily_data,
    reference_profile_path="profiles/reference/profile.bin",
    drift_threshold=0.05,
)

if result["drift_detected"]:
    print(f"DRIFT ALERT: {len(result['drifted_columns'])} columns drifted")
    for col_info in result["drifted_columns"]:
        print(f"  - {col_info['column']}: p={col_info['pvalue']:.4f} ({col_info['algorithm']})")
```
For Prefect, wrap it as a flow:
```python
import pandas as pd
from prefect import flow, task


@task
def load_daily_data(date_str: str) -> pd.DataFrame:
    return pd.read_parquet(f"data/production/{date_str}.parquet")


@task
def check_drift(data: pd.DataFrame) -> dict:
    return run_drift_check(
        new_data=data,
        reference_profile_path="profiles/reference/profile.bin",
    )


@task
def send_alert(result: dict):
    if result["drift_detected"]:
        # Replace with your actual alerting (Slack, PagerDuty, email)
        columns = [c["column"] for c in result["drifted_columns"]]
        message = f"Data drift detected in columns: {', '.join(columns)}"
        print(f"ALERT: {message}")
        # Example: requests.post(slack_webhook_url, json={"text": message})


@flow(name="daily-drift-check")
def daily_drift_pipeline(date_str: str):
    data = load_daily_data(date_str)
    result = check_drift(data)
    send_alert(result)
    return result
```
Common Errors and Fixes
ModuleNotFoundError: No module named 'whylogs.viz'
You installed the base package without viz extras. Fix it:
```shell
pip install "whylogs[viz]"
```
FileNotFoundError when reading a saved profile
The profile writer saves to a subdirectory. The actual file is profile.bin inside the directory you specified. If you wrote to profiles/reference, read from profiles/reference/profile.bin.
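If you would rather not hard-code the inner file name, you can resolve it at runtime. A small pathlib sketch (it assumes one profile file per directory; `find_profile_file` is a helper invented here):

```python
from pathlib import Path


def find_profile_file(profile_dir: str) -> Path:
    """Return the first .bin file under profile_dir, raising if none exists."""
    matches = sorted(Path(profile_dir).glob("*.bin"))
    if not matches:
        raise FileNotFoundError(f"no .bin profile found in {profile_dir}")
    return matches[0]
```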
KeyError in drift scores for a column
calculate_drift_scores only returns scores for columns that exist in both profiles. If a column was added or removed in production data (a schema change), it won’t appear in the scores. Check for missing columns explicitly:
```python
reference_columns = set(reference_profile.get_columns().keys())
production_columns = set(production_profile.get_columns().keys())

added = production_columns - reference_columns
removed = reference_columns - production_columns

if added or removed:
    print(f"Schema drift detected! Added: {added}, Removed: {removed}")
```
Drift scores all show NO_DRIFT even when data clearly changed
With small sample sizes (under ~30 rows), the KS test lacks statistical power to detect drift. whylogs profiles are designed for production-scale data. If you’re testing with tiny DataFrames, the p-values will be high regardless. Use at least a few hundred rows per batch for meaningful drift signals.
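You can see the sample-size effect without whylogs at all. The sketch below is a standalone two-sample KS implementation using the standard asymptotic p-value series (whylogs' internal implementation may differ); it applies the same relative shift to a 10-row and a 500-row sample:

```python
import math


def ks_2samp(a, b):
    """Two-sample KS statistic with the textbook asymptotic p-value approximation."""
    a, b = sorted(a), sorted(b)
    n, m = len(a), len(b)
    i = j = 0
    d = 0.0
    # Walk both sorted samples, tracking the max gap between empirical CDFs
    while i < n and j < m:
        if a[i] <= b[j]:
            i += 1
        else:
            j += 1
        d = max(d, abs(i / n - j / m))
    en = math.sqrt(n * m / (n + m))
    lam = (en + 0.12 + 0.11 / en) * d
    p = 2 * sum((-1) ** (k - 1) * math.exp(-2 * (k * lam) ** 2) for k in range(1, 101))
    return d, max(0.0, min(1.0, p))


# Same 20% shift, two sample sizes
small_ref = [float(x) for x in range(1, 11)]
_, p_small = ks_2samp(small_ref, [x + 2 for x in small_ref])
large_ref = [x / 10 for x in range(500)]
_, p_large = ks_2samp(large_ref, [x + 10 for x in large_ref])
print(f"n=10:  p={p_small:.3f}")   # well above 0.05: the shift goes undetected
print(f"n=500: p={p_large:.2e}")   # far below 0.05: the same shift is flagged
```

The statistic itself is similar in both cases; only the larger sample gives the test enough power to reject the null hypothesis.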
TypeError: 'NoneType' object is not subscriptable in constraint lambdas
This happens when the metric you’re selecting doesn’t exist for that column. For example, selecting distribution on a string column that has no numeric statistics. Check profile_view.get_column(col_name).to_summary_dict() to see which metrics are available for a given column.
Profile files are unexpectedly large
By default, whylogs tracks frequent items, which can inflate profile size for high-cardinality string columns. If you only need distribution statistics, configure a custom schema that disables frequent-items tracking on those columns.