diff --git a/openmc/deplete/abc.py b/openmc/deplete/abc.py index ee742f22bc5..2c6d177c4d0 100644 --- a/openmc/deplete/abc.py +++ b/openmc/deplete/abc.py @@ -31,6 +31,7 @@ from .pool import deplete from .reaction_rates import ReactionRates from .transfer_rates import TransferRates, ExternalSourceRates +from .keff_search_control import _KeffSearchControl __all__ = [ @@ -617,6 +618,11 @@ class Integrator(ABC): External source rates for the depletion system. .. versionadded:: 0.15.3 + keff_search_control : openmc.deplete._KeffSearchControl + Instance of _KeffSearchControl class to perform keff search during + transport-depletion simulation. + + .. versionadded:: 0.15.4 """) @@ -684,6 +690,7 @@ def __init__( self.transfer_rates = None self.external_source_rates = None + self._keff_search_control = None if isinstance(solver, str): # Delay importing of cram module, which requires this file @@ -729,6 +736,15 @@ def solver(self, func): self._solver = func + @property + def keff_search_control(self): + return self._keff_search_control + + @keff_search_control.setter + def keff_search_control(self, keff_search_control): + check_type('keff search control', keff_search_control, _KeffSearchControl) + self._keff_search_control = keff_search_control + def _timed_deplete(self, n, rates, dt, i=None, matrix_func=None): start = time.time() results = deplete( @@ -837,6 +853,13 @@ def _get_start_data(self) -> tuple[float, int]: return (self.operator.prev_res[-1].time[0], len(self.operator.prev_res) - 1) + def _get_bos_from_keff_search_control(self, step_index, bos_conc): + """Get BOS from keff search control.""" + x = deepcopy(bos_conc) + # Get new vector after keff criticality control + x, keff_search_root = self._keff_search_control.search_for_keff(x, step_index) + return x, keff_search_root + def integrate( self, final_step: bool = True, @@ -877,10 +900,19 @@ def integrate( # Solve transport equation (or obtain result from restart) if i > 0 or self.operator.prev_res is None: + # Update geometry/material according to keff search control + if self._keff_search_control is not None and source_rate != 0.0: + n, keff_search_root = self._get_bos_from_keff_search_control(i, n) + else: + keff_search_root = None n, res = self._get_bos_data_from_operator(i, source_rate, n) else: n, res = self._get_bos_data_from_restart(source_rate, n) - + # Get keff search root from keff search control + if self._keff_search_control: + n, keff_search_root = self._get_bos_from_keff_search_control(i, n) + else: + keff_search_root = None # Solve Bateman equations over time interval proc_time, n_end = self(n, res.rates, dt, source_rate, i) @@ -893,6 +925,7 @@ def integrate( self._i_res + i, proc_time, write_rates=write_rates, + keff_search_root=keff_search_root, path=path ) @@ -906,6 +939,10 @@ def integrate( # solve) if output and final_step and comm.rank == 0: print(f"[openmc.deplete] t={t} (final operator evaluation)") + if self._keff_search_control is not None and source_rate != 0.0: + n, keff_search_root = self._get_bos_from_keff_search_control(i+1, n) + else: + keff_search_root = None res_final = self.operator(n, source_rate if final_step else 0.0) StepResult.save( self.operator, @@ -916,6 +953,7 @@ def integrate( self._i_res + len(self), proc_time, write_rates=write_rates, + keff_search_root=keff_search_root, path=path ) self.operator.write_bos_data(len(self) + self._i_res) @@ -1048,6 +1086,113 @@ def add_redox(self, material, buffer, oxidation_states, timesteps=None): self.transfer_rates.set_redox(material, buffer, oxidation_states, timesteps) + def add_keff_search_control( + self, + function: Callable, + x0: float, + x1: float, + bracket: list[float], + **search_kwargs + ): + """Add keff search to the integrator scheme. + + This method creates a :class:`openmc.deplete._KeffSearchControl` that + performs keff searches during depletion to maintain a target keff + by adjusting a model parameter through the provided function. + + .. important:: + The function **must** modify the model through ``openmc.lib`` (e.g., + ``openmc.lib.cells``, ``openmc.lib.materials``) and **NOT** through + ``openmc.model``. The function is called within a + :class:`openmc.lib.TemporarySession` context where only the C API + (``openmc.lib``) is available for modifications. + + Parameters + ---------- + function : Callable + Function that modifies the model through ``openmc.lib`` based on a + parameter value. The function should take a single float parameter + and modify ``openmc.lib`` objects accordingly (e.g., adjust control + rod position via ``openmc.lib.cells[...].translation``, material + density via ``openmc.lib.materials[...].set_densities(...)``, etc.). + + **Important**: The function must modify ``openmc.lib`` objects, not + ``openmc.model`` objects. + x0: float + Initial lower bound for the keff search. + x1: float + Initial upper bound for the keff search. + bracket : list[float] + Bracket interval [x_min, x_max] that constrains the allowed parameter + values during the keff search. This is a required parameter + that defines the absolute bounds for the search. The bracket must contain + exactly 2 elements with bracket[0] < bracket[1]. These values are passed + directly to the ``x_min`` and ``x_max`` optional arguments in + :meth:`openmc.Model.keff_search`, which enforce hard limits on the + parameter range. If the keff search converges to a value outside this + bracket, it will be clamped to the nearest bracket bound with a warning. + **search_kwargs + Additional keyword arguments passed to + :meth:`openmc.Model.keff_search`. Common options include: + + * ``target`` : float, optional + Target keff value to search for. Defaults to 1.0. + * ``k_tol`` : float, optional + Stopping criterion on the function value. Defaults to 1e-4. + * ``sigma_final`` : float, optional + Maximum accepted k-effective uncertainty. Defaults to 3e-4. + * ``maxiter`` : int, optional + Maximum number of iterations. Defaults to 50. + + See :meth:`openmc.Model.keff_search` for a complete list of + available options. + + Examples + -------- + Add keff search that adjusts a control rod position: + + >>> def adjust_rod_position(position): + ... openmc.lib.cells[rod_cell.id].translation = [0, 0, position] + >>> integrator.add_keff_search_control( + ... adjust_rod_position, + ... x0=0.0, + ... x1=5.0, + ... bracket=[-10,10], + ... target=1.0, + ... k_tol=1e-4 + ... ) + + Add keff search that adjusts material density: + + >>> def adjust_material_density(density_factor): + ... # Get the material from openmc.lib + ... lib_mat = openmc.lib.materials[material_id] + ... # Get current nuclides and densities + ... nuclides = lib_mat.nuclides + ... current_densities = lib_mat.densities + ... # Scale all densities by the factor + ... new_densities = densities * density_factor + ... # Update the material densities + ... lib_mat.set_densities(nuclides, new_densities) + >>> integrator.add_keff_search_control( + ... adjust_material_density, + ... x0=0.8, + ... x1=1.2, + ... bracket=[0.2, 1.5], + ... target=1.0 + ... ) + + .. versionadded:: 0.15.4 + + """ + self._keff_search_control = _KeffSearchControl( + self.operator, + function, + x0, + x1, + bracket, + **search_kwargs) + @add_params class SIIntegrator(Integrator): r"""Abstract class for the Stochastic Implicit Euler integrators diff --git a/openmc/deplete/keff_search_control.py b/openmc/deplete/keff_search_control.py new file mode 100644 index 00000000000..fc8652154ac --- /dev/null +++ b/openmc/deplete/keff_search_control.py @@ -0,0 +1,146 @@ +import openmc.lib +from openmc.mpi import comm +from typing import Callable +from warnings import warn + +class _KeffSearchControl: + """Controller for keff search during depletion calculations. + + This class performs keff searches to maintain a target keff by adjusting + a model parameter through a provided function. + + Parameters + ---------- + operator : openmc.deplete.Operator + Depletion operator instance + function : Callable + Function that modifies the model based on a parameter value + x0 : float + Initial lower bound for the keff search + x1 : float + Initial upper bound for the keff search + bracket : list[float] + Absolute bracketing interval lower and upper + if keff search solution lies off these limits the closest + limit will be set as new result. + search_kwargs : dict, optional + Additional keyword arguments to pass to `model.keff_search` + """ + def __init__(self, operator, function: Callable, x0: float, x1: float, bracket: list[float], **search_kwargs): + if len(bracket) != 2: + raise ValueError(f"bracket must have exactly 2 elements, got {len(bracket)}") + if bracket[0] >= bracket[1]: + raise ValueError(f"bracket[0] must be < bracket[1], got {bracket}") + self.x0 = x0 + self.x1 = x1 + self.operator = operator + self.function = function + self.search_kwargs = search_kwargs + self.search_kwargs['x_min'] = bracket[0] + self.search_kwargs['x_max'] = bracket[1] + + def search_for_keff(self, x, step_index): + """Perform keff search and update the atom density vector. + + Parameters + ---------- + x : list of numpy.ndarray + Current atom density vector (atoms per material) + step_index : int + Current depletion step index + + Returns + ------- + x : list of numpy.ndarray + Updated atom density vector + root : float + Parameter value that achieves target keff + """ + root = self._search_for_keff() + x = self._update_vec(x) + return x, root + + def _search_for_keff(self) -> float: + """Perform the keff search using the model's keff_search method. + + Returns + ------- + float + Parameter value that achieves target keff + + Raises + ------ + ValueError + If the keff search fails to converge + """ + with openmc.lib.TemporarySession(model=self.operator.model) as session: + # Only pass the first 3 required args plus explicitly provided kwargs + result = self.operator.model.keff_search( + self.function, + self.x0, + self.x1, + **self.search_kwargs + ) + if not result.converged: + raise ValueError( + f"Search for keff failed to converge. " + f"Termination reason: {result.flag}" + ) + + root = result.root + + # Check if root is outside the bracket bounds and give a warning + if root < self.search_kwargs['x_min']: + warn( + f"keff search result ({root:.6f}) is below the lower bracket bound " + f"({self.search_kwargs['x_min']:.6f}). Clamping to bracket lower bound.", + UserWarning + ) + + elif root > self.search_kwargs['x_max']: + warn( + f"keff search result ({root:.6f}) is above the upper bracket bound " + f"({self.search_kwargs['x_max']:.6f}). Clamping to bracket upper bound.", + UserWarning + ) + + #restore the number of initial batches + openmc.lib.settings.set_batches(self.operator.model.settings.batches) + + return root + + def _update_vec(self, x): + """Update the atom density vector from openmc.lib.materials and AtomNumber object. + + This method synchronizes the atom densities across all MPI ranks by + broadcasting the number object from each rank and updating the x vector + with the current atom densities from openmc.lib.materials or AtomNumber object + if the nuclide is not in openmc.lib.materials. + + Parameters + ---------- + x : list of numpy.ndarray + Atom density vector to update (atoms per material) + + Returns + ------- + list of numpy.ndarray + Updated atom density vector synchronized across all ranks + """ + for rank in range(comm.size): + number_i = comm.bcast(self.operator.number, root=rank) + + for mat_idx, mat in enumerate(number_i.materials): + for nuc in number_i.nuclides: + if nuc in number_i.burnable_nuclides: + nuc_idx = number_i.burnable_nuclides.index(nuc) + volume = number_i.get_mat_volume(mat) # cm^3 + if nuc in openmc.lib.materials[int(mat)].nuclides: + _nuc_idx = openmc.lib.materials[int(mat)].nuclides.index(nuc) + val = 1.0e24 * openmc.lib.materials[int(mat)].densities[_nuc_idx] # atom/cm^3 + else: + val = number_i.get_atom_density(mat, nuc) # atom/cm^3 + x[mat_idx][nuc_idx] = val * volume + + x = comm.bcast(x, root=rank) + return x \ No newline at end of file diff --git a/openmc/deplete/stepresult.py b/openmc/deplete/stepresult.py index ff39e9acb6d..dc205774f2c 100644 --- a/openmc/deplete/stepresult.py +++ b/openmc/deplete/stepresult.py @@ -57,6 +57,8 @@ class StepResult: proc_time : int Average time spent depleting a material across all materials and processes + keff_search_root : float + The root returned by the keff search control. """ def __init__(self): @@ -72,6 +74,7 @@ def __init__(self): self.mat_to_hdf5_ind = None self.data = None + self.keff_search_root = None def __repr__(self): t = self.time[0] @@ -356,6 +359,10 @@ def _write_hdf5_metadata(self, handle, write_rates): "depletion time", (1,), maxshape=(None,), dtype="float64") + handle.create_dataset( + "keff_search_root", (1,), maxshape=(None,), + dtype="float64") + def _to_hdf5(self, handle, index, parallel=False, write_rates: bool = False): """Converts results object into an hdf5 object. @@ -388,6 +395,7 @@ def _to_hdf5(self, handle, index, parallel=False, write_rates: bool = False): time_dset = handle["/time"] source_rate_dset = handle["/source_rate"] proc_time_dset = handle["/depletion time"] + keff_search_root_dset = handle["/keff_search_root"] # Get number of results stored number_shape = list(number_dset.shape) @@ -421,6 +429,10 @@ def _to_hdf5(self, handle, index, parallel=False, write_rates: bool = False): proc_shape[0] = new_shape proc_time_dset.resize(proc_shape) + keff_search_root_shape = list(keff_search_root_dset.shape) + keff_search_root_shape[0] = new_shape + keff_search_root_dset.resize(keff_search_root_shape) + # If nothing to write, just return if len(self.index_mat) == 0: return @@ -440,6 +452,7 @@ def _to_hdf5(self, handle, index, parallel=False, write_rates: bool = False): proc_time_dset[index] = ( self.proc_time / (comm.size * self.n_hdf5_mats) ) + keff_search_root_dset[index] = self.keff_search_root @classmethod def from_hdf5(cls, handle, step): @@ -488,6 +501,10 @@ def from_hdf5(cls, handle, step): if step < proc_time_dset.shape[0]: results.proc_time = proc_time_dset[step] + if "keff_search_root" in handle: + keff_search_root_dset = handle["/keff_search_root"] + results.keff_search_root = keff_search_root_dset[step] + if results.proc_time is None: results.proc_time = np.array([np.nan]) @@ -539,6 +556,7 @@ def save( step_ind, proc_time=None, write_rates: bool = False, + keff_search_root=None, path: PathLike = "depletion_results.h5" ): """Creates and writes depletion results to disk @@ -563,6 +581,8 @@ def save( processes. write_rates : bool, optional Whether reaction rates should be written to the results file. + keff_search_root : float + The root returned by the keff search control. path : PathLike Path to file to write. Defaults to 'depletion_results.h5'. @@ -590,6 +610,7 @@ def save( results.proc_time = proc_time if results.proc_time is not None: results.proc_time = comm.reduce(proc_time, op=MPI.SUM) + results.keff_search_root = keff_search_root if not Path(path).is_file(): Path(path).parent.mkdir(parents=True, exist_ok=True) diff --git a/openmc/search.py b/openmc/search.py index 70ce011b634..cd626e4f77f 100644 --- a/openmc/search.py +++ b/openmc/search.py @@ -202,3 +202,4 @@ def search_for_keff(model_builder, initial_guess=None, target=1.0, zero_value = root_finder(**args) return zero_value, guesses, results + \ No newline at end of file diff --git a/tests/regression_tests/deplete_with_keff_search_control/__init__.py b/tests/regression_tests/deplete_with_keff_search_control/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_refuel.h5 b/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_refuel.h5 new file mode 100644 index 00000000000..a335e3b4780 Binary files /dev/null and b/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_refuel.h5 differ diff --git a/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_rotation.h5 b/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_rotation.h5 new file mode 100644 index 00000000000..c8b4f67ff25 Binary files /dev/null and b/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_rotation.h5 differ diff --git a/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_translation.h5 b/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_translation.h5 new file mode 100644 index 00000000000..35285321bc2 Binary files /dev/null and b/tests/regression_tests/deplete_with_keff_search_control/ref_depletion_with_translation.h5 differ diff --git a/tests/regression_tests/deplete_with_keff_search_control/test.py b/tests/regression_tests/deplete_with_keff_search_control/test.py new file mode 100644 index 00000000000..0af6acd53ed --- /dev/null +++ b/tests/regression_tests/deplete_with_keff_search_control/test.py @@ -0,0 +1,137 @@ +""" Tests for KeffSearchControl class """ + +from hmac import new +from pathlib import Path +import shutil +import sys + +import pytest +import numpy as np + +import openmc +import openmc.lib +from openmc.deplete import CoupledOperator + +from tests.regression_tests import config + +@pytest.fixture +def model(): + f = openmc.Material(name='f') + f.set_density('g/cm3', 10.29769) + f.add_element('U', 1., enrichment=2.4) + f.add_element('O', 2.) + + h = openmc.Material(name='h') + h.set_density('g/cm3', 0.001598) + h.add_element('He', 2.4044e-4) + + w = openmc.Material(name='w') + w.set_density('g/cm3', 0.740582) + w.add_element('H', 2) + w.add_element('O', 1) + + # Define overall material + materials = openmc.Materials([f, h, w]) + + # Define surfaces + radii = [0.5, 0.8, 1] + height = 80 + surf_in = openmc.ZCylinder(r=radii[0]) + surf_mid = openmc.ZCylinder(r=radii[1]) + surf_out = openmc.ZCylinder(r=radii[2], boundary_type='reflective') + surf_top = openmc.ZPlane(z0=height/2, boundary_type='vacuum') + surf_bot = openmc.ZPlane(z0=-height/2, boundary_type='vacuum') + + surf_trans = openmc.ZPlane(z0=0) + surf_rot1 = openmc.XPlane(x0=0) + surf_rot2 = openmc.YPlane(y0=0) + + # Define cells + cell_f = openmc.Cell(name='fuel_cell', fill=f, + region=-surf_in & -surf_top & +surf_bot) + cell_g = openmc.Cell(fill=h, + region = +surf_in & -surf_mid & -surf_top & +surf_bot & +surf_rot2) + + # Define unbounded cells for rotation universe + cell_w = openmc.Cell(fill=w, region = -surf_rot1) + cell_h = openmc.Cell(fill=h, region = +surf_rot1) + universe_rot = openmc.Universe(cells=(cell_w, cell_h)) + cell_rot = openmc.Cell(name="rot_cell", fill=universe_rot, + region = +surf_in & -surf_mid & -surf_top & +surf_bot & -surf_rot2) + + # Define unbounded cells for translation universe + cell_w = openmc.Cell(fill=w, region=+surf_in & -surf_trans ) + cell_h = openmc.Cell(fill=h, region=+surf_in & +surf_trans) + universe_trans = openmc.Universe(cells=(cell_w, cell_h)) + cell_trans = openmc.Cell(name="trans_cell", fill=universe_trans, + region=+surf_mid & -surf_out & -surf_top & +surf_bot) + + # Define overall geometry + geometry = openmc.Geometry([cell_f, cell_g, cell_rot, cell_trans]) + + # Set material volume for depletion fuel. + f.volume = np.pi * radii[0]**2 * height + + settings = openmc.Settings() + settings.particles = 1000 + settings.inactive = 10 + settings.batches = 50 + + return openmc.Model(geometry, materials, settings) + +def translate_cell(position): + cell_trans = [cell for cell in openmc.lib.cells.values() if cell.name == 'trans_cell'][0] + openmc.lib.cells[cell_trans.id].translation = [0, 0, position] + +def rotate_cell(angle): + cell_rot = [cell for cell in openmc.lib.cells.values() if cell.name == 'rot_cell'][0] + openmc.lib.cells[cell_rot.id].rotation = [0, 0, angle] + +def adjust_material_density(density_factor): + f = [material for material in openmc.lib.materials.values() if material.name == 'f'][0] + nuclides = openmc.lib.materials[f.id].nuclides + densities = openmc.lib.materials[f.id].densities + nuc_idx = nuclides.index('U235') + new_density = densities[nuc_idx] * density_factor + densities[nuc_idx] = new_density + openmc.lib.materials[f.id].set_densities(nuclides, densities) + +@pytest.mark.parametrize("function, x0, x1, bracket, ref_result", [ + (translate_cell, -11, -5, [-15,0], 'depletion_with_translation'), + (rotate_cell, -80, -50, [-90,0], 'depletion_with_rotation'), + (adjust_material_density, 0.5, 2, [0.3, 3.0], 'depletion_with_refuel') + ]) + +def test_keff_search_control(run_in_tmpdir, model, function, x0, x1, bracket, ref_result): + + chain_file = Path(__file__).parents[2] / 'chain_simple.xml' + op = CoupledOperator(model, chain_file) + + integrator = openmc.deplete.PredictorIntegrator( + op, [1], 174., timestep_units = 'd') + integrator.add_keff_search_control( + function=function, + x0=x0, + x1=x1, + bracket=bracket, + output=True, + k_tol=1e-1, + sigma_final=5e-2) + + integrator.integrate() + + # Get path to test and reference results + path_test = op.output_dir / 'depletion_results.h5' + path_reference = Path(__file__).with_name(f'ref_{ref_result}.h5') + + # If updating results, do so and return + if config['update']: + shutil.copyfile(str(path_test), str(path_reference)) + return + + # Load the reference/test results + res_test = openmc.deplete.Results(path_test) + res_ref = openmc.deplete.Results(path_reference) + + # Use high tolerance here + assert res_test[0].keff_search_root == pytest.approx(res_ref[0].keff_search_root, rel=2) diff --git a/tests/unit_tests/test_deplete_keff_search_control.py b/tests/unit_tests/test_deplete_keff_search_control.py new file mode 100644 index 00000000000..12b4020f0af --- /dev/null +++ b/tests/unit_tests/test_deplete_keff_search_control.py @@ -0,0 +1,121 @@ +""" Tests for KeffSearchControl class """ + +from pathlib import Path + +import pytest +import numpy as np + +import openmc +import openmc.lib +from openmc.deplete import CoupledOperator + +CHAIN_PATH = Path(__file__).parents[1] / "chain_simple.xml" + +@pytest.fixture +def model(): + f = openmc.Material(name="fuel") + f.add_element("U", 1, percent_type="ao", enrichment=4.25) + f.add_element("O", 2) + f.set_density("g/cc", 10.4) + f.temperature = 293.15 + + w = openmc.Material(name="water") + w.add_element("O", 1) + w.add_element("H", 2) + w.set_density("g/cc", 1.0) + w.temperature = 293.15 + w.depletable = True + + h = openmc.Material(name='helium') + h.add_element('He', 1) + h.set_density('g/cm3', 0.001598) + + radii = [0.42, 0.45] + height = 0.5 + + f.volume = np.pi * radii[0] ** 2 * height + w.volume = np.pi * (radii[1]**2 - radii[0]**2) * height/2 + + materials = openmc.Materials([f, w, h]) + + surf_interface = openmc.ZPlane(z0=0) + surf_top = openmc.ZPlane(z0=height/2) + surf_bot = openmc.ZPlane(z0=-height/2) + surf_in = openmc.Sphere(r=radii[0]) + surf_out = openmc.Sphere(r=radii[1], boundary_type='vacuum') + + cell_water = openmc.Cell(fill=w, region=-surf_interface) + cell_helium = openmc.Cell(fill=h, region=+surf_interface) + universe = openmc.Universe(cells=(cell_water, cell_helium)) + cell_fuel = openmc.Cell(name='fuel_cell', fill=f, + region=-surf_in & -surf_top & +surf_bot) + cell_universe = openmc.Cell(name='universe_cell',fill=universe, + region=+surf_in & -surf_out & -surf_top & +surf_bot) + geometry = openmc.Geometry([cell_fuel, cell_universe]) + + settings = openmc.Settings() + settings.particles = 1000 + settings.inactive = 10 + settings.batches = 50 + + return openmc.Model(geometry, materials, settings) + +@pytest.fixture +def operator(model): + return CoupledOperator(model, CHAIN_PATH) + +@pytest.fixture +def integrator(operator): + return openmc.deplete.PredictorIntegrator( + operator, [1,1], 0.0, timestep_units = 'd') + +def translate_cell(position): + """Helper function to translate a cell""" + cell = [c for c in openmc.lib.cells.values() if c.name == 'universe_cell'][0] + openmc.lib.cells[cell.id].translation = [0, 0, position] + return position + +def rotate_cell(angle): + """Helper function to rotate a cell""" + cell = [c for c in openmc.lib.cells.values() if c.name == 'universe_cell'][0] + openmc.lib.cells[cell.id].rotation = [0, 0, angle] + return angle + +def adjust_fuel_density(density_factor): + """Helper function to adjust fuel density""" + fuel = [m for m in openmc.lib.materials.values() if m.name == 'fuel'][0] + nuclides = openmc.lib.materials[fuel.id].nuclides + current_densities = openmc.lib.materials[fuel.id].densities + new_densities = [d * density_factor for d in current_densities] + openmc.lib.materials[fuel.id].set_densities(nuclides, new_densities) + return density_factor + +@pytest.mark.parametrize("function, x0, x1, bracket, test_value", [ + (translate_cell, -1.0, 1.0, [-5.0, 5.0], 0.5), + (rotate_cell, -45.0, 45.0, [-90.0, 90.0], 10.0), + (adjust_fuel_density, 0.8, 1.2, [0.5, 1.5], 1.0) +]) +def test_integrator_add_keff_search_control(run_in_tmpdir, model, operator, integrator, + function, x0, x1, bracket, test_value): + """Test adding add_keff_search_control to integrator""" + model.export_to_xml() + openmc.lib.init() + test_function = function(test_value) + # Test that add_reactivity_control method exists and works + integrator.add_keff_search_control( + function=test_function, + x0=x0, + x1=x1, + bracket=bracket, + k_tol=0.1, + output=False, + ) + + assert integrator.keff_search_control.x0 == x0 + assert integrator.keff_search_control.x1 == x1 + assert integrator.keff_search_control.function == test_function + assert integrator.keff_search_control.search_kwargs['x_min'] == bracket[0] + assert integrator.keff_search_control.search_kwargs['x_max'] == bracket[1] + assert integrator.keff_search_control.search_kwargs['k_tol'] == 0.1 + assert integrator.keff_search_control.search_kwargs['output'] == False + openmc.lib.finalize()