From aed9c4ec5a6827e2e7f5e98eb7b568956952177c Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 26 Jun 2025 17:00:58 -0500 Subject: [PATCH 01/28] initial commit, experimenting with pickling non-libE_field fields (x or f) out of a given update-index to file. --- libensemble/history.py | 20 ++++++++++++++++++++ pyproject.toml | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/libensemble/history.py b/libensemble/history.py index 3d3b5bc6fd..04828b8a0b 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -1,5 +1,7 @@ import logging +import pickle import time +from pathlib import Path import numpy as np import numpy.typing as npt @@ -114,6 +116,18 @@ def _append_new_fields(self, H_f: npt.NDArray) -> None: H_new[field][: len(self.H)] = self.H[field] self.H = H_new + def _shelf_longrunning_sims(self, cache_file, index): + """Cache any f values that ran for more than a second.""" + if 1: # self.H[index]['sim_ended_time'] - self.H[index]['sim_started_time'] > 1: + try: + cache = pickle.load(cache_file) + except EOFError: + cache = [] + entry = self.H[index] + presumptive_keys_to_cache = [i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]] + cache.append(entry[presumptive_keys_to_cache]) + pickle.dump(cache, cache_file) + def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: """ Updates the history after points have been evaluated @@ -126,6 +140,10 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: if returned_H is not None and any([field not in self.H.dtype.names for field in returned_H.dtype.names]): self._append_new_fields(returned_H) + cache_dir = Path.home() / ".libE" + cache_dir.mkdir(parents=True, exist_ok=True) + cache = open(cache_dir / "sims.pickle", "wb+") + for j, ind in enumerate(new_inds): for field in fields: if self.safe_mode: @@ -147,6 +165,8 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: self.H["sim_ended"][ind] = True self.H["sim_ended_time"][ind] = time.time() self.sim_ended_count += 1 + self._shelf_longrunning_sims(cache, ind) + cache.close() if kill_canceled_sims: for j in range(self.last_ended + 1, np.max(new_inds) + 1): diff --git a/pyproject.toml b/pyproject.toml index 68d5654da3..4be2a32759 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -142,4 +142,4 @@ extend-exclude = ["*.bib", "*.xml", "docs/nitpicky"] disable_error_code = ["import-not-found", "import-untyped"] [dependency-groups] -dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4"] +dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4", "wat>=0.6.0,<0.7"] From b33dd7b51f0e855e1bc66ca1e23e83399be7b4b4 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 28 Aug 2025 16:50:16 -0500 Subject: [PATCH 02/28] additional poking around and experimenting with history saving cache, and then seeing if values exist in that cache before sending sims... 
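The gist of the experiment, as a rough sketch (the names below are
illustrative stand-ins, not the ones this patch settles on): append the
non-libE fields of each finished sim to a structured array on disk with
np.save, then test candidate points against that array with np.allclose
before dispatching sim work.

    # Illustrative sketch only -- cache_path and the ('x', 'f') dtype are
    # stand-ins for whatever the History object ends up using.
    import numpy as np

    cache_path = "sims.npy"
    cache_dtype = [("x", float), ("f", float)]

    def shelf(entry):
        """Append a length-1 structured array of finished-sim fields."""
        try:
            cache = np.load(cache_path, allow_pickle=True)
        except (FileNotFoundError, EOFError, ValueError):
            cache = np.zeros(0, dtype=cache_dtype)  # first write: empty cache
        np.save(cache_path, np.append(cache, entry))

    def seen_before(x, rtol=1e-8, atol=1e-8):
        """True if some cached entry's 'x' is within tolerance of x."""
        try:
            cache = np.load(cache_path, allow_pickle=True)
        except (FileNotFoundError, EOFError, ValueError):
            return False
        return any(np.allclose(row["x"], x, rtol=rtol, atol=atol) for row in cache)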
--- libensemble/history.py | 36 ++++++++++++++++++++++-------------- libensemble/manager.py | 8 ++++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/libensemble/history.py b/libensemble/history.py index 04828b8a0b..f185e8c575 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -1,5 +1,4 @@ import logging -import pickle import time from pathlib import Path @@ -108,6 +107,11 @@ def __init__( self.last_started = -1 self.last_ended = -1 + self.cache_dir = Path.home() / ".libE" + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.cache = open(self.cache_dir / "sims.pickle", "wb+") + self.cache_set = False + def _append_new_fields(self, H_f: npt.NDArray) -> None: dtype_new = np.dtype(list(set(self.H.dtype.descr + H_f.dtype.descr))) H_new = np.zeros(len(self.H), dtype=dtype_new) @@ -116,17 +120,26 @@ def _append_new_fields(self, H_f: npt.NDArray) -> None: H_new[field][: len(self.H)] = self.H[field] self.H = H_new - def _shelf_longrunning_sims(self, cache_file, index): + def _shelf_longrunning_sims(self, index): """Cache any f values that ran for more than a second.""" if 1: # self.H[index]['sim_ended_time'] - self.H[index]['sim_started_time'] > 1: + presumptive_keys_to_cache = [i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]] + self.new_dtype_cache_keys = [(name, self.H.dtype.fields[name][0]) for name in presumptive_keys_to_cache] try: - cache = pickle.load(cache_file) + in_cache = np.load(self.cache, allow_pickle=True) except EOFError: - cache = [] - entry = self.H[index] - presumptive_keys_to_cache = [i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]] - cache.append(entry[presumptive_keys_to_cache]) - pickle.dump(cache, cache_file) + in_cache = np.zeros(1, dtype=self.new_dtype_cache_keys) + entry = self.H[index][presumptive_keys_to_cache] + in_cache = np.append(in_cache, entry) + np.save(self.cache, in_cache) + self.cache_set = True + + def get_shelved_sims(self) -> npt.NDArray: + try: + in_cache = np.load(self.cache, allow_pickle=True) + except EOFError: + in_cache = np.zeros(1, dtype=self.new_dtype_cache_keys) + return in_cache def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: """ @@ -140,10 +153,6 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: if returned_H is not None and any([field not in self.H.dtype.names for field in returned_H.dtype.names]): self._append_new_fields(returned_H) - cache_dir = Path.home() / ".libE" - cache_dir.mkdir(parents=True, exist_ok=True) - cache = open(cache_dir / "sims.pickle", "wb+") - for j, ind in enumerate(new_inds): for field in fields: if self.safe_mode: @@ -165,8 +174,7 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: self.H["sim_ended"][ind] = True self.H["sim_ended_time"][ind] = time.time() self.sim_ended_count += 1 - self._shelf_longrunning_sims(cache, ind) - cache.close() + self._shelf_longrunning_sims(ind) if kill_canceled_sims: for j in range(self.last_ended + 1, np.max(new_inds) + 1): diff --git a/libensemble/manager.py b/libensemble/manager.py index b12b96a774..00eea0e7d1 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -431,6 +431,14 @@ def _send_work_order(self, Work: dict, w: int) -> None: for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) + if Work["tag"] == EVAL_SIM_TAG and self.hist.cache_set: + cached_H = self.hist.get_shelved_sims() + for entry in H_to_be_sent: + if 
np.allclose(entry[self.hist.new_dtype_cache_keys], cached_H, rtol=1e-8, atol=1e-8): + # probably figure out indexes for entries in H_to_be_sent that + # can simply be read back into History from cache? + pass + self.wcomms[w].send(0, H_to_be_sent) def _update_state_on_alloc(self, Work: dict, w: int): From a020a589a959688df4141b1442a154fa5c34398d Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 29 Aug 2025 12:56:51 -0500 Subject: [PATCH 03/28] better making of .npy database, use History attributes created upon the cache being created. iterate over the cache_keys corresponding to those in gen_specs out, then check if they're close to a cache entry. grab those indexes so we can slot in the corresponding data into the manager's H later on --- libensemble/history.py | 22 +++++++++++----------- libensemble/manager.py | 24 +++++++++++++++++------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/libensemble/history.py b/libensemble/history.py index f185e8c575..1abb362f85 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -109,7 +109,9 @@ def __init__( self.cache_dir = Path.home() / ".libE" self.cache_dir.mkdir(parents=True, exist_ok=True) - self.cache = open(self.cache_dir / "sims.pickle", "wb+") + self.cache = self.cache_dir / "cache.npy" + if not self.cache.exists(): + self.cache.touch() self.cache_set = False def _append_new_fields(self, H_f: npt.NDArray) -> None: @@ -123,22 +125,20 @@ def _append_new_fields(self, H_f: npt.NDArray) -> None: def _shelf_longrunning_sims(self, index): """Cache any f values that ran for more than a second.""" if 1: # self.H[index]['sim_ended_time'] - self.H[index]['sim_started_time'] > 1: - presumptive_keys_to_cache = [i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]] - self.new_dtype_cache_keys = [(name, self.H.dtype.fields[name][0]) for name in presumptive_keys_to_cache] + self.cache_keys = sorted([i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]]) + self.cache_dtype = sorted([(name, self.H.dtype.fields[name][0]) for name in self.cache_keys]) try: in_cache = np.load(self.cache, allow_pickle=True) except EOFError: - in_cache = np.zeros(1, dtype=self.new_dtype_cache_keys) - entry = self.H[index][presumptive_keys_to_cache] - in_cache = np.append(in_cache, entry) - np.save(self.cache, in_cache) + in_cache = np.zeros(1, dtype=self.cache_dtype) + entry = self.H[index][self.cache_keys] + if entry not in in_cache: + in_cache = np.append(in_cache, entry) + np.save(self.cache, in_cache, allow_pickle=True) self.cache_set = True def get_shelved_sims(self) -> npt.NDArray: - try: - in_cache = np.load(self.cache, allow_pickle=True) - except EOFError: - in_cache = np.zeros(1, dtype=self.new_dtype_cache_keys) + in_cache = np.load(self.cache, allow_pickle=True) return in_cache def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: diff --git a/libensemble/manager.py b/libensemble/manager.py index 00eea0e7d1..9a24f213b8 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -410,7 +410,7 @@ def _freeup_resources(self, w: int) -> None: if self.resources: self.resources.resource_manager.free_rsets(w) - def _send_work_order(self, Work: dict, w: int) -> None: + def _send_work_order(self, Work: dict, w: int) -> list: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") @@ -431,15 +431,24 @@ def _send_work_order(self, Work: dict, w: int) -> None: for i, row in enumerate(work_rows): H_to_be_sent[i] = 
repack_fields(self.hist.H[Work["H_fields"]][row]) + # check if any of the generated points are already in the cache if Work["tag"] == EVAL_SIM_TAG and self.hist.cache_set: cached_H = self.hist.get_shelved_sims() - for entry in H_to_be_sent: - if np.allclose(entry[self.hist.new_dtype_cache_keys], cached_H, rtol=1e-8, atol=1e-8): - # probably figure out indexes for entries in H_to_be_sent that - # can simply be read back into History from cache? - pass + gen_keys = [j[0] for j in self.gen_specs["out"]] + cache_gen_keys = [i for i in self.hist.cache_keys if i in gen_keys] + discovered_cache_indexes = [] + for index, entry in enumerate(H_to_be_sent): + for field in cache_gen_keys: + if np.allclose(entry[field], cached_H[field], rtol=1e-8, atol=1e-8): + discovered_cache_indexes.append(index) + break + if len(discovered_cache_indexes) > 0: + for index in discovered_cache_indexes: + H_to_be_sent = np.delete(H_to_be_sent, index, axis=0) + return discovered_cache_indexes self.wcomms[w].send(0, H_to_be_sent) + return [] def _update_state_on_alloc(self, Work: dict, w: int): """Updates a workers' active/idle status following an allocation order""" @@ -702,7 +711,8 @@ def run(self, persis_info: dict) -> (dict, int, int): if self._sim_max_given(): break self._check_work_order(Work[w], w) - self._send_work_order(Work[w], w) + cache_indexes = self._send_work_order(Work[w], w) + print(cache_indexes) self._update_state_on_alloc(Work[w], w) assert self.term_test() or any( self.W["active"] != 0 From fe4bc821ecd2f4a9cedd99f95b950adba6ec254a Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 29 Aug 2025 12:59:31 -0500 Subject: [PATCH 04/28] little note...? --- libensemble/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/libensemble/manager.py b/libensemble/manager.py index 9a24f213b8..c15d13b9f4 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -712,6 +712,7 @@ def run(self, persis_info: dict) -> (dict, int, int): break self._check_work_order(Work[w], w) cache_indexes = self._send_work_order(Work[w], w) + # JLN TODO: take these indexes, grab the data from cache, slot into history, then what...? print(cache_indexes) self._update_state_on_alloc(Work[w], w) assert self.term_test() or any( From 60bd6f50b1a45c30c18fe7b404e325c2bc96b1d5 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 29 Aug 2025 14:21:27 -0500 Subject: [PATCH 05/28] comments --- libensemble/history.py | 9 +++++++-- libensemble/manager.py | 25 ++++++++++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/libensemble/history.py b/libensemble/history.py index 1abb362f85..bbd00873ce 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -125,8 +125,13 @@ def _append_new_fields(self, H_f: npt.NDArray) -> None: def _shelf_longrunning_sims(self, index): """Cache any f values that ran for more than a second.""" if 1: # self.H[index]['sim_ended_time'] - self.H[index]['sim_started_time'] > 1: - self.cache_keys = sorted([i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]]) - self.cache_dtype = sorted([(name, self.H.dtype.fields[name][0]) for name in self.cache_keys]) + # ('f', 'x') and ('x', 'f') are not equivalent dtypes, unfortunately. So maybe sorted helps. 
+ self.cache_keys = sorted( + [i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]] + ) # ('f', 'x') keys only + self.cache_dtype = sorted( + [(name, self.H.dtype.fields[name][0]) for name in self.cache_keys] + ) # only needed to init cache try: in_cache = np.load(self.cache, allow_pickle=True) except EOFError: diff --git a/libensemble/manager.py b/libensemble/manager.py index c15d13b9f4..fe8a2df73e 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -425,6 +425,8 @@ def _send_work_order(self, Work: dict, w: int) -> list: work_rows = Work["libE_info"]["H_rows"] work_name = calc_type_strings[Work["tag"]] logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") + + discovered_cache_indexes = [] if len(work_rows): new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) @@ -433,22 +435,27 @@ def _send_work_order(self, Work: dict, w: int) -> list: # check if any of the generated points are already in the cache if Work["tag"] == EVAL_SIM_TAG and self.hist.cache_set: - cached_H = self.hist.get_shelved_sims() - gen_keys = [j[0] for j in self.gen_specs["out"]] - cache_gen_keys = [i for i in self.hist.cache_keys if i in gen_keys] + cached_H = self.hist.get_shelved_sims() # get the cache + gen_keys = [j[0] for j in self.gen_specs["out"]] # get 'x' keys + cache_gen_keys = [i for i in self.hist.cache_keys if i in gen_keys] # get 'x' keys in cache discovered_cache_indexes = [] - for index, entry in enumerate(H_to_be_sent): + for index, entry in enumerate( + H_to_be_sent + ): # find indexes of H_to_be_sent where 'x' fields are in cache for field in cache_gen_keys: - if np.allclose(entry[field], cached_H[field], rtol=1e-8, atol=1e-8): + if np.allclose( + entry[field], cached_H[field], rtol=1e-8, atol=1e-8 + ): # iterate through cache entries too? discovered_cache_indexes.append(index) - break + break # but maybe the other 'x' fields are also in the cache...? if len(discovered_cache_indexes) > 0: for index in discovered_cache_indexes: - H_to_be_sent = np.delete(H_to_be_sent, index, axis=0) - return discovered_cache_indexes + H_to_be_sent = np.delete( + H_to_be_sent, index, axis=0 + ) # delete rows from H_to_be_sent. 
we already have f self.wcomms[w].send(0, H_to_be_sent) - return [] + return discovered_cache_indexes def _update_state_on_alloc(self, Work: dict, w: int): """Updates a workers' active/idle status following an allocation order""" From a5d33cc1e11fdd5ea3201d68cc45be55572e9499 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 11 Sep 2025 16:34:06 -0500 Subject: [PATCH 06/28] experimenting with having caching being a step of the alloc, once we've determined points_to_evaluate --- .../alloc_funcs/give_sim_work_first.py | 14 ++++++++ libensemble/manager.py | 34 ++++--------------- pyproject.toml | 2 +- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 7ac4d75e5e..4913fe2844 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -64,6 +64,20 @@ def give_sim_work_first( points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] + if len(libE_info["cache"]) and np.any(points_to_evaluate): + for H_index, H_entry in enumerate(H): + for cache_index, cache_entry in enumerate(libE_info["cache"]): + for field in [j[0] for j in gen_specs["out"]]: + if field in libE_info["cache"].dtype.names and np.allclose( + H_entry[field], cache_entry[field], rtol=1e-8, atol=1e-8 + ): + H[H_index][field] = cache_entry[field] + libE_info["hist"].update_history_x_out(q_inds=np.array([H_index]), sim_worker=1) + libE_info["hist"].update_history_to_gen(q_inds=np.array([H_index])) + print(f"Using cache entry {cache_index} for History index {H_index}. Field: {field}") + + points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] + if np.any(points_to_evaluate): for wid in support.avail_worker_ids(gen_workers=False): sim_ids_to_send = support.points_by_priority(H, points_avail=points_to_evaluate, batch=batch_give) diff --git a/libensemble/manager.py b/libensemble/manager.py index fe8a2df73e..5507a7595d 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -410,7 +410,7 @@ def _freeup_resources(self, w: int) -> None: if self.resources: self.resources.resource_manager.free_rsets(w) - def _send_work_order(self, Work: dict, w: int) -> list: + def _send_work_order(self, Work: dict, w: int) -> None: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") @@ -425,37 +425,13 @@ def _send_work_order(self, Work: dict, w: int) -> list: work_rows = Work["libE_info"]["H_rows"] work_name = calc_type_strings[Work["tag"]] logger.debug(f"Manager sending {work_name} work to worker {w}. 
Rows {extract_H_ranges(Work) or None}") - - discovered_cache_indexes = [] if len(work_rows): new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) - # check if any of the generated points are already in the cache - if Work["tag"] == EVAL_SIM_TAG and self.hist.cache_set: - cached_H = self.hist.get_shelved_sims() # get the cache - gen_keys = [j[0] for j in self.gen_specs["out"]] # get 'x' keys - cache_gen_keys = [i for i in self.hist.cache_keys if i in gen_keys] # get 'x' keys in cache - discovered_cache_indexes = [] - for index, entry in enumerate( - H_to_be_sent - ): # find indexes of H_to_be_sent where 'x' fields are in cache - for field in cache_gen_keys: - if np.allclose( - entry[field], cached_H[field], rtol=1e-8, atol=1e-8 - ): # iterate through cache entries too? - discovered_cache_indexes.append(index) - break # but maybe the other 'x' fields are also in the cache...? - if len(discovered_cache_indexes) > 0: - for index in discovered_cache_indexes: - H_to_be_sent = np.delete( - H_to_be_sent, index, axis=0 - ) # delete rows from H_to_be_sent. we already have f - self.wcomms[w].send(0, H_to_be_sent) - return discovered_cache_indexes def _update_state_on_alloc(self, Work: dict, w: int): """Updates a workers' active/idle status following an allocation order""" @@ -650,6 +626,8 @@ def _sim_max_given(self) -> bool: def _get_alloc_libE_info(self) -> dict: """Selected statistics useful for alloc_f""" + cache = self.hist.get_shelved_sims() if self.hist.cache_set else [] + return { "any_idle_workers": any(self.W["active"] == 0), "exit_criteria": self.exit_criteria, @@ -664,6 +642,8 @@ def _get_alloc_libE_info(self) -> dict: "gen_num_procs": self.gen_num_procs, "gen_num_gpus": self.gen_num_gpus, "gen_on_manager": self.libE_specs.get("gen_on_manager", False), + "cache": cache, + "hist": self.hist, } def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: @@ -718,9 +698,7 @@ def run(self, persis_info: dict) -> (dict, int, int): if self._sim_max_given(): break self._check_work_order(Work[w], w) - cache_indexes = self._send_work_order(Work[w], w) - # JLN TODO: take these indexes, grab the data from cache, slot into history, then what...? - print(cache_indexes) + self._send_work_order(Work[w], w) self._update_state_on_alloc(Work[w], w) assert self.term_test() or any( self.W["active"] != 0 diff --git a/pyproject.toml b/pyproject.toml index 4be2a32759..231afa5bbf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -142,4 +142,4 @@ extend-exclude = ["*.bib", "*.xml", "docs/nitpicky"] disable_error_code = ["import-not-found", "import-untyped"] [dependency-groups] -dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4", "wat>=0.6.0,<0.7"] +dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4", "wat>=0.7.0,<0.8"] From e8816a8a64c7fa58349610aa8b5dedc0bc8b3065 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 11 Sep 2025 16:41:24 -0500 Subject: [PATCH 07/28] set those H entries to sim_started? 
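Also dedupes the on-disk cache with np.unique. One caveat worth noting:
on a structured array, np.unique only drops bitwise-identical records,
as in this sketch (made-up values; the dtype mirrors the ('f', 'x')
cache entries):

    import numpy as np

    cache = np.array(
        [(1.0, 0.5), (2.0, 0.25), (1.0, 0.5)],
        dtype=[("f", float), ("x", float)],
    )
    cache = np.unique(cache)  # sorts records and drops the exact duplicate
    print(cache)              # [(1., 0.5) (2., 0.25)]

so two entries whose x values merely agree to the np.allclose tolerance
used elsewhere in this series would both survive the dedupe.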
--- libensemble/alloc_funcs/give_sim_work_first.py | 1 + libensemble/history.py | 1 + 2 files changed, 2 insertions(+) diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 4913fe2844..2f470d9327 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -72,6 +72,7 @@ def give_sim_work_first( H_entry[field], cache_entry[field], rtol=1e-8, atol=1e-8 ): H[H_index][field] = cache_entry[field] + H[H_index]["sim_started"] = True libE_info["hist"].update_history_x_out(q_inds=np.array([H_index]), sim_worker=1) libE_info["hist"].update_history_to_gen(q_inds=np.array([H_index])) print(f"Using cache entry {cache_index} for History index {H_index}. Field: {field}") diff --git a/libensemble/history.py b/libensemble/history.py index bbd00873ce..705fb0d67f 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -139,6 +139,7 @@ def _shelf_longrunning_sims(self, index): entry = self.H[index][self.cache_keys] if entry not in in_cache: in_cache = np.append(in_cache, entry) + in_cache = np.unique(in_cache, axis=0) np.save(self.cache, in_cache, allow_pickle=True) self.cache_set = True From 28945237cf4f9d1d4c7c836bcf677cdbe6160f77 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 12 Sep 2025 15:42:25 -0500 Subject: [PATCH 08/28] grab update-able indexes, then call update_history_x_out and update_history_f on/with those indexes and the associated cache values, pretending that the array of cache-values are a worker message --- .../alloc_funcs/give_sim_work_first.py | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 2f470d9327..02967c2118 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -62,21 +62,50 @@ def give_sim_work_first( gen_count = support.count_gens() Work = {} + if not persis_info.get("updated_H_indices"): + persis_info["updated_H_indices"] = [] + points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] + gen_out_fields = [j[0] for j in gen_specs["out"]] + + indices_to_update = [] + cache_hit = False + if len(libE_info["cache"]) and np.any(points_to_evaluate): for H_index, H_entry in enumerate(H): + if H_index in persis_info["updated_H_indices"]: + continue for cache_index, cache_entry in enumerate(libE_info["cache"]): - for field in [j[0] for j in gen_specs["out"]]: + for field in gen_out_fields: if field in libE_info["cache"].dtype.names and np.allclose( H_entry[field], cache_entry[field], rtol=1e-8, atol=1e-8 ): - H[H_index][field] = cache_entry[field] - H[H_index]["sim_started"] = True - libE_info["hist"].update_history_x_out(q_inds=np.array([H_index]), sim_worker=1) - libE_info["hist"].update_history_to_gen(q_inds=np.array([H_index])) + cache_hit = True + indices_to_update.append({"cache_index": cache_index, "H_index": H_index, "field": field}) + persis_info["updated_H_indices"].append(H_index) print(f"Using cache entry {cache_index} for History index {H_index}. 
Field: {field}") + if cache_hit: + + q_inds = np.array([i["H_index"] for i in indices_to_update]) + libE_info["hist"].update_history_x_out(q_inds=q_inds, sim_worker=1) + + simulated_calc_out = np.zeros(len(q_inds), dtype=sim_specs["out"]) + for i, H_index in enumerate(q_inds): + simulated_calc_out[i] = libE_info["cache"][indices_to_update[i]["cache_index"]][ + indices_to_update[i]["field"] + ] + + simulated_D_recv = { + "calc_out": simulated_calc_out, + "libE_info": { + "H_rows": q_inds, + }, + } + + libE_info["hist"].update_history_f(simulated_D_recv, kill_canceled_sims=False) + points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] if np.any(points_to_evaluate): From df29125a99a60619b1cf780cbd2c5527952ef312 Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 17 Sep 2025 15:47:51 -0500 Subject: [PATCH 09/28] moving cache logic into manager, into handle_msg_from_worker that overrides .recv, and in _send_work_order that forms a buffer from corresponding cache entries --- .../alloc_funcs/give_sim_work_first.py | 86 +++++++++---------- libensemble/manager.py | 65 +++++++++++--- pyproject.toml | 14 +-- 3 files changed, 105 insertions(+), 60 deletions(-) diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 02967c2118..7ab64b2191 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -62,49 +62,49 @@ def give_sim_work_first( gen_count = support.count_gens() Work = {} - if not persis_info.get("updated_H_indices"): - persis_info["updated_H_indices"] = [] - - points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] - - gen_out_fields = [j[0] for j in gen_specs["out"]] - - indices_to_update = [] - cache_hit = False - - if len(libE_info["cache"]) and np.any(points_to_evaluate): - for H_index, H_entry in enumerate(H): - if H_index in persis_info["updated_H_indices"]: - continue - for cache_index, cache_entry in enumerate(libE_info["cache"]): - for field in gen_out_fields: - if field in libE_info["cache"].dtype.names and np.allclose( - H_entry[field], cache_entry[field], rtol=1e-8, atol=1e-8 - ): - cache_hit = True - indices_to_update.append({"cache_index": cache_index, "H_index": H_index, "field": field}) - persis_info["updated_H_indices"].append(H_index) - print(f"Using cache entry {cache_index} for History index {H_index}. 
Field: {field}") - - if cache_hit: - - q_inds = np.array([i["H_index"] for i in indices_to_update]) - libE_info["hist"].update_history_x_out(q_inds=q_inds, sim_worker=1) - - simulated_calc_out = np.zeros(len(q_inds), dtype=sim_specs["out"]) - for i, H_index in enumerate(q_inds): - simulated_calc_out[i] = libE_info["cache"][indices_to_update[i]["cache_index"]][ - indices_to_update[i]["field"] - ] - - simulated_D_recv = { - "calc_out": simulated_calc_out, - "libE_info": { - "H_rows": q_inds, - }, - } - - libE_info["hist"].update_history_f(simulated_D_recv, kill_canceled_sims=False) + # if not persis_info.get("updated_H_indices"): + # persis_info["updated_H_indices"] = [] + + # points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] + + # gen_out_fields = [j[0] for j in gen_specs["out"]] + + # indices_to_update = [] + # cache_hit = False + + # if len(libE_info["cache"]) and np.any(points_to_evaluate): + # for H_index, H_entry in enumerate(H): + # if H_index in persis_info["updated_H_indices"]: + # continue + # for cache_index, cache_entry in enumerate(libE_info["cache"]): + # for field in gen_out_fields: + # if field in libE_info["cache"].dtype.names and np.allclose( + # H_entry[field], cache_entry[field], rtol=1e-8, atol=1e-8 + # ): + # cache_hit = True + # indices_to_update.append({"cache_index": cache_index, "H_index": H_index, "field": field}) + # persis_info["updated_H_indices"].append(H_index) + # print(f"Using cache entry {cache_index} for History index {H_index}. Field: {field}") + + # if cache_hit: + + # q_inds = np.array([i["H_index"] for i in indices_to_update]) + # libE_info["hist"].update_history_x_out(q_inds=q_inds, sim_worker=1) + + # simulated_calc_out = np.zeros(len(q_inds), dtype=sim_specs["out"]) + # for i, H_index in enumerate(q_inds): + # simulated_calc_out[i] = libE_info["cache"][indices_to_update[i]["cache_index"]][ + # indices_to_update[i]["field"] + # ] + + # simulated_D_recv = { + # "calc_out": simulated_calc_out, + # "libE_info": { + # "H_rows": q_inds, + # }, + # } + + # libE_info["hist"].update_history_f(simulated_D_recv, kill_canceled_sims=False) points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] diff --git a/libensemble/manager.py b/libensemble/manager.py index 5507a7595d..28d357f394 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -217,6 +217,8 @@ def __init__( self.WorkerExc = False self.persis_pending = [] self.live_data = libE_specs.get("live_data") + self.from_cache = [] + self.cache_hit = False dyn_keys = ("resource_sets", "num_procs", "num_gpus") dyn_keys_in_H = any(k in self.hist.H.dtype.names for k in dyn_keys) @@ -414,6 +416,32 @@ def _send_work_order(self, Work: dict, w: int) -> None: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") + work_rows = Work["libE_info"]["H_rows"] + new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] + + if Work["tag"] == EVAL_SIM_TAG: + cache = self.hist.get_shelved_sims() + dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int)]).descr) + self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) # all work may be in cache + + for field in np.dtype(new_dtype).names: + if not len(self.from_cache): + break + if field in cache.dtype.names: + for row in work_rows: + for cache_row in cache: + if np.allclose( + cache_row[field], self.hist.H[field][row] + ): # we found outbound work in cache + self.cache_hit = True + from_cache_entry = np.empty( + 1, dtype=dtype_with_idx + ) # 
make an entry for this row, plus H_row + from_cache_entry["H_row"] = row + for remaining_field in cache.dtype.names: + from_cache_entry[remaining_field] = cache_row[remaining_field] + self.from_cache[row] = from_cache_entry + if self.resources: self._set_resources(Work, w) @@ -422,11 +450,19 @@ def _send_work_order(self, Work: dict, w: int) -> None: if Work["tag"] == EVAL_GEN_TAG: self.W[w]["gen_started_time"] = time.time() - work_rows = Work["libE_info"]["H_rows"] work_name = calc_type_strings[Work["tag"]] - logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") + if self.cache_hit: + logger.debug( + f"Manager retrieved {work_name} work for worker {w} from cache. Rows {extract_H_ranges(Work) or None}" + ) + else: + logger.debug(f"Manager sending {work_name} work to worker {w}. Rows {extract_H_ranges(Work) or None}") + if len(work_rows): - new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] + + if all([i in self.from_cache["H_row"] for i in work_rows]): # if all rows in work_rows are found in cache + return + H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) @@ -462,6 +498,11 @@ def _receive_from_workers(self, persis_info: dict) -> dict: new_stuff = True while new_stuff: new_stuff = False + if self.cache_hit or len(self.from_cache): + self.cache_hit = False + new_stuff = True + self._handle_msg_from_worker(persis_info, 0, process_cache=True) + self.from_cache = [] for w in self.W["worker_id"]: if self.wcomms[w].mail_flag(): new_stuff = True @@ -516,11 +557,19 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - if D_recv.get("persis_info"): persis_info.setdefault(int(w), {}).update(D_recv["persis_info"]) - def _handle_msg_from_worker(self, persis_info: dict, w: int) -> None: + def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool = False) -> None: """Handles a message from worker w""" try: - msg = self.wcomms[w].recv() - tag, D_recv = msg + if process_cache: + D_recv = { + "calc_out": self.from_cache, # need cache entries without H_row + "libE_info": { + "H_rows": self.from_cache["H_row"], + }, + } + else: + msg = self.wcomms[w].recv() + tag, D_recv = msg except CommFinishedException: logger.debug(f"Finalizing message from Worker {w}") return @@ -626,8 +675,6 @@ def _sim_max_given(self) -> bool: def _get_alloc_libE_info(self) -> dict: """Selected statistics useful for alloc_f""" - cache = self.hist.get_shelved_sims() if self.hist.cache_set else [] - return { "any_idle_workers": any(self.W["active"] == 0), "exit_criteria": self.exit_criteria, @@ -642,8 +689,6 @@ def _get_alloc_libE_info(self) -> dict: "gen_num_procs": self.gen_num_procs, "gen_num_gpus": self.gen_num_gpus, "gen_on_manager": self.libE_specs.get("gen_on_manager", False), - "cache": cache, - "hist": self.hist, } def _alloc_work(self, H: npt.NDArray, persis_info: dict) -> dict: diff --git a/pyproject.toml b/pyproject.toml index 231afa5bbf..42c30028ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,13 +92,13 @@ types-pyyaml = ">=6.0.12.20250402,<7" [tool.pixi.dependencies] python = ">=3.10,<3.14" -pip = ">=24.3.1,<25" -setuptools = ">=75.6.0,<76" -numpy = ">=1.21,<3" -pydantic = ">=1.10,<3" -pyyaml = ">=6.0,<7" -tomli = ">=1.2.1,<3" -psutil = ">=5.9.4,<7" +pip = "*" +setuptools = "*" +numpy = "*" +pydantic = "*" +pyyaml = "*" +tomli = "*" +psutil = "*" 
[tool.pixi.target.osx-arm64.dependencies] clang_osx-arm64 = ">=19.1.2,<20" From 0a70303f3f249552210fd01b74f8f6fd97feaa94 Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 18 Sep 2025 10:34:05 -0500 Subject: [PATCH 10/28] grow the manager's internal record of cache hits, instead of overwriting the same one --- libensemble/manager.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index 28d357f394..e7afa52694 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -419,14 +419,15 @@ def _send_work_order(self, Work: dict, w: int) -> None: work_rows = Work["libE_info"]["H_rows"] new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] - if Work["tag"] == EVAL_SIM_TAG: + if Work["tag"] == EVAL_SIM_TAG and len(work_rows): cache = self.hist.get_shelved_sims() dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int)]).descr) - self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) # all work may be in cache + if not len(self.from_cache): + self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) # all work may be in cache + else: + self.from_cache = np.append(self.from_cache, np.zeros(len(work_rows), dtype=dtype_with_idx)) for field in np.dtype(new_dtype).names: - if not len(self.from_cache): - break if field in cache.dtype.names: for row in work_rows: for cache_row in cache: @@ -463,6 +464,9 @@ def _send_work_order(self, Work: dict, w: int) -> None: if all([i in self.from_cache["H_row"] for i in work_rows]): # if all rows in work_rows are found in cache return + if self.cache_hit: + work_rows = [row for row in work_rows if row not in self.from_cache["H_row"]] + H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) From 593b5225a973beea05d25f65b4fff920917d021a Mon Sep 17 00:00:00 2001 From: jlnav Date: Thu, 18 Sep 2025 16:24:28 -0500 Subject: [PATCH 11/28] save presumptive workerID for the worker that would've been given cached work. 
increment a cache_index for slotting into local buffer --- libensemble/manager.py | 40 ++++++++++++------- .../regression_tests/test_1d_sampling.py | 6 ++- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index e7afa52694..ff5de50c6e 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -218,6 +218,7 @@ def __init__( self.persis_pending = [] self.live_data = libE_specs.get("live_data") self.from_cache = [] + self.cache_index = 0 self.cache_hit = False dyn_keys = ("resource_sets", "num_procs", "num_gpus") @@ -419,9 +420,9 @@ def _send_work_order(self, Work: dict, w: int) -> None: work_rows = Work["libE_info"]["H_rows"] new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] - if Work["tag"] == EVAL_SIM_TAG and len(work_rows): + if Work["tag"] == EVAL_SIM_TAG and len(work_rows) and self.hist.cache_set: cache = self.hist.get_shelved_sims() - dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int)]).descr) + dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int), ("worker_id", int)]).descr) if not len(self.from_cache): self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) # all work may be in cache else: @@ -439,17 +440,18 @@ def _send_work_order(self, Work: dict, w: int) -> None: 1, dtype=dtype_with_idx ) # make an entry for this row, plus H_row from_cache_entry["H_row"] = row + from_cache_entry["worker_id"] = w for remaining_field in cache.dtype.names: from_cache_entry[remaining_field] = cache_row[remaining_field] - self.from_cache[row] = from_cache_entry + self.from_cache[self.cache_index] = from_cache_entry + self.cache_index += 1 if self.resources: self._set_resources(Work, w) - self.wcomms[w].send(Work["tag"], Work) - - if Work["tag"] == EVAL_GEN_TAG: + elif Work["tag"] == EVAL_GEN_TAG: self.W[w]["gen_started_time"] = time.time() + self.wcomms[w].send(Work["tag"], Work) work_name = calc_type_strings[Work["tag"]] if self.cache_hit: @@ -461,16 +463,18 @@ def _send_work_order(self, Work: dict, w: int) -> None: if len(work_rows): - if all([i in self.from_cache["H_row"] for i in work_rows]): # if all rows in work_rows are found in cache - return - if self.cache_hit: work_rows = [row for row in work_rows if row not in self.from_cache["H_row"]] + if all( + [i in self.from_cache["H_row"] for i in work_rows] + ): # if all rows in work_rows are found in cache + return H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) + self.wcomms[w].send(Work["tag"], Work) self.wcomms[w].send(0, H_to_be_sent) def _update_state_on_alloc(self, Work: dict, w: int): @@ -499,14 +503,17 @@ def _receive_from_workers(self, persis_info: dict) -> dict: looped back over. 
""" time.sleep(0.0001) # Critical for multiprocessing performance + + if self.cache_hit or len(self.from_cache): + self.cache_hit = False + for w in self.from_cache["worker_id"]: + self._handle_msg_from_worker(persis_info, w, process_cache=True) + self.from_cache = [] + self.cache_index = 0 + new_stuff = True while new_stuff: new_stuff = False - if self.cache_hit or len(self.from_cache): - self.cache_hit = False - new_stuff = True - self._handle_msg_from_worker(persis_info, 0, process_cache=True) - self.from_cache = [] for w in self.W["worker_id"]: if self.wcomms[w].mail_flag(): new_stuff = True @@ -566,10 +573,13 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool try: if process_cache: D_recv = { - "calc_out": self.from_cache, # need cache entries without H_row + "calc_out": self.from_cache[[name[0] for name in self.sim_specs["out"]]], "libE_info": { "H_rows": self.from_cache["H_row"], + "workerID": w, }, + "calc_status": 0, + "calc_type": 1, } else: msg = self.wcomms[w].recv() diff --git a/libensemble/tests/regression_tests/test_1d_sampling.py b/libensemble/tests/regression_tests/test_1d_sampling.py index edecabb668..94a470e5fc 100644 --- a/libensemble/tests/regression_tests/test_1d_sampling.py +++ b/libensemble/tests/regression_tests/test_1d_sampling.py @@ -15,7 +15,7 @@ import numpy as np -from libensemble import Ensemble +from libensemble import Ensemble, logger from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f # Import libEnsemble items for this test @@ -23,10 +23,12 @@ from libensemble.specs import ExitCriteria, GenSpecs, LibeSpecs, SimSpecs from libensemble.tools import add_unique_random_streams +logger.set_level("DEBUG") # For testing the test + # Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). if __name__ == "__main__": sampling = Ensemble(parse_args=True) - sampling.libE_specs = LibeSpecs(save_every_k_gens=300, safe_mode=False, disable_log_files=True) + sampling.libE_specs = LibeSpecs(save_every_k_gens=300, safe_mode=False) # , disable_log_files=True) sampling.sim_specs = SimSpecs(sim_f=sim_f) sampling.gen_specs = GenSpecs( gen_f=gen_f, From e55e691a4e0f484d2e6dda90955fd67fd3f3c0a2 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 19 Sep 2025 13:27:22 -0500 Subject: [PATCH 12/28] prevent redundant insertions into local cache retrieval. fix a bug involving gen hang. only if all sim work already in cache do we skip the send-stage. when processing cache entries, process them in the order in which they presumptively would've been received from that worker. additional logging. 
--- libensemble/manager.py | 29 ++++++++++++++++------------- pyproject.toml | 16 ++++++++-------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index ff5de50c6e..c0f8407cdd 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -432,13 +432,12 @@ def _send_work_order(self, Work: dict, w: int) -> None: if field in cache.dtype.names: for row in work_rows: for cache_row in cache: - if np.allclose( - cache_row[field], self.hist.H[field][row] - ): # we found outbound work in cache + if ( + np.allclose(cache_row[field], self.hist.H[field][row]) + and row not in self.from_cache["H_row"] + ): # we found outbound work in cache, that's not already been retrieved self.cache_hit = True - from_cache_entry = np.empty( - 1, dtype=dtype_with_idx - ) # make an entry for this row, plus H_row + from_cache_entry = np.empty(1, dtype=dtype_with_idx) from_cache_entry["H_row"] = row from_cache_entry["worker_id"] = w for remaining_field in cache.dtype.names: @@ -449,7 +448,7 @@ def _send_work_order(self, Work: dict, w: int) -> None: if self.resources: self._set_resources(Work, w) - elif Work["tag"] == EVAL_GEN_TAG: + if Work["tag"] == EVAL_GEN_TAG: self.W[w]["gen_started_time"] = time.time() self.wcomms[w].send(Work["tag"], Work) @@ -465,8 +464,8 @@ def _send_work_order(self, Work: dict, w: int) -> None: if self.cache_hit: work_rows = [row for row in work_rows if row not in self.from_cache["H_row"]] - if all( - [i in self.from_cache["H_row"] for i in work_rows] + if ( + all([i in self.from_cache["H_row"] for i in work_rows]) and Work["tag"] == EVAL_SIM_TAG ): # if all rows in work_rows are found in cache return @@ -504,7 +503,7 @@ def _receive_from_workers(self, persis_info: dict) -> dict: """ time.sleep(0.0001) # Critical for multiprocessing performance - if self.cache_hit or len(self.from_cache): + if self.cache_hit: self.cache_hit = False for w in self.from_cache["worker_id"]: self._handle_msg_from_worker(persis_info, w, process_cache=True) @@ -572,10 +571,11 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool """Handles a message from worker w""" try: if process_cache: + cache_entry_by_worker = self.from_cache["worker_id"] == w D_recv = { - "calc_out": self.from_cache[[name[0] for name in self.sim_specs["out"]]], + "calc_out": self.from_cache[cache_entry_by_worker][[name[0] for name in self.sim_specs["out"]]], "libE_info": { - "H_rows": self.from_cache["H_row"], + "H_rows": self.from_cache[cache_entry_by_worker]["H_row"], "workerID": w, }, "calc_status": 0, @@ -598,7 +598,10 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool logger.vdebug(f"Manager received a log message from worker {w}") logging.getLogger(D_recv.name).handle(D_recv) else: - logger.debug(f"Manager received data message from worker {w}") + if process_cache: + logger.debug(f"Manager retrieved cached message redirected from worker {w}") + else: + logger.debug(f"Manager received data message from worker {w}") self._update_state_on_worker_msg(persis_info, D_recv, w) def _kill_cancelled_sims(self) -> None: diff --git a/pyproject.toml b/pyproject.toml index 42c30028ac..68d5654da3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,13 +92,13 @@ types-pyyaml = ">=6.0.12.20250402,<7" [tool.pixi.dependencies] python = ">=3.10,<3.14" -pip = "*" -setuptools = "*" -numpy = "*" -pydantic = "*" -pyyaml = "*" -tomli = "*" -psutil = "*" +pip = ">=24.3.1,<25" +setuptools = ">=75.6.0,<76" +numpy = ">=1.21,<3" 
+pydantic = ">=1.10,<3" +pyyaml = ">=6.0,<7" +tomli = ">=1.2.1,<3" +psutil = ">=5.9.4,<7" [tool.pixi.target.osx-arm64.dependencies] clang_osx-arm64 = ">=19.1.2,<20" @@ -142,4 +142,4 @@ extend-exclude = ["*.bib", "*.xml", "docs/nitpicky"] disable_error_code = ["import-not-found", "import-untyped"] [dependency-groups] -dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4", "wat>=0.7.0,<0.8"] +dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4"] From 9575d7665b4d33bd2428def4c1b0275ed2ec7416 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 19 Sep 2025 14:34:54 -0500 Subject: [PATCH 13/28] refactor, and remove first draft of code that was in alloc_f --- .../alloc_funcs/give_sim_work_first.py | 44 ---------- libensemble/manager.py | 88 +++++++++++++------ 2 files changed, 60 insertions(+), 72 deletions(-) diff --git a/libensemble/alloc_funcs/give_sim_work_first.py b/libensemble/alloc_funcs/give_sim_work_first.py index 7ab64b2191..7ac4d75e5e 100644 --- a/libensemble/alloc_funcs/give_sim_work_first.py +++ b/libensemble/alloc_funcs/give_sim_work_first.py @@ -62,50 +62,6 @@ def give_sim_work_first( gen_count = support.count_gens() Work = {} - # if not persis_info.get("updated_H_indices"): - # persis_info["updated_H_indices"] = [] - - # points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] - - # gen_out_fields = [j[0] for j in gen_specs["out"]] - - # indices_to_update = [] - # cache_hit = False - - # if len(libE_info["cache"]) and np.any(points_to_evaluate): - # for H_index, H_entry in enumerate(H): - # if H_index in persis_info["updated_H_indices"]: - # continue - # for cache_index, cache_entry in enumerate(libE_info["cache"]): - # for field in gen_out_fields: - # if field in libE_info["cache"].dtype.names and np.allclose( - # H_entry[field], cache_entry[field], rtol=1e-8, atol=1e-8 - # ): - # cache_hit = True - # indices_to_update.append({"cache_index": cache_index, "H_index": H_index, "field": field}) - # persis_info["updated_H_indices"].append(H_index) - # print(f"Using cache entry {cache_index} for History index {H_index}. 
Field: {field}") - - # if cache_hit: - - # q_inds = np.array([i["H_index"] for i in indices_to_update]) - # libE_info["hist"].update_history_x_out(q_inds=q_inds, sim_worker=1) - - # simulated_calc_out = np.zeros(len(q_inds), dtype=sim_specs["out"]) - # for i, H_index in enumerate(q_inds): - # simulated_calc_out[i] = libE_info["cache"][indices_to_update[i]["cache_index"]][ - # indices_to_update[i]["field"] - # ] - - # simulated_D_recv = { - # "calc_out": simulated_calc_out, - # "libE_info": { - # "H_rows": q_inds, - # }, - # } - - # libE_info["hist"].update_history_f(simulated_D_recv, kill_canceled_sims=False) - points_to_evaluate = ~H["sim_started"] & ~H["cancel_requested"] if np.any(points_to_evaluate): diff --git a/libensemble/manager.py b/libensemble/manager.py index c0f8407cdd..7afb705896 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -413,6 +413,44 @@ def _freeup_resources(self, w: int) -> None: if self.resources: self.resources.resource_manager.free_rsets(w) + def _refresh_from_cache( + self, cache: npt.NDArray, dtype_with_idx: np.dtype, cache_row: npt.NDArray, work_row: int, w: int + ) -> None: + """Add a cache entry, workerID, and H_row to the record array.""" + self.cache_hit = True + from_cache_entry = np.empty(1, dtype=dtype_with_idx) + from_cache_entry["H_row"] = work_row + from_cache_entry["worker_id"] = w + for remaining_field in cache.dtype.names: + from_cache_entry[remaining_field] = cache_row[remaining_field] + self.from_cache[self.cache_index] = from_cache_entry + self.cache_index += 1 + + def _cache_scan( + self, cache: npt.NDArray, Work: dict, w: int, dtype_with_idx: np.dtype, new_dtype: np.dtype + ) -> None: + """Check if any work rows are in the cache, and if so, call the above, _refresh_from_cache.""" + + for field in np.dtype(new_dtype).names: + if field in cache.dtype.names: + for work_row in Work["libE_info"]["H_rows"]: + for cache_row in cache: + if ( + np.allclose(cache_row[field], self.hist.H[field][work_row]) + and work_row not in self.from_cache["H_row"] + ): # we found outbound work in cache, that's not already been retrieved + self._refresh_from_cache(cache, dtype_with_idx, cache_row, work_row, w) + + def _update_state_from_cache(self, Work: dict, work_rows: npt.NDArray, w: int, new_dtype: np.dtype) -> None: + """Retrieve saved cache from history, create local record-array of matching cache entries.""" + cache = self.hist.get_shelved_sims() + dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int), ("worker_id", int)]).descr) + if not len(self.from_cache): + self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) # all work may be in cache + else: + self.from_cache = np.append(self.from_cache, np.zeros(len(work_rows), dtype=dtype_with_idx)) + self._cache_scan(cache, Work, w, dtype_with_idx, new_dtype) + def _send_work_order(self, Work: dict, w: int) -> None: """Sends an allocation function order to a worker""" logger.debug(f"Manager sending work unit to worker {w}") @@ -421,29 +459,7 @@ def _send_work_order(self, Work: dict, w: int) -> None: new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]] if Work["tag"] == EVAL_SIM_TAG and len(work_rows) and self.hist.cache_set: - cache = self.hist.get_shelved_sims() - dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int), ("worker_id", int)]).descr) - if not len(self.from_cache): - self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx) # all work may be in cache - else: - self.from_cache = np.append(self.from_cache, 
np.zeros(len(work_rows), dtype=dtype_with_idx)) - - for field in np.dtype(new_dtype).names: - if field in cache.dtype.names: - for row in work_rows: - for cache_row in cache: - if ( - np.allclose(cache_row[field], self.hist.H[field][row]) - and row not in self.from_cache["H_row"] - ): # we found outbound work in cache, that's not already been retrieved - self.cache_hit = True - from_cache_entry = np.empty(1, dtype=dtype_with_idx) - from_cache_entry["H_row"] = row - from_cache_entry["worker_id"] = w - for remaining_field in cache.dtype.names: - from_cache_entry[remaining_field] = cache_row[remaining_field] - self.from_cache[self.cache_index] = from_cache_entry - self.cache_index += 1 + self._update_state_from_cache(Work, work_rows, w, new_dtype) if self.resources: self._set_resources(Work, w) @@ -467,6 +483,7 @@ def _send_work_order(self, Work: dict, w: int) -> None: if ( all([i in self.from_cache["H_row"] for i in work_rows]) and Work["tag"] == EVAL_SIM_TAG ): # if all rows in work_rows are found in cache + logger.debug("Manager skipping sending work to worker %s due to cache", w) return H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) @@ -495,8 +512,19 @@ def _update_state_on_alloc(self, Work: dict, w: int): # --- Handle incoming messages from workers - def _receive_from_workers(self, persis_info: dict) -> dict: - """Receives calculation output from workers. Loops over all + def _receive_from_workers_or_cache(self, persis_info: dict) -> dict: + """ + Two stage process of handling either: + 1. Messages that could've been sent to a worker, but are already in the cache. + 2. Messages that have been sent by a worker. + + 1. + If the cache is not empty, the cache is scanned for messages that could've been sent. + Messages are processed as though they came from their corresponding worker. The local + record of the cache is then cleared to prevent duplicate processing. + + 2. + Receives calculation output from workers. Loops over all active workers and probes to see if worker is ready to communticate. If any output is received, all other workers are looped back over. @@ -568,7 +596,11 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) - persis_info.setdefault(int(w), {}).update(D_recv["persis_info"]) def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool = False) -> None: - """Handles a message from worker w""" + """Handles a message from worker w. + + If processing from the cache, create a simulated worker message containing + the cache entry. 
+ """ try: if process_cache: cache_entry_by_worker = self.from_cache["worker_id"] == w @@ -662,7 +694,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int): exit_flag = 0 while (any(self.W["active"]) or any(self.W["persis_state"])) and exit_flag == 0: - persis_info = self._receive_from_workers(persis_info) + persis_info = self._receive_from_workers_or_cache(persis_info) if self.term_test(logged=False) == 2: # Elapsed Wallclock has expired if not any(self.W["persis_state"]): @@ -751,7 +783,7 @@ def run(self, persis_info: dict) -> (dict, int, int): try: while not self.term_test(): self._kill_cancelled_sims() - persis_info = self._receive_from_workers(persis_info) + persis_info = self._receive_from_workers_or_cache(persis_info) Work, persis_info, flag = self._alloc_work(self.hist.trim_H(), persis_info) if flag: break From c01dcd3c93b45f789539e5c98f0bf6fc65f25194 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 19 Sep 2025 14:38:19 -0500 Subject: [PATCH 14/28] for now, enable cache for sims that lasted longer than a second --- libensemble/history.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/history.py b/libensemble/history.py index 705fb0d67f..748e9694c9 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -124,7 +124,7 @@ def _append_new_fields(self, H_f: npt.NDArray) -> None: def _shelf_longrunning_sims(self, index): """Cache any f values that ran for more than a second.""" - if 1: # self.H[index]['sim_ended_time'] - self.H[index]['sim_started_time'] > 1: + if self.H[index]["sim_ended_time"] - self.H[index]["sim_started_time"] > 1: # ('f', 'x') and ('x', 'f') are not equivalent dtypes, unfortunately. So maybe sorted helps. self.cache_keys = sorted( [i for i in self.H.dtype.names if i not in [k[0] for k in libE_fields]] From 6e90b5caf11e95750a8eb30fa2f8c8fdc0b5179b Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 19 Sep 2025 14:45:23 -0500 Subject: [PATCH 15/28] experimenting with making disk cache name match calling script plus easily accessible args --- libensemble/history.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libensemble/history.py b/libensemble/history.py index 748e9694c9..2c6ecfa89d 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -1,4 +1,5 @@ import logging +import sys import time from pathlib import Path @@ -109,7 +110,7 @@ def __init__( self.cache_dir = Path.home() / ".libE" self.cache_dir.mkdir(parents=True, exist_ok=True) - self.cache = self.cache_dir / "cache.npy" + self.cache = self.cache_dir / Path("".join(sys.argv) + ".npy") if not self.cache.exists(): self.cache.touch() self.cache_set = False From c7a7f523b4e865a924295e63e1e14eac6cd7cbb1 Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 19 Sep 2025 15:39:30 -0500 Subject: [PATCH 16/28] fix redundant send of work if rows send to gen. 
tiny test fix --- libensemble/manager.py | 5 +++-- .../functionality_tests/test_executor_hworld_pass_fail.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index 7afb705896..0cfa4b1224 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -483,14 +483,15 @@ def _send_work_order(self, Work: dict, w: int) -> None: if ( all([i in self.from_cache["H_row"] for i in work_rows]) and Work["tag"] == EVAL_SIM_TAG ): # if all rows in work_rows are found in cache - logger.debug("Manager skipping sending work to worker %s due to cache", w) + logger.debug("Manager skipping sending *all* work to worker %s due to cache", w) return H_to_be_sent = np.empty(len(work_rows), dtype=new_dtype) for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) - self.wcomms[w].send(Work["tag"], Work) + if Work["tag"] == EVAL_SIM_TAG: + self.wcomms[w].send(Work["tag"], Work) self.wcomms[w].send(0, H_to_be_sent) def _update_state_on_alloc(self, Work: dict, w: int): diff --git a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py index 1b61bf8d25..0ddc852641 100644 --- a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py +++ b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py @@ -101,7 +101,7 @@ # For debug print(f"Expecting: {calc_status_list}") - print("Received: {H['cstat']}\n") + print(f"Received: {H['cstat']}\n") assert np.array_equal(H["cstat"], calc_status_list), "Error - unexpected calc status. Received: " + str( H["cstat"] From 343cc9be9944ecdaf05615eab4e7c3086f3ff3d4 Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 24 Sep 2025 12:33:27 -0500 Subject: [PATCH 17/28] add libE_specs.cache_long_sims, plus more/better docstrings --- libensemble/history.py | 7 +++-- libensemble/manager.py | 71 +++++++++++++++++++++++++++++++----------- libensemble/specs.py | 10 ++++++ 3 files changed, 68 insertions(+), 20 deletions(-) diff --git a/libensemble/history.py b/libensemble/history.py index 2c6ecfa89d..f5400aa3b3 100644 --- a/libensemble/history.py +++ b/libensemble/history.py @@ -95,6 +95,7 @@ def __init__( self.index = len(H0) self.grow_count = 0 self.safe_mode = False + self.use_cache = False self.sim_started_count = np.sum(H["sim_started"]) self.sim_ended_count = np.sum(H["sim_ended"]) @@ -108,6 +109,7 @@ def __init__( self.last_started = -1 self.last_ended = -1 + def init_cache(self) -> None: self.cache_dir = Path.home() / ".libE" self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache = self.cache_dir / Path("".join(sys.argv) + ".npy") @@ -140,7 +142,7 @@ def _shelf_longrunning_sims(self, index): entry = self.H[index][self.cache_keys] if entry not in in_cache: in_cache = np.append(in_cache, entry) - in_cache = np.unique(in_cache, axis=0) + in_cache = np.unique(in_cache, axis=0) # attempt to remove duplicates np.save(self.cache, in_cache, allow_pickle=True) self.cache_set = True @@ -181,7 +183,8 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None: self.H["sim_ended"][ind] = True self.H["sim_ended_time"][ind] = time.time() self.sim_ended_count += 1 - self._shelf_longrunning_sims(ind) + if self.use_cache: + self._shelf_longrunning_sims(ind) if kill_canceled_sims: for j in range(self.last_ended + 1, np.max(new_inds) + 1): diff --git a/libensemble/manager.py b/libensemble/manager.py index 0cfa4b1224..7010fd31f4 100644 --- 

---
 libensemble/history.py |  7 +++--
 libensemble/manager.py | 71 +++++++++++++++++++++++++++++++-----------
 libensemble/specs.py   | 10 ++++++
 3 files changed, 68 insertions(+), 20 deletions(-)

diff --git a/libensemble/history.py b/libensemble/history.py
index 2c6ecfa89d..f5400aa3b3 100644
--- a/libensemble/history.py
+++ b/libensemble/history.py
@@ -95,6 +95,7 @@ def __init__(
         self.index = len(H0)
         self.grow_count = 0
         self.safe_mode = False
+        self.use_cache = False

         self.sim_started_count = np.sum(H["sim_started"])
         self.sim_ended_count = np.sum(H["sim_ended"])
@@ -108,6 +109,7 @@ def __init__(
         self.last_started = -1
         self.last_ended = -1

+    def init_cache(self) -> None:
         self.cache_dir = Path.home() / ".libE"
         self.cache_dir.mkdir(parents=True, exist_ok=True)
         self.cache = self.cache_dir / Path("".join(sys.argv) + ".npy")
@@ -140,7 +142,7 @@ def _shelf_longrunning_sims(self, index):
             entry = self.H[index][self.cache_keys]
             if entry not in in_cache:
                 in_cache = np.append(in_cache, entry)
-                in_cache = np.unique(in_cache, axis=0)
+                in_cache = np.unique(in_cache, axis=0)  # attempt to remove duplicates
             np.save(self.cache, in_cache, allow_pickle=True)
             self.cache_set = True

@@ -181,7 +183,8 @@ def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:
             self.H["sim_ended"][ind] = True
             self.H["sim_ended_time"][ind] = time.time()
             self.sim_ended_count += 1
-            self._shelf_longrunning_sims(ind)
+            if self.use_cache:
+                self._shelf_longrunning_sims(ind)

         if kill_canceled_sims:
             for j in range(self.last_ended + 1, np.max(new_inds) + 1):
diff --git a/libensemble/manager.py b/libensemble/manager.py
index 0cfa4b1224..7010fd31f4 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -204,9 +204,11 @@ def __init__(
         timer.start()
         self.date_start = timer.date_start.replace(" ", "_")
         self.safe_mode = libE_specs.get("safe_mode")
+        self.use_cache = libE_specs.get("cache_long_sims")
         self.kill_canceled_sims = libE_specs.get("kill_canceled_sims")
         self.hist = hist
         self.hist.safe_mode = self.safe_mode
+        self.hist.use_cache = self.use_cache
         self.libE_specs = libE_specs
         self.alloc_specs = alloc_specs
         self.sim_specs = sim_specs
@@ -217,8 +219,10 @@ def __init__(
         self.WorkerExc = False
         self.persis_pending = []
         self.live_data = libE_specs.get("live_data")
-        self.from_cache = []
-        self.cache_index = 0
+        if self.use_cache:
+            self.hist.init_cache()
+            self.from_cache = []
+            self.cache_index = 0
         self.cache_hit = False

         dyn_keys = ("resource_sets", "num_procs", "num_gpus")
@@ -416,7 +420,11 @@ def _freeup_resources(self, w: int) -> None:
     def _refresh_from_cache(
         self, cache: npt.NDArray, dtype_with_idx: np.dtype, cache_row: npt.NDArray, work_row: int, w: int
     ) -> None:
-        """Add a cache entry, workerID, and H_row to the record array."""
+        """Add a cache entry, workerID, and H_row to the local record array.
+
+        Later on when we iterate over the cache for entries that could've been sent to a worker (but weren't),
+        we'll process that entry as though it came from this worker, with these H_rows.
+        """
         self.cache_hit = True
         from_cache_entry = np.empty(1, dtype=dtype_with_idx)
         from_cache_entry["H_row"] = work_row
@@ -429,7 +437,10 @@ def _cache_scan(
         self, cache: npt.NDArray, Work: dict, w: int, dtype_with_idx: np.dtype, new_dtype: np.dtype
     ) -> None:
-        """Check if any work rows are in the cache, and if so, call the above, _refresh_from_cache."""
+        """
+        Check if any work rows are in the cache, and if so, call the above, _refresh_from_cache
+        to update the local `from_cache` record.
+        """

         for field in np.dtype(new_dtype).names:
             if field in cache.dtype.names:
@@ -438,17 +449,33 @@ def _cache_scan(
                     if (
                         np.allclose(cache_row[field], self.hist.H[field][work_row])
                         and work_row not in self.from_cache["H_row"]
-                    ):  # we found outbound work in cache, that's not already been retrieved
+                    ):  # we found outbound work in cache, that's not already in the local record
                         self._refresh_from_cache(cache, dtype_with_idx, cache_row, work_row, w)

     def _update_state_from_cache(self, Work: dict, work_rows: npt.NDArray, w: int, new_dtype: np.dtype) -> None:
-        """Retrieve saved cache from history, create local record-array of matching cache entries."""
+        """Retrieve saved cache from history, create local record-array of matching cache entries.
+
+        The `from_cache` local record contains cache entries and the workerID and H_rows they are associated with, had
+        they been sent to a worker.
+
+        Cache entries *must* be associated with the preempted outbound worker and H_rows because those values
+        are always associated with actual inbound results. Later on, when we iterate over the cache for entries that
+        could've been sent to a worker (but weren't), we'll process that entry as though it came from that worker,
+        with those H_rows.
+        """
+        cache = self.hist.get_shelved_sims()
+
+        # our local record resembles the cache, but additionally with the worker_id and H_row from the alloc_f
         dtype_with_idx = np.dtype(cache.dtype.descr + np.dtype([("H_row", int), ("worker_id", int)]).descr)
+
+        # initialize or grow the local record, then call _cache_scan to fill it
         if not len(self.from_cache):
-            self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx)  # all work may be in cache
+            self.from_cache = np.zeros(len(work_rows), dtype=dtype_with_idx)
         else:
             self.from_cache = np.append(self.from_cache, np.zeros(len(work_rows), dtype=dtype_with_idx))
+
+        # populates the local record
         self._cache_scan(cache, Work, w, dtype_with_idx, new_dtype)

     def _send_work_order(self, Work: dict, w: int) -> None:
@@ -458,7 +485,7 @@ def _send_work_order(self, Work: dict, w: int) -> None:
         work_rows = Work["libE_info"]["H_rows"]
         new_dtype = [(name, self.hist.H.dtype.fields[name][0]) for name in Work["H_fields"]]

-        if Work["tag"] == EVAL_SIM_TAG and len(work_rows) and self.hist.cache_set:
+        if self.use_cache and Work["tag"] == EVAL_SIM_TAG and len(work_rows) and self.hist.cache_set:
             self._update_state_from_cache(Work, work_rows, w, new_dtype)

         if self.resources:
@@ -532,6 +559,7 @@ def _receive_from_workers_or_cache(self, persis_info: dict) -> dict:
         """
         time.sleep(0.0001)  # Critical for multiprocessing performance

+        # Process messages from the cache
        if self.cache_hit:
            self.cache_hit = False
            for w in self.from_cache["worker_id"]:
@@ -539,6 +567,7 @@ def _receive_from_workers_or_cache(self, persis_info: dict) -> dict:
            self.from_cache = []
            self.cache_index = 0

+        # Process messages from workers
         new_stuff = True
         while new_stuff:
             new_stuff = False
@@ -596,6 +625,21 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
         if D_recv.get("persis_info"):
             persis_info.setdefault(int(w), {}).update(D_recv["persis_info"])

+    def _create_simulated_D_recv(self, w: int) -> dict:
+        """Create a simulated worker message containing the cache entry instead of a message from a worker."""
+
+        cache_entry_by_worker = self.from_cache[self.from_cache["worker_id"] == w]
+        D_recv = {
+            "calc_out": cache_entry_by_worker[[name[0] for name in self.sim_specs["out"]]],
+            "libE_info": {
+                "H_rows": cache_entry_by_worker["H_row"],
+                "workerID": w,
+            },
+            "calc_status": 0,
+            "calc_type": 1,
+        }
+        return D_recv
+
     def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool = False) -> None:
         """Handles a message from worker w.

@@ -604,16 +648,7 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
         """
         try:
             if process_cache:
-                cache_entry_by_worker = self.from_cache["worker_id"] == w
-                D_recv = {
-                    "calc_out": self.from_cache[cache_entry_by_worker][[name[0] for name in self.sim_specs["out"]]],
-                    "libE_info": {
-                        "H_rows": self.from_cache[cache_entry_by_worker]["H_row"],
-                        "workerID": w,
-                    },
-                    "calc_status": 0,
-                    "calc_type": 1,
-                }
+                D_recv = self._create_simulated_D_recv(w)
             else:
                 msg = self.wcomms[w].recv()
                 tag, D_recv = msg
diff --git a/libensemble/specs.py b/libensemble/specs.py
index 308491303d..6450b98e0d 100644
--- a/libensemble/specs.py
+++ b/libensemble/specs.py
@@ -306,6 +306,16 @@ class LibeSpecs(BaseModel):
     Forms the base of a generator directory.
     """

+    cache_long_sims: bool | None = False
+    """
+    Cache simulation results with runtimes >1s to disk. Subsequent runs of the same
+    base script with the same command-line arguments will access this cache.
+
+    When the generator creates points that are already in the cache, those points are not
+    sent for evaluation. Instead the corresponding cached results are retrieved and returned
+    to the generator.
+    """
+
     calc_dir_id_width: int | None = 4
     """
     The width of the numerical ID component of a calculation directory name. Leading

From ca7fe008dd909734ac761515f14b28f85361754b Mon Sep 17 00:00:00 2001
From: jlnav
Date: Thu, 25 Sep 2025 15:40:37 -0500
Subject: [PATCH 18/28] manager builds libE_stats messages corresponding to
 cache retrievals. tiny refactors to worker.py to help reuse a method. add
 CACHE_RETRIEVE tag
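
Cache retrievals now produce their own libE_stats.txt lines. The manager
reuses Worker._extract_debug_data (made a staticmethod) on the simulated
D_recv so the line carries the usual sim_id ranges, the new cache_timer
records the scan time, and the CACHE_RETRIEVE status renders roughly as
(format illustrative):

    sim_id 3: sim <timer output> Status: Retrieved from cache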

---
 libensemble/manager.py                    | 31 ++++++++++++-------
 libensemble/message_numbers.py            |  3 ++
 .../test_executor_hworld_pass_fail.py     |  2 ++
 libensemble/worker.py                     |  7 +++--
 pyproject.toml                            |  2 +-
 5 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/libensemble/manager.py b/libensemble/manager.py
index 7010fd31f4..f0b8922176 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -19,8 +19,10 @@
 from numpy.lib.recfunctions import repack_fields

 from libensemble.comms.comms import CommFinishedException, QCommThread
+from libensemble.comms.logs import LogConfig
 from libensemble.executors.executor import Executor
 from libensemble.message_numbers import (
+    CACHE_RETRIEVE,
     EVAL_GEN_TAG,
     EVAL_SIM_TAG,
     FINISHED_PERSISTENT_GEN_TAG,
@@ -29,6 +31,7 @@
     MAN_SIGNAL_KILL,
     PERSIS_STOP,
     STOP_TAG,
+    calc_status_strings,
     calc_type_strings,
 )
 from libensemble.resources.resources import Resources
@@ -37,7 +40,7 @@
 from libensemble.utils.misc import _WorkerIndexer, extract_H_ranges
 from libensemble.utils.output_directory import EnsembleDirectory
 from libensemble.utils.timer import Timer
-from libensemble.worker import WorkerErrMsg, worker_main
+from libensemble.worker import Worker, WorkerErrMsg, worker_main

 logger = logging.getLogger(__name__)
 # For debug messages - uncomment
@@ -442,15 +445,17 @@ def _cache_scan(
         to update the local `from_cache` record.
         """

-        for field in np.dtype(new_dtype).names:
-            if field in cache.dtype.names:
-                for work_row in Work["libE_info"]["H_rows"]:
-                    for cache_row in cache:
-                        if (
-                            np.allclose(cache_row[field], self.hist.H[field][work_row])
-                            and work_row not in self.from_cache["H_row"]
-                        ):  # we found outbound work in cache, that's not already in the local record
-                            self._refresh_from_cache(cache, dtype_with_idx, cache_row, work_row, w)
+        self.cache_timer = Timer()
+        with self.cache_timer:
+            for field in np.dtype(new_dtype).names:
+                if field in cache.dtype.names:
+                    for work_row in Work["libE_info"]["H_rows"]:
+                        for cache_row in cache:
+                            if (
+                                np.allclose(cache_row[field], self.hist.H[field][work_row])
+                                and work_row not in self.from_cache["H_row"]
+                            ):  # we found outbound work in cache, that's not already in the local record
+                                self._refresh_from_cache(cache, dtype_with_idx, cache_row, work_row, w)

     def _update_state_from_cache(self, Work: dict, work_rows: npt.NDArray, w: int, new_dtype: np.dtype) -> None:
         """Retrieve saved cache from history, create local record-array of matching cache entries.
@@ -635,7 +640,7 @@ def _create_simulated_D_recv(self, w: int) -> dict:
                 "H_rows": cache_entry_by_worker["H_row"],
                 "workerID": w,
             },
-            "calc_status": 0,
+            "calc_status": CACHE_RETRIEVE,
             "calc_type": 1,
         }
         return D_recv
@@ -649,6 +654,7 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
         try:
             if process_cache:
                 D_recv = self._create_simulated_D_recv(w)
+                enum_desc, calc_id = Worker._extract_debug_data(1, D_recv)
             else:
                 msg = self.wcomms[w].recv()
                 tag, D_recv = msg
@@ -668,6 +674,9 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
         else:
             if process_cache:
                 logger.debug(f"Manager retrieved cached message redirected from worker {w}")
+                calc_msg = f"{enum_desc} {calc_id}: {"sim"} {self.cache_timer}"
+                calc_msg += f" Status: {calc_status_strings[CACHE_RETRIEVE]}"
+                logging.getLogger(LogConfig.config.stats_name).info(calc_msg)
             else:
                 logger.debug(f"Manager received data message from worker {w}")
             self._update_state_on_worker_msg(persis_info, D_recv, w)
diff --git a/libensemble/message_numbers.py b/libensemble/message_numbers.py
index 0ecc7092e4..46ff8c987f 100644
--- a/libensemble/message_numbers.py
+++ b/libensemble/message_numbers.py
@@ -41,6 +41,8 @@
 WORKER_DONE = 35  # Calculation was successful
 # last_calc_status_rst_tag
 CALC_EXCEPTION = 36  # Reserved: Automatically used if user_f raised an exception

+CACHE_RETRIEVE = 40  # Manager retrieved sim from cache
+
 MAN_KILL_SIGNALS = [MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL]

@@ -57,6 +59,7 @@
     TASK_FAILED_TO_START: "Task Failed to start",
     WORKER_DONE: "Completed",
     CALC_EXCEPTION: "Exception occurred",
+    CACHE_RETRIEVE: "Retrieved from cache",
     None: "Unknown Status",
 }
 # last_calc_status_string_rst_tag
diff --git a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py
index 0ddc852641..8a22f5df5b 100644
--- a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py
+++ b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py
@@ -55,6 +55,8 @@
 if is_manager:
     print(f"\nCores req: {cores_all_tasks} Cores avail: {logical_cores}\n {mess_resources}\n")

+    libE_specs["cache_long_sims"] = True
+
 sim_app = "./my_simtask.x"
 if not os.path.isfile(sim_app):
     build_simfunc()
diff --git a/libensemble/worker.py b/libensemble/worker.py
index 44d5f0ddeb..a82ff1db91 100644
--- a/libensemble/worker.py
+++ b/libensemble/worker.py
@@ -225,7 +225,8 @@ def _set_resources(workerID, comm: Comm) -> bool:
             logger.debug(f"No resources set on worker {workerID}")
             return False

-    def _extract_debug_data(self, calc_type, Work):
+    @staticmethod
+    def _extract_debug_data(calc_type, Work):
         if calc_type == EVAL_SIM_TAG:
             enum_desc = "sim_id"
             calc_id = extract_H_ranges(Work)
@@ -256,7 +257,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,
         calc_type = Work["tag"]
         self.calc_iter[calc_type] += 1

-        enum_desc, calc_id = self._extract_debug_data(calc_type, Work)
+        enum_desc, calc_id = Worker._extract_debug_data(calc_type, Work)

         timer = Timer()

@@ -312,7 +313,7 @@ def _handle_calc(self, Work: dict, calc_in: npt.NDArray) -> (npt.NDArray, dict,

             logging.getLogger(LogConfig.config.stats_name).info(calc_msg)

-    def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: int, timer: Timer, status: str) -> str:
+    def _get_calc_msg(self, enum_desc: str, calc_id: int, calc_type: str, timer: Timer, status: str) -> str:
         """Construct line for libE_stats.txt file"""

         calc_msg = f"{enum_desc} {calc_id}: {calc_type} {timer}"
diff --git a/pyproject.toml b/pyproject.toml
index 68d5654da3..231afa5bbf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -142,4 +142,4 @@ extend-exclude = ["*.bib", "*.xml", "docs/nitpicky"]
 disable_error_code = ["import-not-found", "import-untyped"]

 [dependency-groups]
-dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4"]
+dev = ["pyenchant", "enchant>=0.0.1,<0.0.2", "flake8-modern-annotations>=1.6.0,<2", "flake8-type-checking>=3.0.0,<4", "wat>=0.7.0,<0.8"]

From 1f55a616f80ee327fe7bed5296315e87bd5e166f Mon Sep 17 00:00:00 2001
From: jlnav
Date: Fri, 26 Sep 2025 15:06:37 -0500
Subject: [PATCH 19/28] user can specify database name; trying to figure out
 occasionally-malformed cache data / H_rows from the cache
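
The cache file name is now configurable through libE_specs.cache_name rather
than always being derived from sys.argv, e.g. (name illustrative):

    libE_specs["cache_name"] = "my_study"  # -> $HOME/.libE/my_study.npy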

---
 libensemble/history.py                    | 11 +++++++----
 libensemble/manager.py                    |  7 ++++---
 libensemble/specs.py                      |  8 ++++++++
 .../test_executor_hworld_pass_fail.py     |  1 +
 4 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/libensemble/history.py b/libensemble/history.py
index f5400aa3b3..d6f38b8e18 100644
--- a/libensemble/history.py
+++ b/libensemble/history.py
@@ -1,5 +1,4 @@
 import logging
-import sys
 import time
 from pathlib import Path

@@ -109,12 +108,13 @@ def __init__(
         self.last_started = -1
         self.last_ended = -1

-    def init_cache(self) -> None:
+    def init_cache(self, cache_name: str) -> None:
         self.cache_dir = Path.home() / ".libE"
         self.cache_dir.mkdir(parents=True, exist_ok=True)
-        self.cache = self.cache_dir / Path("".join(sys.argv) + ".npy")
+        self.cache = self.cache_dir / Path(cache_name + ".npy")
         if not self.cache.exists():
             self.cache.touch()
+        self.use_cache = True
         self.cache_set = False

     def _append_new_fields(self, H_f: npt.NDArray) -> None:
@@ -147,7 +147,10 @@ def _shelf_longrunning_sims(self, index):
             self.cache_set = True

     def get_shelved_sims(self) -> npt.NDArray:
-        in_cache = np.load(self.cache, allow_pickle=True)
+        try:
+            in_cache = np.load(self.cache, allow_pickle=True)
+        except EOFError:
+            in_cache = np.zeros(1, dtype=self.cache_dtype)
         return in_cache

     def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:
diff --git a/libensemble/manager.py b/libensemble/manager.py
index f0b8922176..a8a17733db 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -211,7 +211,6 @@ def __init__(
         self.kill_canceled_sims = libE_specs.get("kill_canceled_sims")
         self.hist = hist
         self.hist.safe_mode = self.safe_mode
-        self.hist.use_cache = self.use_cache
         self.libE_specs = libE_specs
         self.alloc_specs = alloc_specs
         self.sim_specs = sim_specs
@@ -223,7 +222,7 @@ def __init__(
         self.persis_pending = []
         self.live_data = libE_specs.get("live_data")
         if self.use_cache:
-            self.hist.init_cache()
+            self.hist.init_cache(self.libE_specs.get("cache_name"))
             self.from_cache = []
             self.cache_index = 0
         self.cache_hit = False
@@ -655,6 +654,8 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
             if process_cache:
                 D_recv = self._create_simulated_D_recv(w)
                 enum_desc, calc_id = Worker._extract_debug_data(1, D_recv)
+                if calc_id.startswith("0_0"):
+                    print("why do we have weird sim_id values?")
             else:
                 msg = self.wcomms[w].recv()
                 tag, D_recv = msg
@@ -676,7 +677,7 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
                 logger.debug(f"Manager retrieved cached message redirected from worker {w}")
                 calc_msg = f"{enum_desc} {calc_id}: {"sim"} {self.cache_timer}"
                 calc_msg += f" Status: {calc_status_strings[CACHE_RETRIEVE]}"
-                logging.getLogger(LogConfig.config.stats_name).info(calc_msg)
+                logging.getLogger(LogConfig.config.stats_name).info(calc_msg)  # libE_stats
             else:
                 logger.debug(f"Manager received data message from worker {w}")
             self._update_state_on_worker_msg(persis_info, D_recv, w)
diff --git a/libensemble/specs.py b/libensemble/specs.py
index 6450b98e0d..bebef1c5c4 100644
--- a/libensemble/specs.py
+++ b/libensemble/specs.py
@@ -1,4 +1,5 @@
 import random
+import sys
 import warnings
 from pathlib import Path

@@ -314,6 +315,13 @@ class LibeSpecs(BaseModel):
     sent for evaluation. Instead the corresponding cached results are retrieved and returned
     to the generator.
+
+    The cache is saved in $HOME/.libE, and by default is named after the joined command-line arguments.
+    """
+
+    cache_name: str | None = Path.home() / ".libE" / Path("".join(sys.argv) + ".npy")
+    """
+    The name of the cache file. By default is the joined command-line arguments.
     """

     calc_dir_id_width: int | None = 4
diff --git a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py
index 8a22f5df5b..4bd12a61db 100644
--- a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py
+++ b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py
@@ -56,6 +56,7 @@
     print(f"\nCores req: {cores_all_tasks} Cores avail: {logical_cores}\n {mess_resources}\n")

     libE_specs["cache_long_sims"] = True
+    libE_specs["cache_name"] = "asdf"

 sim_app = "./my_simtask.x"
 if not os.path.isfile(sim_app):

From 9a3e0c48f2e03a9b297e3dee7e4a449e0e9f71f0 Mon Sep 17 00:00:00 2001
From: jlnav
Date: Wed, 22 Oct 2025 08:28:17 -0500
Subject: [PATCH 20/28] fix syntax error uncaught by black and other tools?
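
Double quotes nested inside a double-quoted f-string ({"sim"}) are only valid
syntax on Python 3.12+ (PEP 701); a triple-quoted f-string keeps the line
legal on older interpreters.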

---
 libensemble/manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libensemble/manager.py b/libensemble/manager.py
index a8a17733db..8e395f6210 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -675,7 +675,7 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
         else:
             if process_cache:
                 logger.debug(f"Manager retrieved cached message redirected from worker {w}")
-                calc_msg = f"{enum_desc} {calc_id}: {"sim"} {self.cache_timer}"
+                calc_msg = f"""{enum_desc} {calc_id}: {"sim"} {self.cache_timer}"""
                 calc_msg += f" Status: {calc_status_strings[CACHE_RETRIEVE]}"
                 logging.getLogger(LogConfig.config.stats_name).info(calc_msg)  # libE_stats
             else:

From 702c8d9615a7e842e01b890601ffe73271f43485 Mon Sep 17 00:00:00 2001
From: jlnav
Date: Wed, 22 Oct 2025 08:32:21 -0500
Subject: [PATCH 21/28] cache_name is only string. path not needed in specs.py

---
 libensemble/specs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libensemble/specs.py b/libensemble/specs.py
index bebef1c5c4..96a869671e 100644
--- a/libensemble/specs.py
+++ b/libensemble/specs.py
@@ -319,7 +319,7 @@ class LibeSpecs(BaseModel):
     The cache is saved in $HOME/.libE, and by default is named after the joined command-line arguments.
     """

-    cache_name: str | None = Path.home() / ".libE" / Path("".join(sys.argv) + ".npy")
+    cache_name: str | Path | None = Path.home() / ".libE" / Path("".join(sys.argv) + ".npy")
     """
     The name of the cache file. By default is the joined command-line arguments.
     """

From 08c179001bbccd48937e4aa5c550152e924a0feb Mon Sep 17 00:00:00 2001
From: jlnav
Date: Wed, 22 Oct 2025 09:27:27 -0500
Subject: [PATCH 22/28] param fix
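
The field is annotated str, so the default should be a plain string (the base
name only); init_cache composes the full $HOME/.libE/<name>.npy path from it.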
""" From 08c179001bbccd48937e4aa5c550152e924a0feb Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 22 Oct 2025 09:27:27 -0500 Subject: [PATCH 22/28] param fix --- libensemble/specs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libensemble/specs.py b/libensemble/specs.py index 96a869671e..5f750b42b3 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -319,9 +319,10 @@ class LibeSpecs(BaseModel): The cache is saved in $HOME/.libE, and by default is named after the joined command-line arguments. """ - cache_name: str | Path | None = Path.home() / ".libE" / Path("".join(sys.argv) + ".npy") + cache_name: str | None = "".join(sys.argv) """ The name of the cache file. By default is the joined command-line arguments. + Stored in $HOME/.libE, and by default is named after the joined command-line arguments. """ calc_dir_id_width: int | None = 4 From 9270b939feb105d84afeadb6815c577a2daa1b56 Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 22 Oct 2025 09:43:12 -0500 Subject: [PATCH 23/28] still want to send Work on persis_stop if we're doing final_gen_send --- libensemble/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index 8e395f6210..7d33cb2f14 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -521,7 +521,7 @@ def _send_work_order(self, Work: dict, w: int) -> None: for i, row in enumerate(work_rows): H_to_be_sent[i] = repack_fields(self.hist.H[Work["H_fields"]][row]) - if Work["tag"] == EVAL_SIM_TAG: + if Work["tag"] in [EVAL_SIM_TAG, PERSIS_STOP]: # inclusion of PERSIS_STOP for final_gen_send self.wcomms[w].send(Work["tag"], Work) self.wcomms[w].send(0, H_to_be_sent) From d92b3c3273e4db6bbcd75e412a58cf2e36e59f08 Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 22 Oct 2025 13:36:18 -0500 Subject: [PATCH 24/28] don't necessarily need cache collisions for these executor tests - since we're comparing to returned calc statuses --- .../tests/functionality_tests/test_executor_hworld_pass_fail.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py index 4bd12a61db..4d6d89ee20 100644 --- a/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py +++ b/libensemble/tests/functionality_tests/test_executor_hworld_pass_fail.py @@ -56,7 +56,7 @@ print(f"\nCores req: {cores_all_tasks} Cores avail: {logical_cores}\n {mess_resources}\n") libE_specs["cache_long_sims"] = True - libE_specs["cache_name"] = "asdf" + libE_specs["cache_name"] = "executor_hworld_" + str(nworkers) + "_" + libE_specs.get("comms") sim_app = "./my_simtask.x" if not os.path.isfile(sim_app): From 10946bc5334ba3daf2ffb3f07e770d9f3e3c687f Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 22 Oct 2025 13:58:33 -0500 Subject: [PATCH 25/28] fix iterating over blank template cache entries as though they're valid data --- libensemble/manager.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libensemble/manager.py b/libensemble/manager.py index 7d33cb2f14..a16901e778 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -567,7 +567,8 @@ def _receive_from_workers_or_cache(self, persis_info: dict) -> dict: if self.cache_hit: self.cache_hit = False for w in self.from_cache["worker_id"]: - self._handle_msg_from_worker(persis_info, w, process_cache=True) + if w > 0: # actual cache entry - not blank. 

---
 libensemble/manager.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/libensemble/manager.py b/libensemble/manager.py
index 7d33cb2f14..a16901e778 100644
--- a/libensemble/manager.py
+++ b/libensemble/manager.py
@@ -567,7 +567,8 @@ def _receive_from_workers_or_cache(self, persis_info: dict) -> dict:
         if self.cache_hit:
             self.cache_hit = False
             for w in self.from_cache["worker_id"]:
-                self._handle_msg_from_worker(persis_info, w, process_cache=True)
+                if w > 0:  # actual cache entry - not blank. assuming w0 gets no sim work
+                    self._handle_msg_from_worker(persis_info, w, process_cache=True)
             self.from_cache = []
             self.cache_index = 0

@@ -654,8 +655,6 @@ def _handle_msg_from_worker(self, persis_info: dict, w: int, process_cache: bool
         try:
             if process_cache:
                 D_recv = self._create_simulated_D_recv(w)
                 enum_desc, calc_id = Worker._extract_debug_data(1, D_recv)
-                if calc_id.startswith("0_0"):
-                    print("why do we have weird sim_id values?")
             else:
                 msg = self.wcomms[w].recv()
                 tag, D_recv = msg

From 11694e7df3fddc44bfe8ca34dbb37a97d7238e76 Mon Sep 17 00:00:00 2001
From: jlnav
Date: Wed, 22 Oct 2025 14:00:25 -0500
Subject: [PATCH 26/28] add functionality test for cache_sims

---
 .../functionality_tests/test_cache_sims.py | 70 +++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 libensemble/tests/functionality_tests/test_cache_sims.py

diff --git a/libensemble/tests/functionality_tests/test_cache_sims.py b/libensemble/tests/functionality_tests/test_cache_sims.py
new file mode 100644
index 0000000000..885a6556ea
--- /dev/null
+++ b/libensemble/tests/functionality_tests/test_cache_sims.py
@@ -0,0 +1,70 @@
+"""
+Runs libEnsemble with Latin hypercube sampling on a simple 1D problem
+
+Execute via one of the following commands (e.g. 3 workers):
+   mpiexec -np 4 python test_cache_sims.py
+   python test_cache_sims.py --nworkers 3
+   python test_cache_sims.py --nworkers 3 --comms tcp
+
+The number of concurrent evaluations of the objective function will be 4-1=3.
+"""
+
+# Do not change these lines - they are parsed by run-tests.sh
+# TESTSUITE_COMMS: mpi local
+# TESTSUITE_NPROCS: 2 4
+
+import time
+
+import numpy as np
+
+from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f
+
+# Import libEnsemble items for this test
+from libensemble.libE import libE
+from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output
+
+
+def sim_f(In):
+    Out = np.zeros(1, dtype=[("f", float)])
+    time.sleep(1.1)
+    Out["f"] = np.linalg.norm(In)
+    return Out
+
+
+if __name__ == "__main__":
+    nworkers, is_manager, libE_specs, _ = parse_args()
+    libE_specs["cache_long_sims"] = True
+
+    sim_specs = {
+        "sim_f": sim_f,
+        "in": ["x"],
+        "out": [("f", float)],
+    }
+
+    gen_specs = {
+        "gen_f": gen_f,
+        "out": [("x", float, (1,))],
+        "user": {
+            "gen_batch_size": 10,
+            "lb": np.array([-3]),
+            "ub": np.array([3]),
+        },
+    }
+
+    persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)
+
+    exit_criteria = {"sim_max": 11}
+
+    H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)
+
+    if is_manager:
+        assert len(H) >= 11
+        print("\nlibEnsemble with random sampling has generated enough points")
+        save_libE_output(H, persis_info, __file__, nworkers)
+
+    persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)
+    H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)
+
+    if is_manager:
+        # with the cache, some "long" sims should now complete in less than their 1.1s runtime
+        assert any(H["sim_ended_time"] - H["sim_started_time"] < 1.1)

From 019b82e8e828dd11993f02a7c5537586701335f0 Mon Sep 17 00:00:00 2001
From: jlnav
Date: Wed, 22 Oct 2025 14:36:20 -0500
Subject: [PATCH 27/28] non-existing cache already dealt with earlier?
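
get_shelved_sims is only reached once hist.cache_set is True, and cache_set
is only flipped after a successful np.save in _shelf_longrunning_sims
(init_cache also touches the file on startup), so the EOFError fallback
should be unreachable.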

---
 libensemble/history.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/libensemble/history.py b/libensemble/history.py
index d6f38b8e18..011792de64 100644
--- a/libensemble/history.py
+++ b/libensemble/history.py
@@ -147,11 +147,7 @@ def _shelf_longrunning_sims(self, index):
             self.cache_set = True

     def get_shelved_sims(self) -> npt.NDArray:
-        try:
-            in_cache = np.load(self.cache, allow_pickle=True)
-        except EOFError:
-            in_cache = np.zeros(1, dtype=self.cache_dtype)
-        return in_cache
+        return np.load(self.cache, allow_pickle=True)

     def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:

From 75105b8835fea9570a6845b17dd849fa76f90475 Mon Sep 17 00:00:00 2001
From: jlnav
Date: Thu, 23 Oct 2025 09:04:03 -0500
Subject: [PATCH 28/28] add the new libe specs options to libe_specs.rst

---
 docs/data_structures/libE_specs.rst | 21 +++++++++++++++------
 libensemble/specs.py                |  4 ++--
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/docs/data_structures/libE_specs.rst b/docs/data_structures/libE_specs.rst
index caa7b2eda8..6cf58b4e2a 100644
--- a/docs/data_structures/libE_specs.rst
+++ b/docs/data_structures/libE_specs.rst
@@ -9,12 +9,7 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl

     from libensemble.specs import LibeSpecs

-    specs = LibeSpecs(
-        gen_on_manager=True,
-        save_every_k_gens=100,
-        sim_dirs_make=True,
-        nworkers=4
-    )
+    specs = LibeSpecs(gen_on_manager=True, save_every_k_gens=100, sim_dirs_make=True, nworkers=4)

 .. dropdown:: Settings by Category
    :open:
@@ -60,6 +55,20 @@ libEnsemble is primarily customized by setting options within a ``LibeSpecs`` cl
       List of workers that should run only generators. All other workers will run
       only simulator functions.

+   **cache_long_sims** [bool] = ``False``:
+      Cache simulation results with runtimes >1s to disk. Subsequent runs of the same
+      base script with the same command-line arguments will access this cache.
+
+      When the generator creates points that are already in the cache, those points are not
+      sent for evaluation. Instead the corresponding cached results are retrieved and returned
+      to the generator.
+
+      The cache is saved in ``$HOME/.libE``, and by default is named after the joined command-line arguments.
+
+   **cache_name** [str] = ``"".join(sys.argv)``:
+      The name of the cache file. Stored in ``$HOME/.libE``, and by default is named after the
+      joined command-line arguments.
+
    .. tab-item:: Directories

       .. tab-set::
diff --git a/libensemble/specs.py b/libensemble/specs.py
index 5f750b42b3..5cfb781a83 100644
--- a/libensemble/specs.py
+++ b/libensemble/specs.py
@@ -321,8 +321,8 @@ class LibeSpecs(BaseModel):

     cache_name: str | None = "".join(sys.argv)
     """
-    The name of the cache file. By default is the joined command-line arguments.
-    Stored in $HOME/.libE, and by default is named after the joined command-line arguments.
+    The name of the cache file. Stored in $HOME/.libE, and by default is named after the
+    joined command-line arguments.
     """

     calc_dir_id_width: int | None = 4
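
A minimal end-to-end sketch of the feature as it stands at the end of this
series, distilled from test_cache_sims.py above (the cache name "demo" and
the toy sim are illustrative):

    import time

    import numpy as np

    from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f
    from libensemble.libE import libE
    from libensemble.tools import add_unique_random_streams, parse_args


    def sim_f(In):
        Out = np.zeros(1, dtype=[("f", float)])
        time.sleep(1.1)  # >1s, so the result qualifies for the cache
        Out["f"] = np.linalg.norm(In)
        return Out


    if __name__ == "__main__":
        nworkers, is_manager, libE_specs, _ = parse_args()
        libE_specs["cache_long_sims"] = True
        libE_specs["cache_name"] = "demo"  # -> $HOME/.libE/demo.npy

        sim_specs = {"sim_f": sim_f, "in": ["x"], "out": [("f", float)]}
        gen_specs = {
            "gen_f": gen_f,
            "out": [("x", float, (1,))],
            "user": {"gen_batch_size": 10, "lb": np.array([-3]), "ub": np.array([3])},
        }
        persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)

        # The first run populates the cache; rerunning the same script should
        # return matching points from the cache instead of re-evaluating them.
        H, persis_info, flag = libE(sim_specs, gen_specs, {"sim_max": 11}, persis_info, libE_specs=libE_specs)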