diff --git a/coriolis/tests/integration/transfers/test_schedules.py b/coriolis/tests/integration/transfers/test_schedules.py index 6591e88f3..be6f64ec9 100644 --- a/coriolis/tests/integration/transfers/test_schedules.py +++ b/coriolis/tests/integration/transfers/test_schedules.py @@ -34,6 +34,20 @@ def _create_schedule(self, **overrides): return sched + def _wait_for_scheduled_execution(self, timeout=180): + """Poll until the schedule triggers a transfer execution. + + Returns the first execution that appears, or None on timeout. + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + executions = self._client.transfer_executions.list( + self._transfer.id, limit=1) + if executions: + return executions[0] + time.sleep(2) + return None + class TransferScheduleBasicTests(_TransferScheduleTestBase): @@ -84,19 +98,42 @@ def test_scheduled_execution(self): enabled=True, ) - # Poll until an execution appears (up to 180 s). - deadline = time.monotonic() + 180 - execution = None - while time.monotonic() < deadline: - executions = self._client.transfer_executions.list( - self._transfer.id) - if executions: - execution = executions[0] - break - time.sleep(2) + execution = self._wait_for_scheduled_execution(timeout=180) self.assertIsNotNone( execution, "No transfer execution was triggered within 180s by the schedule") self.assertExecutionCompleted(execution.id) + + def test_updated_schedule_triggers_execution(self): + """Editing a schedule must re-register it with the new timing. + + Covers the case where a schedule is created with one timing and then + updated: the updated schedule must fire. If register/update were + handled out-of-band from the running cron loop, the new timing would + never take effect. + """ + now = timeutils.utcnow() + # Create with a timing far enough away that it will not fire during + # the test (top of an hour several hours from now). + sched = self._create_schedule( + schedule={"hour": (now.hour + 5) % 24, "minute": 0}, + enabled=True, + ) + + # Update it to the next wall-clock minute. + target = timeutils.utcnow() + datetime.timedelta(seconds=65) + self._client.transfer_schedules.update( + self._transfer.id, + sched.id, + {"schedule": {"minute": target.minute, "hour": target.hour}}, + ) + + execution = self._wait_for_scheduled_execution(timeout=180) + self.assertIsNotNone( + execution, + "Updated schedule did not trigger a transfer execution within " + "180s") + + self.assertExecutionCompleted(execution.id) diff --git a/coriolis/tests/transfer_cron/rpc/test_server.py b/coriolis/tests/transfer_cron/rpc/test_server.py index 0c00050ad..b136c68c5 100644 --- a/coriolis/tests/transfer_cron/rpc/test_server.py +++ b/coriolis/tests/transfer_cron/rpc/test_server.py @@ -56,9 +56,31 @@ class TransferCronServerEndpointTestCase(test_base.CoriolisBaseTestCase): """Test suite for the Coriolis TransferCronServerEndpoint class.""" @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron') - def setUp(self, _): + @mock.patch.object(server.os, 'register_at_fork') + def setUp(self, mock_register_at_fork, _): super(TransferCronServerEndpointTestCase, self).setUp() self.server = server.TransferCronServerEndpoint() + # Pretend cron is already running so register/unregister don't + # trigger a lazy _init_cron() during these isolated tests. + self.server._cron_started = True + + @mock.patch.object(server.os, 'register_at_fork') + @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron') + def test_init_defers_cron_to_after_fork( + self, mock_init_cron, mock_register_at_fork): + srv = server.TransferCronServerEndpoint() + + mock_init_cron.assert_not_called() + mock_register_at_fork.assert_called_once_with( + after_in_child=srv._ensure_cron_started) + + @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron') + def test_ensure_cron_started_is_idempotent(self, mock_init_cron): + self.server._cron_started = False + self.server._ensure_cron_started() + self.server._ensure_cron_started() + + mock_init_cron.assert_called_once() @ddt.data( { diff --git a/coriolis/transfer_cron/rpc/server.py b/coriolis/transfer_cron/rpc/server.py index 1b0b2cb2a..901a8a887 100644 --- a/coriolis/transfer_cron/rpc/server.py +++ b/coriolis/transfer_cron/rpc/server.py @@ -2,6 +2,8 @@ # All Rights Reserved. import json +import os +import threading from oslo_log import log as logging from oslo_utils import timeutils @@ -38,7 +40,21 @@ def __init__(self): # Setup cron loop self._cron = cron.Cron() self._admin_ctx = context.get_admin_context() - self._init_cron() + self._cron_lock = threading.Lock() + self._cron_started = False + # NOTE (fabi200123): oslo_service forks worker processes even when + # workers=1. The cron loop must run in the same process as the RPC + # handlers that register/unregister jobs, otherwise the loop checks + # a job registry in the parent process while registrations land in + # the forked child. Defer cron startup until after the fork. + os.register_at_fork(after_in_child=self._ensure_cron_started) + + def _ensure_cron_started(self): + with self._cron_lock: + if self._cron_started: + return + self._init_cron() + self._cron_started = True def _deserialize_schedule(self, sched): expires = sched.get("expiration_date") @@ -88,12 +104,14 @@ def _get_all_schedules(self): return schedules def register(self, ctxt, schedule): + self._ensure_cron_started() now = timeutils.utcnow() LOG.debug("Registering new schedule %s: %r" % ( schedule["id"], schedule["schedule"])) self._register_schedule(schedule, date=now) def unregister(self, ctxt, schedule): + self._ensure_cron_started() schedule_id = schedule["id"] LOG.debug("removing schedule %s" % schedule_id) self._cron.unregister(schedule_id)