diff --git a/docs/data_structures/libE_specs/libE_specs_general.rst b/docs/data_structures/libE_specs/libE_specs_general.rst index f7f07f75f..5cac60665 100644 --- a/docs/data_structures/libE_specs/libE_specs_general.rst +++ b/docs/data_structures/libE_specs/libE_specs_general.rst @@ -38,3 +38,17 @@ General **gen_workers** [list of ints]: List of workers that should run only generators. All other workers will run only simulator functions. + +**service_mode** [bool] = ``False``: + If ``True``, the manager tolerates idle workers and an alloc returning no work + (instead of asserting). Used when libEnsemble is driven by an external + producer (e.g. a queue-backed Generator fed by an MCP server) and may + legitimately have nothing to dispatch for periods of time. The manager + sleeps briefly between checks instead of panicking. Termination is the + caller's responsibility (e.g. via ``exit_criteria`` or external stop). + +**service_mode_idle_timeout** [float] = ``None``: + In ``service_mode``, exit after this many seconds with no active workers + and no work dispatched. Default ``None`` means run forever waiting for + new submissions. Useful so an external producer doesn't leave libE running + after it goes silent. diff --git a/libensemble/gen_classes/queue_gen.py b/libensemble/gen_classes/queue_gen.py new file mode 100644 index 000000000..0686ecddb --- /dev/null +++ b/libensemble/gen_classes/queue_gen.py @@ -0,0 +1,212 @@ +"""Queue-backed generator: work comes from an external producer (e.g. an MCP +tool, an LLM agent, a REST API) via an input queue. Completed results go back +out via an output queue. + +This lets libE run in 'service mode' — driven by an external loop instead of +its own gen function. 
+ +Usage: + from queue import Queue + from gest_api.vocs import VOCS + from libensemble.gen_classes.queue_gen import QueueGenerator + + in_q = Queue() + out_q = Queue() + gen = QueueGenerator(VOCS(variables={"x": [-1, 1]}), + input_queue=in_q, output_queue=out_q) + in_q.put({"x": 0.5}) # external producer feeds work +""" +import threading +import time +from queue import Empty, Queue +from typing import Any, List, Optional + +from gest_api import Generator +from gest_api.vocs import VOCS + + +_SHUTDOWN = object() # sentinel: external producer signals "no more work" + + +class QueueGenerator(Generator): + """Generator that pulls work from an external Queue and pushes results back. + + suggest(n): + - Blocks up to ``poll_timeout`` waiting for the FIRST item (so the libE + manager doesn't spin hot when nothing is queued). + - Then drains up to ``n - 1`` more items non-blockingly. + - Returns [] on timeout (libE will call again). + - Returns [] if a shutdown sentinel is seen. + + ingest(results): + - Forwards each result dict to the output queue verbatim. + """ + + def __init__( + self, + vocs: VOCS, + *, + input_queue: Queue, + output_queue: Queue, + poll_timeout: float = 1.0, + ): + self.vocs = vocs + self.input_queue = input_queue + self.output_queue = output_queue + self.poll_timeout = poll_timeout + self._shutdown_seen = False + super().__init__(vocs) + + def _validate_vocs(self, vocs: VOCS) -> None: + assert len(self.vocs.variable_names), "VOCS must contain variables." 
+ + def suggest(self, num_points: Optional[int]) -> List[dict]: + if self._shutdown_seen: + return [] + n = num_points or 1 + items: List[dict] = [] + try: + first = self.input_queue.get(timeout=self.poll_timeout) + except Empty: + return [] + if first is _SHUTDOWN: + self._shutdown_seen = True + return [] + items.append(first) + for _ in range(n - 1): + try: + nxt = self.input_queue.get_nowait() + except Empty: + break + if nxt is _SHUTDOWN: + self._shutdown_seen = True + break + items.append(nxt) + return items + + def ingest(self, results: List[dict]) -> None: + for r in results: + self.output_queue.put(r) + + def finalize(self, results: List[dict] = None, *args: Any, **kwargs: Any): + if results: + self.ingest(results) + return None + + @staticmethod + def shutdown_sentinel(): + """External producer puts this on the input queue to signal end.""" + return _SHUTDOWN + + +class QueueService: + """Service-mode wrapper: spawns libE in a thread with a QueueGenerator and + exposes submit/get_completed/shutdown to an external producer. + + Hides the queue/thread/generator plumbing every producer would otherwise + repeat. The producer just creates a service and feeds it work: + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) + service.start() + service.submit({"x": 1.0}) + for r in service.get_completed(): + ... + service.shutdown() + """ + + def __init__( + self, + vocs: VOCS, + sim_specs, + libE_specs, + exit_criteria, + *, + persis_in: Optional[List[str]] = None, + batch_size: int = 0, + poll_timeout: float = 1.0, + ): + from libensemble import Ensemble + from libensemble.specs import GenSpecs + + # final_gen_send must be True for QueueGenerator — otherwise the last + # batch of completed sims is never ingested and never reaches the + # output_queue, so consumers silently miss results. 
+ libE_specs.final_gen_send = True + + self.input_queue: Queue = Queue() + self.output_queue: Queue = Queue() + self._gen = QueueGenerator( + vocs, + input_queue=self.input_queue, + output_queue=self.output_queue, + poll_timeout=poll_timeout, + ) + gen_specs = GenSpecs( + generator=self._gen, + vocs=vocs, + persis_in=persis_in or [], + batch_size=batch_size, + ) + self._ensemble = Ensemble(sim_specs, gen_specs, exit_criteria, libE_specs) + self._thread: Optional[threading.Thread] = None + + def start(self) -> None: + """Spawn the libE thread.""" + self._thread = threading.Thread(target=self._ensemble.run, daemon=True) + self._thread.start() + + def submit(self, item: dict) -> None: + """Submit one work item.""" + self.input_queue.put(item) + + def get_completed(self) -> List[dict]: + """Drain all completed results (non-blocking).""" + out = [] + while True: + try: + out.append(self.output_queue.get_nowait()) + except Empty: + break + return out + + def collect_results(self, n: int, timeout: float = 60.0) -> List[dict]: + """Block-drain until ``n`` results collected or ``timeout`` elapses. + Returns whatever was collected (may be < n on timeout).""" + results: List[dict] = [] + deadline = time.time() + timeout + while len(results) < n and time.time() < deadline: + try: + results.append(self.output_queue.get(timeout=1)) + except Empty: + pass + return results + + def stream_results(self, n: Optional[int] = None, timeout: float = 60.0): + """Yield results as they arrive. Stops after ``n`` yielded or + ``timeout`` seconds elapse with no new result. 
``n=None`` streams + indefinitely until timeout.""" + deadline = time.time() + timeout + yielded = 0 + while (n is None or yielded < n) and time.time() < deadline: + try: + r = self.output_queue.get(timeout=1) + except Empty: + continue + deadline = time.time() + timeout # reset on activity + yielded += 1 + yield r + + def shutdown(self, wait: bool = False) -> None: + """Signal libE to stop accepting new work and drain in-flight. + If ``wait=True``, block until the libE thread exits.""" + self.input_queue.put(_SHUTDOWN) + if wait: + self.join() + + def join(self, timeout: Optional[float] = None) -> None: + """Wait for libE thread to exit.""" + if self._thread is not None: + self._thread.join(timeout=timeout) + + def is_alive(self) -> bool: + return self._thread is not None and self._thread.is_alive() diff --git a/libensemble/manager.py b/libensemble/manager.py index 7995d2da9..1e706e5a5 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -213,6 +213,7 @@ def __init__( self.gen_specs = gen_specs self.exit_criteria = exit_criteria self.elapsed = lambda: timer.elapsed + self._service_idle_start: float | None = None # service_mode idle tracker self.wcomms = wcomms self.WorkerExc = False self.persis_pending: list[int] = [] @@ -714,9 +715,27 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]: self._check_work_order(Work[w], w) self._send_work_order(Work[w], w) self._update_state_on_alloc(Work[w], w) - assert self.term_test() or any( - self.W["active"] != 0 - ), "alloc_f did not return any work, although all workers are idle." + if not (self.term_test() or any(self.W["active"] != 0)): + if self.libE_specs.get("service_mode"): + # Service mode: external producer may have nothing + # to dispatch right now. Throttle and re-poll instead + # of asserting (which would treat this as deadlock). 
+ idle_timeout = self.libE_specs.get("service_mode_idle_timeout") + if idle_timeout is not None: + if self._service_idle_start is None: + self._service_idle_start = time.time() + elif time.time() - self._service_idle_start > idle_timeout: + logger.info( + f"service_mode idle for {idle_timeout}s with no work, exiting" + ) + break + time.sleep(0.1) + continue + raise AssertionError( + "alloc_f did not return any work, although all workers are idle." + ) + # work was dispatched OR workers active OR terminating — reset idle clock + self._service_idle_start = None except WorkerException as e: report_worker_exc(e) raise LoggedException(e.args[0], e.args[1]) from None diff --git a/libensemble/specs.py b/libensemble/specs.py index 9ee04baa3..c58709848 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -439,6 +439,24 @@ class LibeSpecs(BaseModel): By default, the generator runs on the manager process as a thread (Worker 0). """ + service_mode: bool = False + """ + If True, the manager tolerates idle workers + an alloc returning no work + (instead of asserting). Used when libEnsemble is driven by an external + producer (e.g. a queue-backed Generator fed by an MCP server) and may + legitimately have nothing to dispatch for periods of time. The manager + sleeps briefly between checks instead of panicking. Termination is the + caller's responsibility (e.g. via ``exit_criteria`` or external stop). + """ + + service_mode_idle_timeout: float | None = None + """ + In ``service_mode``, exit after this many seconds with no active workers + and no work dispatched. ``None`` (default) means run forever waiting for + new submissions. Useful so an MCP server doesn't leave libE running after + the agent goes silent. + """ + mpi_comm: object | None = None """ libEnsemble MPI communicator. 
Default: ``MPI.COMM_WORLD``""" diff --git a/libensemble/tests/regression_tests/test_queue_gen.py b/libensemble/tests/regression_tests/test_queue_gen.py new file mode 100644 index 000000000..8df0dfe35 --- /dev/null +++ b/libensemble/tests/regression_tests/test_queue_gen.py @@ -0,0 +1,66 @@ +"""Test for QueueGenerator / QueueService + +Uses QueueService to spin up libE in a thread with a trivial doubler sim, +submits N work items, drains results, shuts down, joins. + + +Run with: + python test_queue_gen.py +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 4 + +import math +import time + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.queue_gen import QueueService +from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs + +NWORKERS = 4 +N_SUBMITS = 10 + + +def doubler_sim(InputArray, _, sim_specs): + """Trivial sim: returns 2*x. Sleeps a bit to mimic real work.""" + time.sleep(0.5) + out = np.zeros(1, dtype=sim_specs["out"]) + out["y"] = 2.0 * InputArray["x"][0] + return out + + +def main(): + vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) + sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) + libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True) + exit_criteria = ExitCriteria(sim_max=N_SUBMITS) + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) + service.start() + print(f"libE thread started ({NWORKERS} workers)") + + # Submit work (1, 2, 3....), then signal shutdown. 
+ for i in range(N_SUBMITS): + service.submit({"x": float(i)}) + service.shutdown() + print(f"submitted {N_SUBMITS} items + shutdown sentinel") + + # Block-drain until all results collected (or timeout) + results = service.collect_results(N_SUBMITS, timeout=60) + print(f"\ncollected {len(results)}/{N_SUBMITS}") + + # Verify y == 2*x + ok = all(math.isclose(r["y"], 2 * r["x"]) for r in results) + print("PASS" if ok else "FAIL") + + # Wait for libE to wind down + service.join(timeout=10) + print("done") + + +if __name__ == "__main__": + main() diff --git a/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py b/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py new file mode 100644 index 000000000..583609acc --- /dev/null +++ b/libensemble/tests/regression_tests/test_queue_gen_idle_timeout.py @@ -0,0 +1,59 @@ +"""Test for service_mode_idle_timeout. + +Producer stays silent after a single submit. With service_mode_idle_timeout +set, libE should exit by itself after the timeout instead of hanging. 
+ +Run with: + python test_queue_gen_idle_timeout.py +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 2 + +import time + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.queue_gen import QueueService +from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs + +NWORKERS = 2 +IDLE_TIMEOUT = 3.0 # seconds + + +def doubler_sim(InputArray, _, sim_specs): + out = np.zeros(1, dtype=sim_specs["out"]) + out["y"] = 2.0 * InputArray["x"][0] + return out + + +def main(): + vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) + sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) + libE_specs = LibeSpecs( + nworkers=NWORKERS, + service_mode=True, + service_mode_idle_timeout=IDLE_TIMEOUT, + ) + exit_criteria = ExitCriteria(sim_max=1000) # large so idle_timeout fires first + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) + service.start() + print(f"libE thread started ({NWORKERS} workers), idle_timeout={IDLE_TIMEOUT}s") + + service.submit({"x": 1.0}) + print("submitted 1 item, now going silent") + + t0 = time.time() + service.join(timeout=IDLE_TIMEOUT + 10) + elapsed = time.time() - t0 + print(f"libE thread exited after {elapsed:.2f}s (expected ~{IDLE_TIMEOUT}s)") + + assert not service.is_alive(), "libE didn't exit after idle_timeout" + print("PASS") + + +if __name__ == "__main__": + main() diff --git a/libensemble/tests/regression_tests/test_queue_gen_streaming.py b/libensemble/tests/regression_tests/test_queue_gen_streaming.py new file mode 100644 index 000000000..b7fd8e6cb --- /dev/null +++ b/libensemble/tests/regression_tests/test_queue_gen_streaming.py @@ -0,0 +1,73 @@ +"""Test for QueueGenerator / QueueService — streaming with replacement. + +Submit NWORKERS items so each worker has work, then as each result comes +back submit a replacement until TOTAL items have been submitted. 
Drain +the remaining results and shutdown. + +Run with: + python test_queue_gen_streaming.py +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: local +# TESTSUITE_NPROCS: 4 + +import math +import time + +import numpy as np +from gest_api.vocs import VOCS + +from libensemble.gen_classes.queue_gen import QueueService +from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs + +NWORKERS = 4 +TOTAL = 16 # total items to submit (>> NWORKERS so the rolling window matters) + + +def doubler_sim(InputArray, _, sim_specs): + """Trivial sim: returns 2*x. Sleeps a bit to mimic real work.""" + time.sleep(0.5) + out = np.zeros(1, dtype=sim_specs["out"]) + out["y"] = 2.0 * InputArray["x"][0] + return out + + +def main(): + vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"}) + sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)]) + libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True, + service_mode_idle_timeout=30) + exit_criteria = ExitCriteria(sim_max=TOTAL) + + service = QueueService(vocs, sim_specs, libE_specs, exit_criteria) + service.start() + print(f"libE thread started ({NWORKERS} workers)") + + # Submit one item per worker + for i in range(NWORKERS): + service.submit({"x": float(i)}) + submitted = NWORKERS + print(f"submitted initial {NWORKERS} items") + + # As each result arrives, submit a replacement until TOTAL submitted + results = [] + for r in service.stream_results(TOTAL, timeout=60): + results.append(r) + print(f" got ({len(results)}/{TOTAL}): {r}") + if submitted < TOTAL: + service.submit({"x": float(submitted)}) + submitted += 1 + + service.shutdown() + print(f"\nsubmitted {submitted}, collected {len(results)}/{TOTAL}") + + ok = all(math.isclose(r["y"], 2 * r["x"]) for r in results) + print("PASS" if ok else "FAIL") + + service.join(timeout=10) + print("done") + + +if __name__ == "__main__": + main() diff --git a/libensemble/tools/persistent_support.py 
b/libensemble/tools/persistent_support.py index 7e9643e02..bbe7d4603 100644 --- a/libensemble/tools/persistent_support.py +++ b/libensemble/tools/persistent_support.py @@ -79,6 +79,11 @@ def recv(self, blocking: bool = True) -> (int, dict, npt.NDArray): if not isinstance(Work, dict): self.comm.push_to_buffer(tag, Work) return tag, Work, None + # PERSIS_STOP with a Work dict (final_gen_send path): if there are + # no rows pending, manager skipped the second send — return now + # rather than blocking on a recv that will never complete. + if not len(Work.get("libE_info", {}).get("H_rows", [])): + return tag, Work, None else: logger.debug(f"Persistent {self.calc_str} received work request from manager")