From dd0d90b3291819813f0663ae7ce0df62fcded1e2 Mon Sep 17 00:00:00 2001 From: R4ph-t Date: Thu, 5 Feb 2026 12:39:11 +0100 Subject: [PATCH 1/3] Refactor workflow tasks to use @app.task decorator and update SDK usage across multiple services. Updated requirements to render-sdk version 0.2.0 for enhanced functionality. Adjusted README files to reflect changes in task invocation and examples. --- data-pipeline/README.md | 28 ++++----- data-pipeline/main.py | 32 +++++----- data-pipeline/requirements.txt | 2 +- etl-job/README.md | 22 +++---- etl-job/main.py | 29 ++++----- etl-job/requirements.txt | 2 +- file-analyzer/README.md | 43 +++++++------ file-analyzer/api-service/main.py | 36 +++++++---- file-analyzer/api-service/requirements.txt | 2 +- file-analyzer/workflow-service/main.py | 28 ++++----- .../workflow-service/requirements.txt | 2 +- file-processing/README.md | 35 +++++------ file-processing/main.py | 43 ++++++------- file-processing/requirements.txt | 2 +- hello-world/README.md | 59 +++++++++-------- hello-world/main.py | 63 +++++-------------- hello-world/requirements.txt | 2 +- openai-agent/README.md | 27 ++++---- openai-agent/main.py | 30 ++++----- openai-agent/requirements.txt | 2 +- 20 files changed, 240 insertions(+), 249 deletions(-) diff --git a/data-pipeline/README.md b/data-pipeline/README.md index da9dedb..8c348bb 100644 --- a/data-pipeline/README.md +++ b/data-pipeline/README.md @@ -179,23 +179,21 @@ Input: Once deployed, trigger the pipeline via the Render API or SDK: ```python -from render_sdk.client import Client +from render_sdk import Render -client = Client(api_key="your_render_api_key") +# Uses RENDER_API_KEY environment variable automatically +render = Render() # Run the complete pipeline -task_run = client.workflows.run_task( - workflow_service_slug="data-pipeline-workflows", - task_name="run_data_pipeline", - input={ - "user_ids": ["user_1", "user_2", "user_3", "user_4"] - } +task_run = await render.workflows.run_task( + "data-pipeline-workflows/run_data_pipeline", + {"user_ids": ["user_1", "user_2", "user_3", "user_4"]} ) result = await task_run -print(f"Pipeline status: {result['status']}") -print(f"Total revenue: ${result['insights']['revenue']['total']}") -print(f"Segment distribution: {result['insights']['segment_distribution']}") +print(f"Pipeline status: {result.results['status']}") +print(f"Total revenue: ${result.results['insights']['revenue']['total']}") +print(f"Segment distribution: {result.results['segment_distribution']}") ``` ## Pipeline Stages @@ -328,7 +326,7 @@ Business logic classifies users into segments: **Add Real APIs**: ```python -@task +@app.task async def fetch_user_data_from_api(user_ids: list[str]) -> dict: client = get_http_client() response = await client.post( @@ -340,7 +338,7 @@ async def fetch_user_data_from_api(user_ids: list[str]) -> dict: **Add Database Integration**: ```python -@task +@app.task async def load_to_warehouse(insights: dict) -> dict: # Connect to data warehouse (Snowflake, BigQuery, etc.) # Insert aggregated insights @@ -350,7 +348,7 @@ async def load_to_warehouse(insights: dict) -> dict: **Add Caching**: ```python -@task +@app.task async def fetch_with_cache(source: str, key: str) -> dict: # Check Redis/Memcached # If miss, fetch from source and cache @@ -360,7 +358,7 @@ async def fetch_with_cache(source: str, key: str) -> dict: **Add Notifications**: ```python -@task +@app.task async def send_pipeline_notification(result: dict) -> dict: # Send to Slack, email, etc. 
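    # e.g. (illustrative sketch only — SLACK_WEBHOOK_URL and the os import are
    # assumptions, not part of this example):
    #   client = get_http_client()
    #   await client.post(os.environ["SLACK_WEBHOOK_URL"],
    #                     json={"text": f"Pipeline finished: {result.get('status')}"})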
# Notify stakeholders of pipeline completion diff --git a/data-pipeline/main.py b/data-pipeline/main.py index 6415a8d..3398e9f 100644 --- a/data-pipeline/main.py +++ b/data-pipeline/main.py @@ -16,9 +16,8 @@ import asyncio import logging from datetime import datetime, timedelta -from typing import Any -from render_sdk.workflows import Options, Retry, start, task +from render_sdk import Retry, Workflows # Configure logging logging.basicConfig( @@ -44,11 +43,19 @@ def get_http_client(): return _http_client +# Initialize Workflows app with defaults +app = Workflows( + default_retry=Retry(max_retries=3, wait_duration_ms=2000, backoff_scaling=1.5), + default_timeout=300, + auto_start=True, +) + + # ============================================================================ # Data Source Tasks - Extract from multiple sources # ============================================================================ -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000))) +@app.task async def fetch_user_data(user_ids: list[str]) -> dict: """ Fetch user profile data from user service. @@ -83,7 +90,7 @@ async def fetch_user_data(user_ids: list[str]) -> dict: } -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000))) +@app.task async def fetch_transaction_data(user_ids: list[str], days: int = 30) -> dict: """ Fetch transaction history for users. @@ -123,7 +130,7 @@ async def fetch_transaction_data(user_ids: list[str], days: int = 30) -> dict: } -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000))) +@app.task async def fetch_engagement_data(user_ids: list[str]) -> dict: """ Fetch user engagement metrics. @@ -167,7 +174,7 @@ async def fetch_engagement_data(user_ids: list[str]) -> dict: # Enrichment Tasks - Add additional context # ============================================================================ -@task +@app.task async def enrich_with_geo_data(user_email: str) -> dict: """ Enrich user data with geographic information. @@ -192,7 +199,7 @@ async def enrich_with_geo_data(user_email: str) -> dict: return geo_data -@task +@app.task async def calculate_user_metrics( user: dict, transactions: list[dict], @@ -260,7 +267,7 @@ async def calculate_user_metrics( # Transformation Tasks - Process and combine data # ============================================================================ -@task +@app.task async def transform_user_data( user_data: dict, transaction_data: dict, @@ -319,7 +326,7 @@ async def transform_user_data( # Aggregation Tasks - Generate insights # ============================================================================ -@task +@app.task def aggregate_insights(enriched_data: dict) -> dict: """ Generate aggregate insights from enriched user data. @@ -390,7 +397,7 @@ def aggregate_insights(enriched_data: dict) -> dict: # Main Pipeline Orchestrator # ============================================================================ -@task +@app.task async def run_data_pipeline(user_ids: list[str]) -> dict: """ Execute the complete data pipeline. 
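Taken together, the hunks in this file reduce the service to a single pattern: one `Workflows` app carrying the retry/timeout defaults, bare `@app.task` decorators, and no `__main__` entry point because `auto_start=True` handles startup. A condensed sketch of the resulting module shape (task bodies elided):

```python
from render_sdk import Retry, Workflows

# Defaults apply to every task registered on this app.
app = Workflows(
    default_retry=Retry(max_retries=3, wait_duration_ms=2000, backoff_scaling=1.5),
    default_timeout=300,
    auto_start=True,  # no explicit start() call is needed
)


@app.task
async def fetch_user_data(user_ids: list[str]) -> dict:
    ...


@app.task
async def run_data_pipeline(user_ids: list[str]) -> dict:
    ...
```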
@@ -486,8 +493,3 @@ async def run_data_pipeline(user_ids: list[str]) -> dict: "error": str(e), "failed_at": datetime.now().isoformat() } - - -if __name__ == "__main__": - logger.info("Starting Data Pipeline Workflow Service") - start() diff --git a/data-pipeline/requirements.txt b/data-pipeline/requirements.txt index e145ae4..0f0ce62 100644 --- a/data-pipeline/requirements.txt +++ b/data-pipeline/requirements.txt @@ -1,2 +1,2 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 httpx>=0.27.0 diff --git a/etl-job/README.md b/etl-job/README.md index 5471a84..f265807 100644 --- a/etl-job/README.md +++ b/etl-job/README.md @@ -126,21 +126,21 @@ Input: Once deployed, trigger the ETL pipeline via the Render API or SDK: ```python -from render_sdk.client import Client +from render_sdk import Render -client = Client(api_key="your_render_api_key") +# Uses RENDER_API_KEY environment variable automatically +render = Render() # Run the ETL pipeline -task_run = client.workflows.run_task( - workflow_service_slug="etl-job-workflows", - task_name="run_etl_pipeline", - input="sample_data.csv" +task_run = await render.workflows.run_task( + "etl-job-workflows/run_etl_pipeline", + {"source_file": "sample_data.csv"} ) # Wait for completion result = await task_run -print(f"Pipeline status: {result['status']}") -print(f"Valid records: {result['transform']['valid_count']}") +print(f"Pipeline status: {result.results['status']}") +print(f"Valid records: {result.results['transform']['valid_count']}") ``` ## Sample Data @@ -187,7 +187,7 @@ This demonstrates **sequential subtask orchestration** for multi-stage pipelines **Add Database Loading**: ```python -@task +@app.task async def load_to_database(records: list[dict]) -> dict: # Connect to database # Insert records @@ -197,7 +197,7 @@ async def load_to_database(records: list[dict]) -> dict: **Add API Data Source**: ```python -@task +@app.task async def extract_from_api(api_url: str) -> list[dict]: # Fetch from REST API # Parse JSON response @@ -209,7 +209,7 @@ async def extract_from_api(api_url: str) -> list[dict]: ```python import asyncio -@task +@app.task async def transform_batch_parallel(records: list[dict]) -> dict: # Validate all records in parallel tasks = [validate_record(record) for record in records] diff --git a/etl-job/main.py b/etl-job/main.py index 75685ba..08f697a 100644 --- a/etl-job/main.py +++ b/etl-job/main.py @@ -16,9 +16,8 @@ import logging from datetime import datetime from pathlib import Path -from typing import Any -from render_sdk.workflows import Options, Retry, start, task +from render_sdk import Retry, Workflows # Configure logging logging.basicConfig( @@ -27,12 +26,19 @@ ) logger = logging.getLogger(__name__) +# Initialize Workflows app with defaults +app = Workflows( + default_retry=Retry(max_retries=3, wait_duration_ms=1000, backoff_scaling=1.5), + default_timeout=300, + auto_start=True, +) + # ============================================================================ # EXTRACT Tasks # ============================================================================ -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=1000))) +@app.task def extract_csv_data(file_path: str) -> list[dict]: """ Extract data from a CSV file. 
@@ -51,7 +57,7 @@ def extract_csv_data(file_path: str) -> list[dict]: try: path = Path(file_path) if not path.exists(): - logger.warning(f"[EXTRACT] File not found, using sample data") + logger.warning("[EXTRACT] File not found, using sample data") # In production, this would read from cloud storage or database return [ {"id": "1", "name": "Alice", "email": "alice@example.com", "age": "28", "country": "USA"}, @@ -76,7 +82,7 @@ def extract_csv_data(file_path: str) -> list[dict]: # TRANSFORM Tasks # ============================================================================ -@task +@app.task def validate_record(record: dict) -> dict: """ Validate and clean a single data record. @@ -137,7 +143,7 @@ def validate_record(record: dict) -> dict: return cleaned_record -@task +@app.task async def transform_batch(records: list[dict]) -> dict: """ Transform a batch of records by validating each one. @@ -187,7 +193,7 @@ async def transform_batch(records: list[dict]) -> dict: # LOAD Tasks # ============================================================================ -@task +@app.task def compute_statistics(valid_records: list[dict]) -> dict: """ Compute statistical insights from validated records. @@ -237,7 +243,7 @@ def compute_statistics(valid_records: list[dict]) -> dict: 'timestamp': datetime.now().isoformat() } - logger.info(f"[LOAD] Statistics computed successfully") + logger.info("[LOAD] Statistics computed successfully") logger.info(f"[LOAD] Countries: {list(country_counts.keys())}") if age_stats: logger.info(f"[LOAD] Age range: {age_stats['min']}-{age_stats['max']}") @@ -249,7 +255,7 @@ def compute_statistics(valid_records: list[dict]) -> dict: # MAIN ETL Pipeline # ============================================================================ -@task +@app.task async def run_etl_pipeline(source_file: str) -> dict: """ Complete ETL pipeline orchestrating extract, transform, and load operations. @@ -328,8 +334,3 @@ async def run_etl_pipeline(source_file: str) -> dict: 'error': str(e), 'failed_at': datetime.now().isoformat() } - - -if __name__ == "__main__": - logger.info("Starting ETL Job Workflow Service") - start() diff --git a/etl-job/requirements.txt b/etl-job/requirements.txt index 40ca944..2a52138 100644 --- a/etl-job/requirements.txt +++ b/etl-job/requirements.txt @@ -1 +1 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 diff --git a/file-analyzer/README.md b/file-analyzer/README.md index f5cb919..74154c4 100644 --- a/file-analyzer/README.md +++ b/file-analyzer/README.md @@ -166,7 +166,7 @@ file-analyzer/ The main `analyze_file` task demonstrates subtask orchestration: ```python -@task +@app.task async def analyze_file(file_content: str) -> dict: # SUBTASK CALL: Parse CSV data parsed_data = await parse_csv_data(file_content) @@ -204,19 +204,19 @@ async def analyze_file(file_content: str) -> dict: The API service demonstrates the complete Client SDK workflow: ```python -from render_sdk.client import Client +from render_sdk import Render -# 1. Get client instance -client = Client(api_key=os.getenv("RENDER_API_KEY")) +# 1. Get client instance (uses RENDER_API_KEY env var automatically) +render = Render() # 2. Construct task identifier: {service-slug}/{task-name} service_slug = os.getenv("WORKFLOW_SERVICE_SLUG") task_identifier = f"{service_slug}/analyze_file" -# 3. Call the workflow task with arguments as a list -task_run = await client.workflows.run_task( +# 3. 
Call the workflow task with arguments as a dict +task_run = await render.workflows.run_task( task_identifier, - [file_content] # Arguments as list + {"file_content": file_content} ) # 4. Await the task completion @@ -495,17 +495,18 @@ Contains customer information with columns: **Creating the Client:** ```python -from render_sdk.client import Client +from render_sdk import Render -client = Client(api_key=os.getenv("RENDER_API_KEY")) +# Uses RENDER_API_KEY environment variable automatically +render = Render() ``` **Calling Tasks:** ```python -# Format: client.workflows.run_task(task_identifier, [args]) -task_run = await client.workflows.run_task( +# Format: render.workflows.run_task(task_identifier, {args}) +task_run = await render.workflows.run_task( "service-slug/task-name", - [arg1, arg2, arg3] # Arguments as list + {"arg1": value1, "arg2": value2} ) # Await completion @@ -521,15 +522,13 @@ print(result.results) # Return value from task **Defining Tasks:** ```python -from render_sdk.workflows import start, task +from render_sdk import Workflows -@task +app = Workflows(auto_start=True) + +@app.task def my_task(param: str) -> dict: return {"result": param} - -# Start the workflow service -if __name__ == "__main__": - start() ``` ### 3. Service Separation @@ -595,7 +594,7 @@ async def analyze_file(file: UploadFile): ### Add Webhook Notifications ```python -@task +@app.task async def analyze_file(file_content: str, webhook_url: str = None) -> dict: # ... perform analysis ... @@ -609,12 +608,12 @@ async def analyze_file(file_content: str, webhook_url: str = None) -> dict: ### Add More File Formats ```python -@task +@app.task def parse_json_data(file_content: str) -> dict: # Parse JSON files pass -@task +@app.task def parse_excel_data(file_content: bytes) -> dict: # Parse Excel files pass @@ -675,7 +674,7 @@ def parse_excel_data(file_content: bytes) -> dict: - **Python-only**: Render Workflows are only supported in Python via `render-sdk` - **No Blueprint Support**: Workflows don't support `render.yaml` blueprint configuration - **Service Types**: Workflow service (for tasks) vs Web Service (for API) -- **Task Arguments**: Must be passed as a list: `[arg1, arg2, ...]` +- **Task Arguments**: Passed as a dict: `{"arg1": value1, "arg2": value2}` - **Awaitable Pattern**: Use `await task_run` to wait for completion - **Service Slug**: Set correctly in `WORKFLOW_SERVICE_SLUG` environment variable - **API Key**: Required in both services, get from Account Settings diff --git a/file-analyzer/api-service/main.py b/file-analyzer/api-service/main.py index 5782aaf..f1eadd2 100644 --- a/file-analyzer/api-service/main.py +++ b/file-analyzer/api-service/main.py @@ -16,8 +16,8 @@ from typing import Any from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.middleware.cors import CORSMiddleware -from render_sdk.client import Client -from render_sdk.client.errors import RenderError +from render_sdk import Render +from render_sdk.client.errors import RenderError, TaskRunError from pydantic import BaseModel # Configure logging @@ -62,7 +62,7 @@ class HealthResponse(BaseModel): # Client SDK helper functions -def get_client() -> Client: +def get_client() -> Render: """ Get Render API client. @@ -76,7 +76,7 @@ def get_client() -> Client: status_code=500, detail="RENDER_API_KEY not configured. 
Get your API key from Render Dashboard → Account Settings → API Keys" ) - return Client(api_key) + return Render() # Uses RENDER_API_KEY env var automatically def get_task_identifier(task_name: str) -> str: @@ -192,19 +192,19 @@ async def analyze_file(file: UploadFile = File(...)): logger.info(f"Calling workflow task: {task_identifier}") # CLIENT SDK CALL: Run the workflow task - # Format: client.workflows.run_task(task_identifier, [arg1, arg2, ...]) - # Arguments must be passed as a list + # Format: client.workflows.run_task(task_identifier, {"arg": value}) task_run = await client.workflows.run_task( task_identifier, - [file_content_str] # Pass file content as first argument + {"file_content": file_content_str} ) + logger.info(f"Task started: {task_run.id}") + # CLIENT SDK CALL: Await the task completion # This will block until the task finishes result = await task_run logger.info(f"Task completed with status: {result.status}") - logger.info(f"Task run ID: {result.id}") return AnalysisResponse( task_run_id=result.id, @@ -213,11 +213,17 @@ async def analyze_file(file: UploadFile = File(...)): result=result.results # Task return value ) + except TaskRunError as e: + logger.error(f"Task run error: {e}") + raise HTTPException( + status_code=500, + detail=f"Workflow task failed: {str(e)}" + ) except RenderError as e: logger.error(f"Render API error: {e}") raise HTTPException( status_code=500, - detail=f"Workflow task failed: {str(e)}" + detail=f"Render API error: {str(e)}" ) except UnicodeDecodeError: logger.error("File encoding error") @@ -278,9 +284,11 @@ async def analyze_with_custom_task(task_name: str, file: UploadFile = File(...)) # CLIENT SDK CALL: Run the specified workflow task task_run = await client.workflows.run_task( task_identifier, - [file_content_str] + {"file_content": file_content_str} ) + logger.info(f"Task started: {task_run.id}") + # CLIENT SDK CALL: Await the task completion result = await task_run @@ -293,11 +301,17 @@ async def analyze_with_custom_task(task_name: str, file: UploadFile = File(...)) result=result.results ) + except TaskRunError as e: + logger.error(f"Task run error: {e}") + raise HTTPException( + status_code=500, + detail=f"Workflow task '{task_name}' failed: {str(e)}" + ) except RenderError as e: logger.error(f"Render API error: {e}") raise HTTPException( status_code=500, - detail=f"Workflow task '{task_name}' failed: {str(e)}" + detail=f"Render API error: {str(e)}" ) except Exception as e: logger.error(f"Unexpected error: {e}") diff --git a/file-analyzer/api-service/requirements.txt b/file-analyzer/api-service/requirements.txt index 18b85b3..5e5cdc1 100644 --- a/file-analyzer/api-service/requirements.txt +++ b/file-analyzer/api-service/requirements.txt @@ -1,4 +1,4 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 fastapi>=0.110.0 uvicorn[standard]>=0.27.0 python-multipart>=0.0.9 diff --git a/file-analyzer/workflow-service/main.py b/file-analyzer/workflow-service/main.py index a0efb6e..2c24635 100644 --- a/file-analyzer/workflow-service/main.py +++ b/file-analyzer/workflow-service/main.py @@ -15,9 +15,9 @@ import logging import csv import io -from typing import Any from datetime import datetime -from render_sdk.workflows import start, task, Options, Retry + +from render_sdk import Retry, Workflows # Configure logging logging.basicConfig( @@ -26,8 +26,15 @@ ) logger = logging.getLogger(__name__) +# Initialize Workflows app with defaults +app = Workflows( + default_retry=Retry(max_retries=2, wait_duration_ms=1000, backoff_scaling=1.5), + default_timeout=300, + 
auto_start=True, +) + -@task(options=Options(retry=Retry(max_retries=2, wait_duration_ms=1000))) +@app.task def parse_csv_data(file_content: str) -> dict: """ Parse CSV file content into structured data. @@ -78,7 +85,7 @@ def parse_csv_data(file_content: str) -> dict: } -@task +@app.task def calculate_statistics(data: dict) -> dict: """ Calculate statistical metrics from parsed data. @@ -141,7 +148,7 @@ def calculate_statistics(data: dict) -> dict: } -@task +@app.task def identify_trends(data: dict) -> dict: """ Identify trends and patterns in the data. @@ -199,7 +206,7 @@ def identify_trends(data: dict) -> dict: } -@task +@app.task async def generate_insights(stats: dict, trends: dict, metadata: dict) -> dict: """ Generate final insights report combining statistics and trends. @@ -254,7 +261,7 @@ async def generate_insights(stats: dict, trends: dict, metadata: dict) -> dict: return insights -@task +@app.task async def analyze_file(file_content: str) -> dict: """ Main orchestrator task for file analysis. @@ -319,10 +326,3 @@ async def analyze_file(file_content: str) -> dict: "insights": insights, "completed_at": datetime.now().isoformat() } - - -if __name__ == "__main__": - logger.info("Starting File Analyzer Workflow Service") - logger.info("This service defines tasks that can be called via the Client SDK") - logger.info("Tasks available: parse_csv_data, calculate_statistics, identify_trends, generate_insights, analyze_file") - start() diff --git a/file-analyzer/workflow-service/requirements.txt b/file-analyzer/workflow-service/requirements.txt index 40ca944..2a52138 100644 --- a/file-analyzer/workflow-service/requirements.txt +++ b/file-analyzer/workflow-service/requirements.txt @@ -1 +1 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 diff --git a/file-processing/README.md b/file-processing/README.md index 4e6c8bb..48674e1 100644 --- a/file-processing/README.md +++ b/file-processing/README.md @@ -183,15 +183,15 @@ Note: You'll need to copy the actual result from `process_file_batch` to test th Once deployed, trigger file processing via the Render API or SDK: ```python -from render_sdk.client import Client +from render_sdk import Render -client = Client(api_key="your_render_api_key") +# Uses RENDER_API_KEY environment variable automatically +render = Render() # Process a batch of files -task_run = client.workflows.run_task( - workflow_service_slug="file-processing-workflows", - task_name="process_file_batch", - input={ +task_run = await render.workflows.run_task( + "file-processing-workflows/process_file_batch", + { "file_paths": [ "sample_files/sales_data.csv", "sample_files/config.json", @@ -201,17 +201,16 @@ task_run = client.workflows.run_task( ) result = await task_run -print(f"Processed {result['successful']}/{result['total_files']} files") +print(f"Processed {result.results['successful']}/{result.results['total_files']} files") # Generate consolidated report -report_run = client.workflows.run_task( - workflow_service_slug="file-processing-workflows", - task_name="generate_consolidated_report", - input={"batch_result": result} +report_run = await render.workflows.run_task( + "file-processing-workflows/generate_consolidated_report", + {"batch_result": result.results} ) report = await report_run -print(f"Report: {report['summary']}") +print(f"Report: {report.results['summary']}") ``` ## Sample Files @@ -261,7 +260,7 @@ The example includes sample files in `sample_files/`: The key to efficient batch processing is using `asyncio.gather()`: ```python -@task +@app.task async def 
process_file_batch(file_paths: list[str]) -> dict: # Launch all file processing tasks concurrently tasks = [process_single_file(fp) for fp in file_paths] @@ -277,13 +276,13 @@ This processes all files simultaneously rather than sequentially, dramatically r **Add New File Format**: ```python -@task +@app.task def read_xml_file(file_path: str) -> dict: # Parse XML file # Return structured data pass -@task +@app.task def analyze_xml_data(xml_result: dict) -> dict: # Analyze XML content # Return insights @@ -294,14 +293,14 @@ def analyze_xml_data(xml_result: dict) -> dict: **Add Cloud Storage Integration**: ```python -@task +@app.task async def download_from_s3(bucket: str, key: str) -> str: # Download file from S3 # Save to temp location # Return local path pass -@task +@app.task async def process_s3_batch(bucket: str, keys: list[str]) -> dict: # Download files in parallel paths = await asyncio.gather(*[download_from_s3(bucket, k) for k in keys]) @@ -311,7 +310,7 @@ async def process_s3_batch(bucket: str, keys: list[str]) -> dict: **Add Database Export**: ```python -@task +@app.task async def export_to_database(report: dict) -> dict: # Connect to database # Insert report data diff --git a/file-processing/main.py b/file-processing/main.py index 0e7a7bc..6f5adf3 100644 --- a/file-processing/main.py +++ b/file-processing/main.py @@ -19,9 +19,8 @@ import logging from datetime import datetime from pathlib import Path -from typing import Any -from render_sdk.workflows import Options, Retry, start, task +from render_sdk import Retry, Workflows # Configure logging logging.basicConfig( @@ -30,12 +29,19 @@ ) logger = logging.getLogger(__name__) +# Initialize Workflows app with defaults +app = Workflows( + default_retry=Retry(max_retries=3, wait_duration_ms=1000, backoff_scaling=1.5), + default_timeout=300, + auto_start=True, +) + # ============================================================================ # File Reading Tasks # ============================================================================ -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=1000))) +@app.task def read_csv_file(file_path: str) -> dict: """ Read and parse a CSV file. @@ -83,7 +89,7 @@ def read_csv_file(file_path: str) -> dict: } -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=1000))) +@app.task def read_json_file(file_path: str) -> dict: """ Read and parse a JSON file. @@ -109,7 +115,7 @@ def read_json_file(file_path: str) -> dict: with open(path, 'r', encoding='utf-8') as f: data = json.load(f) - logger.info(f"[JSON] Successfully parsed JSON") + logger.info("[JSON] Successfully parsed JSON") return { "success": True, @@ -128,7 +134,7 @@ def read_json_file(file_path: str) -> dict: } -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=1000))) +@app.task def read_text_file(file_path: str) -> dict: """ Read and analyze a text file. @@ -183,7 +189,7 @@ def read_text_file(file_path: str) -> dict: # Analysis Tasks # ============================================================================ -@task +@app.task def analyze_csv_data(csv_result: dict) -> dict: """ Analyze CSV data and extract insights. @@ -241,7 +247,7 @@ def analyze_csv_data(csv_result: dict) -> dict: return analysis -@task +@app.task def analyze_json_structure(json_result: dict) -> dict: """ Analyze JSON structure and extract metadata. @@ -284,7 +290,7 @@ def count_keys(obj, depth=0): return analysis -@task +@app.task def analyze_text_content(text_result: dict) -> dict: """ Analyze text content for insights. 
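Each analysis task in this file consumes the dict returned by its matching read task, so a per-file flow is two chained subtask awaits. A minimal sketch (the `process_csv_file` wrapper is hypothetical; the real dispatcher, `process_single_file`, appears in the hunks below):

```python
@app.task
async def process_csv_file(file_path: str) -> dict:
    # Read the file as one subtask, then analyze its output as another.
    csv_result = await read_csv_file(file_path)
    return await analyze_csv_data(csv_result)
```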
@@ -339,7 +345,7 @@ def analyze_text_content(text_result: dict) -> dict: # Orchestration Tasks # ============================================================================ -@task +@app.task async def process_single_file(file_path: str) -> dict: """ Process a single file based on its extension. @@ -392,7 +398,7 @@ async def process_single_file(file_path: str) -> dict: } -@task +@app.task async def process_file_batch(*file_paths: str) -> dict: """ Process multiple files in parallel. @@ -432,25 +438,25 @@ async def process_file_batch(*file_paths: str) -> dict: file_types[file_type] = file_types.get(file_type, 0) + 1 batch_result = { - "total_files": len(file_paths), + "total_files": len(file_paths_list), "successful": len(successful), "failed": len(failed), - "success_rate": len(successful) / len(file_paths) if file_paths else 0, + "success_rate": len(successful) / len(file_paths_list) if file_paths_list else 0, "file_types": file_types, "results": results, "processed_at": datetime.now().isoformat() } logger.info("=" * 80) - logger.info(f"[BATCH] Batch processing complete!") - logger.info(f"[BATCH] Successful: {len(successful)}/{len(file_paths)}") + logger.info("[BATCH] Batch processing complete!") + logger.info(f"[BATCH] Successful: {len(successful)}/{len(file_paths_list)}") logger.info(f"[BATCH] File types: {file_types}") logger.info("=" * 80) return batch_result -@task +@app.task async def generate_consolidated_report(batch_result: dict) -> dict: """ Generate a consolidated report from batch processing results. @@ -506,8 +512,3 @@ async def generate_consolidated_report(batch_result: dict) -> dict: f"JSON keys: {total_json_keys}") return report - - -if __name__ == "__main__": - logger.info("Starting File Processing Workflow Service") - start() diff --git a/file-processing/requirements.txt b/file-processing/requirements.txt index 40ca944..2a52138 100644 --- a/file-processing/requirements.txt +++ b/file-processing/requirements.txt @@ -1 +1 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 diff --git a/hello-world/README.md b/hello-world/README.md index a2384f1..4fc6002 100644 --- a/hello-world/README.md +++ b/hello-world/README.md @@ -5,7 +5,7 @@ The simplest possible workflow example to help you understand the basics of Rend ## What You'll Learn This example teaches the fundamental concepts: -- **What is a Task?** - A function decorated with `@task` that can be executed as a workflow +- **What is a Task?** - A function decorated with `@app.task` that can be executed as a workflow - **What is a Subtask?** - A task called by another task using `await` - **How to Orchestrate** - Combining multiple tasks to create workflows - **How to Deploy** - Getting your first workflow running on Render @@ -31,10 +31,14 @@ calculate_and_process (multi-step orchestrator) ### What is a Task? -A **task** is simply a Python function decorated with `@task`. It becomes a workflow step that Render can execute: +A **task** is simply a Python function decorated with `@app.task`. It becomes a workflow step that Render can execute: ```python -@task +from render_sdk import Workflows + +app = Workflows(auto_start=True) + +@app.task def double(x: int) -> int: """A simple task that doubles a number""" return x * 2 @@ -45,7 +49,7 @@ def double(x: int) -> int: A **subtask** is when one task calls another task using `await`. 
This is how you compose workflows: ```python -@task +@app.task async def add_doubled_numbers(a: int, b: int) -> dict: # Call 'double' as a subtask using await doubled_a = await double(a) # ← This is a subtask call! @@ -229,25 +233,26 @@ This is the most complex example - it calls `add_doubled_numbers` and `process_n Once deployed, trigger workflows via the Render Client SDK: ```python -from render_sdk.client import Client +from render_sdk import Render -client = Client(api_key="your_render_api_key") +# Uses RENDER_API_KEY environment variable automatically +render = Render() # Call the simple double task -task_run = client.workflows.run_task( +task_run = await render.workflows.run_task( "hello-world-workflows/double", - [5] # x=5 + {"x": 5} ) result = await task_run -print(f"Result: {result}") # Output: 10 +print(f"Result: {result.results}") # Output: 10 # Call the subtask orchestration example -task_run = client.workflows.run_task( +task_run = await render.workflows.run_task( "hello-world-workflows/add_doubled_numbers", - [3, 4] # a=3, b=4 + {"a": 3, "b": 4} ) result = await task_run -print(f"Sum of doubled: {result['sum_of_doubled']}") # Output: 14 +print(f"Sum of doubled: {result.results['sum_of_doubled']}") # Output: 14 ``` ## Tasks Explained @@ -319,14 +324,16 @@ step2 = await process_numbers(numbers) # ← Second subtask ## Key Concepts -### The `@task` Decorator +### The `@app.task` Decorator -Every workflow function needs the `@task` decorator: +Every workflow function needs the `@app.task` decorator: ```python -from render_sdk.workflows import task +from render_sdk import Workflows + +app = Workflows(auto_start=True) -@task +@app.task def my_task(): return "Hello World" ``` @@ -336,7 +343,7 @@ def my_task(): Tasks that call other tasks as subtasks must be `async`: ```python -@task +@app.task async def orchestrator(): result = await subtask() # ← Calls another task return result @@ -354,7 +361,7 @@ Without `await`, you're just calling a regular Python function! ### Task Registration -When you run `start()`, all `@task` decorated functions are automatically registered and become available as workflow tasks. +When you use `Workflows(auto_start=True)`, all `@app.task` decorated functions are automatically registered and become available as workflow tasks. ## Common Patterns @@ -363,7 +370,7 @@ When you run `start()`, all `@task` decorated functions are automatically regist Execute subtasks one after another: ```python -@task +@app.task async def sequential(): step1 = await task_a() step2 = await task_b(step1) # Uses result from step1 @@ -376,7 +383,7 @@ async def sequential(): Execute subtasks where order doesn't matter: ```python -@task +@app.task async def independent(): result_a = await task_a() result_b = await task_b() @@ -388,7 +395,7 @@ async def independent(): Process a list by calling a subtask for each item: ```python -@task +@app.task async def batch_process(items: list): results = [] for item in items: @@ -402,15 +409,15 @@ async def batch_process(items: list): Subtasks can call other subtasks: ```python -@task +@app.task async def level_1(): return await level_2() -@task +@app.task async def level_2(): return await level_3() -@task +@app.task def level_3(): return "Done!" 
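# Awaiting level_1 from another task (or running it via run_task) walks the
# whole chain and ultimately returns "Done!".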
``` @@ -437,7 +444,7 @@ Make sure: ### Import errors Make sure: -- `requirements.txt` includes `render-sdk>=0.1.0` +- `requirements.txt` includes `render-sdk>=0.2.0` - Build command is running correctly - Python version is 3.10 or higher @@ -446,7 +453,7 @@ Make sure: Make sure: - Your task function is marked `async` - You're using `await` before the task call -- Both tasks are decorated with `@task` +- Both tasks are decorated with `@app.task` ## Important Notes diff --git a/hello-world/main.py b/hello-world/main.py index 7fb234d..77378fb 100644 --- a/hello-world/main.py +++ b/hello-world/main.py @@ -3,7 +3,7 @@ This is the simplest possible workflow example to help you understand the basics. It demonstrates: -- How to define a task using the @task decorator +- How to define a task using the @app.task decorator - How to call a task as a subtask using await - How to orchestrate multiple subtask calls @@ -11,7 +11,8 @@ """ import logging -from render_sdk.workflows import start, task + +from render_sdk import Workflows # Configure logging to see what's happening logging.basicConfig( @@ -20,12 +21,15 @@ ) logger = logging.getLogger(__name__) +# Initialize Workflows app with auto_start enabled +app = Workflows(auto_start=True) + # ============================================================================ # BASIC TASK - The building block # ============================================================================ -@task +@app.task def double(x: int) -> int: """ A basic task that doubles a number. @@ -50,7 +54,7 @@ def double(x: int) -> int: elif not isinstance(x, int): logger.error(f"[TASK] Invalid input type: {type(x)}, value: {x}") raise ValueError(f"Expected integer, got: {type(x)}") - + logger.info(f"[TASK] Doubling {x}") result = x * 2 logger.info(f"[TASK] Result: {result}") @@ -61,7 +65,7 @@ def double(x: int) -> int: # SUBTASK CALLING - Tasks calling other tasks # ============================================================================ -@task +@app.task async def add_doubled_numbers(*args: int) -> dict: """ Demonstrates calling a task as a subtask. @@ -111,7 +115,7 @@ async def add_doubled_numbers(*args: int) -> dict: # SUBTASK IN A LOOP - Processing multiple items # ============================================================================ -@task +@app.task async def process_numbers(*numbers: int) -> dict: """ Demonstrates calling a subtask in a loop. @@ -157,7 +161,7 @@ async def process_numbers(*numbers: int) -> dict: # MULTI-STEP WORKFLOW - Chaining multiple subtasks # ============================================================================ -@task +@app.task async def calculate_and_process(a: int, b: int, *more_numbers: int) -> dict: """ Demonstrates a multi-step workflow that chains multiple subtasks. 
@@ -179,18 +183,18 @@ async def calculate_and_process(a: int, b: int, *more_numbers: int) -> dict: # Convert to list for easier handling more_numbers_list = list(more_numbers) - logger.info(f"[WORKFLOW] Starting multi-step workflow") + logger.info("[WORKFLOW] Starting multi-step workflow") # STEP 1: Add two doubled numbers - logger.info(f"[WORKFLOW] Step 1: Adding doubled numbers") + logger.info("[WORKFLOW] Step 1: Adding doubled numbers") step1_result = await add_doubled_numbers(a, b) # STEP 2: Process a list of numbers - logger.info(f"[WORKFLOW] Step 2: Processing number list") + logger.info("[WORKFLOW] Step 2: Processing number list") step2_result = await process_numbers(*more_numbers_list) # STEP 3: Combine the results - logger.info(f"[WORKFLOW] Step 3: Combining results") + logger.info("[WORKFLOW] Step 3: Combining results") final_result = { "step1_sum": step1_result["sum_of_doubled"], "step2_doubled": step2_result["doubled_numbers"], @@ -198,40 +202,5 @@ async def calculate_and_process(a: int, b: int, *more_numbers: int) -> dict: "summary": f"Added doubled {a} and {b}, then doubled {len(more_numbers_list)} more numbers" } - logger.info(f"[WORKFLOW] Multi-step workflow complete") + logger.info("[WORKFLOW] Multi-step workflow complete") return final_result - - -# ============================================================================ -# START THE WORKFLOW SERVICE -# ============================================================================ - -if __name__ == "__main__": - """ - This starts the workflow service and registers all tasks defined above. - - When you run 'python main.py', this will: - 1. Register all @task decorated functions - 2. Start the workflow service - 3. Wait for task executions from Render - - The tasks can then be called: - - Via the Render Dashboard (Manual Run) - - Via the Render API - - Via the Client SDK from another service - """ - logger.info("=" * 80) - logger.info("Starting Hello World Workflow Service") - logger.info("=" * 80) - logger.info("") - logger.info("Registered tasks:") - logger.info(" - double(x)") - logger.info(" - add_doubled_numbers(a, b)") - logger.info(" - process_numbers(numbers)") - logger.info(" - calculate_and_process(a, b, more_numbers)") - logger.info("") - logger.info("Ready to accept task executions!") - logger.info("=" * 80) - - # Start the workflow service - start() diff --git a/hello-world/requirements.txt b/hello-world/requirements.txt index 40ca944..2a52138 100644 --- a/hello-world/requirements.txt +++ b/hello-world/requirements.txt @@ -1 +1 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 diff --git a/openai-agent/README.md b/openai-agent/README.md index ebe4a78..b52f75c 100644 --- a/openai-agent/README.md +++ b/openai-agent/README.md @@ -197,28 +197,27 @@ This will process all messages in sequence, maintaining context between turns. 
Once deployed, interact with the agent via the Render API or SDK: ```python -from render_sdk.client import Client +from render_sdk import Render -client = Client(api_key="your_render_api_key") +# Uses RENDER_API_KEY environment variable automatically +render = Render() # Single turn conversation -task_run = client.workflows.run_task( - workflow_service_slug="openai-agent-workflows", - task_name="agent_turn", - input={ +task_run = await render.workflows.run_task( + "openai-agent-workflows/agent_turn", + { "user_message": "What is the status of order ORD-001?", "conversation_history": [] } ) result = await task_run -print(f"Agent: {result['response']}") +print(f"Agent: {result.results['response']}") # Multi-turn conversation -task_run = client.workflows.run_task( - workflow_service_slug="openai-agent-workflows", - task_name="multi_turn_conversation", - input={ +task_run = await render.workflows.run_task( + "openai-agent-workflows/multi_turn_conversation", + { "messages": [ "What is your return policy?", "What is the status of order ORD-001?", @@ -228,7 +227,7 @@ task_run = client.workflows.run_task( ) result = await task_run -for turn in result['turns']: +for turn in result.results['turns']: print(f"User: {turn['user']}") print(f"Agent: {turn['assistant']}") print(f"Tools used: {[t['tool'] for t in turn['tool_calls']]}") @@ -274,7 +273,7 @@ Searches the knowledge base for information. **`execute_tool`**: Dynamically executes a tool as a subtask based on the agent's decision: ```python -@task +@app.task async def execute_tool(tool_name: str, arguments: dict) -> dict: # Map tool names to tasks tool_map = { @@ -310,7 +309,7 @@ To add a new tool capability: 1. **Define the tool function**: ```python -@task +@app.task def new_tool(param: str) -> dict: """Tool: Description of what this tool does.""" # Implementation diff --git a/openai-agent/main.py b/openai-agent/main.py index a68ded2..a2154d7 100644 --- a/openai-agent/main.py +++ b/openai-agent/main.py @@ -17,9 +17,8 @@ import logging import os from datetime import datetime -from typing import Any -from render_sdk.workflows import Options, Retry, start, task +from render_sdk import Retry, Workflows # Configure logging logging.basicConfig( @@ -64,11 +63,19 @@ def get_openai_client(): return _openai_client +# Initialize Workflows app with defaults +app = Workflows( + default_retry=Retry(max_retries=3, wait_duration_ms=2000, backoff_scaling=2.0), + default_timeout=300, + auto_start=True, +) + + # ============================================================================ # Tool Functions - Actions the agent can perform # ============================================================================ -@task +@app.task def get_order_status(order_id: str) -> dict: """ Tool: Look up order status. @@ -107,7 +114,7 @@ def get_order_status(order_id: str) -> dict: } -@task +@app.task def process_refund(order_id: str, reason: str) -> dict: """ Tool: Process a refund for an order. @@ -140,7 +147,7 @@ def process_refund(order_id: str, reason: str) -> dict: return result -@task +@app.task def search_knowledge_base(query: str) -> dict: """ Tool: Search the knowledge base for information. 
@@ -193,7 +200,7 @@ def search_knowledge_base(query: str) -> dict: # Agent Tasks # ============================================================================ -@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000, factor=2.0))) +@app.task async def call_llm_with_tools( messages: list[dict], tools: list[dict], @@ -252,7 +259,7 @@ async def call_llm_with_tools( raise -@task +@app.task async def execute_tool(tool_name: str, arguments: dict) -> dict: """ Execute a tool function by name. @@ -305,7 +312,7 @@ async def execute_tool(tool_name: str, arguments: dict) -> dict: return {"error": str(e)} -@task +@app.task async def agent_turn( user_message: str, conversation_history: list[dict] = None @@ -475,7 +482,7 @@ async def agent_turn( } -@task +@app.task async def multi_turn_conversation(*messages: str) -> dict: """ Run a multi-turn conversation with the agent. @@ -523,8 +530,3 @@ async def multi_turn_conversation(*messages: str) -> dict: "total_turns": len(responses), "conversation_history": conversation_history } - - -if __name__ == "__main__": - logger.info("Starting OpenAI Agent Workflow Service") - start() diff --git a/openai-agent/requirements.txt b/openai-agent/requirements.txt index 4342dcd..3ea8c21 100644 --- a/openai-agent/requirements.txt +++ b/openai-agent/requirements.txt @@ -1,2 +1,2 @@ -render-sdk>=0.1.0 +render-sdk>=0.2.0 openai>=1.0.0 From 73b88cc68ed568701bba1003b5f9dfffa299757c Mon Sep 17 00:00:00 2001 From: R4ph-t Date: Thu, 5 Feb 2026 19:44:01 +0100 Subject: [PATCH 2/3] Refactor OpenAI client initialization to create a new instance each time, addressing atexit registration issues. --- openai-agent/main.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/openai-agent/main.py b/openai-agent/main.py index a2154d7..b0bfec1 100644 --- a/openai-agent/main.py +++ b/openai-agent/main.py @@ -28,7 +28,6 @@ logger = logging.getLogger(__name__) # OpenAI client initialization -_openai_client = None _openai_import_error = None try: @@ -40,27 +39,25 @@ ) -def get_openai_client(): - """Get or initialize the OpenAI client.""" - global _openai_client - +def create_openai_client() -> "AsyncOpenAI": + """Create a new OpenAI client instance. + + Creates a fresh client each time to avoid atexit registration issues + that can occur with global async clients in workflow environments. + """ if _openai_import_error: raise ImportError( "OpenAI package not installed. Install with: pip install openai" ) from _openai_import_error - if _openai_client is None: - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - raise ValueError( - "OPENAI_API_KEY environment variable not set. " - "Please set it in your Render environment variables." - ) - - _openai_client = AsyncOpenAI(api_key=api_key) - logger.info("OpenAI client initialized successfully") + api_key = os.getenv("OPENAI_API_KEY") + if not api_key: + raise ValueError( + "OPENAI_API_KEY environment variable not set. " + "Please set it in your Render environment variables." 
+ ) - return _openai_client + return AsyncOpenAI(api_key=api_key) # Initialize Workflows app with defaults @@ -222,7 +219,7 @@ async def call_llm_with_tools( """ logger.info(f"[AGENT] Calling {model} with {len(tools)} tools available") - client = get_openai_client() + client = create_openai_client() try: response = await client.chat.completions.create( @@ -257,6 +254,8 @@ async def call_llm_with_tools( except Exception as e: logger.error(f"[AGENT] LLM call failed: {e}") raise + finally: + await client.close() @app.task From 715d1d058d25fc36e0aa0b695054f2bd8b2d8a4e Mon Sep 17 00:00:00 2001 From: R4ph-t Date: Thu, 5 Feb 2026 21:57:46 +0100 Subject: [PATCH 3/3] explicit app.start for openai --- openai-agent/main.py | 198 +++++++++++++++++++++---------------------- 1 file changed, 96 insertions(+), 102 deletions(-) diff --git a/openai-agent/main.py b/openai-agent/main.py index b0bfec1..fdc0faa 100644 --- a/openai-agent/main.py +++ b/openai-agent/main.py @@ -22,8 +22,7 @@ # Configure logging logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) @@ -34,14 +33,12 @@ from openai import AsyncOpenAI except ImportError as e: _openai_import_error = e - logger.warning( - "OpenAI package not installed. Install with: pip install openai" - ) + logger.warning("OpenAI package not installed. Install with: pip install openai") def create_openai_client() -> "AsyncOpenAI": """Create a new OpenAI client instance. - + Creates a fresh client each time to avoid atexit registration issues that can occur with global async clients in workflow environments. """ @@ -64,7 +61,6 @@ def create_openai_client() -> "AsyncOpenAI": app = Workflows( default_retry=Retry(max_retries=3, wait_duration_ms=2000, backoff_scaling=2.0), default_timeout=300, - auto_start=True, ) @@ -72,6 +68,7 @@ def create_openai_client() -> "AsyncOpenAI": # Tool Functions - Actions the agent can perform # ============================================================================ + @app.task def get_order_status(order_id: str) -> dict: """ @@ -89,26 +86,26 @@ def get_order_status(order_id: str) -> dict: # Simulated database lookup mock_orders = { - "ORD-001": {"status": "shipped", "tracking": "1Z999AA1234567890", "eta": "2024-10-15"}, + "ORD-001": { + "status": "shipped", + "tracking": "1Z999AA1234567890", + "eta": "2024-10-15", + }, "ORD-002": {"status": "processing", "tracking": None, "eta": "2024-10-12"}, - "ORD-003": {"status": "delivered", "tracking": "1Z999AA9876543210", "eta": "2024-10-08"}, + "ORD-003": { + "status": "delivered", + "tracking": "1Z999AA9876543210", + "eta": "2024-10-08", + }, } if order_id in mock_orders: result = mock_orders[order_id] logger.info(f"[TOOL] Order {order_id} found: {result['status']}") - return { - "success": True, - "order_id": order_id, - **result - } + return {"success": True, "order_id": order_id, **result} else: logger.warning(f"[TOOL] Order {order_id} not found") - return { - "success": False, - "order_id": order_id, - "error": "Order not found" - } + return {"success": False, "order_id": order_id, "error": "Order not found"} @app.task @@ -137,7 +134,7 @@ def process_refund(order_id: str, reason: str) -> dict: "order_id": order_id, "reason": reason, "amount": 99.99, # Mock amount - "processed_at": datetime.now().isoformat() + "processed_at": datetime.now().isoformat(), } logger.info(f"[TOOL] Refund processed: {refund_id}") @@ -163,16 
+160,16 @@ def search_knowledge_base(query: str) -> dict: knowledge = { "shipping": { "title": "Shipping Policy", - "content": "We offer free shipping on orders over $50. Standard shipping takes 3-5 business days. Express shipping is available for $15 and takes 1-2 business days." + "content": "We offer free shipping on orders over $50. Standard shipping takes 3-5 business days. Express shipping is available for $15 and takes 1-2 business days.", }, "returns": { "title": "Return Policy", - "content": "We accept returns within 30 days of purchase. Items must be unused and in original packaging. Refunds are processed within 5-7 business days." + "content": "We accept returns within 30 days of purchase. Items must be unused and in original packaging. Refunds are processed within 5-7 business days.", }, "warranty": { "title": "Warranty Information", - "content": "All products come with a 1-year manufacturer warranty. Extended warranties are available for purchase." - } + "content": "All products come with a 1-year manufacturer warranty. Extended warranties are available for purchase.", + }, } # Simple keyword matching @@ -180,28 +177,24 @@ def search_knowledge_base(query: str) -> dict: matches = [] for key, article in knowledge.items(): - if key in query_lower or any(word in article['content'].lower() for word in query_lower.split()): + if key in query_lower or any( + word in article["content"].lower() for word in query_lower.split() + ): matches.append(article) logger.info(f"[TOOL] Found {len(matches)} knowledge base articles") - return { - "success": True, - "query": query, - "results": matches, - "count": len(matches) - } + return {"success": True, "query": query, "results": matches, "count": len(matches)} # ============================================================================ # Agent Tasks # ============================================================================ + @app.task async def call_llm_with_tools( - messages: list[dict], - tools: list[dict], - model: str = "gpt-4" + messages: list[dict], tools: list[dict], model: str = "gpt-4" ) -> dict: """ Call OpenAI with function/tool definitions. 
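The reflowed hunks below keep the per-invocation client lifecycle introduced in the previous patch; stripped of the agent logic, `call_llm_with_tools` reduces to this shape (sketch, names taken from the module):

```python
client = create_openai_client()
try:
    response = await client.chat.completions.create(
        model=model, messages=messages, tools=tools, tool_choice="auto"
    )
finally:
    # Each invocation owns its own client, so close it explicitly instead of
    # relying on atexit hooks in the workflow runtime.
    await client.close()
```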
@@ -223,17 +216,11 @@ async def call_llm_with_tools( try: response = await client.chat.completions.create( - model=model, - messages=messages, - tools=tools, - tool_choice="auto" + model=model, messages=messages, tools=tools, tool_choice="auto" ) message = response.choices[0].message - result = { - "content": message.content, - "tool_calls": [] - } + result = {"content": message.content, "tool_calls": []} if message.tool_calls: result["tool_calls"] = [ @@ -242,12 +229,14 @@ async def call_llm_with_tools( "type": "function", "function": { "name": tc.function.name, - "arguments": tc.function.arguments - } + "arguments": tc.function.arguments, + }, } for tc in message.tool_calls ] - logger.info(f"[AGENT] Model requested {len(result['tool_calls'])} tool calls") + logger.info( + f"[AGENT] Model requested {len(result['tool_calls'])} tool calls" + ) return result @@ -279,7 +268,7 @@ async def execute_tool(tool_name: str, arguments: dict) -> dict: tool_map = { "get_order_status": get_order_status, "process_refund": process_refund, - "search_knowledge_base": search_knowledge_base + "search_knowledge_base": search_knowledge_base, } if tool_name not in tool_map: @@ -295,8 +284,7 @@ async def execute_tool(tool_name: str, arguments: dict) -> dict: result = await tool_function(arguments.get("order_id")) elif tool_name == "process_refund": result = await tool_function( - arguments.get("order_id"), - arguments.get("reason") + arguments.get("order_id"), arguments.get("reason") ) elif tool_name == "search_knowledge_base": result = await tool_function(arguments.get("query")) @@ -313,8 +301,7 @@ async def execute_tool(tool_name: str, arguments: dict) -> dict: @app.task async def agent_turn( - user_message: str, - conversation_history: list[dict] = None + user_message: str, conversation_history: list[dict] = None ) -> dict: """ Execute a single agent turn with tool calling capability. @@ -332,16 +319,18 @@ async def agent_turn( Dictionary with agent response and updated history """ logger.info("[AGENT TURN] Starting agent turn") - + # Handle case where user_message might be a slice object or other type if isinstance(user_message, str): logger.info(f"[AGENT TURN] User message: {user_message[:100]}...") else: - logger.error(f"[AGENT TURN] Invalid user_message type: {type(user_message)}, value: {user_message}") + logger.error( + f"[AGENT TURN] Invalid user_message type: {type(user_message)}, value: {user_message}" + ) return { "success": False, "error": f"user_message must be a string, got {type(user_message)}", - "response": "I'm sorry, there was an error processing your message. Please try again." + "response": "I'm sorry, there was an error processing your message. 
Please try again.", } if conversation_history is None: @@ -359,12 +348,12 @@ async def agent_turn( "properties": { "order_id": { "type": "string", - "description": "The order ID (e.g., ORD-001)" + "description": "The order ID (e.g., ORD-001)", } }, - "required": ["order_id"] - } - } + "required": ["order_id"], + }, + }, }, { "type": "function", @@ -376,16 +365,16 @@ async def agent_turn( "properties": { "order_id": { "type": "string", - "description": "The order ID to refund" + "description": "The order ID to refund", }, "reason": { "type": "string", - "description": "Reason for the refund" - } + "description": "Reason for the refund", + }, }, - "required": ["order_id", "reason"] - } - } + "required": ["order_id", "reason"], + }, + }, }, { "type": "function", @@ -395,15 +384,12 @@ async def agent_turn( "parameters": { "type": "object", "properties": { - "query": { - "type": "string", - "description": "The search query" - } + "query": {"type": "string", "description": "The search query"} }, - "required": ["query"] - } - } - } + "required": ["query"], + }, + }, + }, ] # System prompt @@ -414,13 +400,15 @@ async def agent_turn( "status, process refunds, and search the knowledge base for information. " "Be polite, professional, and helpful. Use tools when necessary to " "assist the customer." - ) + ), } # Build messages - messages = [system_message] + conversation_history + [ - {"role": "user", "content": user_message} - ] + messages = ( + [system_message] + + conversation_history + + [{"role": "user", "content": user_message}] + ) # Call LLM llm_response = await call_llm_with_tools(messages, tools) @@ -430,11 +418,12 @@ async def agent_turn( logger.info("[AGENT TURN] No tool calls, returning response") return { "response": llm_response["content"], - "conversation_history": conversation_history + [ + "conversation_history": conversation_history + + [ {"role": "user", "content": user_message}, - {"role": "assistant", "content": llm_response["content"]} + {"role": "assistant", "content": llm_response["content"]}, ], - "tool_calls": [] + "tool_calls": [], } # Execute tool calls @@ -444,27 +433,24 @@ async def agent_turn( for tool_call in llm_response["tool_calls"]: result = await execute_tool( tool_call["function"]["name"], - json.loads(tool_call["function"]["arguments"]) + json.loads(tool_call["function"]["arguments"]), ) - tool_results.append({ - "tool": tool_call["function"]["name"], - "result": result - }) + tool_results.append({"tool": tool_call["function"]["name"], "result": result}) # Format tool results for LLM tool_messages = [ - { - "role": "tool", - "tool_call_id": tc["id"], - "content": json.dumps(tr["result"]) - } + {"role": "tool", "tool_call_id": tc["id"], "content": json.dumps(tr["result"])} for tc, tr in zip(llm_response["tool_calls"], tool_results) ] # Get final response from LLM with tool results final_messages = messages + [ - {"role": "assistant", "content": llm_response.get("content"), "tool_calls": llm_response["tool_calls"]}, - *tool_messages + { + "role": "assistant", + "content": llm_response.get("content"), + "tool_calls": llm_response["tool_calls"], + }, + *tool_messages, ] final_response = await call_llm_with_tools(final_messages, tools) @@ -473,11 +459,12 @@ async def agent_turn( return { "response": final_response["content"], - "conversation_history": conversation_history + [ + "conversation_history": conversation_history + + [ {"role": "user", "content": user_message}, - {"role": "assistant", "content": final_response["content"]} + {"role": "assistant", "content": 
final_response["content"]}, ], - "tool_calls": tool_results + "tool_calls": tool_results, } @@ -497,9 +484,11 @@ async def multi_turn_conversation(*messages: str) -> dict: """ # Convert to list for easier handling messages_list = list(messages) - + logger.info("=" * 80) - logger.info(f"[CONVERSATION] Starting multi-turn conversation with {len(messages_list)} messages") + logger.info( + f"[CONVERSATION] Starting multi-turn conversation with {len(messages_list)} messages" + ) logger.info("=" * 80) conversation_history = [] @@ -510,12 +499,14 @@ async def multi_turn_conversation(*messages: str) -> dict: turn_result = await agent_turn(user_message, conversation_history) - responses.append({ - "turn": i, - "user": user_message, - "assistant": turn_result["response"], - "tool_calls": turn_result.get("tool_calls", []) - }) + responses.append( + { + "turn": i, + "user": user_message, + "assistant": turn_result["response"], + "tool_calls": turn_result.get("tool_calls", []), + } + ) conversation_history = turn_result["conversation_history"] @@ -527,5 +518,8 @@ async def multi_turn_conversation(*messages: str) -> dict: return { "turns": responses, "total_turns": len(responses), - "conversation_history": conversation_history + "conversation_history": conversation_history, } + + +app.start()
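With this final patch the OpenAI service opts out of `auto_start` and instead calls `app.start()` once every task is defined, so its skeleton differs slightly from the other examples. A condensed sketch (task bodies elided):

```python
from render_sdk import Retry, Workflows

app = Workflows(
    default_retry=Retry(max_retries=3, wait_duration_ms=2000, backoff_scaling=2.0),
    default_timeout=300,
)


@app.task
async def agent_turn(user_message: str, conversation_history: list[dict] = None) -> dict:
    ...


@app.task
async def multi_turn_conversation(*messages: str) -> dict:
    ...


# Explicit start after all tasks are declared (replaces auto_start=True).
app.start()
```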