Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion python/rapidsmpf/rapidsmpf/streaming/core/node.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from cython.operator cimport dereference as deref
Expand Down Expand Up @@ -200,6 +200,11 @@ def run_streaming_pipeline(*, nodes, py_executor = None):
Executor used to run Python nodes (required if any Python nodes are present).
If no Python nodes are provided, this is ignored.

``py_executor`` is set as the default executor on the event loop managed
by rapidsmpf. If your Python nodes include an
``await asyncio.to_thread(blocking_function)`` then the blocking function
will run in ``py_executor``.

Warnings
--------
C++ nodes are released and must not be used after this call.
Expand Down Expand Up @@ -252,6 +257,8 @@ def run_streaming_pipeline(*, nodes, py_executor = None):
)

async def runner():
loop = asyncio.get_running_loop()
loop.set_default_executor(py_executor)
return await asyncio.gather(*py_nodes)

if len(py_nodes) > 0:
Expand Down
Loading