diff --git a/src/pyodide/internal/workers-api/src/workers/_workers.py b/src/pyodide/internal/workers-api/src/workers/_workers.py index 21b97ed55fb..f19adac1224 100644 --- a/src/pyodide/internal/workers-api/src/workers/_workers.py +++ b/src/pyodide/internal/workers-api/src/workers/_workers.py @@ -1125,6 +1125,77 @@ def wrapper(*args, **kwargs): return wrapper +class _RollbackStepWrapper: + """ + Wrapper returned by step.with_rollback() that allows attaching an .undo decorator. + + Delegates to the engine's withRollback for durable undo stack management. + + Usage: + @step.with_rollback("save to db") + async def save(): + return await db.insert(data) + + @save.undo + async def undo_save(error, record_id): + await db.delete(record_id) + + record_id = await save() + """ + + def __init__( + self, + step_wrapper: "_WorkflowStepWrapper", + name: str, + do_fn, + config: dict | None = None, + ): + self._step_wrapper = step_wrapper + self._name = name + self._do_fn = do_fn + self._config = config + self._undo_fn = None + self._undo_config = None + self._step_name = name # For compatibility with dependency resolution + + def undo(self, config: dict | None = None): + """ + Decorator to register an undo/compensation function for this step. + + The undo function receives (error, value) where value is the result + of the do function. + + Args: + config: Optional WorkflowStepConfig for the undo step's retry behavior. + If not provided, inherits from the do step's config. + """ + + def decorator(fn): + self._undo_fn = fn + self._undo_config = config + return fn + + # Support both @save.undo and @save.undo(config={...}) + if callable(config): + fn = config + self._undo_fn = fn + self._undo_config = None + return fn + + return decorator + + async def __call__(self): + """Execute the step via engine's withRollback for durable undo stack.""" + return await _withRollback_call( + self._step_wrapper, + self._name, + self._config, + self._undo_config, + self._do_fn, + self._undo_fn, + ) + + class _WorkflowStepWrapper: def __init__(self, js_step): self._js_step = js_step @@ -1169,6 +1240,39 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"): ), ) + def with_rollback(self, name: str, config: dict | None = None): + """ + Decorator to define a step with rollback/compensation support (saga pattern). + + Returns a callable wrapper that allows attaching an .undo decorator for + compensation logic. Undo functions execute automatically in LIFO order + when the workflow throws an uncaught error (if rollback config is enabled + at instance creation). + + Args: + name: The name of the step. + config: Optional WorkflowStepConfig for configuring retry behavior. + + Usage: + @step.with_rollback("save to db") + async def save(): + return await db.insert(data) + + @save.undo + async def undo_save(error, record_id): + await db.delete(record_id) + + record_id = await save() + + # If any step throws, undo functions run automatically + # (when instance created with rollback config enabled) + """ + + def decorator(func): + return _RollbackStepWrapper(self, name, func, config) + + return decorator + async def _resolve_dependency(self, dep): if dep._step_name in self._memoized_dependencies: return self._memoized_dependencies[dep._step_name] @@ -1211,6 +1315,66 @@ async def _closure(): return result +async def _withRollback_call(entrypoint, name, config, undo_config, do_fn, undo_fn): + """Call the engine's withRollback with Python callbacks wrapped for JS.""" + + async def _closure(): + async def _do_callback(): + result = do_fn() + if inspect.iscoroutine(result): + result = await result + return to_js(result, dict_converter=Object.fromEntries) + + async def _undo_callback(js_err, js_value): + py_err = None + if js_err is not None: + py_err = ( + _from_js_error(js_err) if hasattr(js_err, "message") else js_err + ) + + py_value = python_from_rpc(js_value) + + result = undo_fn(py_err, py_value) + if inspect.iscoroutine(result): + await result + + handler = {"do": _do_callback} + if undo_fn is not None: + handler["undo"] = _undo_callback + + js_handler = to_js(handler, dict_converter=Object.fromEntries) + + js_config = None + if config is not None or undo_config is not None: + config_dict = dict(config) if config else {} + if undo_config is not None: + config_dict["undoConfig"] = undo_config + js_config = to_js(config_dict, dict_converter=Object.fromEntries) + + try: + if js_config is None: + result = await entrypoint._js_step.withRollback(name, js_handler) + else: + result = await entrypoint._js_step.withRollback( + name, js_handler, js_config + ) + + return python_from_rpc(result) + except Exception as exc: + raise _from_js_error(exc) from exc + + task = create_task(_closure()) + entrypoint._in_flight[name] = task + + try: + result = await task + entrypoint._memoized_dependencies[name] = result + finally: + del entrypoint._in_flight[name] + + return result + + def _wrap_subclass(cls): # Override the class __init__ so that we can wrap the `env` in the constructor. original_init = cls.__init__ diff --git a/src/workerd/server/tests/python/BUILD.bazel b/src/workerd/server/tests/python/BUILD.bazel index 386ea963098..6c55984b181 100644 --- a/src/workerd/server/tests/python/BUILD.bazel +++ b/src/workerd/server/tests/python/BUILD.bazel @@ -58,6 +58,8 @@ py_wd_test("python-rpc") py_wd_test("workflow-entrypoint") +py_wd_test("workflow-rollback") + py_wd_test("vendor_dir_compat_flag") py_wd_test("multiprocessing") diff --git a/src/workerd/server/tests/python/workflow-rollback/worker.js b/src/workerd/server/tests/python/workflow-rollback/worker.js new file mode 100644 index 00000000000..bb941195fb5 --- /dev/null +++ b/src/workerd/server/tests/python/workflow-rollback/worker.js @@ -0,0 +1,75 @@ +import { RpcTarget } from 'cloudflare:workers'; +import * as assert from 'node:assert'; + +class Context extends RpcTarget { + async do(name, configOrFn, maybeFn) { + const fn = maybeFn ?? configOrFn; + try { + const result = await fn(); + return result; + } catch (e) { + console.log(`Error received: ${e.name} Message: ${e.message}`); + throw e; + } + } +} + +export default { + async test(ctrl, env, ctx) { + const stubStep = new Context(); + + // Test 1: Basic with_rollback - executes do and returns value + { + const resp = await env.PythonWorkflow.run( + { test: 'with_rollback_basic' }, + stubStep + ); + assert.deepStrictEqual(resp, ['do_1', 'returned:value_1']); + console.log('✓ with_rollback_basic'); + } + + // Test 2: Undo decorator properly registers handler + { + const resp = await env.PythonWorkflow.run( + { test: 'with_rollback_undo_decorator' }, + stubStep + ); + assert.strictEqual(resp.has_undo, true); + console.log('✓ with_rollback_undo_decorator'); + } + + // Test 3: Handler structure captures return value + { + const resp = await env.PythonWorkflow.run( + { test: 'with_rollback_undo_receives_value' }, + stubStep + ); + assert.deepStrictEqual(resp.do_result, { id: 123, data: 'important' }); + assert.strictEqual(resp.undo_registered, true); + console.log('✓ with_rollback_undo_receives_value'); + } + + // Test 4: Config and undoConfig are stored + { + const resp = await env.PythonWorkflow.run( + { test: 'with_rollback_config' }, + stubStep + ); + assert.deepStrictEqual(resp.step_config, { retries: { limit: 1 } }); + assert.deepStrictEqual(resp.undo_config, { retries: { limit: 3 } }); + console.log('✓ with_rollback_config'); + } + + // Test 5: with_rollback works without undo handler + { + const resp = await env.PythonWorkflow.run( + { test: 'with_rollback_no_undo' }, + stubStep + ); + assert.deepStrictEqual(resp, ['do_without_undo', 'returned:value']); + console.log('✓ with_rollback_no_undo'); + } + + console.log('All with_rollback tests passed!'); + }, +}; diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test b/src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test new file mode 100644 index 00000000000..c209f4036e3 --- /dev/null +++ b/src/workerd/server/tests/python/workflow-rollback/workflow-rollback.wd-test @@ -0,0 +1,34 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const config :Workerd.Config = ( + services = [ + (name = "py", worker = .pyWorker), + (name = "js", worker = .jsWorker), + ], +); + +const pyWorker :Workerd.Worker = ( + + compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "rpc", "disable_python_no_global_handlers"], + + modules = [ + (name = "workflow.py", pythonModule = embed "workflow.py"), + ], + + bindings = [ + (name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowRollbackExample")), + ], +); + +const jsWorker :Workerd.Worker = ( + + compatibilityFlags = ["nodejs_compat", "rpc", "disable_python_no_global_handlers"], + + modules = [ + (name = "worker", esModule = embed "worker.js"), + ], + + bindings = [ + (name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowRollbackExample")), + ], +); diff --git a/src/workerd/server/tests/python/workflow-rollback/workflow.py b/src/workerd/server/tests/python/workflow-rollback/workflow.py new file mode 100644 index 00000000000..aa05223f17a --- /dev/null +++ b/src/workerd/server/tests/python/workflow-rollback/workflow.py @@ -0,0 +1,124 @@ +# Copyright (c) 2025 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +from workers import WorkflowEntrypoint + + +class WorkflowRollbackExample(WorkflowEntrypoint): + """Test workflow demonstrating the saga rollback pattern. + + Note: Rollback is triggered automatically by the engine when a workflow + throws an uncaught error (if rollback config is enabled at instance creation). + These tests verify the with_rollback decorator and undo handler registration. + """ + + async def run(self, event, step): + test_name = event.get("test", "with_rollback_basic") + + if test_name == "with_rollback_basic": + return await self._test_with_rollback_basic(step) + elif test_name == "with_rollback_undo_decorator": + return await self._test_with_rollback_undo_decorator(step) + elif test_name == "with_rollback_undo_receives_value": + return await self._test_with_rollback_undo_receives_value(step) + elif test_name == "with_rollback_config": + return await self._test_with_rollback_config(step) + elif test_name == "with_rollback_no_undo": + return await self._test_with_rollback_no_undo(step) + else: + raise ValueError(f"Unknown test: {test_name}") + + async def _test_with_rollback_basic(self, step): + """Test that with_rollback executes the do function and returns value.""" + results = [] + + @step.with_rollback("step_1") + async def step_1(): + results.append("do_1") + return "value_1" + + @step_1.undo + async def undo_1(error, value): + results.append(f"undo_1:{value}") + + value = await step_1() + results.append(f"returned:{value}") + + return results + + async def _test_with_rollback_undo_decorator(self, step): + """Test that .undo decorator properly registers undo handler.""" + registered = {"has_undo": False} + + @step.with_rollback("step_1") + async def step_1(): + return "value" + + # Check that undo decorator returns the wrapper + @step_1.undo + async def undo_1(error, value): + registered["has_undo"] = True + + # The wrapper should have _undo_fn set + registered["has_undo"] = step_1._undo_fn is not None + + await step_1() + return registered + + async def _test_with_rollback_undo_receives_value(self, step): + """Test that undo handler receives the correct value from do function.""" + # This test verifies the handler structure is correct + received = {"value": None} + + @step.with_rollback("step_with_value") + async def step_with_value(): + return {"id": 123, "data": "important"} + + @step_with_value.undo + async def undo_step(error, value): + received["value"] = value + + result = await step_with_value() + # Verify do function returned expected value + return { + "do_result": result, + "undo_registered": step_with_value._undo_fn is not None, + } + + async def _test_with_rollback_config(self, step): + """Test that with_rollback accepts config and undoConfig.""" + configs = {"step_config": None, "undo_config": None} + + @step.with_rollback("step", config={"retries": {"limit": 1}}) + async def step_fn(): + return "done" + + @step_fn.undo(config={"retries": {"limit": 3}}) + async def undo_fn(error, value): + pass + + configs["step_config"] = step_fn._config + configs["undo_config"] = step_fn._undo_config + + await step_fn() + return configs + + async def _test_with_rollback_no_undo(self, step): + """Test that with_rollback works without an undo handler attached.""" + results = [] + + @step.with_rollback("step_without_undo") + async def step_without_undo(): + results.append("do_without_undo") + return "value" + + # Don't attach .undo - should still work + value = await step_without_undo() + results.append(f"returned:{value}") + + return results + + +async def test(ctrl, env, ctx): + pass