From 3a2051e6a8e83bcbe6f333d2a2dc0bbdc9d1e74d Mon Sep 17 00:00:00 2001 From: Fabian Fulga Date: Fri, 19 Jun 2026 15:07:11 +0300 Subject: [PATCH 1/2] Fix scheduled transfers not triggering on cron schedule The transfer-cron service started its cron loop in the TransferCronServerEndpoint constructor, which runs in the parent process before oslo_service forks its worker. Since oslo_service forks a worker even with workers=1, the cron loop ran in the parent while the RPC register/unregister handlers ran in the forked child. Each process kept its own in-memory job registry, so schedules created or updated at runtime were added to the child's registry while the parent's loop kept checking an empty one, and scheduled transfers never started. This was caused by the migration from eventlet to threading which introduced the parent/child process split for this. --- .../tests/transfer_cron/rpc/test_server.py | 24 ++++++++++++++++++- coriolis/transfer_cron/rpc/server.py | 20 +++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/coriolis/tests/transfer_cron/rpc/test_server.py b/coriolis/tests/transfer_cron/rpc/test_server.py index 0c00050ad..b136c68c5 100644 --- a/coriolis/tests/transfer_cron/rpc/test_server.py +++ b/coriolis/tests/transfer_cron/rpc/test_server.py @@ -56,9 +56,31 @@ class TransferCronServerEndpointTestCase(test_base.CoriolisBaseTestCase): """Test suite for the Coriolis TransferCronServerEndpoint class.""" @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron') - def setUp(self, _): + @mock.patch.object(server.os, 'register_at_fork') + def setUp(self, mock_register_at_fork, _): super(TransferCronServerEndpointTestCase, self).setUp() self.server = server.TransferCronServerEndpoint() + # Pretend cron is already running so register/unregister don't + # trigger a lazy _init_cron() during these isolated tests. + self.server._cron_started = True + + @mock.patch.object(server.os, 'register_at_fork') + @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron') + def test_init_defers_cron_to_after_fork( + self, mock_init_cron, mock_register_at_fork): + srv = server.TransferCronServerEndpoint() + + mock_init_cron.assert_not_called() + mock_register_at_fork.assert_called_once_with( + after_in_child=srv._ensure_cron_started) + + @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron') + def test_ensure_cron_started_is_idempotent(self, mock_init_cron): + self.server._cron_started = False + self.server._ensure_cron_started() + self.server._ensure_cron_started() + + mock_init_cron.assert_called_once() @ddt.data( { diff --git a/coriolis/transfer_cron/rpc/server.py b/coriolis/transfer_cron/rpc/server.py index 1b0b2cb2a..901a8a887 100644 --- a/coriolis/transfer_cron/rpc/server.py +++ b/coriolis/transfer_cron/rpc/server.py @@ -2,6 +2,8 @@ # All Rights Reserved. import json +import os +import threading from oslo_log import log as logging from oslo_utils import timeutils @@ -38,7 +40,21 @@ def __init__(self): # Setup cron loop self._cron = cron.Cron() self._admin_ctx = context.get_admin_context() - self._init_cron() + self._cron_lock = threading.Lock() + self._cron_started = False + # NOTE (fabi200123): oslo_service forks worker processes even when + # workers=1. The cron loop must run in the same process as the RPC + # handlers that register/unregister jobs, otherwise the loop checks + # a job registry in the parent process while registrations land in + # the forked child. Defer cron startup until after the fork. + os.register_at_fork(after_in_child=self._ensure_cron_started) + + def _ensure_cron_started(self): + with self._cron_lock: + if self._cron_started: + return + self._init_cron() + self._cron_started = True def _deserialize_schedule(self, sched): expires = sched.get("expiration_date") @@ -88,12 +104,14 @@ def _get_all_schedules(self): return schedules def register(self, ctxt, schedule): + self._ensure_cron_started() now = timeutils.utcnow() LOG.debug("Registering new schedule %s: %r" % ( schedule["id"], schedule["schedule"])) self._register_schedule(schedule, date=now) def unregister(self, ctxt, schedule): + self._ensure_cron_started() schedule_id = schedule["id"] LOG.debug("removing schedule %s" % schedule_id) self._cron.unregister(schedule_id) From 5422df099446d0b35f2fde8c168746cd685e7007 Mon Sep 17 00:00:00 2001 From: Fabian Fulga Date: Fri, 19 Jun 2026 16:01:44 +0300 Subject: [PATCH 2/2] Add integration tests for scheduling to wait for the transfer to start --- .../integration/transfers/test_schedules.py | 57 +++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/coriolis/tests/integration/transfers/test_schedules.py b/coriolis/tests/integration/transfers/test_schedules.py index 6591e88f3..be6f64ec9 100644 --- a/coriolis/tests/integration/transfers/test_schedules.py +++ b/coriolis/tests/integration/transfers/test_schedules.py @@ -34,6 +34,20 @@ def _create_schedule(self, **overrides): return sched + def _wait_for_scheduled_execution(self, timeout=180): + """Poll until the schedule triggers a transfer execution. + + Returns the first execution that appears, or None on timeout. + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + executions = self._client.transfer_executions.list( + self._transfer.id, limit=1) + if executions: + return executions[0] + time.sleep(2) + return None + class TransferScheduleBasicTests(_TransferScheduleTestBase): @@ -84,19 +98,42 @@ def test_scheduled_execution(self): enabled=True, ) - # Poll until an execution appears (up to 180 s). - deadline = time.monotonic() + 180 - execution = None - while time.monotonic() < deadline: - executions = self._client.transfer_executions.list( - self._transfer.id) - if executions: - execution = executions[0] - break - time.sleep(2) + execution = self._wait_for_scheduled_execution(timeout=180) self.assertIsNotNone( execution, "No transfer execution was triggered within 180s by the schedule") self.assertExecutionCompleted(execution.id) + + def test_updated_schedule_triggers_execution(self): + """Editing a schedule must re-register it with the new timing. + + Covers the case where a schedule is created with one timing and then + updated: the updated schedule must fire. If register/update were + handled out-of-band from the running cron loop, the new timing would + never take effect. + """ + now = timeutils.utcnow() + # Create with a timing far enough away that it will not fire during + # the test (top of an hour several hours from now). + sched = self._create_schedule( + schedule={"hour": (now.hour + 5) % 24, "minute": 0}, + enabled=True, + ) + + # Update it to the next wall-clock minute. + target = timeutils.utcnow() + datetime.timedelta(seconds=65) + self._client.transfer_schedules.update( + self._transfer.id, + sched.id, + {"schedule": {"minute": target.minute, "hour": target.hour}}, + ) + + execution = self._wait_for_scheduled_execution(timeout=180) + self.assertIsNotNone( + execution, + "Updated schedule did not trigger a transfer execution within " + "180s") + + self.assertExecutionCompleted(execution.id)