Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM python:3.12-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the project files
COPY . .

# Install the package in development mode
RUN pip install -e .

# Disable bytecode compilation
ENV PYTHONDONTWRITEBYTECODE=1
# Unbuffer stdout and stderr
ENV PYTHONUNBUFFERED=1
# Set default command
CMD ["python", "tests/manage.py", "runserver", "0.0.0.0:8000"]

12 changes: 9 additions & 3 deletions demo/tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ def test_invoice_callbacks(self, debug_method):
'model_name': 'invoice',
'instance_id': invoice.id,
'process_name': 'invoice_process',
'action_name': 'demo',
'field_name': 'status',
'transition': InvoiceProcess.transitions[3]
'foo': 'bar'
}
expected_callbacks_kwargs = {
'app_label': 'demo',
'model_name': 'invoice',
'instance_id': invoice.id,
'process_name': 'invoice_process',
'action_name': 'demo',
'field_name': 'status',
'foo': 'bar'
}
self.assertEqual(list(debug_method.call_args_list[0]), [('demo_task_1',), expected_side_effects_kwargs])
self.assertEqual(list(debug_method.call_args_list[1]), [('demo_task_2', None), expected_side_effects_kwargs])
Expand Down Expand Up @@ -83,16 +86,19 @@ def test_invoice_failure_callbacks(self, debug_method):
'model_name': 'invoice',
'instance_id': invoice.id,
'process_name': 'invoice_process',
'action_name': 'failing_transition',
'field_name': 'status',
'transition': InvoiceProcess.transitions[5]
'foo': 'bar'
}
expected_callbacks_kwargs = {
'app_label': 'demo',
'model_name': 'invoice',
'instance_id': invoice.id,
'process_name': 'invoice_process',
'action_name': 'failing_transition',
'field_name': 'status',
'exception': ctx.exception,
'foo': 'bar',
'exception': ctx.exception
}
self.assertEqual(list(debug_method.call_args_list[0]), [('demo_task_1',), expected_side_effects_kwargs])
self.assertEqual(list(debug_method.call_args_list[1]), [('demo_task_3',), expected_callbacks_kwargs])
Expand Down
94 changes: 73 additions & 21 deletions django_logic_celery/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,46 @@
from django_logic.state import State


def _find_transition_in_process(process, action_name):
"""Recursively search for transition by action_name in process and nested processes"""
# Search in current process transitions
for transition in process.transitions:
if transition.action_name == action_name:
return transition

# Search in nested processes recursively
for sub_process_class in process.nested_processes:
sub_process = sub_process_class(state=process.state)
result = _find_transition_in_process(sub_process, action_name)
if result is not None:
return result

return None


def get_transition_from_process(instance, process_name, action_name):
"""Helper function to retrieve transition from process by action_name"""
process = getattr(instance, process_name)
transition = _find_transition_in_process(process, action_name)
if transition is None:
raise ValueError(f"Transition with action_name '{action_name}' not found in process '{process_name}'")
return transition


@shared_task(acks_late=True)
def complete_transition(*args, **kwargs):
"""Completes transition """
app_label = kwargs['app_label']
model_name = kwargs['model_name']
instance_id = kwargs['instance_id']
transition = kwargs['transition']
action_name = kwargs['action_name']
process_name = kwargs['process_name']

app = apps.get_app_config(app_label)
model = app.get_model(model_name)
instance = model.objects.get(id=instance_id)
state = getattr(instance, process_name).state
transition = get_transition_from_process(instance, process_name, action_name)

logging.info(f'{state.instance_key} complete transition task started')
transition.complete_transition(state, **kwargs)
Expand All @@ -37,13 +64,15 @@ def fail_transition(task_id, *args, **kwargs):
app_label = kwargs['app_label']
model_name = kwargs['model_name']
instance_id = kwargs['instance_id']
transition = kwargs['transition']
action_name = kwargs['action_name']
process_name = kwargs['process_name']

try:
app = apps.get_app_config(app_label)
model = app.get_model(kwargs['model_name'])
instance = model.objects.get(id=kwargs['instance_id'])
state = getattr(instance, kwargs['process_name']).state
transition = get_transition_from_process(instance, process_name, action_name)
try:
# If exception is raised in success callback, it will be passed through args
error = args[0]
Expand All @@ -54,18 +83,19 @@ def fail_transition(task_id, *args, **kwargs):
logging.exception(error)
transition.fail_transition(state, error, **kwargs)
except Exception as error:
logging.info(f'{app_label}-{model_name}-{transition.action_name}-{instance_id}'
logging.info(f'{app_label}-{model_name}-{action_name}-{instance_id}'
f'failure handler failed with error: {error}')
logging.exception(error)


@shared_task(acks_late=True)
def run_side_effects_as_task(app_label, model_name, transition, instance_id, process_name, **kwargs):
def run_side_effects_as_task(app_label, model_name, action_name, instance_id, process_name, **kwargs):
"""It runs all side-effects of provided transition under a single task"""
app = apps.get_app_config(app_label)
model = app.get_model(model_name)
instance = model.objects.get(id=instance_id)
state = getattr(instance, process_name).state
transition = get_transition_from_process(instance, process_name, action_name)
logging.info(f"{state.instance_key} side effects of '{transition.action_name}' started")

try:
Expand All @@ -81,13 +111,14 @@ def run_side_effects_as_task(app_label, model_name, transition, instance_id, pro


@shared_task(acks_late=True)
def run_callbacks_as_task(app_label, model_name, transition, instance_id, process_name, **kwargs):
def run_callbacks_as_task(app_label, model_name, action_name, instance_id, process_name, **kwargs):
"""It runs all callbacks of provided transition under a single task"""
try:
app = apps.get_app_config(app_label)
model = app.get_model(model_name)
instance = model.objects.get(id=instance_id)
state = getattr(instance, process_name).state
transition = get_transition_from_process(instance, process_name, action_name)
logging.info(f"{state.instance_key} callbacks of '{transition.action_name}' started")

exception = kwargs.get('exception')
Expand All @@ -96,7 +127,7 @@ def run_callbacks_as_task(app_label, model_name, transition, instance_id, proces
for callback in commands:
callback(instance, **callback_kwargs)
except Exception as error:
logging.info(f'{app_label}-{model_name}-{transition.action_name}-{instance_id}'
logging.info(f'{app_label}-{model_name}-{action_name}-{instance_id}'
f'callbacks failed with error: {error}')
logging.exception(error)

Expand All @@ -119,10 +150,28 @@ def get_task_kwargs(self, state: State, **kwargs):
model_name=state.instance._meta.model_name,
instance_id=state.instance.pk,
process_name=state.process_name,
field_name=state.field_name
field_name=state.field_name,
action_name=self._transition.action_name
)
if 'exception' in kwargs:
task_kwargs['exception'] = kwargs['exception']

# Only include serializable kwargs - convert objects to IDs where possible
serializable_kwargs = {}
for key, value in kwargs.items():
if key == 'exception':
serializable_kwargs[key] = value
continue
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Exception objects passed to Celery are not JSON serializable

The code explicitly passes exception objects through to serializable_kwargs without any serialization. When key == 'exception', the value (a Python exception object) is included directly in the kwargs that get passed to Celery task signatures. Since Python exception objects are not JSON serializable, this will cause the same type of kombu.exceptions.EncodeError that this PR aims to fix. The exception needs to be converted to a string representation (e.g., str(value)) or excluded from the Celery task kwargs entirely.

Fix in Cursor Fix in Web

# Skip functions and other callables
if callable(value) and not isinstance(value, type):
continue
# Convert user objects to user_id
if key == 'user' and hasattr(value, 'pk'):
serializable_kwargs['user_id'] = value.pk
continue
# Include only primitive serializable types
if isinstance(value, (str, int, float, bool, type(None))):
serializable_kwargs[key] = value

task_kwargs.update(serializable_kwargs)

return task_kwargs

Expand All @@ -138,9 +187,22 @@ class SideEffectTasks(CeleryCommandMixin, SideEffects):
"""

def queue_task(self, task_kwargs):
header = [signature(task_name, kwargs=task_kwargs) for task_name in self.commands]
# Convert function objects to task names (strings)
task_names = []
for cmd in self.commands:
if isinstance(cmd, str):
task_names.append(cmd)
elif hasattr(cmd, 'name'):
# Celery task object
task_names.append(cmd.name)
elif callable(cmd):
# Regular function - use its name (assumes it's registered as a Celery task)
task_names.append(cmd.__name__)
else:
task_names.append(str(cmd))

header = [signature(task_name, kwargs=task_kwargs) for task_name in task_names]
header = chain(*header)
task_kwargs.update(dict(transition=self._transition))
body = complete_transition.s(**task_kwargs)
tasks = chain(header | body).on_error(fail_transition.s(**task_kwargs))
transaction.on_commit(tasks.delay)
Expand All @@ -157,11 +219,6 @@ def queue_task(self, task_kwargs):
class SideEffectSingleTask(CeleryCommandMixin, SideEffects):
"""Side-effects commands executed as a single celery task"""

def get_task_kwargs(self, state: State, **kwargs):
task_kwargs = super().get_task_kwargs(state, **kwargs)
task_kwargs['transition'] = self._transition
return task_kwargs

def queue_task(self, task_kwargs):
sig = run_side_effects_as_task.signature(kwargs=task_kwargs)
transaction.on_commit(sig.delay)
Expand All @@ -170,11 +227,6 @@ def queue_task(self, task_kwargs):
class CallbacksSingleTask(CeleryCommandMixin, Callbacks):
"""Callbacks commands executed as a single celery task"""

def get_task_kwargs(self, state: State, **kwargs):
task_kwargs = super().get_task_kwargs(state, **kwargs)
task_kwargs['transition'] = self._transition
return task_kwargs

def queue_task(self, task_kwargs):
sig = run_callbacks_as_task.signature(kwargs=task_kwargs)
transaction.on_commit(sig.delay)
14 changes: 14 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
PROJECT_NAME = django-logic-celery
info:
echo "Usage: make <target>"
echo "Targets:"
echo " build - Build the Docker image"
echo " test - Run the tests"
echo " sh - Run a django shell"

build:
docker build -t $(PROJECT_NAME) .
test:
docker run -p 8000:8000 -v $(PWD):/app $(PROJECT_NAME) python tests/manage.py test
sh:
docker run -p 8000:8000 -v $(PWD):/app $(PROJECT_NAME) python tests/manage.py shell