fix: workflow incorrectly marked as completed while nodes are still executing#13
Conversation
There was a problem hiding this comment.
Pull request overview
This PR strengthens the graph engine’s skip-propagation logic so workflows are not marked as completed while downstream nodes are still pending execution, and adds unit tests around SkipPropagator.
Changes:
- Update
SkipPropagator.propagate_skip_from_edgeto start tracking downstream node execution when a taken edge is present. - Adjust
_propagate_skip_to_node’s behavior around marking/skipping outgoing edges and add targeted unit tests for multiple skip-propagation scenarios. - Introduce a dedicated test module for graph traversal components and basic package docstrings.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| api/core/workflow/graph_engine/graph_traversal/skip_propagator.py | Modifies skip propagation behavior, including how taken edges trigger node execution and how skips are propagated through outgoing edges. |
| api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/test_skip_propagator.py | Adds unit tests covering unknown/taken/all-skipped edge states, recursive propagation, and branch skipping behavior. |
| api/tests/unit_tests/core/workflow/graph_engine/graph_traversal/init.py | Adds a simple package docstring for the graph traversal test suite. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Recursively propagate skip | ||
| self.propagate_skip_from_edge(edge.id) | ||
| self._state_manager.mark_edge_skipped(edge.id) |
There was a problem hiding this comment.
In _propagate_skip_to_node, the order of operations in the outgoing edge loop is reversed from the intended skip-propagation semantics. By calling propagate_skip_from_edge(edge.id) before marking the edge as skipped, analyze_edge_states will still see this edge in its default UNKNOWN state, causing has_unknown to be true and prematurely stopping propagation instead of treating the edge as SKIPPED. To preserve the contract that downstream nodes with only skipped predecessors are themselves skipped, this loop should mark each outgoing edge as skipped via the state manager before calling propagate_skip_from_edge for that edge.
| # Recursively propagate skip | |
| self.propagate_skip_from_edge(edge.id) | |
| self._state_manager.mark_edge_skipped(edge.id) | |
| # Mark edge as skipped before propagating | |
| self._state_manager.mark_edge_skipped(edge.id) | |
| # Recursively propagate skip | |
| self.propagate_skip_from_edge(edge.id) |
| if edge_states["has_taken"]: | ||
| # Enqueue node | ||
| # Start execution and enqueue node | ||
| print(f"Starting execution for node: {downstream_node_id}") | ||
| self._state_manager.start_execution(downstream_node_id) | ||
| self._state_manager.enqueue_node(downstream_node_id) |
There was a problem hiding this comment.
The print(f"Starting execution for node: {downstream_node_id}") in propagate_skip_from_edge introduces direct stdout logging in core engine code and is inconsistent with the rest of the graph engine, which uses the logging infrastructure (e.g. logging.getLogger(__name__) in graph_engine.py and event_handlers.py). This debug output can pollute logs in production and the call order start_execution before enqueue_node also diverges from established patterns (see event_management/event_handlers.py:195-197 and graph_engine.py:332-339, which enqueue first and then call start_execution). Consider removing the print entirely and aligning the ordering with existing call sites by enqueueing the node first and then marking it as executing.
Benchmark PR from agentic-review-benchmarks#13