
Introduction to Kubeflow for MLOps
by Tony Ruiz, GCP Customer Engineer at Google

- Reading Time: 6 minutes

Motivation

Machine Learning development goes beyond developing a model. Productionizing a model quickly becomes a multi-faceted, often multi-disciplinary process that involves several key stakeholders across both the business and IT. As depicted in the image below, the code to actually develop a model is a small component compared to other complexities such as configuring serving infrastructure, data verification, and monitoring.

Purpose

The goal of this article is to serve as a gentle introduction to Kubeflow pipelines and how they can address the challenges of ML lifecycle development. Addressing all the components in the image above would go beyond the scope of a short blog. For the sake of simplicity, we are going to reference Vertex Pipelines on Google Cloud for our example. Note that Kubeflow is an open-source project and can be deployed in several different environments.

What is Kubeflow?

Kubeflow is an open-source MLOps (Machine Learning Operations) platform built on top of Kubernetes. It simplifies and automates various stages of the machine learning lifecycle, from data preparation and model training to deployment and monitoring. Think of it as a toolkit that helps you build and manage your ML workflows efficiently and portably across different platforms.

Here are some key aspects of Kubeflow:

  • Components: It provides a collection of components for different stages of the ML lifecycle, such as pipelines for data processing, notebooks for experimentation, training jobs for model training, and KFServing for model deployment.
  • Kubernetes integration: It leverages Kubernetes for resource management, scheduling, and scaling. This makes Kubeflow workflows portable and scalable across different environments.
  • Flexibility and customization: You can tailor your ML workflows by choosing and configuring the specific components you need.
  • Community-driven: Kubeflow is an active open-source project with a vibrant community that contributes to its development and supports users.

Several different cloud providers (AWS, Google Cloud, Microsoft Azure) offer pre-configured Kubeflow distributions that can be installed on a Kubernetes cluster. For this article, Vertex Pipelines will be used since it is a fully managed Kubeflow service. Fully managed cloud services are handled by the cloud provider through automation, which typically means a developer doesn't have to set up and manage machines, apply patches, or back up clusters.

Pipelines and components

A Kubeflow Pipeline is a platform-agnostic way to define, orchestrate, and manage repeatable, end-to-end machine learning (ML) workflows based on containers.

Think of a pipeline as the workflow for your machine learning job. A pipeline is built up from components, which are lightweight abstractions of the tasks in a pipeline. These tasks can be isolated containers or implementations of a function. This is useful because it lets us reap several benefits in our machine learning workflows:

  • Modularity: Components break down complex machine learning workflows into manageable, reusable steps. This improves organization and scalability.
  • Reusability: Well-defined components can be used across various pipelines, reducing code duplication and promoting efficiency.
  • Flexibility: You can develop Kubeflow components in various ways, giving you flexibility in your technology choices.

Hello Kubeflow - A Rudimentary Example

In this example we are going to create 3 functions:

  • example_string will take a string and append some text to it
  • example_number will take a number and add 10
  • example_combo will take the outputs of both of those functions and return a string combining them

A couple of points here (a minimal code sketch follows these points):
  • We are using the kfp.dsl.component decorator to transform our Python functions into components
  • Each of these components will be executed in its own container; note that we are defining our base image as well as the packages to install.
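Below is a minimal sketch of what these three components could look like. The base image, package pins, and the exact strings are illustrative assumptions rather than the original example's values.

from kfp import dsl

@dsl.component(base_image="python:3.10")
def example_string(text: str) -> str:
    # append some text to the input string
    return text + " - hello from a Kubeflow component"

@dsl.component(base_image="python:3.10")
def example_number(number: int) -> int:
    # add 10 to the input number
    return number + 10

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas==1.3.5"],
)
def example_combo(text: str, number: int) -> str:
    # imports used by a component live inside the function body
    import pandas as pd
    # combine the outputs of the two upstream components into a single string
    return f"{text} | {number} (built with pandas {pd.__version__})"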

Next we are going to define our pipeline, which will in turn define how the components interact with each other. In this case, we are going to create a DAG pipeline which takes the outputs of the example_string() and example_number() functions and feeds them into the example_combo() function. Note that the pipeline creation happens inside the example_pipeline() function and we are using the kfp.dsl.pipeline decorator.
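A sketch of such a pipeline definition, assuming the component names from the previous example (the pipeline name and default argument values are placeholders):

@dsl.pipeline(name="example-pipeline")
def example_pipeline(text: str = "Hello Kubeflow", number: int = 5):
    # the two independent components take no inputs from other tasks, so they run first
    string_task = example_string(text=text)
    number_task = example_number(number=number)

    # example_combo consumes both outputs, which makes it the final node of the DAG
    example_combo(
        text=string_task.output,
        number=number_task.output,
    )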

After defining the pipeline function, the next step is to compile the pipeline. The output of this compilation is a YAML template (can also be JSON) that will give the instructions to the Kubeflow service on how to create the pipeline and its components. In this example, we are submitting the template to Google Cloud’s Vertex Pipelines for execution.
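As a sketch, the compilation and submission could look like the following; the project, region, and bucket values are placeholders, and the same pattern appears in the end-to-end example later in the article:

from kfp import compiler
from google.cloud import aiplatform

# compile the pipeline function into a reusable YAML template
compiler.Compiler().compile(
    pipeline_func=example_pipeline,
    package_path="example-pipeline.yaml",
)

# submit the compiled template to Vertex AI Pipelines
aiplatform.init(
    project="<YOUR-PROJECT-ID>",
    location="<YOUR-REGION>",
    staging_bucket="gs://<YOUR-BUCKET>",
)
job = aiplatform.PipelineJob(
    display_name="example-pipeline",
    template_path="example-pipeline.yaml",
    pipeline_root="gs://<YOUR-BUCKET>/pipeline_root/",
)
job.run()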

A Minimalistic End-to-End Pipeline

If you are new to Google Cloud, refer to this page to get started. In this example, we are going to be developing our pipeline on a Vertex AI Notebook.

Installation

				
! pip3 install --no-cache-dir --upgrade "kfp>2" google-cloud-aiplatform



import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component

from kfp.dsl import ClassificationMetrics
from typing import NamedTuple
import os

PROJECT_ID = '<YOUR-PROJECT-ID>' # replace with project ID
REGION = '<YOUR-REGION>'
EXPERIMENT = 'vertex-pipelines'
SERIES = 'dev'

# gcs bucket
GCS_BUCKET = PROJECT_ID
BUCKET_URI = f"gs://{PROJECT_ID}-bucket"  # @param {type:"string"}

aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

Note that Vertex Pipelines requires a Google Cloud Storage bucket to store pipeline metadata and artifacts. If you don't have one yet, you can create a bucket manually through the console or use this gsutil command:

				
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Defining the components

We are going to create 4 different components 

  • get_data() – this will load the curated Breast Cancer Wisconsin dataset (available through scikit-learn, originally published on Kaggle) and perform a train/test split. Note that this example uses the Output[Dataset] artifact to store metadata such as the path where the dataset is stored.
  • train_model() – this will take the dataset artifact created in get_data(), train an XGBoost classification model, and finally use the Model artifact to store the model and its metadata in a Google Cloud Storage bucket. Note that custom metadata, such as the training score and framework, is defined inside the model artifact.
  • eval_model() – this will take the test dataset created in get_data() and the model artifact created in train_model() as inputs and create an evaluation step. In an MLOps process, we may want to ensure that the machine learning model meets a certain performance threshold before deploying it to an environment.
  • deploy_xgboost_model() – finally, if our model meets the condition established in eval_model(), it will be deployed to a Vertex AI Endpoint where it will serve requests.

Note that this is a simplified and minimalistic example. In practice, instead of deploying the model directly in the same environment, the model artifact may have to traverse several different environments (Dev, UAT) before being deployed to a production environment.

				
					@component(
    packages_to_install = [
        "pandas==1.3.4",
        "scikit-learn==1.0.1",
    ],
)
def get_data(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):

    from sklearn import datasets
    from sklearn.model_selection import train_test_split as tts
    import pandas as pd


    # dataset https://www.kaggle.com/uciml/breast-cancer-wisconsin-data
    data_raw = datasets.load_breast_cancer()
    data = pd.DataFrame(data_raw.data, columns=data_raw.feature_names)
    data["target"] = data_raw.target

    train, test = tts(data, test_size=0.3)

    # write without the pandas index so it doesn't become a spurious feature downstream
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)


@component(
    packages_to_install = [
        "xgboost==1.6.2",
        "pandas==1.3.5",
        "scikit-learn==1.0.2",
        "joblib==1.1.0",
    ],
)
def train_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model]
):

    from xgboost import XGBClassifier
    import pandas as pd
    import os
    import joblib
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    
    data = pd.read_csv(dataset.path)

    model = XGBClassifier(
        objective="binary:logistic"
    )
    model.fit(
        data.drop(columns=["target"]),
        data.target,
    )

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "XGBoost"
    os.makedirs(model_artifact.path, exist_ok=True)
    joblib.dump(model, os.path.join(model_artifact.path, "model.joblib"))
    print(model_artifact.path)
 

@component(
    packages_to_install = [
        
        "xgboost==1.6.2",
        "pandas==1.3.5",
        "joblib==1.1.0",
        "scikit-learn==1.0.2",
        "google-cloud-storage==2.13.0",
    ],
)
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics],
    bucket_name: str,
) -> NamedTuple("Outputs", [("deploy", str)]):
    import pandas as pd
    import joblib
    from google.cloud import storage

    data = pd.read_csv(test_set.path)

    # download the model artifact produced by train_model() from Cloud Storage
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob_path = xgb_model.uri.replace("gs://" + bucket_name + "/", "")
    smetrics.log_metric("blob_path", str(blob_path))

    blob = bucket.blob(blob_path + "/model.joblib")
    with blob.open(mode="rb") as file:
        model = joblib.load(file)

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    from sklearn.metrics import roc_curve
    y_scores =  model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    from sklearn.metrics import confusion_matrix
    y_pred = model.predict(data.drop(columns=["target"]))

    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           data.target, y_pred
       ).tolist()
    )

    xgb_model.metadata["test_score"] = float(score)
    smetrics.log_metric("score", float(score))


    deploy = "true"
    #compare threshold or to previous

    return (deploy,)


@component(
    packages_to_install=["google-cloud-aiplatform==1.25.0"],
)
def deploy_xgboost_model(
    model: Input[Model],
    project_id: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model],
):
    """Deploys an XGBoost model to Vertex AI Endpoint.

    Args:
        model: The model to deploy.
        project_id: The project ID of the Vertex AI Endpoint.

    Returns:
        vertex_endpoint: The deployed Vertex AI Endpoint.
        vertex_model: The deployed Vertex AI Model.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id)

    deployed_model = aiplatform.Model.upload(
        display_name="breast-cancer-xgb-classification",
        artifact_uri=model.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest",
    )
    endpoint = deployed_model.deploy(machine_type="n1-standard-4")

    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = deployed_model.resource_name

Defining the pipeline

The next step is to define the pipeline function. Similar to the initial example, the pipeline components are stitched together to create a Directed Acyclic Graph (DAG) of steps. The dsl.Condition construct is introduced as a control-flow step for the evaluation function: if the evaluation criterion is met (the component returns "true"), the pipeline proceeds with the deployment.

				
PIPELINE_ROOT = BUCKET_URI + "/pipeline_root/"  # path to pipeline metadata

@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT + "xgboost-pipeline-v2",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="xgboost-pipeline-with-deployment-v2",
)
def pipeline():
    dataset_op = get_data()
    training_op = train_model(dataset = dataset_op.outputs["dataset_train"])
    eval_op = eval_model(
        test_set=dataset_op.outputs["dataset_test"],
        xgb_model=training_op.outputs["model_artifact"],
        bucket_name = "kubeflow-mlops-410520-bucket"
    )

    with dsl.Condition(
        eval_op.outputs["deploy"] == "true",
        name="deploy",
    ):
        deploy_op = deploy_xgboost_model(
            model=training_op.outputs["model_artifact"],
            project_id=PROJECT_ID,
        )

Compiling and executing the pipeline

Finally, the pipeline is compiled to create the YAML template. That template is submitted to the Vertex AI Pipelines service, and the model is then trained and deployed to the endpoint:

				
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipeline-breast-cancer.yaml",
)

job = aiplatform.PipelineJob(
    display_name="breast-cancer-demo-pipeline",
    template_path="pipeline-breast-cancer.yaml",
    pipeline_root=PIPELINE_ROOT,
)

job.run()

Conclusion

This article covers a lot of ground. To summarize, we covered the basic building blocks of Kubeflow pipelines and the part they play in creating modular, reusable, and flexible machine learning workflows. We also created a minimalistic end-to-end pipeline on Google Cloud. Congratulations!
