This guide walks through setting up MageFlow with your preferred task manager backend and configuring the necessary dependencies.
Install MageFlow with your preferred task manager backend using the appropriate extra:
```bash
pip install mageflow[hatchet]
```

MageFlow requires configuration for both the task manager backend and Redis storage.
```python
import redis.asyncio

from hatchet_sdk import Hatchet, ClientConfig
import mageflow

config_obj = ClientConfig(token="your-hatchet-token")

# Redis connection used by MageFlow for task state storage
redis_client = redis.asyncio.from_url(
    "redis-url",
    max_connections=1028,
    decode_responses=True,
)

hatchet = Hatchet(debug=True, config=config_obj)

# Create the MageFlow instance
hatchet = mageflow.Mageflow(hatchet, redis_client=redis_client)
```

For a smooth transition, we recommend binding the wrapped object to the original name: it exposes the same functions and configuration as the underlying Hatchet client.
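The task examples below pass models such as `YourModelA` and `YourModelB` to `input_validator`. These are placeholders; a minimal sketch of what such models might look like (field names assumed from the examples, not prescribed by MageFlow):

```python
from pydantic import BaseModel

# Hypothetical input models matching the placeholders used in the
# task examples below; input_validator expects Pydantic model classes.
class YourModelA(BaseModel):
    data: str

class YourModelB(BaseModel):
    data: str

print(YourModelA(data="hello").data)  # hello
```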
Define tasks using the MageFlow task decorators:

```python
@hatchet.task(name="process-data", input_validator=YourModelA)
async def process_data(msg: YourModelA):
    result = {"processed": msg.data}
    return result


@hatchet.task(name="send-notification", input_validator=YourModelB)
async def send_notification(msg: YourModelB):
    print(f"Notification sent for: {msg.data}")
    return {"status": "sent"}
```

For long-running or critical tasks, use durable tasks:
```python
import asyncio

@hatchet.durable_task(name="critical-process", input_validator=YourModelB)
async def critical_process(msg: YourModelB):
    await asyncio.sleep(5)
    return {"completed": True}
```

If you want task definitions to keep the native Hatchet signature (including the `ctx` parameter), use the `param_config` parameter:
```python
hatchet = mageflow.Mageflow(hatchet, redis_client=redis_client, param_config=AcceptParams.ALL)

@hatchet.task(name="send-notification", input_validator=YourModelB)
async def send_notification(msg: YourModelB, ctx: Context):
    print(f"Notification sent for: {msg.data}")
    return {"status": "sent"}
```

MageFlow provides two special decorators that let you override the default behavior on a per-task basis:
The `with_ctx` decorator allows a specific task to receive the Hatchet context even when the client is configured with `NO_CTX`. This is useful when most of your tasks don't need the context, but specific tasks do:
```python
hatchet = mageflow.Mageflow(hatchet, redis_client=redis_client, param_config=AcceptParams.NO_CTX)

@hatchet.task(name="context-aware-task", input_validator=YourModel)
@hatchet.with_ctx
async def context_aware_task(msg: YourModel, ctx: Context):
    step_id = ctx.step_run_id
    print(f"Running in step: {step_id}")
    return {"processed": True}


@hatchet.task(name="regular-task", input_validator=YourModel)
async def regular_task(msg: YourModel):
    return {"processed": True}
```

The `with_signature` decorator allows a task to receive its own task signature as a parameter. This is useful when a task needs to inspect or manipulate its own execution configuration:
```python
from thirdmagic.task import TaskSignature

@hatchet.task(name="self-aware-task", input_validator=YourModel)
@hatchet.with_signature
async def self_aware_task(msg: YourModel, signature: TaskSignature):
    task_name = signature.task_name
    task_id = signature.key
    print(f"Running task: {task_name} with ID: {task_id}")
    return {"task_name": task_name}
```
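When stacking decorators like these, order determines which wrapper injects its parameter first. A generic stdlib sketch of the parameter-injection pattern (illustrative only — not MageFlow's actual implementation):

```python
import asyncio
import functools

# Generic parameter-injection pattern: each decorator wraps the
# coroutine and supplies one extra keyword argument before calling inward.
def inject(name, value):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            kwargs[name] = value
            return await fn(*args, **kwargs)
        return wrapper
    return decorator

@inject("ctx", {"step_run_id": "step-123"})
@inject("signature", {"task_name": "demo"})
async def task(msg, ctx, signature):
    return (msg, ctx["step_run_id"], signature["task_name"])

print(asyncio.run(task("hi")))  # ('hi', 'step-123', 'demo')
```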
```python
# Combine both decorators
@hatchet.task(name="full-access-task", input_validator=YourModel)
@hatchet.with_ctx
@hatchet.with_signature
async def full_access_task(msg: YourModel, ctx: Context, signature: TaskSignature):
    step_id = ctx.step_run_id
    task_name = signature.task_name
    return {"step_id": step_id, "task_name": task_name}
```

Create a worker to run your registered tasks:
```python
def main():
    workflows = [
        process_data,
        send_notification,
        critical_process,
    ]
    worker = hatchet.worker("my-worker", workflows=workflows)
    worker.start()


if __name__ == "__main__":
    main()
```

With your setup complete, you're ready to:
- Define workflows using callbacks
- Chain tasks together with task chains
- Run parallel tasks using swarms
- Monitor and debug your workflows using the built-in tools
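One practical note before building larger workflows: because the decorated functions remain plain async callables, task logic can be unit-tested without a worker. A sketch (decorator omitted so it runs standalone; the message stub is hypothetical):

```python
import asyncio
from types import SimpleNamespace

# Mirrors the process_data task body above; the decorator is omitted
# so the sketch runs without a Hatchet client or worker.
async def process_data(msg):
    return {"processed": msg.data}

msg = SimpleNamespace(data="hello")
result = asyncio.run(process_data(msg))
print(result)  # {'processed': 'hello'}
```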