ML datasets change constantly. New labels arrive, annotations get corrected, bad samples get removed, and fresh data gets added weekly. If you’re not versioning those changes, you can’t reproduce a training run from last month, and you can’t debug why your model suddenly regressed.
Delta Lake solves this by adding ACID transactions, time travel, and schema enforcement on top of plain Parquet files. Every write creates a new version. You can load any historical snapshot instantly. No copies, no manual file naming schemes, no git-lfs headaches.
Here’s how fast you can get started:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| import pandas as pd
from deltalake import write_deltalake, DeltaTable
df = pd.DataFrame({
"sample_id": [1, 2, 3],
"feature_a": [0.5, 0.8, 0.3],
"feature_b": [1.2, 0.9, 1.7],
"label": ["cat", "dog", "cat"],
})
write_deltalake("data/training_set", df)
dt = DeltaTable("data/training_set")
print(f"Version: {dt.version()}") # Version: 0
|
That single write_deltalake call creates a Delta table with a transaction log, schema metadata, and your data stored as Parquet. Every subsequent write bumps the version number automatically.
Setting Up Delta Lake#
Install the Python package. No JVM, no Spark, no heavy dependencies – deltalake is built on Rust via delta-rs.
1
| pip install deltalake pandas pyarrow
|
Now create a Delta table with realistic ML training data:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| import pandas as pd
from deltalake import write_deltalake, DeltaTable
training_data = pd.DataFrame({
"sample_id": range(1, 101),
"text": [f"Sample text document number {i}" for i in range(1, 101)],
"embedding_dim_0": [round(i * 0.01, 4) for i in range(1, 101)],
"embedding_dim_1": [round(i * 0.02, 4) for i in range(1, 101)],
"label": ["positive" if i % 3 != 0 else "negative" for i in range(1, 101)],
"annotator": ["alice" if i % 2 == 0 else "bob" for i in range(1, 101)],
})
table_path = "data/text_classification"
write_deltalake(table_path, training_data)
dt = DeltaTable(table_path)
print(f"Created table at version {dt.version()} with {len(dt.to_pandas())} rows")
# Created table at version 0 with 100 rows
|
The data/text_classification/ directory now contains Parquet files and a _delta_log/ folder. The transaction log tracks every change – what files were added, removed, and the schema at each version.
Time Travel for Dataset Versions#
This is where Delta Lake shines for ML workflows. Every write operation creates a new version, and you can read any past version instantly.
Append some corrected labels and then look at both versions:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| from deltalake import write_deltalake, DeltaTable
# Append corrected labels for 10 samples
corrections = pd.DataFrame({
"sample_id": range(101, 111),
"text": [f"Corrected sample {i}" for i in range(101, 111)],
"embedding_dim_0": [round(i * 0.01, 4) for i in range(101, 111)],
"embedding_dim_1": [round(i * 0.02, 4) for i in range(101, 111)],
"label": ["positive"] * 5 + ["negative"] * 5,
"annotator": ["carol"] * 10,
})
write_deltalake("data/text_classification", corrections, mode="append")
dt = DeltaTable("data/text_classification")
print(f"Current version: {dt.version()}") # Current version: 1
print(f"Current row count: {len(dt.to_pandas())}") # 110 rows
# Load version 0 -- the original 100 rows
dt.load_as_version(0)
original_df = dt.to_pandas()
print(f"Version 0 row count: {len(original_df)}") # 100 rows
|
Check the full audit history to see what changed and when:
1
2
3
| dt = DeltaTable("data/text_classification")
for entry in dt.history():
print(f"Version {entry['version']}: {entry['operation']} at {entry['timestamp']}")
|
This prints something like:
1
2
| Version 1: WRITE at 2026-02-15 14:30:22
Version 0: CREATE TABLE at 2026-02-15 14:30:18
|
You can also load a version by passing it directly to the constructor:
1
2
| dt_v0 = DeltaTable("data/text_classification", version=0)
df_v0 = dt_v0.to_pandas()
|
Updating and Merging Data#
Appending is simple, but real ML data pipelines need upserts. New annotations come in for existing samples, and you want to update them without duplicating rows.
Delta Lake’s merge operation handles this. Say you have corrected labels for some existing samples plus a few brand new ones:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| import pyarrow as pa
from deltalake import DeltaTable
dt = DeltaTable("data/text_classification")
# Incoming data: updated labels for existing IDs + new samples
incoming = pa.table({
"sample_id": pa.array([5, 10, 15, 200, 201], pa.int64()),
"text": pa.array([
"Revised sample 5", "Revised sample 10", "Revised sample 15",
"Brand new sample 200", "Brand new sample 201",
]),
"embedding_dim_0": pa.array([0.05, 0.10, 0.15, 2.00, 2.01]),
"embedding_dim_1": pa.array([0.10, 0.20, 0.30, 4.00, 4.02]),
"label": pa.array(["negative", "positive", "positive", "negative", "positive"]),
"annotator": pa.array(["carol", "carol", "carol", "dave", "dave"]),
})
(
dt.merge(
source=incoming,
predicate="target.sample_id = source.sample_id",
source_alias="source",
target_alias="target",
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
dt = DeltaTable("data/text_classification")
print(f"After merge -- version: {dt.version()}, rows: {len(dt.to_pandas())}")
# After merge -- version: 2, rows: 112 (3 updated in place, 2 new)
|
The merge matched 3 existing IDs (5, 10, 15) and updated their fields. The 2 new IDs (200, 201) got inserted. Version bumped to 2 automatically.
You can also overwrite specific partitions if your table is partitioned:
1
2
3
4
5
6
| write_deltalake(
"data/text_classification",
new_batch_df,
mode="overwrite",
predicate="annotator = 'carol'",
)
|
This replaces only the rows where annotator = 'carol', leaving everything else untouched.
Integrating with ML Training#
The real payoff is tying dataset versions to model training. When you log which dataset version produced which model, you get full reproducibility.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| from deltalake import DeltaTable
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
# Pin the dataset version for this training run
dataset_version = 1
dt = DeltaTable("data/text_classification", version=dataset_version)
df = dt.to_pandas()
# Prepare features and labels
vectorizer = TfidfVectorizer(max_features=500)
X = vectorizer.fit_transform(df["text"])
y = (df["label"] == "positive").astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
# Log the dataset version alongside model metrics
run_metadata = {
"dataset_path": "data/text_classification",
"dataset_version": dataset_version,
"dataset_rows": len(df),
"accuracy": round(acc, 4),
"model": "LogisticRegression",
}
print(run_metadata)
# {'dataset_path': 'data/text_classification', 'dataset_version': 1,
# 'dataset_rows': 110, 'accuracy': 0.8636, 'model': 'LogisticRegression'}
|
Now if your model degrades next week, you can diff the dataset versions to find what changed:
1
2
3
4
5
6
7
8
9
| dt_old = DeltaTable("data/text_classification", version=1)
dt_new = DeltaTable("data/text_classification")
df_old = dt_old.to_pandas()
df_new = dt_new.to_pandas()
added_ids = set(df_new["sample_id"]) - set(df_old["sample_id"])
removed_ids = set(df_old["sample_id"]) - set(df_new["sample_id"])
print(f"Added {len(added_ids)} samples, removed {len(removed_ids)} samples")
|
Store run_metadata in your experiment tracker (Weights & Biases, MLflow, or even a JSON file) and you’ve got a complete lineage from raw data version to trained model.
Common Errors and Fixes#
DeltaError: table already exists when writing
The default write mode is "error", which refuses to write if the table path exists. If you’re adding data to an existing table, pass mode="append" or mode="overwrite":
1
2
3
4
5
| # Wrong -- fails on second write
write_deltalake("data/my_table", df)
# Right -- appends to existing table
write_deltalake("data/my_table", df, mode="append")
|
SchemaMismatchError when appending data with new columns
If your new DataFrame has columns that don’t exist in the table, Delta Lake rejects the write by default. Pass schema_mode="merge" to add the new columns automatically:
1
2
3
4
5
6
| write_deltalake(
"data/my_table",
df_with_extra_columns,
mode="append",
schema_mode="merge",
)
|
Existing rows get null values for the new columns. This is better than silently dropping columns.
DeltaError: No such file or directory for time travel to old versions
If you ran VACUUM on the table (or it ran automatically), old Parquet files may have been deleted. Delta Lake’s vacuum removes files no longer referenced by recent versions. To keep more history, set a longer retention period before vacuuming:
1
2
3
| dt = DeltaTable("data/my_table")
# Keep 30 days of history (default is 7 days)
dt.vacuum(retention_hours=720, enforce_retention_duration=True, dry_run=False)
|
If you need to time travel to old versions regularly, avoid aggressive vacuum schedules. The transaction log metadata is always preserved – only the underlying Parquet files get cleaned up.