# Kubeflow Pipelines SDK
import kfp
from kfp import dsl
def preprocess_op(data_path):
    """Build the 'Preprocess Data' container step.

    The step runs the ``preprocess-image:latest`` image and hands
    ``data_path`` to it through the ``--data_path`` command-line flag.

    Args:
        data_path: Value forwarded to the container after ``--data_path``.

    Returns:
        The ``dsl.ContainerOp`` describing the step.
    """
    # NOTE(review): no ``file_outputs`` is declared for this step, yet
    # ``my_pipeline`` reads ``.output`` from it -- confirm which file the
    # image writes and whether it must be declared here.
    container_args = ['--data_path', data_path]
    step = dsl.ContainerOp(
        name='Preprocess Data',
        image='preprocess-image:latest',
        arguments=container_args,
    )
    return step
def train_op(data):
    """Build the 'Train Model' container step.

    The step runs the ``train-image:latest`` image and receives ``data``
    through the ``--data`` command-line flag.

    Args:
        data: Value forwarded to the container after ``--data``.

    Returns:
        The ``dsl.ContainerOp`` describing the step.
    """
    op_settings = {
        'name': 'Train Model',
        'image': 'train-image:latest',
        'arguments': ['--data', data],
    }
    return dsl.ContainerOp(**op_settings)
# Pipeline definition: a preprocessing step followed by a training step
# that consumes the preprocessing step's output.
# (Documented with comments rather than a docstring so nothing extra can be
# picked up as pipeline metadata -- assumption, verify against the SDK.)
@dsl.pipeline(
    name='My ML Pipeline',
    description='A sample ML pipeline',
)
def my_pipeline(data_path: str):
    # Step 1: preprocess the data found at ``data_path``.
    prep_step = preprocess_op(data_path)
    # Step 2: train on whatever the preprocessing step exposes as its output.
    train_op(data=prep_step.output)
# Compile the pipeline function into a deployable package file.
kfp.compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path='pipeline.yaml',
)

# Connect with the default client settings and submit a run.
# NOTE(review): ``my_pipeline`` declares ``data_path`` without a default, but
# no arguments are supplied here -- confirm the intended value.
client = kfp.Client()
client.create_run_from_pipeline_func(
    pipeline_func=my_pipeline,
    arguments={},
)