Skip to content

Create a wrapper in order to be able to run two plans in parallel#75

Closed
RafaelLyra8 wants to merge 7 commits intomainfrom
parallel_plans_wrapper
Closed

Create a wrapper in order to be able to run two plans in parallel#75
RafaelLyra8 wants to merge 7 commits intomainfrom
parallel_plans_wrapper

Conversation

@RafaelLyra8
Copy link
Copy Markdown
Contributor

No description provided.

@RafaelLyra8 RafaelLyra8 marked this pull request as draft December 10, 2025 17:28
Copy link
Copy Markdown
Member

@ericonr ericonr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the two commits from this PR should be joined, because the first commit doesn't have a fully working implementation and reviewing the git history could be misleading.

Furthermore, I believe the commit message (or code comments) should expand on the purpose of this code. We need to bridge non-async code to run some asynchronous tasks simultaneously, and this means blocking on multiple async functions. This is necessary because we yield the tasks to be run, but if we receive a sleep/wait, we want to run that ourselves so the runner doesn't waste time.

Comment on lines +42 to +43
except Exception as exception:
plan_has_finished_list[list_id].set()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended to catch StopIteration for when a plan is finished?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes if the plans was successful, but I also wanted to call it if one plan failed.

Comment thread src/sophys/common/plans/parallel_wrapper.py
Comment thread src/sophys/common/plans/parallel_wrapper.py Outdated
Comment thread src/sophys/common/plans/parallel_wrapper.py Outdated
Comment thread src/sophys/common/plans/parallel_wrapper.py Outdated
Comment thread src/sophys/common/plans/parallel_wrapper.py Outdated
Comment thread src/sophys/common/plans/parallel_wrapper.py Outdated
def yield_msg(msg):
yield msg
async def yield_msg(msg):
await asyncio.wait_for(RE._command_registry[msg[0]](msg), timeout=10**10)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • is it safe to use this RE internal member?
  • why use this timeout instead of None?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only used the timeout because the asyncio returned an error if it was not passed, but I didn't wan't to use it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I don't think RE internal member is supposed to be used outside of the run engine class, but because running plans in parallel is not something that the bluesky framework supports natively and because I want to use the same functions of the run engine for the message, I chose to use it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do timeout=None for an infinite timeout, then!

Comment thread src/sophys/common/plans/parallel_wrapper.py Outdated
@RafaelLyra8 RafaelLyra8 force-pushed the parallel_plans_wrapper branch from fac0d74 to bb80d96 Compare December 12, 2025 17:58
@RafaelLyra8 RafaelLyra8 force-pushed the parallel_plans_wrapper branch from 7fb5677 to 8d556cc Compare December 12, 2025 19:42
Copy link
Copy Markdown
Contributor

@flowln flowln left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is working as intended, and this really should have automated tests. One example of broken behavior (the test is failing, it should succeed as far as I understand what you're trying to achieve here):

import time

import numpy as np

from bluesky import RunEngine, plan_stubs as bps
from bluesky.preprocessors import run_decorator

from sophys.common.plans.parallel_wrapper import parallel_plans_wrapper


def test_two_plans():
    RE = RunEngine()

    @run_decorator()
    def single_plan():
        yield from bps.sleep(0.5)

    plan = parallel_plans_wrapper(single_plan(), single_plan())

    _t = time.time()
    RE(plan)
    assert np.isclose(time.time() - _t, 0.5, atol=0.2), "Plan should last about half a second."

Comment on lines +16 to +25
@contextmanager
def thread_loop_manager():
loop = asyncio.new_event_loop()
thread = Thread(target=run_loop, args=(loop,), daemon=True)
thread.start()
try:
yield loop
finally:
loop.call_soon_threadsafe(loop.stop)
thread.join()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know a contextmanager decorator existed. That's really compact!

def yield_msg(msg):
yield msg
async def yield_msg(msg):
await asyncio.wait_for(RE._command_registry[msg[0]](msg), timeout=10**10)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do timeout=None for an infinite timeout, then!

parallel_plan_list = list(args)
plan_has_finished_list = {}
current_event = {}
current_thread = {}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename to blocking_task_future?

Comment on lines +68 to +70
# Monitor 'wait' thread until its finished
if not current_thread[list_id].done():
current_thread[list_id] = None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be if current_thread[list_id].done()? The way it's written here you're throwing out the future when it's still not done.

Comment on lines +71 to +73
except Exception as exception:
plan_has_finished_list[list_id].set()
print(exception)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What part of the code are we expecting to throw exceptions here? I think that should be documented in a comment. Is it just the next call? If you want to catch exceptions from the tasks passed to the async loop, you need to call result() in the if ... done() block.


def parallel_plans_wrapper(*args):
parallel_plan_list = list(args)
plan_has_finished_list = {}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing as this list is now only touched in the same thread inside of this class, could it be moved to use booleans instead of Events? It would also simplify all_plans_have_finished above.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants