Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/data_structures/libE_specs/libE_specs_general.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,17 @@ General
**gen_workers** [list of ints]:
List of workers that should run only generators. All other workers will run
only simulator functions.

**service_mode** [bool] = ``False``:
If ``True``, the manager tolerates all workers being idle while the allocation function returns no work
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe this docstring could emphasize usage and implications more than internal implementation.

e.g. "If True libEnsemble waits for work idly, like from queue-backed generators or MCP servers, instead of exiting immediately if work isn't available. Termination is still the user's responsibility, via "exit_criteria" or other external signals."

(instead of asserting). Used when libEnsemble is driven by an external
producer (e.g. a queue-backed Generator fed by an MCP server) and may
legitimately have nothing to dispatch for periods of time. The manager
sleeps briefly between checks instead of raising an error. Termination is the
caller's responsibility (e.g. via ``exit_criteria`` or an external stop signal).

**service_mode_idle_timeout** [float] = ``None``:
In ``service_mode``, exit after this many seconds with no active workers
and no work dispatched. The default, ``None``, means wait indefinitely for
new submissions. Useful so that libEnsemble is not left running after an
external producer goes silent.
212 changes: 212 additions & 0 deletions libensemble/gen_classes/queue_gen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Queue-backed generator: work comes from an external producer (e.g. an MCP
tool, an LLM agent, a REST API) via an input queue. Completed results go back
out via an output queue.

This lets libE run in 'service mode' — driven by an external loop instead of
its own gen function.

Usage:
from queue import Queue
from gest_api.vocs import VOCS
from libensemble.gen_classes.queue_gen import QueueGenerator

in_q = Queue()
out_q = Queue()
gen = QueueGenerator(VOCS(variables={"x": [-1, 1]}),
input_queue=in_q, output_queue=out_q)
in_q.put({"x": 0.5}) # external producer feeds work
"""
import threading
import time
from queue import Empty, Queue
from typing import Any, List, Optional

from gest_api import Generator
from gest_api.vocs import VOCS


# Unique marker object for end-of-input. An external producer places it on the
# input queue; the generator recognises it by identity (``is``), so it can
# never collide with a real work item.
_SHUTDOWN = object() # sentinel: external producer signals "no more work"


class QueueGenerator(Generator):
    """Generator whose points come from an external queue.

    Work items (dicts) are read from ``input_queue``; every result handed to
    :meth:`ingest` is written unchanged to ``output_queue``.

    ``suggest(n)``:
        - waits up to ``poll_timeout`` seconds for the first item, so the
          caller does not spin while the queue is empty;
        - then takes up to ``n - 1`` further items without waiting;
        - returns ``[]`` if nothing arrived in time, or if the shutdown
          sentinel was the first thing read.

    ``ingest(results)``:
        - puts each result dict on the output queue verbatim.

    Args:
        vocs: Variable/objective specification; must declare at least one
            variable.
        input_queue: Queue the external producer fills with work items.
        output_queue: Queue that receives completed results.
        poll_timeout: Maximum seconds ``suggest`` waits for the first item.
    """

    def __init__(
        self,
        vocs: VOCS,
        *,
        input_queue: Queue,
        output_queue: Queue,
        poll_timeout: float = 1.0,
    ):
        # Instance state is set before delegating to the base initializer.
        # NOTE(review): presumably the base class calls ``_validate_vocs``
        # from its ``__init__`` -- confirm against gest_api.
        self.vocs = vocs
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.poll_timeout = poll_timeout
        self._shutdown_seen = False
        super().__init__(vocs)

    def _validate_vocs(self, vocs: VOCS) -> None:
        """Reject a VOCS that declares no variables.

        Checks the ``vocs`` argument itself rather than ``self.vocs``, so the
        result does not depend on attribute assignment order in ``__init__``.

        Raises:
            AssertionError: if ``vocs`` has no variable names. Raised
                explicitly (not via ``assert``) so the check is not stripped
                when Python runs with ``-O``; the exception type is the same
                one the previous ``assert`` produced.
        """
        if not len(vocs.variable_names):
            raise AssertionError("VOCS must contain variables.")

    def suggest(self, num_points: Optional[int]) -> List[dict]:
        """Return up to ``num_points`` queued work items.

        Args:
            num_points: Maximum number of items to return. ``None`` or ``0``
                is treated as ``1``.

        Returns:
            The items read, in queue order; possibly empty.
        """
        # NOTE(review): once the sentinel has been seen this returns at once
        # without waiting -- confirm the caller does not busy-loop on it.
        if self._shutdown_seen:
            return []
        n = num_points or 1
        items: List[dict] = []
        # Only the first read blocks; this throttles polling of an empty queue.
        try:
            first = self.input_queue.get(timeout=self.poll_timeout)
        except Empty:
            return []
        if first is _SHUTDOWN:
            self._shutdown_seen = True
            return []
        items.append(first)
        # Opportunistically fill the rest of the batch with whatever is
        # already queued, without waiting for more to arrive.
        for _ in range(n - 1):
            try:
                nxt = self.input_queue.get_nowait()
            except Empty:
                break
            if nxt is _SHUTDOWN:
                # Items gathered before the sentinel are still returned.
                self._shutdown_seen = True
                break
            items.append(nxt)
        return items

    def ingest(self, results: List[dict]) -> None:
        """Forward every result dict to the output queue, unmodified."""
        for r in results:
            self.output_queue.put(r)

    def finalize(self, results: Optional[List[dict]] = None, *args: Any, **kwargs: Any):
        """Forward any final ``results`` to the output queue.

        Extra positional and keyword arguments are accepted and ignored.
        Always returns ``None``.
        """
        if results:
            self.ingest(results)
        return None

    @staticmethod
    def shutdown_sentinel():
        """External producer puts this on the input queue to signal end."""
        return _SHUTDOWN


class QueueService:
    """Service-mode wrapper: runs libE in a thread fed by a QueueGenerator.

    Hides the queue/thread/generator plumbing every producer would otherwise
    repeat, and exposes submit / get_completed / shutdown to the producer:

        service = QueueService(vocs, sim_specs, libE_specs, exit_criteria)
        service.start()
        service.submit({"x": 1.0})
        for r in service.get_completed():
            ...
        service.shutdown()

    Args:
        vocs: Variable/objective specification passed to the generator and
            to ``GenSpecs``.
        sim_specs: Simulation specs, forwarded to ``Ensemble``.
        libE_specs: libEnsemble specs, forwarded to ``Ensemble``. This object
            is modified in place: ``final_gen_send`` is forced to ``True``.
        exit_criteria: Exit criteria, forwarded to ``Ensemble``.
        persis_in: Forwarded to ``GenSpecs`` (``[]`` when omitted).
        batch_size: Forwarded to ``GenSpecs``.
        poll_timeout: Forwarded to ``QueueGenerator``.
    """

    # Longest single blocking wait on the output queue, in seconds. Waits are
    # sliced so very large (or infinite) timeouts never reach Queue.get().
    _POLL_INTERVAL: float = 1.0

    def __init__(
        self,
        vocs: VOCS,
        sim_specs,
        libE_specs,
        exit_criteria,
        *,
        persis_in: Optional[List[str]] = None,
        batch_size: int = 0,
        poll_timeout: float = 1.0,
    ):
        # Imported lazily, inside the constructor, as in the original.
        from libensemble import Ensemble
        from libensemble.specs import GenSpecs

        # final_gen_send must be True for QueueGenerator -- otherwise the last
        # batch of completed sims is never ingested and never reaches the
        # output_queue, so consumers silently miss results.
        libE_specs.final_gen_send = True

        self.input_queue: Queue = Queue()
        self.output_queue: Queue = Queue()
        self._gen = QueueGenerator(
            vocs,
            input_queue=self.input_queue,
            output_queue=self.output_queue,
            poll_timeout=poll_timeout,
        )
        gen_specs = GenSpecs(
            generator=self._gen,
            vocs=vocs,
            persis_in=persis_in or [],
            batch_size=batch_size,
        )
        self._ensemble = Ensemble(sim_specs, gen_specs, exit_criteria, libE_specs)
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        """Spawn the libE thread.

        Raises:
            RuntimeError: if a previously started thread is still running;
                two concurrent runs would share one Ensemble and one
                generator.
        """
        if self.is_alive():
            raise RuntimeError("QueueService is already running")
        self._thread = threading.Thread(target=self._ensemble.run, daemon=True)
        self._thread.start()

    def submit(self, item: dict) -> None:
        """Submit one work item."""
        self.input_queue.put(item)

    def get_completed(self) -> List[dict]:
        """Drain all results currently available, without blocking."""
        out: List[dict] = []
        while True:
            try:
                out.append(self.output_queue.get_nowait())
            except Empty:
                return out

    def collect_results(self, n: int, timeout: float = 60.0) -> List[dict]:
        """Block until ``n`` results are collected or ``timeout`` elapses.

        Returns whatever was collected (may be fewer than ``n`` on timeout).
        The deadline uses a monotonic clock, and each wait is capped at the
        time remaining, so short timeouts are honoured.
        """
        results: List[dict] = []
        deadline = time.monotonic() + timeout
        while len(results) < n:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                results.append(self.output_queue.get(timeout=min(self._POLL_INTERVAL, remaining)))
            except Empty:
                pass
        return results

    def stream_results(self, n: Optional[int] = None, timeout: float = 60.0):
        """Yield results as they arrive.

        Stops after ``n`` results have been yielded, or once ``timeout``
        seconds pass with no new result. ``n=None`` streams until that
        timeout occurs.
        """
        deadline = time.monotonic() + timeout
        yielded = 0
        while n is None or yielded < n:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            try:
                r = self.output_queue.get(timeout=min(self._POLL_INTERVAL, remaining))
            except Empty:
                continue
            deadline = time.monotonic() + timeout  # reset on activity
            yielded += 1
            yield r

    def shutdown(self, wait: bool = False) -> None:
        """Signal that no more work will be submitted.

        Puts the shutdown sentinel on the input queue. If ``wait=True``,
        block until the libE thread exits.

        NOTE(review): the sentinel only stops the generator from handing out
        new points; whether libE then exits appears to depend on
        ``exit_criteria`` or ``service_mode_idle_timeout``, so ``wait=True``
        may block indefinitely -- confirm.
        """
        self.input_queue.put(_SHUTDOWN)
        if wait:
            self.join()

    def join(self, timeout: Optional[float] = None) -> None:
        """Wait for the libE thread to exit (no-op if never started)."""
        if self._thread is not None:
            self._thread.join(timeout=timeout)

    def is_alive(self) -> bool:
        """Return ``True`` while the libE thread exists and is running."""
        return self._thread is not None and self._thread.is_alive()
25 changes: 22 additions & 3 deletions libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def __init__(
self.gen_specs = gen_specs
self.exit_criteria = exit_criteria
self.elapsed = lambda: timer.elapsed
self._service_idle_start: float | None = None # service_mode idle tracker
self.wcomms = wcomms
self.WorkerExc = False
self.persis_pending: list[int] = []
Expand Down Expand Up @@ -714,9 +715,27 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]:
self._check_work_order(Work[w], w)
self._send_work_order(Work[w], w)
self._update_state_on_alloc(Work[w], w)
assert self.term_test() or any(
self.W["active"] != 0
), "alloc_f did not return any work, although all workers are idle."
if not (self.term_test() or any(self.W["active"] != 0)):
if self.libE_specs.get("service_mode"):
# Service mode: external producer may have nothing
# to dispatch right now. Throttle and re-poll instead
# of asserting (which would treat this as deadlock).
idle_timeout = self.libE_specs.get("service_mode_idle_timeout")
if idle_timeout is not None:
if self._service_idle_start is None:
self._service_idle_start = time.time()
elif time.time() - self._service_idle_start > idle_timeout:
logger.info(
f"service_mode idle for {idle_timeout}s with no work, exiting"
)
break
time.sleep(0.1)
continue
raise AssertionError(
"alloc_f did not return any work, although all workers are idle."
)
# work was dispatched OR workers active OR terminating — reset idle clock
self._service_idle_start = None
except WorkerException as e:
report_worker_exc(e)
raise LoggedException(e.args[0], e.args[1]) from None
Expand Down
18 changes: 18 additions & 0 deletions libensemble/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,24 @@ class LibeSpecs(BaseModel):
By default, the generator runs on the manager process as a thread (Worker 0).
"""

service_mode: bool = False
"""
If True, the manager tolerates idle workers + an alloc returning no work
(instead of asserting). Used when libEnsemble is driven by an external
producer (e.g. a queue-backed Generator fed by an MCP server) and may
legitimately have nothing to dispatch for periods of time. The manager
sleeps briefly between checks instead of panicking. Termination is the
caller's responsibility (e.g. via ``exit_criteria`` or external stop).
"""

service_mode_idle_timeout: float | None = None
"""
In ``service_mode``, exit after this many seconds with no active workers
and no work dispatched. ``None`` (default) means run forever waiting for
new submissions. Useful so an MCP server doesn't leave libE running after
the agent goes silent.
"""

mpi_comm: object | None = None
""" libEnsemble MPI communicator. Default: ``MPI.COMM_WORLD``"""

Expand Down
66 changes: 66 additions & 0 deletions libensemble/tests/regression_tests/test_queue_gen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Test for QueueGenerator / QueueService

Uses QueueService to spin up libE in a thread with a trivial doubler sim,
submits N work items, drains results, shuts down, joins.


Run with:
python test_queue_gen.py
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: local
# TESTSUITE_NPROCS: 4

import math
import time

import numpy as np
from gest_api.vocs import VOCS

from libensemble.gen_classes.queue_gen import QueueService
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a different namespace like libensemble.services.QueueService ?

from libensemble.specs import ExitCriteria, LibeSpecs, SimSpecs

# Worker count passed to LibeSpecs(nworkers=...) in main().
NWORKERS = 4
# Number of work items submitted; also used as the sim_max exit criterion.
N_SUBMITS = 10


def doubler_sim(InputArray, _, sim_specs):
    """Toy simulation: report ``y = 2 * x`` for the first input row.

    Pauses for half a second first to stand in for real work. The second
    positional argument is unused. The output is a one-element array whose
    dtype comes from ``sim_specs["out"]``.
    """
    time.sleep(0.5)
    result = np.zeros(1, dtype=sim_specs["out"])
    first_x = InputArray["x"][0]
    result["y"] = 2.0 * first_x
    return result


def main():
    """Exercise QueueService end to end and fail on any mismatch.

    Starts libE in a background thread, submits ``N_SUBMITS`` items followed
    by the shutdown sentinel, collects the results and checks them. The
    checks are real assertions: a missing or wrong result makes the test
    fail; printing alone cannot.
    """
    vocs = VOCS(variables={"x": [-100.0, 100.0]}, objectives={"y": "MINIMIZE"})
    sim_specs = SimSpecs(sim_f=doubler_sim, inputs=["x"], outputs=[("y", float)])
    libE_specs = LibeSpecs(nworkers=NWORKERS, service_mode=True)
    exit_criteria = ExitCriteria(sim_max=N_SUBMITS)

    service = QueueService(vocs, sim_specs, libE_specs, exit_criteria)
    service.start()
    print(f"libE thread started ({NWORKERS} workers)")

    # Submit work (1, 2, 3....), then signal shutdown.
    for i in range(N_SUBMITS):
        service.submit({"x": float(i)})
    service.shutdown()
    print(f"submitted {N_SUBMITS} items + shutdown sentinel")

    # Block-drain until all results collected (or timeout)
    results = service.collect_results(N_SUBMITS, timeout=60)
    print(f"\ncollected {len(results)}/{N_SUBMITS}")

    # A short result list would make the all() check below pass vacuously,
    # so the count is verified first.
    assert len(results) == N_SUBMITS, f"expected {N_SUBMITS} results, got {len(results)}"

    # Verify y == 2*x
    ok = all(math.isclose(r["y"], 2 * r["x"]) for r in results)
    print("PASS" if ok else "FAIL")
    assert ok, "at least one result does not satisfy y == 2*x"

    # Wait for libE to wind down
    service.join(timeout=10)
    assert not service.is_alive(), "libE thread still running 10s after shutdown"
    print("done")


if __name__ == "__main__":
main()
Loading
Loading