From a6f2a43a0d028817817ee8a2e1b3f70aa4b36189 Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Thu, 18 Dec 2025 18:55:06 +0200 Subject: [PATCH 1/2] [support-to-invoke-task-for-same-worker] - check all chain params are called with the kwargs --- tests/integration/hatchet/assertions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/hatchet/assertions.py b/tests/integration/hatchet/assertions.py index 8e8955c..ca22bb0 100644 --- a/tests/integration/hatchet/assertions.py +++ b/tests/integration/hatchet/assertions.py @@ -226,11 +226,11 @@ def assert_chain_done( chain_tasks = [task_map[task_id] for task_id in chain_signature.tasks] assert_tasks_in_order(wf_by_signature, chain_tasks) output_value = None - input_params = {} for chain_task_id in chain_signature.tasks: + input_params = chain_signature.kwargs.copy() task = task_map[chain_task_id] if output_value: - input_params = {task.return_value_field(): output_value} + input_params |= {task.return_value_field(): output_value} task_wf = _assert_task_done(chain_task_id, wf_by_signature, input_params) output_value = task_wf.output["hatchet_results"] From cf33cf1de1bde5a3801d8ae5f9729e956cd3b81d Mon Sep 17 00:00:00 2001 From: YedidyaHKfir Date: Thu, 18 Dec 2025 18:56:28 +0200 Subject: [PATCH 2/2] [support-to-invoke-task-for-same-worker] - send data via kwargs, not message --- tests/integration/hatchet/chain/test__chain.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/hatchet/chain/test__chain.py b/tests/integration/hatchet/chain/test__chain.py index e9062ed..dabacb0 100644 --- a/tests/integration/hatchet/chain/test__chain.py +++ b/tests/integration/hatchet/chain/test__chain.py @@ -3,6 +3,7 @@ import pytest import mageflow +from hatchet_sdk.runnables.types import EmptyModel from mageflow.signature.model import TaskSignature from tests.integration.hatchet.assertions import ( assert_redis_is_clean, @@ -35,7 +36,7 @@ async def test_chain_integration( hatchet_client_init.redis_client, hatchet_client_init.hatchet, ) - message = ContextMessage(base_data=test_ctx) + message = EmptyModel() signature2 = await mageflow.sign(task2, success_callbacks=[sign_callback1]) chain_success_error_callback = await mageflow.sign(error_callback) @@ -48,6 +49,7 @@ async def test_chain_integration( [sign_task1, signature2.key, task3], success=success_chain_signature, ) + await chain_signature.kwargs.aupdate(base_data=test_ctx) chain_tasks = await TaskSignature.afind() await chain_signature.aio_run_no_wait(message, options=trigger_options)