Skip to content

Commit b0ff7c7

Browse files
committed
Implement non-blocking disk-based state queue to fix race conditions
- Replace the blocking `while` loop in `state.apply(queue=True)` with a non-blocking queue-and-return model to prevent connection timeouts.
- Implement `process_state_queue` as a Minion periodic callback (registered in `periodic_callbacks`) that asynchronously executes queued jobs.
- Use `filelock` to synchronize access to the queue directory (`cachedir/state_queue`) and prevent ordering inversions between racing jobs.
- Serialize job payloads to disk (`queued_<timestamp>_<jid>.p`) to ensure strict FIFO ordering and persistence.
- Fix the "Invisible Gap" race condition by reserving a process slot (a placeholder proc file) before execution.
- Update `_prior_running_states` to respect both currently running jobs and queued jobs.
- Handle local execution (`salt-call`) scenarios where `__pub_jid` is missing.
- Add `filelock` to `salt.utils.thin` to support the queue lock in thin/remote environments.
- Add the `test_state_queue.py` integration test to verify queuing behavior and execution order.
1 parent c871fac commit b0ff7c7

6 files changed

Lines changed: 464 additions & 122 deletions

File tree

salt/minion.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import salt.utils.process
5858
import salt.utils.schedule
5959
import salt.utils.ssdp
60+
import salt.utils.state
6061
import salt.utils.user
6162
import salt.utils.zeromq
6263
from salt._compat import ipaddress
@@ -3224,6 +3225,111 @@ def handle_schedule():
32243225

32253226
self.add_periodic_callback("schedule", handle_schedule)
32263227

3228+
def setup_state_queue_processing(self, interval=1):
    """
    Register the periodic callback that drains the on-disk state queue.

    The callback calls ``self.process_state_queue()`` and logs any exception
    it raises instead of letting it propagate to the caller of the callback.

    This is safe to call multiple times: when ``self.periodic_callbacks``
    already has a ``state_queue`` entry, nothing is registered.

    :param interval: Passed through unchanged as the ``interval`` argument of
        ``self.add_periodic_callback``. Defaults to ``1``, the value that was
        previously hard-coded (presumably seconds -- confirm against
        ``add_periodic_callback``).
    """
    if "state_queue" in self.periodic_callbacks:
        return

    def handle_state_queue():
        # A failure while processing one tick must not break later ticks,
        # so everything is caught and logged here.
        try:
            self.process_state_queue()
        except Exception:  # pylint: disable=broad-exception-caught
            log.critical("The state queue processing errored: ", exc_info=True)

    self.add_periodic_callback("state_queue", handle_state_queue, interval=interval)
3242+
3243+
def process_state_queue(self):
    """
    Dispatch the oldest queued state job if no state job is currently active.

    Queue entries are files named ``queued_<timestamp>_<jid>.p`` inside
    ``<cachedir>/state_queue``. While holding the state queue lock this
    method:

    1. returns if ``saltutil.is_running`` reports any ``state.*`` job;
    2. selects the queue file that sorts first by name (the oldest entry, as
       long as the timestamps in the names have equal width);
    3. loads its payload, discarding the file when it cannot be loaded or
       does not contain a dict;
    4. writes a placeholder file named after the jid into the proc dir so
       the job stays visible between leaving the queue and starting;
    5. removes the queue file and hands the payload to
       ``self._handle_decoded_payload``.

    At most one job is dispatched per call. Always returns ``None``.
    """
    queue_dir = os.path.join(self.opts["cachedir"], "state_queue")
    if not os.path.exists(queue_dir):
        return

    with salt.utils.state.acquire_queue_lock(self.opts):
        # Never start a queued job while another state job is running.
        active = self.functions["saltutil.is_running"]("state.*")
        if active:
            # This runs on every tick of the periodic callback, so it is
            # logged at debug level to avoid flooding the log.
            log.debug("State queue processing: active jobs found: %s", active)
            return

        try:
            files = sorted(
                name
                for name in os.listdir(queue_dir)
                if name.startswith("queued_") and name.endswith(".p")
            )
        except OSError as exc:
            log.debug("State queue processing: unable to list %s: %s", queue_dir, exc)
            files = []

        if not files:
            # The idle case also runs on every tick: debug, not info.
            log.debug(
                "State queue processing: no queued files found in %s", queue_dir
            )
            return

        log.info("State queue processing: found queued files: %s", files)

        # queued_<timestamp>_<jid>.p -- the first name after sorting is the oldest
        fn = files[0]
        path = os.path.join(queue_dir, fn)

        # Any entry that cannot be turned into a dict payload is dropped.
        # Leaving it in place would make it the head of the queue again on
        # every later pass and block all jobs queued behind it.
        try:
            with salt.utils.files.fopen(path, "rb") as fp_:
                data = salt.payload.load(fp_)
        except Exception:  # pylint: disable=broad-exception-caught
            # NOTE(review): deserialization failures may surface as exception
            # types other than OSError/ValueError -- confirm against
            # salt.payload.load.
            log.error("Failed to load queued job %s, removing.", fn, exc_info=True)
            data = None
        else:
            if not isinstance(data, dict):
                log.error("Queued job %s has an invalid payload, removing.", fn)
                data = None

        if data is None:
            try:
                os.remove(path)
            except OSError as exc:
                log.error("Failed to remove bad queued job %s: %s", fn, exc)
            return

        log.info("Executing queued job %s", data.get("jid"))

        # Create a placeholder proc file to ensure visibility during the
        # transition from queue to execution. This prevents a race condition
        # where a new job checks the queue after we remove the file but
        # before the job actually starts.
        placeholder = None
        if "jid" in data:
            proc_dir = getattr(
                self, "proc_dir", os.path.join(self.opts["cachedir"], "proc")
            )
            if os.path.isdir(proc_dir):
                proc_path = os.path.join(proc_dir, str(data["jid"]))
                try:
                    with salt.utils.files.fopen(proc_path, "w+b") as fp_:
                        data["pid"] = os.getpid()
                        salt.payload.dump(data, fp_)
                    placeholder = proc_path
                except OSError as exc:
                    # Best effort: the race is rare, so proceed without the
                    # placeholder rather than refusing to run the job.
                    log.debug(
                        "Unable to write placeholder proc file %s: %s",
                        proc_path,
                        exc,
                    )

        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone; nothing can dispatch it a second time.
            pass
        except OSError as exc:
            # If the entry stays on disk, dispatching it now would make a
            # later pass dispatch it again. Leave it queued so that a later
            # pass retries, and withdraw the placeholder so it does not sit
            # in the proc dir for a job that never started.
            log.error(
                "Failed to remove queued job %s (%s); leaving it queued.", fn, exc
            )
            if placeholder is not None:
                try:
                    os.remove(placeholder)
                except OSError as rm_exc:
                    log.error(
                        "Failed to remove placeholder proc file %s: %s",
                        placeholder,
                        rm_exc,
                    )
            return

        if hasattr(self, "io_loop"):
            self.io_loop.add_callback(self._handle_decoded_payload, data)
        else:
            # Fallback if io_loop is not explicit (should not happen in Minion)
            self._handle_decoded_payload(data)
3332+
32273333
def add_periodic_callback(self, name, method, interval=1):
32283334
"""
32293335
Add a periodic callback to the event loop and call its start method.
@@ -3281,6 +3387,7 @@ def tune_in(self, start=True):
32813387

32823388
self.setup_beacons()
32833389
self.setup_scheduler()
3390+
self.setup_state_queue_processing()
32843391
self.add_periodic_callback("cleanup", self.cleanup_subprocesses)
32853392

32863393
# schedule the stuff that runs every interval

salt/modules/state.py

Lines changed: 85 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import time
2020
from collections import OrderedDict
2121

22-
import filelock
23-
2422
import salt.config
2523
import salt.defaults.exitcodes
2624
import salt.payload
@@ -129,13 +127,13 @@ def _wait(jid, max_queue=0):
129127
if jid is None:
130128
jid = salt.utils.jid.gen_jid(__opts__)
131129

132-
with _acquire_queue_lock():
130+
with salt.utils.state.acquire_queue_lock(__opts__):
133131
states = _prior_running_states(jid)
134132

135133
if not max_queue or len(states) < max_queue:
136134
while states:
137135
time.sleep(1)
138-
with _acquire_queue_lock():
136+
with salt.utils.state.acquire_queue_lock(__opts__):
139137
states = _prior_running_states(jid)
140138
return True
141139
return False
@@ -404,8 +402,7 @@ def _acquire_queue_lock():
404402
"""
405403
Acquire the state queue lock
406404
"""
407-
lock_path = os.path.join(__opts__["cachedir"], "state_queue.lock")
408-
return filelock.FileLock(lock_path)
405+
return salt.utils.state.acquire_queue_lock(__opts__)
409406

410407

411408
def _set_queue_flag(jid):
@@ -449,57 +446,39 @@ def _prior_running_states(jid):
449446
Return a list of dicts of prior calls to state functions. This function is
450447
used to queue state calls so only one is run at a time.
451448
"""
452-
453449
ret = []
454450
active = __salt__["saltutil.is_running"]("state.*")
455451

456-
# Also check the state_queue directory for jobs that are starting up
452+
# Check for queued jobs
457453
queue_dir = os.path.join(__opts__["cachedir"], "state_queue")
458454
if os.path.exists(queue_dir):
459-
# We assume the lock is held by the caller (_wait)
460455
for fn in os.listdir(queue_dir):
461-
# Check if this jid is already in active list to avoid duplicates
462-
if any(str(item["jid"]) == fn for item in active):
463-
continue
464-
465-
# Construct a dummy data object for the queued job
466-
try:
467-
# verify it is a valid integer JID
468-
int(fn)
469-
470-
# Check if the process is still running
471-
queue_path = os.path.join(queue_dir, fn)
472-
try:
473-
with salt.utils.files.fopen(queue_path, "r") as fp_:
474-
pid = int(fp_.read().strip())
475-
if not salt.utils.process.os_is_running(pid):
476-
# The process is dead, clean up the flag
477-
os.remove(queue_path)
478-
continue
479-
except (ValueError, OSError):
480-
# If we can't read the PID or file, assume it's stale or bad
481-
continue
482-
483-
active.append({"jid": fn, "fun": "unknown", "pid": pid})
484-
except ValueError:
485-
continue
456+
if fn.startswith("queued_") and fn.endswith(".p"):
457+
# fn is queued_<timestamp>_<jid>.p
458+
parts = fn[:-2].split("_")
459+
if len(parts) >= 3:
460+
job_jid = parts[2]
461+
# We use PID 0 or similar to indicate it's not a real process yet,
462+
# but saltutil.is_running structure usually expects a pid.
463+
active.append({"jid": job_jid, "fun": "state.apply", "pid": 0})
486464

487465
for data in active:
488466
try:
489467
data_jid = int(data["jid"])
490468
except ValueError:
491469
continue
492-
if data_jid < int(jid):
470+
471+
if jid is None:
472+
# If no JID is provided (e.g. local call without JID), assume current job is newer
473+
# than any running job, so any running job is a "prior" job.
493474
ret.append(data)
494-
elif data_jid > int(jid):
495-
# If the jid is newer than the current jid, check if the newer jid
496-
# has set a queue flag. If the flag is set, it means that the newer
497-
# jid is waiting for the older jid to finish, so we can ignore it.
498-
# If the flag is not set, it means that the newer jid has already
499-
# started running, so we must wait for it to finish.
500-
queue_path = os.path.join(queue_dir, str(data_jid))
501-
if not os.path.exists(queue_path):
475+
continue
476+
477+
try:
478+
if data_jid < int(jid):
502479
ret.append(data)
480+
except (ValueError, TypeError):
481+
continue
503482
return ret
504483

505484

@@ -513,15 +492,73 @@ def _check_queue(queue, kwargs):
513492

514493
if queue is True:
515494
jid = kwargs.get("__pub_jid")
516-
_set_queue_flag(jid)
517-
try:
518-
_wait(jid)
519-
finally:
520-
_clear_queue_flag(jid)
495+
if jid is None:
496+
# If running locally (salt-call), JID might be in opts or not present.
497+
# Fallback to __opts__['jid'] to ensure we have a JID for comparison.
498+
jid = __opts__.get("jid")
499+
500+
with salt.utils.state.acquire_queue_lock(__opts__):
501+
states = _prior_running_states(jid)
502+
if states:
503+
# Conflict found, queue the job
504+
queue_dir = os.path.join(__opts__["cachedir"], "state_queue")
505+
if not os.path.exists(queue_dir):
506+
try:
507+
os.makedirs(queue_dir)
508+
except OSError:
509+
pass
510+
511+
# Construct payload to persist
512+
# We need to save enough info to re-execute the job
513+
# Generate a new JID for the queued execution to ensure it is unique
514+
new_jid = salt.utils.jid.gen_jid(__opts__)
515+
516+
# Remove 'queue' from kwargs to prevent re-queuing logic when executed
517+
kwarg = {k: v for k, v in kwargs.items() if not k.startswith("__pub_")}
518+
if "queue" in kwarg:
519+
del kwarg["queue"]
520+
521+
payload = {
522+
"fun": kwargs.get("__pub_fun"),
523+
"arg": kwargs.get("__pub_arg", []),
524+
"tgt": kwargs.get("__pub_tgt"),
525+
"jid": new_jid,
526+
"ret": kwargs.get("__pub_ret", ""),
527+
"user": kwargs.get("__pub_user", "root"),
528+
"kwarg": kwarg,
529+
}
530+
531+
# Use timestamp to ensure FIFO ordering
532+
# We use microseconds to avoid collisions
533+
fn = f"queued_{int(time.time() * 1000000)}_{jid}.p"
534+
path = os.path.join(queue_dir, fn)
535+
536+
try:
537+
with salt.utils.files.fopen(path, "w+b") as fp_:
538+
salt.payload.dump(payload, fp_)
539+
540+
return {
541+
"result": True,
542+
"comment": "Job queued for execution",
543+
"queued": True,
544+
"changes": {},
545+
}
546+
except OSError:
547+
log.error("Failed to write queue file %s", path)
548+
return {
549+
"result": False,
550+
"comment": "Failed to queue job: unable to write queue file",
551+
"changes": {},
552+
}
553+
else:
554+
# No conflict, we can run.
555+
pass
556+
521557
else:
522558
queue_ret = False
523559
if not isinstance(queue, bool) and isinstance(queue, int):
524560
jid = kwargs.get("__pub_jid")
561+
# For max_queue (int), we retain blocking behavior but use lock
525562
_set_queue_flag(jid)
526563
try:
527564
queue_ret = _wait(jid, max_queue=queue)

salt/utils/state.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,24 @@
55
"""
66

77
import copy
8+
import os
9+
10+
import filelock
811

912
import salt.state
1013
from salt.exceptions import CommandExecutionError
1114

1215
_empty = object()
1316

1417

18+
def acquire_queue_lock(opts):
    """
    Build the file lock that guards the state queue.

    The lock file is ``state_queue.lock`` inside ``opts["cachedir"]``. The
    ``filelock.FileLock`` object is only constructed here, not acquired;
    callers acquire it by using the return value as a context manager
    (``with acquire_queue_lock(opts):``).
    """
    return filelock.FileLock(os.path.join(opts["cachedir"], "state_queue.lock"))
24+
25+
1526
def gen_tag(low):
1627
"""
1728
Generate the running dict tag string from the low data structure

salt/utils/thin.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import zipfile
1717

1818
import distro
19+
import filelock
1920
import jinja2
2021
import looseversion
2122
import msgpack
@@ -289,6 +290,7 @@ def get_tops_python(py_ver, exclude=None, ext_py_ver=None):
289290
"backports_abc",
290291
"looseversion",
291292
"packaging",
293+
"filelock",
292294
]
293295
# backports package doesn't exist in Python 3.13+
294296
if sys.version_info < (3, 13):
@@ -442,6 +444,7 @@ def get_tops(extra_mods="", so_mods=""):
442444
looseversion,
443445
packaging,
444446
backports,
447+
filelock,
445448
]
446449
modules = find_site_modules("contextvars")
447450
if modules:

0 commit comments

Comments
 (0)