From e0dfc14914df9fb63dad31a43ae55e1f8054fa11 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 12:46:18 -0500 Subject: [PATCH 1/7] Add libE service mode --- libensemble/gen_classes/queue_gen.py | 97 +++++++++++++++++ libensemble/manager.py | 25 ++++- libensemble/specs.py | 18 ++++ .../tests/regression_tests/test_queue_gen.py | 101 ++++++++++++++++++ 4 files changed, 238 insertions(+), 3 deletions(-) create mode 100644 libensemble/gen_classes/queue_gen.py create mode 100644 libensemble/tests/regression_tests/test_queue_gen.py diff --git a/libensemble/gen_classes/queue_gen.py b/libensemble/gen_classes/queue_gen.py new file mode 100644 index 000000000..27ef1b2bf --- /dev/null +++ b/libensemble/gen_classes/queue_gen.py @@ -0,0 +1,97 @@ +"""Queue-backed generator: work comes from an external producer (e.g. an MCP +tool, an LLM agent, a REST API) via an input queue. Completed results go back +out via an output queue. + +This lets libE run in 'service mode' — driven by an external loop instead of +its own gen function. + +Usage: + from queue import Queue + from gest_api.vocs import VOCS + from libensemble.gen_classes.queue_gen import QueueGenerator + + in_q = Queue() + out_q = Queue() + gen = QueueGenerator(VOCS(variables={"x": [-1, 1]}), + input_queue=in_q, output_queue=out_q) + in_q.put({"x": 0.5}) # external producer feeds work +""" +from queue import Empty, Queue +from typing import Any, List, Optional + +from gest_api import Generator +from gest_api.vocs import VOCS + + +_SHUTDOWN = object() # sentinel: external producer signals "no more work" + + +class QueueGenerator(Generator): + """Generator that pulls work from an external Queue and pushes results back. + + suggest(n): + - Blocks up to ``poll_timeout`` waiting for the FIRST item (so the libE + manager doesn't spin hot when nothing is queued). + - Then drains up to ``n - 1`` more items non-blockingly. + - Returns [] on timeout (libE will call again). + - Returns [] if a shutdown sentinel is seen. + + ingest(results): + - Forwards each result dict to the output queue verbatim. + """ + + def __init__( + self, + vocs: VOCS, + *, + input_queue: Queue, + output_queue: Queue, + poll_timeout: float = 1.0, + ): + self.vocs = vocs + self.input_queue = input_queue + self.output_queue = output_queue + self.poll_timeout = poll_timeout + self._shutdown_seen = False + super().__init__(vocs) + + def _validate_vocs(self, vocs: VOCS) -> None: + assert len(self.vocs.variable_names), "VOCS must contain variables." + + def suggest(self, num_points: Optional[int]) -> List[dict]: + if self._shutdown_seen: + return [] + n = num_points or 1 + items: List[dict] = [] + try: + first = self.input_queue.get(timeout=self.poll_timeout) + except Empty: + return [] + if first is _SHUTDOWN: + self._shutdown_seen = True + return [] + items.append(first) + for _ in range(n - 1): + try: + nxt = self.input_queue.get_nowait() + except Empty: + break + if nxt is _SHUTDOWN: + self._shutdown_seen = True + break + items.append(nxt) + return items + + def ingest(self, results: List[dict]) -> None: + for r in results: + self.output_queue.put(r) + + def finalize(self, results: List[dict] = None, *args: Any, **kwargs: Any): + if results: + self.ingest(results) + return None + + @staticmethod + def shutdown_sentinel(): + """External producer puts this on the input queue to signal end.""" + return _SHUTDOWN diff --git a/libensemble/manager.py b/libensemble/manager.py index 7995d2da9..1e706e5a5 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -213,6 +213,7 @@ def __init__( self.gen_specs = gen_specs self.exit_criteria = exit_criteria self.elapsed = lambda: timer.elapsed + self._service_idle_start: float | None = None # service_mode idle tracker self.wcomms = wcomms self.WorkerExc = False self.persis_pending: list[int] = [] @@ -714,9 +715,27 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]: self._check_work_order(Work[w], w) self._send_work_order(Work[w], w) self._update_state_on_alloc(Work[w], w) - assert self.term_test() or any( - self.W["active"] != 0 - ), "alloc_f did not return any work, although all workers are idle." + if not (self.term_test() or any(self.W["active"] != 0)): + if self.libE_specs.get("service_mode"): + # Service mode: external producer may have nothing + # to dispatch right now. Throttle and re-poll instead + # of asserting (which would treat this as deadlock). + idle_timeout = self.libE_specs.get("service_mode_idle_timeout") + if idle_timeout is not None: + if self._service_idle_start is None: + self._service_idle_start = time.time() + elif time.time() - self._service_idle_start > idle_timeout: + logger.info( + f"service_mode idle for {idle_timeout}s with no work, exiting" + ) + break + time.sleep(0.1) + continue + raise AssertionError( + "alloc_f did not return any work, although all workers are idle." + ) + # work was dispatched OR workers active OR terminating — reset idle clock + self._service_idle_start = None except WorkerException as e: report_worker_exc(e) raise LoggedException(e.args[0], e.args[1]) from None diff --git a/libensemble/specs.py b/libensemble/specs.py index 9ee04baa3..c58709848 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -439,6 +439,24 @@ class LibeSpecs(BaseModel): By default, the generator runs on the manager process as a thread (Worker 0). """ + service_mode: bool = False + """ + If True, the manager tolerates idle workers + an alloc returning no work + (instead of asserting). Used when libEnsemble is driven by an external + producer (e.g. a queue-backed Generator fed by an MCP server) and may + legitimately have nothing to dispatch for periods of time. The manager + sleeps briefly between checks instead of panicking. Termination is the + caller's responsibility (e.g. via ``exit_criteria`` or external stop). + """ + + service_mode_idle_timeout: float | None = None + """ + In ``service_mode``, exit after this many seconds with no active workers + and no work dispatched. ``None`` (default) means run forever waiting for + new submissions. Useful so an MCP server doesn't leave libE running after + the agent goes silent. + """ + mpi_comm: object | None = None """ libEnsemble MPI communicator. Default: ``MPI.COMM_WORLD``""" diff --git a/libensemble/tests/regression_tests/test_queue_gen.py b/libensemble/tests/regression_tests/test_queue_gen.py new file mode 100644 index 000000000..bac58e6ad --- /dev/null +++ b/libensemble/tests/regression_tests/test_queue_gen.py @@ -0,0 +1,101 @@ +"""Test for QueueGenerator + +Spins up libE in a thread with the QueueGenerator + a trivial doubler sim, +submits N work items, drains results, sends shutdown sentinel, joins. + +Run with: + python test_queue_gen.py +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 4 + +import threading +import time +from queue import Empty, Queue + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble import Ensemble +from libensemble.gen_classes.queue_gen import QueueGenerator +from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + +NWORKERS = 4 +N_SUBMITS = 10 + + +def doubler_sim(InputArray, _, sim_specs): + """Trivial sim: returns 2*x. Sleeps a bit to mimic real work.""" + time.sleep(0.5) + out = np.zeros(1, dtype=sim_specs["out"]) + out["y"] = 2.0 * InputArray["x"][0] + return out + + +def run_libe(input_q, output_q, nworkers, sim_max): + vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) + gen = QueueGenerator(vocs, input_queue=input_q, output_queue=output_q) + + # service_mode=True lets the manager tolerate idle workers + empty alloc + # while we wait for new submissions on the queue. + libE_specs = LibeSpecs( + nworkers=nworkers, + service_mode=True, + final_gen_send=True, # ensure last batch of completed sims is ingested before stop + ) + sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) + gen_specs = GenSpecs(generator=gen, vocs=vocs, persis_in=["x", "y"], batch_size=1) + # QueueGenerator.shutdown_sentinel() which makes suggest return [] forever. + exit_criteria = ExitCriteria(sim_max=sim_max) + + workflow = Ensemble(sim_specs, gen_specs, exit_criteria, libE_specs) + workflow.run() + + +def main(): + input_q: Queue = Queue() + output_q: Queue = Queue() + + libe_thread = threading.Thread( + target=run_libe, args=(input_q, output_q, NWORKERS, N_SUBMITS), daemon=True + ) + libe_thread.start() + print(f"libE thread started ({NWORKERS} workers)") + + # Submit work (1, 2, 3....), then signal shutdown. + for i in range(N_SUBMITS): + input_q.put({"x": float(i)}) + input_q.put(QueueGenerator.shutdown_sentinel()) + print(f"submitted {N_SUBMITS} items + shutdown sentinel") + + # Drain until we have all results (or timeout) + results = [] + deadline = time.time() + 60 + while len(results) < N_SUBMITS and time.time() < deadline: + try: + results.append(output_q.get(timeout=1)) + print(f" got: {results[-1]}") + except Empty: + pass + + print(f"\ncollected {len(results)}/{N_SUBMITS}") + + # Verify y == 2*x + ok = True + for r in results: + x = r.get("x") + y = r.get("y") + if x is None or y is None or abs(y - 2 * x) > 1e-9: + print(f" MISMATCH: {r}") + ok = False + print("PASS" if ok else "FAIL") + + # Wait for libE to wind down + libe_thread.join(timeout=10) + print("done") + + +if __name__ == "__main__": + main() From 4f013c3e761a04d21981f8a55fce76d435f7562d Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 14:00:55 -0500 Subject: [PATCH 2/7] Add queue service functions and blocking v streaming tests --- libensemble/gen_classes/queue_gen.py | 110 ++++++++++++++++++ .../tests/regression_tests/test_queue_gen.py | 72 +++--------- .../test_queue_gen_streaming.py | 73 ++++++++++++ 3 files changed, 202 insertions(+), 53 deletions(-) create mode 100644 libensemble/tests/regression_tests/test_queue_gen_streaming.py diff --git a/libensemble/gen_classes/queue_gen.py b/libensemble/gen_classes/queue_gen.py index 27ef1b2bf..7ce24b1e7 100644 --- a/libensemble/gen_classes/queue_gen.py +++ b/libensemble/gen_classes/queue_gen.py @@ -16,6 +16,8 @@ input_queue=in_q, output_queue=out_q) in_q.put({"x": 0.5}) # external producer feeds work """ +import threading +import time from queue import Empty, Queue from typing import Any, List, Optional @@ -95,3 +97,111 @@ def finalize(self, results: List[dict] = None, *args: Any, **kwargs: Any): def shutdown_sentinel(): """External producer puts this on the input queue to signal end.""" return _SHUTDOWN + + +class QueueService: + """Service-mode wrapper: spawns libE in a thread with a QueueGenerator and + exposes submit/get_completed/shutdown to an external producer. + + Hides the queue/thread/generator plumbing every producer would otherwise + repeat. The producer just creates a service and feeds it work: + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) + service.start() + service.submit({"x": 1.0}) + for r in service.get_completed(): + ... + service.shutdown() + """ + + def __init__( + self, + vocs: VOCS, + sim_specs, + libE_specs, + exit_criteria, + *, + persis_in: Optional[List[str]] = None, + batch_size: int = 1, + poll_timeout: float = 1.0, + ): + from libensemble import Ensemble + from libensemble.specs import GenSpecs + + self.input_queue: Queue = Queue() + self.output_queue: Queue = Queue() + self._gen = QueueGenerator( + vocs, + input_queue=self.input_queue, + output_queue=self.output_queue, + poll_timeout=poll_timeout, + ) + gen_specs = GenSpecs( + generator=self._gen, + vocs=vocs, + persis_in=persis_in or [], + batch_size=batch_size, + ) + self._ensemble = Ensemble(sim_specs, gen_specs, exit_criteria, libE_specs) + self._thread: Optional[threading.Thread] = None + + def start(self) -> None: + """Spawn the libE thread.""" + self._thread = threading.Thread(target=self._ensemble.run, daemon=True) + self._thread.start() + + def submit(self, item: dict) -> None: + """Submit one work item.""" + self.input_queue.put(item) + + def get_completed(self) -> List[dict]: + """Drain all completed results (non-blocking).""" + out = [] + while True: + try: + out.append(self.output_queue.get_nowait()) + except Empty: + break + return out + + def collect_results(self, n: int, timeout: float = 60.0) -> List[dict]: + """Block-drain until ``n`` results collected or ``timeout`` elapses. + Returns whatever was collected (may be < n on timeout).""" + results: List[dict] = [] + deadline = time.time() + timeout + while len(results) < n and time.time() < deadline: + try: + results.append(self.output_queue.get(timeout=1)) + except Empty: + pass + return results + + def stream_results(self, n: Optional[int] = None, timeout: float = 60.0): + """Yield results as they arrive. Stops after ``n`` yielded or + ``timeout`` seconds elapse with no new result. ``n=None`` streams + indefinitely until timeout.""" + deadline = time.time() + timeout + yielded = 0 + while (n is None or yielded < n) and time.time() < deadline: + try: + r = self.output_queue.get(timeout=1) + except Empty: + continue + deadline = time.time() + timeout # reset on activity + yielded += 1 + yield r + + def shutdown(self, wait: bool = False) -> None: + """Signal libE to stop accepting new work and drain in-flight. + If ``wait=True``, block until the libE thread exits.""" + self.input_queue.put(_SHUTDOWN) + if wait: + self.join() + + def join(self, timeout: Optional[float] = None) -> None: + """Wait for libE thread to exit.""" + if self._thread is not None: + self._thread.join(timeout=timeout) + + def is_alive(self) -> bool: + return self._thread is not None and self._thread.is_alive() diff --git a/libensemble/tests/regression_tests/test_queue_gen.py b/libensemble/tests/regression_tests/test_queue_gen.py index bac58e6ad..56066f9df 100644 --- a/libensemble/tests/regression_tests/test_queue_gen.py +++ b/libensemble/tests/regression_tests/test_queue_gen.py @@ -1,7 +1,8 @@ -"""Test for QueueGenerator +"""Test for QueueGenerator / QueueService + +Uses QueueService to spin up libE in a thread with a trivial doubler sim, +submits N work items, drains results, shuts down, joins. -Spins up libE in a thread with the QueueGenerator + a trivial doubler sim, -submits N work items, drains results, sends shutdown sentinel, joins. Run with: python test_queue_gen.py @@ -11,16 +12,14 @@ # TESTSUITE_COMMS: local # TESTSUITE_NPROCS: 4 -import threading +import math import time -from queue import Empty, Queue import numpy as np from gest_api.vocs import VOCS -from libensemble import Ensemble -from libensemble.gen_classes.queue_gen import QueueGenerator -from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs +from libensemble.gen_classes.queue_gen import QueueService +from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs NWORKERS = 4 N_SUBMITS = 10 @@ -34,66 +33,33 @@ def doubler_sim(InputArray, _, sim_specs): return out -def run_libe(input_q, output_q, nworkers, sim_max): +def main(): vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) - gen = QueueGenerator(vocs, input_queue=input_q, output_queue=output_q) - - # service_mode=True lets the manager tolerate idle workers + empty alloc - # while we wait for new submissions on the queue. - libE_specs = LibeSpecs( - nworkers=nworkers, - service_mode=True, - final_gen_send=True, # ensure last batch of completed sims is ingested before stop - ) sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) - gen_specs = GenSpecs(generator=gen, vocs=vocs, persis_in=["x", "y"], batch_size=1) - # QueueGenerator.shutdown_sentinel() which makes suggest return [] forever. - exit_criteria = ExitCriteria(sim_max=sim_max) - - workflow = Ensemble(sim_specs, gen_specs, exit_criteria, libE_specs) - workflow.run() - + libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) + exit_criteria = ExitCriteria(sim_max=N_SUBMITS) -def main(): - input_q: Queue = Queue() - output_q: Queue = Queue() - - libe_thread = threading.Thread( - target=run_libe, args=(input_q, output_q, NWORKERS, N_SUBMITS), daemon=True - ) - libe_thread.start() + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, + persis_in=["x", "y"], batch_size=1) + service.start() print(f"libE thread started ({NWORKERS} workers)") # Submit work (1, 2, 3....), then signal shutdown. for i in range(N_SUBMITS): - input_q.put({"x": float(i)}) - input_q.put(QueueGenerator.shutdown_sentinel()) + service.submit({"x": float(i)}) + service.shutdown() print(f"submitted {N_SUBMITS} items + shutdown sentinel") - # Drain until we have all results (or timeout) - results = [] - deadline = time.time() + 60 - while len(results) < N_SUBMITS and time.time() < deadline: - try: - results.append(output_q.get(timeout=1)) - print(f" got: {results[-1]}") - except Empty: - pass - + # Block-drain until all results collected (or timeout) + results = service.collect_results(N_SUBMITS, timeout=60) print(f"\ncollected {len(results)}/{N_SUBMITS}") # Verify y == 2*x - ok = True - for r in results: - x = r.get("x") - y = r.get("y") - if x is None or y is None or abs(y - 2 * x) > 1e-9: - print(f" MISMATCH: {r}") - ok = False + ok = all(math.isclose(r["y"], 2 * r["x"]) for r in results) print("PASS" if ok else "FAIL") # Wait for libE to wind down - libe_thread.join(timeout=10) + service.join(timeout=10) print("done") diff --git a/libensemble/tests/regression_tests/test_queue_gen_streaming.py b/libensemble/tests/regression_tests/test_queue_gen_streaming.py new file mode 100644 index 000000000..29c952eef --- /dev/null +++ b/libensemble/tests/regression_tests/test_queue_gen_streaming.py @@ -0,0 +1,73 @@ +"""Test for QueueGenerator / QueueService — streaming with replacement. + +Submit NWORKERS items so each worker has work, then as each result comes +back submit a replacement until TOTAL items have been submitted. Drain +the remaining results and shutdown. + +Run with: + python test_queue_gen_streaming.py +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 4 + +import math +import time + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.queue_gen import QueueService +from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs + +NWORKERS = 4 +TOTAL = 16 # total items to submit (>> NWORKERS so the rolling window matters) + + +def doubler_sim(InputArray, _, sim_specs): + """Trivial sim: returns 2*x. Sleeps a bit to mimic real work.""" + time.sleep(0.5) + out = np.zeros(1, dtype=sim_specs["out"]) + out["y"] = 2.0 * InputArray["x"][0] + return out + + +def main(): + vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) + sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) + libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) + exit_criteria = ExitCriteria(sim_max=TOTAL) + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, + persis_in=["x", "y"], batch_size=1) + service.start() + print(f"libE thread started ({NWORKERS} workers)") + + # Submit one item per worker + for i in range(NWORKERS): + service.submit({"x": float(i)}) + submitted = NWORKERS + print(f"submitted initial {NWORKERS} items") + + # As each result arrives, submit a replacement until TOTAL submitted + results = [] + for r in service.stream_results(TOTAL, timeout=60): + results.append(r) + print(f" got ({len(results)}/{TOTAL}): {r}") + if submitted < TOTAL: + service.submit({"x": float(submitted)}) + submitted += 1 + + service.shutdown() + print(f"\nsubmitted {submitted}, collected {len(results)}/{TOTAL}") + + ok = all(math.isclose(r["y"], 2 * r["x"]) for r in results) + print("PASS" if ok else "FAIL") + + service.join(timeout=10) + print("done") + + +if __name__ == "__main__": + main() From f329d7428a37d92be31cfac045eebfb8a3c5ab05 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 14:08:23 -0500 Subject: [PATCH 3/7] Remove redundant persis_in --- libensemble/tests/regression_tests/test_queue_gen.py | 3 +-- libensemble/tests/regression_tests/test_queue_gen_streaming.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/libensemble/tests/regression_tests/test_queue_gen.py b/libensemble/tests/regression_tests/test_queue_gen.py index 56066f9df..fdc26949d 100644 --- a/libensemble/tests/regression_tests/test_queue_gen.py +++ b/libensemble/tests/regression_tests/test_queue_gen.py @@ -39,8 +39,7 @@ def main(): libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) exit_criteria = ExitCriteria(sim_max=N_SUBMITS) - service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, - persis_in=["x", "y"], batch_size=1) + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, batch_size=1) service.start() print(f"libE thread started ({NWORKERS} workers)") diff --git a/libensemble/tests/regression_tests/test_queue_gen_streaming.py b/libensemble/tests/regression_tests/test_queue_gen_streaming.py index 29c952eef..79a6b0339 100644 --- a/libensemble/tests/regression_tests/test_queue_gen_streaming.py +++ b/libensemble/tests/regression_tests/test_queue_gen_streaming.py @@ -39,8 +39,7 @@ def main(): libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) exit_criteria = ExitCriteria(sim_max=TOTAL) - service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, - persis_in=["x", "y"], batch_size=1) + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, batch_size=1) service.start() print(f"libE thread started ({NWORKERS} workers)") From c7f2a4da988060774dabb5bea3e51d29e4847f83 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 14:37:20 -0500 Subject: [PATCH 4/7] Change default batch handling for queues --- libensemble/gen_classes/queue_gen.py | 2 +- libensemble/tests/regression_tests/test_queue_gen.py | 2 +- libensemble/tests/regression_tests/test_queue_gen_streaming.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libensemble/gen_classes/queue_gen.py b/libensemble/gen_classes/queue_gen.py index 7ce24b1e7..e3784f152 100644 --- a/libensemble/gen_classes/queue_gen.py +++ b/libensemble/gen_classes/queue_gen.py @@ -122,7 +122,7 @@ def __init__( exit_criteria, *, persis_in: Optional[List[str]] = None, - batch_size: int = 1, + batch_size: int = 0, poll_timeout: float = 1.0, ): from libensemble import Ensemble diff --git a/libensemble/tests/regression_tests/test_queue_gen.py b/libensemble/tests/regression_tests/test_queue_gen.py index fdc26949d..f022bf49f 100644 --- a/libensemble/tests/regression_tests/test_queue_gen.py +++ b/libensemble/tests/regression_tests/test_queue_gen.py @@ -39,7 +39,7 @@ def main(): libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) exit_criteria = ExitCriteria(sim_max=N_SUBMITS) - service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, batch_size=1) + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) service.start() print(f"libE thread started ({NWORKERS} workers)") diff --git a/libensemble/tests/regression_tests/test_queue_gen_streaming.py b/libensemble/tests/regression_tests/test_queue_gen_streaming.py index 79a6b0339..ca42c0245 100644 --- a/libensemble/tests/regression_tests/test_queue_gen_streaming.py +++ b/libensemble/tests/regression_tests/test_queue_gen_streaming.py @@ -39,7 +39,7 @@ def main(): libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) exit_criteria = ExitCriteria(sim_max=TOTAL) - service = QueueService(vocs, sim_specs, libE_specs, exit_criteria, batch_size=1) + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) service.start() print(f"libE thread started ({NWORKERS} workers)") From c8a6e999d35608f04f690ae47d8b1eaa490ec60e Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 14:50:27 -0500 Subject: [PATCH 5/7] Add docs --- .../libE_specs/libE_specs_general.rst | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/data_structures/libE_specs/libE_specs_general.rst b/docs/data_structures/libE_specs/libE_specs_general.rst index f7f07f75f..5cac60665 100644 --- a/docs/data_structures/libE_specs/libE_specs_general.rst +++ b/docs/data_structures/libE_specs/libE_specs_general.rst @@ -38,3 +38,17 @@ General **gen_workers** [list of ints]: List of workers that should run only generators. All other workers will run only simulator functions. + +**service_mode** [bool] = ``False``: + If ``True``, the manager tolerates idle workers + an alloc returning no work + (instead of asserting). Used when libEnsemble is driven by an external + producer (e.g. a queue-backed Generator fed by an MCP server) and may + legitimately have nothing to dispatch for periods of time. The manager + sleeps briefly between checks instead of panicking. Termination is the + caller's responsibility (e.g. via ``exit_criteria`` or external stop). + +**service_mode_idle_timeout** [float]: + In ``service_mode``, exit after this many seconds with no active workers + and no work dispatched. Default ``None`` means run forever waiting for + new submissions. Useful so an external producer doesn't leave libE running + after it goes silent. From 5d5a27d1f008dcb0d465f94834315fe148a11501 Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 15:17:07 -0500 Subject: [PATCH 6/7] Fix shutdown bug with final gen send and idle timeout --- .../test_queue_gen_idle_timeout.py | 60 +++++++++++++++++++ .../test_queue_gen_streaming.py | 3 +- libensemble/tools/persistent_support.py | 5 ++ 3 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py diff --git a/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py b/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py new file mode 100644 index 000000000..43bd221dc --- /dev/null +++ b/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py @@ -0,0 +1,60 @@ +"""Test for service_mode_idle_timeout. + +Producer stays silent after a single submit. With service_mode_idle_timeout +set, libE should exit by itself after the timeout instead of hanging. + +Run with: + python test_queue_gen_idle_timeout.py +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 2 + +import time + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.queue_gen import QueueService +from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs + +NWORKERS = 2 +IDLE_TIMEOUT = 3.0 # seconds + + +def doubler_sim(InputArray, _, sim_specs): + out = np.zeros(1, dtype=sim_specs["out"]) + out["y"] = 2.0 * InputArray["x"][0] + return out + + +def main(): + vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) + sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) + libE_specs = LibeSpecs( + nworkers=NWORKERS, + service_mode=True, + service_mode_idle_timeout=IDLE_TIMEOUT, + final_gen_send=True, + ) + exit_criteria = ExitCriteria(sim_max=1000) # large so idle_timeout fires first + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) + service.start() + print(f"libE thread started ({NWORKERS} workers), idle_timeout={IDLE_TIMEOUT}s") + + service.submit({"x": 1.0}) + print("submitted 1 item, now going silent") + + t0 = time.time() + service.join(timeout=IDLE_TIMEOUT + 10) + elapsed = time.time() - t0 + print(f"libE thread exited after {elapsed:.2f}s (expected ~{IDLE_TIMEOUT}s)") + + assert not service.is_alive(), "libE didn't exit after idle_timeout" + print("PASS") + + +if __name__ == "__main__": + main() diff --git a/libensemble/tests/regression_tests/test_queue_gen_streaming.py b/libensemble/tests/regression_tests/test_queue_gen_streaming.py index ca42c0245..6a00b2b6c 100644 --- a/libensemble/tests/regression_tests/test_queue_gen_streaming.py +++ b/libensemble/tests/regression_tests/test_queue_gen_streaming.py @@ -36,7 +36,8 @@ def doubler_sim(InputArray, _, sim_specs): def main(): vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) - libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) + libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, + service_mode_idle_timeout=30, final_gen_send=True) exit_criteria = ExitCriteria(sim_max=TOTAL) service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) diff --git a/libensemble/tools/persistent_support.py b/libensemble/tools/persistent_support.py index 7e9643e02..bbe7d4603 100644 --- a/libensemble/tools/persistent_support.py +++ b/libensemble/tools/persistent_support.py @@ -79,6 +79,11 @@ def recv(self, blocking: bool = True) -> (int, dict, npt.NDArray): if not isinstance(Work, dict): self.comm.push_to_buffer(tag, Work) return tag, Work, None + # PERSIS_STOP with a Work dict (final_gen_send path): if there are + # no rows pending, manager skipped the second send — return now + # rather than blocking on a recv that will never complete. + if not len(Work.get("libE_info", {}).get("H_rows", [])): + return tag, Work, None else: logger.debug(f"Persistent {self.calc_str} received work request from manager") From 41bac1f2a6f2733e65fc4830ec282252b199cf3c Mon Sep 17 00:00:00 2001 From: shudson Date: Mon, 25 May 2026 15:54:32 -0500 Subject: [PATCH 7/7] Force final_gen_send for service queue generator --- libensemble/gen_classes/queue_gen.py | 5 +++++ libensemble/tests/regression_tests/test_queue_gen.py | 2 +- .../tests/regression_tests/test_queue_gen_idle_timeout.py | 1 - .../tests/regression_tests/test_queue_gen_streaming.py | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/libensemble/gen_classes/queue_gen.py b/libensemble/gen_classes/queue_gen.py index e3784f152..0686ecddb 100644 --- a/libensemble/gen_classes/queue_gen.py +++ b/libensemble/gen_classes/queue_gen.py @@ -128,6 +128,11 @@ def __init__( from libensemble import Ensemble from libensemble.specs import GenSpecs + # final_gen_send must be True for QueueGenerator — otherwise the last + # batch of completed sims is never ingested and never reaches the + # output_queue, so consumers silently miss results. + libE_specs.final_gen_send = True + self.input_queue: Queue = Queue() self.output_queue: Queue = Queue() self._gen = QueueGenerator( diff --git a/libensemble/tests/regression_tests/test_queue_gen.py b/libensemble/tests/regression_tests/test_queue_gen.py index f022bf49f..8df0dfe35 100644 --- a/libensemble/tests/regression_tests/test_queue_gen.py +++ b/libensemble/tests/regression_tests/test_queue_gen.py @@ -36,7 +36,7 @@ def doubler_sim(InputArray, _, sim_specs): def main(): vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) - libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, final_gen_send=True) + libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True) exit_criteria = ExitCriteria(sim_max=N_SUBMITS) service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) diff --git a/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py b/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py index 43bd221dc..583609acc 100644 --- a/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py +++ b/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py @@ -36,7 +36,6 @@ def main(): nworkers=NWORKERS, service_mode=True, service_mode_idle_timeout=IDLE_TIMEOUT, - final_gen_send=True, ) exit_criteria = ExitCriteria(sim_max=1000) # large so idle_timeout fires first diff --git a/libensemble/tests/regression_tests/test_queue_gen_streaming.py b/libensemble/tests/regression_tests/test_queue_gen_streaming.py index 6a00b2b6c..b7fd8e6cb 100644 --- a/libensemble/tests/regression_tests/test_queue_gen_streaming.py +++ b/libensemble/tests/regression_tests/test_queue_gen_streaming.py @@ -37,7 +37,7 @@ def main(): vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, - service_mode_idle_timeout=30, final_gen_send=True) + service_mode_idle_timeout=30) exit_criteria = ExitCriteria(sim_max=TOTAL) service = QueueService(vocs, sim_specs, libE_specs, exit_criteria)