Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions coriolis/tests/integration/transfers/test_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ def _create_schedule(self, **overrides):

return sched

def _wait_for_scheduled_execution(self, timeout=180):
"""Poll until the schedule triggers a transfer execution.

Returns the first execution that appears, or None on timeout.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
executions = self._client.transfer_executions.list(
self._transfer.id, limit=1)
if executions:
return executions[0]
time.sleep(2)
return None


class TransferScheduleBasicTests(_TransferScheduleTestBase):

Expand Down Expand Up @@ -84,19 +98,42 @@ def test_scheduled_execution(self):
enabled=True,
)

# Poll until an execution appears (up to 180 s).
deadline = time.monotonic() + 180
execution = None
while time.monotonic() < deadline:
executions = self._client.transfer_executions.list(
self._transfer.id)
if executions:
execution = executions[0]
break
time.sleep(2)
execution = self._wait_for_scheduled_execution(timeout=180)

self.assertIsNotNone(
execution,
"No transfer execution was triggered within 180s by the schedule")

self.assertExecutionCompleted(execution.id)

def test_updated_schedule_triggers_execution(self):
"""Editing a schedule must re-register it with the new timing.

Covers the case where a schedule is created with one timing and then
updated: the updated schedule must fire. If register/update were
handled out-of-band from the running cron loop, the new timing would
never take effect.
"""
now = timeutils.utcnow()
# Create with a timing far enough away that it will not fire during
# the test (top of an hour several hours from now).
sched = self._create_schedule(
schedule={"hour": (now.hour + 5) % 24, "minute": 0},
enabled=True,
)

# Update it to the next wall-clock minute.
target = timeutils.utcnow() + datetime.timedelta(seconds=65)
self._client.transfer_schedules.update(
self._transfer.id,
sched.id,
{"schedule": {"minute": target.minute, "hour": target.hour}},
)

execution = self._wait_for_scheduled_execution(timeout=180)
self.assertIsNotNone(
execution,
"Updated schedule did not trigger a transfer execution within "
"180s")

self.assertExecutionCompleted(execution.id)
24 changes: 23 additions & 1 deletion coriolis/tests/transfer_cron/rpc/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,31 @@ class TransferCronServerEndpointTestCase(test_base.CoriolisBaseTestCase):
"""Test suite for the Coriolis TransferCronServerEndpoint class."""

@mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
def setUp(self, _):
@mock.patch.object(server.os, 'register_at_fork')
def setUp(self, mock_register_at_fork, _):
super(TransferCronServerEndpointTestCase, self).setUp()
self.server = server.TransferCronServerEndpoint()
# Pretend cron is already running so register/unregister don't
# trigger a lazy _init_cron() during these isolated tests.
self.server._cron_started = True

@mock.patch.object(server.os, 'register_at_fork')
@mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
def test_init_defers_cron_to_after_fork(
self, mock_init_cron, mock_register_at_fork):
srv = server.TransferCronServerEndpoint()

mock_init_cron.assert_not_called()
mock_register_at_fork.assert_called_once_with(
after_in_child=srv._ensure_cron_started)

@mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
def test_ensure_cron_started_is_idempotent(self, mock_init_cron):
self.server._cron_started = False
self.server._ensure_cron_started()
self.server._ensure_cron_started()

mock_init_cron.assert_called_once()

@ddt.data(
{
Expand Down
20 changes: 19 additions & 1 deletion coriolis/transfer_cron/rpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# All Rights Reserved.

import json
import os
import threading

from oslo_log import log as logging
from oslo_utils import timeutils
Expand Down Expand Up @@ -38,7 +40,21 @@ def __init__(self):
# Setup cron loop
self._cron = cron.Cron()
self._admin_ctx = context.get_admin_context()
self._init_cron()
self._cron_lock = threading.Lock()
self._cron_started = False
# NOTE (fabi200123): oslo_service forks worker processes even when
# workers=1. The cron loop must run in the same process as the RPC
# handlers that register/unregister jobs, otherwise the loop checks
# a job registry in the parent process while registrations land in
# the forked child. Defer cron startup until after the fork.
os.register_at_fork(after_in_child=self._ensure_cron_started)

def _ensure_cron_started(self):
with self._cron_lock:
if self._cron_started:
return
self._init_cron()
self._cron_started = True

def _deserialize_schedule(self, sched):
expires = sched.get("expiration_date")
Expand Down Expand Up @@ -88,12 +104,14 @@ def _get_all_schedules(self):
return schedules

def register(self, ctxt, schedule):
self._ensure_cron_started()
now = timeutils.utcnow()
LOG.debug("Registering new schedule %s: %r" % (
schedule["id"], schedule["schedule"]))
self._register_schedule(schedule, date=now)

def unregister(self, ctxt, schedule):
self._ensure_cron_started()
schedule_id = schedule["id"]
LOG.debug("removing schedule %s" % schedule_id)
self._cron.unregister(schedule_id)
Expand Down
Loading