Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 164 additions & 96 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,76 +13,80 @@ Show support for the Conductor OSS. Please help spread the awareness by starrin

[![GitHub stars](https://img.shields.io/github/stars/conductor-oss/conductor.svg?style=social&label=Star&maxAge=)](https://GitHub.com/conductor-oss/conductor/)

## Content

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->

- [Install Conductor Python SDK](#install-conductor-python-sdk)
- [Get Conductor Python SDK](#get-conductor-python-sdk)
- [Hello World Application Using Conductor](#hello-world-application-using-conductor)
- [Step 1: Create Workflow](#step-1-create-workflow)
- [Creating Workflows by Code](#creating-workflows-by-code)
- [(Alternatively) Creating Workflows in JSON](#alternatively-creating-workflows-in-json)
- [Step 2: Write Task Worker](#step-2-write-task-worker)
- [Step 3: Write _Hello World_ Application](#step-3-write-_hello-world_-application)
- [Running Workflows on Conductor Standalone (Installed Locally)](#running-workflows-on-conductor-standalone-installed-locally)
- [Setup Environment Variable](#setup-environment-variable)
- [Start Conductor Server](#start-conductor-server)
- [Execute Hello World Application](#execute-hello-world-application)
- [Running Workflows on Orkes Conductor](#running-workflows-on-orkes-conductor)
- [Learn More about Conductor Python SDK](#learn-more-about-conductor-python-sdk)
- [Create and Run Conductor Workers](#create-and-run-conductor-workers)
- [Writing Workers](#writing-workers)
- [Implementing Workers](#implementing-workers)
- [Managing Workers in Application](#managing-workers-in-application)
- [Design Principles for Workers](#design-principles-for-workers)
- [System Task Workers](#system-task-workers)
- [Wait Task](#wait-task)
- [Using Code to Create Wait Task](#using-code-to-create-wait-task)
- [JSON Configuration](#json-configuration)
- [HTTP Task](#http-task)
- [Using Code to Create HTTP Task](#using-code-to-create-http-task)
- [JSON Configuration](#json-configuration-1)
- [Javascript Executor Task](#javascript-executor-task)
- [Using Code to Create Inline Task](#using-code-to-create-inline-task)
- [JSON Configuration](#json-configuration-2)
- [JSON Processing using JQ](#json-processing-using-jq)
- [Using Code to Create JSON JQ Transform Task](#using-code-to-create-json-jq-transform-task)
- [JSON Configuration](#json-configuration-3)
- [Worker vs. Microservice/HTTP Endpoints](#worker-vs-microservicehttp-endpoints)
- [Deploying Workers in Production](#deploying-workers-in-production)
- [Create Conductor Workflows](#create-conductor-workflows)
- [Conductor Workflows](#conductor-workflows)
- [Creating Workflows](#creating-workflows)
- [Execute Dynamic Workflows Using Code](#execute-dynamic-workflows-using-code)
- [Kitchen-Sink Workflow](#kitchen-sink-workflow)
- [Executing Workflows](#executing-workflows)
- [Execute Workflow Asynchronously](#execute-workflow-asynchronously)
- [Execute Workflow Synchronously](#execute-workflow-synchronously)
- [Managing Workflow Executions](#managing-workflow-executions)
- [Get Execution Status](#get-execution-status)
- [Update Workflow State Variables](#update-workflow-state-variables)
- [Terminate Running Workflows](#terminate-running-workflows)
- [Retry Failed Workflows](#retry-failed-workflows)
- [Restart Workflows](#restart-workflows)
- [Rerun Workflow from a Specific Task](#rerun-workflow-from-a-specific-task)
- [Pause Running Workflow](#pause-running-workflow)
- [Resume Paused Workflow](#resume-paused-workflow)
- [Searching for Workflows](#searching-for-workflows)
- [Handling Failures, Retries and Rate Limits](#handling-failures-retries-and-rate-limits)
- [Retries](#retries)
- [Rate Limits](#rate-limits)
- [Task Registration](#task-registration)
- [Update Task Definition:](#update-task-definition)
- [Using Conductor in Your Application](#using-conductor-in-your-application)
- [Adding Conductor SDK to Your Application](#adding-conductor-sdk-to-your-application)
- [Testing Workflows](#testing-workflows)
- [Example Unit Testing Application](#example-unit-testing-application)
- [Workflow Deployments Using CI/CD](#workflow-deployments-using-cicd)
- [Versioning Workflows](#versioning-workflows)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

<!-- TOC -->
* [Conductor OSS Python SDK](#conductor-oss-python-sdk)
* [⭐ Conductor OSS](#-conductor-oss)
* [Install Conductor Python SDK](#install-conductor-python-sdk)
* [🚀 Quick Start](#-quick-start)
* [🤖 Agentic Workflows](#-agentic-workflows)
* [Python Workers as Agent Tools](#python-workers-as-agent-tools)
* [How It Works](#how-it-works)
* [Example: Interactive AI Agent](#example-interactive-ai-agent)
* [Run the Complete Example](#run-the-complete-example)
* [Getting Started](#getting-started)
* [Hello World Application Using Conductor](#hello-world-application-using-conductor)
* [Step 1: Create Workflow](#step-1-create-workflow)
* [Creating Workflows by Code](#creating-workflows-by-code)
* [(Alternatively) Creating Workflows in JSON](#alternatively-creating-workflows-in-json)
* [Step 2: Write Task Worker](#step-2-write-task-worker)
* [Step 3: Write _Hello World_ Application](#step-3-write-_hello-world_-application)
* [Running Workflows on Conductor Standalone (Installed Locally)](#running-workflows-on-conductor-standalone-installed-locally)
* [Setup Environment Variable](#setup-environment-variable)
* [Start Conductor Server](#start-conductor-server)
* [Execute Hello World Application](#execute-hello-world-application)
* [Running Workflows on Orkes Conductor](#running-workflows-on-orkes-conductor)
* [Learn More about Conductor Python SDK](#learn-more-about-conductor-python-sdk)
* [Create and Run Conductor Workers](#create-and-run-conductor-workers)
* [Writing Workers](#writing-workers)
* [Implementing Workers](#implementing-workers)
* [Managing Workers in Application](#managing-workers-in-application)
* [Design Principles for Workers](#design-principles-for-workers)
* [System Task Workers](#system-task-workers)
* [Wait Task](#wait-task)
* [Using Code to Create Wait Task](#using-code-to-create-wait-task)
* [JSON Configuration](#json-configuration)
* [HTTP Task](#http-task)
* [Using Code to Create HTTP Task](#using-code-to-create-http-task)
* [JSON Configuration](#json-configuration-1)
* [Javascript Executor Task](#javascript-executor-task)
* [Using Code to Create Inline Task](#using-code-to-create-inline-task)
* [JSON Configuration](#json-configuration-2)
* [JSON Processing using JQ](#json-processing-using-jq)
* [Using Code to Create JSON JQ Transform Task](#using-code-to-create-json-jq-transform-task)
* [JSON Configuration](#json-configuration-3)
* [Worker vs. Microservice/HTTP Endpoints](#worker-vs-microservicehttp-endpoints)
* [Deploying Workers in Production](#deploying-workers-in-production)
* [Create Conductor Workflows](#create-conductor-workflows)
* [Conductor Workflows](#conductor-workflows)
* [Creating Workflows](#creating-workflows)
* [Execute Dynamic Workflows Using Code](#execute-dynamic-workflows-using-code)
* [Kitchen-Sink Workflow](#kitchen-sink-workflow-)
* [Executing Workflows](#executing-workflows)
* [Execute Workflow Asynchronously](#execute-workflow-asynchronously)
* [Execute Workflow Synchronously](#execute-workflow-synchronously)
* [Managing Workflow Executions](#managing-workflow-executions)
* [Get Execution Status](#get-execution-status)
* [Update Workflow State Variables](#update-workflow-state-variables)
* [Terminate Running Workflows](#terminate-running-workflows)
* [Retry Failed Workflows](#retry-failed-workflows)
* [Restart Workflows](#restart-workflows)
* [Rerun Workflow from a Specific Task](#rerun-workflow-from-a-specific-task)
* [Pause Running Workflow](#pause-running-workflow)
* [Resume Paused Workflow](#resume-paused-workflow)
* [Searching for Workflows](#searching-for-workflows)
* [Handling Failures, Retries and Rate Limits](#handling-failures-retries-and-rate-limits)
* [Retries](#retries)
* [Rate Limits](#rate-limits)
* [Task Registration](#task-registration)
* [Update Task Definition:](#update-task-definition)
* [Using Conductor in Your Application](#using-conductor-in-your-application)
* [Adding Conductor SDK to Your Application](#adding-conductor-sdk-to-your-application)
* [Testing Workflows](#testing-workflows)
* [Example Unit Testing Application](#example-unit-testing-application)
* [Workflow Deployments Using CI/CD](#workflow-deployments-using-cicd)
* [Versioning Workflows](#versioning-workflows)
<!-- TOC -->

## Install Conductor Python SDK

Expand All @@ -92,9 +96,6 @@ Before installing Conductor Python SDK, it is a good practice to set up a dedica
virtualenv conductor
source conductor/bin/activate
```

### Get Conductor Python SDK

The SDK requires Python 3.9+. To install the SDK, use the following command:

```shell
Expand Down Expand Up @@ -123,32 +124,89 @@ This example demonstrates:
- Metrics endpoint at http://localhost:8000/metrics
- Long-running task with TaskInProgress (5 polls)

## ⚡ Performance Features (SDK 1.3.0+)
See [docs/design/WORKER_DESIGN.md](docs/design/WORKER_DESIGN.md) for complete architecture details.

## 🤖 Agentic Workflows

The Python SDK provides high-performance worker execution with automatic optimization:
Conductor supports **agentic workflows** where LLMs can dynamically call Python workers as tools. This enables building AI agents that combine the reasoning capabilities of LLMs with the execution power of your existing Python code.

**Worker Architecture:**
- **AsyncTaskRunner** for async workers (`async def`) - Pure async/await, zero thread overhead
- **TaskRunner** for sync workers (`def`) - ThreadPoolExecutor for concurrent execution
- **Automatic selection** - Based on function signature, no configuration needed
- **One process per worker** - Process isolation and fault tolerance
### Python Workers as Agent Tools

**Performance Optimizations:**
- **Dynamic batch polling** - Batch size adapts to available capacity (thread_count - running tasks)
- **Adaptive backoff** - Exponential backoff when queue empty (1ms → 2ms → 4ms → poll_interval)
- **High concurrency** - Async workers support higher task throughput, sync workers use thread pools
Any Python worker can be used as a tool for an AI agent. The LLM decides when and how to call your workers based on user queries:

**AsyncTaskRunner Benefits (async def workers):**
- Fewer threads per worker (single event loop)
- Lower memory footprint per worker
- Better I/O throughput for async workloads
- Direct `await worker_fn()` execution
```python
from conductor.client.worker.worker_task import worker_task

See [docs/design/WORKER_DESIGN.md](docs/design/WORKER_DESIGN.md) for complete architecture details.
# Define workers that the AI agent can call as tools
@worker_task(task_definition_name='get_weather')
def get_weather(city: str, units: str = 'fahrenheit') -> dict:
"""Get current weather for a city."""
# Your implementation here
return {'city': city, 'temperature': 72, 'condition': 'Sunny'}

@worker_task(task_definition_name='search_products')
def search_products(query: str, max_results: int = 5) -> dict:
"""Search product catalog."""
# Your implementation here
return {'products': [...], 'total': 10}

@worker_task(task_definition_name='calculate')
def calculate(expression: str) -> dict:
"""Perform mathematical calculations."""
result = eval(expression) # Use safe evaluation in production
return {'result': result}
```

### How It Works

1. **Define Tools**: Create Python workers with `@worker_task` decorator
2. **Create Agent Prompt**: Tell the LLM what tools are available and how to call them
3. **LLM Decides**: The LLM analyzes user queries and decides which tool(s) to invoke
4. **Dynamic Execution**: Conductor dynamically routes to the appropriate worker
5. **Response**: Results are returned to the LLM for summarization

### Example: Interactive AI Agent

```python
from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete
from conductor.client.workflow.task.dynamic_task import DynamicTask

# LLM with tool awareness
chat_complete = LlmChatComplete(
task_ref_name='chat_ref',
llm_provider='openai',
model='gpt-4',
instructions_template='agent_prompt',
messages='${workflow.variables.messages}'
)

## 📚 Documentation
# Dynamic task execution based on LLM output
function_call = DynamicTask(
task_reference_name='tool_call',
dynamic_task=chat_complete.output('function')
)
function_call.input_parameters['inputs'] = chat_complete.output('function_parameters')
```

### Run the Complete Example

```bash
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"
python3 examples/agentic_workflow.py
```

This interactive example lets you:
- Ask about weather in any city
- Search for products
- Perform calculations
- Send notifications

**Getting Started:**
The agent automatically selects and calls the appropriate Python worker based on your query.

> **📚 Learn More**: See the [Conductor AI Module documentation](https://github.com/conductor-oss/conductor/blob/main/ai/README.md) for details on supported LLM providers, vector databases, and advanced AI task types.


## Getting Started
- **[End-to-End Example](examples/workers_e2e.py)** - Complete workflow execution with workers
- **[Examples Guide](examples/EXAMPLES_README.md)** - All examples with quick reference

Expand Down Expand Up @@ -294,12 +352,22 @@ export CONDUCTOR_SERVER_URL=http://localhost:8080/api
```
### Start Conductor Server

To start the Conductor server in a standalone mode from a Docker image, type the command below:
**One-liner for macOS / Linux:**
```shell
curl -sSL https://raw.githubusercontent.com/conductor-oss/conductor/main/conductor_server.sh | sh
```

**One-liner for Windows PowerShell:**
```powershell
irm https://raw.githubusercontent.com/conductor-oss/conductor/main/conductor_server.ps1 | iex
```

**Or run with Docker:**
```shell
docker run --init -p 8080:8080 -p 5000:5000 conductoross/conductor-standalone:3.15.0
docker run -p 8080:8080 conductoross/conductor:latest
```
To ensure the server has started successfully, open Conductor UI on http://localhost:5000.

To ensure the server has started successfully, open Conductor UI at http://localhost:8080.

### Execute Hello World Application

Expand All @@ -309,7 +377,7 @@ To run the application, type the following command:
python helloworld.py
```

Now, the workflow is executed, and its execution status can be viewed from Conductor UI (http://localhost:5000).
Now, the workflow is executed, and its execution status can be viewed from Conductor UI (http://localhost:8080).

Navigate to the **Executions** tab to view the workflow execution.

Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ See: `task_context_example.py`, `worker_example.py`

| File | Description | Run |
|------|-------------|-----|
| **agentic_workflow.py** | 🤖 AI agent with Python workers as tools | `python examples/agentic_workflow.py` |
| **dynamic_workflow.py** | Create workflows programmatically | `python examples/dynamic_workflow.py` |
| **workflow_ops.py** | Start, pause, resume, terminate workflows | `python examples/workflow_ops.py` |
| **workflow_status_listner.py** | Workflow event listeners | `python examples/workflow_status_listner.py` |
Expand Down Expand Up @@ -209,6 +210,7 @@ examples/
│ └── pythonic_usage.py # Pythonic decorators
├── Workflows
│ ├── agentic_workflow.py # 🤖 AI agent with tools
│ ├── dynamic_workflow.py # Workflow creation
│ ├── workflow_ops.py # Workflow management
│ ├── workflow_status_listner.py # Workflow events
Expand Down
Loading
Loading