diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7625378 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the project files +COPY . . + +# Install the package in development mode +RUN pip install -e . + +# Disable bytecode compilation +ENV PYTHONDONTWRITEBYTECODE=1 +# Unbuffer stdout and stderr +ENV PYTHONUNBUFFERED=1 +# Set default command +CMD ["python", "tests/manage.py", "runserver", "0.0.0.0:8000"] + diff --git a/demo/tests/test_process.py b/demo/tests/test_process.py index cf6ef9a..3bfeb6a 100644 --- a/demo/tests/test_process.py +++ b/demo/tests/test_process.py @@ -41,15 +41,18 @@ def test_invoice_callbacks(self, debug_method): 'model_name': 'invoice', 'instance_id': invoice.id, 'process_name': 'invoice_process', + 'action_name': 'demo', 'field_name': 'status', - 'transition': InvoiceProcess.transitions[3] + 'foo': 'bar' } expected_callbacks_kwargs = { 'app_label': 'demo', 'model_name': 'invoice', 'instance_id': invoice.id, 'process_name': 'invoice_process', + 'action_name': 'demo', 'field_name': 'status', + 'foo': 'bar' } self.assertEqual(list(debug_method.call_args_list[0]), [('demo_task_1',), expected_side_effects_kwargs]) self.assertEqual(list(debug_method.call_args_list[1]), [('demo_task_2', None), expected_side_effects_kwargs]) @@ -83,16 +86,19 @@ def test_invoice_failure_callbacks(self, debug_method): 'model_name': 'invoice', 'instance_id': invoice.id, 'process_name': 'invoice_process', + 'action_name': 'failing_transition', 'field_name': 'status', - 'transition': InvoiceProcess.transitions[5] + 'foo': 'bar' } expected_callbacks_kwargs = { 'app_label': 'demo', 'model_name': 'invoice', 'instance_id': invoice.id, 'process_name': 'invoice_process', + 'action_name': 'failing_transition', 'field_name': 'status', - 'exception': ctx.exception, + 'foo': 'bar', + 'exception': ctx.exception } self.assertEqual(list(debug_method.call_args_list[0]), [('demo_task_1',), expected_side_effects_kwargs]) self.assertEqual(list(debug_method.call_args_list[1]), [('demo_task_3',), expected_callbacks_kwargs]) diff --git a/django_logic_celery/commands.py b/django_logic_celery/commands.py index de25f6d..18dc8c3 100644 --- a/django_logic_celery/commands.py +++ b/django_logic_celery/commands.py @@ -8,19 +8,46 @@ from django_logic.state import State +def _find_transition_in_process(process, action_name): + """Recursively search for transition by action_name in process and nested processes""" + # Search in current process transitions + for transition in process.transitions: + if transition.action_name == action_name: + return transition + + # Search in nested processes recursively + for sub_process_class in process.nested_processes: + sub_process = sub_process_class(state=process.state) + result = _find_transition_in_process(sub_process, action_name) + if result is not None: + return result + + return None + + +def get_transition_from_process(instance, process_name, action_name): + """Helper function to retrieve transition from process by action_name""" + process = getattr(instance, process_name) + transition = _find_transition_in_process(process, action_name) + if transition is None: + raise ValueError(f"Transition with action_name '{action_name}' not found in process '{process_name}'") + return transition + + @shared_task(acks_late=True) def complete_transition(*args, **kwargs): """Completes transition """ app_label = kwargs['app_label'] model_name = kwargs['model_name'] instance_id = kwargs['instance_id'] - transition = kwargs['transition'] + action_name = kwargs['action_name'] process_name = kwargs['process_name'] app = apps.get_app_config(app_label) model = app.get_model(model_name) instance = model.objects.get(id=instance_id) state = getattr(instance, process_name).state + transition = get_transition_from_process(instance, process_name, action_name) logging.info(f'{state.instance_key} complete transition task started') transition.complete_transition(state, **kwargs) @@ -37,13 +64,15 @@ def fail_transition(task_id, *args, **kwargs): app_label = kwargs['app_label'] model_name = kwargs['model_name'] instance_id = kwargs['instance_id'] - transition = kwargs['transition'] + action_name = kwargs['action_name'] + process_name = kwargs['process_name'] try: app = apps.get_app_config(app_label) model = app.get_model(kwargs['model_name']) instance = model.objects.get(id=kwargs['instance_id']) state = getattr(instance, kwargs['process_name']).state + transition = get_transition_from_process(instance, process_name, action_name) try: # If exception is raised in success callback, it will be passed through args error = args[0] @@ -54,18 +83,19 @@ def fail_transition(task_id, *args, **kwargs): logging.exception(error) transition.fail_transition(state, error, **kwargs) except Exception as error: - logging.info(f'{app_label}-{model_name}-{transition.action_name}-{instance_id}' + logging.info(f'{app_label}-{model_name}-{action_name}-{instance_id}' f'failure handler failed with error: {error}') logging.exception(error) @shared_task(acks_late=True) -def run_side_effects_as_task(app_label, model_name, transition, instance_id, process_name, **kwargs): +def run_side_effects_as_task(app_label, model_name, action_name, instance_id, process_name, **kwargs): """It runs all side-effects of provided transition under a single task""" app = apps.get_app_config(app_label) model = app.get_model(model_name) instance = model.objects.get(id=instance_id) state = getattr(instance, process_name).state + transition = get_transition_from_process(instance, process_name, action_name) logging.info(f"{state.instance_key} side effects of '{transition.action_name}' started") try: @@ -81,13 +111,14 @@ def run_side_effects_as_task(app_label, model_name, transition, instance_id, pro @shared_task(acks_late=True) -def run_callbacks_as_task(app_label, model_name, transition, instance_id, process_name, **kwargs): +def run_callbacks_as_task(app_label, model_name, action_name, instance_id, process_name, **kwargs): """It runs all callbacks of provided transition under a single task""" try: app = apps.get_app_config(app_label) model = app.get_model(model_name) instance = model.objects.get(id=instance_id) state = getattr(instance, process_name).state + transition = get_transition_from_process(instance, process_name, action_name) logging.info(f"{state.instance_key} callbacks of '{transition.action_name}' started") exception = kwargs.get('exception') @@ -96,7 +127,7 @@ def run_callbacks_as_task(app_label, model_name, transition, instance_id, proces for callback in commands: callback(instance, **callback_kwargs) except Exception as error: - logging.info(f'{app_label}-{model_name}-{transition.action_name}-{instance_id}' + logging.info(f'{app_label}-{model_name}-{action_name}-{instance_id}' f'callbacks failed with error: {error}') logging.exception(error) @@ -119,10 +150,28 @@ def get_task_kwargs(self, state: State, **kwargs): model_name=state.instance._meta.model_name, instance_id=state.instance.pk, process_name=state.process_name, - field_name=state.field_name + field_name=state.field_name, + action_name=self._transition.action_name ) - if 'exception' in kwargs: - task_kwargs['exception'] = kwargs['exception'] + + # Only include serializable kwargs - convert objects to IDs where possible + serializable_kwargs = {} + for key, value in kwargs.items(): + if key == 'exception': + serializable_kwargs[key] = value + continue + # Skip functions and other callables + if callable(value) and not isinstance(value, type): + continue + # Convert user objects to user_id + if key == 'user' and hasattr(value, 'pk'): + serializable_kwargs['user_id'] = value.pk + continue + # Include only primitive serializable types + if isinstance(value, (str, int, float, bool, type(None))): + serializable_kwargs[key] = value + + task_kwargs.update(serializable_kwargs) return task_kwargs @@ -138,9 +187,22 @@ class SideEffectTasks(CeleryCommandMixin, SideEffects): """ def queue_task(self, task_kwargs): - header = [signature(task_name, kwargs=task_kwargs) for task_name in self.commands] + # Convert function objects to task names (strings) + task_names = [] + for cmd in self.commands: + if isinstance(cmd, str): + task_names.append(cmd) + elif hasattr(cmd, 'name'): + # Celery task object + task_names.append(cmd.name) + elif callable(cmd): + # Regular function - use its name (assumes it's registered as a Celery task) + task_names.append(cmd.__name__) + else: + task_names.append(str(cmd)) + + header = [signature(task_name, kwargs=task_kwargs) for task_name in task_names] header = chain(*header) - task_kwargs.update(dict(transition=self._transition)) body = complete_transition.s(**task_kwargs) tasks = chain(header | body).on_error(fail_transition.s(**task_kwargs)) transaction.on_commit(tasks.delay) @@ -157,11 +219,6 @@ def queue_task(self, task_kwargs): class SideEffectSingleTask(CeleryCommandMixin, SideEffects): """Side-effects commands executed as a single celery task""" - def get_task_kwargs(self, state: State, **kwargs): - task_kwargs = super().get_task_kwargs(state, **kwargs) - task_kwargs['transition'] = self._transition - return task_kwargs - def queue_task(self, task_kwargs): sig = run_side_effects_as_task.signature(kwargs=task_kwargs) transaction.on_commit(sig.delay) @@ -170,11 +227,6 @@ def queue_task(self, task_kwargs): class CallbacksSingleTask(CeleryCommandMixin, Callbacks): """Callbacks commands executed as a single celery task""" - def get_task_kwargs(self, state: State, **kwargs): - task_kwargs = super().get_task_kwargs(state, **kwargs) - task_kwargs['transition'] = self._transition - return task_kwargs - def queue_task(self, task_kwargs): sig = run_callbacks_as_task.signature(kwargs=task_kwargs) transaction.on_commit(sig.delay) diff --git a/makefile b/makefile new file mode 100644 index 0000000..5a09453 --- /dev/null +++ b/makefile @@ -0,0 +1,14 @@ +PROJECT_NAME = django-logic-celery +info: + echo "Usage: make " + echo "Targets:" + echo " build - Build the Docker image" + echo " test - Run the tests" + echo " sh - Run a django shell" + +build: + docker build -t $(PROJECT_NAME) . +test: + docker run -p 8000:8000 -v $(PWD):/app $(PROJECT_NAME) python tests/manage.py test +sh: + docker run -p 8000:8000 -v $(PWD):/app $(PROJECT_NAME) python tests/manage.py shell