From 3ed8a6b6e85747057a718b357346984adf971690 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 10:56:24 -0800 Subject: [PATCH 01/10] WIP add workflow_multiprocessing sample --- README.md | 1 + pyproject.toml | 32 +++--- uv.lock | 17 +-- workflow_multiprocessing/README.md | 4 + workflow_multiprocessing/__init__.py | 2 + workflow_multiprocessing/activities.py | 7 ++ workflow_multiprocessing/starter.py | 49 ++++++++ workflow_multiprocessing/worker.py | 150 +++++++++++++++++++++++++ workflow_multiprocessing/workflows.py | 22 ++++ 9 files changed, 253 insertions(+), 31 deletions(-) create mode 100644 workflow_multiprocessing/README.md create mode 100644 workflow_multiprocessing/__init__.py create mode 100644 workflow_multiprocessing/activities.py create mode 100644 workflow_multiprocessing/starter.py create mode 100644 workflow_multiprocessing/worker.py create mode 100644 workflow_multiprocessing/workflows.py diff --git a/README.md b/README.md index d9ae57a30..e572bb42e 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [updatable_timer](updatable_timer) - A timer that can be updated while sleeping. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. +* [workflow_multiprocessing](workflow_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks. ## Test diff --git a/pyproject.toml b/pyproject.toml index ed007f47a..542731ef4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ authors = [{ name = "Temporal Technologies Inc", email = "sdk@temporal.io" }] requires-python = ">=3.10" readme = "README.md" license = "MIT" -dependencies = ["temporalio>=1.19.0,<2"] +dependencies = ["temporalio @ git+https://github.com/temporalio/sdk-python"] [project.urls] Homepage = "https://github.com/temporalio/samples-python" @@ -28,15 +28,8 @@ dev = [ "poethepoet>=0.36.0", ] bedrock = ["boto3>=1.34.92,<2"] -dsl = [ - "pyyaml>=6.0.1,<7", - "types-pyyaml>=6.0.12,<7", - "dacite>=1.8.1,<2", -] -encryption = [ - "cryptography>=38.0.1,<39", - "aiohttp>=3.8.1,<4", -] +dsl = ["pyyaml>=6.0.1,<7", "types-pyyaml>=6.0.12,<7", "dacite>=1.8.1,<2"] +encryption = ["cryptography>=38.0.1,<39", "aiohttp>=3.8.1,<4"] gevent = ["gevent==25.4.2 ; python_version >= '3.8'"] langchain = [ "langchain>=0.1.7,<0.2 ; python_version >= '3.8.1' and python_version < '4.0'", @@ -47,9 +40,7 @@ langchain = [ "tqdm>=4.62.0,<5", "uvicorn[standard]>=0.24.0.post1,<0.25", ] -nexus = [ - "nexus-rpc>=1.1.0,<2", -] +nexus = ["nexus-rpc>=1.1.0,<2"] open-telemetry = [ "temporalio[opentelemetry]", "opentelemetry-exporter-otlp-proto-grpc", @@ -61,10 +52,7 @@ openai-agents = [ ] pydantic-converter = ["pydantic>=2.10.6,<3"] sentry = ["sentry-sdk>=2.13.0"] -trio-async = [ - "trio>=0.28.0,<0.29", - "trio-asyncio>=0.15.0,<0.16", -] +trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"] cloud-export-to-parquet = [ "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'", "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'", @@ -72,6 +60,8 @@ cloud-export-to-parquet = [ "pyarrow>=19.0.1", ] +[tool.hatch.metadata] +allow-direct-references = true [tool.hatch.build.targets.sdist] include = ["./**/*.py"] @@ -119,8 +109,12 @@ requires = ["hatchling"] build-backend = 
"hatchling.build" [tool.poe.tasks] -format = [{cmd = "uv run black ."}, {cmd = "uv run isort ."}] -lint = [{cmd = "uv run black --check ."}, {cmd = "uv run isort --check-only ."}, {ref = "lint-types" }] +format = [{ cmd = "uv run black ." }, { cmd = "uv run isort ." }] +lint = [ + { cmd = "uv run black --check ." }, + { cmd = "uv run isort --check-only ." }, + { ref = "lint-types" }, +] lint-types = "uv run --all-groups mypy --check-untyped-defs --namespace-packages ." test = "uv run --all-groups pytest" diff --git a/uv.lock b/uv.lock index b2c83a737..2a2c0c194 100644 --- a/uv.lock +++ b/uv.lock @@ -1405,14 +1405,14 @@ wheels = [ [[package]] name = "nexus-rpc" -version = "1.1.0" +version = "1.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ef/66/540687556bd28cf1ec370cc6881456203dfddb9dab047b8979c6865b5984/nexus_rpc-1.1.0.tar.gz", hash = "sha256:d65ad6a2f54f14e53ebe39ee30555eaeb894102437125733fb13034a04a44553", size = 77383, upload-time = "2025-07-07T19:03:58.368Z" } +sdist = { url = "https://files.pythonhosted.org/packages/06/50/95d7bc91f900da5e22662c82d9bf0f72a4b01f2a552708bf2f43807707a1/nexus_rpc-1.2.0.tar.gz", hash = "sha256:b4ddaffa4d3996aaeadf49b80dfcdfbca48fe4cb616defaf3b3c5c2c8fc61890", size = 74142, upload-time = "2025-11-17T19:17:06.798Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/bf/2f/9e9d0dcaa4c6ffa22b7aa31069a8a264c753ff8027b36af602cce038c92f/nexus_rpc-1.1.0-py3-none-any.whl", hash = "sha256:d1b007af2aba186a27e736f8eaae39c03aed05b488084ff6c3d1785c9ba2ad38", size = 27743, upload-time = "2025-07-07T19:03:57.556Z" }, + { url = "https://files.pythonhosted.org/packages/13/04/eaac430d0e6bf21265ae989427d37e94be5e41dc216879f1fbb6c5339942/nexus_rpc-1.2.0-py3-none-any.whl", hash = "sha256:977876f3af811ad1a09b2961d3d1ac9233bda43ff0febbb0c9906483b9d9f8a3", size = 28166, upload-time = "2025-11-17T19:17:05.64Z" }, ] [[package]] @@ -2597,7 +2597,7 @@ wheels = [ [[package]] name = "temporalio" version = "1.19.0" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/temporalio/sdk-python#5d1630ddecf59739aaf505ab2a5502b3a026de42" } dependencies = [ { name = "nexus-rpc" }, { name = "protobuf" }, @@ -2605,13 +2605,6 @@ dependencies = [ { name = "types-protobuf" }, { name = "typing-extensions" }, ] -wheels = [ - { url = "https://files.pythonhosted.org/packages/3f/92/0775d831fa245d61b74db2059d5a24a04cef0532ed2c48310a5ab007de9c/temporalio-1.19.0-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:c2d6d5cad8aec56e048705aa4f0bab83fec15343757ea7acf8504f2e0c289b60", size = 13175255, upload-time = "2025-11-13T22:35:54.22Z" }, - { url = "https://files.pythonhosted.org/packages/e2/e1/2a818fefc0023eb132bfff1a03440bcaff154d4d97445ef88a40c23c20c8/temporalio-1.19.0-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:d85c89018cba9471ce529d90c9cee5bcc31790fd64176b9ada32cc76440f8d73", size = 12854549, upload-time = "2025-11-13T22:35:57.217Z" }, - { url = "https://files.pythonhosted.org/packages/ff/78/fe5c8c9b112b38e01aba845335df17a8bbfd60a434ffe3c1c4737ced40a0/temporalio-1.19.0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f772f0698d60f808bc3c4a055fb53e40d757fa646411845b911863eebbf0549d", size = 13237772, upload-time = "2025-11-13T22:36:00.511Z" }, - { url = 
"https://files.pythonhosted.org/packages/d9/82/be0fd31119651f518f8db8685fd61976d9d5bbecf3b562d51f13a6442a17/temporalio-1.19.0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f706c8f49771daf342ac8daa8ed07f4124fae943177f9feef458a1255aee717c", size = 13374621, upload-time = "2025-11-13T22:36:03.431Z" }, - { url = "https://files.pythonhosted.org/packages/d8/94/18f6ae06ffd91507ded9111af1041146a5ba4b56e9256520c5ce82629fc4/temporalio-1.19.0-cp310-abi3-win_amd64.whl", hash = "sha256:162459c293553be39994f20c635a132f7332ae71bd7ba4042f8473701fcf1c7c", size = 14256891, upload-time = "2025-11-13T22:36:06.778Z" }, -] [package.optional-dependencies] openai-agents = [ @@ -2698,7 +2691,7 @@ trio-async = [ ] [package.metadata] -requires-dist = [{ name = "temporalio", specifier = ">=1.19.0,<2" }] +requires-dist = [{ name = "temporalio", git = "https://github.com/temporalio/sdk-python" }] [package.metadata.requires-dev] bedrock = [{ name = "boto3", specifier = ">=1.34.92,<2" }] diff --git a/workflow_multiprocessing/README.md b/workflow_multiprocessing/README.md new file mode 100644 index 000000000..1d4438855 --- /dev/null +++ b/workflow_multiprocessing/README.md @@ -0,0 +1,4 @@ + +# Workflow Multiprocessing + +This same shows how to use a `concurrent.futures.ProcessPoolExecutor` to \ No newline at end of file diff --git a/workflow_multiprocessing/__init__.py b/workflow_multiprocessing/__init__.py new file mode 100644 index 000000000..d9e30b8a7 --- /dev/null +++ b/workflow_multiprocessing/__init__.py @@ -0,0 +1,2 @@ +WORKFLOW_TASK_QUEUE = "workflow-task-queue" +ACTIVITY_TASK_QUEUE = "activity-task-queue" diff --git a/workflow_multiprocessing/activities.py b/workflow_multiprocessing/activities.py new file mode 100644 index 000000000..3416327a5 --- /dev/null +++ b/workflow_multiprocessing/activities.py @@ -0,0 +1,7 @@ +import os +from temporalio import activity + + +@activity.defn +async def echo_pid_activity(input: str) -> str: + return f"{input} | activity-pid:{os.getpid()}" diff --git a/workflow_multiprocessing/starter.py b/workflow_multiprocessing/starter.py new file mode 100644 index 000000000..c87035d6d --- /dev/null +++ b/workflow_multiprocessing/starter.py @@ -0,0 +1,49 @@ +import asyncio +import argparse +import uuid + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig + +from workflow_multiprocessing import WORKFLOW_TASK_QUEUE +from workflow_multiprocessing.workflows import ParallelizedWorkflow + + +class Args(argparse.Namespace): + num_workflows: int + + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-n", + "--num-workflows", + help="the number of workflows to execute", + type=int, + default=25, + ) + args = parser.parse_args(namespace=Args()) + + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + # Start several workflows + wf_handles = [ + client.execute_workflow( + ParallelizedWorkflow.run, + "temporal", + id=f"greeting-workflow-id-{uuid.uuid4()}", + task_queue=WORKFLOW_TASK_QUEUE, + ) + for _ in range(args.num_workflows) + ] + + # Wait for workflow completion (runs indefinitely until it receives a signal) + for wf in asyncio.as_completed(wf_handles): + result = await wf + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_multiprocessing/worker.py b/workflow_multiprocessing/worker.py new file mode 100644 index 000000000..651b0f6a7 --- /dev/null +++ 
b/workflow_multiprocessing/worker.py
@@ -0,0 +1,150 @@
+import argparse
+import concurrent.futures
+import dataclasses
+import multiprocessing
+import traceback
+import asyncio
+import logging
+from typing import Literal
+
+from temporalio.client import Client
+from temporalio.envconfig import ClientConfig
+from temporalio.worker import Worker, PollerBehaviorSimpleMaximum
+from temporalio.worker.workflow_sandbox import (
+    SandboxedWorkflowRunner,
+    SandboxRestrictions,
+)
+from temporalio.runtime import Runtime, TelemetryConfig
+
+from workflow_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE
+from workflow_multiprocessing.workflows import ParallelizedWorkflow
+from workflow_multiprocessing.activities import echo_pid_activity
+
+logger = logging.getLogger()
+
+# Immediately prevent the default Runtime from being created to ensure
+# each process creates its own
+Runtime.prevent_default()
+
+
+class Args(argparse.Namespace):
+    num_workflow_workers: int
+    num_activity_workers: int
+
+    @property
+    def total_workers(self) -> int:
+        return self.num_activity_workers + self.num_workflow_workers
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-w", "--num-workflow-workers", type=int, default=2)
+    parser.add_argument("-a", "--num-activity-workers", type=int, default=1)
+    args = parser.parse_args(namespace=Args())
+    logger.info(
+        f"starting {args.num_workflow_workers} workflow worker(s) and {args.num_activity_workers} activity worker(s)"
+    )
+
+    # This sample prefers fork to avoid re-importing
+    # and decrease startup time. Fork is not available on all
+    # operating systems, so we fall back to 'spawn' when not available
+    try:
+        mp_ctx = multiprocessing.get_context("fork")
+    except ValueError:
+        mp_ctx = multiprocessing.get_context("spawn")
+
+    with concurrent.futures.ProcessPoolExecutor(
+        args.total_workers, mp_context=mp_ctx
+    ) as executor:
+        # Start workflow workers by submitting them to the
+        # ProcessPoolExecutor
+        worker_futures = [
+            executor.submit(worker_entry, "workflow")
+            for _ in range(args.num_workflow_workers)
+        ]
+
+        # In this sample, we start activity workers as separate processes in the
+        # same way we do workflow workers. In production, activity workers
+        # are often deployed separately from workflow workers to account for
+        # the different scaling characteristics.
+        worker_futures.extend(
+            [
+                executor.submit(worker_entry, "activity")
+                for _ in range(args.num_activity_workers)
+            ]
+        )
+
+        try:
+            logger.info("waiting for keyboard interrupt or for all workers to exit")
+            for worker in concurrent.futures.as_completed(worker_futures):
+                logger.error("worker exited unexpectedly")
+                if worker.exception():
+                    traceback.print_exception(worker.exception())
+        except KeyboardInterrupt:
+            pass
+        finally:
+            executor.shutdown(wait=True)
+
+
+def worker_entry(worker_type: Literal["workflow", "activity"]):
+    Runtime.set_default(Runtime(telemetry=TelemetryConfig()))
+
+    async def run_worker():
+        config = ClientConfig.load_client_connect_config()
+        config.setdefault("target_host", "localhost:7233")
+        client = await Client.connect(**config)
+
+        if worker_type == "workflow":
+            worker = workflow_worker(client)
+        else:
+            worker = activity_worker(client)
+
+        try:
+            await asyncio.shield(worker.run())
+        except asyncio.CancelledError:
+            await worker.shutdown()
+
+    asyncio.run(run_worker())
+
+
+def workflow_worker(client: Client) -> Worker:
+    """
+    Create a workflow worker that is configured to take advantage of
+    being run as one of many child processes.
+    """
+    return Worker(
+        client,
+        task_queue=WORKFLOW_TASK_QUEUE,
+        workflows=[ParallelizedWorkflow],
+        # Workflow tasks are CPU bound, but generally execute quickly.
+        # Because we're leveraging multiprocessing to achieve parallelism,
+        # we want each workflow worker to be configured for small workflow
+        # task processing.
+        max_concurrent_workflow_tasks=2,
+        workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(2),
+        # Allow workflows to access the os module to read the pid
+        workflow_runner=SandboxedWorkflowRunner(
+            restrictions=dataclasses.replace(
+                SandboxRestrictions.default,
+                invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
+                    "os"
+                ),
+            )
+        ),
+    )
+
+
+def activity_worker(client: Client) -> Worker:
+    """
+    Create a basic activity worker
+    """
+    return Worker(
+        client,
+        task_queue=ACTIVITY_TASK_QUEUE,
+        activities=[echo_pid_activity],
+    )
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    main()
diff --git a/workflow_multiprocessing/workflows.py b/workflow_multiprocessing/workflows.py
new file mode 100644
index 000000000..e831ec121
--- /dev/null
+++ b/workflow_multiprocessing/workflows.py
@@ -0,0 +1,22 @@
+from datetime import timedelta
+import os
+
+from temporalio import workflow
+
+from workflow_multiprocessing import ACTIVITY_TASK_QUEUE
+from workflow_multiprocessing.activities import echo_pid_activity
+
+
+@workflow.defn
+class ParallelizedWorkflow:
+    @workflow.run
+    async def run(self, input: str) -> str:
+        pid = os.getpid()
+        activity_result = await workflow.execute_activity(
+            echo_pid_activity,
+            f"wf-starting-pid:{pid}",
+            task_queue=ACTIVITY_TASK_QUEUE,
+            start_to_close_timeout=timedelta(seconds=10),
+        )
+
+        return f"{activity_result} | wf-ending-pid:{pid}"

From cf90f1a1b56fbb4312fcc8e2e4af13e5a3c96918 Mon Sep 17 00:00:00 2001
From: Alex Mazzeo
Date: Fri, 21 Nov 2025 15:30:00 -0800
Subject: [PATCH 02/10] Add README content and other minor improvements

---
 sleep_for_days/README.md | 2 +-
 workflow_multiprocessing/README.md | 4 -
 workflow_task_multiprocessing/README.md | 111 ++++++++++++++++++
 .../__init__.py | 0
 .../activities.py | 0
 .../starter.py | 10 +-
 .../worker.py | 26 ++--
 .../workflows.py | 4 +-
 8 files changed, 134 insertions(+), 23 deletions(-)
 delete mode 100644 workflow_multiprocessing/README.md
 create mode 100644 workflow_task_multiprocessing/README.md
 rename {workflow_multiprocessing => workflow_task_multiprocessing}/__init__.py (100%)
 rename {workflow_multiprocessing => workflow_task_multiprocessing}/activities.py (100%)
 rename {workflow_multiprocessing => workflow_task_multiprocessing}/starter.py (81%)
 rename {workflow_multiprocessing => workflow_task_multiprocessing}/worker.py (84%)
 rename {workflow_multiprocessing => workflow_task_multiprocessing}/workflows.py (79%)

diff --git a/sleep_for_days/README.md b/sleep_for_days/README.md
index 69302cc79..5117fcdb2 100644
--- a/sleep_for_days/README.md
+++ b/sleep_for_days/README.md
@@ -2,7 +2,7 @@
 
 This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days.
 
-To run, first see the main [README.md](../../README.md) for prerequisites.
+To run, first see the main [README.md](../README.md) for prerequisites.
 
 Then create two terminals.
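> **Reviewer note (not part of the patch):** the README introduced in this commit rests on the claim that CPU-bound Python code parallelizes across processes but not across threads. That claim is easy to verify in isolation with a minimal, Temporal-free sketch; every name below is illustrative and nothing here ships with the sample:

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def spin(n: int) -> int:
    # Pure-Python, CPU-bound loop; it holds the GIL for its whole duration.
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed(executor_cls, workers: int = 4, n: int = 5_000_000) -> float:
    # Run the same CPU-bound job once per worker and time the batch.
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as ex:
        list(ex.map(spin, [n] * workers))
    return time.perf_counter() - start


if __name__ == "__main__":
    # Threads serialize on the GIL; processes run the loops truly in parallel.
    print(f"threads:   {timed(ThreadPoolExecutor):.2f}s")
    print(f"processes: {timed(ProcessPoolExecutor):.2f}s")
```

> On a multi-core machine the process-pool run should be markedly faster, while the thread-pool run is effectively serialized; exact timings vary by machine and Python version.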
diff --git a/workflow_multiprocessing/README.md b/workflow_multiprocessing/README.md deleted file mode 100644 index 1d4438855..000000000 --- a/workflow_multiprocessing/README.md +++ /dev/null @@ -1,4 +0,0 @@ - -# Workflow Multiprocessing - -This same shows how to use a `concurrent.futures.ProcessPoolExecutor` to \ No newline at end of file diff --git a/workflow_task_multiprocessing/README.md b/workflow_task_multiprocessing/README.md new file mode 100644 index 000000000..1ef8137a1 --- /dev/null +++ b/workflow_task_multiprocessing/README.md @@ -0,0 +1,111 @@ +# Workflow Task Multiprocessing Sample + + +## Python Concurrency Limitations + +CPU bound tasks effectively cannot be run in parallel in Python due to the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). The [Python standard library `threading` module](https://docs.python.org/3/library/threading.html) shares the following advice: + +> CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously. + +## Temporal Workflow Tasks in Python + +[Temporal Workflow Tasks](https://docs.temporal.io/tasks#workflow-task) are CPU bound operations and therefore cannot be run concurrently using threads or an async runtime. Instead, we can use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) or the [`multiprocessing` module](https://docs.python.org/3/library/multiprocessing.html) as suggested by the `threading` documentation to more appropriately utilize machine resources. + +This sample demonstrates using `concurrent.futures.ProcessPoolExecutor` to run multiple workflow worker processes. + +## Running the Sample + +To run, first see the root [README.md](../README.md) for prerequisites. Then, run the following from the root directory to run the `workflow_task_multiprocessing/` sample: + +```sh +uv run workflow_task_multiprocessing/worker.py +uv run workflow_task_multiprocessing/starter.py +``` + +Both `worker.py` and `starter.py` have minimal arguments that can be modified to run the sample in varying contexts. + +```sh +uv run workflow_task_multiprocessing/worker.py -h + +usage: worker.py [-h] [-w NUM_WORKFLOW_WORKERS] [-a NUM_ACTIVITY_WORKERS] + +options: + -h, --help show this help message and exit + -w, --num-workflow-workers NUM_WORKFLOW_WORKERS + -a, --num-activity-workers NUM_ACTIVITY_WORKERS +``` + +```sh +uv run workflow_task_multiprocessing/starter.py -h + +usage: starter.py [-h] [-n NUM_WORKFLOWS] + +options: + -h, --help show this help message and exit + -n, --num-workflows NUM_WORKFLOWS + the number of workflows to execute +``` + +## Example Output + + + + + + + + + + + +
worker.pystarter.py
+ +```sh +uv run workflow_task_multiprocessing/worker.py + +INFO:worker:starting 2 workflow worker(s) and 1 activity worker(s) +INFO:worker:waiting for keyboard interrupt or for all workers to exit +INFO:root:activity-worker:0 starting +INFO:root:workflow-worker:1 starting +INFO:root:workflow-worker:0 starting +INFO:root:workflow-worker:1 shutting down +INFO:root:activity-worker:0 shutting down +INFO:root:workflow-worker:0 shutting down +INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities +INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities +INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities +``` + + + +```sh +uv run workflow_task_multiprocessing/starter.py + +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +``` + +
\ No newline at end of file
diff --git a/workflow_multiprocessing/__init__.py b/workflow_task_multiprocessing/__init__.py
similarity index 100%
rename from workflow_multiprocessing/__init__.py
rename to workflow_task_multiprocessing/__init__.py
diff --git a/workflow_multiprocessing/activities.py b/workflow_task_multiprocessing/activities.py
similarity index 100%
rename from workflow_multiprocessing/activities.py
rename to workflow_task_multiprocessing/activities.py
diff --git a/workflow_multiprocessing/starter.py b/workflow_task_multiprocessing/starter.py
similarity index 81%
rename from workflow_multiprocessing/starter.py
rename to workflow_task_multiprocessing/starter.py
index c87035d6d..83aef3ea8 100644
--- a/workflow_multiprocessing/starter.py
+++ b/workflow_task_multiprocessing/starter.py
@@ -1,12 +1,15 @@
 import asyncio
 import argparse
 import uuid
+import logging
 
 from temporalio.client import Client
 from temporalio.envconfig import ClientConfig
 
-from workflow_multiprocessing import WORKFLOW_TASK_QUEUE
-from workflow_multiprocessing.workflows import ParallelizedWorkflow
+from workflow_task_multiprocessing import WORKFLOW_TASK_QUEUE
+from workflow_task_multiprocessing.workflows import ParallelizedWorkflow
+
+logger = logging.getLogger("starter")
 
 
 class Args(argparse.Namespace):
@@ -42,8 +45,9 @@ async def main():
     # Wait for workflow completion (runs indefinitely until it receives a signal)
     for wf in asyncio.as_completed(wf_handles):
         result = await wf
-        print(result)
+        logger.info(result)
 
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
     asyncio.run(main())
diff --git a/workflow_multiprocessing/worker.py b/workflow_task_multiprocessing/worker.py
similarity index 84%
rename from workflow_multiprocessing/worker.py
rename to workflow_task_multiprocessing/worker.py
index 651b0f6a7..58bbc395e 100644
--- a/workflow_multiprocessing/worker.py
+++ b/workflow_task_multiprocessing/worker.py
@@ -16,11 +16,11 @@
 )
 from temporalio.runtime import Runtime, TelemetryConfig
 
-from workflow_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE
-from workflow_multiprocessing.workflows import ParallelizedWorkflow
-from workflow_multiprocessing.activities import echo_pid_activity
+from workflow_task_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE
+from workflow_task_multiprocessing.workflows import ParallelizedWorkflow
+from workflow_task_multiprocessing.activities import echo_pid_activity
 
-logger = logging.getLogger()
+logger = logging.getLogger("worker")
 
 # Immediately prevent the default Runtime from being created to ensure
 # each process creates its own
@@ -45,7 +45,7 @@ def main():
         f"starting {args.num_workflow_workers} workflow worker(s) and {args.num_activity_workers} activity worker(s)"
     )
 
-    # This sample prefers fork to avoid re-importing
+    # This sample prefers fork to avoid re-importing modules
     # and decrease startup time. Fork is not available on all
     # operating systems, so we fall back to 'spawn' when not available
     try:
@@ -59,18 +59,18 @@ def main():
         # Start workflow workers by submitting them to the
         # ProcessPoolExecutor
         worker_futures = [
-            executor.submit(worker_entry, "workflow")
-            for _ in range(args.num_workflow_workers)
+            executor.submit(worker_entry, "workflow", i)
+            for i in range(args.num_workflow_workers)
         ]
 
         # In this sample, we start activity workers as separate processes in the
         # same way we do workflow workers.
In production, activity workers # are often deployed separately from workflow workers to account for - # the different scaling characteristics. + # differing scaling characteristics. worker_futures.extend( [ - executor.submit(worker_entry, "activity") - for _ in range(args.num_activity_workers) + executor.submit(worker_entry, "activity", i) + for i in range(args.num_activity_workers) ] ) @@ -82,11 +82,9 @@ def main(): traceback.print_exception(worker.exception()) except KeyboardInterrupt: pass - finally: - executor.shutdown(wait=True) -def worker_entry(worker_type: Literal["workflow", "activity"]): +def worker_entry(worker_type: Literal["workflow", "activity"], id: int): Runtime.set_default(Runtime(telemetry=TelemetryConfig())) async def run_worker(): @@ -100,8 +98,10 @@ async def run_worker(): worker = activity_worker(client) try: + logging.info(f"{worker_type}-worker:{id} starting") await asyncio.shield(worker.run()) except asyncio.CancelledError: + logging.info(f"{worker_type}-worker:{id} shutting down") await worker.shutdown() asyncio.run(run_worker()) diff --git a/workflow_multiprocessing/workflows.py b/workflow_task_multiprocessing/workflows.py similarity index 79% rename from workflow_multiprocessing/workflows.py rename to workflow_task_multiprocessing/workflows.py index e831ec121..76d354bec 100644 --- a/workflow_multiprocessing/workflows.py +++ b/workflow_task_multiprocessing/workflows.py @@ -3,8 +3,8 @@ from temporalio import workflow -from workflow_multiprocessing import ACTIVITY_TASK_QUEUE -from workflow_multiprocessing.activities import echo_pid_activity +from workflow_task_multiprocessing import ACTIVITY_TASK_QUEUE +from workflow_task_multiprocessing.activities import echo_pid_activity @workflow.defn From 76e6a270ff6b962a7468cbad410a371b243fdb36 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 15:47:10 -0800 Subject: [PATCH 03/10] remove html table that rendered poorly. switch to print over logger for simplicity --- workflow_task_multiprocessing/README.md | 84 ++++++++++-------------- workflow_task_multiprocessing/starter.py | 6 +- workflow_task_multiprocessing/worker.py | 13 ++-- 3 files changed, 39 insertions(+), 64 deletions(-) diff --git a/workflow_task_multiprocessing/README.md b/workflow_task_multiprocessing/README.md index 1ef8137a1..3dcd9c639 100644 --- a/workflow_task_multiprocessing/README.md +++ b/workflow_task_multiprocessing/README.md @@ -48,64 +48,46 @@ options: ## Example Output - - - - - - - - - - -
worker.pystarter.py
- ```sh uv run workflow_task_multiprocessing/worker.py -INFO:worker:starting 2 workflow worker(s) and 1 activity worker(s) -INFO:worker:waiting for keyboard interrupt or for all workers to exit -INFO:root:activity-worker:0 starting -INFO:root:workflow-worker:1 starting -INFO:root:workflow-worker:0 starting -INFO:root:workflow-worker:1 shutting down -INFO:root:activity-worker:0 shutting down -INFO:root:workflow-worker:0 shutting down -INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities -INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities -INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities +starting 2 workflow worker(s) and 1 activity worker(s) +waiting for keyboard interrupt or for all workers to exit +workflow-worker:0 starting +workflow-worker:1 starting +activity-worker:0 starting +workflow-worker:0 shutting down +activity-worker:0 shutting down +workflow-worker:1 shutting down ``` - ```sh uv run workflow_task_multiprocessing/starter.py -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18493 | activity-pid:18495 | wf-ending-pid:18493 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 -INFO:starter:wf-starting-pid:18494 | activity-pid:18495 | wf-ending-pid:18494 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 
+wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 +wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 ``` - -
\ No newline at end of file
diff --git a/workflow_task_multiprocessing/starter.py b/workflow_task_multiprocessing/starter.py
index 83aef3ea8..74e1578d4 100644
--- a/workflow_task_multiprocessing/starter.py
+++ b/workflow_task_multiprocessing/starter.py
@@ -1,7 +1,6 @@
 import asyncio
 import argparse
 import uuid
-import logging
 
 from temporalio.client import Client
 from temporalio.envconfig import ClientConfig
@@ -9,8 +8,6 @@
 from workflow_task_multiprocessing import WORKFLOW_TASK_QUEUE
 from workflow_task_multiprocessing.workflows import ParallelizedWorkflow
 
-logger = logging.getLogger("starter")
-
 
 class Args(argparse.Namespace):
     num_workflows: int
@@ -45,9 +42,8 @@ async def main():
     # Wait for workflow completion (runs indefinitely until it receives a signal)
     for wf in asyncio.as_completed(wf_handles):
         result = await wf
-        logger.info(result)
+        print(result)
 
 
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
     asyncio.run(main())
diff --git a/workflow_task_multiprocessing/worker.py b/workflow_task_multiprocessing/worker.py
index 58bbc395e..438941cbc 100644
--- a/workflow_task_multiprocessing/worker.py
+++ b/workflow_task_multiprocessing/worker.py
@@ -4,7 +4,6 @@
 import multiprocessing
 import traceback
 import asyncio
-import logging
 from typing import Literal
 
 from temporalio.client import Client
@@ -20,7 +19,6 @@
 from workflow_task_multiprocessing.workflows import ParallelizedWorkflow
 from workflow_task_multiprocessing.activities import echo_pid_activity
 
-logger = logging.getLogger("worker")
 
 # Immediately prevent the default Runtime from being created to ensure
 # each process creates its own
@@ -41,7 +39,7 @@ def main():
     parser.add_argument("-w", "--num-workflow-workers", type=int, default=2)
     parser.add_argument("-a", "--num-activity-workers", type=int, default=1)
     args = parser.parse_args(namespace=Args())
-    logger.info(
+    print(
        f"starting {args.num_workflow_workers} workflow worker(s) and {args.num_activity_workers} activity worker(s)"
     )
@@ -75,9 +73,9 @@ def main():
         try:
-            logger.info("waiting for keyboard interrupt or for all workers to exit")
+            print("waiting for keyboard interrupt or for all workers to exit")
             for worker in concurrent.futures.as_completed(worker_futures):
-                logger.error("worker exited unexpectedly")
+                print("ERROR: worker exited unexpectedly")
                 if worker.exception():
                     traceback.print_exception(worker.exception())
         except KeyboardInterrupt:
@@ -98,10 +96,10 @@ async def run_worker():
         worker = activity_worker(client)
 
         try:
-            logging.info(f"{worker_type}-worker:{id} starting")
+            print(f"{worker_type}-worker:{id} starting")
             await asyncio.shield(worker.run())
         except asyncio.CancelledError:
-            logging.info(f"{worker_type}-worker:{id} shutting down")
+            print(f"{worker_type}-worker:{id} shutting down")
             await worker.shutdown()
 
     asyncio.run(run_worker())
@@ -146,5 +144,4 @@ def activity_worker(client: Client) -> Worker:
 
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
     main()

From 8b418388e7a0f8ef36819abdbd15a9b5ce53be82 Mon Sep 17 00:00:00 2001
From: Alex Mazzeo
Date: Fri, 21 Nov 2025 15:53:26 -0800
Subject: [PATCH 04/10] remove syntax highlighting on blocks

---
 workflow_task_multiprocessing/README.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/workflow_task_multiprocessing/README.md b/workflow_task_multiprocessing/README.md
index 3dcd9c639..c277dc0bf 100644
--- a/workflow_task_multiprocessing/README.md
+++ b/workflow_task_multiprocessing/README.md
@@ -17,14 +17,14 @@ This sample demonstrates using
`concurrent.futures.ProcessPoolExecutor` to run m To run, first see the root [README.md](../README.md) for prerequisites. Then, run the following from the root directory to run the `workflow_task_multiprocessing/` sample: -```sh +``` uv run workflow_task_multiprocessing/worker.py uv run workflow_task_multiprocessing/starter.py ``` Both `worker.py` and `starter.py` have minimal arguments that can be modified to run the sample in varying contexts. -```sh +``` uv run workflow_task_multiprocessing/worker.py -h usage: worker.py [-h] [-w NUM_WORKFLOW_WORKERS] [-a NUM_ACTIVITY_WORKERS] @@ -35,7 +35,7 @@ options: -a, --num-activity-workers NUM_ACTIVITY_WORKERS ``` -```sh +``` uv run workflow_task_multiprocessing/starter.py -h usage: starter.py [-h] [-n NUM_WORKFLOWS] @@ -48,7 +48,7 @@ options: ## Example Output -```sh +``` uv run workflow_task_multiprocessing/worker.py starting 2 workflow worker(s) and 1 activity worker(s) @@ -62,7 +62,7 @@ workflow-worker:1 shutting down ``` -```sh +``` uv run workflow_task_multiprocessing/starter.py wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 From d6f1cc0e6189a0cc847732b67e56046db8c1b479 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 16:03:40 -0800 Subject: [PATCH 05/10] Apply some minor improvements to the readme --- workflow_task_multiprocessing/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/workflow_task_multiprocessing/README.md b/workflow_task_multiprocessing/README.md index c277dc0bf..2cb64df01 100644 --- a/workflow_task_multiprocessing/README.md +++ b/workflow_task_multiprocessing/README.md @@ -3,26 +3,26 @@ ## Python Concurrency Limitations -CPU bound tasks effectively cannot be run in parallel in Python due to the [global interpreter lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). The [Python standard library `threading` module](https://docs.python.org/3/library/threading.html) shares the following advice: +CPU-bound tasks effectively cannot run in parallel in Python due to the [Global Interpreter Lock (GIL)](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). The Python standard library's [`threading` module](https://docs.python.org/3/library/threading.html) provides the following guidance: > CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously. ## Temporal Workflow Tasks in Python -[Temporal Workflow Tasks](https://docs.temporal.io/tasks#workflow-task) are CPU bound operations and therefore cannot be run concurrently using threads or an async runtime. Instead, we can use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) or the [`multiprocessing` module](https://docs.python.org/3/library/multiprocessing.html) as suggested by the `threading` documentation to more appropriately utilize machine resources. +[Temporal Workflow Tasks](https://docs.temporal.io/tasks#workflow-task) are CPU-bound operations and therefore cannot be run concurrently using threads or an async runtime. 
Instead, we can use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) or the [`multiprocessing` module](https://docs.python.org/3/library/multiprocessing.html), as suggested by the `threading` documentation, to more appropriately utilize machine resources. -This sample demonstrates using `concurrent.futures.ProcessPoolExecutor` to run multiple workflow worker processes. +This sample demonstrates how to use `concurrent.futures.ProcessPoolExecutor` to run multiple workflow worker processes. ## Running the Sample -To run, first see the root [README.md](../README.md) for prerequisites. Then, run the following from the root directory to run the `workflow_task_multiprocessing/` sample: +To run, first see the root [README.md](../README.md) for prerequisites. Then execute the following commands from the root directory: ``` uv run workflow_task_multiprocessing/worker.py uv run workflow_task_multiprocessing/starter.py ``` -Both `worker.py` and `starter.py` have minimal arguments that can be modified to run the sample in varying contexts. +Both `worker.py` and `starter.py` have minimal arguments that can be adjusted to modify how the sample runs. ``` uv run workflow_task_multiprocessing/worker.py -h From 2369dc98d454c907478e3f33af3f1836f6721cb5 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 16:17:41 -0800 Subject: [PATCH 06/10] fix broken link in top level README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e572bb42e..f213a3833 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [updatable_timer](updatable_timer) - A timer that can be updated while sleeping. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. -* [workflow_multiprocessing](workflow_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks. +* [workflow_task_multiprocessing](workflow_task_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks. 
## Test From eae6e9f3ef9896f17afbf16b310fe24eb93e41f7 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 16:21:50 -0800 Subject: [PATCH 07/10] remove inaccurate comment in starter.py --- workflow_task_multiprocessing/starter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow_task_multiprocessing/starter.py b/workflow_task_multiprocessing/starter.py index 74e1578d4..d238837c2 100644 --- a/workflow_task_multiprocessing/starter.py +++ b/workflow_task_multiprocessing/starter.py @@ -39,7 +39,7 @@ async def main(): for _ in range(args.num_workflows) ] - # Wait for workflow completion (runs indefinitely until it receives a signal) + # Wait for workflow completion for wf in asyncio.as_completed(wf_handles): result = await wf print(result) From 019fce19c448976ea42574d146b8764f3d6821ad Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 16:23:00 -0800 Subject: [PATCH 08/10] remove unused workflow input --- workflow_task_multiprocessing/starter.py | 1 - workflow_task_multiprocessing/workflows.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/workflow_task_multiprocessing/starter.py b/workflow_task_multiprocessing/starter.py index d238837c2..c884cadf5 100644 --- a/workflow_task_multiprocessing/starter.py +++ b/workflow_task_multiprocessing/starter.py @@ -32,7 +32,6 @@ async def main(): wf_handles = [ client.execute_workflow( ParallelizedWorkflow.run, - "temporal", id=f"greeting-workflow-id-{uuid.uuid4()}", task_queue=WORKFLOW_TASK_QUEUE, ) diff --git a/workflow_task_multiprocessing/workflows.py b/workflow_task_multiprocessing/workflows.py index 76d354bec..d3d062ddc 100644 --- a/workflow_task_multiprocessing/workflows.py +++ b/workflow_task_multiprocessing/workflows.py @@ -10,7 +10,7 @@ @workflow.defn class ParallelizedWorkflow: @workflow.run - async def run(self, input: str) -> str: + async def run(self) -> str: pid = os.getpid() activity_result = await workflow.execute_activity( echo_pid_activity, From 548f4dfe7af6edb5f7cb1095b90e55177133e0d8 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 21 Nov 2025 16:29:50 -0800 Subject: [PATCH 09/10] run formatter. 
Ignore linter error on multiprocessing.get_context because both ForkContext and SpawnContext are acceptable for what we need
---
 workflow_task_multiprocessing/activities.py | 1 +
 workflow_task_multiprocessing/starter.py | 2 +-
 workflow_task_multiprocessing/worker.py | 11 +++++------
 workflow_task_multiprocessing/workflows.py | 2 +-
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/workflow_task_multiprocessing/activities.py b/workflow_task_multiprocessing/activities.py
index 3416327a5..dbbbc6778 100644
--- a/workflow_task_multiprocessing/activities.py
+++ b/workflow_task_multiprocessing/activities.py
@@ -1,4 +1,5 @@
 import os
+
 from temporalio import activity
 
 
diff --git a/workflow_task_multiprocessing/starter.py b/workflow_task_multiprocessing/starter.py
index c884cadf5..82d4e51a9 100644
--- a/workflow_task_multiprocessing/starter.py
+++ b/workflow_task_multiprocessing/starter.py
@@ -1,5 +1,5 @@
-import asyncio
 import argparse
+import asyncio
 import uuid
 
 from temporalio.client import Client
diff --git a/workflow_task_multiprocessing/worker.py b/workflow_task_multiprocessing/worker.py
index 438941cbc..8efb2561c 100644
--- a/workflow_task_multiprocessing/worker.py
+++ b/workflow_task_multiprocessing/worker.py
@@ -1,24 +1,23 @@
 import argparse
+import asyncio
 import concurrent.futures
 import dataclasses
 import multiprocessing
 import traceback
-import asyncio
 from typing import Literal
 
 from temporalio.client import Client
 from temporalio.envconfig import ClientConfig
-from temporalio.worker import Worker, PollerBehaviorSimpleMaximum
+from temporalio.runtime import Runtime, TelemetryConfig
+from temporalio.worker import PollerBehaviorSimpleMaximum, Worker
 from temporalio.worker.workflow_sandbox import (
     SandboxedWorkflowRunner,
     SandboxRestrictions,
 )
-from temporalio.runtime import Runtime, TelemetryConfig
 
 from workflow_task_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE
-from workflow_task_multiprocessing.workflows import ParallelizedWorkflow
 from workflow_task_multiprocessing.activities import echo_pid_activity
+from workflow_task_multiprocessing.workflows import ParallelizedWorkflow
 
 # Immediately prevent the default Runtime from being created to ensure
 # each process creates its own
@@ -49,7 +48,7 @@ def main():
     try:
         mp_ctx = multiprocessing.get_context("fork")
     except ValueError:
-        mp_ctx = multiprocessing.get_context("spawn")
+        mp_ctx = multiprocessing.get_context("spawn")  # type: ignore
 
     with concurrent.futures.ProcessPoolExecutor(
         args.total_workers, mp_context=mp_ctx
diff --git a/workflow_task_multiprocessing/workflows.py b/workflow_task_multiprocessing/workflows.py
index d3d062ddc..6c378172e 100644
--- a/workflow_task_multiprocessing/workflows.py
+++ b/workflow_task_multiprocessing/workflows.py
@@ -1,5 +1,5 @@
-from datetime import timedelta
 import os
+from datetime import timedelta
 
 from temporalio import workflow
 
From 001cb1a38c6ef7f5fc586f0f0c97d99918b2b7b1 Mon Sep 17 00:00:00 2001
From: Alex Mazzeo
Date: Mon, 24 Nov 2025 13:08:33 -0800
Subject: [PATCH 10/10] rename to worker_multiprocessing

---
 README.md | 2 +-
 .../README.md | 14 +++++++-------
 .../__init__.py | 0
 .../activities.py | 0
 .../starter.py | 4 ++--
 .../worker.py | 6 +++---
 .../workflows.py | 4 ++--
 7 files changed, 15 insertions(+), 15 deletions(-)
 rename {workflow_task_multiprocessing => worker_multiprocessing}/README.md (92%)
 rename {workflow_task_multiprocessing => worker_multiprocessing}/__init__.py (100%)
 rename {workflow_task_multiprocessing => worker_multiprocessing}/activities.py (100%)
rename {workflow_task_multiprocessing => worker_multiprocessing}/starter.py (88%) rename {workflow_task_multiprocessing => worker_multiprocessing}/worker.py (95%) rename {workflow_task_multiprocessing => worker_multiprocessing}/workflows.py (78%) diff --git a/README.md b/README.md index f213a3833..5a39e834b 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [updatable_timer](updatable_timer) - A timer that can be updated while sleeping. * [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. -* [workflow_task_multiprocessing](workflow_task_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks. +* [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU bound operations by running multiple workers. ## Test diff --git a/workflow_task_multiprocessing/README.md b/worker_multiprocessing/README.md similarity index 92% rename from workflow_task_multiprocessing/README.md rename to worker_multiprocessing/README.md index 2cb64df01..a4f06f86b 100644 --- a/workflow_task_multiprocessing/README.md +++ b/worker_multiprocessing/README.md @@ -1,4 +1,4 @@ -# Workflow Task Multiprocessing Sample +# Worker Multiprocessing Sample ## Python Concurrency Limitations @@ -18,14 +18,14 @@ This sample demonstrates how to use `concurrent.futures.ProcessPoolExecutor` to To run, first see the root [README.md](../README.md) for prerequisites. Then execute the following commands from the root directory: ``` -uv run workflow_task_multiprocessing/worker.py -uv run workflow_task_multiprocessing/starter.py +uv run worker_multiprocessing/worker.py +uv run worker_multiprocessing/starter.py ``` Both `worker.py` and `starter.py` have minimal arguments that can be adjusted to modify how the sample runs. 
``` -uv run workflow_task_multiprocessing/worker.py -h +uv run worker_multiprocessing/worker.py -h usage: worker.py [-h] [-w NUM_WORKFLOW_WORKERS] [-a NUM_ACTIVITY_WORKERS] @@ -36,7 +36,7 @@ options: ``` ``` -uv run workflow_task_multiprocessing/starter.py -h +uv run worker_multiprocessing/starter.py -h usage: starter.py [-h] [-n NUM_WORKFLOWS] @@ -49,7 +49,7 @@ options: ## Example Output ``` -uv run workflow_task_multiprocessing/worker.py +uv run worker_multiprocessing/worker.py starting 2 workflow worker(s) and 1 activity worker(s) waiting for keyboard interrupt or for all workers to exit @@ -63,7 +63,7 @@ workflow-worker:1 shutting down ``` -uv run workflow_task_multiprocessing/starter.py +uv run worker_multiprocessing/starter.py wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178 wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179 diff --git a/workflow_task_multiprocessing/__init__.py b/worker_multiprocessing/__init__.py similarity index 100% rename from workflow_task_multiprocessing/__init__.py rename to worker_multiprocessing/__init__.py diff --git a/workflow_task_multiprocessing/activities.py b/worker_multiprocessing/activities.py similarity index 100% rename from workflow_task_multiprocessing/activities.py rename to worker_multiprocessing/activities.py diff --git a/workflow_task_multiprocessing/starter.py b/worker_multiprocessing/starter.py similarity index 88% rename from workflow_task_multiprocessing/starter.py rename to worker_multiprocessing/starter.py index 82d4e51a9..3267694de 100644 --- a/workflow_task_multiprocessing/starter.py +++ b/worker_multiprocessing/starter.py @@ -5,8 +5,8 @@ from temporalio.client import Client from temporalio.envconfig import ClientConfig -from workflow_task_multiprocessing import WORKFLOW_TASK_QUEUE -from workflow_task_multiprocessing.workflows import ParallelizedWorkflow +from worker_multiprocessing import WORKFLOW_TASK_QUEUE +from worker_multiprocessing.workflows import ParallelizedWorkflow class Args(argparse.Namespace): diff --git a/workflow_task_multiprocessing/worker.py b/worker_multiprocessing/worker.py similarity index 95% rename from workflow_task_multiprocessing/worker.py rename to worker_multiprocessing/worker.py index 8efb2561c..bda69ba63 100644 --- a/workflow_task_multiprocessing/worker.py +++ b/worker_multiprocessing/worker.py @@ -15,9 +15,9 @@ SandboxRestrictions, ) -from workflow_task_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE -from workflow_task_multiprocessing.activities import echo_pid_activity -from workflow_task_multiprocessing.workflows import ParallelizedWorkflow +from worker_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE +from worker_multiprocessing.activities import echo_pid_activity +from worker_multiprocessing.workflows import ParallelizedWorkflow # Immediately prevent the default Runtime from being created to ensure # each process creates it's own diff --git a/workflow_task_multiprocessing/workflows.py b/worker_multiprocessing/workflows.py similarity index 78% rename from workflow_task_multiprocessing/workflows.py rename to worker_multiprocessing/workflows.py index 6c378172e..04f6b6351 100644 --- a/workflow_task_multiprocessing/workflows.py +++ b/worker_multiprocessing/workflows.py @@ -3,8 +3,8 @@ from temporalio import workflow -from workflow_task_multiprocessing import ACTIVITY_TASK_QUEUE -from workflow_task_multiprocessing.activities import echo_pid_activity +from worker_multiprocessing import ACTIVITY_TASK_QUEUE +from worker_multiprocessing.activities 
import echo_pid_activity @workflow.defn
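> **Reviewer note (not part of the patch):** for readers skimming the series, the per-process `Runtime` handling in `worker.py` is the subtle part: the parent calls `Runtime.prevent_default()` so no shared default runtime can be lazily created and inherited across `fork()`, and each child installs its own runtime before connecting a client. A condensed, standalone sketch of just that pattern — the `temporalio` calls are the ones used throughout the diffs above; the pool size, function names, and return strings are illustrative:

```python
import asyncio
import concurrent.futures

from temporalio.runtime import Runtime, TelemetryConfig

# Parent: refuse to lazily create a process-wide default Runtime, so a
# fork()ed child can never inherit the parent's runtime threads.
Runtime.prevent_default()


def child_entry(name: str) -> str:
    # Child: install a fresh Runtime before any Client or Worker is created,
    # mirroring worker_entry() in worker.py.
    Runtime.set_default(Runtime(telemetry=TelemetryConfig()))

    async def run() -> str:
        # A real child would `await Client.connect(...)` and run a Worker here.
        return f"{name}: per-process runtime installed"

    return asyncio.run(run())


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(child_entry, f"child-{i}") for i in range(2)]
        for fut in concurrent.futures.as_completed(futures):
            print(fut.result())
```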