rafikbahri/async-ml-temporal
Async ML Workflows on Temporal

A distributed machine learning training and prediction platform built on Temporal workflows. This project demonstrates how to implement asynchronous, fault-tolerant machine learning pipelines using Temporal for workflow orchestration and MinIO for object storage.

Overview

Async ML Workflows on Temporal provides a robust infrastructure for training machine learning models and making predictions in a distributed, asynchronous manner. The application leverages:

  • Temporal.io: For reliable workflow orchestration, ensuring durability and fault-tolerance
  • MinIO: Object storage for datasets, trained models, and prediction results
  • Scikit-learn: Machine learning model implementation (HistGradientBoostingClassifier)
  • Polars: Fast data manipulation and analytics for DataFrame operations

The system is designed with three primary workflows:

  1. Upload Workflow: Uploads datasets to MinIO storage
  2. Fit Workflow: Trains machine learning models on datasets
  3. Predict Workflow: Uses trained models to make predictions on new data
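In a typical run the three workflows chain together: upload produces a dataset ID, fit produces a model ID, and predict consumes both. A minimal sketch of that sequencing (the starter functions here are placeholders showing the data flow, not the project's actual API):

```python
import asyncio

# Placeholder starters -- the real project dispatches these to Temporal
# workers; here they only return identifiers to show how results chain.
async def start_upload_workflow(path: str) -> str:
    return f"dataset-for-{path}"

async def start_fit_workflow(dataset_id: str) -> str:
    return f"model-trained-on-{dataset_id}"

async def start_predict_workflow(dataset_id: str, model_id: str) -> str:
    return f"predictions-from-{model_id}-on-{dataset_id}"

async def run_pipeline(path: str) -> str:
    dataset_id = await start_upload_workflow(path)              # 1. Upload
    model_id = await start_fit_workflow(dataset_id)             # 2. Fit
    return await start_predict_workflow(dataset_id, model_id)   # 3. Predict

result = asyncio.run(run_pipeline("datasets/test.parquet"))
```

Because each step only needs the identifier produced by the previous one, Temporal can retry or resume any stage independently without re-running the whole pipeline.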

Architecture

The application is organized into several services:

  • Client Applications: Initiate workflows and consume results
  • Temporal Server: Central orchestration service that manages workflows and task queues
    • Upload Tasks Queue: Handles dataset upload operations
    • ML Tasks Queue: Handles model training and prediction operations
  • Workers: Service processes that execute the tasks
    • ML Worker: Processes machine learning related tasks
    • Upload Worker: Processes data upload tasks
  • MinIO Storage: Object storage service for datasets, models, and prediction results

Architecture Diagram

graph TB
    %% Client
    client[Client Applications]
    
    %% Client to Temporal connection
    client --> temporalServer
    
    %% Temporal Cluster
    subgraph "Temporal Cluster"
        temporalServer[Temporal Server]
        uploadQueue[Upload Tasks Queue]
        mlQueue[ML Tasks Queue]
        temporalServer --> uploadQueue
        temporalServer --> mlQueue
    end
    
    %% Worker connections to queues
    uploadQueue --> uploadWorker
    mlQueue --> mlWorker
    
    %% Workers at the bottom
    subgraph "Workers"
        uploadWorker[Upload Worker]
        mlWorker[ML Worker]
    end
    
    %% Storage
    subgraph "MinIO Storage"
        datasetsBucket[Datasets Bucket]
        modelsBucket[Models Bucket]
        resultsBucket[Results Bucket]
    end
    
    %% Service interactions
    uploadWorker --> datasetsBucket
    mlWorker --> datasetsBucket
    mlWorker --> modelsBucket
    mlWorker --> resultsBucket
    
    %% Client to results connection
    resultsBucket --> client

Prerequisites

  • Python 3.9+
  • Temporal server (v1.17+)
  • Docker and Docker Compose
  • MinIO object storage (automatically configured through Docker Compose)

Dependencies

Main project dependencies include:

  • temporalio: Temporal SDK for Python
  • scikit-learn: Machine learning library
  • polars: DataFrame library for data manipulation
  • minio: MinIO Python client
  • cloudpickle: Serialization library for Python objects
  • aiohttp: Asynchronous HTTP client/server

Installation

Step 1: Clone the repository:

git clone <repository-url>
cd async-ml-temporal

Step 2: Install Python dependencies:

pip install -r requirements.txt

Step 3: Start the required infrastructure services:

docker compose up -d

Configuration

Create a .env file in the project root with the following content (or adjust as needed):

# Logging configuration
LOG_LEVEL=INFO
DATASETS_DIR=datasets

# Temporal configuration
TEMPORAL_HOST=localhost:7233
NAMESPACE=default

# MinIO configuration
MINIO_HOST=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_SECURE=False

# Task queue configuration
ML_TASK_QUEUE=ml-tasks
UPLOAD_TASK_QUEUE=upload-tasks

The application also loads configuration from an optional .env.local file; any values it defines override those in the main .env file.
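The override semantics amount to "parse each file in order, later files win." Many projects use python-dotenv for this; the behaviour can be reproduced with a small stdlib sketch (this is illustrative, not the project's actual loader):

```python
from pathlib import Path

def load_env(*files: str) -> dict:
    """Parse simple KEY=VALUE files; later files override earlier ones."""
    config = {}
    for name in files:
        path = Path(name)
        if not path.exists():
            continue  # missing files (e.g. no .env.local) are simply skipped
        for line in path.read_text().splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue  # skip blanks and comment lines
            key, _, value = line.partition("=")
            config[key.strip()] = value.strip()
    return config

# .env.local (if present) wins over .env
settings = load_env(".env", ".env.local")
```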

Running the Application

The application uses Taskfile to manage common commands. Here are the key tasks you can run:

Step 1: Start the Temporal server:

task start-server

Step 2: Start the ML worker:

task start-ml-worker

Step 3: Start the Upload worker:

task start-upload-worker

Step 4: Run the full ML pipeline:

task run-upload-workflow  # Upload a dataset
task run-fit-workflow     # Train a model
task run-predict-workflow # Make predictions
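These task names come from the project's Taskfile.yml. A representative fragment might look like the following; the commands shown are assumptions inferred from the project layout, not the actual file:

```yaml
version: '3'

tasks:
  start-server:
    cmds:
      - temporal server start-dev   # assumes the Temporal CLI dev server
  start-ml-worker:
    cmds:
      - python -m src.workers.ml      # module path inferred from the layout
  start-upload-worker:
    cmds:
      - python -m src.workers.upload
```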

Workflow Examples

The project includes example scripts that demonstrate how to run the workflows:

Upload Workflow

dataset_path = "datasets/test.parquet"
upload_presigned_url, upload_result = await start_upload_workflow(dataset_path)

Fit (Training) Workflow

model_id = await start_fit_workflow(dataset_id)

Predict Workflow

prediction_id, result_data = await start_predict_workflow(dataset_id, model_id)

Data Format

The system expects Parquet files with the following characteristics:

  • Training data must include a target column named "y"
  • Prediction data should have the same features as the training data

Project Structure

async-ml-temporal/
├── datasets/               # Example datasets (Parquet format)
├── examples/               # Example scripts for running workflows
├── logs/                   # Application logs
├── src/                    # Source code
│   ├── activities/         # Implementation of workflow activities
│   │   ├── ml.py          # ML training and prediction activities
│   │   └── upload.py      # Data upload activities
│   ├── common/            # Shared utilities
│   │   ├── config.py      # Configuration management
│   │   └── logger.py      # Logging setup
│   ├── starters/          # Workflow starter clients
│   │   ├── fit.py         # Fit workflow starter
│   │   ├── predict.py     # Predict workflow starter
│   │   └── upload.py      # Upload workflow starter
│   ├── workers/           # Temporal workers
│   │   ├── ml.py          # ML worker implementation
│   │   └── upload.py      # Upload worker implementation
│   └── workflows/         # Workflow definitions
│       ├── fit.py         # Fit workflow logic
│       ├── predict.py     # Prediction workflow logic
│       └── upload.py      # Upload workflow logic
├── docker-compose.yml      # Docker services configuration
├── requirements.txt        # Python dependencies
└── Taskfile.yml           # Task runner configuration

Advanced Usage

Custom Models

The default implementation uses HistGradientBoostingClassifier from scikit-learn. To implement custom models:

  • Modify src/activities/ml.py to use your preferred model
  • Ensure your model can be serialized with cloudpickle
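Whatever model you swap in, the key constraint is cloudpickle serialization, since the trained model travels through MinIO as bytes. A round-trip check along these lines is a quick way to verify compatibility (TinyModel is a stand-in for any scikit-learn-style estimator, not the project's code):

```python
import cloudpickle

class TinyModel:
    """Stand-in for any estimator exposing a predict() method."""
    def __init__(self):
        self.threshold = 0.5

    def predict(self, xs):
        return [1 if x >= self.threshold else 0 for x in xs]

model = TinyModel()

# Serialize as the ML activities would before uploading to MinIO ...
blob = cloudpickle.dumps(model)

# ... and restore it as the predict activity would after downloading.
restored = cloudpickle.loads(blob)
assert restored.predict([0.2, 0.8]) == [0, 1]
```

cloudpickle (unlike the stdlib pickle) can serialize locally defined classes and closures, which is why it is the project's serialization dependency.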

Scaling Workers

For production environments, you can:

  • Run multiple instances of workers to handle increased load
  • Configure resource limits in docker-compose.yml

Troubleshooting

Common Issues

  1. Connection errors to MinIO:

    • Verify MinIO is running: docker compose ps
    • Check MinIO credentials in .env file
  2. Temporal server connection issues:

    • Ensure Temporal server is running: task start-server
    • Check for network connectivity issues
    • Verify TEMPORAL_HOST and NAMESPACE settings in .env file
  3. Worker connection issues:

    • Ensure the ML and Upload workers are running on the correct task queues
    • Check ML_TASK_QUEUE and UPLOAD_TASK_QUEUE settings in .env file
  4. Dataset format errors:

    • Verify your Parquet files have the correct schema
    • Training data must include a 'y' column with target values

Logging

Logs are stored in the logs/ directory with a date-based naming convention. Check these logs for detailed error information.
