Every ML team hits the same problem eventually. Your training pipeline wants Parquet. The labeling team needs CSV. The evaluation harness reads JSONL. And someone just asked for a HuggingFace Dataset they can push to the Hub. You end up with five different export scripts, each with its own quirks and bugs.
A better approach: one pipeline that takes your canonical dataset and exports it to any format on demand. Here’s how to build it.
Building the Core Export Pipeline#
Start with a class that wraps a PyArrow table and handles format dispatch. PyArrow is the backbone here because it gives you zero-copy conversion to pandas and efficient serialization to Parquet and IPC.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
| import json
import csv
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
class DatasetExporter:
    """Exports a PyArrow table to multiple formats (Parquet, CSV, JSONL).

    Wraps a single canonical :class:`pyarrow.Table` and dispatches on
    format, so every export path shares one in-memory representation.
    """

    def __init__(self, table: pa.Table):
        self.table = table

    @classmethod
    def from_pandas(cls, df: pd.DataFrame) -> "DatasetExporter":
        """Build an exporter from a DataFrame (the index is dropped)."""
        table = pa.Table.from_pandas(df, preserve_index=False)
        return cls(table)

    def to_parquet(self, path: str, compression: str = "snappy") -> None:
        """Write the table to a Parquet file with the given codec."""
        pq.write_table(self.table, path, compression=compression)
        print(f"Wrote {self.table.num_rows} rows to {path}")

    def to_csv(self, path: str) -> None:
        """Write the table to CSV.

        Non-numeric fields are quoted (QUOTE_NONNUMERIC) so readers can
        distinguish strings from numbers.
        """
        df = self.table.to_pandas()
        df.to_csv(path, index=False, quoting=csv.QUOTE_NONNUMERIC, encoding="utf-8")
        print(f"Wrote {self.table.num_rows} rows to {path}")

    def to_jsonl(self, path: str) -> None:
        """Write the table as JSON Lines, streaming batch by batch.

        Values json can't serialize natively (dates, Decimals, ...) fall
        back to ``str()`` via ``default=str``.
        """
        with open(path, "w", encoding="utf-8") as f:
            for batch in self.table.to_batches(max_chunksize=1024):
                # Convert batch to row-oriented dicts. Use batch.num_rows
                # rather than measuring the first column, which would raise
                # StopIteration on a zero-column batch.
                cols = batch.to_pydict()
                for i in range(batch.num_rows):
                    record = {k: v[i] for k, v in cols.items()}
                    f.write(json.dumps(record, default=str) + "\n")
        print(f"Wrote {self.table.num_rows} rows to {path}")

    def export_all(self, output_dir: str, stem: str = "dataset") -> dict:
        """Export to every supported format under ``output_dir``.

        Returns a dict mapping format name -> written file path.
        """
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)
        paths = {}
        for fmt, method in [
            ("parquet", self.to_parquet),
            ("csv", self.to_csv),
            ("jsonl", self.to_jsonl),
        ]:
            dest = str(out / f"{stem}.{fmt}")
            method(dest)
            paths[fmt] = dest
        return paths
# Usage: build a tiny labeled dataset and fan it out to all three formats.
df = pd.DataFrame({
    "text": ["The cat sat on the mat", "Dogs are great companions"],
    "label": [0, 1],
    "score": [0.95, 0.87],
})
exporter = DatasetExporter.from_pandas(df)
# Writes sentiment_v1.parquet / .csv / .jsonl under ./exports
paths = exporter.export_all("./exports", stem="sentiment_v1")
# Wrote 2 rows to ./exports/sentiment_v1.parquet
# Wrote 2 rows to ./exports/sentiment_v1.csv
# Wrote 2 rows to ./exports/sentiment_v1.jsonl
|
The QUOTE_NONNUMERIC flag makes the CSV writer quote every non-numeric field, so downstream parsers can tell strings from numbers at a glance. (Fields containing commas or newlines are already quoted under the default QUOTE_MINIMAL; the win here is removing type ambiguity.) This is the kind of thing that bites you at 2am when a labeler’s annotation includes a comma.
Handling Large Datasets with Chunked Export#
The naive approach loads everything into memory. That falls apart with datasets over a few GB. PyArrow’s RecordBatchWriter lets you stream Parquet without holding the full table in memory, and you can chunk CSV and JSONL the same way.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| import pyarrow as pa
import pyarrow.parquet as pq
import json
import csv
def chunked_export_parquet(
    batches: list[pa.RecordBatch], schema: pa.Schema, path: str
) -> None:
    """Stream record batches to a Parquet file without materializing the
    full table.

    zstd trades some write speed for noticeably smaller files than snappy.
    """
    total = 0
    # Use the writer as a context manager so the file footer is written
    # and the handle released even if a batch fails mid-stream; the
    # previous version leaked the writer on any exception.
    with pq.ParquetWriter(path, schema, compression="zstd") as writer:
        for batch in batches:
            writer.write_batch(batch)
            total += batch.num_rows
    print(f"Streamed {total} rows to {path}")
def chunked_export_csv(
    batches: list[pa.RecordBatch], path: str
) -> None:
    """Stream record batches to one CSV file, writing the header once.

    Each batch is converted to pandas independently, so only one chunk
    is resident in memory at a time.
    """
    total = 0
    # Plain boolean flag; the original reused a variable named `writer`
    # as a True sentinel, which read as if a csv.writer were in play.
    header_written = False
    with open(path, "w", newline="", encoding="utf-8") as f:
        for batch in batches:
            df_chunk = batch.to_pandas()
            # Header only on the first chunk; later chunks append rows.
            df_chunk.to_csv(
                f,
                index=False,
                header=not header_written,
                quoting=csv.QUOTE_NONNUMERIC,
            )
            header_written = True
            total += batch.num_rows
    print(f"Streamed {total} rows to {path}")
def chunked_export_jsonl(
    batches: list[pa.RecordBatch], path: str
) -> None:
    """Stream record batches to a JSON Lines file.

    Values json can't serialize natively fall back to ``str()`` via
    ``default=str``.
    """
    total = 0
    with open(path, "w", encoding="utf-8") as f:
        for batch in batches:
            cols = batch.to_pydict()
            # batch.num_rows instead of len() of the first column: avoids
            # StopIteration on a zero-column batch and reads more clearly.
            for i in range(batch.num_rows):
                record = {k: v[i] for k, v in cols.items()}
                f.write(json.dumps(record, default=str) + "\n")
            total += batch.num_rows
    print(f"Streamed {total} rows to {path}")
# Simulate a large dataset as record batches: five chunks of 1000 rows,
# each with text, an int label, and a small float "embedding".
schema = pa.schema([
    ("text", pa.string()),
    ("label", pa.int32()),
    ("embedding", pa.list_(pa.float32())),
])
batches = [
    pa.record_batch(
        {
            "text": [f"sample text {chunk_id * 1000 + i}" for i in range(1000)],
            "label": [i % 3 for i in range(1000)],
            "embedding": [[0.1 * j for j in range(8)] for _ in range(1000)],
        },
        schema=schema,
    )
    for chunk_id in range(5)
]
chunked_export_parquet(batches, schema, "./exports/large_dataset.parquet")
chunked_export_csv(batches, "./exports/large_dataset.csv")
chunked_export_jsonl(batches, "./exports/large_dataset.jsonl")
|
Using zstd compression for Parquet here instead of Snappy. It’s slower to write but gives you 20-30% smaller files, which matters when you’re shipping datasets around. Snappy is better if write speed is your bottleneck.
The key insight with chunked export: you never materialize the full dataset. If you’re reading from a database or another Parquet file, you can iterate over record batches and pipe them straight to the writer.
HuggingFace’s datasets library is the standard for sharing ML data. Converting from PyArrow is nearly free since HuggingFace Datasets uses Arrow under the hood.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| from datasets import Dataset, Features, Value, Sequence
import pyarrow as pa
def export_to_hf_dataset(
    table: pa.Table, save_path: str = None, push_to_hub: str = None
) -> Dataset:
    """Convert a PyArrow table to a HuggingFace Dataset.

    Args:
        table: Source Arrow table; column types map directly to HF features.
        save_path: If given, persist the dataset to this directory via
            ``save_to_disk``.
        push_to_hub: If given, push the dataset (as a private repo) to
            this Hub repo id.

    Returns:
        The constructed Dataset.
    """
    # NOTE(review): recent `datasets` releases expect a datasets.table.Table
    # (e.g. InMemoryTable) for the constructor's arrow_table argument rather
    # than a raw pa.Table — confirm this call against the installed version.
    ds = Dataset(table)
    if save_path:
        ds.save_to_disk(save_path)
        print(f"Saved HF dataset to {save_path} ({ds.num_rows} rows)")
    if push_to_hub:
        # private=True so labeled data isn't published by accident.
        ds.push_to_hub(push_to_hub, private=True)
        print(f"Pushed {ds.num_rows} rows to hub: {push_to_hub}")
    return ds
# Build a table with types that map cleanly to HF features
table = pa.table({
    "text": pa.array(["Hello world", "Testing export pipeline"], type=pa.string()),
    "label": pa.array([1, 0], type=pa.int32()),
    "confidence": pa.array([0.98, 0.72], type=pa.float64()),
    "tokens": pa.array([["Hello", "world"], ["Testing", "export", "pipeline"]], type=pa.list_(pa.string())),
})
# Save locally
ds = export_to_hf_dataset(table, save_path="./exports/hf_dataset")
# Check that feature types survived the round trip
print(ds.features)
# {'text': Value(dtype='string'), 'label': Value(dtype='int32'),
# 'confidence': Value(dtype='float64'), 'tokens': Sequence(feature=Value(dtype='string'))}
# You can also define explicit features for tighter control
# (list columns map to Sequence, scalars to Value).
features = Features({
    "text": Value("string"),
    "label": Value("int32"),
    "confidence": Value("float64"),
    "tokens": Sequence(Value("string")),
})
ds_typed = Dataset(table, features=features)
print(ds_typed.features)
|
One gotcha: if your PyArrow table has pa.dictionary encoded columns (common after reading categorical CSVs), the HuggingFace Dataset constructor will handle it fine. But if you push to the Hub and reload, dictionary encoding is gone. The data is identical, just stored differently. Don’t rely on dictionary encoding surviving a Hub round trip.
Adding Schema Validation on Export#
Exporting garbage data in a valid format is worse than a failed export. You get silent data corruption that shows up as mysterious model regressions weeks later. Validate before you write.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
| import pyarrow as pa
class SchemaValidationError(Exception):
    """Raised when a table's schema does not match the expected schema."""
    pass
def validate_schema(table: pa.Table, expected_schema: pa.Schema) -> None:
    """Validate table schema before export. Raises on mismatch.

    Checks, against ``expected_schema``:
      * every expected column is present,
      * present columns have the expected Arrow type,
      * non-nullable columns contain no nulls.

    Raises:
        SchemaValidationError: listing every problem found, not just
            the first.
    """
    errors = []
    actual_names = set(table.schema.names)

    # Sort missing columns so the error message is deterministic
    # (iterating a set is not).
    missing = sorted(set(expected_schema.names) - actual_names)
    if missing:
        errors.append(f"Missing columns: {missing}")

    # One pass over the expected schema covers both the type check and
    # the nullability check.
    for field in expected_schema:
        if field.name not in actual_names:
            continue
        actual_type = table.schema.field(field.name).type
        if actual_type != field.type:
            errors.append(
                f"Column '{field.name}': expected {field.type}, got {actual_type}"
            )
        if not field.nullable:
            col = table.column(field.name)
            if col.null_count > 0:
                errors.append(
                    f"Column '{field.name}' has {col.null_count} nulls but is non-nullable"
                )

    if errors:
        raise SchemaValidationError(
            "Schema validation failed:\n  " + "\n  ".join(errors)
        )
    print("Schema validation passed")
# Define the expected schema: text/label are required, score may be null.
expected = pa.schema([
    pa.field("text", pa.string(), nullable=False),
    pa.field("label", pa.int32(), nullable=False),
    pa.field("score", pa.float64(), nullable=True),
])
# Good table - passes validation (the null score is allowed)
good_table = pa.table({
    "text": pa.array(["hello", "world"], type=pa.string()),
    "label": pa.array([1, 0], type=pa.int32()),
    "score": pa.array([0.9, None], type=pa.float64()),
})
validate_schema(good_table, expected)
# Schema validation passed
# Bad table - wrong type for label
bad_table = pa.table({
    "text": pa.array(["hello", "world"], type=pa.string()),
    "label": pa.array(["positive", "negative"], type=pa.string()),
    "score": pa.array([0.9, 0.1], type=pa.float64()),
})
try:
    validate_schema(bad_table, expected)
except SchemaValidationError as e:
    print(e)
# Schema validation failed:
# Column 'label': expected int32, got string
|
Wire this into your DatasetExporter class by calling validate_schema at the top of each export method. Fail fast, fail loud.
Common Errors and Fixes#
UnicodeEncodeError when writing CSV:
This happens when your text data has characters outside the default encoding. Always specify encoding="utf-8" explicitly. If you’re reading from a legacy system, you might also need to clean the data first:
1
2
# Fix encoding issues before export
# NOTE(review): a str -> utf-8 -> str round trip with errors="replace" only
# changes text containing unencodable code points (e.g. lone surrogates left
# over from a bad legacy decode) — confirm this matches the source data.
df["text"] = df["text"].str.encode("utf-8", errors="replace").str.decode("utf-8")
|
ArrowInvalid: Could not convert X with type Y:
PyArrow is strict about types. If a column has mixed types (some ints, some strings), pa.Table.from_pandas will fail. Cast the column first:
1
2
3
# Force a column to string before conversion
# NOTE(review): astype(str) turns NaN/None into the literal strings
# "nan"/"None" — clean missing values first if that matters downstream.
df["mixed_column"] = df["mixed_column"].astype(str)
table = pa.Table.from_pandas(df, preserve_index=False)
|
MemoryError on large Parquet files:
The real constraint is memory: pq.write_table needs the entire table materialized, which stops scaling somewhere around a few GB depending on your machine. Switch to the chunked approach with ParquetWriter shown above. You can also set row_group_size to control how much data is buffered per flush:
1
# Smaller row groups cap how much data is buffered before each flush.
pq.write_table(table, "output.parquet", row_group_size=100_000)
|
JSONL floats losing precision:
Python’s json.dumps writes floats with repr, which round-trips the binary value exactly — the precision loss actually happens earlier, when a decimal quantity like 0.10 is stored as a binary float. (And a custom encoder’s default hook is never even called for native floats, so you can’t intercept them there.) If you need exact decimal representation (financial data, precise scores), keep the values as decimal.Decimal and serialize them with a custom encoder:
1
2
3
4
5
6
7
8
9
10
| import json
import decimal
class PrecisionEncoder(json.JSONEncoder):
    """JSON encoder that preserves exact decimal values.

    ``JSONEncoder.default`` is only invoked for objects json cannot
    serialize natively — native floats never reach it, so an
    ``isinstance(obj, float)`` branch there is dead code. To keep exact
    digits, carry the values as ``decimal.Decimal`` and emit them here.
    """

    def default(self, obj):
        if isinstance(obj, decimal.Decimal):
            # str() keeps every digit exactly as stored (e.g. "0.10").
            return str(obj)
        return super().default(obj)
# Pass the encoder class per call (or wrap json.dumps once in a helper).
json.dumps(record, cls=PrecisionEncoder)
|
HuggingFace Dataset push fails with large files:
The Hub has a 50GB per-file limit and uses Git LFS. If your dataset is huge, shard it:
1
# Shard the upload so each generated Parquet file stays under the limit.
ds.push_to_hub("your-org/dataset-name", max_shard_size="500MB")
|
This splits the upload into multiple Parquet files, each under 500MB.