-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Fix ParallelExecutor crash when Parallel called from within module forward() #9104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix ParallelExecutor crash when Parallel called from within module forward() #9104
Conversation
dspy/utils/parallelizer.py
Outdated
| index, outcome = f.result() | ||
| except Exception: | ||
| pass | ||
| logger.error(f"Worker failed: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not really helpful since we catch exceptions by wrapping the original function in _wrap_function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TomeHirata Understood. I've reverted back to pass.
dspy/utils/parallelizer.py
Outdated
| if parent_overrides.get("usage_tracker"): | ||
| # Usage tracker needs to be deep copied across threads so that each thread tracks its own usage | ||
| thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"]) | ||
| new_overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks reasonable, can we keep the comment? And also can you add a test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TomeHirata Added the comment back in directly above the deep copy.
|
@TomeHirata. Implemented a unit test as requested. Tried to use the existing tests as reference. |
## Summary
Fixes a bug where
dspy.Parallelsilently returnsNonefor all tasks when called from within a module'sforward()method while MLflow autologging is enabled.**## Problem
When using
mlflow.dspy.autolog(), callingdspy.Parallelfrom inside a module'sforward()method causes all results to silently returnNone.The root cause is twofold:
1. Incorrect ContextVar access in
parallelizer.py(line 95)thread_local_overridesis aContextVar, which requires.get()and.set()methods for access. The existing code incorrectly attempts to access an.overridesattribute directly:This raises
AttributeError: '_contextvars.ContextVar' object has no attribute 'overrides'.The bug only manifests when
usage_trackeris present in the parent context, which occurs when MLflow autologging is enabled.2. Silent exception handling (line ~155)
Worker exceptions are caught and silently discarded:
This masked the underlying
AttributeError.**## Solution
1. Restructured ContextVar access to build the complete overrides dictionary before setting:
2. Added error logging for worker failures:
**## Reproduction