
MLOps Pipeline on EKS

Overview

MLOps is a set of practices that automate and standardize the development, deployment, and operation of machine learning models. This document covers how to build an end-to-end ML lifecycle on Amazon EKS, from data preparation to model serving, using Kubeflow Pipelines, MLflow, vLLM model serving, and ArgoCD GitOps deployment.

Key Objectives

  • Full Automation: Build automated pipelines from data collection to model deployment
  • Experiment Tracking: Systematic experiment management and model version control via MLflow
  • Scalable Serving: High-performance model serving with vLLM + ArgoCD GitOps deployment
  • GPU Optimization: Dynamic GPU resource management using Karpenter

MLOps Architecture Overview

End-to-End ML Lifecycle

Core Components

The platform is powered by five core technologies:

  • Kubeflow Pipelines: ML workflow orchestration (tech stack: Argo Workflows, Tekton)
  • MLflow: Experiment tracking and model registry (tech stack: MLflow Tracking Server, S3 backend)
  • vLLM + ArgoCD: Model serving and GitOps deployment (tech stack: vLLM, ArgoCD, GitOps)
  • Karpenter: Dynamic GPU node provisioning (tech stack: AWS EC2, Spot Instances)
  • Argo Workflows: CI/CD for ML (tech stack: GitOps, automated deployment)

Kubeflow Pipelines Architecture

Kubeflow Installation (AWS Distribution)

AWS provides the Kubeflow on AWS distribution, offering an EKS-integrated configuration.

Refer to the Kubeflow on AWS official documentation for deployment guides.

Kubeflow Architecture

Writing Kubeflow Pipelines Components

Kubeflow Pipelines defines reusable components through the Python SDK.

# pipeline_components.py
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "scikit-learn", "boto3"]
)
def data_preparation(
    s3_input_path: str,
    output_dataset: Output[Dataset],
    train_split: float = 0.8
):
    """Data preparation and preprocessing component"""
    import os
    import pandas as pd
    import boto3
    from sklearn.model_selection import train_test_split

    # Load data from S3
    s3 = boto3.client('s3')
    bucket, key = s3_input_path.replace("s3://", "").split("/", 1)
    obj = s3.get_object(Bucket=bucket, Key=key)
    df = pd.read_csv(obj['Body'])

    # Data preprocessing
    df = df.dropna().drop_duplicates()

    # Train/Test split
    train_df, test_df = train_test_split(df, train_size=train_split, random_state=42)

    # Save output (the artifact path is used as a directory)
    os.makedirs(output_dataset.path, exist_ok=True)
    train_df.to_csv(f"{output_dataset.path}/train.csv", index=False)
    test_df.to_csv(f"{output_dataset.path}/test.csv", index=False)
    print(f"Train: {len(train_df)}, Test: {len(test_df)}")


@dsl.component(
    base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime",
    packages_to_install=["mlflow", "scikit-learn", "boto3", "pandas"]
)
def model_training(
    input_features: Input[Dataset],
    output_model: Output[Model],
    mlflow_tracking_uri: str,
    experiment_name: str,
    learning_rate: float = 0.001,
    epochs: int = 10
):
    """Model training component (PyTorch)"""
    import os
    import pandas as pd
    import torch
    import torch.nn as nn
    import mlflow
    import mlflow.pytorch

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)

    # Load the data written by data_preparation (assumes the last column is the target)
    train_df = pd.read_csv(f"{input_features.path}/train.csv")
    X_train = train_df.iloc[:, :-1].values
    y_train = train_df.iloc[:, -1].values

    # Model definition (simple example)
    class SimpleNN(nn.Module):
        def __init__(self, input_dim):
            super().__init__()
            self.fc = nn.Sequential(
                nn.Linear(input_dim, 64),
                nn.ReLU(),
                nn.Linear(64, 1)
            )

        def forward(self, x):
            return self.fc(x)

    model = SimpleNN(X_train.shape[1])
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Start MLflow experiment
    with mlflow.start_run():
        mlflow.log_params({"learning_rate": learning_rate, "epochs": epochs})

        # Training loop (full-batch for simplicity)
        for epoch in range(epochs):
            optimizer.zero_grad()
            outputs = model(torch.FloatTensor(X_train))
            loss = criterion(outputs.squeeze(), torch.FloatTensor(y_train))
            loss.backward()
            optimizer.step()
            mlflow.log_metric("train_loss", loss.item(), step=epoch)

        # Save model
        os.makedirs(output_model.path, exist_ok=True)
        torch.save(model.state_dict(), f"{output_model.path}/model.pth")
        mlflow.pytorch.log_model(model, "model")

Pipeline Definition

# ml_pipeline.py
from kfp import dsl
from kfp import kubernetes  # node-selector helper from the kfp-kubernetes extension package
from kfp.dsl import Model

from pipeline_components import data_preparation, model_training

@dsl.pipeline(
    name="End-to-End ML Pipeline",
    description="Complete ML pipeline from data prep to model evaluation"
)
def ml_pipeline(
    s3_input_path: str = "s3://my-bucket/data/input.csv",
    mlflow_tracking_uri: str = "http://mlflow-server.mlflow.svc.cluster.local:5000",
    experiment_name: str = "eks-ml-experiment"
) -> Model:
    # 1. Data preparation
    data_prep_task = data_preparation(s3_input_path=s3_input_path)

    # 2. Model training (run on a GPU node)
    train_task = model_training(
        input_features=data_prep_task.outputs["output_dataset"],
        mlflow_tracking_uri=mlflow_tracking_uri,
        experiment_name=experiment_name
    )
    train_task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)
    kubernetes.add_node_selector(
        train_task,
        label_key="node.kubernetes.io/instance-type",
        label_value="g5.xlarge",
    )

    return train_task.outputs["output_model"]
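
Once defined, the pipeline is compiled to IR YAML and submitted to the KFP API server. A minimal sketch, assuming an in-cluster KFP endpoint (the host, file name, and arguments here are illustrative):

# submit_pipeline.py
import kfp
from kfp import compiler

from ml_pipeline import ml_pipeline

# Compile the pipeline to the IR YAML that Kubeflow Pipelines executes
compiler.Compiler().compile(ml_pipeline, "ml_pipeline.yaml")

# Placeholder host: point this at your KFP API endpoint (in-cluster DNS or a port-forward)
client = kfp.Client(host="http://ml-pipeline.kubeflow.svc.cluster.local:8888")
client.create_run_from_pipeline_package(
    "ml_pipeline.yaml",
    arguments={"experiment_name": "eks-ml-experiment"},
)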

MLflow Integration

MLflow Architecture

MLflow is an open-source platform for ML experiment tracking, model registry, and model deployment.

MLflow Deployment YAML

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-server
  namespace: mlflow
spec:
  replicas: 2
  selector:
    matchLabels:
      app: mlflow-server
  template:
    metadata:
      labels:
        app: mlflow-server
    spec:
      serviceAccountName: mlflow-sa
      containers:
        - name: mlflow
          # NOTE: the stock MLflow image ships without PostgreSQL/S3 drivers
          # (psycopg2, boto3); a custom image installing them may be required
          image: ghcr.io/mlflow/mlflow:v2.10.2
          ports:
            - name: http
              containerPort: 5000
          env:
            - name: MLFLOW_DB_USER
              valueFrom:
                secretKeyRef:
                  name: mlflow-db-credentials
                  key: username
            - name: MLFLOW_DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: mlflow-db-credentials
                  key: password
          command:
            - mlflow
            - server
            - --host
            - "0.0.0.0"
            - --port
            - "5000"
            - --backend-store-uri
            - "postgresql://$(MLFLOW_DB_USER):$(MLFLOW_DB_PASSWORD)@postgres-service.mlflow.svc.cluster.local:5432/mlflow"
            - --default-artifact-root
            - "s3://my-mlflow-artifacts/"
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
---
apiVersion: v1
kind: Service
metadata:
  name: mlflow-server
  namespace: mlflow
spec:
  type: ClusterIP
  ports:
    - port: 5000
      targetPort: 5000
  selector:
    app: mlflow-server
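
With the tracking server running, models logged by the training component can be promoted through the Model Registry. A minimal sketch, assuming a completed run that logged a model under the artifact path "model" (the run ID and registered model name are placeholders):

# register_model.py
import mlflow
from mlflow import MlflowClient

mlflow.set_tracking_uri("http://mlflow-server.mlflow.svc.cluster.local:5000")

# Register the model logged by a finished training run
run_id = "<run-id-from-training>"  # placeholder
result = mlflow.register_model(f"runs:/{run_id}/model", "fraud-detection")

# Alias the new version so deployment tooling can resolve it by name
client = MlflowClient()
client.set_registered_model_alias("fraud-detection", "champion", result.version)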

GitOps Deployment Pattern Comparison

ArgoCD vs Flux vs Manual Deployment

  • Deployment Method
      - ArgoCD: Git repo-based declarative deployment
      - Flux: Git repo-based declarative deployment
      - kubectl: Imperative/manual apply
  • Sync Method
      - ArgoCD: Pull-based + Auto Sync
      - Flux: Pull-based + Reconciliation
      - kubectl: Push-based (from CI)
  • UI/Dashboard
      - ArgoCD: Rich Web UI, app topology visualization
      - Flux: Weave GitOps (separate install)
      - kubectl: None (CLI only)
  • Rollback
      - ArgoCD: One-click rollback via Git history
      - Flux: Auto rollback via Git revert
      - kubectl: kubectl rollout undo (manual)
  • Multi-cluster
      - ArgoCD: Native via ApplicationSet
      - Flux: Supported via Kustomization
      - kubectl: kubeconfig switching (manual)
  • Helm Support
      - ArgoCD: Native Helm chart rendering
      - Flux: HelmRelease CRD
      - kubectl: helm install/upgrade (CLI)
  • Security/RBAC
      - ArgoCD: Fine-grained project-based RBAC
      - Flux: Delegates to Kubernetes RBAC
      - kubectl: Depends on kubeconfig permissions
  • Community/Maturity
      - ArgoCD: CNCF Graduated
      - Flux: CNCF Graduated
      - kubectl: Kubernetes built-in

ArgoCD GitOps Deployment Architecture

ArgoCD Installation and Configuration

Refer to the ArgoCD official documentation for deployment guides.

ArgoCD Application Example (vLLM Model Serving Deployment)

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: vllm-model-serving
  namespace: argocd
spec:
  project: ml-platform
  source:
    repoURL: https://github.com/myorg/ml-manifests.git
    targetRevision: main
    path: deployments/vllm-serving
  destination:
    server: https://kubernetes.default.svc
    namespace: model-serving
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      - CreateNamespace=true
---
apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
  name: vllm-multi-model
  namespace: argocd
spec:
  generators:
    - list:
        elements:
          - model: llama-3-70b
            gpu: "4"
          - model: mistral-7b
            gpu: "1"
  template:
    metadata:
      name: 'vllm-{{model}}'
    spec:
      project: ml-platform
      source:
        repoURL: https://github.com/myorg/ml-manifests.git
        targetRevision: main
        path: 'deployments/vllm/{{model}}'
      destination:
        server: https://kubernetes.default.svc
        namespace: model-serving
      syncPolicy:
        automated:
          prune: true
          selfHeal: true
vLLM Model Serving Deployment Example

apiVersion: apps/v1
kind: Deployment
metadata:
  name: vllm-llama3-70b
  namespace: model-serving
spec:
  replicas: 2
  selector:
    matchLabels:
      app: vllm-serving
      model: llama-3-70b
  template:
    metadata:
      labels:
        app: vllm-serving
        model: llama-3-70b
    spec:
      containers:
        - name: vllm
          image: vllm/vllm-openai:v0.6.3
          ports:
            - name: http
              containerPort: 8000
          # meta-llama models are gated on Hugging Face; the pod needs a token,
          # e.g. a HUGGING_FACE_HUB_TOKEN env var sourced from a Secret
          args:
            - --model
            - meta-llama/Meta-Llama-3-70B-Instruct
            - --tensor-parallel-size
            - "4"
            - --max-model-len
            - "8192"
            - --gpu-memory-utilization
            - "0.90"
            - --enable-prefix-caching
          resources:
            requests:
              nvidia.com/gpu: 4
              memory: "128Gi"
            limits:
              nvidia.com/gpu: 4
              memory: "160Gi"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: vllm-llama3-70b
  namespace: model-serving
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: vllm-llama3-70b
  minReplicas: 2
  maxReplicas: 8
  metrics:
    - type: Pods
      pods:
        metric:
          # Custom metric; assumes prometheus-adapter exposes vLLM's
          # vllm:num_requests_running Prometheus metric under this name
          name: vllm_requests_running
        target:
          type: AverageValue
          averageValue: "10"
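
vLLM exposes an OpenAI-compatible API on port 8000. A quick smoke test from inside the cluster, assuming a Service named vllm-llama3-70b fronts the Deployment above (the service name and prompt are illustrative):

# query_vllm.py
import requests

resp = requests.post(
    "http://vllm-llama3-70b.model-serving.svc.cluster.local:8000/v1/chat/completions",
    json={
        "model": "meta-llama/Meta-Llama-3-70B-Instruct",
        "messages": [{"role": "user", "content": "Summarize MLOps in one sentence."}],
        "max_tokens": 128,
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])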

Argo Workflows CI/CD Integration

Argo Workflows Architecture

Argo Workflow Example

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-cicd-pipeline-
  namespace: argo
spec:
  entrypoint: ml-pipeline
  serviceAccountName: argo-workflow-sa

  # Shared PVC so the cloned repo is visible to the build step
  volumeClaimTemplates:
    - metadata:
        name: workspace
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Gi

  arguments:
    parameters:
      - name: git-repo
        value: "https://github.com/myorg/ml-model.git"
      - name: model-name
        value: "fraud-detection-v2"

  templates:
    - name: ml-pipeline
      steps:
        - - name: clone-repo
            template: git-clone
        - - name: build-image
            template: docker-build
        - - name: train-model
            template: kubeflow-training
        - - name: validate-model
            template: model-validation
        - - name: deploy-model
            template: argocd-deployment
            # "result" is the stdout of the model-validation script template
            when: "{{steps.validate-model.outputs.result}} == passed"

    - name: git-clone
      container:
        image: alpine/git:latest
        command: [sh, -c]
        args:
          - git clone {{workflow.parameters.git-repo}} /workspace
        volumeMounts:
          - name: workspace
            mountPath: /workspace

    - name: docker-build
      container:
        image: gcr.io/kaniko-project/executor:latest
        args:
          - --dockerfile=/workspace/Dockerfile
          - --context=/workspace
          - --destination=my-registry/{{workflow.parameters.model-name}}:{{workflow.uid}}
        volumeMounts:
          - name: workspace
            mountPath: /workspace

    - name: kubeflow-training
      resource:
        action: create
        # Block until the PyTorchJob finishes before the next step runs
        successCondition: status.replicaStatuses.Master.succeeded > 0
        failureCondition: status.replicaStatuses.Master.failed > 0
        manifest: |
          apiVersion: kubeflow.org/v1
          kind: PyTorchJob
          metadata:
            name: {{workflow.parameters.model-name}}-{{workflow.uid}}
          spec:
            pytorchReplicaSpecs:
              Master:
                replicas: 1
                template:
                  spec:
                    containers:
                      - name: pytorch
                        image: my-registry/{{workflow.parameters.model-name}}:{{workflow.uid}}
                        command: [python, train.py]
                        resources:
                          limits:
                            nvidia.com/gpu: 1

    - name: model-validation
      script:
        image: python:3.10
        command: [python]
        source: |
          # Validation logic (simplified)
          accuracy = 0.95
          print("passed" if accuracy >= 0.90 else "failed")

    - name: argocd-deployment
      script:
        image: quay.io/argoproj/argocd:v2.13.0
        command: [sh]
        # Assumes ARGOCD_SERVER / ARGOCD_AUTH_TOKEN are set for CLI auth
        source: |
          argocd app sync vllm-{{workflow.parameters.model-name}} --force
          argocd app wait vllm-{{workflow.parameters.model-name}} --health
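
Because a Workflow is just a custom resource, it can also be submitted programmatically. A minimal sketch using the official Kubernetes Python client (the manifest file name is illustrative):

# submit_workflow.py
import yaml
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() inside the cluster

with open("ml-cicd-pipeline.yaml") as f:
    manifest = yaml.safe_load(f)

# generateName in the manifest lets the API server assign a unique name
api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
    group="argoproj.io",
    version="v1alpha1",
    namespace="argo",
    plural="workflows",
    body=manifest,
)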

GPU Resource Scheduling (Karpenter)

Karpenter provides dynamic node provisioning for GPU workloads. When it detects Pending Pods, it automatically selects and provisions appropriate GPU instances.

Detailed Karpenter Configuration

For NodePool configuration, Spot strategies, Consolidation policies, and GPU instance comparisons, refer to GPU Resource Management.


End-to-End Pipeline Example

Complete Workflow

# complete_ml_workflow.py
from kfp import dsl, compiler
from kfp import kubernetes  # from the kfp-kubernetes extension package

from pipeline_components import data_preparation, model_training
# Hypothetical import: deploy_via_argocd is assumed to be a custom component
# defined elsewhere (e.g. a container component wrapping `argocd app sync`)
from deployment_components import deploy_via_argocd

@dsl.pipeline(
    name="Production ML Pipeline",
    description="Complete production-ready ML pipeline"
)
def production_ml_pipeline(
    data_source: str = "s3://prod-data/transactions.csv",
    model_name: str = "fraud-detection",
    experiment_name: str = "fraud-detection-prod"
):
    # 1. Data preparation
    data_prep = data_preparation(s3_input_path=data_source)

    # 2. Model training (GPU, scheduled onto Spot capacity via Karpenter)
    training = model_training(
        input_features=data_prep.outputs["output_dataset"],
        mlflow_tracking_uri="http://mlflow-server.mlflow.svc.cluster.local:5000",
        experiment_name=experiment_name
    )
    training.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)
    kubernetes.add_node_selector(
        training,
        label_key="karpenter.sh/capacity-type",
        label_value="spot",
    )

    # 3. ArgoCD GitOps deployment (the model artifact is passed to the component)
    deployment = deploy_via_argocd(
        model_name=model_name,
        model=training.outputs["output_model"]
    )


if __name__ == "__main__":
    compiler.Compiler().compile(production_ml_pipeline, "production_ml_pipeline.yaml")

Summary

The EKS-based MLOps pipeline integrates Kubeflow, MLflow, vLLM, and ArgoCD to provide a fully automated ML lifecycle.

Key Points

  1. Kubeflow Pipelines: Reusable component-based ML workflows
  2. MLflow: Strengthened governance through experiment tracking and model registry
  3. vLLM: High-performance LLM serving (PagedAttention, Prefix Caching)
  4. ArgoCD GitOps: Declarative deployment, auto-sync, one-click rollback
  5. Karpenter: Cost optimization through dynamic GPU resource provisioning
  6. Argo Workflows: Shortened deployment cycles through CI/CD automation
