Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 148 additions & 10 deletions tests/test_spawning.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
"""
Spawning basics
Spawning basics including audit of,

- subproc bootstrap, such as subactor runtime-data/config inheritance,
- basic (and mostly legacy) `ActorNursery` subactor starting and
cancel APIs.

Simple (and generally legacy) examples from the original
API design.

"""
from functools import partial
Expand Down Expand Up @@ -98,7 +105,9 @@ async def movie_theatre_question():


@tractor_test
async def test_movie_theatre_convo(start_method):
async def test_movie_theatre_convo(
start_method: str,
):
'''
The main ``tractor`` routine.

Expand Down Expand Up @@ -151,13 +160,16 @@ async def test_most_beautiful_word(
name='some_linguist',
)

print(await portal.result())
res: Any = await portal.wait_for_result()
assert res == return_value
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.

# this should pull the cached final result already captured during
# the nursery block exit.
print(await portal.result())
res: Any = await portal.wait_for_result()
assert res == return_value
print(res)


async def check_loglevel(level):
Expand All @@ -168,16 +180,24 @@ async def check_loglevel(level):
log.critical('yoyoyo')


@pytest.mark.parametrize(
'level', [
'debug',
'cancel',
'critical'
],
ids='loglevel={}'.format,
)
def test_loglevel_propagated_to_subactor(
start_method,
capfd,
reg_addr,
capfd: pytest.CaptureFixture,
start_method: str,
reg_addr: tuple,
level: str,
):
if start_method == 'mp_forkserver':
pytest.skip(
"a bug with `capfd` seems to make forkserver capture not work?")

level = 'critical'
"a bug with `capfd` seems to make forkserver capture not work?"
)

async def main():
async with tractor.open_nursery(
Expand All @@ -197,3 +217,121 @@ async def main():
# ensure subactor spits log message on stderr
captured = capfd.readouterr()
assert 'yoyoyo' in captured.err


async def check_parent_main_inheritance(
expect_inherited: bool,
) -> bool:
'''
Assert that the child actor's ``_parent_main_data`` matches the
``inherit_parent_main`` flag it was spawned with.

With the trio spawn backend the parent's ``__main__`` bootstrap
data is captured and forwarded to each child so it can replay
the parent's ``__main__`` as ``__mp_main__``, mirroring the
stdlib ``multiprocessing`` bootstrap:
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

When ``inherit_parent_main=False`` the data dict is empty
(``{}``) so no fixup ever runs and the child keeps its own
``__main__`` untouched.

NOTE: under `pytest` the parent ``__main__`` is
``pytest.__main__`` whose ``_fixup_main_from_name()`` is a no-op
(the name ends with ``.__main__``), so we cannot observe
a difference in ``sys.modules['__main__'].__name__`` between the
two modes. Checking ``_parent_main_data`` directly is the most
reliable verification that the flag is threaded through
correctly; a ``RemoteActorError[AssertionError]`` propagates on
mismatch.

'''
import tractor
actor: tractor.Actor = tractor.current_actor()
has_data: bool = bool(actor._parent_main_data)
assert has_data == expect_inherited, (
f'Expected _parent_main_data to be '
f'{"non-empty" if expect_inherited else "empty"}, '
f'got: {actor._parent_main_data!r}'
)
return has_data


def test_run_in_actor_can_skip_parent_main_inheritance(
start_method: str, # <- only support on `trio` backend rn.
):
'''
Verify ``inherit_parent_main=False`` on ``run_in_actor()``
prevents parent ``__main__`` data from reaching the child.

'''
if start_method != 'trio':
pytest.skip(
'parent main-inheritance opt-out only affects the trio backend'
)

async def main():
async with tractor.open_nursery(start_method='trio') as an:

# Default: child receives parent __main__ bootstrap data
replaying = await an.run_in_actor(
check_parent_main_inheritance,
name='replaying-parent-main',
expect_inherited=True,
)
await replaying.result()

# Opt-out: child gets no parent __main__ data
isolated = await an.run_in_actor(
check_parent_main_inheritance,
name='isolated-parent-main',
inherit_parent_main=False,
expect_inherited=False,
)
await isolated.result()

trio.run(main)


def test_start_actor_can_skip_parent_main_inheritance(
start_method: str, # <- only support on `trio` backend rn.
):
'''
Verify ``inherit_parent_main=False`` on ``start_actor()``
prevents parent ``__main__`` data from reaching the child.

'''
if start_method != 'trio':
pytest.skip(
'parent main-inheritance opt-out only affects the trio backend'
)

async def main():
async with tractor.open_nursery(start_method='trio') as an:

# Default: child receives parent __main__ bootstrap data
replaying = await an.start_actor(
'replaying-parent-main',
enable_modules=[__name__],
)
result = await replaying.run(
check_parent_main_inheritance,
expect_inherited=True,
)
assert result is True
await replaying.cancel_actor()

# Opt-out: child gets no parent __main__ data
isolated = await an.start_actor(
'isolated-parent-main',
enable_modules=[__name__],
inherit_parent_main=False,
)
result = await isolated.run(
check_parent_main_inheritance,
expect_inherited=False,
)
assert result is False
await isolated.cancel_actor()

trio.run(main)
21 changes: 15 additions & 6 deletions tractor/runtime/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
from ._portal import Portal
from . import _state
from ..spawn import _mp_fixup_main
from ..spawn._mp_fixup_main import ParentMainData
from . import _rpc

if TYPE_CHECKING:
Expand Down Expand Up @@ -218,7 +219,7 @@ def ipc_server(self) -> _server.IPCServer:
return self._ipc_server

# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_main_data: ParentMainData
_parent_chan_cs: CancelScope|None = None
_spawn_spec: msgtypes.SpawnSpec|None = None

Expand All @@ -240,10 +241,11 @@ def __init__(
name: str,
uuid: str,
*,
enable_modules: list[str] = [],
enable_modules: list[str] | None = None,
loglevel: str|None = None,
registry_addrs: list[Address]|None = None,
spawn_method: str|None = None,
inherit_parent_main: bool = True,

arbiter_addr: UnwrappedAddress|None = None,

Expand All @@ -265,12 +267,15 @@ def __init__(
self._cancel_called_by: tuple[str, tuple]|None = None
self._cancel_called: bool = False

# retreive and store parent `__main__` data which
# retrieve and store parent `__main__` data which
# will be passed to children
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
self._parent_main_data: ParentMainData = _mp_fixup_main._mp_figure_out_main(
inherit_parent_main=inherit_parent_main,
)

# TODO? only add this when `is_debug_mode() == True` no?
# always include debugging tools module
enable_modules = list(enable_modules or [])
if _state.is_root_process():
enable_modules.append('tractor.devx.debug._tty_lock')

Expand Down Expand Up @@ -547,11 +552,15 @@ def load_modules(

'''
try:
if self._spawn_method == 'trio':
parent_data = self._parent_main_data
if (
self._spawn_method == 'trio'
and
(parent_data := self._parent_main_data)
):
if 'init_main_from_name' in parent_data:
_mp_fixup_main._fixup_main_from_name(
parent_data['init_main_from_name'])

elif 'init_main_from_path' in parent_data:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])
Expand Down
19 changes: 16 additions & 3 deletions tractor/runtime/_supervise.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,26 @@ async def start_actor(
loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None,
infect_asyncio: bool = False,
inherit_parent_main: bool = True,

# TODO: ideally we can rm this once we no longer have
# a `._ria_nursery` since the dependent APIs have been
# removed!
nursery: trio.Nursery|None = None,
proc_kwargs: dict[str, any] = {}
proc_kwargs: dict[str, typing.Any] | None = None,

) -> Portal:
'''
Start a (daemon) actor: an process that has no designated
"main task" besides the runtime.

Pass ``inherit_parent_main=False`` to keep this child on its
own bootstrap module for the trio spawn backend instead of
applying the parent ``__main__`` re-exec fixup during startup.
This does not affect ``multiprocessing`` ``spawn`` or
``forkserver`` which reconstruct the parent's ``__main__`` as
part of their normal stdlib bootstrap.

'''
__runtimeframe__: int = 1 # noqa
loglevel: str = (
Expand All @@ -224,7 +232,8 @@ async def start_actor(
_rtv['_debug_mode'] = debug_mode
self._at_least_one_child_in_debug = True

enable_modules = enable_modules or []
enable_modules = list(enable_modules or [])
proc_kwargs = dict(proc_kwargs or {})

if rpc_module_paths:
warnings.warn(
Expand All @@ -242,6 +251,7 @@ async def start_actor(
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
inherit_parent_main=inherit_parent_main,

# verbatim relay this actor's registrar addresses
registry_addrs=current_actor().registry_addrs,
Expand Down Expand Up @@ -289,7 +299,8 @@ async def run_in_actor(
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
infect_asyncio: bool = False,
proc_kwargs: dict[str, any] = {},
inherit_parent_main: bool = True,
proc_kwargs: dict[str, typing.Any] | None = None,

**kwargs, # explicit args to ``fn``

Expand All @@ -310,6 +321,7 @@ async def run_in_actor(
# use the explicit function name if not provided
name = fn.__name__

proc_kwargs = dict(proc_kwargs or {})
portal: Portal = await self.start_actor(
name,
enable_modules=[mod_path] + (
Expand All @@ -320,6 +332,7 @@ async def run_in_actor(
# use the run_in_actor nursery
nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
inherit_parent_main=inherit_parent_main,
proc_kwargs=proc_kwargs
)

Expand Down
Loading
Loading