Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,18 @@ def libE(

libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local, "threads": libE_local}

if sim_specs.get("globus_compute_endpoint"):
libE_specs["_gc_only"] = True
if libE_specs.get("gen_on_worker"):
logger.info("GC-only mode: gen_on_worker is ignored (generator runs on manager)")
libE_specs["gen_on_worker"] = False
if libE_specs.get("comms", "mpi") != "local":
libE_specs["comms"] = "local"
logger.info("GC-only mode: switching to local comms (no workers needed)")
if not libE_specs.get("disable_resource_manager"):
libE_specs["disable_resource_manager"] = True
logger.info("GC-only mode: disabling resource manager (no local nodes)")

Resources.init_resources(libE_specs, platform_info)
if Executor.executor is not None:
Executor.executor.add_platform_info(platform_info)
Expand Down
206 changes: 203 additions & 3 deletions libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
============================
"""

import concurrent.futures
import cProfile
import glob
import inspect
import logging
import os
import platform
Expand All @@ -30,11 +32,14 @@
MAN_SIGNAL_KILL,
PERSIS_STOP,
STOP_TAG,
TASK_FAILED,
WORKER_DONE,
calc_type_strings,
)
from libensemble.resources.resources import Resources
from libensemble.tools.fields_keys import protected_libE_fields
from libensemble.tools.tools import _USER_CALC_DIR_WARNING
from libensemble.utils.globus_compute import GCSession
from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges
from libensemble.utils.output_directory import EnsembleDirectory
from libensemble.utils.timer import Timer
Expand Down Expand Up @@ -243,6 +248,15 @@ def __init__(
local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs)
self.wcomms = [local_worker_comm] + self.wcomms

if libE_specs.get("_gc_only"):
n_virtual = libE_specs.get("nworkers", 1) or 1
virtual_W = np.zeros(n_virtual, dtype=Manager.worker_dtype)
start_id = len(self.W)
virtual_W["worker_id"] = np.arange(start_id, start_id + n_virtual)
virtual_W["gen_worker"] = False
self.W = np.concatenate([self.W, virtual_W])
self.wcomms = self.wcomms + [None] * n_virtual

self.W = _WorkerIndexer(self.W, 1 - gen_on_worker) # if gen on worker, then no additional worker
self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker)

Expand Down Expand Up @@ -574,9 +588,18 @@ def _kill_cancelled_sims(self) -> None:
logger.debug(f"Manager sending kill signals to H indices {kill_sim_rows}")
kill_ids = self.hist.H["sim_id"][kill_sim_rows]
kill_on_workers = self.hist.H["sim_worker"][kill_sim_rows]
for w in kill_on_workers:
self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL)
self.hist.H["kill_sent"][kill_ids] = True

if self.libE_specs.get("_gc_only"):
sim_ids_to_kill = set(kill_ids)
for future, (sim_id, w) in list(self._gc_futures.items()):
if sim_id in sim_ids_to_kill:
future.cancel()
del self._gc_futures[future]
self.hist.H["kill_sent"][kill_ids] = True
else:
for w in kill_on_workers:
self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL)
self.hist.H["kill_sent"][kill_ids] = True

# --- Handle termination

Expand Down Expand Up @@ -692,13 +715,190 @@ def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:

return output

def _init_gc(self) -> None:
endpoint = self.sim_specs.get("globus_compute_endpoint", "")
if not endpoint:
raise ValueError("_gc_only mode requires globus_compute_endpoint in sim_specs")

self._gc_executor, self._gc_fid = GCSession.get_or_create(endpoint, self.sim_specs["sim_f"])
self._gc_futures: dict[concurrent.futures.Future, tuple[int, int]] = {}
self._gc_nparams = len(inspect.signature(self.sim_specs["sim_f"]).parameters)

def _gc_submit(self, Work: dict, w: int) -> None:
sim_ids = Work["libE_info"]["H_rows"]
H_fields = Work["H_fields"]

for sim_id in sim_ids:
calc_in = np.empty(1, dtype=[(name, self.hist.H.dtype.fields[name][0]) for name in H_fields])
for name in H_fields:
calc_in[name] = self.hist.H[name][sim_id]

p_info = Work.get("persis_info", {})

libE_info = dict(Work["libE_info"])
libE_info["comm"] = None
libE_info.pop("executor", None)

args = [calc_in, p_info, self.sim_specs, libE_info][: self._gc_nparams]
future = self._gc_executor.submit_to_registered_function(self._gc_fid, args)
self._gc_futures[future] = (sim_id, w)

@staticmethod
def _normalize_gc_result(result):
if isinstance(result, (tuple, list)):
if len(result) >= 3:
return result[0], result[1], result[2]
if len(result) == 2:
if isinstance(result[1], (int, str)):
return result[0], {}, result[1]
return result[0], result[1], WORKER_DONE
return result[0], {}, WORKER_DONE
return result, {}, WORKER_DONE

def _gather_gc_results(self, persis_info: dict) -> dict:
time.sleep(0.0001)

new_stuff = True
while new_stuff:
new_stuff = False
for w in self.W["worker_id"]:
if self.wcomms[w] is not None and self.wcomms[w].mail_flag():
new_stuff = True
self._handle_msg_from_worker(persis_info, w)

done = [f for f in self._gc_futures if f.done()]
for future in done:
sim_id, w = self._gc_futures.pop(future)
try:
out, p_info, calc_status = self._normalize_gc_result(future.result())
D_recv = {
"calc_type": EVAL_SIM_TAG,
"calc_status": calc_status,
"calc_out": out,
"libE_info": {"keep_state": False, "H_rows": [sim_id]},
"persis_info": p_info or {},
}
except Exception as e:
logger.error(f"GC task failed for sim_id {sim_id}: {e}")
D_recv = {
"calc_type": EVAL_SIM_TAG,
"calc_status": TASK_FAILED,
"calc_out": None,
"libE_info": {"keep_state": False, "H_rows": [sim_id]},
"persis_info": {},
}
self._update_state_on_worker_msg(persis_info, D_recv, w)

self._init_every_k_save()
return persis_info

def _gc_cancel_futures(self) -> None:
for future in list(self._gc_futures.keys()):
future.cancel()
self._gc_futures.clear()

def _run_gc_only(self, persis_info: dict) -> tuple[dict, int, int]:
self._init_gc()
try:
while not self.term_test():
self._kill_cancelled_sims()
persis_info = self._gather_gc_results(persis_info)
Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info)
if flag:
break

for w in Work:
if self._sim_max_given():
break
self._check_work_order(Work[w], w)
if Work[w]["tag"] == EVAL_GEN_TAG:
self._send_work_order(Work[w], w)
else:
self._gc_submit(Work[w], w)
self._update_state_on_alloc(Work[w], w)

assert self.term_test() or any(
self.W["active"] != 0
), "alloc_f did not return any work, although all workers are idle."
except WorkerException as e:
report_worker_exc(e)
raise LoggedException(e.args[0], e.args[1]) from None
except Exception as e:
logger.error(traceback.format_exc())
raise LoggedException(e.args) from None
finally:
result = self._gc_final_receive_and_kill(persis_info)
self.wcomms = []
sys.stdout.flush()
sys.stderr.flush()
return result

def _gc_final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]:
if any(self.W["persis_state"]):
for w in self.W["worker_id"][self.W["persis_state"] > 0]:
logger.debug(f"Manager sending PERSIS_STOP to worker {w}")
if self.libE_specs.get("final_gen_send", False):
rows_to_send = np.where(self.hist.H["sim_ended"] & ~self.hist.H["gen_informed"])[0]
work = {
"H_fields": self.gen_specs["persis_in"],
"persis_info": persis_info.get(w),
"tag": PERSIS_STOP,
"libE_info": {"persistent": True, "H_rows": rows_to_send},
}
self._check_work_order(work, w, force=True)
self._send_work_order(work, w)
self.hist.update_history_to_gen(rows_to_send)
else:
self.wcomms[w].send(PERSIS_STOP, MAN_SIGNAL_KILL)
if not self.W[w]["active"]:
self.W[w]["active"] = self.W[w]["persis_state"]
self.persis_pending.append(w)

exit_flag = 0
while self._gc_futures:
done = [f for f in self._gc_futures if f.done()]
if not done:
time.sleep(0.1)
continue
for future in done:
sim_id, w = self._gc_futures.pop(future)
try:
out, p_info, calc_status = self._normalize_gc_result(future.result())
D_recv = {
"calc_type": EVAL_SIM_TAG,
"calc_status": calc_status,
"calc_out": out,
"libE_info": {"keep_state": False, "H_rows": [sim_id]},
"persis_info": p_info or {},
}
except Exception:
continue
self._update_state_on_worker_msg(persis_info, D_recv, w)

if 0 in self.W["worker_id"] and self.wcomms[0] is not None:
while self.wcomms[0].mail_flag():
self._handle_msg_from_worker(persis_info, 0)
self.wcomms[0].send(STOP_TAG, MAN_SIGNAL_FINISH)

self._init_every_k_save(complete=True)
self._clean_up_thread()

if self.live_data is not None:
self.live_data.finalize(self.hist)

persis_info["num_gens_started"] = 0
return persis_info, exit_flag, self.elapsed()

# --- Main loop

def run(self, persis_info: dict) -> tuple[dict, int, int]:
"""Runs the manager"""
logger.debug(f"Manager initiated on node {socket.gethostname()}")
logger.info(f"Manager exit_criteria: {self.exit_criteria}")

if self.libE_specs.get("_gc_only"):
return self._run_gc_only(persis_info)

# Continue receiving and giving until termination test is satisfied
try:
while not self.term_test():
Expand Down
112 changes: 112 additions & 0 deletions libensemble/tests/functionality_tests/test_gc_manager_submit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Tests libEnsemble's manager-side Globus Compute submission (GC-only mode).

The manager submits simulation work directly to a mocked Globus Compute
endpoint instead of dispatching to local workers. The generator runs on
the manager thread as normal.

Execute via:
python test_gc_manager_submit.py

No MPI or local workers are needed -- GC-only mode uses local comms with
nworkers acting as the maximum number of concurrent in-flight GC futures.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: local
# TESTSUITE_NPROCS: 1

import concurrent.futures
from unittest import mock

import numpy as np
from gest_api.vocs import VOCS

from libensemble.gen_classes.sampling import UniformSample
from libensemble.libE import libE
from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
from libensemble.utils.globus_compute import GCSession


def norm_sim(H, persis_info, sim_specs, libE_info):
"""Evaluate the Euclidean norm of each input point."""
H_o = np.zeros(len(H), dtype=sim_specs["out"])
for i in range(len(H)):
H_o["f"][i] = float(np.linalg.norm(H["x"][i]))
return H_o, persis_info


def _make_done_future(value):
f = concurrent.futures.Future()
f.set_result(value)
return f


def _make_gc_executor(sim_f):
executor = mock.MagicMock()
executor.register_function.return_value = "mock-fid"

def fake_submit(fid, args):
result = sim_f(*args)
return _make_done_future(result)

executor.submit_to_registered_function.side_effect = fake_submit
return executor


if __name__ == "__main__":
GCSession.clear()

ENDPOINT = "mock-endpoint-uuid"
SIM_MAX = 20
N_VIRTUAL_WORKERS = 4

vocs = VOCS(
variables={"x0": [-3.0, 3.0], "x1": [-2.0, 2.0]},
objectives={"f": "MINIMIZE"},
)

sim_specs = SimSpecs(
sim_f=norm_sim,
inputs=["x"],
outputs=[("f", float)],
globus_compute_endpoint=ENDPOINT,
)

gen_specs = GenSpecs(
generator=UniformSample(vocs),
inputs=["sim_id"],
persis_in=["f", "sim_id"],
outputs=[("x", float, (2,))],
batch_size=N_VIRTUAL_WORKERS,
)

libE_specs = LibeSpecs(
nworkers=N_VIRTUAL_WORKERS,
comms="local",
disable_log_files=True,
safe_mode=False,
)

exit_criteria = ExitCriteria(sim_max=SIM_MAX)

mock_executor = _make_gc_executor(norm_sim)

with mock.patch.object(GCSession, "_create_executor", return_value=mock_executor):
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs)

assert flag == 0, f"libEnsemble exited with unexpected flag {flag}"
assert (
np.sum(H["sim_ended"]) >= SIM_MAX
), f"Expected at least {SIM_MAX} completed sims, got {np.sum(H['sim_ended'])}"

completed = H[H["sim_ended"]]
assert len(completed) >= SIM_MAX
assert np.all(completed["f"] >= 0.0), "Unexpected negative norm value"
assert (
mock_executor.submit_to_registered_function.call_count >= SIM_MAX
), f"Expected at least {SIM_MAX} GC submissions, got {mock_executor.submit_to_registered_function.call_count}"

print(f"\nGC-only mode: {np.sum(H['sim_ended'])} sims completed via mocked Globus Compute.")
print(f"Best f value: {completed['f'].min():.6f}")
print("\nlibEnsemble GC-only functionality test passed.")
Loading
Loading