Skip to content

Async execution#245

Merged
vijayvammi merged 49 commits intomainfrom
async-execution
Jan 5, 2026
Merged

Async execution#245
vijayvammi merged 49 commits intomainfrom
async-execution

Conversation

@vijayvammi
Copy link
Collaborator

No description provided.

vijayvammi and others added 30 commits December 30, 2025 23:13
Design document for Phase 1 telemetry implementation:
- Pipeline and task-level spans via logfire-api
- StreamingSpanProcessor for FastAPI SSE real-time streaming
- Self-hosted OpenTelemetry backend support
- Zero-dependency shim (no-ops without logfire installed)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
12 bite-sized tasks with TDD approach:
- Add logfire-api dependency
- Create telemetry module with helpers
- Add StreamingSpanProcessor
- Instrument PipelineContext, JobContext
- Instrument all task types (Python, Notebook, Shell)
- FastAPI streaming example

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add Task 11: Local telemetry test with console output
- Mark Task 12 (FastAPI) as placeholder pending logfire integration docs
- Add task order table to summary

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use logfire.instrument_fastapi(app) for HTTP request instrumentation
- Pipeline/task spans become children of HTTP request via OTEL context
- Keep StreamingSpanProcessor for real-time SSE streaming to UI

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add logfire-api>=2.0.0 to core dependencies
- Add telemetry optional extras with logfire and opentelemetry

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- truncate_value() for serializing params with size limit
- set_stream_queue()/get_stream_queue() for SSE streaming
- Re-export logfire_api as logfire for convenience

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- OTEL_AVAILABLE flag detects opentelemetry availability
- StreamingSpanProcessor pushes span_start/span_end to queue
- Dual output: forwards to base processor AND streams to UI
- Gracefully no-ops when OTEL not installed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Pipeline span with name, run_id, executor attributes
- logfire.info for started/completed events
- logfire.error for failure events

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Job span with job_name, run_id, executor attributes
- logfire.info for started/submitted events
- logfire.error for failure events

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add logfire import and truncate_value from telemetry module
- Add _safe_serialize_params() for defensive serialization
- Wrap execute_command in logfire.span with command as task name
- Log task started/completed with truncated inputs/outputs
- Log task failures with error message

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Move _safe_serialize_params to BaseTaskType for reuse
- Wrap NotebookTaskType.execute_command in logfire.span
- Wrap ShellTaskType.execute_command in logfire.span
- Log task started/completed with truncated inputs/outputs
- Log task failures with error messages

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Export set_stream_queue, get_stream_queue, truncate_value
- Export OTEL_AVAILABLE flag
- Conditionally export StreamingSpanProcessor when OTEL available

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Example shows console telemetry output with:
- Pipeline span wrapping execution
- Task spans with truncated inputs/outputs
- Timestamps and durations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Example demonstrates:
- logfire.instrument_fastapi for HTTP span instrumentation
- SSE streaming of telemetry spans via set_stream_queue
- Pipeline execution in thread pool
- User parameters via RUNNABLE_PRM_* env vars

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add _emit_event() to task types for real-time SSE streaming
- Include inputs/outputs in task events for observability
- Auto-configure logfire from env vars at import time
  (RUNNABLE_TELEMETRY_CONSOLE, OTEL_EXPORTER_OTLP_ENDPOINT, LOGFIRE_TOKEN)
- Update FastAPI example with functools.partial pipeline registry
- Add comprehensive telemetry documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add 6 pipelines: example, hello, parameters, parallel, map, shell
- Add GET /pipelines endpoint to list available pipelines
- Improve _safe_serialize_params to truncate per-value not whole dict
- Handle ObjectParameter explicitly with "<object>" placeholder
- Update README with pipeline examples and curl commands

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Design for native async support in runnable:
- AsyncPythonTaskType for async functions
- AsyncLocalExecutor for async graph traversal
- AsyncPipelineContext with async execute()
- AsyncPythonTask/AsyncPipeline SDK classes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
18 bite-sized tasks with TDD approach:
- AsyncPythonTaskType task plugin
- AsyncLocalExecutor executor plugin
- AsyncPipelineContext context class
- AsyncPythonTask/AsyncPipeline SDK classes
- Tests and example

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Update AsyncPythonTaskType to support AsyncGenerator functions
- Add event_callback parameter for streaming task_chunk events
- Add execute_stream() method to AsyncPipeline for SSE
- Add TaskChunkEvent model for streaming chunks
- Add streaming LLM example with FastAPI SSE
- Update tests to cover streaming behavior
- Add architecture diagram showing dual flow pattern

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Resolved conflicts:
- runnable/context.py: logfire tracing + contextvars
- runnable/tasks.py: catalog.put uses get_run_context()
- argo-pipeline.yaml: used main's generated node IDs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add explicit sync/async method signatures to base classes
- BasePipelineExecutor async methods raise NotImplementedError by default
- BaseTaskType.execute_command_async raises NotImplementedError by default
- Only LocalExecutor and AsyncPythonTaskType implement async methods
- Shared helpers minimize code duplication between sync/async paths
- Clear error messages when async called on unsupported components

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Rewrite plan with explicit method separation
- Base classes raise NotImplementedError for async by default
- LocalExecutor implements both sync and async paths
- PipelineContext gets execute_async() method
- Add shared helpers to minimize code duplication
- 19 tasks covering full implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add pytest-asyncio>=0.23.0 to dev dependencies
- Add async method stubs to BasePipelineExecutor:
  - execute_graph_async()
  - execute_from_graph_async()
  - trigger_node_execution_async()
  - _execute_node_async()
- Add execute_command_async() stub to BaseTaskType
- All stubs raise NotImplementedError by default

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add _prepare_node_for_execution() shared helper
- Add _finalize_graph_execution() shared helper
- Refactor execute_from_graph() to use shared helper
- Add async methods to LocalExecutor:
  - execute_graph_async()
  - execute_from_graph_async()
  - trigger_node_execution_async()
  - _execute_node_async()
- Add execute_async() to BaseNode (delegates to sync by default)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add execute_as_graph_async() stub to CompositeNode
- Add execute_async() to TaskNode with fallback to sync
- Add execute_as_graph_async() to ParallelNode

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add execute_as_graph_async() to MapNode
- Add execute_as_graph_async() to ConditionalNode
- Add _handle_completion() helper to PipelineContext
- Add execute_async() to PipelineContext

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add AsyncPipelineContext for simplified async execution
- Add AsyncPythonTask and AsyncPythonTaskType for async functions
- Add async examples (async_tasks.py, async_sequential.py)
- Add test_async_examples in test_pipeline_examples.py
- Update _context property to accept both context types
- Export async classes from runnable/__init__.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Design document for native async streaming from pipeline functions
to SSE clients, demonstrating token-by-token LLM response streaming.

Key decisions:
- Separate telemetry (infrastructure) from LLM streaming (domain)
- AsyncGenerator pass-through pattern for universal consumption
- Minimal implementation via callback → queue → AsyncGenerator wrapper

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
vijayvammi and others added 19 commits January 3, 2026 10:30
Implementation plan for execute_streaming() on AsyncPipeline with
event_callback stored on executor (not threaded through params).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add execute_streaming() method that returns an AsyncGenerator yielding
events from pipeline execution. This enables streaming patterns like SSE
by using asyncio.Queue to bridge executor callbacks to the AsyncGenerator
interface.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add stream_end_type field to AsyncPythonTask with default 'done'.
This allows users to customize which event type indicates end of stream
and contains return values, making the framework more flexible beyond
LLM-specific patterns.
AsyncGenerator functions should always yield event dicts with 'type'.
The stream_end_type event contains return values. Removed backwards
compatibility code for direct value yields to clarify the pattern.
- Import datetime and names module
- Generate unique run_id per request using get_random_name()
- Pass run_id to execute_streaming()
- Remove unused chat-and-summarize endpoint
@vijayvammi vijayvammi merged commit 31b1d57 into main Jan 5, 2026
1 check passed
@vijayvammi vijayvammi deleted the async-execution branch January 5, 2026 12:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant