diff --git a/libensemble/libE.py b/libensemble/libE.py
index 219e2cd8c..63eb7eb2f 100644
--- a/libensemble/libE.py
+++ b/libensemble/libE.py
@@ -262,6 +262,19 @@ def libE(
 
     libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local, "threads": libE_local}
 
+    # With a Globus Compute endpoint the manager submits sims itself (GC-only mode)
+    if sim_specs.get("globus_compute_endpoint"):
+        libE_specs["_gc_only"] = True
+        if libE_specs.get("gen_on_worker"):
+            logger.info("GC-only mode: gen_on_worker is ignored (generator runs on manager)")
+            libE_specs["gen_on_worker"] = False
+        if libE_specs.get("comms", "mpi") != "local":
+            libE_specs["comms"] = "local"
+            logger.info("GC-only mode: switching to local comms (no workers needed)")
+        if not libE_specs.get("disable_resource_manager"):
+            libE_specs["disable_resource_manager"] = True
+            logger.info("GC-only mode: disabling resource manager (no local nodes)")
+
     Resources.init_resources(libE_specs, platform_info)
     if Executor.executor is not None:
         Executor.executor.add_platform_info(platform_info)
diff --git a/libensemble/manager.py b/libensemble/manager.py
index 7995d2da9..a455e96f7 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -3,8 +3,10 @@
 ============================
 """
 
+import concurrent.futures
 import cProfile
 import glob
+import inspect
 import logging
 import os
 import platform
@@ -30,11 +32,14 @@
     MAN_SIGNAL_KILL,
     PERSIS_STOP,
     STOP_TAG,
+    TASK_FAILED,
+    WORKER_DONE,
     calc_type_strings,
 )
 from libensemble.resources.resources import Resources
 from libensemble.tools.fields_keys import protected_libE_fields
 from libensemble.tools.tools import _USER_CALC_DIR_WARNING
+from libensemble.utils.globus_compute import GCSession
 from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges
 from libensemble.utils.output_directory import EnsembleDirectory
 from libensemble.utils.timer import Timer
@@ -243,6 +248,16 @@ def __init__(
             local_worker_comm = self._run_additional_worker(hist, sim_specs, gen_specs, libE_specs)
             self.wcomms = [local_worker_comm] + self.wcomms
 
+        # GC-only mode: append comm-less (None) virtual workers, one per allowed in-flight GC task
+        if libE_specs.get("_gc_only"):
+            n_virtual = libE_specs.get("nworkers", 1) or 1
+            virtual_W = np.zeros(n_virtual, dtype=Manager.worker_dtype)
+            start_id = len(self.W)
+            virtual_W["worker_id"] = np.arange(start_id, start_id + n_virtual)
+            virtual_W["gen_worker"] = False
+            self.W = np.concatenate([self.W, virtual_W])
+            self.wcomms = self.wcomms + [None] * n_virtual
+
         self.W = _WorkerIndexer(self.W, 1 - gen_on_worker)  # if gen on worker, then no additional worker
         self.wcomms = _WorkerIndexer(self.wcomms, 1 - gen_on_worker)
 
@@ -574,9 +589,20 @@ def _kill_cancelled_sims(self) -> None:
             logger.debug(f"Manager sending kill signals to H indices {kill_sim_rows}")
             kill_ids = self.hist.H["sim_id"][kill_sim_rows]
             kill_on_workers = self.hist.H["sim_worker"][kill_sim_rows]
-            for w in kill_on_workers:
-                self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL)
-                self.hist.H["kill_sent"][kill_ids] = True
+
+            if self.libE_specs.get("_gc_only"):
+                sim_ids_to_kill = set(kill_ids)
+                for future, (sim_id, w) in list(self._gc_futures.items()):
+                    if sim_id in sim_ids_to_kill:
+                        future.cancel()
+                        del self._gc_futures[future]
+                        # No result will arrive for a dropped future, so free its virtual worker
+                        self.W[w]["active"] = 0
+                self.hist.H["kill_sent"][kill_ids] = True
+            else:
+                for w in kill_on_workers:
+                    self.wcomms[w].send(STOP_TAG, MAN_SIGNAL_KILL)
+                    self.hist.H["kill_sent"][kill_ids] = True
 
     # --- Handle termination
 
@@ -692,6 +718,210 @@ def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict:
 
         return output
 
+    def _init_gc(self) -> None:
+        """Creates the Globus Compute session state used in GC-only mode
+
+        Raises ValueError if sim_specs has no "globus_compute_endpoint".
+        """
+        endpoint = self.sim_specs.get("globus_compute_endpoint", "")
+        if not endpoint:
+            raise ValueError("_gc_only mode requires globus_compute_endpoint in sim_specs")
+
+        self._gc_executor, self._gc_fid = GCSession.get_or_create(endpoint, self.sim_specs["sim_f"])
+        # Each in-flight future maps to the (sim_id, worker id) it was submitted for
+        self._gc_futures: dict[concurrent.futures.Future, tuple[int, int]] = {}
+        # sim_f may declare fewer than four parameters; args are truncated to this length
+        self._gc_nparams = len(inspect.signature(self.sim_specs["sim_f"]).parameters)
+
+    def _gc_submit(self, Work: dict, w: int) -> None:
+        """Submits one Globus Compute task per H row in Work, tracked under worker w"""
+        sim_ids = Work["libE_info"]["H_rows"]
+        H_fields = Work["H_fields"]
+
+        # Loop invariants: the input dtype and persis_info are the same for every row
+        calc_in_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in H_fields]
+        p_info = Work.get("persis_info", {})
+
+        for sim_id in sim_ids:
+            calc_in = np.empty(1, dtype=calc_in_dtype)
+            for name in H_fields:
+                calc_in[name] = self.hist.H[name][sim_id]
+
+            libE_info = dict(Work["libE_info"])
+            libE_info["comm"] = None  # 'comm' object not pickle-able
+            libE_info.pop("executor", None)  # ditto for executor
+            # calc_in holds only this row, so advertise only this row to sim_f
+            libE_info["H_rows"] = np.atleast_1d(sim_id)
+
+            args = [calc_in, p_info, self.sim_specs, libE_info][: self._gc_nparams]
+            future = self._gc_executor.submit_to_registered_function(self._gc_fid, args)
+            self._gc_futures[future] = (sim_id, w)
+
+    @staticmethod
+    def _normalize_gc_result(result):
+        """Coerces a sim_f return value to an (out, persis_info, calc_status) triple
+
+        Missing persis_info defaults to {} and missing calc_status to WORKER_DONE.
+        A two-item result whose second item is an int or str is read as (out, calc_status).
+        """
+        if isinstance(result, (tuple, list)):
+            if len(result) >= 3:
+                return result[0], result[1], result[2]
+            if len(result) == 2:
+                if isinstance(result[1], (int, str)):
+                    return result[0], {}, result[1]
+                return result[0], result[1], WORKER_DONE
+            return result[0], {}, WORKER_DONE
+        return result, {}, WORKER_DONE
+
+    def _gather_gc_results(self, persis_info: dict) -> dict:
+        """Handles pending worker messages, then passes finished GC futures to the state update"""
+        time.sleep(0.0001)
+
+        new_stuff = True
+        while new_stuff:
+            new_stuff = False
+            for w in self.W["worker_id"]:
+                # Virtual GC workers have no comm (None), so only real comms are polled
+                if self.wcomms[w] is not None and self.wcomms[w].mail_flag():
+                    new_stuff = True
+                    self._handle_msg_from_worker(persis_info, w)
+
+        done = [f for f in self._gc_futures if f.done()]
+        for future in done:
+            sim_id, w = self._gc_futures.pop(future)
+            try:
+                out, p_info, calc_status = self._normalize_gc_result(future.result())
+                D_recv = {
+                    "calc_type": EVAL_SIM_TAG,
+                    "calc_status": calc_status,
+                    "calc_out": out,
+                    "libE_info": {"keep_state": False, "H_rows": [sim_id]},
+                    "persis_info": p_info or {},
+                }
+            except Exception as e:
+                logger.error(f"GC task failed for sim_id {sim_id}: {e}")
+                D_recv = {
+                    "calc_type": EVAL_SIM_TAG,
+                    "calc_status": TASK_FAILED,
+                    "calc_out": None,
+                    "libE_info": {"keep_state": False, "H_rows": [sim_id]},
+                    "persis_info": {},
+                }
+            self._update_state_on_worker_msg(persis_info, D_recv, w)
+
+        self._init_every_k_save()
+        return persis_info
+
+    def _gc_cancel_futures(self) -> None:
+        """Requests cancellation of every in-flight GC future and forgets them all"""
+        for future in list(self._gc_futures.keys()):
+            future.cancel()
+        self._gc_futures.clear()
+
+    def _run_gc_only(self, persis_info: dict) -> tuple[dict, int, int]:
+        """Runs the manager loop with sim work submitted to Globus Compute
+
+        Gen work orders are still sent through the regular worker path.
+        """
+        self._init_gc()
+        try:
+            while not self.term_test():
+                self._kill_cancelled_sims()
+                persis_info = self._gather_gc_results(persis_info)
+                Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info)
+                if flag:
+                    break
+
+                for w in Work:
+                    if self._sim_max_given():
+                        break
+                    self._check_work_order(Work[w], w)
+                    if Work[w]["tag"] == EVAL_GEN_TAG:
+                        self._send_work_order(Work[w], w)
+                    else:
+                        self._gc_submit(Work[w], w)
+                    self._update_state_on_alloc(Work[w], w)
+
+                assert self.term_test() or any(
+                    self.W["active"] != 0
+                ), "alloc_f did not return any work, although all workers are idle."
+        except WorkerException as e:
+            report_worker_exc(e)
+            raise LoggedException(e.args[0], e.args[1]) from None
+        except Exception as e:
+            logger.error(traceback.format_exc())
+            raise LoggedException(e.args) from None
+        finally:
+            result = self._gc_final_receive_and_kill(persis_info)
+            self.wcomms = []
+            sys.stdout.flush()
+            sys.stderr.flush()
+        # Returning after (not inside) the finally block lets LoggedException propagate
+        return result
+
+    def _gc_final_receive_and_kill(self, persis_info: dict) -> tuple[dict, int, int]:
+        """Stops persistent workers, drains in-flight GC futures and finalizes the run
+
+        Returns persis_info, an exit flag (always 0 here) and the elapsed time.
+        """
+        if any(self.W["persis_state"]):
+            for w in self.W["worker_id"][self.W["persis_state"] > 0]:
+                logger.debug(f"Manager sending PERSIS_STOP to worker {w}")
+                if self.libE_specs.get("final_gen_send", False):
+                    rows_to_send = np.where(self.hist.H["sim_ended"] & ~self.hist.H["gen_informed"])[0]
+                    work = {
+                        "H_fields": self.gen_specs["persis_in"],
+                        "persis_info": persis_info.get(w),
+                        "tag": PERSIS_STOP,
+                        "libE_info": {"persistent": True, "H_rows": rows_to_send},
+                    }
+                    self._check_work_order(work, w, force=True)
+                    self._send_work_order(work, w)
+                    self.hist.update_history_to_gen(rows_to_send)
+                else:
+                    self.wcomms[w].send(PERSIS_STOP, MAN_SIGNAL_KILL)
+                if not self.W[w]["active"]:
+                    self.W[w]["active"] = self.W[w]["persis_state"]
+                self.persis_pending.append(w)
+
+        exit_flag = 0
+        while self._gc_futures:
+            done = [f for f in self._gc_futures if f.done()]
+            if not done:
+                time.sleep(0.1)
+                continue
+            for future in done:
+                sim_id, w = self._gc_futures.pop(future)
+                try:
+                    out, p_info, calc_status = self._normalize_gc_result(future.result())
+                    D_recv = {
+                        "calc_type": EVAL_SIM_TAG,
+                        "calc_status": calc_status,
+                        "calc_out": out,
+                        "libE_info": {"keep_state": False, "H_rows": [sim_id]},
+                        "persis_info": p_info or {},
+                    }
+                except Exception as e:
+                    # Best effort during shutdown: skip the result, but do not hide the failure
+                    logger.warning(f"GC task failed for sim_id {sim_id} during final receive: {e}")
+                    continue
+                self._update_state_on_worker_msg(persis_info, D_recv, w)
+
+        if 0 in self.W["worker_id"] and self.wcomms[0] is not None:
+            while self.wcomms[0].mail_flag():
+                self._handle_msg_from_worker(persis_info, 0)
+            self.wcomms[0].send(STOP_TAG, MAN_SIGNAL_FINISH)
+
+        self._init_every_k_save(complete=True)
+        self._clean_up_thread()
+
+        if self.live_data is not None:
+            self.live_data.finalize(self.hist)
+
+        persis_info["num_gens_started"] = 0
+        return persis_info, exit_flag, self.elapsed()
+
     # --- Main loop
 
     def run(self, persis_info: dict) -> tuple[dict, int, int]:
@@ -699,6 +929,9 @@ def run(self, persis_info: dict) -> tuple[dict, int, int]:
         logger.debug(f"Manager initiated on node {socket.gethostname()}")
         logger.info(f"Manager exit_criteria: {self.exit_criteria}")
 
+        if self.libE_specs.get("_gc_only"):
+            return self._run_gc_only(persis_info)
+
         # Continue receiving and giving until termination test is satisfied
         try:
             while not self.term_test():
diff --git a/libensemble/tests/functionality_tests/test_gc_manager_submit.py b/libensemble/tests/functionality_tests/test_gc_manager_submit.py
new file mode 100644
index 000000000..40b76e568
--- /dev/null
+++ b/libensemble/tests/functionality_tests/test_gc_manager_submit.py
@@ -0,0 +1,112 @@
+"""
+Tests libEnsemble's manager-side Globus Compute submission (GC-only mode).
+
+The manager submits simulation work directly to a mocked Globus Compute
+endpoint instead of dispatching to local workers. The generator runs on
+the manager thread as normal.
+
+Execute via:
+    python test_gc_manager_submit.py
+
+No MPI or local workers are needed -- GC-only mode uses local comms with
+nworkers acting as the maximum number of concurrent in-flight GC futures.
+"""
+
+# Do not change these lines - they are parsed by run-tests.sh
+# TESTSUITE_COMMS: local
+# TESTSUITE_NPROCS: 1
+
+import concurrent.futures
+from unittest import mock
+
+import numpy as np
+from gest_api.vocs import VOCS
+
+from libensemble.gen_classes.sampling import UniformSample
+from libensemble.libE import libE
+from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs
+from libensemble.utils.globus_compute import GCSession
+
+
+def norm_sim(H, persis_info, sim_specs, libE_info):
+    """Evaluate the Euclidean norm of each input point."""
+    H_o = np.zeros(len(H), dtype=sim_specs["out"])
+    for i in range(len(H)):
+        H_o["f"][i] = float(np.linalg.norm(H["x"][i]))
+    return H_o, persis_info
+
+
+def _make_done_future(value):
+    f = concurrent.futures.Future()
+    f.set_result(value)
+    return f
+
+
+def _make_gc_executor(sim_f):
+    executor = mock.MagicMock()
+    executor.register_function.return_value = "mock-fid"
+
+    def fake_submit(fid, args):
+        result = sim_f(*args)
+        return _make_done_future(result)
+
+    executor.submit_to_registered_function.side_effect = fake_submit
+    return executor
+
+
+if __name__ == "__main__":
+    GCSession.clear()
+
+    ENDPOINT = "mock-endpoint-uuid"
+    SIM_MAX = 20
+    N_VIRTUAL_WORKERS = 4
+
+    vocs = VOCS(
+        variables={"x0": [-3.0, 3.0], "x1": [-2.0, 2.0]},
+        objectives={"f": "MINIMIZE"},
+    )
+
+    sim_specs = SimSpecs(
+        sim_f=norm_sim,
+        inputs=["x"],
+        outputs=[("f", float)],
+        globus_compute_endpoint=ENDPOINT,
+    )
+
+    gen_specs = GenSpecs(
+        generator=UniformSample(vocs),
+        inputs=["sim_id"],
+        persis_in=["f", "sim_id"],
+        outputs=[("x", float, (2,))],
+        batch_size=N_VIRTUAL_WORKERS,
+    )
+
+    libE_specs = LibeSpecs(
+        nworkers=N_VIRTUAL_WORKERS,
+        comms="local",
+        disable_log_files=True,
+        safe_mode=False,
+    )
+
+    exit_criteria = ExitCriteria(sim_max=SIM_MAX)
+
+    mock_executor = _make_gc_executor(norm_sim)
+
+    with mock.patch.object(GCSession, "_create_executor", return_value=mock_executor):
+        H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, libE_specs=libE_specs)
+
+    assert flag == 0, f"libEnsemble exited with unexpected flag {flag}"
+    assert (
+        np.sum(H["sim_ended"]) >= SIM_MAX
+    ), f"Expected at least {SIM_MAX} completed sims, got {np.sum(H['sim_ended'])}"
+
+    completed = H[H["sim_ended"]]
+    assert len(completed) >= SIM_MAX
+    assert np.all(completed["f"] >= 0.0), "Unexpected negative norm value"
+    assert (
+        mock_executor.submit_to_registered_function.call_count >= SIM_MAX
+    ), f"Expected at least {SIM_MAX} GC submissions, got {mock_executor.submit_to_registered_function.call_count}"
+
+    print(f"\nGC-only mode: {np.sum(H['sim_ended'])} sims completed via mocked Globus Compute.")
+    print(f"Best f value: {completed['f'].min():.6f}")
+    print("\nlibEnsemble GC-only functionality test passed.")
diff --git a/libensemble/tests/unit_tests/test_ufunc_runners.py b/libensemble/tests/unit_tests/test_ufunc_runners.py
index 79fda7c28..8765e077a 100644
--- a/libensemble/tests/unit_tests/test_ufunc_runners.py
+++ b/libensemble/tests/unit_tests/test_ufunc_runners.py
@@ -1,6 +1,4 @@
-import mock
 import numpy as np
-import pytest
 
 import libensemble.tests.unit_tests.setup as setup
 from libensemble.tools.fields_keys import libE_fields
@@ -68,75 +66,7 @@ def tupilize(arg1, arg2):
     assert result == (calc_in, {})
 
 
-@pytest.mark.extra
-def test_globus_compute_runner_init():
-    calc_in, sim_specs, gen_specs = get_ufunc_args()
-
-    sim_specs["globus_compute_endpoint"] = "1234"
-
-    with mock.patch("globus_compute_sdk.Executor"):
-        runner = Runner.from_specs(sim_specs)
-
-    assert hasattr(
-        runner, "globus_compute_executor"
-    ), "Globus ComputeExecutor should have been instantiated when globus_compute_endpoint found in specs"
-
-
-@pytest.mark.extra
-def test_globus_compute_runner_pass():
-    calc_in, sim_specs, gen_specs = get_ufunc_args()
-
-    sim_specs["globus_compute_endpoint"] = "1234"
-
-    with mock.patch("globus_compute_sdk.Executor"):
-        runner = Runner.from_specs(sim_specs)
-
-    # Creating Mock Globus ComputeExecutor and Globus Compute future object - no exception
-    globus_compute_mock = mock.Mock()
-    globus_compute_future = mock.Mock()
-    globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future
-    globus_compute_future.exception.return_value = None
-    globus_compute_future.result.return_value = (True, True)
-
-    runner.globus_compute_executor = globus_compute_mock
-    runners = {1: runner.run}
-
-    libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"}
-
-    out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1})
-
-    assert all([out, persis_info]), "Globus Compute runner correctly returned results"
-
-
-@pytest.mark.extra
-def test_globus_compute_runner_fail():
-    calc_in, sim_specs, gen_specs = get_ufunc_args()
-
-    sim_specs["globus_compute_endpoint"] = "4321"
-
-    with mock.patch("globus_compute_sdk.Executor"):
-        runner = Runner.from_specs(sim_specs)
-
-    # Creating Mock Globus ComputeExecutor and Globus Compute future object - yes exception
-    globus_compute_mock = mock.Mock()
-    globus_compute_future = mock.Mock()
-    globus_compute_mock.submit_to_registered_function.return_value = globus_compute_future
-    globus_compute_future.exception.return_value = Exception
-
-    runner.globus_compute_executor = globus_compute_mock
-    runners = {1: runner.run}
-
-    libE_info = {"H_rows": np.array([2, 3, 4]), "workerID": 1, "comm": "fakecomm"}
-
-    with pytest.raises(Exception):
-        out, persis_info = runners[1](calc_in, {"libE_info": libE_info, "persis_info": {}, "tag": 1})
-        pytest.fail("Expected exception")
-
-
 if __name__ == "__main__":
     test_normal_runners()
     test_thread_runners()
     test_persis_info_from_none()
-    test_globus_compute_runner_init()
-    test_globus_compute_runner_pass()
-    test_globus_compute_runner_fail()
diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py
index 72f8d1d4f..45f99435d 100644
--- a/libensemble/utils/runners.py
+++ b/libensemble/utils/runners.py
@@ -17,8 +17,6 @@ class Runner:
 
     @classmethod
     def from_specs(cls, specs):
-        if len(specs.get("globus_compute_endpoint", "")) > 0:
-            return GlobusComputeRunner(specs)
         if specs.get("threaded"):
             return ThreadRunner(specs)
         if (generator := specs.get("generator")) is not None:
@@ -67,36 +65,6 @@ def run(self, calc_in: npt.NDArray, Work: dict) -> (npt.NDArray, dict, int | Non
         return out
 
 
-class GlobusComputeRunner(Runner):
-    def __init__(self, specs):
-        super().__init__(specs)
-        self.globus_compute_executor = self._get_globus_compute_executor()(endpoint_id=specs["globus_compute_endpoint"])
-        self.globus_compute_fid = self.globus_compute_executor.register_function(self.f)
-
-    def _get_globus_compute_executor(self):
-        try:
-            from globus_compute_sdk import Executor
-        except ModuleNotFoundError:
-            logger.warning("Globus Compute use detected but Globus Compute not importable. Is it installed?")
-            logger.warning("Running function evaluations normally on local resources.")
-            return None
-        else:
-            return Executor
-
-    def _result(self, calc_in: npt.NDArray, persis_info: dict, libE_info: dict) -> (npt.NDArray, dict, int | None):
-        from libensemble.worker import Worker
-
-        libE_info["comm"] = None  # 'comm' object not pickle-able
-        Worker._set_executor(0, None)  # ditto for executor
-
-        args = self._truncate_args(calc_in, persis_info, libE_info)
-        task_fut = self.globus_compute_executor.submit_to_registered_function(self.globus_compute_fid, args)
-        return task_fut.result()
-
-    def shutdown(self) -> None:
-        self.globus_compute_executor.shutdown()
-
-
 class ThreadRunner(Runner):
     def __init__(self, specs):
         super().__init__(specs)