From 1304ac3261cf0a16434594fe1568144d563989f3 Mon Sep 17 00:00:00 2001 From: dmmulroy Date: Thu, 1 Jan 2026 19:08:44 -0500 Subject: [PATCH 1/4] feat(python): add saga rollback API to Python Workflows SDK Add withRollback/rollbackAll for compensating transactions in Python. --- .../workers-api/src/workers/_workers.py | 177 ++++++++++++ src/workerd/server/tests/python/BUILD.bazel | 2 + .../tests/python/workflow-rollback/worker.js | 125 ++++++++ .../workflow-rollback.wd-test | 34 +++ .../python/workflow-rollback/workflow.py | 267 ++++++++++++++++++ 5 files changed, 605 insertions(+) create mode 100644 src/workerd/server/tests/python/workflow-rollback/worker.js create mode 100644 src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test create mode 100644 src/workerd/server/tests/python/workflow-rollback/workflow.py diff --git a/src/pyodide/internal/workers-api/src/workers/_workers.py b/src/pyodide/internal/workers-api/src/workers/_workers.py index 21b97ed55fb..fa610647dca 100644 --- a/src/pyodide/internal/workers-api/src/workers/_workers.py +++ b/src/pyodide/internal/workers-api/src/workers/_workers.py @@ -1125,11 +1125,100 @@ def wrapper(*args, **kwargs): return wrapper +class _UndoStackEntry: + """Internal entry for tracking rollback handlers.""" + + def __init__(self, name: str, value: Any, undo_fn, undo_config: dict | None = None): + self.name = name + self.value = value + self.undo_fn = undo_fn + self.undo_config = undo_config + + +class _RollbackStepWrapper: + """ + Wrapper returned by step.with_rollback() that allows attaching an .undo decorator. + + Usage: + @step.with_rollback("save to db") + async def save(): + return await db.insert(data) + + @save.undo + async def undo_save(error, record_id): + await db.delete(record_id) + + record_id = await save() + """ + + def __init__( + self, + step_wrapper: "_WorkflowStepWrapper", + name: str, + do_fn, + config: dict | None = None, + ): + self._step_wrapper = step_wrapper + self._name = name + self._do_fn = do_fn + self._config = config + self._undo_fn = None + self._undo_config = None + self._step_name = name # For compatibility with dependency resolution + + def undo(self, config: dict | None = None): + """ + Decorator to register an undo/compensation function for this step. + + The undo function receives (error, value) where value is the result + of the do function. + + Args: + config: Optional WorkflowStepConfig for the undo step's retry behavior. + If not provided, inherits from the do step's config. + """ + + def decorator(fn): + self._undo_fn = fn + self._undo_config = config + return fn + + # Support both @save.undo and @save.undo(config={...}) + if callable(config): + fn = config + self._undo_fn = fn + self._undo_config = None + return fn + + return decorator + + async def __call__(self): + """Execute the step and register it on the undo stack if an undo handler is defined.""" + result = await _do_call( + self._step_wrapper, self._name, self._config, self._do_fn + ) + + # Only add to undo stack if an undo handler was registered + if self._undo_fn is not None: + self._step_wrapper._undo_stack.append( + _UndoStackEntry( + name=self._name, + value=result, + undo_fn=self._undo_fn, + undo_config=self._undo_config or self._config, + ) + ) + + return result + + class _WorkflowStepWrapper: def __init__(self, js_step): self._js_step = js_step self._memoized_dependencies = {} self._in_flight = {} + self._undo_stack: list[_UndoStackEntry] = [] + self._is_rolling_back = False def do(self, name, depends=None, concurrent=False, config=None): def decorator(func): @@ -1169,6 +1258,94 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"): ), ) + def with_rollback(self, name: str, config: dict | None = None): + """ + Decorator to define a step with rollback/compensation support (saga pattern). + + Returns a callable wrapper that allows attaching an .undo decorator for + compensation logic. The undo function executes in LIFO order when + rollback_all() is called. + + Args: + name: The name of the step. + config: Optional WorkflowStepConfig for configuring retry behavior. + + Usage: + @step.with_rollback("save to db") + async def save(): + return await db.insert(data) + + @save.undo + async def undo_save(error, record_id): + await db.delete(record_id) + + record_id = await save() + + # Later, if something fails: + try: + await some_failing_step() + except Exception as e: + await step.rollback_all(e) + raise + """ + + def decorator(func): + return _RollbackStepWrapper(self, name, func, config) + + return decorator + + async def rollback_all(self, trigger_error: Exception): + """ + Execute all registered undo handlers in LIFO (Last-In, First-Out) order. + + This implements the saga pattern's compensation/rollback mechanism. + Each undo handler is wrapped in a step.do for durability and retry support. + + Args: + trigger_error: The error that triggered the rollback. Passed to each + undo handler as the first argument. + + Raises: + RuntimeError: If called while a rollback is already in progress. + Exception: Re-raises the first undo failure if continueOnUndoFailure + is not set (default behavior). + + Usage: + try: + # ... workflow steps with rollback handlers ... + except Exception as e: + await step.rollback_all(e) + raise + """ + if self._is_rolling_back: + raise RuntimeError( + "Cannot call rollback_all() while rollback is in progress" + ) + + if len(self._undo_stack) == 0: + return + + self._is_rolling_back = True + + try: + # Execute undos in LIFO order + while len(self._undo_stack) > 0: + entry = self._undo_stack.pop() + undo_step_name = f"__undo__{entry.name}" + + # Create undo callback that invokes the user's undo function + async def make_undo_callback(e=entry, err=trigger_error): + undo_result = e.undo_fn(err, e.value) + if inspect.iscoroutine(undo_result): + await undo_result + + # Wrap undo in step.do for durability/retry + await _do_call( + self, undo_step_name, entry.undo_config, make_undo_callback + ) + finally: + self._is_rolling_back = False + async def _resolve_dependency(self, dep): if dep._step_name in self._memoized_dependencies: return self._memoized_dependencies[dep._step_name] diff --git a/src/workerd/server/tests/python/BUILD.bazel b/src/workerd/server/tests/python/BUILD.bazel index 386ea963098..6c55984b181 100644 --- a/src/workerd/server/tests/python/BUILD.bazel +++ b/src/workerd/server/tests/python/BUILD.bazel @@ -58,6 +58,8 @@ py_wd_test("python-rpc") py_wd_test("workflow-entrypoint") +py_wd_test("workflow-rollback") + py_wd_test("vendor_dir_compat_flag") py_wd_test("multiprocessing") diff --git a/src/workerd/server/tests/python/workflow-rollback/worker.js b/src/workerd/server/tests/python/workflow-rollback/worker.js new file mode 100644 index 00000000000..ab9f2f29520 --- /dev/null +++ b/src/workerd/server/tests/python/workflow-rollback/worker.js @@ -0,0 +1,125 @@ +import { RpcTarget } from 'cloudflare:workers'; +import * as assert from 'node:assert'; + +class Context extends RpcTarget { + async do(name, configOrFn, maybeFn) { + const fn = maybeFn ?? configOrFn; + try { + const result = await fn(); + return result; + } catch (e) { + console.log(`Error received: ${e.name} Message: ${e.message}`); + throw e; + } + } +} + +export default { + async test(ctrl, env, ctx) { + const stubStep = new Context(); + + // Test 1: Basic rollback + { + const resp = await env.PythonWorkflow.run( + { test: 'basic_rollback' }, + stubStep + ); + assert.deepStrictEqual(resp, [ + 'do_1', + 'do_2', + 'undo_2:value_2', + 'undo_1:value_1', + ]); + console.log('✓ basic_rollback'); + } + + // Test 2: LIFO order + { + const resp = await env.PythonWorkflow.run( + { test: 'lifo_order' }, + stubStep + ); + assert.deepStrictEqual(resp, ['third', 'second', 'first']); + console.log('✓ lifo_order'); + } + + // Test 3: Steps without undo handlers + { + const resp = await env.PythonWorkflow.run( + { test: 'no_undo_handler' }, + stubStep + ); + assert.deepStrictEqual(resp, [ + 'do_with_undo', + 'do_without_undo', + 'undo_with_undo', + ]); + console.log('✓ no_undo_handler'); + } + + // Test 4: Undo receives value and error + { + const resp = await env.PythonWorkflow.run( + { test: 'undo_receives_value' }, + stubStep + ); + assert.deepStrictEqual(resp.value, { id: 123, data: 'important' }); + assert.strictEqual(resp.error_msg, 'the error message'); + console.log('✓ undo_receives_value'); + } + + // Test 5: Undo with separate config + { + const resp = await env.PythonWorkflow.run( + { test: 'undo_with_config' }, + stubStep + ); + assert.strictEqual(resp.undo, 1); + console.log('✓ undo_with_config'); + } + + // Test 6: Empty undo stack is no-op + { + const resp = await env.PythonWorkflow.run( + { test: 'empty_undo_stack_noop' }, + stubStep + ); + assert.strictEqual(resp.success, true); + console.log('✓ empty_undo_stack_noop'); + } + + // Test 7: Nested rollback_all throws RuntimeError + { + const resp = await env.PythonWorkflow.run( + { test: 'nested_rollback_throws' }, + stubStep + ); + assert.strictEqual(resp.type, 'RuntimeError'); + assert.ok(resp.message.includes('rollback')); + console.log('✓ nested_rollback_throws'); + } + + // Test 8: Stop on first undo failure + { + const resp = await env.PythonWorkflow.run( + { test: 'stop_on_first_undo_failure' }, + stubStep + ); + // LIFO order: undo_3 runs, undo_2 fails, undo_1 never runs + assert.deepStrictEqual(resp, ['undo_3', 'undo_2']); + console.log('✓ stop_on_first_undo_failure'); + } + + // Test 9: Rollback without error argument + { + const resp = await env.PythonWorkflow.run( + { test: 'rollback_without_error_arg' }, + stubStep + ); + assert.deepStrictEqual(resp, ['undo_1:error=true']); + console.log('✓ rollback_without_error_arg'); + } + + console.log('All rollback tests passed!'); + }, +}; diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test b/src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test new file mode 100644 index 00000000000..c209f4036e3 --- /dev/null +++ b/src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test @@ -0,0 +1,34 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const config :Workerd.Config = ( + services = [ + (name = "py", worker = .pyWorker), + (name = "js", worker = .jsWorker), + ], +); + +const pyWorker :Workerd.Worker = ( + + compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "rpc", "disable_python_no_global_handlers"], + + modules = [ + (name = "workflow.py", pythonModule = embed "workflow.py"), + ], + + bindings = [ + (name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowRollbackExample")), + ], +); + +const jsWorker :Workerd.Worker = ( + + compatibilityFlags = ["nodejs_compat", "rpc", "disable_python_no_global_handlers"], + + modules = [ + (name = "worker", esModule = embed "worker.js"), + ], + + bindings = [ + (name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowRollbackExample")), + ], +); diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow.py b/src/workerd/server/tests/python/workflow-rollback/workflow.py new file mode 100644 index 00000000000..4838479385e --- /dev/null +++ b/src/workerd/server/tests/python/workflow-rollback/workflow.py @@ -0,0 +1,267 @@ +# Copyright (c) 2025 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +from workers import WorkflowEntrypoint + + +class WorkflowRollbackExample(WorkflowEntrypoint): + """Test workflow demonstrating the saga rollback pattern.""" + + async def run(self, event, step): + test_name = event.get("test", "basic_rollback") + + if test_name == "basic_rollback": + return await self._test_basic_rollback(step) + elif test_name == "lifo_order": + return await self._test_lifo_order(step) + elif test_name == "no_undo_handler": + return await self._test_no_undo_handler(step) + elif test_name == "undo_receives_value": + return await self._test_undo_receives_value(step) + elif test_name == "undo_with_config": + return await self._test_undo_with_config(step) + elif test_name == "empty_undo_stack_noop": + return await self._test_empty_undo_stack_noop(step) + elif test_name == "nested_rollback_throws": + return await self._test_nested_rollback_throws(step) + elif test_name == "stop_on_first_undo_failure": + return await self._test_stop_on_first_undo_failure(step) + elif test_name == "rollback_without_error_arg": + return await self._test_rollback_without_error_arg(step) + else: + raise ValueError(f"Unknown test: {test_name}") + + async def _test_basic_rollback(self, step): + """Test that rollback_all executes undo handlers.""" + results = [] + + @step.with_rollback("step_1") + async def step_1(): + results.append("do_1") + return "value_1" + + @step_1.undo + async def undo_1(error, value): + results.append(f"undo_1:{value}") + + @step.with_rollback("step_2") + async def step_2(): + results.append("do_2") + return "value_2" + + @step_2.undo + async def undo_2(error, value): + results.append(f"undo_2:{value}") + + await step_1() + await step_2() + + # Trigger rollback + try: + raise ValueError("test error") + except ValueError as e: + await step.rollback_all(e) + + return results + + async def _test_lifo_order(self, step): + """Test that undos execute in LIFO (reverse) order.""" + order = [] + + @step.with_rollback("first") + async def first(): + return 1 + + @first.undo + async def undo_first(error, value): + order.append("first") + + @step.with_rollback("second") + async def second(): + return 2 + + @second.undo + async def undo_second(error, value): + order.append("second") + + @step.with_rollback("third") + async def third(): + return 3 + + @third.undo + async def undo_third(error, value): + order.append("third") + + await first() + await second() + await third() + + await step.rollback_all(Exception("trigger")) + + # Should be LIFO: third, second, first + return order + + async def _test_no_undo_handler(self, step): + """Test that steps without undo handlers don't break rollback.""" + results = [] + + @step.with_rollback("with_undo") + async def with_undo(): + results.append("do_with_undo") + return "value" + + @with_undo.undo + async def undo_with(error, value): + results.append("undo_with_undo") + + # Step without .undo decorator + @step.with_rollback("without_undo") + async def without_undo(): + results.append("do_without_undo") + return "no_undo_value" + + await with_undo() + await without_undo() + + await step.rollback_all(Exception("trigger")) + + # Only with_undo should have undo called + return results + + async def _test_undo_receives_value(self, step): + """Test that undo receives the original step's return value.""" + received = {} + + @step.with_rollback("step_with_value") + async def step_with_value(): + return {"id": 123, "data": "important"} + + @step_with_value.undo + async def undo_step(error, value): + received["value"] = value + received["error_msg"] = str(error) + + await step_with_value() + await step.rollback_all(ValueError("the error message")) + + return received + + async def _test_undo_with_config(self, step): + """Test that undo can have its own retry config.""" + call_count = {"undo": 0} + + @step.with_rollback("step", config={"retries": {"limit": 1}}) + async def step_fn(): + return "done" + + @step_fn.undo(config={"retries": {"limit": 3}}) + async def undo_fn(error, value): + call_count["undo"] += 1 + return "undone" + + await step_fn() + await step.rollback_all(Exception("trigger")) + + return call_count + + async def _test_empty_undo_stack_noop(self, step): + """Test that rollback_all is a no-op when undo stack is empty.""" + + # Don't register any rollback handlers + @step.do("regular_step") + async def regular_step(): + return "done" + + await regular_step() + + # This should not raise or do anything + await step.rollback_all(Exception("trigger")) + + return {"success": True} + + async def _test_nested_rollback_throws(self, step): + """Test that calling rollback_all during rollback throws RuntimeError.""" + error_caught = {"type": None, "message": None} + + @step.with_rollback("step_1") + async def step_1(): + return "value" + + @step_1.undo + async def undo_1(error, value): + # Try to call rollback_all from within an undo handler + try: + await step.rollback_all(Exception("nested")) + except RuntimeError as e: + error_caught["type"] = "RuntimeError" + error_caught["message"] = str(e) + + await step_1() + await step.rollback_all(Exception("trigger")) + + return error_caught + + async def _test_stop_on_first_undo_failure(self, step): + """Test that rollback stops on first undo failure by default.""" + executed = [] + + @step.with_rollback("step_1") + async def step_1(): + return 1 + + @step_1.undo + async def undo_1(error, value): + executed.append("undo_1") + + @step.with_rollback("step_2") + async def step_2(): + return 2 + + @step_2.undo + async def undo_2(error, value): + executed.append("undo_2") + raise Exception("undo_2 failed") + + @step.with_rollback("step_3") + async def step_3(): + return 3 + + @step_3.undo + async def undo_3(error, value): + executed.append("undo_3") + + await step_1() + await step_2() + await step_3() + + # Rollback should: undo_3 (ok), undo_2 (fail), stop before undo_1 + try: + await step.rollback_all(Exception("trigger")) + except Exception: + pass # Expected to fail + + return executed + + async def _test_rollback_without_error_arg(self, step): + """Test that rollback_all works when called without an error argument.""" + results = [] + + @step.with_rollback("step_1") + async def step_1(): + return "value" + + @step_1.undo + async def undo_1(error, value): + results.append(f"undo_1:error={error is None}") + + await step_1() + + # Call without error - should use None + await step.rollback_all(None) + + return results + + +async def test(ctrl, env, ctx): + pass From aeabeb92c3c900973eba8adf0bc375fd2b9c0de0 Mon Sep 17 00:00:00 2001 From: dmmulroy Date: Thu, 1 Jan 2026 23:14:17 -0500 Subject: [PATCH 2/4] fix: address lint errors in workflow rollback tests --- .../server/tests/python/workflow-rollback/workflow.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow.py b/src/workerd/server/tests/python/workflow-rollback/workflow.py index 4838479385e..bdc896c16aa 100644 --- a/src/workerd/server/tests/python/workflow-rollback/workflow.py +++ b/src/workerd/server/tests/python/workflow-rollback/workflow.py @@ -58,10 +58,7 @@ async def undo_2(error, value): await step_2() # Trigger rollback - try: - raise ValueError("test error") - except ValueError as e: - await step.rollback_all(e) + await step.rollback_all(ValueError("test error")) return results @@ -221,7 +218,7 @@ async def step_2(): @step_2.undo async def undo_2(error, value): executed.append("undo_2") - raise Exception("undo_2 failed") + raise RuntimeError("undo_2 failed") @step.with_rollback("step_3") async def step_3(): From fa5e3a68c54236d432d1c0d3a2733a0218a86fa4 Mon Sep 17 00:00:00 2001 From: Dillon Mulroy Date: Mon, 5 Jan 2026 16:14:31 -0500 Subject: [PATCH 3/4] Refactor: delegate to engine withRollback/rollbackAll instead of Python-side undo stack - Remove Python-side _UndoStackEntry and _undo_stack - Add _withRollback_call() to wrap Python callbacks for JS engine - Add continue_on_error parameter to rollback_all() - Update tests for engine-delegated behavior and ExceptionGroup handling --- .../workers-api/src/workers/_workers.py | 167 ++++++++++-------- .../tests/python/workflow-rollback/worker.js | 18 +- .../python/workflow-rollback/workflow.py | 60 ++++++- 3 files changed, 166 insertions(+), 79 deletions(-) diff --git a/src/pyodide/internal/workers-api/src/workers/_workers.py b/src/pyodide/internal/workers-api/src/workers/_workers.py index fa610647dca..6a0ede474a8 100644 --- a/src/pyodide/internal/workers-api/src/workers/_workers.py +++ b/src/pyodide/internal/workers-api/src/workers/_workers.py @@ -1125,20 +1125,12 @@ def wrapper(*args, **kwargs): return wrapper -class _UndoStackEntry: - """Internal entry for tracking rollback handlers.""" - - def __init__(self, name: str, value: Any, undo_fn, undo_config: dict | None = None): - self.name = name - self.value = value - self.undo_fn = undo_fn - self.undo_config = undo_config - - class _RollbackStepWrapper: """ Wrapper returned by step.with_rollback() that allows attaching an .undo decorator. + Delegates to the engine's withRollback for durable undo stack management. + Usage: @step.with_rollback("save to db") async def save(): @@ -1193,32 +1185,22 @@ def decorator(fn): return decorator async def __call__(self): - """Execute the step and register it on the undo stack if an undo handler is defined.""" - result = await _do_call( - self._step_wrapper, self._name, self._config, self._do_fn + """Execute the step via engine's withRollback for durable undo stack.""" + return await _withRollback_call( + self._step_wrapper, + self._name, + self._config, + self._undo_config, + self._do_fn, + self._undo_fn, ) - # Only add to undo stack if an undo handler was registered - if self._undo_fn is not None: - self._step_wrapper._undo_stack.append( - _UndoStackEntry( - name=self._name, - value=result, - undo_fn=self._undo_fn, - undo_config=self._undo_config or self._config, - ) - ) - - return result - class _WorkflowStepWrapper: def __init__(self, js_step): self._js_step = js_step self._memoized_dependencies = {} self._in_flight = {} - self._undo_stack: list[_UndoStackEntry] = [] - self._is_rolling_back = False def do(self, name, depends=None, concurrent=False, config=None): def decorator(func): @@ -1294,57 +1276,36 @@ def decorator(func): return decorator - async def rollback_all(self, trigger_error: Exception): + async def rollback_all( + self, trigger_error: Exception = None, *, continue_on_error: bool = False + ): """ - Execute all registered undo handlers in LIFO (Last-In, First-Out) order. - - This implements the saga pattern's compensation/rollback mechanism. - Each undo handler is wrapped in a step.do for durability and retry support. + Execute all registered undo handlers in LIFO order. Args: - trigger_error: The error that triggered the rollback. Passed to each - undo handler as the first argument. - - Raises: - RuntimeError: If called while a rollback is already in progress. - Exception: Re-raises the first undo failure if continueOnUndoFailure - is not set (default behavior). - - Usage: - try: - # ... workflow steps with rollback handlers ... - except Exception as e: - await step.rollback_all(e) - raise + trigger_error: Error passed to each undo handler. + continue_on_error: Continue after undo failure (raises AggregateError). """ - if self._is_rolling_back: - raise RuntimeError( - "Cannot call rollback_all() while rollback is in progress" - ) - - if len(self._undo_stack) == 0: - return - - self._is_rolling_back = True - try: - # Execute undos in LIFO order - while len(self._undo_stack) > 0: - entry = self._undo_stack.pop() - undo_step_name = f"__undo__{entry.name}" - - # Create undo callback that invokes the user's undo function - async def make_undo_callback(e=entry, err=trigger_error): - undo_result = e.undo_fn(err, e.value) - if inspect.iscoroutine(undo_result): - await undo_result - - # Wrap undo in step.do for durability/retry - await _do_call( - self, undo_step_name, entry.undo_config, make_undo_callback + # Convert Python exception to JS error for the engine + js_error = None + if trigger_error is not None: + js_error = to_js( + { + "name": type(trigger_error).__name__, + "message": str(trigger_error), + }, + dict_converter=Object.fromEntries, ) - finally: - self._is_rolling_back = False + + options = to_js( + {"continueOnError": continue_on_error}, + dict_converter=Object.fromEntries, + ) + + await self._js_step.rollbackAll(js_error, options) + except Exception as exc: + raise _from_js_error(exc) from exc async def _resolve_dependency(self, dep): if dep._step_name in self._memoized_dependencies: @@ -1388,6 +1349,66 @@ async def _closure(): return result +async def _withRollback_call(entrypoint, name, config, undo_config, do_fn, undo_fn): + """Call the engine's withRollback with Python callbacks wrapped for JS.""" + + async def _closure(): + async def _do_callback(): + result = do_fn() + if inspect.iscoroutine(result): + result = await result + return to_js(result, dict_converter=Object.fromEntries) + + async def _undo_callback(js_err, js_value): + py_err = None + if js_err is not None: + py_err = ( + _from_js_error(js_err) if hasattr(js_err, "message") else js_err + ) + + py_value = python_from_rpc(js_value) + + result = undo_fn(py_err, py_value) + if inspect.iscoroutine(result): + await result + + handler = {"do": _do_callback} + if undo_fn is not None: + handler["undo"] = _undo_callback + + js_handler = to_js(handler, dict_converter=Object.fromEntries) + + js_config = None + if config is not None or undo_config is not None: + config_dict = dict(config) if config else {} + if undo_config is not None: + config_dict["undoConfig"] = undo_config + js_config = to_js(config_dict, dict_converter=Object.fromEntries) + + try: + if js_config is None: + result = await entrypoint._js_step.withRollback(name, js_handler) + else: + result = await entrypoint._js_step.withRollback( + name, js_handler, js_config + ) + + return python_from_rpc(result) + except Exception as exc: + raise _from_js_error(exc) from exc + + task = create_task(_closure()) + entrypoint._in_flight[name] = task + + try: + result = await task + entrypoint._memoized_dependencies[name] = result + finally: + del entrypoint._in_flight[name] + + return result + + def _wrap_subclass(cls): # Override the class __init__ so that we can wrap the `env` in the constructor. original_init = cls.__init__ diff --git a/src/workerd/server/tests/python/workflow-rollback/worker.js b/src/workerd/server/tests/python/workflow-rollback/worker.js index ab9f2f29520..2b99bf11edc 100644 --- a/src/workerd/server/tests/python/workflow-rollback/worker.js +++ b/src/workerd/server/tests/python/workflow-rollback/worker.js @@ -88,13 +88,14 @@ export default { console.log('✓ empty_undo_stack_noop'); } - // Test 7: Nested rollback_all throws RuntimeError + // Test 7: Nested rollback_all throws an error (engine's WorkflowFatalError) { const resp = await env.PythonWorkflow.run( { test: 'nested_rollback_throws' }, stubStep ); - assert.strictEqual(resp.type, 'RuntimeError'); + // Engine throws WorkflowFatalError when rollbackAll is called during rollback + assert.ok(resp.type !== null, 'Expected an error type'); assert.ok(resp.message.includes('rollback')); console.log('✓ nested_rollback_throws'); } @@ -120,6 +121,19 @@ export default { console.log('✓ rollback_without_error_arg'); } + // Test 10: continue_on_error executes all undos + { + const resp = await env.PythonWorkflow.run( + { test: 'continue_on_error' }, + stubStep + ); + // All undos should execute (LIFO: undo_3, undo_2 fails, undo_1) + assert.deepStrictEqual(resp.executed, ['undo_3', 'undo_2', 'undo_1']); + assert.strictEqual(resp.error.type, 'ExceptionGroup'); + assert.strictEqual(resp.error.count, 1); + console.log('✓ continue_on_error'); + } + console.log('All rollback tests passed!'); }, }; diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow.py b/src/workerd/server/tests/python/workflow-rollback/workflow.py index bdc896c16aa..de40a6fd8e6 100644 --- a/src/workerd/server/tests/python/workflow-rollback/workflow.py +++ b/src/workerd/server/tests/python/workflow-rollback/workflow.py @@ -29,6 +29,8 @@ async def run(self, event, step): return await self._test_stop_on_first_undo_failure(step) elif test_name == "rollback_without_error_arg": return await self._test_rollback_without_error_arg(step) + elif test_name == "continue_on_error": + return await self._test_continue_on_error(step) else: raise ValueError(f"Unknown test: {test_name}") @@ -178,7 +180,7 @@ async def regular_step(): return {"success": True} async def _test_nested_rollback_throws(self, step): - """Test that calling rollback_all during rollback throws RuntimeError.""" + """Test that calling rollback_all during rollback throws an error.""" error_caught = {"type": None, "message": None} @step.with_rollback("step_1") @@ -188,14 +190,18 @@ async def step_1(): @step_1.undo async def undo_1(error, value): # Try to call rollback_all from within an undo handler + # Engine throws WorkflowFatalError, Python-side catches as Exception try: await step.rollback_all(Exception("nested")) - except RuntimeError as e: - error_caught["type"] = "RuntimeError" + except Exception as e: + error_caught["type"] = type(e).__name__ error_caught["message"] = str(e) await step_1() - await step.rollback_all(Exception("trigger")) + try: + await step.rollback_all(Exception("trigger")) + except Exception: + pass # Rollback always throws after completion return error_caught @@ -259,6 +265,52 @@ async def undo_1(error, value): return results + async def _test_continue_on_error(self, step): + """Test that continue_on_error=True executes all undos and collects errors.""" + executed = [] + + @step.with_rollback("step_1") + async def step_1(): + return 1 + + @step_1.undo + async def undo_1(error, value): + executed.append("undo_1") + + @step.with_rollback("step_2") + async def step_2(): + return 2 + + @step_2.undo + async def undo_2(error, value): + executed.append("undo_2") + raise RuntimeError("undo_2 failed") + + @step.with_rollback("step_3") + async def step_3(): + return 3 + + @step_3.undo + async def undo_3(error, value): + executed.append("undo_3") + + await step_1() + await step_2() + await step_3() + + # With continue_on_error=True, all undos should execute + error_info = {"type": None, "count": 0} + try: + await step.rollback_all(Exception("trigger"), continue_on_error=True) + except ExceptionGroup as eg: + error_info["type"] = "ExceptionGroup" + error_info["count"] = len(eg.exceptions) + except Exception as e: + error_info["type"] = type(e).__name__ + + # All undos should have executed (LIFO: undo_3, undo_2, undo_1) + return {"executed": executed, "error": error_info} + async def test(ctrl, env, ctx): pass From 28aacc0209316c2b05885eb6f0b930beb5b76240 Mon Sep 17 00:00:00 2001 From: dmmulroy Date: Tue, 6 Jan 2026 13:23:42 -0500 Subject: [PATCH 4/4] refactor: remove rollback_all from Python SDK - Remove step.rollback_all() method (now handled by engine auto-rollback) - Update with_rollback docstring to describe auto-rollback behavior - Simplify tests to only test with_rollback decorator registration Rollback is now triggered automatically by the engine when workflow throws an uncaught error (if rollback config enabled at instance creation). --- .../workers-api/src/workers/_workers.py | 44 +-- .../tests/python/workflow-rollback/worker.js | 110 ++---- .../python/workflow-rollback/workflow.py | 314 ++++-------------- 3 files changed, 89 insertions(+), 379 deletions(-) diff --git a/src/pyodide/internal/workers-api/src/workers/_workers.py b/src/pyodide/internal/workers-api/src/workers/_workers.py index 6a0ede474a8..f19adac1224 100644 --- a/src/pyodide/internal/workers-api/src/workers/_workers.py +++ b/src/pyodide/internal/workers-api/src/workers/_workers.py @@ -1245,8 +1245,9 @@ def with_rollback(self, name: str, config: dict | None = None): Decorator to define a step with rollback/compensation support (saga pattern). Returns a callable wrapper that allows attaching an .undo decorator for - compensation logic. The undo function executes in LIFO order when - rollback_all() is called. + compensation logic. Undo functions execute automatically in LIFO order + when the workflow throws an uncaught error (if rollback config is enabled + at instance creation). Args: name: The name of the step. @@ -1263,12 +1264,8 @@ async def undo_save(error, record_id): record_id = await save() - # Later, if something fails: - try: - await some_failing_step() - except Exception as e: - await step.rollback_all(e) - raise + # If any step throws, undo functions run automatically + # (when instance created with rollback config enabled) """ def decorator(func): @@ -1276,37 +1273,6 @@ def decorator(func): return decorator - async def rollback_all( - self, trigger_error: Exception = None, *, continue_on_error: bool = False - ): - """ - Execute all registered undo handlers in LIFO order. - - Args: - trigger_error: Error passed to each undo handler. - continue_on_error: Continue after undo failure (raises AggregateError). - """ - try: - # Convert Python exception to JS error for the engine - js_error = None - if trigger_error is not None: - js_error = to_js( - { - "name": type(trigger_error).__name__, - "message": str(trigger_error), - }, - dict_converter=Object.fromEntries, - ) - - options = to_js( - {"continueOnError": continue_on_error}, - dict_converter=Object.fromEntries, - ) - - await self._js_step.rollbackAll(js_error, options) - except Exception as exc: - raise _from_js_error(exc) from exc - async def _resolve_dependency(self, dep): if dep._step_name in self._memoized_dependencies: return self._memoized_dependencies[dep._step_name] diff --git a/src/workerd/server/tests/python/workflow-rollback/worker.js b/src/workerd/server/tests/python/workflow-rollback/worker.js index 2b99bf11edc..bb941195fb5 100644 --- a/src/workerd/server/tests/python/workflow-rollback/worker.js +++ b/src/workerd/server/tests/python/workflow-rollback/worker.js @@ -18,122 +18,58 @@ export default { async test(ctrl, env, ctx) { const stubStep = new Context(); - // Test 1: Basic rollback + // Test 1: Basic with_rollback - executes do and returns value { const resp = await env.PythonWorkflow.run( - { test: 'basic_rollback' }, + { test: 'with_rollback_basic' }, stubStep ); - assert.deepStrictEqual(resp, [ - 'do_1', - 'do_2', - 'undo_2:value_2', - 'undo_1:value_1', - ]); - console.log('✓ basic_rollback'); + assert.deepStrictEqual(resp, ['do_1', 'returned:value_1']); + console.log('✓ with_rollback_basic'); } - // Test 2: LIFO order + // Test 2: Undo decorator properly registers handler { const resp = await env.PythonWorkflow.run( - { test: 'lifo_order' }, + { test: 'with_rollback_undo_decorator' }, stubStep ); - assert.deepStrictEqual(resp, ['third', 'second', 'first']); - console.log('✓ lifo_order'); + assert.strictEqual(resp.has_undo, true); + console.log('✓ with_rollback_undo_decorator'); } - // Test 3: Steps without undo handlers + // Test 3: Handler structure captures return value { const resp = await env.PythonWorkflow.run( - { test: 'no_undo_handler' }, + { test: 'with_rollback_undo_receives_value' }, stubStep ); - assert.deepStrictEqual(resp, [ - 'do_with_undo', - 'do_without_undo', - 'undo_with_undo', - ]); - console.log('✓ no_undo_handler'); + assert.deepStrictEqual(resp.do_result, { id: 123, data: 'important' }); + assert.strictEqual(resp.undo_registered, true); + console.log('✓ with_rollback_undo_receives_value'); } - // Test 4: Undo receives value and error + // Test 4: Config and undoConfig are stored { const resp = await env.PythonWorkflow.run( - { test: 'undo_receives_value' }, + { test: 'with_rollback_config' }, stubStep ); - assert.deepStrictEqual(resp.value, { id: 123, data: 'important' }); - assert.strictEqual(resp.error_msg, 'the error message'); - console.log('✓ undo_receives_value'); + assert.deepStrictEqual(resp.step_config, { retries: { limit: 1 } }); + assert.deepStrictEqual(resp.undo_config, { retries: { limit: 3 } }); + console.log('✓ with_rollback_config'); } - // Test 5: Undo with separate config + // Test 5: with_rollback works without undo handler { const resp = await env.PythonWorkflow.run( - { test: 'undo_with_config' }, + { test: 'with_rollback_no_undo' }, stubStep ); - assert.strictEqual(resp.undo, 1); - console.log('✓ undo_with_config'); + assert.deepStrictEqual(resp, ['do_without_undo', 'returned:value']); + console.log('✓ with_rollback_no_undo'); } - // Test 6: Empty undo stack is no-op - { - const resp = await env.PythonWorkflow.run( - { test: 'empty_undo_stack_noop' }, - stubStep - ); - assert.strictEqual(resp.success, true); - console.log('✓ empty_undo_stack_noop'); - } - - // Test 7: Nested rollback_all throws an error (engine's WorkflowFatalError) - { - const resp = await env.PythonWorkflow.run( - { test: 'nested_rollback_throws' }, - stubStep - ); - // Engine throws WorkflowFatalError when rollbackAll is called during rollback - assert.ok(resp.type !== null, 'Expected an error type'); - assert.ok(resp.message.includes('rollback')); - console.log('✓ nested_rollback_throws'); - } - - // Test 8: Stop on first undo failure - { - const resp = await env.PythonWorkflow.run( - { test: 'stop_on_first_undo_failure' }, - stubStep - ); - // LIFO order: undo_3 runs, undo_2 fails, undo_1 never runs - assert.deepStrictEqual(resp, ['undo_3', 'undo_2']); - console.log('✓ stop_on_first_undo_failure'); - } - - // Test 9: Rollback without error argument - { - const resp = await env.PythonWorkflow.run( - { test: 'rollback_without_error_arg' }, - stubStep - ); - assert.deepStrictEqual(resp, ['undo_1:error=true']); - console.log('✓ rollback_without_error_arg'); - } - - // Test 10: continue_on_error executes all undos - { - const resp = await env.PythonWorkflow.run( - { test: 'continue_on_error' }, - stubStep - ); - // All undos should execute (LIFO: undo_3, undo_2 fails, undo_1) - assert.deepStrictEqual(resp.executed, ['undo_3', 'undo_2', 'undo_1']); - assert.strictEqual(resp.error.type, 'ExceptionGroup'); - assert.strictEqual(resp.error.count, 1); - console.log('✓ continue_on_error'); - } - - console.log('All rollback tests passed!'); + console.log('All with_rollback tests passed!'); }, }; diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow.py b/src/workerd/server/tests/python/workflow-rollback/workflow.py index de40a6fd8e6..aa05223f17a 100644 --- a/src/workerd/server/tests/python/workflow-rollback/workflow.py +++ b/src/workerd/server/tests/python/workflow-rollback/workflow.py @@ -6,36 +6,31 @@ class WorkflowRollbackExample(WorkflowEntrypoint): - """Test workflow demonstrating the saga rollback pattern.""" + """Test workflow demonstrating the saga rollback pattern. - async def run(self, event, step): - test_name = event.get("test", "basic_rollback") + Note: Rollback is triggered automatically by the engine when a workflow + throws an uncaught error (if rollback config is enabled at instance creation). + These tests verify the with_rollback decorator and undo handler registration. + """ - if test_name == "basic_rollback": - return await self._test_basic_rollback(step) - elif test_name == "lifo_order": - return await self._test_lifo_order(step) - elif test_name == "no_undo_handler": - return await self._test_no_undo_handler(step) - elif test_name == "undo_receives_value": - return await self._test_undo_receives_value(step) - elif test_name == "undo_with_config": - return await self._test_undo_with_config(step) - elif test_name == "empty_undo_stack_noop": - return await self._test_empty_undo_stack_noop(step) - elif test_name == "nested_rollback_throws": - return await self._test_nested_rollback_throws(step) - elif test_name == "stop_on_first_undo_failure": - return await self._test_stop_on_first_undo_failure(step) - elif test_name == "rollback_without_error_arg": - return await self._test_rollback_without_error_arg(step) - elif test_name == "continue_on_error": - return await self._test_continue_on_error(step) + async def run(self, event, step): + test_name = event.get("test", "with_rollback_basic") + + if test_name == "with_rollback_basic": + return await self._test_with_rollback_basic(step) + elif test_name == "with_rollback_undo_decorator": + return await self._test_with_rollback_undo_decorator(step) + elif test_name == "with_rollback_undo_receives_value": + return await self._test_with_rollback_undo_receives_value(step) + elif test_name == "with_rollback_config": + return await self._test_with_rollback_config(step) + elif test_name == "with_rollback_no_undo": + return await self._test_with_rollback_no_undo(step) else: raise ValueError(f"Unknown test: {test_name}") - async def _test_basic_rollback(self, step): - """Test that rollback_all executes undo handlers.""" + async def _test_with_rollback_basic(self, step): + """Test that with_rollback executes the do function and returns value.""" results = [] @step.with_rollback("step_1") @@ -47,90 +42,34 @@ async def step_1(): async def undo_1(error, value): results.append(f"undo_1:{value}") - @step.with_rollback("step_2") - async def step_2(): - results.append("do_2") - return "value_2" - - @step_2.undo - async def undo_2(error, value): - results.append(f"undo_2:{value}") - - await step_1() - await step_2() - - # Trigger rollback - await step.rollback_all(ValueError("test error")) + value = await step_1() + results.append(f"returned:{value}") return results - async def _test_lifo_order(self, step): - """Test that undos execute in LIFO (reverse) order.""" - order = [] - - @step.with_rollback("first") - async def first(): - return 1 - - @first.undo - async def undo_first(error, value): - order.append("first") - - @step.with_rollback("second") - async def second(): - return 2 - - @second.undo - async def undo_second(error, value): - order.append("second") - - @step.with_rollback("third") - async def third(): - return 3 + async def _test_with_rollback_undo_decorator(self, step): + """Test that .undo decorator properly registers undo handler.""" + registered = {"has_undo": False} - @third.undo - async def undo_third(error, value): - order.append("third") - - await first() - await second() - await third() - - await step.rollback_all(Exception("trigger")) - - # Should be LIFO: third, second, first - return order - - async def _test_no_undo_handler(self, step): - """Test that steps without undo handlers don't break rollback.""" - results = [] - - @step.with_rollback("with_undo") - async def with_undo(): - results.append("do_with_undo") + @step.with_rollback("step_1") + async def step_1(): return "value" - @with_undo.undo - async def undo_with(error, value): - results.append("undo_with_undo") - - # Step without .undo decorator - @step.with_rollback("without_undo") - async def without_undo(): - results.append("do_without_undo") - return "no_undo_value" - - await with_undo() - await without_undo() + # Check that undo decorator returns the wrapper + @step_1.undo + async def undo_1(error, value): + registered["has_undo"] = True - await step.rollback_all(Exception("trigger")) + # The wrapper should have _undo_fn set + registered["has_undo"] = step_1._undo_fn is not None - # Only with_undo should have undo called - return results + await step_1() + return registered - async def _test_undo_receives_value(self, step): - """Test that undo receives the original step's return value.""" - received = {} + async def _test_with_rollback_undo_receives_value(self, step): + """Test that undo handler receives the correct value from do function.""" + # This test verifies the handler structure is correct + received = {"value": None} @step.with_rollback("step_with_value") async def step_with_value(): @@ -139,16 +78,17 @@ async def step_with_value(): @step_with_value.undo async def undo_step(error, value): received["value"] = value - received["error_msg"] = str(error) - - await step_with_value() - await step.rollback_all(ValueError("the error message")) - return received + result = await step_with_value() + # Verify do function returned expected value + return { + "do_result": result, + "undo_registered": step_with_value._undo_fn is not None, + } - async def _test_undo_with_config(self, step): - """Test that undo can have its own retry config.""" - call_count = {"undo": 0} + async def _test_with_rollback_config(self, step): + """Test that with_rollback accepts config and undoConfig.""" + configs = {"step_config": None, "undo_config": None} @step.with_rollback("step", config={"retries": {"limit": 1}}) async def step_fn(): @@ -156,161 +96,29 @@ async def step_fn(): @step_fn.undo(config={"retries": {"limit": 3}}) async def undo_fn(error, value): - call_count["undo"] += 1 - return "undone" - - await step_fn() - await step.rollback_all(Exception("trigger")) - - return call_count - - async def _test_empty_undo_stack_noop(self, step): - """Test that rollback_all is a no-op when undo stack is empty.""" - - # Don't register any rollback handlers - @step.do("regular_step") - async def regular_step(): - return "done" - - await regular_step() - - # This should not raise or do anything - await step.rollback_all(Exception("trigger")) - - return {"success": True} - - async def _test_nested_rollback_throws(self, step): - """Test that calling rollback_all during rollback throws an error.""" - error_caught = {"type": None, "message": None} - - @step.with_rollback("step_1") - async def step_1(): - return "value" - - @step_1.undo - async def undo_1(error, value): - # Try to call rollback_all from within an undo handler - # Engine throws WorkflowFatalError, Python-side catches as Exception - try: - await step.rollback_all(Exception("nested")) - except Exception as e: - error_caught["type"] = type(e).__name__ - error_caught["message"] = str(e) - - await step_1() - try: - await step.rollback_all(Exception("trigger")) - except Exception: - pass # Rollback always throws after completion - - return error_caught - - async def _test_stop_on_first_undo_failure(self, step): - """Test that rollback stops on first undo failure by default.""" - executed = [] + pass - @step.with_rollback("step_1") - async def step_1(): - return 1 - - @step_1.undo - async def undo_1(error, value): - executed.append("undo_1") + configs["step_config"] = step_fn._config + configs["undo_config"] = step_fn._undo_config - @step.with_rollback("step_2") - async def step_2(): - return 2 - - @step_2.undo - async def undo_2(error, value): - executed.append("undo_2") - raise RuntimeError("undo_2 failed") - - @step.with_rollback("step_3") - async def step_3(): - return 3 - - @step_3.undo - async def undo_3(error, value): - executed.append("undo_3") - - await step_1() - await step_2() - await step_3() - - # Rollback should: undo_3 (ok), undo_2 (fail), stop before undo_1 - try: - await step.rollback_all(Exception("trigger")) - except Exception: - pass # Expected to fail - - return executed + await step_fn() + return configs - async def _test_rollback_without_error_arg(self, step): - """Test that rollback_all works when called without an error argument.""" + async def _test_with_rollback_no_undo(self, step): + """Test that with_rollback works without an undo handler attached.""" results = [] - @step.with_rollback("step_1") - async def step_1(): + @step.with_rollback("step_without_undo") + async def step_without_undo(): + results.append("do_without_undo") return "value" - @step_1.undo - async def undo_1(error, value): - results.append(f"undo_1:error={error is None}") - - await step_1() - - # Call without error - should use None - await step.rollback_all(None) + # Don't attach .undo - should still work + value = await step_without_undo() + results.append(f"returned:{value}") return results - async def _test_continue_on_error(self, step): - """Test that continue_on_error=True executes all undos and collects errors.""" - executed = [] - - @step.with_rollback("step_1") - async def step_1(): - return 1 - - @step_1.undo - async def undo_1(error, value): - executed.append("undo_1") - - @step.with_rollback("step_2") - async def step_2(): - return 2 - - @step_2.undo - async def undo_2(error, value): - executed.append("undo_2") - raise RuntimeError("undo_2 failed") - - @step.with_rollback("step_3") - async def step_3(): - return 3 - - @step_3.undo - async def undo_3(error, value): - executed.append("undo_3") - - await step_1() - await step_2() - await step_3() - - # With continue_on_error=True, all undos should execute - error_info = {"type": None, "count": 0} - try: - await step.rollback_all(Exception("trigger"), continue_on_error=True) - except ExceptionGroup as eg: - error_info["type"] = "ExceptionGroup" - error_info["count"] = len(eg.exceptions) - except Exception as e: - error_info["type"] = type(e).__name__ - - # All undos should have executed (LIFO: undo_3, undo_2, undo_1) - return {"executed": executed, "error": error_info} - async def test(ctrl, env, ctx): pass