diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f53480a2d..6d8670bb4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -295,52 +295,52 @@ jobs: packages_dir: dist_opengate/ skip_existing: true -# ssh_session: -# env: -# GEANT4_VERSION: 'v11.2.1' -# ITK_VERSION: 'v5.2.1' -# runs-on: macos-13 -# steps: -# - name: Checkout github repo -# uses: actions/checkout@v4 -# - name: Checkout submodules -# shell: bash -l {0} -# run: | -# export GIT_SSL_NO_VERIFY=1 -# git submodule update --init --recursive -# - name: Set up Python -# uses: actions/setup-python@v5 -# with: -# python-version: 3.9 -# architecture: 'x64' -# - name: Get OS version -# id: get-os-version -# shell: bash -l {0} -# run: | -# varOS=`sw_vers | grep "ProductVersion:"` -# varOS="${varOS#*:}" -# echo "release=${varOS:1}" >> $GITHUB_OUTPUT -# - name: Cache modules -# id: cache_opengate_core_dependencies -# uses: actions/cache@v4 -# with: -# path: ~/software -# key: ${{ runner.os }}-${{ steps.get-os-version.outputs.release }}_geant4_${{ env.GEANT4_VERSION }}_itk_${{ env.ITK_VERSION }}_build2 -# restore-keys: ${{ runner.os }}-${{ steps.get-os-version.outputs.release }}_geant4_${{ env.GEANT4_VERSION }}_itk_${{ env.ITK_VERSION }}_build2 -# - uses: conda-incubator/setup-miniconda@v3 -# with: -# miniconda-version: "latest" -# auto-update-conda: true -# activate-environment: opengate_core -# python-version: 3.9 -# - name: Set up Homebrew -# id: set-up-homebrew -# uses: Homebrew/actions/setup-homebrew@master -# - name: Start SSH session -# uses: luchihoratiu/debug-via-ssh@main -# with: -# NGROK_AUTH_TOKEN: ${{ secrets.NGROK_AUTH_TOKEN }} -# SSH_PASS: ${{ secrets.SSH_PASS }} + ssh_session: + env: + GEANT4_VERSION: 'v11.2.1' + ITK_VERSION: 'v5.2.1' + runs-on: macos-13 + steps: + - name: Checkout github repo + uses: actions/checkout@v4 + - name: Checkout submodules + shell: bash -l {0} + run: | + export GIT_SSL_NO_VERIFY=1 + git submodule update --init --recursive + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.9 + architecture: 'x64' + - name: Get OS version + id: get-os-version + shell: bash -l {0} + run: | + varOS=`sw_vers | grep "ProductVersion:"` + varOS="${varOS#*:}" + echo "release=${varOS:1}" >> $GITHUB_OUTPUT + - name: Cache modules + id: cache_opengate_core_dependencies + uses: actions/cache@v4 + with: + path: ~/software + key: ${{ runner.os }}-${{ steps.get-os-version.outputs.release }}_geant4_${{ env.GEANT4_VERSION }}_itk_${{ env.ITK_VERSION }}_build2 + restore-keys: ${{ runner.os }}-${{ steps.get-os-version.outputs.release }}_geant4_${{ env.GEANT4_VERSION }}_itk_${{ env.ITK_VERSION }}_build2 + - uses: conda-incubator/setup-miniconda@v3 + with: + miniconda-version: "latest" + auto-update-conda: true + activate-environment: opengate_core + python-version: 3.9 + - name: Set up Homebrew + id: set-up-homebrew + uses: Homebrew/actions/setup-homebrew@master + - name: Start SSH session + uses: luchihoratiu/debug-via-ssh@main + with: + NGROK_AUTH_TOKEN: ${{ secrets.NGROK_AUTH_TOKEN }} + SSH_PASS: ${{ secrets.SSH_PASS }} test_wheel: runs-on: ${{ matrix.os }} diff --git a/core/opengate_core/opengate_lib/GateSimulationStatisticsActor.cpp b/core/opengate_core/opengate_lib/GateSimulationStatisticsActor.cpp index 0f5ccfae7..1425cba47 100644 --- a/core/opengate_core/opengate_lib/GateSimulationStatisticsActor.cpp +++ b/core/opengate_core/opengate_lib/GateSimulationStatisticsActor.cpp @@ -65,8 +65,8 @@ py::dict GateSimulationStatisticsActor::GetCounts() { 
"runs"_a = fCounts["runs"], "events"_a = fCounts["events"], "tracks"_a = fCounts["tracks"], "steps"_a = fCounts["steps"], "duration"_a = fCountsD["duration"], "init"_a = fCountsD["init"], - "start_time"_a = fCountsStr["start_time"], - "stop_time"_a = fCountsStr["stop_time"], "track_types"_a = fTrackTypes); + "start_time"_a = fCountsD["start_time"], + "stop_time"_a = fCountsD["stop_time"], "track_types"_a = fTrackTypes); return dd; } @@ -152,12 +152,20 @@ void GateSimulationStatisticsActor::EndSimulationAction() { std::stringstream ss; auto t_c = std::chrono::system_clock::to_time_t(fStartTime); ss << strtok(std::ctime(&t_c), "\n"); - fCountsStr["start_time"] = ss.str(); + auto startTimeSec = + std::chrono::time_point_cast(fStartTime); + long startTimeSecDouble = startTimeSec.time_since_epoch().count(); + fCountsD["start_time"] = startTimeSecDouble; + // fCountsStr["start_time"] = ss.str(); } { std::stringstream ss; auto t_c = std::chrono::system_clock::to_time_t(fStopTime); ss << strtok(std::ctime(&t_c), "\n"); - fCountsStr["stop_time"] = ss.str(); + auto stopTimeSec = + std::chrono::time_point_cast(fStopTime); + long stopTimeSecDouble = stopTimeSec.time_since_epoch().count(); + fCountsD["stop_time"] = stopTimeSecDouble; + // fCountsStr["stop_time"] = ss.str(); } } diff --git a/opengate/actors/actoroutput.py b/opengate/actors/actoroutput.py index ceba7d140..cb59f950b 100644 --- a/opengate/actors/actoroutput.py +++ b/opengate/actors/actoroutput.py @@ -1,5 +1,8 @@ from box import Box from typing import Optional +import uproot +import tqdm +import numpy as np from ..base import GateObject, process_cls from ..utility import insert_suffix_before_extension, ensure_filename_is_str @@ -10,6 +13,7 @@ SingleItkImageWithVariance, QuotientItkImage, QuotientMeanItkImage, + StatisticsItemContainer, merge_data, ) @@ -46,11 +50,12 @@ def __getstate__(self): For earlier python version (<3.11), __getstate__ may not be defined. We provide a simple workaround here to return a copy of the internal dict. 
""" - try: - return_dict = super().__getstate__() - except AttributeError: - # If there is no superclass with __getstate__, use self.__dict__ - return_dict = self.__dict__.copy() + # try: + # return_dict = super().__getstate__() + # except AttributeError: + # # If there is no superclass with __getstate__, use self.__dict__ + # return_dict = self.__dict__.copy() + return_dict = self.__dict__.copy() # Safely remove 'belongs_to_actor' if it exists return_dict.pop("belongs_to_actor", None) return return_dict @@ -202,7 +207,6 @@ class ActorOutputBase(GateObject): keep_data_in_memory: bool _default_interface_class = BaseUserInterfaceToActorOutput - default_suffix = None user_info_defaults = { "belongs_to": ( @@ -236,6 +240,8 @@ def __init__(self, *args, **kwargs): self.data_per_run = {} # holds the data per run in memory self.merged_data = None # holds the data merged from multiple runs in memory + self.default_suffix = "" + # internal flag which can set by the actor when it creating an actor output # via _add_actor_output # __can_be_deactivated = False forces the "active" user info to True @@ -343,6 +349,10 @@ def get_output_path(self, which="merged", **kwargs): def get_output_path_as_string(self, **kwargs): return ensure_filename_is_str(self.get_output_path(**kwargs)) + def reset_data(self): + self.merged_data = None + self.data_per_run = {} + def close(self): if self.keep_data_in_memory is False: self.data_per_run = {} @@ -367,6 +377,9 @@ def load_data(self, which): f"but it should be implemented in the specific derived class" ) + def merge_data_from_actor_output(self, *actor_output, **kwargs): + raise NotImplementedError("This is the base class. ") + class MergeableActorOutput(ActorOutputBase): @@ -416,6 +429,33 @@ def end_of_simulation(self, **kwargs): f"A developer needs to fix this. " ) + def merge_data_from_actor_output( + self, *actor_output, discard_existing_data=True, **kwargs + ): + run_indices_to_import = set() + for ao in actor_output: + run_indices_to_import.union(ao.data_per_run.keys()) + which_output_per_run_index = dict( + [ + (r, [ao for ao in actor_output if r in ao.data_per_run]) + for r in run_indices_to_import + ] + ) + for r in run_indices_to_import: + data_to_import = [ + ao.data_per_run[r] for ao in which_output_per_run_index[r] + ] + if discard_existing_data is False and r in self.data_per_run: + data_to_import.append(self.data_per_run[r]) + self.data_per_run[r] = merge_data(data_to_import) + merged_data_to_import = [ + ao.merged_data for ao in actor_output if ao.merged_data is not None + ] + if discard_existing_data is False and self.merged_data is not None: + merged_data_to_import.append(self.merged_data) + if len(merged_data_to_import) > 0: + self.merged_data = merge_data(merged_data_to_import) + class ActorOutputUsingDataItemContainer(MergeableActorOutput): @@ -604,6 +644,19 @@ def get_data_container(self, which): f"Allowed values are: 'merged' or a valid run_index. 
" ) + def reset_data(self): + # try to delegate the reset to the data item container (and the data items in them) + try: + if self.merged_data is not None: + self.merged_data.reset_data() + for v in self.data_per_run.values(): + if v is not None: + v.reset_data() + # if they do not implement the reset_data() method, + # fallback to the simple reset from the super class + except NotImplementedError: + super().reset_data() + def get_data(self, which="merged", item=0): container = self.get_data_container(which) if container is None: @@ -771,6 +824,69 @@ class ActorOutputQuotientMeanImage(ActorOutputImage): data_container_class = QuotientMeanItkImage +def _setter_hook_encoder(self, value): + if value == "encoder": + self.default_suffix = "json" + else: + self.default_suffix = "txt" + return value + + +class ActorOutputStatisticsActor(ActorOutputUsingDataItemContainer): + """This is a hand-crafted ActorOutput specifically for the SimulationStatisticsActor.""" + + _default_interface_class = UserInterfaceToActorOutputUsingDataItemContainer + data_container_class = StatisticsItemContainer + + # hints for IDE + encoder: str + + user_info_defaults = { + "encoder": ( + "json", + { + "doc": "How should the output be encoded?", + "allowed_values": ("json", "legacy"), + }, + ), + # "output_filename": ( + # "auto", + # { + # "doc": "Filename for the data represented by this actor output. " + # "Relative paths and filenames are taken " + # "relative to the global simulation output folder " + # "set via the Simulation.output_dir option. ", + # "setter_hook": _setter_hook_stats_actor_output_filename, + # }, + # ), + # "write_to_disk": ( + # False, + # { + # "doc": "Should the output be written to disk, or only kept in memory? ", + # }, + # ), + } + + def __init__(self, *args, **kwargs): + self.default_suffix = "json" + super().__init__(*args, **kwargs) + + def initialize(self): + output_filename = self.get_output_filename() + if output_filename != "" and output_filename is not None: + self.set_write_to_disk(True) + super().initialize() + + # def store_data(self, data, **kwargs): + # self.merged_data.update(data) + # + def __str__(self): + if self.merged_data is not None: + return self.merged_data.__str__() + else: + return "No data found. " + + class ActorOutputRoot(ActorOutputBase): # hints for IDE @@ -835,6 +951,130 @@ def initialize_cpp_parameters(self): self.name, self.get_output_path_as_string() ) + def merge_data_from_actor_output( + self, *actor_output, luts_run_index=None, **kwargs + ): + """ + luts_run_index: a list of lookup table, one for each root file. + The run index in the root file to be merged serves as array index to the lookup table. + The value recovered from the lookup table is the new run index to be written into the merged file. 
+ """ + + uproot.default_library = "np" + + out = uproot.recreate(self.get_output_path()) + rootfiles = [a.get_output_path() for a in actor_output] + + # Previous ID values to be able to increment runIn or EventId + previous_id = {} + + # create the dict reading all input root files + trees = {} # TTree with TBranch + hists = {} # Directory with THist + pbar = tqdm.tqdm(total=len(rootfiles)) + for rootfile_index, rf in enumerate(rootfiles): + with uproot.open(rf) as root: + tree_names = unicity(root.keys()) + for tree_name in tree_names: + if hasattr(root[tree_name], "keys"): + if tree_name not in trees: + trees[tree_name] = {"rootDictType": {}, "rootDictValue": {}} + hists[tree_name] = {"rootDictType": {}, "rootDictValue": {}} + previous_id[tree_name] = {} + for branch in root[tree_name].keys(): + # HISTOGRAMS + if isinstance( + root[tree_name], uproot.reading.ReadOnlyDirectory + ): + print(branch) + array = root[tree_name][branch].values() + if len(array) > 0: + branch_name = tree_name + "/" + branch + if isinstance(array[0], str): + array = np.zeros(len(array)) + if ( + branch_name + not in hists[tree_name]["rootDictType"] + ): + hists[tree_name]["rootDictType"][ + branch_name + ] = root[tree_name][branch].to_numpy() + hists[tree_name]["rootDictValue"][ + branch_name + ] = np.zeros(len(array)) + hists[tree_name]["rootDictValue"][ + branch_name + ] += array + else: + # ARRAYS + array = root[tree_name][branch].array(library="np") + if len(array) > 0 and not isinstance( + array[0], np.ndarray + ): + if isinstance(array[0], str): + array = np.zeros(len(array)) + if branch not in trees[tree_name]["rootDictType"]: + trees[tree_name]["rootDictType"][branch] = type( + array[0] + ) + trees[tree_name]["rootDictValue"][branch] = ( + np.array([]) + ) + if ( + branch.startswith("RunID") + and luts_run_index is not None + ): + luts_run_index = np.asarray(luts_run_index) + array = luts_run_index[rootfile_index][ + array.astype(int) + ] + if branch.startswith("EventID"): + if branch not in previous_id[tree_name]: + previous_id[tree_name][branch] = 0 + array += previous_id[tree_name][branch] + previous_id[tree_name][branch] = max(array) + 1 + trees[tree_name]["rootDictValue"][branch] = ( + np.append( + trees[tree_name]["rootDictValue"][branch], + array, + ) + ) + pbar.update(1) + pbar.close() + + # Set the dict in the output root file + for tree_name in trees: + if ( + not trees[tree_name]["rootDictValue"] == {} + or not trees[tree_name]["rootDictType"] == {} + ): + # out.mktree(tree, trees[tree]["rootDictType"]) + out[tree_name] = trees[tree_name]["rootDictValue"] + for hist in hists.values(): + if len(hist["rootDictValue"]) > 0 and len(hist["rootDictType"]) > 0: + for branch in hist["rootDictValue"]: + for i in range(len(hist["rootDictValue"][branch])): + hist["rootDictType"][branch][0][i] = hist["rootDictValue"][ + branch + ][i] + out[branch[:-2]] = hist["rootDictType"][branch] + + +def unicity(root_keys): + """ + Return an array containing the keys of the root file only one (without the version number) + """ + root_array = [] + for key in root_keys: + name = key.split(";") + if len(name) > 2: + name = ";".join(name) + else: + name = name[0] + if name not in root_array: + root_array.append(name) + return root_array + process_cls(ActorOutputBase) process_cls(MergeableActorOutput) @@ -845,4 +1085,5 @@ def initialize_cpp_parameters(self): process_cls(ActorOutputSingleImageWithVariance) process_cls(ActorOutputQuotientImage) process_cls(ActorOutputQuotientMeanImage) 
diff --git a/opengate/actors/base.py b/opengate/actors/base.py
index f2c4bf141..1059b43a7 100644
--- a/opengate/actors/base.py
+++ b/opengate/actors/base.py
@@ -129,8 +129,7 @@ class ActorBase(GateObject):
     def __init__(self, *args, **kwargs):
         GateObject.__init__(self, *args, **kwargs)
-        # this is set by the actor engine during initialization
-        self.actor_engine = None
+        self.actor_engine = None  # set by the actor engine during initialization
         self.user_output = Box()
         self.interfaces_to_user_output = Box()
@@ -200,6 +199,10 @@ def get_data(self, name=None, **kwargs):
             f" Example: my_actor.{list(self.interfaces_to_user_output.keys())[0]}.get_data(). "
         )

+    def reset_user_output(self):
+        for v in self.user_output.values():
+            v.reset_data()
+
     # *** shortcut properties ***
     @property
     @shortcut_for_single_output_actor
@@ -428,9 +431,26 @@ def recover_user_output(self, actor):
         for v in self.interfaces_to_user_output.values():
             v.belongs_to_actor = self

-    def store_output_data(self, output_name, run_index, *data):
-        self._assert_output_exists(output_name)
-        self.user_output[output_name].store_data(run_index, *data)
+    def import_user_output_from_actor(self, *actor, **kwargs):
+        if not all([self.type_name == a.type_name for a in actor]):
+            fatal("An actor can only import user output from the same type of actor.")
+        if len(actor) == 1:
+            self.recover_user_output(actor[0])
+        else:
+            for k in self.user_output:
+                try:
+                    self.user_output[k].merge_data_from_actor_output(
+                        *[a.user_output[k] for a in actor], **kwargs
+                    )
+                except NotImplementedError:
+                    self.warn_user(
+                        f"User output {k} in {self.type_name} cannot be imported "
+                        f"because the function is not yet implemented for this type of output."
+                    )
+
+    # def store_output_data(self, output_name, run_index, *data):
+    #     self._assert_output_exists(output_name)
+    #     self.user_output[output_name].store_data(run_index, *data)

     def write_output_to_disk_if_requested(self, output_name):
         self._assert_output_exists(output_name)
@@ -457,5 +477,8 @@ def EndSimulationAction(self):
         """Default virtual method for inheritance"""
         pass

+    def EndOfMultiProcessAction(self):
+        pass
+

 process_cls(ActorBase)
diff --git a/opengate/actors/dataitems.py b/opengate/actors/dataitems.py
index 119b79698..15f4402bd 100644
--- a/opengate/actors/dataitems.py
+++ b/opengate/actors/dataitems.py
@@ -2,9 +2,16 @@
 import numpy as np
 import json
 from box import Box
+import platform
+import datetime

 from ..exception import fatal, warning, GateImplementationError
-from ..utility import ensure_filename_is_str, calculate_variance
+from ..utility import (
+    ensure_filename_is_str,
+    calculate_variance,
+    g4_units,
+    g4_best_unit_tuple,
+)
 from ..image import (
     sum_itk_images,
     divide_itk_images,
@@ -14,6 +21,7 @@
     write_itk_image,
     get_info_from_image,
 )
+from ..serialization import dump_json


 # base classes
@@ -41,6 +49,9 @@ def set_data(self, data, **kwargs):
     def data_is_none(self):
         return self.data is None

+    def reset_data(self):
+        raise NotImplementedError
+
     def _assert_data_is_not_none(self):
         if self.data_is_none:
             raise ValueError(
@@ -75,6 +86,8 @@ def hand_down(*args, **kwargs):
                     getattr(self.data, item)(*args, **kwargs)

                 return hand_down
+            else:
+                return getattr(self.data, item)
         else:
             raise AttributeError(f"No such attribute '{item}'")
     else:
@@ -92,12 +105,13 @@ def merge_with(self, other):
                 f"because the following ValueError was encountered: \n{e}"
             )

-    def inplace_merge_with(self, other):
+    def inplace_merge_with(self, *other):
"""The base class implements merging as summation. Specific classes can override this, e.g. to merge mean values. """ try: - self += other + for o in other: + self.__iadd__(o) except ValueError as e: raise NotImplementedError( f"method 'inplace_merge_with' probably not implemented for data item class {type(self)} " @@ -123,6 +137,162 @@ def number_of_samples(self, value): self.meta_data["number_of_samples"] = int(value) +class StatisticsDataItem(DataItem): + + # def __init__(self, *args, **kwargs): + # super().__init__(*args, **kwargs) + # # super leaves data=None if no data is passed as kwarg, + # # but we want to initialize with a pre-filled Box + # if self.data is None: + # self.reset_data() + + def __str__(self): + s = "" + for k, v in self.get_processed_output().items(): + if k == "track_types": + if len(v["value"]) > 0: + s += "track_types\n" + for t, n in v["value"].items(): + s += f"{' ' * 24}{t}: {n}\n" + else: + if v["unit"] is None: + unit = "" + else: + unit = str(v["unit"]) + s += f"{k}{' ' * (20 - len(k))}{v['value']} {unit}\n" + # remove last line break + return s.rstrip("\n") + + def set_data(self, data, **kwargs): + """The input data must behave like a dictionary.""" + self.reset_data() + self.data.update(data) + + def reset_data(self): + self.data = Box() + self.data.runs = 0 + self.data.events = 0 + self.data.tracks = 0 + self.data.steps = 0 + self.data.duration = 0 + self.data.start_time = 0 + self.data.stop_time = 0 + self.data.sim_start_time = 0 + self.data.sim_stop_time = 0 + self.data.init = 0 + self.data.track_types = {} + self.data.nb_threads = 1 + + def inplace_merge_with(self, *other): + if self.data is None: + self.reset_data() + for o in other: + self.data.runs += o.data.runs + self.data.events += o.data.events + self.data.steps += o.data.steps + self.data.tracks += o.data.tracks + self.data.duration += o.data.duration + self.data.init += o.data.init + + common_entries = set(self.data.track_types.keys()).intersection( + o.data.track_types.keys() + ) + new_entries = set(o.data.track_types.keys()).difference( + self.data.track_types.keys() + ) + for k in common_entries: + self.data.track_types[k] += o.data.track_types[k] + for k in new_entries: + self.data.track_types[k] = o.data.track_types[k] + + # self.data.start_time = 0 + self.data.start_time = min([o.data.start_time for o in other]) + self.data.stop_time = max([o.data.stop_time for o in other]) + + self.data.sim_start_time = min([o.data.sim_start_time for o in other]) + self.data.sim_stop_time = max([o.data.sim_stop_time for o in other]) + + @property + def start_date_time(self): + return datetime.datetime.fromtimestamp(int(self.data.start_time)).strftime("%c") + + @property + def stop_date_time(self): + return datetime.datetime.fromtimestamp(int(self.data.stop_time)).strftime("%c") + + @property + def pps(self): + if self.data.duration != 0: + return int(self.data.events / (self.data.duration / g4_units.s)) + else: + return 0 + + @property + def tps(self): + if self.data.duration != 0: + return int(self.data.tracks / (self.data.duration / g4_units.s)) + else: + return 0 + + @property + def sps(self): + if self.data.duration != 0: + return int(self.data.steps / (self.data.duration / g4_units.s)) + else: + return 0 + + def get_processed_output(self): + d = {} + d["runs"] = {"value": self.data.runs, "unit": None} + d["events"] = {"value": self.data.events, "unit": None} + d["tracks"] = {"value": self.data.tracks, "unit": None} + d["steps"] = {"value": self.data.steps, "unit": None} + val, unit = 
g4_best_unit_tuple(self.data.init, "Time") + d["init"] = { + "value": val, + "unit": unit, + } + val, unit = g4_best_unit_tuple(self.data.duration, "Time") + d["duration"] = { + "value": val, + "unit": unit, + } + d["pps"] = {"value": self.pps, "unit": None} + d["tps"] = {"value": self.tps, "unit": None} + d["sps"] = {"value": self.sps, "unit": None} + d["start_time"] = { + "value": self.data.start_time, + "unit": None, + } + d["stop_time"] = { + "value": self.data.stop_time, + "unit": None, + } + val, unit = g4_best_unit_tuple(self.data.sim_start_time, "Time") + d["sim_start_time"] = { + "value": val, + "unit": unit, + } + val, unit = g4_best_unit_tuple(self.data.sim_stop_time, "Time") + d["sim_stop_time"] = { + "value": val, + "unit": unit, + } + d["threads"] = {"value": self.data.nb_threads, "unit": None} + d["arch"] = {"value": platform.system(), "unit": None} + d["python"] = {"value": platform.python_version(), "unit": None} + d["track_types"] = {"value": self.data.track_types, "unit": None} + return d + + def write(self, path, encoder="json", **kwargs): + """Override virtual method from base class.""" + with open(path, "w+") as f: + if encoder == "json": + dump_json(self.get_processed_output(), f, indent=4) + else: + f.write(self.__str__()) + + class MeanValueDataItemMixin: """This class cannot be instantiated on its own. It is solely meant to be mixed into a class that inherits from DataItem (or daughters). @@ -268,6 +438,10 @@ def __itruediv__(self, other): self.set_data(divide_itk_images(self.data, other.data)) return self + def inplace_merge_with(self, *other): + for o in other: + self.__iadd__(o) + def set_image_properties(self, **properties): if not self.data_is_none: if "spacing" in properties and properties["spacing"] is not None: @@ -525,12 +699,12 @@ def inplace_merge_with(self, other): if (self.data[i] is None or self.data[i].data is None) is not ( other.data[i] is None or other.data[i].data is None ): - s_not = {True: "", False: "not_"} + s_not = {True: "", False: "not "} fatal( "Cannot apply inplace merge data to container " "with unset (None) data items. " - f"In this case, the inplace item {i} is {s_not[self.data[i] is None]} None, " - f"and the other item {i} is {s_not[other.data[i] is None]} None. " + f"In this case, the inplace item {i} is {s_not[self.data[i] is None]}None, " + f"and the other item {i} is {s_not[other.data[i] is None]}None. " f"This is likely an implementation error in GATE. 
" ) return self @@ -746,10 +920,23 @@ class QuotientMeanItkImage(QuotientItkImage): ) +class StatisticsItemContainer(DataItemContainer): + + _data_item_classes = (StatisticsDataItem,) + default_data_item_config = Box( + { + 0: Box({"output_filename": "auto", "write_to_disk": False, "active": True}), + } + ) + + def merge_data(list_of_data): merged_data = list_of_data[0] - for d in list_of_data[1:]: - merged_data.inplace_merge_with(d) + try: + merged_data.inplace_merge_with(*list_of_data[1:]) + except: + for d in list_of_data[1:]: + merged_data.inplace_merge_with(d) return merged_data diff --git a/opengate/actors/doseactors.py b/opengate/actors/doseactors.py index 27ff6719c..6a5ac66d1 100644 --- a/opengate/actors/doseactors.py +++ b/opengate/actors/doseactors.py @@ -243,12 +243,17 @@ def EndOfRunActionMasterThread(self, run_index): u.end_of_run(run_index) return 0 - def EndSimulationAction(self): - # inform actor output that this simulation is over and write data + def inform_user_output_about_end(self): for u in self.user_output.values(): if u.get_active(item="all"): u.end_of_simulation() + def EndSimulationAction(self): + self.inform_user_output_about_end() + + def EndOfMultiProcessAction(self): + self.inform_user_output_about_end() + def compute_std_from_sample( number_of_samples, value_array, squared_value_array, correct_bias=False @@ -397,20 +402,6 @@ class DoseActor(VoxelDepositActor, g4.GateDoseActor): ), }, ), - # "calculate_density_from": ( - # "auto", - # { - # "doc": "How should density be calculated?\n" - # "'simulation': via scoring along with the rest of the quantities.\n" - # "'image': from the CT image, if the actor is attached to an ImageVolume.\n" - # "'auto' (default): Let GATE pick the right one for you. ", - # "allowed_values": ( - # "auto", - # "simulation", - # "image" - # ), - # }, - # ), "ste_of_mean": ( False, { diff --git a/opengate/actors/miscactors.py b/opengate/actors/miscactors.py index f580f6abc..869cf14e8 100644 --- a/opengate/actors/miscactors.py +++ b/opengate/actors/miscactors.py @@ -2,189 +2,11 @@ import platform import opengate_core as g4 from .base import ActorBase -from ..utility import g4_units, g4_best_unit_tuple -from .actoroutput import ActorOutputBase -from ..serialization import dump_json -from ..exception import warning +from ..utility import g4_units +from .actoroutput import ActorOutputStatisticsActor from ..base import process_cls -def _setter_hook_stats_actor_output_filename(self, output_filename): - # By default, write_to_disk is False. - # However, if user actively sets the output_filename - # s/he most likely wants to write to disk also - if output_filename != "" and output_filename is not None: - self.write_to_disk = True - return output_filename - - -class ActorOutputStatisticsActor(ActorOutputBase): - """This is a hand-crafted ActorOutput specifically for the SimulationStatisticsActor.""" - - # hints for IDE - encoder: str - output_filename: str - write_to_disk: bool - - user_info_defaults = { - "encoder": ( - "json", - { - "doc": "How should the output be encoded?", - "allowed_values": ("json", "legacy"), - }, - ), - "output_filename": ( - "auto", - { - "doc": "Filename for the data represented by this actor output. " - "Relative paths and filenames are taken " - "relative to the global simulation output folder " - "set via the Simulation.output_dir option. ", - "setter_hook": _setter_hook_stats_actor_output_filename, - }, - ), - "write_to_disk": ( - False, - { - "doc": "Should the output be written to disk, or only kept in memory? 
", - }, - ), - } - - default_suffix = "json" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # predefine the merged_data - self.merged_data = Box() - self.merged_data.runs = 0 - self.merged_data.events = 0 - self.merged_data.tracks = 0 - self.merged_data.steps = 0 - self.merged_data.duration = 0 - self.merged_data.start_time = 0 - self.merged_data.stop_time = 0 - self.merged_data.sim_start_time = 0 - self.merged_data.sim_stop_time = 0 - self.merged_data.init = 0 - self.merged_data.track_types = {} - self.merged_data.nb_threads = 1 - - @property - def pps(self): - if self.merged_data.duration != 0: - return int( - self.merged_data.events / (self.merged_data.duration / g4_units.s) - ) - else: - return 0 - - @property - def tps(self): - if self.merged_data.duration != 0: - return int( - self.merged_data.tracks / (self.merged_data.duration / g4_units.s) - ) - else: - return 0 - - @property - def sps(self): - if self.merged_data.duration != 0: - return int( - self.merged_data.steps / (self.merged_data.duration / g4_units.s) - ) - else: - return 0 - - def store_data(self, data, **kwargs): - self.merged_data.update(data) - - def get_data(self, **kwargs): - if "which" in kwargs and kwargs["which"] != "merged": - warning( - f"The statistics actor output only stores merged data currently. " - f"The which={kwargs['which']} you provided will be ignored. " - ) - # the statistics actor currently only handles merged data, so we return it - # no input variable 'which' as in other output classes - return self.merged_data - - def get_processed_output(self): - d = {} - d["runs"] = {"value": self.merged_data.runs, "unit": None} - d["events"] = {"value": self.merged_data.events, "unit": None} - d["tracks"] = {"value": self.merged_data.tracks, "unit": None} - d["steps"] = {"value": self.merged_data.steps, "unit": None} - val, unit = g4_best_unit_tuple(self.merged_data.init, "Time") - d["init"] = { - "value": val, - "unit": unit, - } - val, unit = g4_best_unit_tuple(self.merged_data.duration, "Time") - d["duration"] = { - "value": val, - "unit": unit, - } - d["pps"] = {"value": self.pps, "unit": None} - d["tps"] = {"value": self.tps, "unit": None} - d["sps"] = {"value": self.sps, "unit": None} - d["start_time"] = { - "value": self.merged_data.start_time, - "unit": None, - } - d["stop_time"] = { - "value": self.merged_data.stop_time, - "unit": None, - } - val, unit = g4_best_unit_tuple(self.merged_data.sim_start_time, "Time") - d["sim_start_time"] = { - "value": val, - "unit": unit, - } - val, unit = g4_best_unit_tuple(self.merged_data.sim_stop_time, "Time") - d["sim_stop_time"] = { - "value": val, - "unit": unit, - } - d["threads"] = {"value": self.merged_data.nb_threads, "unit": None} - d["arch"] = {"value": platform.system(), "unit": None} - d["python"] = {"value": platform.python_version(), "unit": None} - d["track_types"] = {"value": self.merged_data.track_types, "unit": None} - return d - - def __str__(self): - s = "" - for k, v in self.get_processed_output().items(): - if k == "track_types": - if len(v["value"]) > 0: - s += "track_types\n" - for t, n in v["value"].items(): - s += f"{' ' * 24}{t}: {n}\n" - else: - if v["unit"] is None: - unit = "" - else: - unit = str(v["unit"]) - s += f"{k}{' ' * (20 - len(k))}{v['value']} {unit}\n" - # remove last line break - return s.rstrip("\n") - - def write_data(self, **kwargs): - """Override virtual method from base class.""" - with open(self.get_output_path(which="merged"), "w+") as f: - if self.encoder == "json": - 
dump_json(self.get_processed_output(), f, indent=4) - else: - f.write(self.__str__()) - - def write_data_if_requested(self, **kwargs): - if self.write_to_disk is True: - self.write_data(**kwargs) - - class SimulationStatisticsActor(ActorBase, g4.GateSimulationStatisticsActor): """ Store statistics about a simulation run. @@ -205,22 +27,20 @@ class SimulationStatisticsActor(ActorBase, g4.GateSimulationStatisticsActor): def __init__(self, *args, **kwargs): ActorBase.__init__(self, *args, **kwargs) self._add_user_output(ActorOutputStatisticsActor, "stats") + self.user_output.stats.set_write_to_disk(False) self.__initcpp__() def __initcpp__(self): g4.GateSimulationStatisticsActor.__init__(self, self.user_info) self.AddActions({"StartSimulationAction", "EndSimulationAction"}) - def __str__(self): - s = self.user_output["stats"].__str__() - return s - @property def counts(self): return self.user_output.stats.merged_data - def store_output_data(self, output_name, run_index, *data): - raise NotImplementedError + def __str__(self): + s = self.user_output["stats"].__str__() + return s def initialize(self): ActorBase.initialize(self) @@ -229,13 +49,17 @@ def initialize(self): def StartSimulationAction(self): g4.GateSimulationStatisticsActor.StartSimulationAction(self) - self.user_output.stats.merged_data.nb_threads = ( - self.simulation.number_of_threads - ) + # self.user_output.stats.merged_data.nb_threads = ( + # self.simulation.number_of_threads + # ) + + # def EndOfRunActionMasterThread(self, run_index): + # self.user_output.stats.store_data() def EndSimulationAction(self): g4.GateSimulationStatisticsActor.EndSimulationAction(self) - self.user_output.stats.store_data(self.GetCounts()) + + data = dict([(k, v) for k, v in self.GetCounts().items()]) if self.simulation is not None: sim_start = self.simulation.run_timing_intervals[0][0] @@ -247,18 +71,16 @@ def EndSimulationAction(self): else: sim_stop = 0 - self.user_output.stats.store_data( - {"sim_start": sim_start, "sim_stop": sim_stop} - ) - self.user_output.stats.merged_data.sim_start_time = ( - self.simulation.run_timing_intervals[0][0] - ) - self.user_output.stats.merged_data.sim_stop_time = ( - self.simulation.run_timing_intervals[-1][1] - ) - self.user_output.stats.merged_data.nb_threads = ( - self.simulation.number_of_threads - ) + data["sim_start"] = sim_start + data["sim_stop"] = sim_stop + data["sim_start_time"] = self.simulation.run_timing_intervals[0][0] + data["sim_stop_time"] = self.simulation.run_timing_intervals[-1][1] + data["nb_threads"] = self.simulation.number_of_threads + self.user_output.stats.store_data("merged", data) + + self.user_output.stats.write_data_if_requested() + + def EndOfMultiProcessAction(self): self.user_output.stats.write_data_if_requested() diff --git a/opengate/base.py b/opengate/base.py index 8a6ae8b8f..91000ebd2 100644 --- a/opengate/base.py +++ b/opengate/base.py @@ -656,7 +656,7 @@ def warn_user(self, message): self._temporary_warning_cache.append(message) # if possible, register the warning directly else: - self.simulation._user_warnings.append(message) + self.simulation.warnings.append(message) warning(message) @@ -703,7 +703,7 @@ def process_dynamic_parametrisation(self, params): extra_params = {} extra_params["auto_changer"] = params.pop( "auto_changer", True - ) # True of key not found (default) + ) # True if key not found (default) if extra_params["auto_changer"] not in (False, True): fatal( f"Received wrong value type for 'auto_changer': got {type(extra_params['auto_changer'])}, " @@ -763,6 
+763,14 @@ def add_dynamic_parametrisation(self, name=None, **params): s += f"{k}: {v}\n" log.debug(s) + def reassign_dynamic_params_for_process(self, run_indices): + # loop over all dynamic parametrisations of this object, + for param in self.user_info["dynamic_params"].values(): + for k, v in param.items(): + # extract the subset of entries to the list that are relevant to this process + if k in self.dynamic_user_info: + param[k] = [v[i] for i in run_indices] + def create_changers(self): # this base class implementation is here to keep inheritance intact. return [] diff --git a/opengate/engines.py b/opengate/engines.py index 42ac8b2e0..c2e4bfde8 100644 --- a/opengate/engines.py +++ b/opengate/engines.py @@ -922,9 +922,23 @@ def __init__(self): self.sources_by_thread = {} self.pid = os.getpid() self.ppid = os.getppid() + self.simulation_id = None self.current_random_seed = None self.user_hook_log = [] self.warnings = None + self.process_index = None + + def store_output_from_simulation_engine(self, simulation_engine): + self.store_actors(simulation_engine) + self.store_sources(simulation_engine) + self.store_hook_log(simulation_engine) + self.current_random_seed = simulation_engine.current_random_seed + self.expected_number_of_events = ( + simulation_engine.source_engine.expected_number_of_events + ) + self.warnings = simulation_engine.simulation.warnings + self.simulation_id = id(simulation_engine.simulation) + self.process_index = simulation_engine.process_index def store_actors(self, simulation_engine): self.actors = simulation_engine.simulation.actor_manager.actors @@ -1016,6 +1030,7 @@ def __init__(self, simulation, new_process=False): # this is only for info. # Process handling is done in Simulation class, not in SimulationEngine! self.new_process = new_process + self.process_index = None # LATER : option to wait the end of completion or not @@ -1140,7 +1155,7 @@ def run_engine(self): # because everything else has already been executed in the main process # and potential warnings have already been registered. 
if self.new_process is True: - self.simulation.reset_warnings() + self.simulation.meta_data.reset_warnings() # initialization self.initialize() @@ -1154,34 +1169,18 @@ def run_engine(self): else: self.user_hook_after_init(self) - # if init only, we stop - if self.simulation.init_only: - output.store_actors(self) - output.store_sources(self) - output.store_hook_log(self) - output.current_random_seed = self.current_random_seed - output.expected_number_of_events = ( - self.source_engine.expected_number_of_events - ) - return output - - # go - self.start_and_stop() - - # start visualization if vrml or gdml - self.visu_engine.start_visualisation() - if self.user_hook_after_run: - log.info("Simulation: User hook after run") - self.user_hook_after_run(self) + # if init only, we skip the actual run + if not self.simulation.init_only: + # go + self.start_and_stop() + # start visualization if vrml or gdml + self.visu_engine.start_visualisation() + if self.user_hook_after_run: + log.info("Simulation: User hook after run") + self.user_hook_after_run(self) # prepare the output - output.store_actors(self) - output.store_sources(self) - output.store_hook_log(self) - output.current_random_seed = self.current_random_seed - output.expected_number_of_events = self.source_engine.expected_number_of_events - output.warnings = self.simulation.warnings - + output.store_output_from_simulation_engine(self) return output def start_and_stop(self): diff --git a/opengate/image.py b/opengate/image.py index 010aadb24..a9407063e 100644 --- a/opengate/image.py +++ b/opengate/image.py @@ -12,6 +12,8 @@ ) from .definitions import __gate_list_objects__ +import SimpleITK as sitk + def update_image_py_to_cpp(py_img, cpp_img, copy_data=False): cpp_img.set_size(py_img.GetLargestPossibleRegion().GetSize()) @@ -355,16 +357,48 @@ def divide_itk_images( return imgarrOut -def sum_itk_images(images): - image_type = type(images[0]) - add_image_filter = itk.AddImageFilter[image_type, image_type, image_type].New() - output = images[0] - for img in images[1:]: - add_image_filter.SetInput1(output) - add_image_filter.SetInput2(img) - add_image_filter.Update() - output = add_image_filter.GetOutput() - return output +# IMPLEMENTATION BASED ON ITK +# def sum_itk_images(images): +# image_type = type(images[0]) +# add_image_filter = itk.AddImageFilter[image_type, image_type, image_type].New() +# output = images[0] +# for img in images[1:]: +# add_image_filter.SetInput1(output) +# add_image_filter.SetInput2(img) +# add_image_filter.Update() +# output = add_image_filter.GetOutput() +# return output + + +def itk_to_sitk(itk_image): + array = itk.GetArrayFromImage(itk_image) + sitk_image = sitk.GetImageFromArray(array) + sitk_image.SetOrigin(np.array(itk_image.GetOrigin())) + sitk_image.SetSpacing(np.array(itk_image.GetSpacing())) + sitk_image.SetDirection(np.array(itk_image.GetDirection()).flatten()) + return sitk_image + + +def sitk_to_itk(sitk_image): + array = sitk.GetArrayFromImage(sitk_image) # Convert SimpleITK image to NumPy array + itk_image = itk.GetImageFromArray(array) # Convert NumPy array to ITK image + + # Set the metadata from SimpleITK to ITK image + itk_image.SetOrigin(np.array(sitk_image.GetOrigin())) + itk_image.SetSpacing(np.array(sitk_image.GetSpacing())) + itk_image.SetDirection(np.array(sitk_image.GetDirection()).reshape(3, 3)) + + return itk_image + + +def sum_itk_images(itk_image_list): + if not itk_image_list: + raise ValueError("The image list is empty.") + summed_image = itk_to_sitk(itk_image_list[0]) + for itk_image in 
itk_image_list[1:]: + sitk_image = itk_to_sitk(itk_image) + summed_image = sitk.Add(summed_image, sitk_image) + return sitk_to_itk(summed_image) def multiply_itk_images(images): diff --git a/opengate/managers.py b/opengate/managers.py index a0de748be..963ea09f8 100644 --- a/opengate/managers.py +++ b/opengate/managers.py @@ -7,6 +7,7 @@ import os from pathlib import Path import weakref +import multiprocessing import opengate_core as g4 @@ -35,6 +36,7 @@ ensure_directory_exists, ensure_filename_is_str, insert_suffix_before_extension, + delete_folder_contents, ) from . import logger from .logger import log @@ -46,7 +48,10 @@ ) from .userinfo import UserInfo from .serialization import dump_json, dumps_json, loads_json, load_json -from .processing import dispatch_to_subprocess +from .processing import ( + dispatch_to_subprocess, + MultiProcessingHandlerEqualPerRunTimingInterval, +) from .geometry.volumes import ( VolumeBase, @@ -1203,6 +1208,49 @@ def print_material_database_names(self): print(self.dump_material_database_names()) +class SimulationMetaData(Box): + + def __init__(self, *args, simulation_output=None, **kwargs): + super().__init__(*args, **kwargs) + self.warnings = [] + self.expected_number_of_events = 0 # FIXME workaround + self.user_hook_log = [] + self.current_random_seed = None + self.number_of_sub_processes = None + self.start_new_process = None + self.simulation_id = None + if simulation_output is not None: + self.extract_from_simulation_output(simulation_output) + + def reset(self): + self.reset_warnings() + self.expected_number_of_events = 0 + self.user_hook_log = [] + self.current_random_seed = None + self.number_of_sub_processes = None + self.start_new_process = None + + def reset_warnings(self): + self.warnings = [] + + def import_from_simulation_meta_data(self, *meta_data): + for m in meta_data: + self.warnings.extend(m.warnings) + self.expected_number_of_events += m.expected_number_of_events + self.user_hook_log.extend(m.user_hook_log) + if self.current_random_seed is None: + self.current_random_seed = m.current_random_seed + + def extract_from_simulation_output(self, *sim_output): + for so in sim_output: + self.warnings.extend(so.warnings) + self.expected_number_of_events += so.expected_number_of_events + self.user_hook_log.extend(so.user_hook_log) + if self.current_random_seed is None: + self.current_random_seed = so.current_random_seed + self.simulation_id = so.simulation_id + + def setter_hook_verbose_level(self, verbose_level): try: level = int(verbose_level) @@ -1479,13 +1527,15 @@ def __init__(self, name="simulation", **kwargs): kwargs.pop("simulation", None) super().__init__(name=name, **kwargs) - # list to store warning messages issued somewhere in the simulation - self._user_warnings = [] - # for debug only self.verbose_getstate = False self.verbose_close = False + self.meta_data = SimulationMetaData() + self.meta_data_per_process = {} + self.sub_process_registry = None + self.multi_proc_handler = None + # main managers self.volume_manager = VolumeManager(self) self.source_manager = SourceManager(self) @@ -1497,12 +1547,14 @@ def __init__(self, name="simulation", **kwargs): self.user_hook_after_init = None self.user_hook_after_init_arg = None self.user_hook_after_run = None - self.user_hook_log = None - # read-only info - self._current_random_seed = None - - self.expected_number_of_events = None + def __getattr__(self, item): + try: + return self.meta_data[item] + except KeyError: + raise AttributeError( + f"Item {item} not found in {type(self)}, nor in the 
simulation meta data. "
            )

    def __str__(self):
        s = (
@@ -1530,21 +1582,10 @@ def use_multithread(self):
     def world(self):
         return self.volume_manager.world_volume

-    @property
-    def current_random_seed(self):
-        return self._current_random_seed
-
-    @property
-    def warnings(self):
-        return self._user_warnings
-
-    def reset_warnings(self):
-        self._user_warnings = []
-
     def warn_user(self, message):
         # We need this specific implementation because the Simulation does not hold a reference 'simulation',
         # as required by the base class implementation of warn_user()
-        self._user_warnings.append(message)
+        self.warnings.append(message)
         super().warn_user(message)

     def to_dictionary(self):
@@ -1696,7 +1737,7 @@ def add_filter(self, filter_type, name):
     def multithreaded(self):
         return self.number_of_threads > 1 or self.force_multithread_mode

-    def _run_simulation_engine(self, start_new_process):
+    def _run_simulation_engine(self, start_new_process, process_index=None):
         """Method that creates a simulation engine in a context (with ...) and runs a simulation.

         Args:
@@ -1711,10 +1752,56 @@
         with SimulationEngine(self) as se:
             se.new_process = start_new_process
             se.init_only = self.init_only
+            se.process_index = process_index
             output = se.run_engine()
         return output

-    def run(self, start_new_process=False):
+    def run_in_process(
+        self,
+        multi_process_handler,
+        process_index,
+        output_dir,
+        avoid_write_to_disk_in_subprocess,
+    ):
+        # Important: this method is intended to run in a process spawned off the main process.
+        # Therefore, self is actually a separate instance from the original simulation
+        # and we can safely adapt it in this process.

+        # adapt the output_dir
+        self.output_dir = output_dir
+        if self.random_seed != "auto":
+            self.random_seed += process_index
+
+        # adapt the run timing intervals in this process
+        self.run_timing_intervals = (
+            multi_process_handler.get_run_timing_intervals_for_process(process_index)
+        )
+        # adapt all dynamic volumes
+        for vol in self.volume_manager.dynamic_volumes:
+            vol.reassign_dynamic_params_for_process(
+                multi_process_handler.get_original_run_timing_indices_for_process(
+                    process_index
+                )
+            )
+
+        if avoid_write_to_disk_in_subprocess is True:
+            for actor in self.actor_manager.actors.values():
+                actor.write_to_disk = False
+
+        output = self._run_simulation_engine(True, process_index=process_index)
+        print(f"run_in_process finished in process {process_index}")
+        self.to_json_file()
+        return output
+
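Reviewer note: a hedged usage sketch of the multiprocessing API introduced here (all parameter names are defined in `run()` below; volumes, sources and actors are elided). The `if __name__ == "__main__":` guard is required because subprocesses are spawned:

```python
import opengate as gate

if __name__ == "__main__":
    sim = gate.Simulation()
    sim.run_timing_intervals = [[0, 1 * gate.g4_units.s]]
    # ... volumes, sources and actors elided ...
    # split each run timing interval over 4 spawned processes and merge the
    # actor outputs back into this Simulation object afterwards
    sim.run(
        number_of_sub_processes=4,
        avoid_write_to_disk_in_subprocess=True,
        merge_after_multiprocessing=True,
    )
```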
+    def run(
+        self,
+        start_new_process=False,
+        number_of_sub_processes=0,
+        avoid_write_to_disk_in_subprocess=True,
+        merge_after_multiprocessing=True,
+        clear_output_dir_before_run=False,
+    ):
+
+        global __spec__
         # if windows and MT -> fail
         if os.name == "nt" and self.multithreaded:
             fatal(
                 "Run the simulation with one thread."
             )

+        if number_of_sub_processes == 1:
+            start_new_process = True
+
+        self.meta_data.reset()
+        self.meta_data.number_of_sub_processes = number_of_sub_processes
+        self.meta_data.start_new_process = start_new_process
+
+        if clear_output_dir_before_run is True:
+            delete_folder_contents(self.get_output_path())
+
+        for actor in self.actor_manager.actors.values():
+            actor.reset_user_output()
+
         # prepare sub process
         if start_new_process is True:
             """Important: put:
@@ -1748,19 +1848,55 @@
                     source.fTotalSkippedEvents = s.user_info.fTotalSkippedEvents
                     source.fTotalZeroEvents = s.user_info.fTotalZeroEvents

+            self.meta_data.extract_from_simulation_output(output)
+
+        elif number_of_sub_processes > 1:
+            self.multi_proc_handler = MultiProcessingHandlerEqualPerRunTimingInterval(
+                name="multi_proc_handler",
+                simulation=self,
+                number_of_processes=number_of_sub_processes,
+            )
+            self.multi_proc_handler.initialize()
+            __spec__ = None
+            try:
+                multiprocessing.set_start_method("spawn")
+            except RuntimeError:
+                print(f"Could not set start method 'spawn'. __spec__ = {__spec__}")
+            # q = multiprocessing.Queue()
+
+            self.sub_process_registry = dict(
+                [
+                    (i, {"output_dir": str(Path(self.output_dir) / f"process_{i}")})
+                    for i in range(number_of_sub_processes)
+                ]
+            )
+
+            with multiprocessing.Pool(number_of_sub_processes) as pool:
+                results = [
+                    pool.apply_async(
+                        self.run_in_process,
+                        (
+                            self.multi_proc_handler,
+                            k,
+                            v["output_dir"],
+                            avoid_write_to_disk_in_subprocess,
+                        ),
+                    )
+                    for k, v in self.sub_process_registry.items()
+                ]
+                # `.apply_async()` immediately returns an AsyncResult (ApplyResult) object
+                list_of_output = [res.get() for res in results]
+            log.info("End of multiprocessing")
+
+            if merge_after_multiprocessing is True:
+                self.merge_simulations_from_multiprocessing(list_of_output)
+
         else:
             # Nothing special to do if the simulation engine ran in the native python process
             # because everything is already in place.
             output = self._run_simulation_engine(False)
-
-        self._user_warnings.extend(output.warnings)
-
-        # FIXME workaround
-        self.expected_number_of_events = output.expected_number_of_events
-
-        # store the hook log
-        self.user_hook_log = output.user_hook_log
-        self._current_random_seed = output.current_random_seed
+            self.meta_data.extract_from_simulation_output(output)

         if self.store_json_archive is True:
             self.to_json_file()
@@ -1773,11 +1909,59 @@
             print("*" * 20)
             print(f"{len(self.warnings)} warnings occurred in this simulation: \n")
             for i, w in enumerate(self.warnings):
-                print(f"{i+1}) " + "-" * 10)
+                print(f"{i + 1}) " + "-" * 10)
                 print(w)
             print()
             print("*" * 20)

+    def merge_simulations_from_multiprocessing(self, list_of_output):
+        """To be run after a simulation has run in multiple subprocesses.
+        Currently, the input is a list of SimulationOutput instances,
+        but in the future the input will be a list of Simulation instances
+        (returned or recreated from the subprocesses)."""
+        if self.multi_proc_handler is None:
+            fatal(
+                "Cannot execute merge_simulations_from_multiprocessing without a multi_proc_handler. "
+            )
" + ) + + luts_run_index = [ + self.multi_proc_handler.get_original_run_timing_indices_for_process( + o.process_index + ) + for o in list_of_output + ] + + # loop over actors in original simulation + for actor in self.actor_manager.actors.values(): + actors_to_merge = [ + o.get_actor(actor.name) for o in list_of_output + ] # these are the actors from the process + actor.import_user_output_from_actor( + *actors_to_merge, luts_run_index=luts_run_index + ) + + for actor in self.actor_manager.actors.values(): + actor.EndOfMultiProcessAction() + + self.meta_data.extract_from_simulation_output(*list_of_output) + for i, o in enumerate(list_of_output): + self.meta_data_per_process[i] = SimulationMetaData(simulation_output=o) + + # FIXME: temporary workaround to collect extra info from output + # will be implemented similar to actor.import_user_output_from_actor after source refactoring + for source in self.source_manager.user_info_sources.values(): + for o in list_of_output: + try: + s = o.get_source(source.name) + except: + continue + if "fTotalSkippedEvents" in s.user_info.__dict__: + if not hasattr(source, "fTotalSkippedEvents"): + source.fTotalSkippedEvents = 0 + source.fTotalZeroEvents = 0 + source.fTotalSkippedEvents += s.user_info.fTotalSkippedEvents + source.fTotalZeroEvents += s.user_info.fTotalZeroEvents + def voxelize_geometry( self, extent="auto", diff --git a/opengate/processing.py b/opengate/processing.py index 7de54292e..5967a0892 100644 --- a/opengate/processing.py +++ b/opengate/processing.py @@ -2,6 +2,7 @@ import queue from .exception import fatal +from .base import GateObject, process_cls # define thin wrapper function to handle the queue @@ -28,3 +29,120 @@ def dispatch_to_subprocess(func, *args, **kwargs): return q.get(block=False) except queue.Empty: fatal("The queue is empty. The spawned process probably died.") + + +def _setter_hook_number_of_processes(self, number_of_processes): + if self.number_of_processes != number_of_processes: + self._dispatch_configuration = {} + self.process_run_index_map = {} + self.inverse_process_to_run_index_map = {} + return number_of_processes + + +class MultiProcessingHandlerBase(GateObject): + + user_info_defaults = { + "number_of_processes": ( + 1, + { + "doc": "In how many parallel process should the simulation be run? " + "Must be a multiple of the number of run timing intervals. ", + "setter_hook": _setter_hook_number_of_processes, + }, + ) + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._dispatch_configuration = {} + self.process_run_index_map = {} + self.inverse_process_to_run_index_map = {} + + @property + def original_run_timing_intervals(self): + return self.simulation.run_timing_intervals + + @property + def dispatch_configuration(self): + return self._dispatch_configuration + + @dispatch_configuration.setter + def dispatch_configuration(self, config): + self._dispatch_configuration = config + self.update_process_to_run_index_maps() + + def assert_dispatch_configuration(self): + if self.dispatch_configuration is None or len(self.dispatch_configuration) == 0: + fatal("No dispatch configuration is available. 
") + + def initialize(self): + self.generate_dispatch_configuration() + + def get_original_run_timing_indices_for_process(self, process_index): + return self.dispatch_configuration[process_index]["lut_original_rti"] + + def get_run_timing_intervals_for_process(self, process_index): + return self.dispatch_configuration[process_index]["run_timing_intervals"] + + def generate_dispatch_configuration(self): + raise NotImplementedError + + def update_process_to_run_index_maps(self): + """Creates a mapping (process index, local run index) -> (original run index)""" + self.assert_dispatch_configuration() + + p_r_map = {} + for k, v in self.dispatch_configuration.items(): + for lri, ori in enumerate(v["lut_original_rti"]): + p_r_map[(k, lri)] = ori + + # and the inverse + p_r_map_inv = dict([(i, []) for i in set(p_r_map.values())]) + for k, v in p_r_map.items(): + p_r_map_inv[v].append(k) + + self.process_run_index_map = p_r_map + self.inverse_process_to_run_index_map = p_r_map_inv + + def dispatch_to_processes(self, dispatch_function, *args): + return [ + dispatch_function(i, *args) for i in range(len(self.dispatch_configuration)) + ] + + +class MultiProcessingHandlerEqualPerRunTimingInterval(MultiProcessingHandlerBase): + + def generate_dispatch_configuration(self): + if self.number_of_processes % len(self.original_run_timing_intervals) != 0: + fatal( + "number_of_sub_processes must be a multiple of the number of run_timing_intervals, \n" + f"but I received {self.number_of_processes}, while there are {len(self.original_run_timing_intervals)}." + ) + + number_of_processes_per_run = int( + self.number_of_processes / len(self.original_run_timing_intervals) + ) + dispatch_configuration = {} + process_index = 0 + for i, rti in enumerate(self.original_run_timing_intervals): + t_start, t_end = rti + duration_original = t_end - t_start + duration_in_process = duration_original / number_of_processes_per_run + t_intermediate = [ + t_start + (j + 1) * duration_in_process + for j in range(number_of_processes_per_run - 1) + ] + t_all = [t_start] + t_intermediate + [t_end] + for t_s, t_e in zip(t_all[:-1], t_all[1:]): + dispatch_configuration[process_index] = { + "run_timing_intervals": [[t_s, t_e]], + "lut_original_rti": [i], + "process_id": None, + } + process_index += 1 + self.dispatch_configuration = dispatch_configuration + return dispatch_configuration + + +process_cls(MultiProcessingHandlerBase) +process_cls(MultiProcessingHandlerEqualPerRunTimingInterval) diff --git a/opengate/tests/src/test006_runs.py b/opengate/tests/src/test006_runs.py index b9f00d80c..a23d9e7ca 100755 --- a/opengate/tests/src/test006_runs.py +++ b/opengate/tests/src/test006_runs.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +from box import Box + import opengate as gate import opengate.tests.utility as utility @@ -86,13 +88,16 @@ print(stats) stats_ref = gate.actors.miscactors.SimulationStatisticsActor(name="stat_ref") - c = stats_ref.counts + c = Box() c.runs = 3 c.events = 7800 c.tracks = 37584 # 56394 c.steps = 266582 # 217234 # stats_ref.pps = 4059.6 3 3112.2 c.duration = 1 / 4059.6 * 7800 * sec + + stats_ref.user_output.stats.store_data("merged", c) + print("-" * 80) is_ok = utility.assert_stats(stats, stats_ref, 0.185) diff --git a/opengate/tests/src/test008_dose_actor_multiproc.py b/opengate/tests/src/test008_dose_actor_multiproc.py new file mode 100755 index 000000000..d6c57d563 --- /dev/null +++ b/opengate/tests/src/test008_dose_actor_multiproc.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +# -*- coding: 
+        if self.number_of_processes % len(self.original_run_timing_intervals) != 0:
+            fatal(
+                "number_of_sub_processes must be a multiple of the number of run_timing_intervals, \n"
+                f"but received {self.number_of_processes} for {len(self.original_run_timing_intervals)} intervals."
+            )
+
+        number_of_processes_per_run = self.number_of_processes // len(
+            self.original_run_timing_intervals
+        )
+        dispatch_configuration = {}
+        process_index = 0
+        for i, rti in enumerate(self.original_run_timing_intervals):
+            t_start, t_end = rti
+            duration_original = t_end - t_start
+            duration_in_process = duration_original / number_of_processes_per_run
+            t_intermediate = [
+                t_start + (j + 1) * duration_in_process
+                for j in range(number_of_processes_per_run - 1)
+            ]
+            t_all = [t_start] + t_intermediate + [t_end]
+            for t_s, t_e in zip(t_all[:-1], t_all[1:]):
+                dispatch_configuration[process_index] = {
+                    "run_timing_intervals": [[t_s, t_e]],
+                    "lut_original_rti": [i],
+                    "process_id": None,
+                }
+                process_index += 1
+        self.dispatch_configuration = dispatch_configuration
+        return dispatch_configuration
+
+
+process_cls(MultiProcessingHandlerBase)
+process_cls(MultiProcessingHandlerEqualPerRunTimingInterval)
diff --git a/opengate/tests/src/test006_runs.py b/opengate/tests/src/test006_runs.py
index b9f00d80c..a23d9e7ca 100755
--- a/opengate/tests/src/test006_runs.py
+++ b/opengate/tests/src/test006_runs.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
+from box import Box
+
 import opengate as gate
 import opengate.tests.utility as utility
 
@@ -86,13 +88,16 @@
     print(stats)
 
     stats_ref = gate.actors.miscactors.SimulationStatisticsActor(name="stat_ref")
-    c = stats_ref.counts
+    c = Box()
     c.runs = 3
    c.events = 7800
     c.tracks = 37584  # 56394
     c.steps = 266582  # 217234
     # stats_ref.pps = 4059.6 3 3112.2
     c.duration = 1 / 4059.6 * 7800 * sec
+
+    stats_ref.user_output.stats.store_data("merged", c)
+
     print("-" * 80)
     is_ok = utility.assert_stats(stats, stats_ref, 0.185)
diff --git a/opengate/tests/src/test008_dose_actor_multiproc.py b/opengate/tests/src/test008_dose_actor_multiproc.py
new file mode 100755
index 000000000..d6c57d563
--- /dev/null
+++ b/opengate/tests/src/test008_dose_actor_multiproc.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import opengate as gate
+from opengate.tests import utility
+from scipy.spatial.transform import Rotation
+from pathlib import Path
+import time
+
+if __name__ == "__main__":
+    paths = utility.get_default_test_paths(__file__, "gate_test008_dose_actor")
+    ref_path = paths.gate_output
+
+    # create the simulation
+    sim = gate.Simulation()
+
+    # main options
+    sim.g4_verbose = False
+    sim.g4_verbose_level = 1
+    sim.visu = False
+    sim.random_seed = 12345678
+
+    # shortcuts for units
+    m = gate.g4_units.m
+    cm = gate.g4_units.cm
+
+    # change world size
+    world = sim.world
+    world.size = [1 * m, 1 * m, 1 * m]
+
+    # add a simple fake volume to test hierarchy
+    # translation and rotation like in the Gate macro
+    fake = sim.add_volume("Box", "fake")
+    fake.size = [40 * cm, 40 * cm, 40 * cm]
+    fake.translation = [1 * cm, 2 * cm, 3 * cm]
+    fake.rotation = Rotation.from_euler("x", 10, degrees=True).as_matrix()
+    fake.material = "G4_AIR"
+    fake.color = [1, 0, 1, 1]
+
+    # waterbox
+    waterbox = sim.add_volume("Box", "waterbox")
+    waterbox.mother = "fake"
+    waterbox.size = [10 * cm, 10 * cm, 10 * cm]
+    waterbox.translation = [-3 * cm, -2 * cm, -1 * cm]
+    waterbox.rotation = Rotation.from_euler("y", 20, degrees=True).as_matrix()
+    waterbox.material = "G4_WATER"
+    waterbox.color = [0, 0, 1, 1]
+
+    # physics
+    sim.physics_manager.physics_list_name = "QGSP_BERT_EMV"
+    sim.physics_manager.enable_decay = False
+    sim.physics_manager.apply_cuts = True  # default
+    um = gate.g4_units.um
+    global_cut = 700 * um
+    sim.physics_manager.global_production_cuts.gamma = global_cut
+    sim.physics_manager.global_production_cuts.electron = global_cut
+    sim.physics_manager.global_production_cuts.positron = global_cut
+    sim.physics_manager.global_production_cuts.proton = global_cut
+
+    # default source for tests
+    source = sim.add_source("GenericSource", "mysource")
+    MeV = gate.g4_units.MeV
+    Bq = gate.g4_units.Bq
+    source.energy.mono = 150 * MeV
+    nm = gate.g4_units.nm
+    source.particle = "proton"
+    source.position.type = "disc"
+    source.position.radius = 1 * nm
+    source.direction.type = "momentum"
+    source.direction.momentum = [0, 0, 1]
+    source.activity = 50000 * Bq
+
+    # add dose actor
+    dose = sim.add_actor("DoseActor", "dose")
+    dose.attached_to = "waterbox"
+    dose.size = [99, 99, 99]
+    mm = gate.g4_units.mm
+    dose.spacing = [2 * mm, 2 * mm, 2 * mm]
+    dose.translation = [2 * mm, 3 * mm, -2 * mm]
+    dose.edep_uncertainty.active = True
+    dose.hit_type = "random"
+    dose.output_coordinate_system = "local"
+    dose.output_filename = "test.nii.gz"
+
+    # add stat actor
+    stat_actor = sim.add_actor("SimulationStatisticsActor", "Stats")
+    stat_actor.write_to_disk = True
+    stat_actor.output_filename = "stats.json"
+    stat_actor.track_types_flag = True
+
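+    # Options exercised below (new multiprocessing API): number_of_sub_processes=N
+    # makes sim.run() split the run timing intervals over N subprocesses (cf.
+    # MultiProcessingHandlerEqualPerRunTimingInterval) and presumably merge the
+    # actor outputs afterwards; avoid_write_to_disk_in_subprocess=False also
+    # keeps the per-process partial outputs on disk.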
+    # start simulation
+    t1 = time.time()
+    sim.output_dir = paths.output / Path(__file__).stem / "nproc_4"
+    path_edep_nproc4 = dose.edep.get_output_path()
+    sim.run(number_of_sub_processes=4, avoid_write_to_disk_in_subprocess=False)
+    t2 = time.time()
+    delta_t_nproc4 = t2 - t1
+
+    sim.output_dir = paths.output / Path(__file__).stem / "nproc_1"
+    path_edep_nproc1 = dose.edep.get_output_path()
+    t1 = time.time()
+    sim.run(number_of_sub_processes=1)
+    t2 = time.time()
+    delta_t_nproc1 = t2 - t1
+
+    # t1 = time.time()
+    # sim.run(number_of_sub_processes=0)
+    # t2 = time.time()
+    # delta_t_no_subproc = t2 - t1
+
+    print("Simulation times: ")
+    print(f"One subprocess: {delta_t_nproc1}")
+    print(
+        f"Four subprocesses: {delta_t_nproc4}, speed-up: {delta_t_nproc1 / delta_t_nproc4}"
+    )
+    # print(f"No subprocess: {delta_t_no_subproc}")
+
+    # tests
+    print("\nDifference for EDEP")
+    is_ok = utility.assert_images(
+        path_edep_nproc1,
+        path_edep_nproc4,
+        tolerance=13,
+        ignore_value_data2=0,
+        sum_tolerance=1,
+    )
+    #
+    # print("\nDifference for uncertainty")
+    # is_ok = (
+    #     utility.assert_images(
+    #         ref_path / "output-Edep-Uncertainty.mhd",
+    #         dose.edep_uncertainty.get_output_path(),
+    #         stat,
+    #         tolerance=30,
+    #         ignore_value=1,
+    #         sum_tolerance=1,
+    #     )
+    #     and is_ok
+    # )
+    #
+    utility.test_ok(is_ok)
diff --git a/opengate/tests/src/test009_voxels_dynamic.py b/opengate/tests/src/test009_voxels_dynamic.py
index 459b83d12..4a27a6b17 100755
--- a/opengate/tests/src/test009_voxels_dynamic.py
+++ b/opengate/tests/src/test009_voxels_dynamic.py
@@ -93,6 +93,7 @@
     # add dose actor
     dose = sim.add_actor("DoseActor", "dose")
     dose.output_filename = "test009-edep.mhd"
+    dose.edep.keep_data_per_run = True
     dose.attached_to = "patient"
     dose.size = [99, 99, 99]
     dose.spacing = [2 * mm, 2 * mm, 2 * mm]
diff --git a/opengate/tests/src/test019_phsp_actor_multiproc.py b/opengate/tests/src/test019_phsp_actor_multiproc.py
new file mode 100755
index 000000000..f64ffd833
--- /dev/null
+++ b/opengate/tests/src/test019_phsp_actor_multiproc.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import opengate as gate
+from opengate.tests import utility
+import uproot
+
+
+if __name__ == "__main__":
+    paths = utility.get_default_test_paths(
+        __file__, "", output_folder="test019_multiproc"
+    )
+
+    # create the simulation
+    sim = gate.Simulation()
+
+    # main options
+    sim.output_dir = paths.output
+    sim.g4_verbose = False
+    sim.visu = False
+    sim.visu_type = "vrml"
+    sim.check_volumes_overlap = False
+    sim.number_of_threads = 1
+    sim.random_seed = 321654
+
+    # units
+    m = gate.g4_units.m
+    mm = gate.g4_units.mm
+    nm = gate.g4_units.nm
+    Bq = gate.g4_units.Bq
+    MeV = gate.g4_units.MeV
+
+    # adapt world size
+    sim.world.size = [1 * m, 1 * m, 1 * m]
+    sim.world.material = "G4_AIR"
+
+    # virtual plane for phase space
+    plane = sim.add_volume("Tubs", "phase_space_plane")
+    plane.mother = sim.world
+    plane.material = "G4_AIR"
+    plane.rmin = 0
+    plane.rmax = 700 * mm
+    plane.dz = 1 * nm  # half height
+    plane.translation = [0, 0, -100 * mm]
+    plane.color = [1, 0, 0, 1]  # red
+
+    # gamma source
+    source = sim.add_source("GenericSource", "Default")
+    source.particle = "gamma"
+    source.energy.type = "gauss"
+    source.energy.mono = 1 * MeV
+    source.energy.sigma_gauss = 0.5 * MeV
+    source.position.type = "disc"
+    source.position.radius = 20 * mm
+    source.position.translation = [0, 0, 0 * mm]
+    source.direction.type = "momentum"
+    source.n = 66
+
+    # add stat actor
+    stats_actor = sim.add_actor("SimulationStatisticsActor", "Stats")
+    stats_actor.track_types_flag = True
+
+    # PhaseSpace Actor
+    phsp_actor = sim.add_actor("PhaseSpaceActor", "PhaseSpace")
+    phsp_actor.attached_to = plane.name
+    phsp_actor.attributes = [
+        "KineticEnergy",
+        "PostPosition",
+        "PrePosition",
+        "PrePositionLocal",
+        "ParticleName",
+        "PreDirection",
+        "PreDirectionLocal",
+        "PostDirection",
+        "TimeFromBeginOfEvent",
+        "GlobalTime",
+        "LocalTime",
+        "EventPosition",
+        "PDGCode",
+        "EventID",
+        "RunID",
+    ]
+    phsp_actor.debug = False
+
+    # send the particles towards the phase-space plane
+    source.direction.momentum = [0, 0, -1]
+    phsp_actor.output_filename = "test019_phsp_actor.root"
+
+    sim.run_timing_intervals = [(i, i + 1) for i in range(3)]
+
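+    # With three run timing intervals and nb_proc = 3 below, each subprocess is
+    # expected to handle exactly one original interval (cf.
+    # MultiProcessingHandlerEqualPerRunTimingInterval in opengate/processing.py).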
+    # run with subprocesses
+    nb_proc = 3
+    sim.output_dir = paths.output / "multiproc"
+    sim.run(
+        number_of_sub_processes=nb_proc,
+        avoid_write_to_disk_in_subprocess=False,
+        clear_output_dir_before_run=True,
+    )
+    path_phsp_output_multi = phsp_actor.get_output_path()
+
+    sim.output_dir = paths.output / "singleproc"
+    sim.run(
+        number_of_sub_processes=nb_proc,
+        avoid_write_to_disk_in_subprocess=False,
+        clear_output_dir_before_run=True,
+    )
+    path_phsp_output_single = phsp_actor.get_output_path()
+
+    f_multi = uproot.open(path_phsp_output_multi)
+    eventid_multi = f_multi["PhaseSpace;1"]["EventID"].array(library="np")
+    runid_multi = f_multi["PhaseSpace;1"]["RunID"].array(library="np")
+
+    f_single = uproot.open(path_phsp_output_single)
+    eventid_single = f_single["PhaseSpace;1"]["EventID"].array(library="np")
+    runid_single = f_single["PhaseSpace;1"]["RunID"].array(library="np")
+
+    assert len(set(eventid_multi)) == len(eventid_multi)
+    assert set(runid_multi) == set(range(len(sim.run_timing_intervals)))
+    assert set(eventid_single) == set(eventid_multi)
+    assert set(runid_single) == set(runid_multi)
diff --git a/opengate/tests/src/test030_dose_motion_dynamic_param_multiproc.py b/opengate/tests/src/test030_dose_motion_dynamic_param_multiproc.py
new file mode 100755
index 000000000..b0c00b92d
--- /dev/null
+++ b/opengate/tests/src/test030_dose_motion_dynamic_param_multiproc.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import opengate as gate
+from scipy.spatial.transform import Rotation
+from opengate.tests import utility
+
+if __name__ == "__main__":
+    paths = utility.get_default_test_paths(
+        __file__, "gate_test029_volume_time_rotation", "test030"
+    )
+
+    # create the simulation
+    sim = gate.Simulation()
+
+    # main options
+    sim.g4_verbose = False
+    sim.visu = False
+    sim.random_seed = 983456
+    sim.output_dir = paths.output
+
+    # units
+    m = gate.g4_units.m
+    mm = gate.g4_units.mm
+    cm = gate.g4_units.cm
+    um = gate.g4_units.um
+    nm = gate.g4_units.nm
+    MeV = gate.g4_units.MeV
+    Bq = gate.g4_units.Bq
+    sec = gate.g4_units.second
+
+    # change world size
+    sim.world.size = [1 * m, 1 * m, 1 * m]
+
+    # add a simple fake volume to test hierarchy
+    # translation and rotation like in the Gate macro
+    fake = sim.add_volume("Box", "fake")
+    fake.size = [40 * cm, 40 * cm, 40 * cm]
+    fake.translation = [1 * cm, 2 * cm, 3 * cm]
+    fake.material = "G4_AIR"
+    fake.color = [1, 0, 1, 1]
+
+    # waterbox
+    waterbox = sim.add_volume("Box", "waterbox")
+    waterbox.mother = fake
+    waterbox.size = [20 * cm, 20 * cm, 20 * cm]
+    waterbox.translation = [-3 * cm, -2 * cm, -1 * cm]
+    waterbox.rotation = Rotation.from_euler("y", -20, degrees=True).as_matrix()
+    waterbox.material = "G4_WATER"
+    waterbox.color = [0, 0, 1, 1]
+
+    # physics
+    sim.physics_manager.set_production_cut("world", "all", 700 * um)
+
+    # default source for tests
+    # the source is fixed at the center, only the volume will move
+    source = sim.add_source("GenericSource", "mysource")
+    source.energy.mono = 150 * MeV
+    source.particle = "proton"
+    source.position.type = "disc"
+    source.position.radius = 5 * mm
+    source.direction.type = "momentum"
+    source.direction.momentum = [0, 0, 1]
+    source.activity = 30000 * Bq
+
+    # add dose actor
+    dose = sim.add_actor("DoseActor", "dose")
+    dose.output_filename = "test030.mhd"
+    dose.attached_to = waterbox
+    dose.size = [99, 99, 99]
+    mm = gate.g4_units.mm
+    dose.spacing = [2 * mm, 2 * mm, 2 * mm]
+    dose.translation = [2 * mm, 3 * mm, -2 * mm]
+    dose.edep.keep_data_per_run = True
+    dose.edep.auto_merge = True
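+    # keep_data_per_run and auto_merge presumably instruct the actor to keep the
+    # per-run partial images and to merge the per-run (and per-process) data
+    # automatically into the final output.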
+    dose.edep_uncertainty.active = True
+
+    # add stat actor
+    stats = sim.add_actor("SimulationStatisticsActor", "Stats")
+
+    # motion
+    n = 3
+    interval_length = 1 * sec / n
+    sim.run_timing_intervals = [
+        (i * interval_length, (i + 1) * interval_length) for i in range(n)
+    ]
+    gantry_angles_deg = [i * 20 for i in range(n)]
+    (
+        dynamic_translations,
+        dynamic_rotations,
+    ) = gate.geometry.utility.get_transform_orbiting(
+        initial_position=fake.translation, axis="Y", angle_deg=gantry_angles_deg
+    )
+    fake.add_dynamic_parametrisation(
+        translation=dynamic_translations, rotation=dynamic_rotations
+    )
+
+    # start simulation
+    sim.run(number_of_sub_processes=3 * len(sim.run_timing_intervals))
+
+    # # print results at the end
+    # print(stats)
+    #
+    # # tests
+    # stats_ref = utility.read_stat_file(paths.output_ref / "stats030.txt")
+    # is_ok = utility.assert_stats(stats, stats_ref, 0.11)
+    #
+    # print()
+    # gate.exception.warning("Difference for EDEP")
+    # is_ok = (
+    #     utility.assert_images(
+    #         paths.output_ref / "test030-edep.mhd",
+    #         dose.edep.get_output_path(),
+    #         stats,
+    #         tolerance=30,
+    #         ignore_value=0,
+    #     )
+    #     and is_ok
+    # )
+    #
+    # print("\nDifference for uncertainty")
+    # is_ok = (
+    #     utility.assert_images(
+    #         paths.output_ref / "test030-edep_uncertainty.mhd",
+    #         dose.edep_uncertainty.get_output_path(),
+    #         stats,
+    #         tolerance=15,
+    #         ignore_value=1,
+    #     )
+    #     and is_ok
+    # )
+    #
+    # utility.test_ok(is_ok)
diff --git a/opengate/tests/src/test081_simulation_optigan_with_random_seed.py b/opengate/tests/src/test081_simulation_optigan_with_random_seed.py
index b7b1c04d1..f3365bd1d 100755
--- a/opengate/tests/src/test081_simulation_optigan_with_random_seed.py
+++ b/opengate/tests/src/test081_simulation_optigan_with_random_seed.py
@@ -10,7 +10,7 @@
 import os
 
 if __name__ == "__main__":
-    paths = tu.get_default_test_paths(__file__, output_folder="test075_optigan")
+    paths = tu.get_default_test_paths(__file__, output_folder="test081_optigan")
 
     # create simulation
     sim = gate.Simulation()
@@ -76,7 +76,7 @@
         "TrackID",
     ]
 
-    phsp_actor.output_filename = "test075_simulation_optigan_with_random_seed_600.root"
+    phsp_actor.output_filename = "simulation_optigan_with_random_seed_600.root"
 
     # add a kill actor to the crystal
     ka = sim.add_actor("KillActor", "kill_actor2")
diff --git a/opengate/tests/src/test082_multiprocessing_1.py b/opengate/tests/src/test082_multiprocessing_1.py
new file mode 100755
index 000000000..6ee1c2544
--- /dev/null
+++ b/opengate/tests/src/test082_multiprocessing_1.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+from opengate.utility import g4_units
+import opengate as gate
+from opengate.tests.utility import get_default_test_paths, test_ok
+
+if __name__ == "__main__":
+    paths = get_default_test_paths(__file__, output_folder="test082")
+
+    s = g4_units.s
+
+    sim = gate.Simulation()
+    sim.run_timing_intervals = [[0 * s, 1 * s], [1 * s, 3 * s], [10 * s, 15 * s]]
+    sim.output_dir = paths.output
+    sim.store_json_archive = True
+
+    box1 = sim.add_volume("BoxVolume", "box1")
+    box1.add_dynamic_parametrisation(
+        translation=[[i, i, i] for i in range(len(sim.run_timing_intervals))]
+    )
+
+    n_proc = 4 * len(sim.run_timing_intervals)
+
+    sim.run(number_of_sub_processes=n_proc)
+
+    ids = [m.simulation_id for m in sim.meta_data_per_process.values()]
+    print("ID of the main sim:")
+    print(id(sim))
+    print("ID of the sims in subprocesses:")
+    for _id in ids:
+        print(_id)
+
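+    # Each subprocess presumably works on a pickled copy of the Simulation, so
+    # the identity of the main-process instance must not appear among the
+    # subprocess metadata entries; this is what the checks below verify.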
+    # check that the ID of the Simulation instance in the main process is
+    # different from the IDs in the subprocesses
+    is_ok = id(sim) not in ids
+
+    # check that the IDs in the subprocesses are pairwise distinct
+    is_ok = is_ok and len(set(ids)) == len(ids)
+
+    test_ok(is_ok)
diff --git a/opengate/tests/src/test082_multiprocessing_handler.py b/opengate/tests/src/test082_multiprocessing_handler.py
new file mode 100755
index 000000000..db1d18e9f
--- /dev/null
+++ b/opengate/tests/src/test082_multiprocessing_handler.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+from opengate.utility import g4_units
+import opengate as gate
+from opengate.tests.utility import get_default_test_paths
+from opengate.processing import MultiProcessingHandlerEqualPerRunTimingInterval
+
+
+if __name__ == "__main__":
+    paths = get_default_test_paths(__file__, output_folder="test082")
+
+    s = g4_units.s
+
+    sim = gate.Simulation()
+    sim.run_timing_intervals = [[0 * s, 1 * s], [1 * s, 3 * s], [10 * s, 15 * s]]
+    sim.output_dir = paths.output
+
+    box1 = sim.add_volume("BoxVolume", "box1")
+    box1.add_dynamic_parametrisation(
+        translation=[[i, i, i] for i in range(len(sim.run_timing_intervals))]
+    )
+
+    n_proc = 4 * len(sim.run_timing_intervals)
+
+    multi_proc_handler = MultiProcessingHandlerEqualPerRunTimingInterval(
+        name="multi_proc_handler", simulation=sim, number_of_processes=n_proc
+    )
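+
+    # generate and sanity-check the dispatch configuration (sketch): with 3
+    # original intervals and n_proc = 12, one entry per process is expected
+    multi_proc_handler.initialize()
+    assert len(multi_proc_handler.dispatch_configuration) == n_proc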
diff --git a/opengate/tests/utility.py b/opengate/tests/utility.py
index 0878f3f58..70ea3ecc5 100644
--- a/opengate/tests/utility.py
+++ b/opengate/tests/utility.py
@@ -71,7 +71,7 @@ def read_stat_file_json(filename):
     for k, d in data.items():
         counts[k] = d["value"]
     stat = SimulationStatisticsActor(name=r)
-    stat.user_output.stats.store_data(counts)
+    stat.user_output.stats.store_data("merged", counts)
     return stat
 
 
@@ -112,7 +112,7 @@ def read_stat_file_legacy(filename):
             counts.nb_threads = int(a)
         except:
             counts.nb_threads = "?"
-    stat.user_output.stats.store_data(counts)
+    stat.user_output.stats.store_data("merged", counts)
     return stat
 
 
@@ -128,21 +128,20 @@ def print_test(b, s):
 
 def assert_stats(stats_actor_1, stats_actor_2, tolerance=0):
     return assert_stats_json(
-        stats_actor_1.user_output.stats,
-        stats_actor_2.user_output.stats,
+        stats_actor_1,
+        stats_actor_2,
         tolerance,
         track_types_flag=stats_actor_1.track_types_flag,
     )
 
 
 def assert_stats_json(stats_actor_1, stats_actor_2, tolerance=0, track_types_flag=None):
-    output1 = stats_actor_1  # .user_output.stats
-    output2 = stats_actor_2  # .user_output.stats
-    if track_types_flag is None:
-        track_types_flag = len(output1.track_types) > 0
-    counts1 = output1.merged_data
-    counts2 = output2.merged_data
+    counts1 = stats_actor_1.counts
+    counts2 = stats_actor_2.counts
+
+    if track_types_flag is None:
+        track_types_flag = len(counts1.track_types) > 0
     if counts2.events != 0:
         event_d = counts1.events / counts2.events * 100 - 100
     else:
         event_d = 100
@@ -155,18 +154,18 @@ def assert_stats_json(stats_actor_1, stats_actor_2, tolerance=0, track_types_fla
         step_d = counts1.steps / counts2.steps * 100 - 100
     else:
         step_d = 100
-    if output2.pps != 0:
-        pps_d = output1.pps / output2.pps * 100 - 100
+    if counts2.pps != 0:
+        pps_d = counts1.pps / counts2.pps * 100 - 100
     else:
         pps_d = 100
-    if output2.tps != 0:
-        tps_d = output1.tps / output2.tps * 100 - 100
+    if counts2.tps != 0:
+        tps_d = counts1.tps / counts2.tps * 100 - 100
     else:
         tps_d = 100
-    if output2.sps != 0:
-        sps_d = output1.sps / output2.sps * 100 - 100
+    if counts2.sps != 0:
+        sps_d = counts1.sps / counts2.sps * 100 - 100
     else:
         sps_d = 100
 
@@ -198,17 +197,17 @@ def assert_stats_json(stats_actor_1, stats_actor_2, tolerance=0, track_types_fla
 
     print_test(
         True,
-        f"PPS: {output1.pps:.1f} {output2.pps:.1f} : "
+        f"PPS: {counts1.pps:.1f} {counts2.pps:.1f} : "
         f"{pps_d:+.1f}% speedup = x{(pps_d + 100) / 100:.1f}",
     )
     print_test(
         True,
-        f"TPS: {output1.tps:.1f} {output2.tps:.1f} : "
+        f"TPS: {counts1.tps:.1f} {counts2.tps:.1f} : "
        f"{tps_d:+.1f}% speedup = x{(tps_d + 100) / 100:.1f}",
     )
     print_test(
         True,
-        f"SPS: {output1.sps:.1f} {output2.sps:.1f} : "
+        f"SPS: {counts1.sps:.1f} {counts2.sps:.1f} : "
         f"{sps_d:+.1f}% speedup = x{(sps_d + 100) / 100:.1f}",
     )
 
@@ -237,7 +236,7 @@ def assert_stats_json(stats_actor_1, stats_actor_2, tolerance=0, track_types_fla
             n += int(t)
         b = n == counts1.tracks
         print_test(b, f"Tracks : {counts1.track_types}")
-        if "track_types" in counts2:
+        if hasattr(counts2, "track_types"):
             print_test(b, f"Tracks (ref): {counts2.track_types}")
         print_test(b, f"Tracks vs track_types : {counts1.tracks} {n}")
         is_ok = b and is_ok
@@ -323,6 +322,14 @@ def assert_images(
     scaleImageValuesFactor=None,
     sad_profile_tolerance=None,
 ):
+
+    if stats is not None:
+        import warnings
+        warnings.warn(
+            "the 'stats' argument of assert_images is deprecated and ignored.",
+            DeprecationWarning,
+        )
+
     # read image and info (size, spacing, etc.)
     ref_filename1 = ensure_filename_is_str(ref_filename1)
     filename2 = ensure_filename_is_str(filename2)
@@ -378,11 +385,6 @@ def assert_images(
     print(f"Image1: {info1.size} {info1.spacing} {info1.origin} {ref_filename1}")
     print(f"Image2: {info2.size} {info2.spacing} {info2.origin} {filename2}")
 
-    # normalise by event
-    if stats is not None:
-        d1 = d1 / stats.counts.events
-        d2 = d2 / stats.counts.events
-
     # normalize by sum of d1
     s = np.sum(d2)
     d1 = d1 / s