When your model starts producing garbage predictions, the first question is always the same: what happened to the data? If you can’t trace every transformation from raw CSV to training set, you’re debugging blind. Commercial lineage tools exist, but most ML teams don’t need a full-blown data catalog. You need something you can drop into an existing pandas pipeline in an afternoon.
Here’s a lightweight LineageTracker that records every transformation step with metadata, stores it in JSON, and can render the full lineage graph.
The LineageTracker Class#
This is the core. It wraps your pipeline steps and records inputs, outputs, row counts, column changes, and timestamps for each transformation.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
| import json
import hashlib
import datetime
from dataclasses import dataclass, field, asdict
from typing import Any, Callable
from functools import wraps
import pandas as pd
@dataclass
class LineageNode:
step_id: str
name: str
operation: str
input_shape: tuple
output_shape: tuple
columns_added: list
columns_removed: list
timestamp: str
parent_id: str | None = None
metadata: dict = field(default_factory=dict)
class LineageTracker:
def __init__(self, pipeline_name: str):
self.pipeline_name = pipeline_name
self.nodes: list[LineageNode] = []
self._current_df: pd.DataFrame | None = None
def _make_id(self, name: str) -> str:
raw = f"{self.pipeline_name}:{name}:{datetime.datetime.now().isoformat()}"
return hashlib.sha256(raw.encode()).hexdigest()[:12]
def track(self, name: str, operation: str = "transform", metadata: dict | None = None):
"""Decorator that records lineage for a function taking and returning a DataFrame."""
def decorator(func: Callable[[pd.DataFrame], pd.DataFrame]):
@wraps(func)
def wrapper(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
input_cols = set(df.columns)
input_shape = df.shape
parent_id = self.nodes[-1].step_id if self.nodes else None
result = func(df, *args, **kwargs)
output_cols = set(result.columns)
node = LineageNode(
step_id=self._make_id(name),
name=name,
operation=operation,
input_shape=input_shape,
output_shape=result.shape,
columns_added=sorted(output_cols - input_cols),
columns_removed=sorted(input_cols - output_cols),
timestamp=datetime.datetime.now().isoformat(),
parent_id=parent_id,
metadata=metadata or {},
)
self.nodes.append(node)
return result
return wrapper
return decorator
def record_source(self, df: pd.DataFrame, name: str, source_path: str) -> pd.DataFrame:
"""Record the initial data source as the root node."""
node = LineageNode(
step_id=self._make_id(name),
name=name,
operation="source",
input_shape=(0, 0),
output_shape=df.shape,
columns_added=list(df.columns),
columns_removed=[],
timestamp=datetime.datetime.now().isoformat(),
parent_id=None,
metadata={"source_path": source_path},
)
self.nodes.append(node)
return df
def save(self, path: str) -> None:
payload = {
"pipeline": self.pipeline_name,
"created_at": datetime.datetime.now().isoformat(),
"steps": [asdict(n) for n in self.nodes],
}
with open(path, "w") as f:
json.dump(payload, f, indent=2)
print(f"Lineage saved to {path} ({len(self.nodes)} steps)")
def print_summary(self) -> None:
print(f"\n{'='*60}")
print(f"Pipeline: {self.pipeline_name}")
print(f"Steps: {len(self.nodes)}")
print(f"{'='*60}")
for i, node in enumerate(self.nodes):
arrow = " " if i == 0 else "-> "
print(f"{arrow}[{node.operation}] {node.name}")
print(f" Shape: {node.input_shape} -> {node.output_shape}")
if node.columns_added:
print(f" + columns: {node.columns_added}")
if node.columns_removed:
print(f" - columns: {node.columns_removed}")
|
That’s around 100 lines. No external dependencies beyond pandas. The track decorator is the key piece – you slap it on any function that takes a DataFrame and returns a DataFrame, and it captures the before/after state automatically.
Running a Full Pipeline#
Here’s a realistic pipeline: load data, clean it, engineer features, and split for training. We’ll create sample data inline so you can run this immediately.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import numpy as np

# --- Set up tracker and sample data ---
# Seed the RNG so the demo produces the same rows (and therefore the same
# lineage counts) on every run; without it the printed summary drifts.
np.random.seed(42)

tracker = LineageTracker("customer_churn_pipeline")

raw_data = pd.DataFrame({
    "customer_id": range(1, 1001),
    "age": np.random.randint(18, 75, 1000),
    "monthly_spend": np.random.uniform(10.0, 500.0, 1000).round(2),
    "tenure_months": np.random.randint(1, 120, 1000),
    "support_tickets": np.random.randint(0, 20, 1000),
    "last_login": pd.date_range("2025-01-01", periods=1000, freq="h"),
    "plan": np.random.choice(["free", "basic", "premium", None], 1000),
    "churned": np.random.choice([0, 1], 1000, p=[0.7, 0.3]),
})

df = tracker.record_source(raw_data, "raw_customer_data", "s3://data-lake/customers/2025-01.parquet")

# --- Step 1: Clean ---
@tracker.track("drop_nulls_and_dupes", operation="clean")
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with a missing plan, then de-duplicate on customer_id."""
    df = df.dropna(subset=["plan"])
    df = df.drop_duplicates(subset=["customer_id"])
    return df

# --- Step 2: Feature engineering ---
@tracker.track("engineer_features", operation="feature_engineering", metadata={"features_created": 3})
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add ratio/indicator features; drop raw columns the model won't use."""
    df = df.copy()  # never mutate the caller's frame
    # clip(lower=1) guards against division by zero for brand-new customers
    df["spend_per_month_tenure"] = (df["monthly_spend"] / df["tenure_months"].clip(lower=1)).round(4)
    df["tickets_per_tenure"] = (df["support_tickets"] / df["tenure_months"].clip(lower=1)).round(4)
    df["is_premium"] = (df["plan"] == "premium").astype(int)
    df = df.drop(columns=["last_login", "plan"])
    return df

# --- Step 3: Filter outliers ---
@tracker.track("filter_outliers", operation="filter", metadata={"method": "IQR", "column": "monthly_spend"})
def filter_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Keep rows whose monthly_spend lies within 1.5 * IQR of the quartiles."""
    q1 = df["monthly_spend"].quantile(0.25)
    q3 = df["monthly_spend"].quantile(0.75)
    iqr = q3 - q1
    mask = (df["monthly_spend"] >= q1 - 1.5 * iqr) & (df["monthly_spend"] <= q3 + 1.5 * iqr)
    return df[mask].reset_index(drop=True)

# --- Step 4: Train/test split ---
@tracker.track("train_test_split", operation="split", metadata={"test_ratio": 0.2})
def split_data(df: pd.DataFrame) -> pd.DataFrame:
    """Shuffle deterministically and tag each row as train or test."""
    df = df.sample(frac=1, random_state=42).reset_index(drop=True)
    split_idx = int(len(df) * 0.8)
    # Tag rows so downstream knows which partition they belong to
    df = df.copy()
    df["_split"] = "train"
    df.loc[split_idx:, "_split"] = "test"  # .loc slicing is inclusive of split_idx
    return df

# --- Execute the pipeline ---
df = clean(df)
df = engineer_features(df)
df = filter_outliers(df)
df = split_data(df)
tracker.print_summary()
tracker.save("lineage_customer_churn.json")
|
Running this prints something like:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| ============================================================
Pipeline: customer_churn_pipeline
Steps: 5
============================================================
[source] raw_customer_data
Shape: (0, 0) -> (1000, 8)
-> [clean] drop_nulls_and_dupes
Shape: (1000, 8) -> (748, 8)
-> [feature_engineering] engineer_features
Shape: (748, 8) -> (748, 9)
+ columns: ['is_premium', 'spend_per_month_tenure', 'tickets_per_tenure']
- columns: ['last_login', 'plan']
-> [filter] filter_outliers
Shape: (748, 9) -> (745, 9)
-> [split] train_test_split
Shape: (745, 9) -> (745, 10)
+ columns: ['_split']
|
Every step is captured. You can see exactly where rows were dropped, which columns appeared or disappeared, and the order of operations.
Visualizing the Lineage Graph#
If you want a visual representation, graphviz makes it straightforward.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| from graphviz import Digraph
def render_lineage(tracker: LineageTracker, output_path: str = "lineage_graph") -> None:
    """Render the tracker's lineage chain as a dark-themed PNG via graphviz."""
    graph = Digraph(comment=tracker.pipeline_name)
    graph.attr(rankdir="TB", bgcolor="#1a1a2e", fontcolor="white")
    graph.attr("node", shape="box", style="filled", fillcolor="#16213e",
               fontcolor="#00ff88", fontname="monospace", fontsize="10")
    graph.attr("edge", color="#00ff88")
    # First pass: one box per recorded step.
    for step in tracker.nodes:
        caption = "\n".join([
            step.name,
            f"[{step.operation}]",
            f"rows: {step.output_shape[0]} | cols: {step.output_shape[1]}",
        ])
        graph.node(step.step_id, label=caption)
    # Second pass: connect every non-root step to its parent.
    for step in tracker.nodes:
        if step.parent_id:
            graph.edge(step.parent_id, step.step_id)
    graph.render(output_path, format="png", cleanup=True)
    print(f"Graph saved to {output_path}.png")

render_lineage(tracker)
|
Install graphviz with pip install graphviz and make sure the system package is installed too (apt install graphviz on Debian/Ubuntu). The output is a top-to-bottom directed graph showing each transformation node with row and column counts.
Querying Lineage Later#
The JSON file is queryable. Need to know what happened to a specific column? Load the lineage and search:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def find_column_history(lineage_path: str, column_name: str) -> None:
    """Print every step in a saved lineage JSON that created or removed a column."""
    with open(lineage_path) as handle:
        lineage = json.load(handle)
    print(f"History for column: '{column_name}'\n")
    for entry in lineage["steps"]:
        events = []
        if column_name in entry["columns_added"]:
            events.append("CREATED")
        if column_name in entry["columns_removed"]:
            events.append("REMOVED")
        for event in events:
            print(f" {event} at step '{entry['name']}' ({entry['operation']})")
# Demo queries against the lineage file written by tracker.save() earlier;
# requires lineage_customer_churn.json to exist in the working directory.
find_column_history("lineage_customer_churn.json", "is_premium")
# Output:
# History for column: 'is_premium'
# CREATED at step 'engineer_features' (feature_engineering)
find_column_history("lineage_customer_churn.json", "plan")
# Output:
# History for column: 'plan'
# CREATED at step 'raw_customer_data' (source)
# REMOVED at step 'engineer_features' (feature_engineering)
|
This is a simple example but the pattern scales. You can add column-level transformations, data hashes for detecting drift, or integration with MLflow to tie lineage to specific model runs.
Common Errors and Fixes#
TypeError: unsupported operand type(s) for |: 'type' and 'NoneType' when using str | None type hints. This syntax requires Python 3.10+. On older versions, use Optional[str] from typing instead:
1
2
3
| from typing import Optional
# Change: parent_id: str | None = None
# To: parent_id: Optional[str] = None
|
Decorator modifies the original DataFrame in place. If your transformation function doesn’t call df.copy(), pandas might modify the input DataFrame through the reference. Always copy inside your tracked functions when you’re adding or modifying columns. The engineer_features function above shows this pattern.
columns_added shows unexpected columns after a merge. When you join DataFrames, pandas appends suffixes like _x and _y. The tracker will faithfully record these. Either rename columns after the merge or pass explicit suffixes to pd.merge() so the lineage is readable.
JSON lineage file gets huge with many pipeline runs. Each save() call overwrites the file for that pipeline instance. If you’re running hundreds of pipeline versions, switch to SQLite:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| import sqlite3
def save_to_sqlite(tracker: "LineageTracker", db_path: str = "lineage.db") -> None:
    """Persist all recorded lineage nodes to a SQLite database.

    Appends/upserts one row per step (keyed on step_id), so multiple
    pipeline runs accumulate in the same database. List/dict fields are
    stored as JSON text. The annotation is a quoted forward reference so
    this snippet can be imported without the class in scope.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS lineage (
                step_id TEXT PRIMARY KEY,
                pipeline TEXT,
                name TEXT,
                operation TEXT,
                input_rows INTEGER,
                input_cols INTEGER,
                output_rows INTEGER,
                output_cols INTEGER,
                columns_added TEXT,
                columns_removed TEXT,
                timestamp TEXT,
                parent_id TEXT,
                metadata TEXT
            )
        """)
        # executemany: one prepared statement for the whole batch.
        conn.executemany(
            "INSERT OR REPLACE INTO lineage VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)",
            [
                (
                    node.step_id, tracker.pipeline_name, node.name, node.operation,
                    node.input_shape[0], node.input_shape[1],
                    node.output_shape[0], node.output_shape[1],
                    json.dumps(node.columns_added), json.dumps(node.columns_removed),
                    node.timestamp, node.parent_id, json.dumps(node.metadata),
                )
                for node in tracker.nodes
            ],
        )
        conn.commit()
    finally:
        # Close even if an execute fails, so the db handle never leaks.
        conn.close()
    print(f"Lineage saved to {db_path}")
|
graphviz.backend.execute.ExecutableNotFound means the system-level graphviz isn’t installed. The Python graphviz package is just a wrapper. Run apt install graphviz (Debian/Ubuntu), brew install graphviz (macOS), or conda install graphviz before using render_lineage.