diff --git a/.github/workflows/extra.yml b/.github/workflows/extra.yml index f024a08ea..cdc399644 100644 --- a/.github/workflows/extra.yml +++ b/.github/workflows/extra.yml @@ -103,7 +103,7 @@ jobs: conda install numpy scipy conda install -c conda-forge pytorch-cpu pip install --upgrade-strategy=only-if-needed git+https://github.com/xopt-org/xopt.git@generator_standard - pip install --no-deps git+https://github.com/optimas-org/optimas.git@main + pip install --no-deps git+https://github.com/optimas-org/optimas.git@multitask_uses_id - name: Remove test using octave, gpcam on Python 3.13 if: matrix.python-version >= '3.13' diff --git a/libensemble/manager.py b/libensemble/manager.py index 97f8f8225..22ae8b5d3 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -410,6 +410,14 @@ def _freeup_resources(self, w: int) -> None: if self.resources: self.resources.resource_manager.free_rsets(w) + def _ensure_sim_id_in_persis_in(self, D: npt.NDArray) -> None: + """Add sim_id to gen_specs persis_in if generator output contains sim_id (gest-api style generators only)""" + if self.gen_specs.get("generator") and len(D) > 0 and "sim_id" in D.dtype.names: + if "persis_in" not in self.gen_specs: + self.gen_specs["persis_in"] = [] + if "sim_id" not in self.gen_specs["persis_in"]: + self.gen_specs["persis_in"].append("sim_id") + def _send_work_order(self, Work: dict, w: int) -> None: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") @@ -483,6 +491,7 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - final_data = D_recv.get("calc_out", None) if isinstance(final_data, np.ndarray): if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False): + self._ensure_sim_id_in_persis_in(final_data) self.hist.update_history_x_in(w, final_data, self.W[w]["gen_started_time"]) elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False): self.hist.update_history_f(D_recv, self.kill_canceled_sims) @@ -500,7 +509,9 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - if calc_type == EVAL_SIM_TAG: self.hist.update_history_f(D_recv, self.kill_canceled_sims) if calc_type == EVAL_GEN_TAG: - self.hist.update_history_x_in(w, D_recv["calc_out"], self.W[w]["gen_started_time"]) + D = D_recv["calc_out"] + self._ensure_sim_id_in_persis_in(D) + self.hist.update_history_x_in(w, D, self.W[w]["gen_started_time"]) assert ( len(D_recv["calc_out"]) or np.any(self.W["active"]) or self.W[w]["persis_state"] ), "Gen must return work when is is the only thing active and not persistent." diff --git a/libensemble/specs.py b/libensemble/specs.py index dac1baae4..7d9ec92ae 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -247,6 +247,11 @@ def set_fields_from_vocs(self): persis_in_fields.extend(list(obj.keys())) self.persis_in = persis_in_fields + # Set inputs: same as persis_in for gest-api generators (needed for H0 ingestion) + if not self.inputs and self.generator is not None: + self.inputs = self.persis_in + print(f"inputs: {self.inputs}") + # Set outputs: variables + constants (what the generator produces) if not self.outputs: out_fields = [] diff --git a/libensemble/tests/regression_tests/test_optimas_ax_mf.py b/libensemble/tests/regression_tests/test_optimas_ax_mf.py new file mode 100644 index 000000000..b6f43b3ed --- /dev/null +++ b/libensemble/tests/regression_tests/test_optimas_ax_mf.py @@ -0,0 +1,84 @@ +""" +Tests libEnsemble with Optimas Multi-Fidelity Ax Generator + +*****currently fixing nworkers to batch_size***** + +Execute via one of the following commands (e.g. 4 workers): + mpiexec -np 5 python test_optimas_ax_mf.py + python test_optimas_ax_mf.py -n 4 + +When running with the above commands, the number of concurrent evaluations of +the objective function will be 4 as the generator is on the manager. + +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local +# TESTSUITE_NPROCS: 4 +# TESTSUITE_EXTRA: true + +import numpy as np + +from gest_api.vocs import VOCS +from optimas.generators import AxMultiFidelityGenerator + +from libensemble import Ensemble +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f +from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + + +def eval_func_mf(input_params): + """Evaluation function for multifidelity test.""" + x0 = input_params["x0"] + x1 = input_params["x1"] + resolution = input_params["res"] + result = -( + (x0 + 10 * np.cos(x0 + 0.1 * resolution)) + * (x1 + 5 * np.cos(x1 - 0.2 * resolution)) + ) + return {"f": result} + + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + + n = 2 + batch_size = 2 + + libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + + vocs = VOCS( + variables={"x0": [-50.0, 5.0], "x1": [-5.0, 15.0], "res": [1.0, 8.0]}, + objectives={"f": "MAXIMIZE"}, + ) + + gen = AxMultiFidelityGenerator(vocs=vocs) + + gen_specs = GenSpecs( + generator=gen, + batch_size=batch_size, + vocs=vocs, + ) + + sim_specs = SimSpecs( + simulator=eval_func_mf, + vocs=vocs, + ) + + alloc_specs = AllocSpecs(alloc_f=alloc_f) + exit_criteria = ExitCriteria(sim_max=6) + + workflow = Ensemble( + libE_specs=libE_specs, + sim_specs=sim_specs, + alloc_specs=alloc_specs, + gen_specs=gen_specs, + exit_criteria=exit_criteria, + ) + + H, _, _ = workflow.run() + + # Perform the run + if workflow.is_manager: + workflow.save_output(__file__) + print(f"Completed {len(H)} simulations") diff --git a/libensemble/tests/regression_tests/test_optimas_ax_multitask.py b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py new file mode 100644 index 000000000..bc419b2bd --- /dev/null +++ b/libensemble/tests/regression_tests/test_optimas_ax_multitask.py @@ -0,0 +1,103 @@ +""" +Tests libEnsemble with Optimas Multitask Ax Generator + +Runs an initial ensemble, followed by another using the first as an H0. + +*****currently fixing nworkers to batch_size***** + +Execute via one of the following commands (e.g. 4 workers): + mpiexec -np 5 python test_optimas_ax.py + python test_optimas_ax.py -n 4 + +When running with the above commands, the number of concurrent evaluations of +the objective function will be 4 as the generator is on the manager. + +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local +# TESTSUITE_NPROCS: 4 +# TESTSUITE_EXTRA: true + +import numpy as np +from gest_api.vocs import VOCS + +from optimas.core import Task +from optimas.generators import AxMultitaskGenerator + +from libensemble import Ensemble +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f +from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + + +def eval_func_multitask(input_params): + """Evaluation function for task1 or task2 in multitask test""" + print(f'input_params: {input_params}') + x0 = input_params["x0"] + x1 = input_params["x1"] + trial_type = input_params["trial_type"] + + if trial_type == "task_1": + result = -(x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1)) + else: + result = -0.5 * (x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1)) + + output_params = {"f": result} + return output_params + + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + + n = 2 + batch_size = 2 + + libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + + vocs = VOCS( + variables={ + "x0": [-50.0, 5.0], + "x1": [-5.0, 15.0], + "trial_type": {"task_1", "task_2"}, + }, + objectives={"f": "MAXIMIZE"}, + ) + + sim_specs = SimSpecs( + simulator=eval_func_multitask, + vocs=vocs, + ) + + alloc_specs = AllocSpecs(alloc_f=alloc_f) + exit_criteria = ExitCriteria(sim_max=15) + + H0 = None + for run_num in range(2): + task1 = Task("task_1", n_init=2, n_opt=1) + task2 = Task("task_2", n_init=5, n_opt=3) + gen = AxMultitaskGenerator(vocs=vocs, hifi_task=task1, lofi_task=task2) + + gen_specs = GenSpecs( + generator=gen, + batch_size=batch_size, + vocs=vocs, + ) + + workflow = Ensemble( + libE_specs=libE_specs, + sim_specs=sim_specs, + alloc_specs=alloc_specs, + gen_specs=gen_specs, + exit_criteria=exit_criteria, + H0=H0, + ) + + H, _, _ = workflow.run() + + if run_num == 0: + H0 = H + + if workflow.is_manager: + if run_num == 1: + workflow.save_output("multitask_with_H0") + print(f"Second run completed: {len(H)} simulations") diff --git a/libensemble/tests/regression_tests/test_optimas_ax_sf.py b/libensemble/tests/regression_tests/test_optimas_ax_sf.py new file mode 100644 index 000000000..ba0b66c29 --- /dev/null +++ b/libensemble/tests/regression_tests/test_optimas_ax_sf.py @@ -0,0 +1,83 @@ +""" +Tests libEnsemble with Optimas Single-Fidelity Ax Generator + +*****currently fixing nworkers to batch_size***** + +Execute via one of the following commands (e.g. 4 workers): + mpiexec -np 5 python test_optimas_ax_sf.py + python test_optimas_ax_sf.py -n 4 + +When running with the above commands, the number of concurrent evaluations of +the objective function will be 4 as the generator is on the manager. + +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local +# TESTSUITE_NPROCS: 4 +# TESTSUITE_EXTRA: true + +import numpy as np + +from gest_api.vocs import VOCS +from optimas.generators import AxSingleFidelityGenerator + +from libensemble import Ensemble +from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens as alloc_f +from libensemble.specs import AllocSpecs, ExitCriteria, GenSpecs, LibeSpecs, SimSpecs + + +def eval_func_sf(input_params): + """Evaluation function for single-fidelity test. """ + x0 = input_params["x0"] + x1 = input_params["x1"] + result = -(x0 + 10 * np.cos(x0)) * (x1 + 5 * np.cos(x1)) + return {"f": result} + + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + + n = 2 + batch_size = 2 + + libE_specs = LibeSpecs(gen_on_manager=True, nworkers=batch_size) + + vocs = VOCS( + variables={ + "x0": [-50.0, 5.0], + "x1": [-5.0, 15.0], + }, + objectives={"f": "MAXIMIZE"}, + ) + + gen = AxSingleFidelityGenerator(vocs=vocs) + + gen_specs = GenSpecs( + generator=gen, + batch_size=batch_size, + vocs=vocs, + ) + + sim_specs = SimSpecs( + simulator=eval_func_sf, + vocs=vocs, + ) + + alloc_specs = AllocSpecs(alloc_f=alloc_f) + exit_criteria = ExitCriteria(sim_max=10) + + workflow = Ensemble( + libE_specs=libE_specs, + sim_specs=sim_specs, + alloc_specs=alloc_specs, + gen_specs=gen_specs, + exit_criteria=exit_criteria, + ) + + H, _, _ = workflow.run() + + # Perform the run + if workflow.is_manager: + workflow.save_output(__file__) + print(f"Completed {len(H)} simulations") diff --git a/libensemble/utils/misc.py b/libensemble/utils/misc.py index 47823e281..3d94ff7d7 100644 --- a/libensemble/utils/misc.py +++ b/libensemble/utils/misc.py @@ -124,22 +124,29 @@ def _pack_field(input_dict: dict, field_names: list) -> tuple: def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) -> npt.NDArray: + """Convert list of dicts to numpy structured array""" if list_dicts is None: return None - if not isinstance(list_dicts, list): # presumably already a numpy array, conversion not necessary + if not isinstance(list_dicts, list): return list_dicts + if not list_dicts: + return np.array([], dtype=dtype if dtype else []) + # first entry is used to determine dtype first = list_dicts[0] + if "_id" in first and "sim_id" not in mapping: + mapping["sim_id"] = ["_id"] + # build a presumptive dtype new_dtype_names = _get_new_dtype_fields(first, mapping) combinable_names = _get_combinable_multidim_names(first, new_dtype_names) # [['x0', 'x1'], ['z']] if ( dtype is None - ): # rather roundabout. I believe default value gets set upon function instantiation. (default is mutable!) + ): # Default value gets set upon function instantiation (default is mutable). dtype = [] # build dtype of non-mapped fields. appending onto empty dtype @@ -148,9 +155,11 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) - # append dtype of mapped float fields if len(mapping): + existing_names = [f[0] for f in dtype] for name in mapping: - size = len(mapping[name]) - dtype.append(_decide_dtype(name, 0.0, size)) # float + if name not in existing_names: + size = len(mapping[name]) + dtype.append(_decide_dtype(name, 0.0, size)) # default to float out = np.zeros(len(list_dicts), dtype=dtype) @@ -161,6 +170,7 @@ def list_dicts_to_np(list_dicts: list, dtype: list = None, mapping: dict = {}) - out[output_name][j] = _pack_field(input_dict, input_names) else: out[output_name][j] = _pack_field(input_dict, mapping[output_name]) + return out @@ -215,6 +225,7 @@ def unmap_numpy_array(array: npt.NDArray, mapping: dict = {}) -> npt.NDArray: def np_to_list_dicts(array: npt.NDArray, mapping: dict = {}) -> List[dict]: + """Convert numpy structured array to list of dicts""" if array is None: return None out = [] diff --git a/libensemble/utils/runners.py b/libensemble/utils/runners.py index b0c78a7bc..0d96b099b 100644 --- a/libensemble/utils/runners.py +++ b/libensemble/utils/runners.py @@ -121,6 +121,9 @@ def _get_points_updates(self, batch_size: int) -> (npt.NDArray, npt.NDArray): def _convert_ingest(self, x: npt.NDArray) -> list: self.gen.ingest(np_to_list_dicts(x)) + def _convert_initial_ingest(self, x: npt.NDArray) -> list: + self.gen.ingest(np_to_list_dicts(x, mapping=getattr(self.gen, "variables_mapping", {}))) + def _loop_over_gen(self, tag, Work, H_in): """Interact with suggest/ingest generator that *does not* contain a background thread""" while tag not in [PERSIS_STOP, STOP_TAG]: @@ -139,12 +142,17 @@ def _get_initial_suggest(self, libE_info) -> npt.NDArray: def _start_generator_loop(self, tag, Work, H_in): """Start the generator loop after choosing best way of giving initial results to gen""" - self.gen.ingest(np_to_list_dicts(H_in, mapping=getattr(self.gen, "variables_mapping", {}))) + self._convert_initial_ingest(H_in) return self._loop_over_gen(tag, Work, H_in) def _persistent_result(self, calc_in, persis_info, libE_info): """Setup comms with manager, setup gen, loop gen to completion, return gen's results""" self.ps = PersistentSupport(libE_info, EVAL_GEN_TAG) + + # If H0 exists, ingest it into the generator before initial suggest + if calc_in is not None and len(calc_in) > 0: + self._convert_initial_ingest(calc_in) + # libE gens will hit the following line, but list_dicts_to_np will passthrough if the output is a numpy array H_out = list_dicts_to_np( self._get_initial_suggest(libE_info), @@ -182,10 +190,8 @@ def _get_points_updates(self, batch_size: int) -> (npt.NDArray, list): def _convert_ingest(self, x: npt.NDArray) -> list: self.gen.ingest_numpy(x) - def _start_generator_loop(self, tag, Work, H_in) -> npt.NDArray: - """Start the generator loop after choosing best way of giving initial results to gen""" - self.gen.ingest_numpy(H_in) - return self._loop_over_gen(tag, Work, H_in) # see parent class + def _convert_initial_ingest(self, x: npt.NDArray) -> list: + self.gen.ingest_numpy(x) class LibensembleGenThreadRunner(StandardGenRunner):