Apache Arrow gives you a columnar memory format that Pandas, NumPy, and most of the Python data stack can read directly. Instead of serializing data between every step in your pipeline, Arrow keeps it in a single in-memory layout and passes references to the same buffers. That allows zero-copy reads between producers and consumers, which matters a lot when you’re ingesting gigabytes of training data.

Here’s the fastest way to get started:

pip install pyarrow pandas numpy
import pyarrow as pa

schema = pa.schema([
    ("user_id", pa.int64()),
    ("feature_a", pa.float64()),
    ("feature_b", pa.float64()),
    ("label", pa.int32()),
    ("timestamp", pa.timestamp("ms")),
])

batch = pa.record_batch(
    [
        [1001, 1002, 1003, 1004, 1005],
        [0.23, 0.87, 0.45, 0.12, 0.99],
        [1.5, 2.3, 0.8, 3.1, 1.9],
        [1, 0, 1, 0, 1],
        [1708000000000, 1708000001000, 1708000002000, 1708000003000, 1708000004000],
    ],
    schema=schema,
)

print(batch.num_rows)   # 5
print(batch.nbytes)     # total bytes in memory

A record batch like this one is the fundamental unit of streaming in Arrow. You never have to load the full dataset into a single table. You process one batch at a time, write it out, and move on.

Streaming Record Batches from a Data Source

Real pipelines don’t have all data available at once. You receive chunks from a database cursor, a Kafka consumer, a REST API, or a file stream. Arrow models this as a sequence of record batches that share one schema, which is the interface pa.RecordBatchReader formalizes.

Here’s a generator that simulates a streaming data source producing batches:

import pyarrow as pa
import numpy as np

schema = pa.schema([
    ("user_id", pa.int64()),
    ("feature_a", pa.float64()),
    ("feature_b", pa.float64()),
    ("label", pa.int32()),
    ("timestamp", pa.timestamp("ms")),
])


def generate_batches(num_batches=10, batch_size=1000):
    """Simulate a streaming data source producing Arrow record batches."""
    rng = np.random.default_rng(42)
    base_ts = 1708000000000

    for i in range(num_batches):
        user_ids = rng.integers(1000, 9999, size=batch_size)
        feature_a = rng.standard_normal(batch_size)
        feature_b = rng.uniform(0.0, 10.0, size=batch_size)
        labels = rng.integers(0, 2, size=batch_size).astype(np.int32)
        timestamps = np.arange(
            base_ts + i * batch_size, base_ts + (i + 1) * batch_size, dtype=np.int64
        )

        batch = pa.record_batch(
            [
                pa.array(user_ids, type=pa.int64()),
                pa.array(feature_a, type=pa.float64()),
                pa.array(feature_b, type=pa.float64()),
                pa.array(labels, type=pa.int32()),
                pa.array(timestamps, type=pa.timestamp("ms")),
            ],
            schema=schema,
        )
        yield batch


# Process batches one at a time — memory stays flat
total_rows = 0
for batch in generate_batches(num_batches=10, batch_size=1000):
    total_rows += batch.num_rows

print(f"Processed {total_rows} rows in streaming fashion")  # 10000

Each batch lives in memory only while you’re working on it. This is the right way to handle datasets that are larger than RAM. You control memory usage by tuning batch_size, not by hoping your machine has enough swap.

Writing Partitioned Parquet for ML Training

Once you’ve ingested data as Arrow batches, you want to persist it in a format your training pipeline can read efficiently. Parquet is the obvious choice – it’s columnar (matches Arrow’s layout), compressed, and supports predicate pushdown for selective reads.

Arrow’s pq.write_to_dataset handles partitioning automatically:

import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import os

schema = pa.schema([
    ("user_id", pa.int64()),
    ("feature_a", pa.float64()),
    ("feature_b", pa.float64()),
    ("label", pa.int32()),
    ("split", pa.string()),
])


def create_training_table(num_rows=5000):
    rng = np.random.default_rng(42)
    splits = rng.choice(["train", "val", "test"], size=num_rows, p=[0.7, 0.15, 0.15])

    return pa.table(
        {
            "user_id": pa.array(rng.integers(1000, 9999, size=num_rows), type=pa.int64()),
            "feature_a": pa.array(rng.standard_normal(num_rows), type=pa.float64()),
            "feature_b": pa.array(rng.uniform(0, 10, size=num_rows), type=pa.float64()),
            "label": pa.array(rng.integers(0, 2, size=num_rows).astype(np.int32), type=pa.int32()),
            "split": pa.array(splits, type=pa.string()),
        },
        schema=schema,
    )


table = create_training_table(num_rows=5000)

# Write partitioned by split; creates split=train/, split=val/, split=test/ subdirectories
output_path = "/tmp/ml_dataset"
pq.write_to_dataset(
    table,
    root_path=output_path,
    partition_cols=["split"],
)

# Read back only the training split; the filter prunes the val/test partitions, so those files are never read
train_table = pq.read_table(output_path, filters=[("split", "=", "train")])
print(f"Training rows: {train_table.num_rows}")
print(f"Columns: {train_table.column_names}")

Partitioning on the split column means your training script reads only the split=train/ directory without touching validation or test data. For larger datasets, add a second partition key like a date column so each file stays under a few hundred MB.

Streaming Writes with ParquetWriter

If you’re receiving data as a stream and don’t want to accumulate a full table in memory before writing, use ParquetWriter directly:

import os

import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np

schema = pa.schema([
    ("user_id", pa.int64()),
    ("feature_a", pa.float64()),
    ("feature_b", pa.float64()),
    ("label", pa.int32()),
])

output_file = "/tmp/streamed_output.parquet"
rng = np.random.default_rng(42)
batch_size = 500

# The context manager closes the writer (and writes the footer) even on error
with pq.ParquetWriter(output_file, schema, compression="zstd") as writer:
    for i in range(20):
        batch = pa.record_batch(
            [
                pa.array(rng.integers(1000, 9999, size=batch_size), type=pa.int64()),
                pa.array(rng.standard_normal(batch_size), type=pa.float64()),
                pa.array(rng.uniform(0, 10, size=batch_size), type=pa.float64()),
                pa.array(rng.integers(0, 2, size=batch_size).astype(np.int32), type=pa.int32()),
            ],
            schema=schema,
        )
        writer.write_batch(batch)

# Verify the result
result = pq.read_table(output_file)
print(f"Total rows written: {result.num_rows}")  # 10000
# serialized_size on the Parquet metadata is only the footer size, so ask the OS instead
print(f"File size: {os.path.getsize(output_file)} bytes")

Use zstd compression. It usually gives better compression ratios than snappy (pyarrow’s default codec for Parquet) while still decompressing quickly, and read speed is what matters when your training loop is pulling data.

Arrow IPC for Zero-Copy Data Sharing

Arrow IPC lets you share data between processes without serialization. One process writes Arrow record batches to a file or socket, another reads them back with zero deserialization cost. The bytes on disk are the same layout as the bytes in memory.

This is the fastest way to move data between a preprocessing pipeline and a training process:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

schema = pa.schema([
    ("feature_a", pa.float64()),
    ("feature_b", pa.float64()),
    ("label", pa.int32()),
])

rng = np.random.default_rng(42)

# Writer: serialize batches to an IPC file
ipc_path = "/tmp/pipeline_output.arrow"
with pa.OSFile(ipc_path, "wb") as f:
    writer = ipc.new_file(f, schema)
    for _ in range(5):
        batch = pa.record_batch(
            [
                pa.array(rng.standard_normal(1000), type=pa.float64()),
                pa.array(rng.uniform(0, 10, size=1000), type=pa.float64()),
                pa.array(rng.integers(0, 2, size=1000).astype(np.int32), type=pa.int32()),
            ],
            schema=schema,
        )
        writer.write_batch(batch)
    writer.close()

# Reader: memory-map the file for zero-copy access
source = pa.memory_map(ipc_path, "r")
reader = ipc.open_file(source)

print(f"Number of record batches: {reader.num_record_batches}")  # 5

batch_0 = reader.get_batch(0)
print(f"First batch rows: {batch_0.num_rows}")  # 1000

The pa.memory_map call is the key part. It maps the file into virtual memory so Arrow can read batch data directly from the OS page cache. No copies, no parsing. For inter-process communication on the same machine, this is significantly faster than gRPC or REST with JSON serialization.

IPC Streaming Format

If you need to pipe data between processes (through a socket or Unix pipe), use the streaming IPC format instead of the file format:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

schema = pa.schema([
    ("feature_a", pa.float64()),
    ("label", pa.int32()),
])

rng = np.random.default_rng(42)

# Write to a stream (could be a socket, pipe, or buffer)
sink = pa.BufferOutputStream()
writer = ipc.new_stream(sink, schema)

for _ in range(3):
    batch = pa.record_batch(
        [
            pa.array(rng.standard_normal(500), type=pa.float64()),
            pa.array(rng.integers(0, 2, size=500).astype(np.int32), type=pa.int32()),
        ],
        schema=schema,
    )
    writer.write_batch(batch)

writer.close()

# Read from the buffer (simulating the receiving end)
buf = sink.getvalue()
reader = ipc.open_stream(buf)

all_batches = [b for b in reader]
print(f"Received {len(all_batches)} batches")  # 3
print(f"Total rows: {sum(b.num_rows for b in all_batches)}")  # 1500

The streaming format differs from the file format in one important way: the file format has a footer with batch offsets for random access, while the stream format is sequential. Use file format for on-disk storage, stream format for live data transfer.

Converting Between Arrow, Pandas, and NumPy

Your ML framework doesn’t speak Arrow natively (yet). You need to convert. The good news is that Arrow-to-Pandas and Arrow-to-NumPy conversions can be zero-copy for compatible types.

import pyarrow as pa
import numpy as np
import pandas as pd

schema = pa.schema([
    ("feature_a", pa.float64()),
    ("feature_b", pa.float64()),
    ("label", pa.int32()),
])

rng = np.random.default_rng(42)
table = pa.table(
    {
        "feature_a": pa.array(rng.standard_normal(10000), type=pa.float64()),
        "feature_b": pa.array(rng.uniform(0, 10, size=10000), type=pa.float64()),
        "label": pa.array(rng.integers(0, 2, size=10000).astype(np.int32), type=pa.int32()),
    },
    schema=schema,
)

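# Arrow -> Pandas (zero_copy_only=False lets Arrow copy where it has to,
# e.g. for nulls or when pandas consolidates same-dtype columns into one block)
df = table.to_pandas(zero_copy_only=False)
print(f"Pandas shape: {df.shape}")  # (10000, 3)

# Arrow -> NumPy (zero-copy for primitive arrays without nulls).
# table.column() returns a ChunkedArray, which does not support
# zero_copy_only=True, so take the underlying chunk first. This table was
# built from single NumPy arrays, so each column has exactly one chunk.
features_a = table.column("feature_a").chunk(0).to_numpy(zero_copy_only=True)
features_b = table.column("feature_b").chunk(0).to_numpy(zero_copy_only=True)
labels = table.column("label").chunk(0).to_numpy(zero_copy_only=True)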

# Stack into a feature matrix for sklearn/PyTorch
X = np.column_stack([features_a, features_b])
y = labels
print(f"Feature matrix shape: {X.shape}")  # (10000, 2)
print(f"Labels shape: {y.shape}")  # (10000,)

# Pandas -> Arrow (useful for the reverse direction)
roundtrip_table = pa.Table.from_pandas(df, preserve_index=False)
print(f"Round-trip columns: {roundtrip_table.column_names}")

A few gotchas to watch for: zero_copy_only=True raises an error if the conversion requires a copy (for example, if the Arrow array contains nulls, since NumPy has no null concept for numeric types). Use it to verify you’re actually getting zero-copy behavior. It applies to a single pa.Array; a ChunkedArray, which is what table.column() returns, may hold several chunks that have to be concatenated, so convert chunk by chunk or accept a copy. For production code that must not fail, use zero_copy_only=False and accept the occasional copy for nullable columns.

Setting preserve_index=False in from_pandas avoids writing the Pandas index as an extra column in the Arrow table. You almost never want the Pandas index in your ML data.

Common Errors and Fixes

pyarrow.lib.ArrowInvalid: Column 0: Expected length 1000 but got length 999

Every array in a record batch must have the same length. This usually happens when your data source returns variable-length chunks. Fix it by checking lengths before creating the batch:

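import pyarrow as pa

# Example columns from a source that returned uneven chunks
col_a, col_b, col_c = [1, 2, 3], [0.1, 0.2, 0.3], [1, 0]

# Ensure all arrays have matching length (truncating drops the trailing rows)
arrays = [col_a, col_b, col_c]
lengths = [len(a) for a in arrays]
if len(set(lengths)) > 1:
    min_len = min(lengths)
    arrays = [a[:min_len] for a in arrays]

batch = pa.record_batch(arrays, names=["a", "b", "c"])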

pyarrow.lib.ArrowTypeError: Expected bytes, got a 'int' object

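This error means Arrow expected a string (bytes) value but found a Python int. It usually shows up when a column mixes strings and integers, so the values no longer match the declared or inferred string type. Normalize the values and specify the Arrow type explicitly when creating arrays:

import pyarrow as pa

# Wrong: mixed str and int values in a column treated as string
# arr = pa.array(["1001", 1002, "1003"], type=pa.string())

# Right: consistent values plus an explicit type
raw_ids = ["1001", 1002, "1003"]
arr = pa.array([str(v) for v in raw_ids], type=pa.string())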

ArrowNotImplementedError: Unsupported cast from string to int64

Arrow won’t silently coerce types. If your source data has strings that represent numbers, cast them explicitly:

import pyarrow as pa
import pyarrow.compute as pc

string_arr = pa.array(["1", "2", "3"])
int_arr = pc.cast(string_arr, pa.int64())

OSError: [Errno 24] Too many open files when writing partitioned datasets

This happens when write_to_dataset creates too many partition files at once. Increase the open file limit or reduce the number of unique partition values:

ulimit -n 65536

pyarrow.lib.ArrowInvalid: Casting from timestamp[ns] to timestamp[ms] would lose data

Pandas uses nanosecond timestamps by default, but Arrow (and Parquet) often use milliseconds. The conversion fails when values carry sub-millisecond precision that would be dropped. Truncate to the target resolution first, then specify the type explicitly:

import pyarrow as pa
import pandas as pd

ts_series = pd.Series(pd.to_datetime(["2024-01-01", "2024-01-02"]))
# Drop sub-millisecond precision, then convert with an explicit ms type
arr = pa.array(ts_series.dt.floor("ms"), type=pa.timestamp("ms"))