
Dramatically Updated Globus Compute coupling #1734

@jlnav

Description


Claude's artifacts:

Globus Compute Integration Overhaul

Shared Infrastructure

Session cache utility: libensemble/utils/globus_compute.py

GCSession provides a per-process, thread-safe cache for Globus Compute
executor instances. All GC code paths (manager-side, user-facing executor)
share it.

  • get_or_create(endpoint_id, func) → (executor, func_id): cached
  • get_or_create_executor(endpoint_id) → executor: no function registration
  • register_function(endpoint_id, func) → (executor, func_id): always registers; the caller caches the func_id
  • clear() — reset cache (for tests)
  • threading.Lock guards all cache access
  • ModuleNotFoundError handled gracefully (SDK not installed → returns None)
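A minimal sketch of the caching pattern listed above, assuming a class-level dict keyed by endpoint ID and the globus-compute-sdk `Executor(endpoint_id=...)` constructor. `GCSessionSketch` is a hypothetical name, and the use of the SDK executor's `register_function` to obtain func IDs is an assumption; the real `GCSession` may be organized differently.

```python
import threading


class GCSessionSketch:
    """Hypothetical stand-in for GCSession: one cached GC executor per endpoint."""

    _instances = {}  # class-level: endpoint_id -> executor (persists across tests!)
    _func_ids = {}  # (endpoint_id, id(func)) -> func_id
    _lock = threading.Lock()  # guards both caches

    @classmethod
    def _create_executor(cls, endpoint_id):
        # Assumption: the real code builds a globus_compute_sdk.Executor here.
        try:
            from globus_compute_sdk import Executor
        except ModuleNotFoundError:
            return None  # SDK not installed: callers get None instead of an error
        return Executor(endpoint_id=endpoint_id)

    @classmethod
    def get_or_create_executor(cls, endpoint_id):
        with cls._lock:
            if endpoint_id not in cls._instances:
                executor = cls._create_executor(endpoint_id)
                if executor is None:
                    return None  # do not cache a missing SDK
                cls._instances[endpoint_id] = executor
            return cls._instances[endpoint_id]

    @classmethod
    def get_or_create(cls, endpoint_id, func):
        executor = cls.get_or_create_executor(endpoint_id)
        if executor is None:
            return None, None  # sketch returns a pair so callers can still unpack
        key = (endpoint_id, id(func))
        with cls._lock:
            if key not in cls._func_ids:
                # Assumption: register_function(func) returns a function ID string
                cls._func_ids[key] = executor.register_function(func)
            return executor, cls._func_ids[key]

    @classmethod
    def clear(cls):
        with cls._lock:
            cls._instances.clear()
            cls._func_ids.clear()
```

Because the lock is released between `get_or_create_executor` and the func-ID lookup, a plain (non-reentrant) `threading.Lock` is sufficient here.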

Approach 1: Manager-side GC submission (workerless)

When sim_specs["globus_compute_endpoint"] is set, libEnsemble enters
GC-only mode: no worker processes are launched; the manager submits
sim work directly to Globus Compute and polls futures for results.

Detection — libE.py

In libE(), after spec validation:

  • globus_compute_endpoint set in sim_specs
  • libE_specs["_gc_only"] = True; comms forced to "local"
  • If gen_on_worker was set, it is overridden to False with a log message
    (generator always runs on the manager in GC-only mode)
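The detection step can be sketched as a small function over plain dicts. This assumes `gen_on_worker` lives in `libE_specs` and that the override is logged at INFO level; `detect_gc_only` is a hypothetical helper name, not the code in libE.py.

```python
import logging

logger = logging.getLogger("libensemble")


def detect_gc_only(sim_specs: dict, libE_specs: dict) -> bool:
    """Hypothetical sketch of the GC-only detection performed in libE()."""
    if not sim_specs.get("globus_compute_endpoint"):
        return False  # normal worker-based run
    libE_specs["_gc_only"] = True
    libE_specs["comms"] = "local"  # no worker processes are launched
    if libE_specs.get("gen_on_worker"):
        # Explicit override rather than silently ignoring the setting
        logger.info("gen_on_worker overridden to False: the generator runs on the manager in GC-only mode")
        libE_specs["gen_on_worker"] = False
    return True
```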

libE_local() dispatches to _libE_local_gc_only() which:

  • Skips start_proc_team(), resource setup, and executor wiring
  • Passes empty wcomms=[] to manager()

Virtual workers — manager.py

nworkers is repurposed as virtual concurrency (defaults to 1 when
unset/zero). In Manager.__init__, N virtual worker entries are
appended to the W array after the gen worker slot (w=0). Each virtual
worker has gen_worker=False, active=0, persis_state=0. The
corresponding wcomms entries are None.

The allocator (only_persistent_gens) sees virtual workers the same
way it sees real workers — any_idle_workers is True when virtual slots
are idle, and avail_worker_ids(persistent=False, gen_workers=False)
returns idle virtual slot IDs. No allocator changes required.
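To make the virtual-worker layout concrete, here is a sketch using a NumPy structured array. The dtype is a hypothetical subset of the manager's `W` fields, and `avail_virtual_ids` only imitates the filtering that `avail_worker_ids(persistent=False, gen_workers=False)` would apply to these slots.

```python
import numpy as np

# Hypothetical subset of the manager's W dtype; the real array has more fields.
W_DTYPE = [("worker_id", int), ("active", int), ("persis_state", int), ("gen_worker", bool)]


def build_W(nworkers):
    """Slot 0 is the gen worker; slots 1..N are virtual GC workers."""
    n_virtual = nworkers or 1  # defaults to 1 when unset/zero
    W = np.zeros(1 + n_virtual, dtype=W_DTYPE)  # active=0, persis_state=0
    W["worker_id"] = np.arange(1 + n_virtual)
    W["gen_worker"][0] = True
    # Only the gen slot has a real comm; virtual slots are None
    wcomms = ["<gen QCommThread>"] + [None] * n_virtual
    return W, wcomms


def avail_virtual_ids(W):
    """Idle, non-persistent, non-gen slots (what the allocator would hand sims to)."""
    mask = (W["active"] == 0) & (W["persis_state"] == 0) & ~W["gen_worker"]
    return W["worker_id"][mask].tolist()
```

Marking a slot busy (`W["active"][w] = 1`) on submission and idle again on result is all the allocator needs to throttle concurrency to `nworkers`.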

GC submission / receive — manager.py

_init_gc() — registers sim_f with GCSession, stores the GC
executor and func_id, inspects sim_f signature for arg truncation.

_gc_submit(Work, w) — for each sim_id in Work:

  • Extracts calc_in from history
  • Builds a picklable libE_info (comm=None, no executor)
  • Truncates args to match sim_f signature
  • Submits via submit_to_registered_function
  • Stores future → (sim_id, virtual_w) in _gc_futures
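The submission steps above can be sketched with a `concurrent.futures.ThreadPoolExecutor` standing in for the Globus Compute executor (the real code calls `submit_to_registered_function`). The `libE_info` keys and the `gc_submit`/`truncate_args` names are illustrative assumptions.

```python
import inspect

import numpy as np


def truncate_args(sim_f, args):
    """Drop trailing args that sim_f cannot accept (e.g. a 2-arg sim_f(H, persis_info))."""
    params = inspect.signature(sim_f).parameters.values()
    if any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params):
        return args  # *args accepts everything
    positional = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    n_positional = sum(p.kind in positional for p in params)
    return args[:n_positional]


def gc_submit(executor, sim_f, H, sim_ids, virtual_w, persis_info, sim_specs, gc_futures):
    """Hypothetical sketch of _gc_submit: one future per sim_id."""
    for sim_id in sim_ids:
        calc_in = H[sim_specs["in"]][[sim_id]]  # one-row copy of the input fields
        # Picklable libE_info: no comm object, no executor
        libE_info = {"H_rows": np.array([sim_id]), "comm": None, "workerID": virtual_w}
        args = truncate_args(sim_f, (calc_in, persis_info, sim_specs, libE_info))
        future = executor.submit(sim_f, *args)
        gc_futures[future] = (sim_id, virtual_w)
```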

_normalize_gc_result(result) (static) — normalizes a sim_f
return value to a consistent 3-tuple (out, persis_info, calc_status).
Handles all return conventions:

  • 3-tuple (H_o, persis_info, calc_status) — legacy passthrough
  • 2-tuple (H_o, persis_info) — gest-api wrappers; defaults calc_status
    to WORKER_DONE
  • 2-tuple (H_o, int|str) — treats second element as calc_status
  • 1-tuple or bare value — wraps with empty persis_info and WORKER_DONE

Mirrors the normalization logic in worker.py:293-305.
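The four return conventions map to a short dispatch on tuple length and element type. In this sketch `WORKER_DONE` is hard-coded to 35, the value given in the notes below; real code should import it from `libensemble.message_numbers`.

```python
WORKER_DONE = 35  # per the notes in this issue; import the constant in real code


def normalize_gc_result(result):
    """Sketch of _normalize_gc_result: always return (out, persis_info, calc_status)."""
    if isinstance(result, tuple):
        if len(result) == 3:
            return result  # legacy (H_o, persis_info, calc_status) passthrough
        if len(result) == 2:
            out, second = result
            if isinstance(second, (int, str)):
                return out, {}, second  # (H_o, calc_status)
            return out, second, WORKER_DONE  # (H_o, persis_info), e.g. gest-api wrapper
        if len(result) == 1:
            return result[0], {}, WORKER_DONE
    return result, {}, WORKER_DONE  # bare value
```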

_gather_gc_results(persis_info) — replaces _receive_from_workers:

  • Polls the gen worker comm (w=0) so generator output reaches history
  • Skips virtual workers (wcomms[w] is None)
  • Drains completed GC futures, passes each result through
    _normalize_gc_result, constructs D_recv dicts, calls
    _update_state_on_worker_msg to update history and mark virtual
    workers idle
  • Failed futures → TASK_FAILED status; manager continues
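The futures-draining half of this method can be sketched with standard `concurrent.futures.Future` objects. The `D_recv` keys are modeled loosely on a worker message and are assumptions, `TASK_FAILED` is a placeholder string rather than the libEnsemble constant, and `on_result` stands in for `_update_state_on_worker_msg`.

```python
TASK_FAILED = "TASK_FAILED"  # placeholder; real code imports the libEnsemble constant


def drain_gc_futures(gc_futures, normalize, on_result):
    """Sketch: handle only finished futures; leave pending ones in gc_futures.

    gc_futures maps Future -> (sim_id, virtual_w); normalize returns
    (out, persis_info, calc_status); on_result receives (virtual_w, D_recv).
    """
    for future in [f for f in gc_futures if f.done()]:
        sim_id, w = gc_futures.pop(future)
        # Check cancelled() first: exception() raises on a cancelled future
        if future.cancelled() or future.exception() is not None:
            out, persis_info, status = None, {}, TASK_FAILED  # manager keeps going
        else:
            out, persis_info, status = normalize(future.result())
        D_recv = {
            "calc_out": out,
            "persis_info": persis_info,
            "calc_status": status,
            "libE_info": {"H_rows": [sim_id]},
        }
        on_result(w, D_recv)  # real code updates history and marks w idle
```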

_run_gc_only(persis_info) — replaces the normal run() main loop.
Same structure: term_test → kill_cancelled → gather_gc_results →
alloc_work → gc_submit. Dispatched from Manager.run() via
libE_specs["_gc_only"] check.

_gc_final_receive_and_kill(persis_info) — sends PERSIS_STOP to
persistent gen worker via real comm (w=0), drains remaining GC futures
(via _normalize_gc_result), cleans up gen thread.

_kill_cancelled_sims() — in GC mode, cancels in-flight GC futures
directly instead of sending kill signals via wcomms.
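A sketch of the cancellation path, assuming the manager already knows which sim IDs were cancelled. With standard `concurrent.futures.Future`, `cancel()` succeeds only for futures that have not started running; whether Globus Compute futures can be cancelled once a task reaches the endpoint depends on the SDK and is not assumed here.

```python
def kill_cancelled_sims(gc_futures, cancelled_sim_ids):
    """Hypothetical sketch: cancel in-flight GC futures for cancelled sims.

    Returns the sim_ids whose futures were actually cancelled. Cancelled
    futures stay in gc_futures so the gather step can record their status.
    """
    cancelled = []
    for future, (sim_id, _w) in gc_futures.items():
        if sim_id in cancelled_sim_ids and future.cancel():
            cancelled.append(sim_id)
    return cancelled
```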

_gc_futures structure

dict[concurrent.futures.Future, tuple[int, int]]  # future → (sim_id, virtual_w)

Approach 2: GlobusComputeExecutor (user-facing)

A new executor class that users create in their calling script and use
inside sim functions via info["executor"].

libensemble/executors/globus_compute_executor.py

GlobusComputeTask(Task) — wraps a concurrent.futures.Future:

  • poll(): future.done() → RUNNING / FINISHED / FAILED
  • wait(timeout): future.result(timeout=…)
  • kill(): future.cancel() → USER_KILLED
  • stdout / stderr: empty strings (stubs)
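The state mapping above can be sketched as a thin wrapper around a `concurrent.futures.Future`. `FutureTaskSketch` is hypothetical and deliberately does not subclass libEnsemble's `Task`; states are plain strings here rather than the real task-state constants.

```python
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FuturesTimeoutError


class FutureTaskSketch:
    """Hypothetical stand-in for GlobusComputeTask."""

    def __init__(self, future: Future):
        self.future = future
        self.state = "RUNNING"
        self.finished = False
        self.stdout = ""  # stubs: remote functions return values, not streams
        self.stderr = ""

    def poll(self):
        if self.finished or not self.future.done():
            return
        self.finished = True
        if self.future.cancelled():
            self.state = "USER_KILLED"
        elif self.future.exception() is not None:
            self.state = "FAILED"
        else:
            self.state = "FINISHED"

    def wait(self, timeout=None):
        try:
            self.future.result(timeout=timeout)
        except FuturesTimeoutError:
            raise  # still running: let the caller decide what to do
        except Exception:
            pass  # remote failure; poll() records it as FAILED
        self.poll()

    def kill(self):
        if self.future.cancel():
            self.state = "USER_KILLED"
            self.finished = True
```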

GlobusComputeExecutor(Executor):

  • Does not call super().__init__() — avoids auto-setting Executor.executor
  • _ensure_gc(): lazily creates the GC executor via GCSession.get_or_create_executor
  • _get_func_id(func): registers function with GC executor, caches by id(func)
  • submit(func=…) / submit(app_name=…) / submit(calc_type=…) → GlobusComputeTask
  • register_app(…, pyobj=callable): stores Python callable for later submission
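A sketch of per-callable func-ID caching as described for `_get_func_id`. It assumes the GC executor exposes `register_function(func)` returning an ID and `submit_to_registered_function(func_id, args=..., kwargs=...)`; treat both as assumptions about the SDK. `FuncIdCacheSketch` is a hypothetical name.

```python
class FuncIdCacheSketch:
    """Hypothetical sketch: register each callable once per GC executor."""

    def __init__(self, gc_executor):
        self._gc = gc_executor
        self._func_ids = {}  # id(func) -> (func, func_id)

    def get_func_id(self, func):
        key = id(func)
        if key not in self._func_ids:
            # Keep a reference to func so its id() cannot be reused by another object
            self._func_ids[key] = (func, self._gc.register_function(func))
        return self._func_ids[key][1]

    def submit(self, func, *args, **kwargs):
        func_id = self.get_func_id(func)
        return self._gc.submit_to_registered_function(func_id, args=args, kwargs=kwargs)
```

Caching by `id(func)` alone is only safe while the function object stays alive, because CPython may reuse an id after garbage collection; storing the function next to its ID avoids that.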

User flow

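import time

import numpy as np

from libensemble.executors.executor import Executor
from libensemble.executors.globus_compute_executor import GlobusComputeExecutor
from libensemble.message_numbers import UNSET_TAG

# GlobusComputeExecutor does not auto-set Executor.executor, so set it explicitly
exctr = GlobusComputeExecutor(endpoint_id="...")
Executor.executor = exctr


def my_sim(H, _, sim_specs, info):
    exctr = info["executor"]
    H_o = np.zeros(len(H), dtype=sim_specs["out"])
    # my_remote_func: a user-defined, picklable callable to run on the endpoint
    task = exctr.submit(func=my_remote_func, app_args="...")
    while not task.finished:
        task.poll()
        # Requires set_worker_info(comm=..., workerid=...) (see edge case 2)
        if exctr.manager_kill_received():
            task.kill()
            break
        time.sleep(0.1)
    # Fill H_o from the task output here before returning
    return H_o, UNSET_TAG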

Implementation status

| Step | What | Where | Status |
|------|------|-------|--------|
| 1 | GCSession utility | libensemble/utils/globus_compute.py | Done |
| 2 | GlobusComputeTask(Task) | libensemble/executors/globus_compute_executor.py | Done |
| 3 | GlobusComputeExecutor(Executor) | libensemble/executors/globus_compute_executor.py | Done |
| 4 | Approach 1: detector + GC loop | libE.py, manager.py | Done |
| 5 | gest-api sim support (_normalize_gc_result) | manager.py | Done |
| 6 | Remove legacy GlobusComputeRunner | runners.py, libE.py, specs.py, tests | Done |
| 7 | Unit tests (35 total) | tests/unit_tests/test_globus_compute.py | Done |
| 8 | Docs: platforms GC section (2 modes) | docs/platforms/platforms_index.rst | Done |
| 9 | Docs: GlobusComputeExecutor API reference | docs/executor/ex_globus_compute.rst | Done |
| 10 | Docs: executor index + overview cross-refs | docs/executor/ex_index.rst, ex_overview.rst | Done |
| 11 | Docs: install instructions | docs/advanced_installation/advanced_installation.rst | Done |
| 12 | Docs: update running_libE.rst GC reference | docs/running_libE.rst | Done |
| 13 | Regression test: manager-side GC | tests/regression_tests/ | Pending |
| 14 | Regression test: executor | tests/regression_tests/ | Pending |

Edge cases / notes

  1. GCSession cache across tests — class-level _instances persists
    between pytest functions. Tests must call GCSession.clear() in
    setup_method.

  2. manager_kill_received requires comm: manager_poll() asserts
    self.comm is not None. Users must call
    set_worker_info(comm=…, workerid=…) before kill-polling.

  3. Mock path — old tests patched "globus_compute_sdk.Executor"
    directly (fails without SDK installed). New approach patches
    GCSession._create_executor via mock.patch.object.

  4. Task requires app — base Task.__init__ asserts app is not None.
    GlobusComputeTask bypasses super().__init__() and creates a minimal
    Application("", name=func.__name__, …) for raw callable submissions.

  5. _send_work_order to gen in GC mode:
    _gc_final_receive_and_kill sends PERSIS_STOP via self.wcomms[w].
    This works because the gen worker (w=0) has a real QCommThread, not
    None.

  6. _gather_gc_results polls gen comm — must poll wcomms[0] (gen
    worker thread) in addition to GC futures, otherwise generator output
    never reaches the history and nothing gets submitted. Virtual worker
    comms (None) are skipped via is not None guard.

  7. Resources.init_resources still runs — called in libE() before
    GC-only detection. Harmless (just detects local resources) but
    unnecessary. Could be skipped in a future cleanup.

  8. gest-api sim return normalization: gest_api_sim returns a
    2-tuple (H_o, persis_info) while the original GC drain code expected
    a 3-tuple (H_o, persis_info, calc_status). _normalize_gc_result
    handles both, defaulting calc_status to WORKER_DONE for 2-tuples.
    This matches how worker.py:293-305 normalizes sim_f returns.

  9. gest-api sim via GC requires picklable components — when
    SimSpecs(simulator=my_func, vocs=vocs) is used in GC-only mode,
    sim_f is set to gest_api_sim (the wrapper). The wrapper is
    submitted to GC along with sim_specs containing simulator and
    vocs. Both the user's callable and the VOCS object must be
    picklable for GC serialization.
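For edge case 9, a cheap pre-flight check can catch obviously unserializable components before anything is submitted. This sketch uses the standard-library `pickle` as a conservative proxy; the Globus Compute SDK uses its own serialization strategies, which may accept objects (such as lambdas) that plain `pickle` rejects. `check_picklable` is a hypothetical helper.

```python
import pickle


def check_picklable(**objs):
    """Return the names of objects that plain pickle cannot serialize.

    Conservative proxy only: the GC SDK's serializers may accept more objects.
    """
    failures = []
    for name, obj in objs.items():
        try:
            pickle.dumps(obj)
        except Exception:  # PicklingError, TypeError, AttributeError, ...
            failures.append(name)
    return failures
```

A typical call would pass the user's simulator and the VOCS object, e.g. `check_picklable(simulator=my_func, vocs=vocs)`, and warn on any names returned.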

Goal

  • Overhaul Globus Compute (GC) integration in libEnsemble: implement manager-side GC submission (GC-only mode) and a user-facing GlobusComputeExecutor, remove legacy GlobusComputeRunner, update docs.

Constraints & Preferences

  • Option A chosen for gest-api support: submit gest_api_sim wrapper to GC (not raw simulator), just fix return value handling
  • calc_status defaults to WORKER_DONE for 2-tuple returns
  • No new tutorial for now; defer until regression test scripts exist
  • Keep inline breadcrumb nav style in executor docs
  • Non-persistence caveat applies to all GC modes; clarify in docs
  • Short cross-reference paragraph in ex_overview.rst (not full subsection)
  • Legacy GlobusComputeRunner fully removed; only two GC approaches remain (GC-only mode + GlobusComputeExecutor)
  • gen_on_worker is now overridden to False with a log message when globus_compute_endpoint is set

Progress

Done

  • GCSession utility (libensemble/utils/globus_compute.py) — per-process thread-safe cache for GC executors
  • GlobusComputeTask and GlobusComputeExecutor (libensemble/executors/globus_compute_executor.py)
  • Manager-side GC loop: _init_gc, _gc_submit, _normalize_gc_result, _gather_gc_results, _run_gc_only, _gc_final_receive_and_kill, _kill_cancelled_sims in manager.py
  • _normalize_gc_result static method handles 2-tuple (gest-api) and 3-tuple (legacy) returns, mirroring worker.py:293-305
  • WORKER_DONE added to manager.py imports
  • Removed GlobusComputeRunner class + dispatch from runners.py, removed GCSession import from runners.py
  • Updated libE.py: globus_compute_endpoint always triggers GC-only mode; gen_on_worker overridden to False
  • Updated specs.py globus_compute_endpoint docstring to describe GC-only mode
  • Docs: rewrote GC section in platforms_index.rst (2 modes: GC-only + executor; legacy removed)
  • Docs: created docs/executor/ex_globus_compute.rst API reference page
  • Docs: updated ex_index.rst (nav + toctree), ex_overview.rst (cross-ref paragraph)
  • Docs: expanded advanced_installation.rst with pip install globus-compute-sdk
  • Docs: updated running_libE.rst GC reference wording
  • Removed all GlobusComputeRunner tests from test_ufunc_runners.py and test_globus_compute.py
  • All 38 tests pass (35 in test_globus_compute.py + 3 in test_ufunc_runners.py)
  • Updated PLAN.md throughout

In Progress

  • (none)

Blocked

  • (none)

Key Decisions

  • Option A for gest-api: submit gest_api_sim wrapper (not raw simulator) to GC; only fix return normalization
  • Removed legacy GlobusComputeRunner entirely — Runner.from_specs() no longer dispatches to it
  • gen_on_worker always overridden when globus_compute_endpoint set — no silent ignore, explicit log + override to False
  • _normalize_gc_result as static method on Manager — reusable by both _gather_gc_results and _gc_final_receive_and_kill

Next Steps

  • Step 13 in PLAN.md: regression test for manager-side GC (tests/regression_tests/)
  • Step 14 in PLAN.md: regression test for GlobusComputeExecutor (tests/regression_tests/)
  • Grep for any remaining stale GlobusComputeRunner references (rg not available; use grep -rn)

Critical Context

  • gest_api_sim returns 2-tuple (H_o, persis_info) → _normalize_gc_result defaults calc_status to WORKER_DONE (value 35)
  • Both simulator callable and VOCS object must be picklable for GC serialization
  • GCSession._instances (class-level dict) persists across pytest functions — tests must call GCSession.clear() in setup_method
  • Pre-existing test failures in test_executor.py / test_executor_gpus.py (mpicc compile failure) and test_launcher.py (missing file) — unrelated to GC changes
  • GlobusComputeExecutor does not call super().__init__() to avoid auto-setting Executor.executor
  • GlobusComputeTask bypasses super().__init__() and creates minimal Application for raw callables

Relevant Files

  • libensemble/utils/globus_compute.py: GCSession cache (shared by manager + executor)
  • libensemble/executors/globus_compute_executor.py: GlobusComputeExecutor + GlobusComputeTask
  • libensemble/manager.py: all _gc_* methods + _normalize_gc_result; WORKER_DONE import added
  • libensemble/libE.py: GC-only detection at ~line 269; _libE_local_gc_only at ~line 539
  • libensemble/utils/runners.py: GlobusComputeRunner removed; GCSession import removed
  • libensemble/specs.py: globus_compute_endpoint field on SimSpecs (~line 117)
  • libensemble/sim_funcs/gest_api_wrapper.py: gest_api_sim wrapper (2-tuple return)
  • libensemble/tests/unit_tests/test_globus_compute.py: 35 tests (GCSession, Task, Executor, normalize, gather)
  • libensemble/tests/unit_tests/test_ufunc_runners.py: 3 remaining tests (GC runner tests removed)
  • docs/platforms/platforms_index.rst: GC section with 2 subsections (GC-only + executor)
  • docs/executor/ex_globus_compute.rst: new API reference page
  • docs/executor/ex_index.rst: updated nav + toctree
  • docs/executor/ex_overview.rst: GC cross-reference paragraph
  • docs/advanced_installation/advanced_installation.rst: expanded GC install section
  • docs/running_libE.rst: updated GC wording (~line 87)
  • PLAN.md: master tracking document
