Refactor branch logging #246

Merged
vijayvammi merged 87 commits into main from refactor-branch-logging on Jan 12, 2026
Conversation

@vijayvammi
Collaborator

No description provided.

vijayvammi and others added 30 commits December 30, 2025 23:13
Design document for Phase 1 telemetry implementation:
- Pipeline and task-level spans via logfire-api
- StreamingSpanProcessor for FastAPI SSE real-time streaming
- Self-hosted OpenTelemetry backend support
- Zero-dependency shim (no-ops without logfire installed)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
12 bite-sized tasks with TDD approach:
- Add logfire-api dependency
- Create telemetry module with helpers
- Add StreamingSpanProcessor
- Instrument PipelineContext, JobContext
- Instrument all task types (Python, Notebook, Shell)
- FastAPI streaming example

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add Task 11: Local telemetry test with console output
- Mark Task 12 (FastAPI) as placeholder pending logfire integration docs
- Add task order table to summary

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Use logfire.instrument_fastapi(app) for HTTP request instrumentation
- Pipeline/task spans become children of HTTP request via OTEL context
- Keep StreamingSpanProcessor for real-time SSE streaming to UI

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add logfire-api>=2.0.0 to core dependencies
- Add telemetry optional extras with logfire and opentelemetry

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- truncate_value() for serializing params with size limit
- set_stream_queue()/get_stream_queue() for SSE streaming
- Re-export logfire_api as logfire for convenience

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
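
The helpers named in this commit could look roughly like the sketch below. It is an illustration, not the code from the PR: the 256-character default, the truncation suffix, and the use of a `ContextVar` to hold the queue are all assumptions.

```python
import json
from contextvars import ContextVar
from queue import Queue
from typing import Any, Optional

# Hypothetical default; the actual size limit is not stated in the commit.
MAX_VALUE_LENGTH = 256

# Assumption: the queue is held in a context variable so each request
# or run can have its own stream.
_stream_queue: ContextVar[Optional[Queue]] = ContextVar("stream_queue", default=None)


def truncate_value(value: Any, max_length: int = MAX_VALUE_LENGTH) -> str:
    """Serialize a value to text and cap its length."""
    try:
        text = json.dumps(value, default=str)
    except (TypeError, ValueError):
        # Fall back to repr() for values json cannot handle.
        text = repr(value)
    if len(text) <= max_length:
        return text
    return text[:max_length] + "...[truncated]"


def set_stream_queue(queue: Optional[Queue]) -> None:
    """Register the queue that receives events for SSE streaming."""
    _stream_queue.set(queue)


def get_stream_queue() -> Optional[Queue]:
    """Return the registered queue, or None when streaming is off."""
    return _stream_queue.get()
```
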
- OTEL_AVAILABLE flag detects opentelemetry availability
- StreamingSpanProcessor pushes span_start/span_end to queue
- Dual output: forwards to base processor AND streams to UI
- Gracefully no-ops when OTEL not installed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
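
A minimal sketch of the dual-output idea described above, under stated assumptions: the constructor arguments and the event dictionary shape are hypothetical, and the queue is passed in explicitly here rather than looked up. Only the `SpanProcessor` hook names (`on_start`, `on_end`, `shutdown`, `force_flush`) come from the OpenTelemetry SDK.

```python
import queue

try:
    from opentelemetry.sdk.trace import SpanProcessor  # OTEL SDK base class
    OTEL_AVAILABLE = True
except ImportError:
    SpanProcessor = object  # fallback so this sketch imports without OTEL
    OTEL_AVAILABLE = False


class StreamingSpanProcessor(SpanProcessor):
    """Forward spans to a base processor AND push events to a queue."""

    def __init__(self, base_processor=None, stream_queue=None):
        self.base_processor = base_processor
        self.stream_queue = stream_queue

    def _push(self, event_type, span):
        # Streaming is optional: do nothing when no queue is attached.
        if self.stream_queue is not None:
            self.stream_queue.put(
                {"type": event_type, "name": getattr(span, "name", None)}
            )

    def on_start(self, span, parent_context=None):
        if self.base_processor is not None:
            self.base_processor.on_start(span, parent_context)
        self._push("span_start", span)

    def on_end(self, span):
        if self.base_processor is not None:
            self.base_processor.on_end(span)
        self._push("span_end", span)

    def shutdown(self):
        if self.base_processor is not None:
            self.base_processor.shutdown()

    def force_flush(self, timeout_millis=30000):
        if self.base_processor is not None:
            return self.base_processor.force_flush(timeout_millis)
        return True
```
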
- Pipeline span with name, run_id, executor attributes
- logfire.info for started/completed events
- logfire.error for failure events

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Job span with job_name, run_id, executor attributes
- logfire.info for started/submitted events
- logfire.error for failure events

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add logfire import and truncate_value from telemetry module
- Add _safe_serialize_params() for defensive serialization
- Wrap execute_command in logfire.span with command as task name
- Log task started/completed with truncated inputs/outputs
- Log task failures with error message

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Move _safe_serialize_params to BaseTaskType for reuse
- Wrap NotebookTaskType.execute_command in logfire.span
- Wrap ShellTaskType.execute_command in logfire.span
- Log task started/completed with truncated inputs/outputs
- Log task failures with error messages

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Export set_stream_queue, get_stream_queue, truncate_value
- Export OTEL_AVAILABLE flag
- Conditionally export StreamingSpanProcessor when OTEL available

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Example shows console telemetry output with:
- Pipeline span wrapping execution
- Task spans with truncated inputs/outputs
- Timestamps and durations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Example demonstrates:
- logfire.instrument_fastapi for HTTP span instrumentation
- SSE streaming of telemetry spans via set_stream_queue
- Pipeline execution in thread pool
- User parameters via RUNNABLE_PRM_* env vars

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add _emit_event() to task types for real-time SSE streaming
- Include inputs/outputs in task events for observability
- Auto-configure logfire from env vars at import time
  (RUNNABLE_TELEMETRY_CONSOLE, OTEL_EXPORTER_OTLP_ENDPOINT, LOGFIRE_TOKEN)
- Update FastAPI example with functools.partial pipeline registry
- Add comprehensive telemetry documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add 6 pipelines: example, hello, parameters, parallel, map, shell
- Add GET /pipelines endpoint to list available pipelines
- Improve _safe_serialize_params to truncate per-value not whole dict
- Handle ObjectParameter explicitly with "<object>" placeholder
- Update README with pipeline examples and curl commands

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Design for native async support in runnable:
- AsyncPythonTaskType for async functions
- AsyncLocalExecutor for async graph traversal
- AsyncPipelineContext with async execute()
- AsyncPythonTask/AsyncPipeline SDK classes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
18 bite-sized tasks with TDD approach:
- AsyncPythonTaskType task plugin
- AsyncLocalExecutor executor plugin
- AsyncPipelineContext context class
- AsyncPythonTask/AsyncPipeline SDK classes
- Tests and example

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Update AsyncPythonTaskType to support AsyncGenerator functions
- Add event_callback parameter for streaming task_chunk events
- Add execute_stream() method to AsyncPipeline for SSE
- Add TaskChunkEvent model for streaming chunks
- Add streaming LLM example with FastAPI SSE
- Update tests to cover streaming behavior
- Add architecture diagram showing dual flow pattern

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Resolved conflicts:
- runnable/context.py: logfire tracing + contextvars
- runnable/tasks.py: catalog.put uses get_run_context()
- argo-pipeline.yaml: used main's generated node IDs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add explicit sync/async method signatures to base classes
- BasePipelineExecutor async methods raise NotImplementedError by default
- BaseTaskType.execute_command_async raises NotImplementedError by default
- Only LocalExecutor and AsyncPythonTaskType implement async methods
- Shared helpers minimize code duplication between sync/async paths
- Clear error messages when async called on unsupported components

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Rewrite plan with explicit method separation
- Base classes raise NotImplementedError for async by default
- LocalExecutor implements both sync and async paths
- PipelineContext gets execute_async() method
- Add shared helpers to minimize code duplication
- 19 tasks covering full implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add pytest-asyncio>=0.23.0 to dev dependencies
- Add async method stubs to BasePipelineExecutor:
  - execute_graph_async()
  - execute_from_graph_async()
  - trigger_node_execution_async()
  - _execute_node_async()
- Add execute_command_async() stub to BaseTaskType
- All stubs raise NotImplementedError by default

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
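
The stub pattern described here can be sketched as follows. Method signatures are simplified to take no arguments and the error message wording is invented for illustration; only the class and method names come from the commit messages.

```python
import asyncio


class BaseTaskType:
    def execute_command(self):
        raise NotImplementedError

    async def execute_command_async(self):
        # Default stub: async is opt-in, so unsupported task types fail loudly.
        raise NotImplementedError(
            f"{type(self).__name__} does not support async execution"
        )


class ShellTaskType(BaseTaskType):
    """Sync-only task type: inherits the async stub unchanged."""

    def execute_command(self):
        return "sync result"


class AsyncPythonTaskType(BaseTaskType):
    """Overrides the stub with a real coroutine."""

    async def execute_command_async(self):
        await asyncio.sleep(0)  # placeholder for awaiting the user's function
        return "async result"
```

Calling `execute_command_async()` on `ShellTaskType` raises `NotImplementedError` with the class name in the message, which is the "clear error message" behavior the design commit asks for.
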
- Add _prepare_node_for_execution() shared helper
- Add _finalize_graph_execution() shared helper
- Refactor execute_from_graph() to use shared helper
- Add async methods to LocalExecutor:
  - execute_graph_async()
  - execute_from_graph_async()
  - trigger_node_execution_async()
  - _execute_node_async()
- Add execute_async() to BaseNode (delegates to sync by default)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add execute_as_graph_async() stub to CompositeNode
- Add execute_async() to TaskNode with fallback to sync
- Add execute_as_graph_async() to ParallelNode

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add execute_as_graph_async() to MapNode
- Add execute_as_graph_async() to ConditionalNode
- Add _handle_completion() helper to PipelineContext
- Add execute_async() to PipelineContext

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add AsyncPipelineContext for simplified async execution
- Add AsyncPythonTask and AsyncPythonTaskType for async functions
- Add async examples (async_tasks.py, async_sequential.py)
- Add test_async_examples in test_pipeline_examples.py
- Update _context property to accept both context types
- Export async classes from runnable/__init__.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Design document for native async streaming from pipeline functions
to SSE clients, demonstrating token-by-token LLM response streaming.

Key decisions:
- Separate telemetry (infrastructure) from LLM streaming (domain)
- AsyncGenerator pass-through pattern for universal consumption
- Minimal implementation via callback → queue → AsyncGenerator wrapper

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
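
The "callback → queue → AsyncGenerator wrapper" decision can be illustrated with plain `asyncio`. This is a sketch under assumptions: `stream_events`, `fake_llm`, and the event dictionary shape are hypothetical names, not the PR's API.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable

_DONE = object()  # sentinel marking the end of the stream


async def stream_events(
    run: Callable[[Callable[[Any], None]], Awaitable[None]],
) -> AsyncGenerator[Any, None]:
    """Run `run(callback)` in the background and yield whatever it emits."""
    events: asyncio.Queue = asyncio.Queue()

    async def _runner() -> None:
        try:
            await run(events.put_nowait)  # the callback pushes into the queue
        finally:
            events.put_nowait(_DONE)  # always unblock the consumer

    task = asyncio.create_task(_runner())
    try:
        while True:
            item = await events.get()
            if item is _DONE:
                break
            yield item
    finally:
        await task  # re-raises any error from the producer


async def fake_llm(emit: Callable[[Any], None]) -> None:
    """Hypothetical producer that emits tokens one at a time."""
    for token in ["Hello", " ", "world"]:
        emit({"type": "task_chunk", "chunk": token})
        await asyncio.sleep(0)


async def collect() -> list:
    return [event["chunk"] async for event in stream_events(fake_llm)]
```

An SSE endpoint would iterate over `stream_events(...)` in the same way and write each event to the response as it arrives.
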
vijayvammi and others added 28 commits January 10, 2026 20:36
Remove STEP_LOG and PARAMETER log types. Steps and parameters are
now stored within their parent (RunLog or BranchLog), eliminating
the need for separate file management.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Steps are now stored within their parent entity rather than as
separate files. This simplifies file management and aligns with
the branch-scoped model.

Also cleaned up lingering references to STEP_LOG and PARAMETER
in naming_pattern and retrieve methods.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Parameters are now stored within RunLog.parameters or
BranchLog.parameters, eliminating separate parameter files.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Remove obsolete methods and simplify _prepare_full_run_log.
Branches are now self-contained with their own steps and params.

Changes:
- Removed orderly_retrieve method (no longer needed)
- Simplified _prepare_full_run_log to just attach branches to parent steps
- Fixed get_step_log to retrieve from parent RunLog/BranchLog
- Fixed branch attachment logic to correctly identify parent steps

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
- BranchLog files now use simple naming: BranchLog-{internal_name}
- Removed Template substitution for creation_time
- Removed unused imports (time, Template, string)
- Branch files are now stable and predictable
- Keep separate list of iteration variables for failure reporting
- Fix TypeError when checking failed iterations (JSON string vs dict)
- Simplify failed_iterations list comprehension
- Get parameters from each branch using internal_branch_name
- Removed old {iter_variable}_{param_name} naming convention
- Removed setting reduced flag (field was removed)
- Removed duplicate set_parameters call
- Fixed error messages to reference branch parameters
- Pass internal_branch_name to set_parameters
- Tasks now update parameters in their branch scope
- Fixes old {iter_value}_{param_name} naming in branches
- Field was missing from BaseTaskType causing AttributeError
- Added with default empty string and excluded from serialization
- Required for branch-scoped parameter updates
- When branches take failure path, they may not set all return params
- Changed KeyError exception to skip missing parameters
- Only reduce parameters that were actually set by branches
- Fixes map examples with on_failure handlers
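
A sketch of the reduce behavior described above, assuming branch parameters are held as one dict per branch. The function name, the data layout, and the default reducer are hypothetical.

```python
from typing import Any, Callable, Dict, List


def reduce_branch_params(
    branch_params: Dict[str, Dict[str, Any]],
    return_names: List[str],
    reducer: Callable[[List[Any]], Any] = list,
) -> Dict[str, Any]:
    """Reduce only the return parameters that at least one branch actually set."""
    reduced: Dict[str, Any] = {}
    for name in return_names:
        # Branches that took the failure path may lack this parameter: skip them.
        values = [params[name] for params in branch_params.values() if name in params]
        if values:
            reduced[name] = reducer(values)
    return reduced
```
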
- Task returns use clean names (processed_python not 1_processed_python)
- Parameters are scoped to branches, no need for prefixing
- Removed all {iter_value}_{param_name} generation logic
- Fixed for Python, Notebook, and Shell task types
- Return parameters don't exist in parent initially (created by tasks)
- Check if parameter exists before updating value
- Create new JsonParameter if missing in parent scope
- Fixes KeyError when setting reduced parameters
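
The check-then-create step can be sketched like this. The `JsonParameter` dataclass is a simplified stand-in for the real class, and `set_reduced_parameter` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class JsonParameter:
    """Simplified stand-in for runnable's JsonParameter (assumption)."""
    value: Any
    kind: str = "json"


def set_reduced_parameter(parent_params: Dict[str, JsonParameter], name: str, value: Any) -> None:
    """Update a parameter in the parent scope, creating it when absent."""
    existing = parent_params.get(name)
    if existing is None:
        # Return parameters are created by tasks, so the parent may not have one yet.
        parent_params[name] = JsonParameter(value=value)
    else:
        existing.value = value
```
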
- Renamed variable to parent_internal_branch_name for clarity
- For root-level map, this is empty string (RunLog parameters)
- For nested map, this is the branch containing the map node
- Ensures reduced parameters are set to correct parent scope
- Removed unnecessary _resolve_map_placeholders call
- Reduced parameters go to where map node lives (parent of branches)
- For root-level map: empty string (RunLog)
- For nested map: the branch containing the map node
Tasks need to know which branch they belong to so they can set
parameters in the correct branch scope. The TaskNode now sets
the resolved branch name on the executable before execution.

The task executable should receive internal_branch_name during
construction so it knows which branch it belongs to from the start.
This gets resolved to the actual branch name at execution time.

When updating parameters in a branch, we need to call add_branch_log
to update the branch back in the run log structure. This is essential
for file-based stores where branches are saved as separate files.
- Removed print debug statements from tasks.py and map.py
- Removed 'reduced' field from all Parameter classes
- Simplified parameter resolution to direct get_parameters call
- Removed unreduced parameter filtering from notebook execution
- Removed placeholder parameter creation in map fan_out
- Updated comments and variable names
With only 2 log types (RunLog and BranchLog), we can construct file
names directly instead of using pattern matching:
- RunLog: always named 'RunLog'
- BranchLog: always named 'BranchLog-{internal_branch_name}'

Changes:
- Replaced naming_pattern() with get_file_name()
- Replaced get_matches() abstract method with _exists() and _list_branch_logs()
- Simplified store() and retrieve() to use direct file name construction
- Updated _prepare_full_run_log() to use _list_branch_logs()
- Changed model(**contents) to model.model_validate(contents)
- Implemented new methods in ChunkedFileSystemRunLogStore
- Removed unused Template import
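
With only two log types, the direct naming rule above fits in a few lines. The `LogTypes` enum and the exact signature are assumptions made for this sketch; the two name formats are taken from the commit message.

```python
from enum import Enum


class LogTypes(Enum):
    RUN_LOG = "RunLog"
    BRANCH_LOG = "BranchLog"


def get_file_name(log_type: LogTypes, name: str = "") -> str:
    """Build the file name directly instead of matching a pattern."""
    if log_type is LogTypes.RUN_LOG:
        return "RunLog"
    if log_type is LogTypes.BRANCH_LOG:
        if not name:
            raise ValueError("BranchLog requires an internal branch name")
        return f"BranchLog-{name}"
    raise ValueError(f"Unsupported log type: {log_type}")
```

Because the names are deterministic, an existence check becomes a single path lookup rather than a glob over timestamped candidates.
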
Implemented _exists() and _list_branch_logs() to replace get_matches()
pattern matching, making the Minio implementation consistent with the
simplified ChunkedRunLogStore base class.

- Removed unused Template import
- Simplified file existence checking with direct path construction
- Branch log listing uses exact glob pattern instead of template substitution
Both ChunkedFileSystemRunLogStore and ChunkedMinioRunLogStore now
consistently construct full file paths in both _store and _retrieve
methods by combining run_id folder with the file name.

Previously, the path construction logic was tied to the 'insert' flag,
which caused FileNotFoundError when retrieving files. Now the file name
parameter is always treated as a relative name that needs to be
prefixed with the run_id folder path.

All tests in test_chunked_simplified.py now pass.
Removed assertion for the 'reduced' attribute from
test_object_parameter_init since the 'reduced' field was removed
from all Parameter classes as part of the refactoring.

All 53 tests in test_datastore.py now pass.
vijayvammi merged commit 3a6bcb0 into main on Jan 12, 2026
1 check passed
vijayvammi deleted the refactor-branch-logging branch on January 12, 2026 07:23