From 1bdf99ed2723b0e16e814120dfecd2cf7b90871d Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Thu, 5 Feb 2026 20:40:01 -0800 Subject: [PATCH] update --- README.md | 260 +++++++++++++--------- examples/README.md | 2 + examples/agentic_workflow.py | 407 +++++++++++++++++++++++++++++++++++ 3 files changed, 573 insertions(+), 96 deletions(-) create mode 100644 examples/agentic_workflow.py diff --git a/README.md b/README.md index 369d6455..a5ce1a57 100644 --- a/README.md +++ b/README.md @@ -13,76 +13,80 @@ Show support for the Conductor OSS. Please help spread the awareness by starrin [![GitHub stars](https://img.shields.io/github/stars/conductor-oss/conductor.svg?style=social&label=Star&maxAge=)](https://GitHub.com/conductor-oss/conductor/) -## Content - - - - -- [Install Conductor Python SDK](#install-conductor-python-sdk) - - [Get Conductor Python SDK](#get-conductor-python-sdk) -- [Hello World Application Using Conductor](#hello-world-application-using-conductor) - - [Step 1: Create Workflow](#step-1-create-workflow) - - [Creating Workflows by Code](#creating-workflows-by-code) - - [(Alternatively) Creating Workflows in JSON](#alternatively-creating-workflows-in-json) - - [Step 2: Write Task Worker](#step-2-write-task-worker) - - [Step 3: Write _Hello World_ Application](#step-3-write-_hello-world_-application) -- [Running Workflows on Conductor Standalone (Installed Locally)](#running-workflows-on-conductor-standalone-installed-locally) - - [Setup Environment Variable](#setup-environment-variable) - - [Start Conductor Server](#start-conductor-server) - - [Execute Hello World Application](#execute-hello-world-application) -- [Running Workflows on Orkes Conductor](#running-workflows-on-orkes-conductor) -- [Learn More about Conductor Python SDK](#learn-more-about-conductor-python-sdk) -- [Create and Run Conductor Workers](#create-and-run-conductor-workers) -- [Writing Workers](#writing-workers) - - [Implementing Workers](#implementing-workers) - - [Managing Workers in Application](#managing-workers-in-application) - - [Design Principles for Workers](#design-principles-for-workers) - - [System Task Workers](#system-task-workers) - - [Wait Task](#wait-task) - - [Using Code to Create Wait Task](#using-code-to-create-wait-task) - - [JSON Configuration](#json-configuration) - - [HTTP Task](#http-task) - - [Using Code to Create HTTP Task](#using-code-to-create-http-task) - - [JSON Configuration](#json-configuration-1) - - [Javascript Executor Task](#javascript-executor-task) - - [Using Code to Create Inline Task](#using-code-to-create-inline-task) - - [JSON Configuration](#json-configuration-2) - - [JSON Processing using JQ](#json-processing-using-jq) - - [Using Code to Create JSON JQ Transform Task](#using-code-to-create-json-jq-transform-task) - - [JSON Configuration](#json-configuration-3) - - [Worker vs. Microservice/HTTP Endpoints](#worker-vs-microservicehttp-endpoints) - - [Deploying Workers in Production](#deploying-workers-in-production) -- [Create Conductor Workflows](#create-conductor-workflows) - - [Conductor Workflows](#conductor-workflows) - - [Creating Workflows](#creating-workflows) - - [Execute Dynamic Workflows Using Code](#execute-dynamic-workflows-using-code) - - [Kitchen-Sink Workflow](#kitchen-sink-workflow) - - [Executing Workflows](#executing-workflows) - - [Execute Workflow Asynchronously](#execute-workflow-asynchronously) - - [Execute Workflow Synchronously](#execute-workflow-synchronously) - - [Managing Workflow Executions](#managing-workflow-executions) - - [Get Execution Status](#get-execution-status) - - [Update Workflow State Variables](#update-workflow-state-variables) - - [Terminate Running Workflows](#terminate-running-workflows) - - [Retry Failed Workflows](#retry-failed-workflows) - - [Restart Workflows](#restart-workflows) - - [Rerun Workflow from a Specific Task](#rerun-workflow-from-a-specific-task) - - [Pause Running Workflow](#pause-running-workflow) - - [Resume Paused Workflow](#resume-paused-workflow) - - [Searching for Workflows](#searching-for-workflows) - - [Handling Failures, Retries and Rate Limits](#handling-failures-retries-and-rate-limits) - - [Retries](#retries) - - [Rate Limits](#rate-limits) - - [Task Registration](#task-registration) - - [Update Task Definition:](#update-task-definition) -- [Using Conductor in Your Application](#using-conductor-in-your-application) - - [Adding Conductor SDK to Your Application](#adding-conductor-sdk-to-your-application) - - [Testing Workflows](#testing-workflows) - - [Example Unit Testing Application](#example-unit-testing-application) - - [Workflow Deployments Using CI/CD](#workflow-deployments-using-cicd) - - [Versioning Workflows](#versioning-workflows) - - + + +* [Conductor OSS Python SDK](#conductor-oss-python-sdk) + * [⭐ Conductor OSS](#-conductor-oss) + * [Install Conductor Python SDK](#install-conductor-python-sdk) + * [šŸš€ Quick Start](#-quick-start) + * [šŸ¤– Agentic Workflows](#-agentic-workflows) + * [Python Workers as Agent Tools](#python-workers-as-agent-tools) + * [How It Works](#how-it-works) + * [Example: Interactive AI Agent](#example-interactive-ai-agent) + * [Run the Complete Example](#run-the-complete-example) + * [Getting Started](#getting-started) + * [Hello World Application Using Conductor](#hello-world-application-using-conductor) + * [Step 1: Create Workflow](#step-1-create-workflow) + * [Creating Workflows by Code](#creating-workflows-by-code) + * [(Alternatively) Creating Workflows in JSON](#alternatively-creating-workflows-in-json) + * [Step 2: Write Task Worker](#step-2-write-task-worker) + * [Step 3: Write _Hello World_ Application](#step-3-write-_hello-world_-application) + * [Running Workflows on Conductor Standalone (Installed Locally)](#running-workflows-on-conductor-standalone-installed-locally) + * [Setup Environment Variable](#setup-environment-variable) + * [Start Conductor Server](#start-conductor-server) + * [Execute Hello World Application](#execute-hello-world-application) + * [Running Workflows on Orkes Conductor](#running-workflows-on-orkes-conductor) + * [Learn More about Conductor Python SDK](#learn-more-about-conductor-python-sdk) + * [Create and Run Conductor Workers](#create-and-run-conductor-workers) + * [Writing Workers](#writing-workers) + * [Implementing Workers](#implementing-workers) + * [Managing Workers in Application](#managing-workers-in-application) + * [Design Principles for Workers](#design-principles-for-workers) + * [System Task Workers](#system-task-workers) + * [Wait Task](#wait-task) + * [Using Code to Create Wait Task](#using-code-to-create-wait-task) + * [JSON Configuration](#json-configuration) + * [HTTP Task](#http-task) + * [Using Code to Create HTTP Task](#using-code-to-create-http-task) + * [JSON Configuration](#json-configuration-1) + * [Javascript Executor Task](#javascript-executor-task) + * [Using Code to Create Inline Task](#using-code-to-create-inline-task) + * [JSON Configuration](#json-configuration-2) + * [JSON Processing using JQ](#json-processing-using-jq) + * [Using Code to Create JSON JQ Transform Task](#using-code-to-create-json-jq-transform-task) + * [JSON Configuration](#json-configuration-3) + * [Worker vs. Microservice/HTTP Endpoints](#worker-vs-microservicehttp-endpoints) + * [Deploying Workers in Production](#deploying-workers-in-production) + * [Create Conductor Workflows](#create-conductor-workflows) + * [Conductor Workflows](#conductor-workflows) + * [Creating Workflows](#creating-workflows) + * [Execute Dynamic Workflows Using Code](#execute-dynamic-workflows-using-code) + * [Kitchen-Sink Workflow](#kitchen-sink-workflow-) + * [Executing Workflows](#executing-workflows) + * [Execute Workflow Asynchronously](#execute-workflow-asynchronously) + * [Execute Workflow Synchronously](#execute-workflow-synchronously) + * [Managing Workflow Executions](#managing-workflow-executions) + * [Get Execution Status](#get-execution-status) + * [Update Workflow State Variables](#update-workflow-state-variables) + * [Terminate Running Workflows](#terminate-running-workflows) + * [Retry Failed Workflows](#retry-failed-workflows) + * [Restart Workflows](#restart-workflows) + * [Rerun Workflow from a Specific Task](#rerun-workflow-from-a-specific-task) + * [Pause Running Workflow](#pause-running-workflow) + * [Resume Paused Workflow](#resume-paused-workflow) + * [Searching for Workflows](#searching-for-workflows) + * [Handling Failures, Retries and Rate Limits](#handling-failures-retries-and-rate-limits) + * [Retries](#retries) + * [Rate Limits](#rate-limits) + * [Task Registration](#task-registration) + * [Update Task Definition:](#update-task-definition) + * [Using Conductor in Your Application](#using-conductor-in-your-application) + * [Adding Conductor SDK to Your Application](#adding-conductor-sdk-to-your-application) + * [Testing Workflows](#testing-workflows) + * [Example Unit Testing Application](#example-unit-testing-application) + * [Workflow Deployments Using CI/CD](#workflow-deployments-using-cicd) + * [Versioning Workflows](#versioning-workflows) + ## Install Conductor Python SDK @@ -92,9 +96,6 @@ Before installing Conductor Python SDK, it is a good practice to set up a dedica virtualenv conductor source conductor/bin/activate ``` - -### Get Conductor Python SDK - The SDK requires Python 3.9+. To install the SDK, use the following command: ```shell @@ -123,32 +124,89 @@ This example demonstrates: - Metrics endpoint at http://localhost:8000/metrics - Long-running task with TaskInProgress (5 polls) -## ⚔ Performance Features (SDK 1.3.0+) +See [docs/design/WORKER_DESIGN.md](docs/design/WORKER_DESIGN.md) for complete architecture details. + +## šŸ¤– Agentic Workflows -The Python SDK provides high-performance worker execution with automatic optimization: +Conductor supports **agentic workflows** where LLMs can dynamically call Python workers as tools. This enables building AI agents that combine the reasoning capabilities of LLMs with the execution power of your existing Python code. -**Worker Architecture:** -- **AsyncTaskRunner** for async workers (`async def`) - Pure async/await, zero thread overhead -- **TaskRunner** for sync workers (`def`) - ThreadPoolExecutor for concurrent execution -- **Automatic selection** - Based on function signature, no configuration needed -- **One process per worker** - Process isolation and fault tolerance +### Python Workers as Agent Tools -**Performance Optimizations:** -- **Dynamic batch polling** - Batch size adapts to available capacity (thread_count - running tasks) -- **Adaptive backoff** - Exponential backoff when queue empty (1ms → 2ms → 4ms → poll_interval) -- **High concurrency** - Async workers support higher task throughput, sync workers use thread pools +Any Python worker can be used as a tool for an AI agent. The LLM decides when and how to call your workers based on user queries: -**AsyncTaskRunner Benefits (async def workers):** -- Fewer threads per worker (single event loop) -- Lower memory footprint per worker -- Better I/O throughput for async workloads -- Direct `await worker_fn()` execution +```python +from conductor.client.worker.worker_task import worker_task -See [docs/design/WORKER_DESIGN.md](docs/design/WORKER_DESIGN.md) for complete architecture details. +# Define workers that the AI agent can call as tools +@worker_task(task_definition_name='get_weather') +def get_weather(city: str, units: str = 'fahrenheit') -> dict: + """Get current weather for a city.""" + # Your implementation here + return {'city': city, 'temperature': 72, 'condition': 'Sunny'} + +@worker_task(task_definition_name='search_products') +def search_products(query: str, max_results: int = 5) -> dict: + """Search product catalog.""" + # Your implementation here + return {'products': [...], 'total': 10} + +@worker_task(task_definition_name='calculate') +def calculate(expression: str) -> dict: + """Perform mathematical calculations.""" + result = eval(expression) # Use safe evaluation in production + return {'result': result} +``` + +### How It Works + +1. **Define Tools**: Create Python workers with `@worker_task` decorator +2. **Create Agent Prompt**: Tell the LLM what tools are available and how to call them +3. **LLM Decides**: The LLM analyzes user queries and decides which tool(s) to invoke +4. **Dynamic Execution**: Conductor dynamically routes to the appropriate worker +5. **Response**: Results are returned to the LLM for summarization + +### Example: Interactive AI Agent + +```python +from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete +from conductor.client.workflow.task.dynamic_task import DynamicTask + +# LLM with tool awareness +chat_complete = LlmChatComplete( + task_ref_name='chat_ref', + llm_provider='openai', + model='gpt-4', + instructions_template='agent_prompt', + messages='${workflow.variables.messages}' +) -## šŸ“š Documentation +# Dynamic task execution based on LLM output +function_call = DynamicTask( + task_reference_name='tool_call', + dynamic_task=chat_complete.output('function') +) +function_call.input_parameters['inputs'] = chat_complete.output('function_parameters') +``` + +### Run the Complete Example + +```bash +export CONDUCTOR_SERVER_URL="http://localhost:8080/api" +python3 examples/agentic_workflow.py +``` + +This interactive example lets you: +- Ask about weather in any city +- Search for products +- Perform calculations +- Send notifications -**Getting Started:** +The agent automatically selects and calls the appropriate Python worker based on your query. + +> **šŸ“š Learn More**: See the [Conductor AI Module documentation](https://github.com/conductor-oss/conductor/blob/main/ai/README.md) for details on supported LLM providers, vector databases, and advanced AI task types. + + +## Getting Started - **[End-to-End Example](examples/workers_e2e.py)** - Complete workflow execution with workers - **[Examples Guide](examples/EXAMPLES_README.md)** - All examples with quick reference @@ -294,12 +352,22 @@ export CONDUCTOR_SERVER_URL=http://localhost:8080/api ``` ### Start Conductor Server -To start the Conductor server in a standalone mode from a Docker image, type the command below: +**One-liner for macOS / Linux:** +```shell +curl -sSL https://raw.githubusercontent.com/conductor-oss/conductor/main/conductor_server.sh | sh +``` + +**One-liner for Windows PowerShell:** +```powershell +irm https://raw.githubusercontent.com/conductor-oss/conductor/main/conductor_server.ps1 | iex +``` +**Or run with Docker:** ```shell -docker run --init -p 8080:8080 -p 5000:5000 conductoross/conductor-standalone:3.15.0 +docker run -p 8080:8080 conductoross/conductor:latest ``` -To ensure the server has started successfully, open Conductor UI on http://localhost:5000. + +To ensure the server has started successfully, open Conductor UI at http://localhost:8080. ### Execute Hello World Application @@ -309,7 +377,7 @@ To run the application, type the following command: python helloworld.py ``` -Now, the workflow is executed, and its execution status can be viewed from Conductor UI (http://localhost:5000). +Now, the workflow is executed, and its execution status can be viewed from Conductor UI (http://localhost:8080). Navigate to the **Executions** tab to view the workflow execution. diff --git a/examples/README.md b/examples/README.md index 0b7366f7..689a37c8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -60,6 +60,7 @@ See: `task_context_example.py`, `worker_example.py` | File | Description | Run | |------|-------------|-----| +| **agentic_workflow.py** | šŸ¤– AI agent with Python workers as tools | `python examples/agentic_workflow.py` | | **dynamic_workflow.py** | Create workflows programmatically | `python examples/dynamic_workflow.py` | | **workflow_ops.py** | Start, pause, resume, terminate workflows | `python examples/workflow_ops.py` | | **workflow_status_listner.py** | Workflow event listeners | `python examples/workflow_status_listner.py` | @@ -209,6 +210,7 @@ examples/ │ └── pythonic_usage.py # Pythonic decorators │ ā”œā”€ā”€ Workflows +│ ā”œā”€ā”€ agentic_workflow.py # šŸ¤– AI agent with tools │ ā”œā”€ā”€ dynamic_workflow.py # Workflow creation │ ā”œā”€ā”€ workflow_ops.py # Workflow management │ ā”œā”€ā”€ workflow_status_listner.py # Workflow events diff --git a/examples/agentic_workflow.py b/examples/agentic_workflow.py new file mode 100644 index 00000000..ec4a96b4 --- /dev/null +++ b/examples/agentic_workflow.py @@ -0,0 +1,407 @@ +""" +Agentic Workflow Example - Using Python Workers as Agent Tools + +This example demonstrates how to create an agentic workflow where an LLM can +dynamically call Python worker tasks as tools to accomplish goals. + +The workflow: +1. Takes a user query +2. LLM analyzes the query and decides which tool(s) to call +3. Python workers execute as tools +4. LLM summarizes the results + +Requirements: +- Conductor server running (see README.md for startup instructions) +- OpenAI API key configured in Conductor integrations +- Set environment variables: + export CONDUCTOR_SERVER_URL=http://localhost:8080/api + export CONDUCTOR_AUTH_KEY=your_key # if using Orkes Conductor + export CONDUCTOR_AUTH_SECRET=your_secret # if using Orkes Conductor + +Usage: + python examples/agentic_workflow.py +""" + +import os +import time +from typing import Optional + +from conductor.client.ai.orchestrator import AIOrchestrator +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.http.models import TaskDef +from conductor.client.http.models.task_result_status import TaskResultStatus +from conductor.client.orkes_clients import OrkesClients +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.do_while_task import LoopTask +from conductor.client.workflow.task.dynamic_task import DynamicTask +from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete, ChatMessage +from conductor.client.workflow.task.set_variable_task import SetVariableTask +from conductor.client.workflow.task.switch_task import SwitchTask +from conductor.client.workflow.task.timeout_policy import TimeoutPolicy +from conductor.client.workflow.task.wait_task import WaitTask + + +# ============================================================================= +# DEFINE PYTHON WORKERS AS AGENT TOOLS +# ============================================================================= +# These workers will be available as tools for the LLM agent to call. +# Each worker is a self-contained function that performs a specific task. + +@worker_task(task_definition_name='get_weather') +def get_weather(city: str, units: str = 'fahrenheit') -> dict: + """ + Get current weather for a city. + + Args: + city: City name or zip code + units: Temperature units ('fahrenheit' or 'celsius') + + Returns: + Weather information including temperature and conditions + """ + # In a real application, this would call a weather API + weather_data = { + 'new york': {'temp': 72, 'condition': 'Partly Cloudy', 'humidity': 65}, + 'san francisco': {'temp': 58, 'condition': 'Foggy', 'humidity': 80}, + 'miami': {'temp': 85, 'condition': 'Sunny', 'humidity': 75}, + 'chicago': {'temp': 45, 'condition': 'Windy', 'humidity': 55}, + } + + city_lower = city.lower() + data = weather_data.get(city_lower, {'temp': 70, 'condition': 'Clear', 'humidity': 50}) + + if units == 'celsius': + data['temp'] = round((data['temp'] - 32) * 5/9, 1) + + return { + 'city': city, + 'temperature': data['temp'], + 'units': units, + 'condition': data['condition'], + 'humidity': data['humidity'] + } + + +@worker_task(task_definition_name='search_products') +def search_products(query: str, max_results: int = 5) -> dict: + """ + Search for products in a catalog. + + Args: + query: Search query string + max_results: Maximum number of results to return + + Returns: + List of matching products with prices + """ + # Simulated product database + products = [ + {'name': 'Wireless Headphones', 'price': 79.99, 'category': 'Electronics'}, + {'name': 'Running Shoes', 'price': 129.99, 'category': 'Sports'}, + {'name': 'Coffee Maker', 'price': 49.99, 'category': 'Kitchen'}, + {'name': 'Laptop Stand', 'price': 39.99, 'category': 'Electronics'}, + {'name': 'Yoga Mat', 'price': 24.99, 'category': 'Sports'}, + {'name': 'Bluetooth Speaker', 'price': 59.99, 'category': 'Electronics'}, + {'name': 'Water Bottle', 'price': 19.99, 'category': 'Sports'}, + ] + + query_lower = query.lower() + matches = [p for p in products if query_lower in p['name'].lower() or query_lower in p['category'].lower()] + + return { + 'query': query, + 'total_found': len(matches), + 'products': matches[:max_results] + } + + +@worker_task(task_definition_name='calculate') +def calculate(expression: str) -> dict: + """ + Perform a mathematical calculation. + + Args: + expression: Mathematical expression to evaluate (e.g., "2 + 2", "sqrt(16)") + + Returns: + Calculation result + """ + import math + + # Safe evaluation with limited functions + safe_dict = { + 'abs': abs, 'round': round, 'min': min, 'max': max, + 'sqrt': math.sqrt, 'pow': pow, 'log': math.log, + 'sin': math.sin, 'cos': math.cos, 'tan': math.tan, + 'pi': math.pi, 'e': math.e + } + + try: + result = eval(expression, {"__builtins__": {}}, safe_dict) + return {'expression': expression, 'result': result, 'success': True} + except Exception as e: + return {'expression': expression, 'error': str(e), 'success': False} + + +@worker_task(task_definition_name='send_notification') +def send_notification(recipient: str, message: str, channel: str = 'email') -> dict: + """ + Send a notification to a user. + + Args: + recipient: Email address or phone number + message: Notification message content + channel: Notification channel ('email', 'sms', 'push') + + Returns: + Confirmation of notification sent + """ + # In a real application, this would integrate with notification services + return { + 'status': 'sent', + 'recipient': recipient, + 'channel': channel, + 'message_preview': message[:50] + '...' if len(message) > 50 else message, + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') + } + + +# ============================================================================= +# AGENT WORKFLOW SETUP +# ============================================================================= + +def start_workers(api_config: Configuration) -> TaskHandler: + """Start the task handler with worker discovery.""" + task_handler = TaskHandler( + workers=[], + configuration=api_config, + scan_for_annotated_workers=True, + ) + task_handler.start_processes() + return task_handler + + +def register_tool_tasks(metadata_client) -> None: + """Register task definitions for our worker tools.""" + tools = ['get_weather', 'search_products', 'calculate', 'send_notification'] + for tool in tools: + metadata_client.register_task_def(task_def=TaskDef(name=tool)) + + +def create_agent_prompt() -> str: + """Create the system prompt that defines available tools for the agent.""" + return """ +You are a helpful AI assistant with access to the following tools: + +1. get_weather(city: str, units: str = 'fahrenheit') -> dict + - Get current weather for a city + - units can be 'fahrenheit' or 'celsius' + +2. search_products(query: str, max_results: int = 5) -> dict + - Search for products in our catalog + - Returns product names and prices + +3. calculate(expression: str) -> dict + - Perform mathematical calculations + - Supports basic math, sqrt, pow, log, trig functions + +4. send_notification(recipient: str, message: str, channel: str = 'email') -> dict + - Send notifications via email, sms, or push + +When you need to use a tool, respond with a JSON object in this exact format: +{ + "type": "function", + "function": "FUNCTION_NAME", + "function_parameters": {"param1": "value1", "param2": "value2"} +} + +If you don't need to use a tool, just respond normally with text. +Always be helpful and explain your actions to the user. +""" + + +def create_agentic_workflow( + workflow_executor, + llm_provider: str, + model: str, + prompt_name: str +) -> ConductorWorkflow: + """ + Create an agentic workflow that uses Python workers as tools. + + The workflow: + 1. Waits for user input + 2. Sends to LLM with tool definitions + 3. If LLM wants to call a tool, dynamically execute the worker + 4. Loop back for more interactions + """ + wf = ConductorWorkflow(name='python_agent_workflow', version=1, executor=workflow_executor) + + # Wait for user input + user_input = WaitTask(task_ref_name='get_user_input') + + # Collect conversation history + collect_history = SetVariableTask(task_ref_name='collect_history_ref') + collect_history.input_parameter('messages', [ + ChatMessage(role='user', message='${get_user_input.output.question}') + ]) + collect_history.input_parameter('_merge', True) + + # LLM chat completion with tool awareness + chat_complete = LlmChatComplete( + task_ref_name='chat_complete_ref', + llm_provider=llm_provider, + model=model, + instructions_template=prompt_name, + messages='${workflow.variables.messages}', + max_tokens=1000, + temperature=0 + ) + + # Dynamic task to call the function returned by LLM + function_call = DynamicTask( + task_reference_name='fn_call_ref', + dynamic_task=chat_complete.output('function') + ) + function_call.input_parameters['inputs'] = chat_complete.output('function_parameters') + function_call.input_parameters['dynamicTaskInputParam'] = 'inputs' + + # Switch to check if LLM wants to call a function + should_call_fn = SwitchTask( + task_ref_name='check_function_call', + case_expression="$.type == 'function' ? 'call_function' : 'direct_response'", + use_javascript=True + ) + should_call_fn.input_parameter('type', chat_complete.output('type')) + should_call_fn.switch_case('call_function', [function_call]) + should_call_fn.default_case([]) # No function call needed + + # Update history with assistant response + update_history = SetVariableTask(task_ref_name='update_history_ref') + update_history.input_parameter('messages', [ + ChatMessage(role='assistant', message='${chat_complete_ref.output.result}') + ]) + update_history.input_parameter('_merge', True) + + # Create the conversation loop + loop_tasks = [user_input, collect_history, chat_complete, should_call_fn, update_history] + chat_loop = LoopTask(task_ref_name='agent_loop', iterations=10, tasks=loop_tasks) + + wf >> chat_loop + + # Set workflow timeout (5 minutes) + wf.timeout_seconds(300).timeout_policy(timeout_policy=TimeoutPolicy.TIME_OUT_WORKFLOW) + + return wf + + +def main(): + """Main entry point for the agentic workflow example.""" + + # Configuration + llm_provider = 'openai' # Change to your configured provider + model = 'gpt-4' # Or 'gpt-3.5-turbo' for faster/cheaper responses + + print(""" +╔══════════════════════════════════════════════════════════════════╗ +ā•‘ šŸ¤– Conductor Agentic Workflow Example ā•‘ +╠══════════════════════════════════════════════════════════════════╣ +ā•‘ This agent can: ā•‘ +ā•‘ • Get weather for any city ā•‘ +ā•‘ • Search products in a catalog ā•‘ +ā•‘ • Perform calculations ā•‘ +ā•‘ • Send notifications ā•‘ +ā•‘ ā•‘ +ā•‘ Try asking: ā•‘ +ā•‘ - "What's the weather in San Francisco?" ā•‘ +ā•‘ - "Search for electronics under $100" ā•‘ +ā•‘ - "Calculate the square root of 144" ā•‘ +ā•‘ - "Send an email to user@example.com saying hello" ā•‘ +ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā• +""") + + # Initialize configuration and clients + api_config = Configuration() + clients = OrkesClients(configuration=api_config) + workflow_executor = clients.get_workflow_executor() + workflow_client = clients.get_workflow_client() + task_client = clients.get_task_client() + metadata_client = clients.get_metadata_client() + + # Start workers + task_handler = start_workers(api_config) + + # Register tool tasks + register_tool_tasks(metadata_client) + + # Set up AI orchestrator and prompt + orchestrator = AIOrchestrator(api_configuration=api_config) + prompt_name = 'python_agent_instructions' + prompt_text = create_agent_prompt() + + orchestrator.add_prompt_template(prompt_name, prompt_text, 'Agent with Python tool access') + orchestrator.associate_prompt_template(prompt_name, llm_provider, [model]) + + # Create and register workflow + wf = create_agentic_workflow(workflow_executor, llm_provider, model, prompt_name) + wf.register(overwrite=True) + + print(f"āœ… Workflow registered: {wf.name}") + print(f"🌐 Conductor UI: {api_config.ui_host}\n") + + # Start workflow execution + workflow_run = wf.execute( + wait_until_task_ref='get_user_input', + wait_for_seconds=1, + workflow_input={} + ) + workflow_id = workflow_run.workflow_id + + print(f"šŸš€ Workflow started: {api_config.ui_host}/execution/{workflow_id}\n") + + # Interactive conversation loop + try: + while workflow_run.is_running(): + current_task = workflow_run.current_task + if current_task and current_task.workflow_task.task_reference_name == 'get_user_input': + + # Check for previous function call results + fn_call_task = workflow_run.get_task(task_reference_name='fn_call_ref') + if fn_call_task and fn_call_task.output_data: + print(f"\nšŸ”§ Tool Result: {fn_call_task.output_data.get('result', fn_call_task.output_data)}") + + # Check for LLM response + chat_task = workflow_run.get_task(task_reference_name='chat_complete_ref') + if chat_task and chat_task.output_data.get('result'): + print(f"\nšŸ¤– Assistant: {chat_task.output_data['result']}") + + # Get user input + question = input('\nšŸ‘¤ You: ') + if question.lower() in ['quit', 'exit', 'q']: + print("\nšŸ‘‹ Goodbye!") + break + + # Submit user input to workflow + task_client.update_task_sync( + workflow_id=workflow_id, + task_ref_name='get_user_input', + status=TaskResultStatus.COMPLETED, + output={'question': question} + ) + + time.sleep(0.5) + workflow_run = workflow_client.get_workflow(workflow_id=workflow_id, include_tasks=True) + + except KeyboardInterrupt: + print("\n\nāš ļø Interrupted by user") + finally: + # Cleanup + print("\nšŸ›‘ Stopping workers...") + task_handler.stop_processes() + print("āœ… Done!") + + +if __name__ == '__main__': + main()