Batch inference is the workhorse of production ML. You have millions of rows sitting in Parquet files, a trained model, and you need predictions on all of them. Running that sequentially on a single core takes hours. Ray Data makes it take minutes by streaming the data through your model across as many workers as you have available.

Here’s the fastest path from Parquet files to prediction columns:

pip install "ray[data]" transformers torch pyarrow
import ray
from transformers import pipeline

ray.init()

ds = ray.data.read_parquet("s3://your-bucket/input-data/")

class SentimentPredictor:
    def __init__(self):
        self.model = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
            device=-1,
        )

    def __call__(self, batch):
        texts = batch["text"].tolist()
        results = self.model(texts, batch_size=64, truncation=True)
        batch["label"] = [r["label"] for r in results]
        batch["score"] = [r["score"] for r in results]
        return batch

ds = ds.map_batches(SentimentPredictor, concurrency=4)
ds.write_parquet("s3://your-bucket/output-predictions/")

That reads Parquet, fans the data out to 4 workers each running a sentiment classifier, and writes the results back to Parquet with two new columns: label and score. The entire pipeline streams – Ray never loads the full dataset into memory at once.

Reading Large Datasets from Parquet

Ray Data reads Parquet files lazily. It scans the file metadata first, figures out the row groups, and streams blocks through your pipeline on demand. This is the key difference from loading everything into a Pandas DataFrame – you can process terabytes without OOM-killing your machine.

For a local directory of Parquet files:

import ray

ds = ray.data.read_parquet("/data/my-dataset/")
print(ds.schema())
print(ds.count())

For partitioned datasets on S3, just point at the parent directory. Ray picks up every .parquet file recursively:

ds = ray.data.read_parquet(
    "s3://my-bucket/dataset/",
    ray_remote_args={"num_cpus": 0.5},
)

The ray_remote_args parameter controls how many resources each read task uses. Set num_cpus lower than 1 if you want more read parallelism without being bottlenecked by CPU reservations.
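As a rough sanity check, you can compute that ceiling yourself. A sketch – plain arithmetic mirroring Ray's resource accounting, not its actual scheduler:

```python
# Back-of-envelope check for fractional-CPU read tasks: how many can run
# at once within one node's CPU budget.
def max_concurrent_tasks(node_cpus: float, num_cpus_per_task: float) -> int:
    return int(node_cpus // num_cpus_per_task)

print(max_concurrent_tasks(8, 1.0))  # 8 tasks at the default num_cpus=1
print(max_concurrent_tasks(8, 0.5))  # 16 tasks at num_cpus=0.5
```

So num_cpus=0.5 doubles the number of read tasks an 8-CPU node can run at once.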

If you only need specific columns, push the selection down to the reader. This skips reading unnecessary column data from disk entirely:

ds = ray.data.read_parquet(
    "s3://my-bucket/dataset/",
    columns=["text", "id", "timestamp"],
)

Running Batch Predictions with map_batches

The map_batches method is where the real work happens. You define a class that loads the model once in __init__ and processes batches in __call__ – dicts of NumPy arrays by default, or pandas DataFrames if you pass batch_format="pandas". Ray instantiates your class across workers and feeds batches to them.

Here’s a full example with a zero-shot classifier that handles GPU assignment:

import ray
from transformers import pipeline

ray.init()

ds = ray.data.read_parquet("./reviews.parquet")

candidate_labels = ["positive", "negative", "neutral"]

class ZeroShotClassifier:
    def __init__(self):
        self.model = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli",
            device=0,
        )

    def __call__(self, batch):
        texts = batch["text"].tolist()
        results = self.model(texts, candidate_labels=candidate_labels, batch_size=16)
        batch["predicted_label"] = [r["labels"][0] for r in results]
        batch["confidence"] = [r["scores"][0] for r in results]
        return batch

ds = ds.map_batches(
    ZeroShotClassifier,
    concurrency=2,
    num_gpus=1,
    batch_size=32,
)

ds.write_parquet("./reviews_classified/")

A few things to notice:

  • concurrency=2 spins up 2 actor replicas. Each one loads the model independently, so you need enough GPU memory for 2 copies.
  • num_gpus=1 tells Ray each actor needs one GPU. Ray assigns GPUs automatically – you don’t pick device IDs yourself.
  • batch_size=32 controls how many rows are in each pandas batch passed to __call__. This is separate from the model’s internal batch_size parameter. Tune both for throughput.
  • device=0 in the pipeline constructor works because Ray ensures each actor sees only its assigned GPU as device 0 via CUDA_VISIBLE_DEVICES.

For CPU-only inference, drop num_gpus and set device=-1 in the pipeline constructor. You can scale to dozens of CPU workers cheaply this way.

Monitoring Progress and Handling Failures

Ray Data prints progress bars by default showing blocks processed, throughput, and memory usage. For more control, use ray.data.DataContext:

ctx = ray.data.DataContext.get_current()
ctx.execution_options.verbose_progress = True

For long-running jobs, you want fault tolerance. If one worker crashes mid-batch, you don’t want to restart the entire pipeline. Enable retries on the map_batches call:

ds = ds.map_batches(
    SentimentPredictor,
    concurrency=4,
    max_task_retries=3,
)

Ray will retry failed batches up to 3 times, restarting the actor if it crashed. This handles transient GPU errors, OOM on a single batch, or network blips when reading from S3.

For checkpointing, write intermediate results in partitions. If you’re processing 100 million rows, break the job into chunks:

import pyarrow.parquet as pq

input_files = [
    "s3://bucket/data/part-0001.parquet",
    "s3://bucket/data/part-0002.parquet",
    "s3://bucket/data/part-0003.parquet",
]

for i, path in enumerate(input_files):
    ds = ray.data.read_parquet(path)
    ds = ds.map_batches(SentimentPredictor, concurrency=4)
    ds.write_parquet(f"s3://bucket/output/part-{i:04d}/")
    print(f"Completed partition {i}")

This way, if the job fails on partition 47, you resume from partition 47 instead of starting over.

Common Errors and Fixes

OutOfMemoryError during inference

Your batch size is too large for the GPU. Reduce batch_size in map_batches and the model’s internal batch size. Start with batch_size=8 and increase until you hit 80-90% GPU utilization.
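Rather than guessing, you can search for the limit by doubling. A sketch, where `probe` is a hypothetical callable that runs one inference batch at the given size against your model:

```python
def find_max_batch_size(probe, start=8, limit=1024):
    """Double the batch size until `probe` fails, then return the last
    size that succeeded (or None if even `start` fails)."""
    best = None
    size = start
    while size <= limit:
        try:
            probe(size)       # run one inference batch at this size
        except Exception:     # e.g. CUDA out-of-memory
            break
        best = size
        size *= 2
    return best
```

Once found, leave some headroom below the limit – real batches vary in sequence length, and a batch of long texts uses more memory than the probe did.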

ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task

The model failed to load. Common causes: the model doesn’t fit on the GPU, the HuggingFace model name is wrong, or a dependency is missing. Test your predictor class locally first:

import pandas as pd

predictor = SentimentPredictor()
test_batch = pd.DataFrame({"text": ["This is great", "This is terrible"]})
result = predictor(test_batch)
print(result)

ValueError: Batch type <class 'dict'> is not supported

Recent Ray versions pass batches to __call__ as dicts of NumPy arrays by default. If your code expects pandas DataFrames, set the batch format explicitly:

ds.map_batches(MyPredictor, batch_format="pandas", concurrency=4)

Slow throughput despite multiple GPUs

Check if your pipeline is bottlenecked on reading. Add more read parallelism:

ds = ray.data.read_parquet("s3://bucket/data/", override_num_blocks=200)

Also check that concurrency matches your GPU count. If you have 4 GPUs but concurrency=1, three GPUs sit idle.

FileNotFoundError when writing back to S3

Make sure you have s3fs installed and AWS credentials are configured:

pip install s3fs
export AWS_ACCESS_KEY_ID=your-key
export AWS_SECRET_ACCESS_KEY=your-secret