The Pipeline in 30 Seconds

Here’s a complete Kubeflow pipeline that loads data, trains a model, evaluates it, and conditionally deploys — all orchestrated on Kubernetes:

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "scikit-learn"])
def train_model(dataset: Input[Dataset], model: Output[Model], metrics: Output[Metrics]):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    import pickle

    df = pd.read_csv(dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    accuracy = clf.score(X_test, y_test)

    metrics.log_metric("accuracy", accuracy)
    with open(model.path, "wb") as f:
        pickle.dump(clf, f)

@dsl.pipeline(name="training-pipeline")
def ml_pipeline(min_accuracy: float = 0.85):
    load_task = load_data()
    train_task = train_model(dataset=load_task.outputs["dataset"])
    eval_task = evaluate_model(
        model=train_task.outputs["model"],
        test_data=load_task.outputs["dataset"],
        min_accuracy=min_accuracy,
    )
    with dsl.Condition(eval_task.output == "yes"):
        deploy_model(model=train_task.outputs["model"])

That’s the shape of every KFP v2 pipeline. Components are Python functions decorated with @dsl.component. The pipeline wires them together with typed inputs and outputs. Kubeflow handles the Kubernetes pods, artifact storage, and DAG execution.

Setting Up Kubeflow Locally

You don’t need a cloud cluster to get started. A local Kubernetes cluster with kind or minikube works fine for development.

Using kind

# Install kind if you don't have it
go install sigs.k8s.io/kind@latest

# Create a cluster with enough resources
kind create cluster --name kubeflow-dev --config - <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
    extraPortMappings:
      - containerPort: 30080
        hostPort: 8080
        protocol: TCP
EOF

# Deploy Kubeflow Pipelines standalone
export PIPELINE_VERSION=2.3.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"

# Wait for everything to come up
kubectl wait --for=condition=ready pod -l app=ml-pipeline -n kubeflow --timeout=300s

# Port-forward the UI
kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80

Open http://localhost:8080 and you’ll see the Kubeflow Pipelines dashboard. The standalone deployment is enough for pipeline development — you don’t need the full Kubeflow platform.

Install the KFP SDK

pip install kfp==2.10.1

Pin the version. KFP has had breaking changes between major releases, and you want your pipelines to compile consistently across your team.

Building Components

Every step in your pipeline is a component. KFP v2 gives you two ways to define them: lightweight Python components and containerized components. Start with lightweight — they’re faster to iterate on.

Lightweight Python Components

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
)
def load_data(source_url: str, dataset: Output[Dataset]):
    """Download and prepare the training dataset."""
    import pandas as pd

    df = pd.read_csv(source_url)
    # Basic cleaning
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(dataset.path, index=False)
    dataset.metadata["rows"] = len(df)
    dataset.metadata["columns"] = len(df.columns)

A few things to note. The function body runs inside a container — imports go inside the function, not at the top of the file. The packages_to_install list gets pip-installed at runtime. For production, build a custom image instead to avoid the install overhead on every run.
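Because a lightweight component is just a decorated Python function, you can unit-test the logic before it ever touches the cluster. A minimal sketch, assuming any object with a `.path` string and a `.metadata` dict is an acceptable stand-in for the Dataset artifact KFP injects at runtime:

```python
import os
import tempfile
import types

import pandas as pd

# Plain-function version of the load_data body, so the cleaning logic can be
# exercised locally. (Inside a real @dsl.component the imports live inside the
# function; here they are top-level because nothing runs in a container.)
def load_data_logic(source_url: str, dataset) -> None:
    df = pd.read_csv(source_url)
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(dataset.path, index=False)
    dataset.metadata["rows"] = len(df)
    dataset.metadata["columns"] = len(df.columns)

# Build a tiny CSV with one null row and one duplicate row.
tmp = tempfile.mkdtemp()
src = os.path.join(tmp, "raw.csv")
pd.DataFrame({"feature": [1, 1, None], "target": [0, 0, 1]}).to_csv(src, index=False)

# Stand-in for the injected Dataset artifact.
fake_dataset = types.SimpleNamespace(path=os.path.join(tmp, "clean.csv"), metadata={})
load_data_logic(src, fake_dataset)
print(fake_dataset.metadata)
```

This catches data-cleaning bugs in seconds instead of waiting for a pod to schedule, pip-install dependencies, and fail.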

Typed Artifacts

KFP v2 has a proper type system for artifacts. Use it — it makes pipelines self-documenting and enables the UI to render artifacts correctly.

Type                   Use Case
Dataset                Tabular data, CSVs, Parquet files
Model                  Serialized models (pickle, ONNX, SavedModel)
Metrics                Scalar metrics logged with log_metric()
Markdown               Rich text reports
HTML                   Visualizations, plots
ClassificationMetrics  Confusion matrix, ROC curve
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "scikit-learn"])
def evaluate_model(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    min_accuracy: float = 0.85,
) -> str:
    import pickle
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score

    with open(model.path, "rb") as f:
        clf = pickle.load(f)

    df = pd.read_csv(test_data.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    preds = clf.predict(X)
    acc = accuracy_score(y, preds)
    f1 = f1_score(y, preds, average="weighted")

    metrics.log_metric("accuracy", acc)
    metrics.log_metric("f1_score", f1)

    return "yes" if acc >= min_accuracy else "no"

Assembling the Full Pipeline

Now wire the components together. The pipeline decorator defines the DAG — KFP figures out the execution order from the data dependencies.

from kfp import dsl, compiler
from kfp.dsl import Input, Model

@dsl.component(base_image="python:3.11-slim")
def deploy_model(model: Input[Model], endpoint_name: str = "production"):
    """Deploy model to a serving endpoint."""
    import shutil
    import os

    serve_path = f"/mnt/models/{endpoint_name}"
    os.makedirs(serve_path, exist_ok=True)
    shutil.copy(model.path, f"{serve_path}/model.pkl")
    print(f"Model deployed to {serve_path}")

@dsl.pipeline(
    name="end-to-end-ml-pipeline",
    description="Train, evaluate, and conditionally deploy a classifier",
)
def ml_pipeline(
    data_url: str = "https://example.com/dataset.csv",
    min_accuracy: float = 0.85,
    deploy_endpoint: str = "production",
):
    # Step 1: Load and clean data
    load_task = load_data(source_url=data_url)

    # Step 2: Train model
    train_task = train_model(dataset=load_task.outputs["dataset"])

    # Step 3: Evaluate
    eval_task = evaluate_model(
        model=train_task.outputs["model"],
        test_data=load_task.outputs["dataset"],
        min_accuracy=min_accuracy,
    )

    # Step 4: Deploy only if accuracy threshold is met
    with dsl.Condition(eval_task.output == "yes"):
        deploy_model(
            model=train_task.outputs["model"],
            endpoint_name=deploy_endpoint,
        )

# Compile to YAML
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

The dsl.Condition block is where Kubeflow shines. The deploy step only runs if evaluation passes. No shell scripting, no manual gates — it’s declarative.

Pipeline Parameters

Notice the pipeline function takes typed parameters with defaults. When you submit a run, you can override these from the UI or the SDK:

from kfp.client import Client

client = Client(host="http://localhost:8080")

# Submit with custom parameters
run = client.create_run_from_pipeline_package(
    "pipeline.yaml",
    arguments={
        "data_url": "gs://my-bucket/training-data.csv",
        "min_accuracy": 0.90,
        "deploy_endpoint": "staging",
    },
    experiment_name="model-experiments",
)

print(f"Run ID: {run.run_id}")
print(f"Dashboard: http://localhost:8080/#/runs/details/{run.run_id}")

Custom Container Components

Lightweight components are great for prototyping, but for production you want reproducible builds. Create a custom image:

FROM python:3.11-slim

RUN pip install --no-cache-dir \
    pandas==2.2.0 \
    scikit-learn==1.4.0 \
    pyarrow==15.0.0

WORKDIR /app

Then reference it in your component:

@dsl.container_component
def train_with_gpu():
    return dsl.ContainerSpec(
        image="gcr.io/my-project/ml-trainer:v1.2.0",
        command=["python", "train.py"],
        args=["--epochs", "50", "--lr", "0.001"],
    )

Use container_component when you need GPU access, custom system packages, or deterministic dependency versions.

Common Errors

compiler.Compiler() produces empty YAML

You forgot to use KFP artifact types for outputs. Plain Python return types work for scalars, but anything file-based needs Output[Dataset], Output[Model], etc.

PodUnschedulable: insufficient memory

Set resource requests on your components:

@dsl.component(base_image="python:3.11-slim")
def train_model(dataset: Input[Dataset], model: Output[Model]):
    ...

# In the pipeline, after calling the component:
train_task = train_model(dataset=load_task.outputs["dataset"])
train_task.set_memory_request("4Gi")
train_task.set_memory_limit("8Gi")
train_task.set_cpu_request("2")

ImportError inside a component at runtime

Remember: the function body runs in an isolated container. Every import must be inside the function, and every dependency must be listed in packages_to_install or baked into the container image. Top-level imports in your Python file are not available.

ConnectionRefusedError when submitting from kfp.client

Your port-forward likely died. Re-run kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80. For persistent access, set up an Ingress instead.

Pipeline runs stuck in Pending

Check that your cluster has the ml-pipeline and minio pods running:

kubectl get pods -n kubeflow
# Look for CrashLoopBackOff or ImagePullBackOff
kubectl describe pod <pod-name> -n kubeflow

MinIO is the default artifact store. If it’s down, no pipeline step can write outputs, and everything stalls.

When to Use Kubeflow vs Alternatives

Pick Kubeflow Pipelines when you’re already on Kubernetes and need container-level isolation between pipeline steps. Each component runs in its own pod, which means you can mix Python, R, and custom binaries in the same pipeline. The artifact tracking and experiment comparison in the UI are also genuinely useful.

If you’re not on Kubernetes, look at Airflow for orchestration or just run things with a Makefile. Kubeflow’s value comes from Kubernetes integration — without it, you’re fighting the infrastructure instead of building models.

For teams already using managed ML platforms (SageMaker, Vertex AI), those have their own pipeline systems that are simpler to operate. Kubeflow makes the most sense when you want full control over your infrastructure or when you’re running on-prem.