Set Up Great Expectations with a Parquet Data Source

Bad training data is the number one silent killer of ML models. Not bad architectures, not wrong hyperparameters – bad data. The fix is automated validation that runs on every data refresh, catches problems before training, and pages you when something breaks. That’s what we’re building here: a Great Expectations validation pipeline orchestrated by Airflow.

Install GX with pandas support:

pip install 'great_expectations[pandas]' apache-airflow pandas pyarrow

Start by connecting GX v1 to a Parquet data source. The fluent API uses context.data_sources to register backends and add_batch_definition_whole_dataframe to point at your data:

import great_expectations as gx
import pandas as pd

# Load your ML dataset from Parquet
df = pd.read_parquet("data/training_features.parquet")

# Create an ephemeral context (no YAML config files needed)
context = gx.get_context(mode="ephemeral")

# Register a pandas data source
data_source = context.data_sources.add_pandas(name="ml_monitoring")
data_asset = data_source.add_dataframe_asset(name="training_features")

# Define a batch that covers the entire DataFrame
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    name="full_batch"
)

This gives you a reusable data source that you can validate against any expectation suite. The ephemeral context keeps everything in memory – no filesystem config, no great_expectations.yml to manage. For persistent setups where you want to track validation history, swap mode="ephemeral" for mode="file" and pass a project_root_dir.

Define an Expectation Suite Programmatically

An expectation suite is a named collection of rules your data must satisfy. For ML monitoring, you want four categories of checks: schema enforcement, null detection, value range validation, and distribution stability.

# Create the suite
suite = context.suites.add(
    gx.ExpectationSuite(name="training_data_monitor")
)

# --- Schema checks ---
# Catch column renames, drops, or additions from upstream changes
suite.add_expectation(
    gx.expectations.ExpectTableColumnsToMatchSet(
        column_set=["user_id", "age", "income", "credit_score", "label"],
        exact_match=True,
    )
)

# Enforce column types -- a float column that silently becomes object will
# pass through pandas without errors but produce garbage embeddings
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(column="income", type_="float64")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeOfType(column="label", type_="int64")
)

# --- Null checks ---
# Zero tolerance on label column, 1% tolerance on features
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="label")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="age", mostly=0.99)
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="income", mostly=0.99)
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="credit_score", mostly=0.99)
)

# --- Value range checks ---
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="age", min_value=18, max_value=120
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="credit_score", min_value=300, max_value=850
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="income", min_value=0, max_value=1_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="label", value_set=[0, 1]
    )
)

# --- Distribution checks ---
# If mean income drifts far from the training baseline, the model is
# making predictions on a population it wasn't designed for
suite.add_expectation(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="income", min_value=35000, max_value=80000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnStdevToBeBetween(
        column="income", min_value=5000, max_value=40000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnMedianToBeBetween(
        column="age", min_value=25, max_value=65
    )
)

# Row count sanity -- catches truncated exports and runaway joins
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=500, max_value=5_000_000
    )
)

Set your distribution bounds based on a known-good baseline. Pull statistics from a validated reference dataset and add a 20-30% buffer in each direction. Tight bounds catch drift faster but create more false alerts – start loose and tighten as you learn what normal looks like.
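As a sketch, the bounds can be derived mechanically from baseline statistics. The numbers below are illustrative placeholders, not values from the dataset above; in practice, compute them from a known-good reference dataset (e.g. `df["income"].mean()`):

```python
# Sketch: derive loose *ToBeBetween bounds from a validated baseline.
BUFFER = 0.25  # 25% slack in each direction

def buffered_bounds(stat: float, buffer: float = BUFFER) -> tuple[float, float]:
    """Return (min_value, max_value) with a symmetric buffer around a baseline stat."""
    return stat * (1 - buffer), stat * (1 + buffer)

# Illustrative baseline statistics for the income column
income_mean_bounds = buffered_bounds(52_000.0)  # (39000.0, 65000.0)
income_std_bounds = buffered_bounds(18_000.0)   # (13500.0, 22500.0)
print(income_mean_bounds, income_std_bounds)
```

Shrink `BUFFER` over time as you learn the dataset's normal variation.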

Run Validations and Generate Data Docs

Wire the suite to a checkpoint and run it. The checkpoint is the execution unit in GX v1 – it binds a validation definition (data + suite) and produces results you can inspect or export.

# Create a validation definition
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="monitor_training_data",
        data=batch_definition,
        suite=suite,
    )
)

# Build a checkpoint
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="daily_monitoring_checkpoint",
        validation_definitions=[validation_definition],
    )
)

# Run the checkpoint with the actual data
results = checkpoint.run(batch_parameters={"dataframe": df})

# Print a summary
print(f"Overall pass: {results.success}")
for validation_result in results.run_results.values():
    for r in validation_result.results:
        status = "PASS" if r.success else "FAIL"
        print(f"  [{status}] {r.expectation_config.type}: {r.expectation_config.kwargs}")

If you use a file-based context (mode="file"), GX automatically generates Data Docs – a static HTML report with every validation result, historical trends, and drill-down details. You can serve these on an internal web server or push them to S3 for team visibility.

For ephemeral contexts, extract the results programmatically and log them wherever your team already looks – Slack, PagerDuty, or a monitoring dashboard.
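As a minimal sketch of that approach, the `summary` dict below is hypothetical but shaped like what the checkpoint loop above produces; `format_alert` turns it into a message suitable for a Slack-style webhook payload:

```python
import json

def format_alert(summary: dict) -> str:
    """Turn a validation summary into a human-readable alert message."""
    if summary["success"]:
        return f"All checks passed on {summary['row_count']} rows."
    lines = [f"Dataset validation FAILED on {summary['row_count']} rows:"]
    for failure in summary["failures"]:
        lines.append(f"  - {failure['type']}: {failure['kwargs']}")
    return "\n".join(lines)

# Hypothetical summary, shaped like the checkpoint results above
summary = {
    "success": False,
    "row_count": 12_000,
    "failures": [
        {"type": "expect_column_values_to_not_be_null",
         "kwargs": "{'column': 'age', 'mostly': 0.99}"},
    ],
}
payload = json.dumps({"text": format_alert(summary)})
```

The same `payload` works for any webhook-based sink; only the URL and headers change.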

Orchestrate with Apache Airflow

Now wrap the validation in an Airflow DAG that runs on a schedule. We’re using the TaskFlow API with @task decorators – it’s cleaner than the old PythonOperator pattern and handles XCom returns automatically.

Save this as dags/dataset_monitoring_dag.py:

from datetime import datetime, timedelta
from airflow.decorators import dag, task
import json


@dag(
    dag_id="dataset_monitoring_pipeline",
    schedule="0 5 * * *",  # Daily at 5 AM UTC
    start_date=datetime(2026, 2, 1),
    catchup=False,
    default_args={
        "owner": "ml-team",
        "retries": 1,
        "retry_delay": timedelta(minutes=10),
    },
    tags=["monitoring", "data-quality", "ml"],
)
def dataset_monitoring_pipeline():

    @task()
    def validate_dataset() -> dict:
        """Run Great Expectations validation against the latest data snapshot."""
        import great_expectations as gx
        import pandas as pd

        # Load the latest dataset -- in production, this path comes from
        # your data pipeline or a metadata store
        df = pd.read_parquet("/opt/airflow/data/training_features.parquet")

        context = gx.get_context(mode="ephemeral")
        data_source = context.data_sources.add_pandas(name="monitoring_source")
        data_asset = data_source.add_dataframe_asset(name="features")
        batch_def = data_asset.add_batch_definition_whole_dataframe(name="batch")

        suite = context.suites.add(
            gx.ExpectationSuite(name="daily_monitor")
        )

        # Schema
        suite.add_expectation(
            gx.expectations.ExpectTableColumnsToMatchSet(
                column_set=["user_id", "age", "income", "credit_score", "label"],
                exact_match=True,
            )
        )

        # Nulls
        for col in ["age", "income", "credit_score"]:
            suite.add_expectation(
                gx.expectations.ExpectColumnValuesToNotBeNull(column=col, mostly=0.99)
            )
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToNotBeNull(column="label")
        )

        # Ranges
        suite.add_expectation(
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="credit_score", min_value=300, max_value=850
            )
        )

        # Distribution
        suite.add_expectation(
            gx.expectations.ExpectColumnMeanToBeBetween(
                column="income", min_value=35000, max_value=80000
            )
        )

        # Row count
        suite.add_expectation(
            gx.expectations.ExpectTableRowCountToBeBetween(
                min_value=500, max_value=5_000_000
            )
        )

        validation_def = context.validation_definitions.add(
            gx.ValidationDefinition(
                name="daily_validation", data=batch_def, suite=suite
            )
        )
        checkpoint = context.checkpoints.add(
            gx.Checkpoint(
                name="daily_checkpoint",
                validation_definitions=[validation_def],
            )
        )

        results = checkpoint.run(batch_parameters={"dataframe": df})

        failures = []
        for run_result in results.run_results.values():
            for r in run_result["validation_result"].results:
                if not r.success:
                    failures.append({
                        "type": r.expectation_config.type,
                        "kwargs": str(r.expectation_config.kwargs),
                    })

        return {
            "success": results.success,
            "row_count": len(df),
            "total_checks": sum(
                1 for vr in results.run_results.values()
                for _ in vr.results
            ),
            "failures": failures,
        }

    @task()
    def alert_on_failure(validation_result: dict):
        """Send an alert if validation failed."""
        import urllib.request

        if validation_result["success"]:
            print(
                f"All {validation_result['total_checks']} checks passed "
                f"on {validation_result['row_count']} rows."
            )
            return

        message = (
            f"Dataset validation FAILED.\n"
            f"Rows: {validation_result['row_count']}\n"
            f"Failed checks: {len(validation_result['failures'])}\n"
        )
        for f in validation_result["failures"]:
            message += f"  - {f['type']}: {f['kwargs']}\n"

        # Post to Slack via incoming webhook
        slack_webhook = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        payload = json.dumps({"text": message}).encode("utf-8")
        req = urllib.request.Request(
            slack_webhook,
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        try:
            urllib.request.urlopen(req)
            print("Slack alert sent.")
        except Exception as e:
            print(f"Slack alert failed: {e}")
            # Still raise so Airflow marks the task as failed
            raise

    @task.branch()
    def decide_next_step(validation_result: dict) -> str:
        """Branch: trigger training only if data is valid."""
        if validation_result["success"]:
            return "trigger_training"
        return "skip_training"

    @task(task_id="trigger_training")
    def trigger_training():
        """Kick off model training with validated data."""
        print("Data passed all checks. Triggering training job...")
        # In production, call your training API, submit a Kubernetes job,
        # or trigger another DAG with TriggerDagRunOperator
        import subprocess
        subprocess.run(
            ["python", "/opt/airflow/scripts/train_model.py",
             "--data", "/opt/airflow/data/training_features.parquet"],
            check=True,
        )

    @task(task_id="skip_training")
    def skip_training():
        print("Skipping training due to validation failures.")

    # Wire the DAG
    result = validate_dataset()
    alert_on_failure(result)
    decision = decide_next_step(result)
    decision >> [trigger_training(), skip_training()]


dataset_monitoring_pipeline()

A few things worth noting about this DAG:

  • @task.branch() handles conditional execution. If validation fails, training is skipped and the alert task fires. You never train on bad data by accident.
  • XCom serialization happens automatically with TaskFlow. The validate_dataset return dict gets passed to downstream tasks as function arguments. No manual xcom_push/xcom_pull.
  • Schedule at 0 5 * * * – run validation an hour before your training window. This gives your team time to investigate and fix data issues before the training job would have started.

For email alerts instead of Slack, configure Airflow’s SMTP settings in airflow.cfg and set email_on_failure=True (plus an email address) in your default_args. Airflow then sends an email automatically when any task fails, so a validation failure in validate_dataset already triggers a notification without extra code.
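A minimal [smtp] block in airflow.cfg might look like the following; the host, port, and credentials are placeholders you’d replace with your own mail server’s values:

```ini
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow@example.com
smtp_password = your-smtp-password
smtp_port = 587
smtp_mail_from = airflow@example.com
```

The same settings can also be supplied as environment variables (AIRFLOW__SMTP__SMTP_HOST and so on), which is the usual approach in Docker deployments.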

Common Errors and Fixes

TypeError: get_context() got an unexpected keyword argument 'mode'

You’re on GX 0.x. The mode parameter was added in GX 1.0. Upgrade:

pip install --upgrade 'great_expectations[pandas]'

The entire API surface changed between 0.x and 1.x. If you see context.sources instead of context.data_sources, or validator.expect_* instead of suite-level expectations, you’re reading 0.x documentation.

AttributeError: 'EphemeralDataContext' object has no attribute 'sources'

The attribute was renamed from sources to data_sources in GX 1.x. Change context.sources.add_pandas(...) to context.data_sources.add_pandas(...).

Airflow task fails with ModuleNotFoundError: No module named 'great_expectations'

GX needs to be installed in the same Python environment as your Airflow workers. If you use Docker, add it to your Airflow image:

FROM apache/airflow:2.10.0-python3.11
RUN pip install --no-cache-dir 'great_expectations[pandas]' pyarrow

mostly parameter set to a percentage instead of a fraction

mostly=99 does not mean 99%. The parameter is a fraction between 0 and 1, and GX rejects values outside that range when the expectation is created. Use mostly=0.99 for a 99% pass threshold.
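A quick way to sanity-check a threshold is to compute how many failing rows it actually tolerates:

```python
# mostly is the fraction of rows that must satisfy the expectation.
row_count = 10_000
mostly = 0.99

# With 10,000 rows, mostly=0.99 tolerates up to 100 failing rows.
max_failing = round(row_count * (1 - mostly))
print(max_failing)  # 100
```

If that tolerance feels too generous for a critical column, drop `mostly` entirely to require a 100% pass rate, as the label column does above.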

XCom serialization errors with numpy types

If your validation results contain numpy int64 or float64 values, XCom’s JSON serializer chokes. Cast to native Python types before returning:

return {
    "row_count": int(len(df)),
    "mean_income": float(df["income"].mean()),
}

DAG not appearing in the Airflow UI

By default, Airflow discovers new files in the dags/ folder every five minutes (dag_dir_list_interval) and re-parses known files roughly every 30 seconds. If your DAG still doesn’t show up after a few minutes, check for import errors:

docker compose exec airflow-webserver python /opt/airflow/dags/dataset_monitoring_dag.py

Any exception here means Airflow silently skipped your DAG file.