diff --git a/flixopt/components.py b/flixopt/components.py index 0cfed39eb..c5913c8e2 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -180,11 +180,11 @@ def create_model(self, model: FlowSystemModel) -> LinearConverterModel: self.submodel = LinearConverterModel(model, self) return self.submodel - def _set_flow_system(self, flow_system) -> None: + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: """Propagate flow_system reference to parent Component and piecewise_conversion.""" - super()._set_flow_system(flow_system) + super().link_to_flow_system(flow_system, prefix) if self.piecewise_conversion is not None: - self.piecewise_conversion._set_flow_system(flow_system) + self.piecewise_conversion.link_to_flow_system(flow_system, self._sub_prefix('PiecewiseConversion')) def _plausibility_checks(self) -> None: super()._plausibility_checks() @@ -216,14 +216,13 @@ def _plausibility_checks(self) -> None: f'({flow.label_full}).' ) - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) - super().transform_data(prefix) + def transform_data(self) -> None: + super().transform_data() if self.conversion_factors: self.conversion_factors = self._transform_conversion_factors() if self.piecewise_conversion: self.piecewise_conversion.has_time_dim = True - self.piecewise_conversion.transform_data(f'{prefix}|PiecewiseConversion') + self.piecewise_conversion.transform_data() def _transform_conversion_factors(self) -> list[dict[str, xr.DataArray]]: """Converts all conversion factors to internal datatypes""" @@ -427,49 +426,50 @@ def create_model(self, model: FlowSystemModel) -> StorageModel: self.submodel = StorageModel(model, self) return self.submodel - def _set_flow_system(self, flow_system) -> None: + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: """Propagate flow_system reference to parent Component and capacity_in_flow_hours if it's InvestParameters.""" - super()._set_flow_system(flow_system) + super().link_to_flow_system(flow_system, prefix) if isinstance(self.capacity_in_flow_hours, InvestParameters): - self.capacity_in_flow_hours._set_flow_system(flow_system) + self.capacity_in_flow_hours.link_to_flow_system(flow_system, self._sub_prefix('InvestParameters')) - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) - super().transform_data(prefix) + def transform_data(self) -> None: + super().transform_data() self.relative_minimum_charge_state = self._fit_coords( - f'{prefix}|relative_minimum_charge_state', self.relative_minimum_charge_state + f'{self.prefix}|relative_minimum_charge_state', self.relative_minimum_charge_state ) self.relative_maximum_charge_state = self._fit_coords( - f'{prefix}|relative_maximum_charge_state', self.relative_maximum_charge_state + f'{self.prefix}|relative_maximum_charge_state', self.relative_maximum_charge_state + ) + self.eta_charge = self._fit_coords(f'{self.prefix}|eta_charge', self.eta_charge) + self.eta_discharge = self._fit_coords(f'{self.prefix}|eta_discharge', self.eta_discharge) + self.relative_loss_per_hour = self._fit_coords( + f'{self.prefix}|relative_loss_per_hour', self.relative_loss_per_hour ) - self.eta_charge = self._fit_coords(f'{prefix}|eta_charge', self.eta_charge) - self.eta_discharge = self._fit_coords(f'{prefix}|eta_discharge', self.eta_discharge) - self.relative_loss_per_hour = self._fit_coords(f'{prefix}|relative_loss_per_hour', self.relative_loss_per_hour) if not isinstance(self.initial_charge_state, str): self.initial_charge_state = self._fit_coords( - f'{prefix}|initial_charge_state', self.initial_charge_state, dims=['period', 'scenario'] + f'{self.prefix}|initial_charge_state', self.initial_charge_state, dims=['period', 'scenario'] ) self.minimal_final_charge_state = self._fit_coords( - f'{prefix}|minimal_final_charge_state', self.minimal_final_charge_state, dims=['period', 'scenario'] + f'{self.prefix}|minimal_final_charge_state', self.minimal_final_charge_state, dims=['period', 'scenario'] ) self.maximal_final_charge_state = self._fit_coords( - f'{prefix}|maximal_final_charge_state', self.maximal_final_charge_state, dims=['period', 'scenario'] + f'{self.prefix}|maximal_final_charge_state', self.maximal_final_charge_state, dims=['period', 'scenario'] ) self.relative_minimum_final_charge_state = self._fit_coords( - f'{prefix}|relative_minimum_final_charge_state', + f'{self.prefix}|relative_minimum_final_charge_state', self.relative_minimum_final_charge_state, dims=['period', 'scenario'], ) self.relative_maximum_final_charge_state = self._fit_coords( - f'{prefix}|relative_maximum_final_charge_state', + f'{self.prefix}|relative_maximum_final_charge_state', self.relative_maximum_final_charge_state, dims=['period', 'scenario'], ) if isinstance(self.capacity_in_flow_hours, InvestParameters): - self.capacity_in_flow_hours.transform_data(f'{prefix}|InvestParameters') + self.capacity_in_flow_hours.transform_data() else: self.capacity_in_flow_hours = self._fit_coords( - f'{prefix}|capacity_in_flow_hours', self.capacity_in_flow_hours, dims=['period', 'scenario'] + f'{self.prefix}|capacity_in_flow_hours', self.capacity_in_flow_hours, dims=['period', 'scenario'] ) def _plausibility_checks(self) -> None: @@ -714,11 +714,10 @@ def create_model(self, model) -> TransmissionModel: self.submodel = TransmissionModel(model, self) return self.submodel - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) - super().transform_data(prefix) - self.relative_losses = self._fit_coords(f'{prefix}|relative_losses', self.relative_losses) - self.absolute_losses = self._fit_coords(f'{prefix}|absolute_losses', self.absolute_losses) + def transform_data(self) -> None: + super().transform_data() + self.relative_losses = self._fit_coords(f'{self.prefix}|relative_losses', self.relative_losses) + self.absolute_losses = self._fit_coords(f'{self.prefix}|absolute_losses', self.absolute_losses) class TransmissionModel(ComponentModel): @@ -729,6 +728,9 @@ def __init__(self, model: FlowSystemModel, element: Transmission): for flow in element.inputs + element.outputs: if flow.status_parameters is None: flow.status_parameters = StatusParameters() + flow.status_parameters.link_to_flow_system( + model.flow_system, f'{flow.label_full}|status_parameters' + ) super().__init__(model, element) diff --git a/flixopt/effects.py b/flixopt/effects.py index 5dd53258f..a7aae4bf4 100644 --- a/flixopt/effects.py +++ b/flixopt/effects.py @@ -237,50 +237,56 @@ def __init__( self.minimum_over_periods = minimum_over_periods self.maximum_over_periods = maximum_over_periods - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) - self.minimum_per_hour = self._fit_coords(f'{prefix}|minimum_per_hour', self.minimum_per_hour) - self.maximum_per_hour = self._fit_coords(f'{prefix}|maximum_per_hour', self.maximum_per_hour) + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: + """Link this effect to a FlowSystem. + + Elements use their label_full as prefix by default, ignoring the passed prefix. + """ + super().link_to_flow_system(flow_system, self.label_full) + + def transform_data(self) -> None: + self.minimum_per_hour = self._fit_coords(f'{self.prefix}|minimum_per_hour', self.minimum_per_hour) + self.maximum_per_hour = self._fit_coords(f'{self.prefix}|maximum_per_hour', self.maximum_per_hour) self.share_from_temporal = self._fit_effect_coords( prefix=None, effect_values=self.share_from_temporal, - suffix=f'(temporal)->{prefix}(temporal)', + suffix=f'(temporal)->{self.prefix}(temporal)', dims=['time', 'period', 'scenario'], ) self.share_from_periodic = self._fit_effect_coords( prefix=None, effect_values=self.share_from_periodic, - suffix=f'(periodic)->{prefix}(periodic)', + suffix=f'(periodic)->{self.prefix}(periodic)', dims=['period', 'scenario'], ) self.minimum_temporal = self._fit_coords( - f'{prefix}|minimum_temporal', self.minimum_temporal, dims=['period', 'scenario'] + f'{self.prefix}|minimum_temporal', self.minimum_temporal, dims=['period', 'scenario'] ) self.maximum_temporal = self._fit_coords( - f'{prefix}|maximum_temporal', self.maximum_temporal, dims=['period', 'scenario'] + f'{self.prefix}|maximum_temporal', self.maximum_temporal, dims=['period', 'scenario'] ) self.minimum_periodic = self._fit_coords( - f'{prefix}|minimum_periodic', self.minimum_periodic, dims=['period', 'scenario'] + f'{self.prefix}|minimum_periodic', self.minimum_periodic, dims=['period', 'scenario'] ) self.maximum_periodic = self._fit_coords( - f'{prefix}|maximum_periodic', self.maximum_periodic, dims=['period', 'scenario'] + f'{self.prefix}|maximum_periodic', self.maximum_periodic, dims=['period', 'scenario'] ) self.minimum_total = self._fit_coords( - f'{prefix}|minimum_total', self.minimum_total, dims=['period', 'scenario'] + f'{self.prefix}|minimum_total', self.minimum_total, dims=['period', 'scenario'] ) self.maximum_total = self._fit_coords( - f'{prefix}|maximum_total', self.maximum_total, dims=['period', 'scenario'] + f'{self.prefix}|maximum_total', self.maximum_total, dims=['period', 'scenario'] ) self.minimum_over_periods = self._fit_coords( - f'{prefix}|minimum_over_periods', self.minimum_over_periods, dims=['scenario'] + f'{self.prefix}|minimum_over_periods', self.minimum_over_periods, dims=['scenario'] ) self.maximum_over_periods = self._fit_coords( - f'{prefix}|maximum_over_periods', self.maximum_over_periods, dims=['scenario'] + f'{self.prefix}|maximum_over_periods', self.maximum_over_periods, dims=['scenario'] ) self.period_weights = self._fit_coords( - f'{prefix}|period_weights', self.period_weights, dims=['period', 'scenario'] + f'{self.prefix}|period_weights', self.period_weights, dims=['period', 'scenario'] ) def create_model(self, model: FlowSystemModel) -> EffectModel: @@ -670,7 +676,7 @@ def _do_modeling(self): penalty_effect = self.effects._create_penalty_effect() # Link to FlowSystem (should already be linked, but ensure it) if penalty_effect._flow_system is None: - penalty_effect._set_flow_system(self._model.flow_system) + penalty_effect.link_to_flow_system(self._model.flow_system) # Create EffectModel for each effect for effect in self.effects.values(): diff --git a/flixopt/elements.py b/flixopt/elements.py index d2ebf7ac0..d97ff3ffb 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -20,7 +20,6 @@ Element, ElementModel, FlowSystemModel, - Interface, register_class_for_io, ) @@ -111,21 +110,23 @@ def create_model(self, model: FlowSystemModel) -> ComponentModel: self.submodel = ComponentModel(model, self) return self.submodel - def _set_flow_system(self, flow_system) -> None: - """Propagate flow_system reference to nested Interface objects and flows.""" - super()._set_flow_system(flow_system) + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: + """Propagate flow_system reference to nested Interface objects and flows. + + Elements use their label_full as prefix by default, ignoring the passed prefix. + """ + super().link_to_flow_system(flow_system, self.label_full) if self.status_parameters is not None: - self.status_parameters._set_flow_system(flow_system) + self.status_parameters.link_to_flow_system(flow_system, self._sub_prefix('status_parameters')) for flow in self.inputs + self.outputs: - flow._set_flow_system(flow_system) + flow.link_to_flow_system(flow_system) - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) + def transform_data(self) -> None: if self.status_parameters is not None: - self.status_parameters.transform_data(prefix) + self.status_parameters.transform_data() for flow in self.inputs + self.outputs: - flow.transform_data() # Flow doesnt need the name_prefix + flow.transform_data() def _check_unique_flow_labels(self): all_flow_labels = [flow.label for flow in self.inputs + self.outputs] @@ -269,16 +270,18 @@ def create_model(self, model: FlowSystemModel) -> BusModel: self.submodel = BusModel(model, self) return self.submodel - def _set_flow_system(self, flow_system) -> None: - """Propagate flow_system reference to nested flows.""" - super()._set_flow_system(flow_system) + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: + """Propagate flow_system reference to nested flows. + + Elements use their label_full as prefix by default, ignoring the passed prefix. + """ + super().link_to_flow_system(flow_system, self.label_full) for flow in self.inputs + self.outputs: - flow._set_flow_system(flow_system) + flow.link_to_flow_system(flow_system) - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) + def transform_data(self) -> None: self.imbalance_penalty_per_flow_hour = self._fit_coords( - f'{prefix}|imbalance_penalty_per_flow_hour', self.imbalance_penalty_per_flow_hour + f'{self.prefix}|imbalance_penalty_per_flow_hour', self.imbalance_penalty_per_flow_hour ) def _plausibility_checks(self) -> None: @@ -505,45 +508,49 @@ def create_model(self, model: FlowSystemModel) -> FlowModel: self.submodel = FlowModel(model, self) return self.submodel - def _set_flow_system(self, flow_system) -> None: - """Propagate flow_system reference to nested Interface objects.""" - super()._set_flow_system(flow_system) + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: + """Propagate flow_system reference to nested Interface objects. + + Elements use their label_full as prefix by default, ignoring the passed prefix. + """ + super().link_to_flow_system(flow_system, self.label_full) if self.status_parameters is not None: - self.status_parameters._set_flow_system(flow_system) - if isinstance(self.size, Interface): - self.size._set_flow_system(flow_system) - - def transform_data(self, name_prefix: str = '') -> None: - prefix = '|'.join(filter(None, [name_prefix, self.label_full])) - self.relative_minimum = self._fit_coords(f'{prefix}|relative_minimum', self.relative_minimum) - self.relative_maximum = self._fit_coords(f'{prefix}|relative_maximum', self.relative_maximum) - self.fixed_relative_profile = self._fit_coords(f'{prefix}|fixed_relative_profile', self.fixed_relative_profile) - self.effects_per_flow_hour = self._fit_effect_coords(prefix, self.effects_per_flow_hour, 'per_flow_hour') + self.status_parameters.link_to_flow_system(flow_system, self._sub_prefix('status_parameters')) + if isinstance(self.size, InvestParameters): + self.size.link_to_flow_system(flow_system, self._sub_prefix('InvestParameters')) + + def transform_data(self) -> None: + self.relative_minimum = self._fit_coords(f'{self.prefix}|relative_minimum', self.relative_minimum) + self.relative_maximum = self._fit_coords(f'{self.prefix}|relative_maximum', self.relative_maximum) + self.fixed_relative_profile = self._fit_coords( + f'{self.prefix}|fixed_relative_profile', self.fixed_relative_profile + ) + self.effects_per_flow_hour = self._fit_effect_coords(self.prefix, self.effects_per_flow_hour, 'per_flow_hour') self.flow_hours_max = self._fit_coords( - f'{prefix}|flow_hours_max', self.flow_hours_max, dims=['period', 'scenario'] + f'{self.prefix}|flow_hours_max', self.flow_hours_max, dims=['period', 'scenario'] ) self.flow_hours_min = self._fit_coords( - f'{prefix}|flow_hours_min', self.flow_hours_min, dims=['period', 'scenario'] + f'{self.prefix}|flow_hours_min', self.flow_hours_min, dims=['period', 'scenario'] ) self.flow_hours_max_over_periods = self._fit_coords( - f'{prefix}|flow_hours_max_over_periods', self.flow_hours_max_over_periods, dims=['scenario'] + f'{self.prefix}|flow_hours_max_over_periods', self.flow_hours_max_over_periods, dims=['scenario'] ) self.flow_hours_min_over_periods = self._fit_coords( - f'{prefix}|flow_hours_min_over_periods', self.flow_hours_min_over_periods, dims=['scenario'] + f'{self.prefix}|flow_hours_min_over_periods', self.flow_hours_min_over_periods, dims=['scenario'] ) self.load_factor_max = self._fit_coords( - f'{prefix}|load_factor_max', self.load_factor_max, dims=['period', 'scenario'] + f'{self.prefix}|load_factor_max', self.load_factor_max, dims=['period', 'scenario'] ) self.load_factor_min = self._fit_coords( - f'{prefix}|load_factor_min', self.load_factor_min, dims=['period', 'scenario'] + f'{self.prefix}|load_factor_min', self.load_factor_min, dims=['period', 'scenario'] ) if self.status_parameters is not None: - self.status_parameters.transform_data(prefix) + self.status_parameters.transform_data() if isinstance(self.size, InvestParameters): - self.size.transform_data(prefix) + self.size.transform_data() else: - self.size = self._fit_coords(f'{prefix}|size', self.size, dims=['period', 'scenario']) + self.size = self._fit_coords(f'{self.prefix}|size', self.size, dims=['period', 'scenario']) def _plausibility_checks(self) -> None: # TODO: Incorporate into Variable? (Lower_bound can not be greater than upper bound @@ -955,11 +962,17 @@ def _do_modeling(self): for flow in all_flows: if flow.status_parameters is None: flow.status_parameters = StatusParameters() + flow.status_parameters.link_to_flow_system( + self._model.flow_system, f'{flow.label_full}|status_parameters' + ) if self.element.prevent_simultaneous_flows: for flow in self.element.prevent_simultaneous_flows: if flow.status_parameters is None: flow.status_parameters = StatusParameters() + flow.status_parameters.link_to_flow_system( + self._model.flow_system, f'{flow.label_full}|status_parameters' + ) # Create FlowModels (which creates their variables and constraints) for flow in all_flows: diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index 153354cce..bae700b85 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -730,6 +730,50 @@ def from_netcdf(cls, path: str | pathlib.Path) -> FlowSystem: flow_system.name = path.stem return flow_system + def copy(self) -> FlowSystem: + """Create a copy of the FlowSystem without optimization state. + + Creates a new FlowSystem with copies of all elements, but without: + - The solution dataset + - The optimization model + - Element submodels and variable/constraint names + + This is useful for creating variations of a FlowSystem for different + optimization scenarios without affecting the original. + + Returns: + A new FlowSystem instance that can be modified and optimized independently. + + Examples: + >>> original = FlowSystem(timesteps) + >>> original.add_elements(boiler, bus) + >>> original.optimize(solver) # Original now has solution + >>> + >>> # Create a copy to try different parameters + >>> variant = original.copy() # No solution, can be modified + >>> variant.add_elements(new_component) + >>> variant.optimize(solver) + """ + # Temporarily clear solution to use standard serialization without solution data + original_solution = self._solution + self._solution = None + try: + ds = self.to_dataset() + finally: + self._solution = original_solution + + # Create new FlowSystem from dataset (without solution) + new_fs = FlowSystem.from_dataset(ds.copy(deep=True)) + return new_fs + + def __copy__(self): + """Support for copy.copy().""" + return self.copy() + + def __deepcopy__(self, memo): + """Support for copy.deepcopy().""" + return self.copy() + def get_structure(self, clean: bool = False, stats: bool = False) -> dict: """ Get FlowSystem structure. @@ -827,7 +871,31 @@ def fit_effects_to_model_coords( } def connect_and_transform(self): - """Transform data for all elements using the new simplified approach.""" + """Connect the network and transform all element data to model coordinates. + + This method performs the following steps: + + 1. Connects flows to buses (establishing the network topology) + 2. Registers any missing carriers from CONFIG defaults + 3. Assigns colors to elements without explicit colors + 4. Transforms all element data to xarray DataArrays aligned with + FlowSystem coordinates (time, period, scenario) + 5. Validates system integrity + + This is called automatically by :meth:`build_model` and :meth:`optimize`. + + Warning: + After this method runs, element attributes (e.g., ``flow.size``, + ``flow.relative_minimum``) contain transformed xarray DataArrays, + not the original input values. If you modify element attributes after + transformation, call :meth:`invalidate` to ensure the changes take + effect on the next optimization. + + Note: + This method is idempotent within a single model lifecycle - calling + it multiple times has no effect once ``connected_and_transformed`` + is True. Use :meth:`invalidate` to reset this flag. + """ if self.connected_and_transformed: logger.debug('FlowSystem already connected and transformed') return @@ -884,13 +952,23 @@ def add_elements(self, *elements: Element) -> None: Args: *elements: childs of Element like Boiler, HeatPump, Bus,... modeling Elements + + Raises: + RuntimeError: If the FlowSystem is locked (has a solution). + Call `reset()` to unlock it first. """ - if self.connected_and_transformed: + if self.is_locked: + raise RuntimeError( + 'Cannot add elements to a FlowSystem that has a solution. ' + 'Call `reset()` first to clear the solution and allow modifications.' + ) + + if self.model is not None: warnings.warn( - 'You are adding elements to an already connected FlowSystem. This is not recommended (But it works).', + 'Adding elements to a FlowSystem with an existing model. The model will be invalidated.', stacklevel=2, ) - self._connected_and_transformed = False + self._invalidate_model() for new_element in list(elements): # Validate element type first @@ -924,6 +1002,10 @@ def add_carriers(self, *carriers: Carrier) -> None: Args: carrier: A Carrier object defining the carrier properties. + Raises: + RuntimeError: If the FlowSystem is locked (has a solution). + Call `reset()` to unlock it first. + Examples: ```python import flixopt as fx @@ -941,12 +1023,18 @@ def add_carriers(self, *carriers: Carrier) -> None: # The carrier color will be used in plots automatically ``` """ - if self.connected_and_transformed: + if self.is_locked: + raise RuntimeError( + 'Cannot add carriers to a FlowSystem that has a solution. ' + 'Call `reset()` first to clear the solution and allow modifications.' + ) + + if self.model is not None: warnings.warn( - 'You are adding a carrier to an already connected FlowSystem. This is not recommended (But it works).', + 'Adding carriers to a FlowSystem with an existing model. The model will be invalidated.', stacklevel=2, ) - self._connected_and_transformed = False + self._invalidate_model() for carrier in list(carriers): if not isinstance(carrier, Carrier): @@ -1120,6 +1208,103 @@ def solution(self, value: xr.Dataset | None) -> None: self._solution = value self._statistics = None # Invalidate cached statistics + @property + def is_locked(self) -> bool: + """Check if the FlowSystem is locked (has a solution). + + A locked FlowSystem cannot be modified. Use `reset()` to unlock it. + """ + return self._solution is not None + + def _invalidate_model(self) -> None: + """Invalidate the model and element submodels when structure changes. + + This clears the model, resets the ``connected_and_transformed`` flag, + and clears all element submodels and variable/constraint names. + + Called internally by :meth:`add_elements`, :meth:`add_carriers`, + :meth:`reset`, and :meth:`invalidate`. + + See Also: + :meth:`invalidate`: Public method for manual invalidation. + :meth:`reset`: Clears solution and invalidates (for locked FlowSystems). + """ + self.model = None + self._connected_and_transformed = False + for element in self.values(): + element.submodel = None + element._variable_names = [] + element._constraint_names = [] + + def reset(self) -> FlowSystem: + """Clear optimization state to allow modifications. + + This method unlocks the FlowSystem by clearing: + - The solution dataset + - The optimization model + - All element submodels and variable/constraint names + - The connected_and_transformed flag + + After calling reset(), the FlowSystem can be modified again + (e.g., adding elements or carriers). + + Returns: + Self, for method chaining. + + Examples: + >>> flow_system.optimize(solver) # FlowSystem is now locked + >>> flow_system.add_elements(new_bus) # Raises RuntimeError + >>> flow_system.reset() # Unlock the FlowSystem + >>> flow_system.add_elements(new_bus) # Now works + """ + self.solution = None # Also clears _statistics via setter + self._invalidate_model() + return self + + def invalidate(self) -> FlowSystem: + """Invalidate the model to allow re-transformation after modifying elements. + + Call this after modifying existing element attributes (e.g., ``flow.size``, + ``flow.relative_minimum``) to ensure changes take effect on the next + optimization. The next call to :meth:`optimize` or :meth:`build_model` + will re-run :meth:`connect_and_transform`. + + Note: + Adding new elements via :meth:`add_elements` automatically invalidates + the model. This method is only needed when modifying attributes of + elements that are already part of the FlowSystem. + + Returns: + Self, for method chaining. + + Raises: + RuntimeError: If the FlowSystem has a solution. Call :meth:`reset` + first to clear the solution. + + Examples: + Modify a flow's size and re-optimize: + + >>> flow_system.optimize(solver) + >>> flow_system.reset() # Clear solution first + >>> flow_system.components['Boiler'].inputs[0].size = 200 + >>> flow_system.invalidate() + >>> flow_system.optimize(solver) # Re-runs connect_and_transform + + Modify before first optimization: + + >>> flow_system.connect_and_transform() + >>> # Oops, need to change something + >>> flow_system.components['Boiler'].inputs[0].size = 200 + >>> flow_system.invalidate() + >>> flow_system.optimize(solver) # Changes take effect + """ + if self.is_locked: + raise RuntimeError( + 'Cannot invalidate a FlowSystem with a solution. Call `reset()` first to clear the solution.' + ) + self._invalidate_model() + return self + @property def optimize(self) -> OptimizeAccessor: """ @@ -1345,12 +1530,12 @@ def _validate_system_integrity(self) -> None: def _add_effects(self, *args: Effect) -> None: for effect in args: - effect._set_flow_system(self) # Link element to FlowSystem + effect.link_to_flow_system(self) # Link element to FlowSystem self.effects.add_effects(*args) def _add_components(self, *components: Component) -> None: for new_component in list(components): - new_component._set_flow_system(self) # Link element to FlowSystem + new_component.link_to_flow_system(self) # Link element to FlowSystem self.components.add(new_component) # Add to existing components # Invalidate cache once after all additions if components: @@ -1358,7 +1543,7 @@ def _add_components(self, *components: Component) -> None: def _add_buses(self, *buses: Bus): for new_bus in list(buses): - new_bus._set_flow_system(self) # Link element to FlowSystem + new_bus.link_to_flow_system(self) # Link element to FlowSystem self.buses.add(new_bus) # Add to existing buses # Invalidate cache once after all additions if buses: diff --git a/flixopt/interface.py b/flixopt/interface.py index 7995d5e78..7787eec18 100644 --- a/flixopt/interface.py +++ b/flixopt/interface.py @@ -74,10 +74,10 @@ def __init__(self, start: Numeric_TPS, end: Numeric_TPS): self.end = end self.has_time_dim = False - def transform_data(self, name_prefix: str = '') -> None: + def transform_data(self) -> None: dims = None if self.has_time_dim else ['period', 'scenario'] - self.start = self._fit_coords(f'{name_prefix}|start', self.start, dims=dims) - self.end = self._fit_coords(f'{name_prefix}|end', self.end, dims=dims) + self.start = self._fit_coords(f'{self.prefix}|start', self.start, dims=dims) + self.end = self._fit_coords(f'{self.prefix}|end', self.end, dims=dims) @register_class_for_io @@ -226,15 +226,15 @@ def __getitem__(self, index) -> Piece: def __iter__(self) -> Iterator[Piece]: return iter(self.pieces) # Enables iteration like for piece in piecewise: ... - def _set_flow_system(self, flow_system) -> None: + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: """Propagate flow_system reference to nested Piece objects.""" - super()._set_flow_system(flow_system) - for piece in self.pieces: - piece._set_flow_system(flow_system) - - def transform_data(self, name_prefix: str = '') -> None: + super().link_to_flow_system(flow_system, prefix) for i, piece in enumerate(self.pieces): - piece.transform_data(f'{name_prefix}|Piece{i}') + piece.link_to_flow_system(flow_system, self._sub_prefix(f'Piece{i}')) + + def transform_data(self) -> None: + for piece in self.pieces: + piece.transform_data() @register_class_for_io @@ -458,15 +458,15 @@ def items(self): """ return self.piecewises.items() - def _set_flow_system(self, flow_system) -> None: + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: """Propagate flow_system reference to nested Piecewise objects.""" - super()._set_flow_system(flow_system) - for piecewise in self.piecewises.values(): - piecewise._set_flow_system(flow_system) - - def transform_data(self, name_prefix: str = '') -> None: + super().link_to_flow_system(flow_system, prefix) for name, piecewise in self.piecewises.items(): - piecewise.transform_data(f'{name_prefix}|{name}') + piecewise.link_to_flow_system(flow_system, self._sub_prefix(name)) + + def transform_data(self) -> None: + for piecewise in self.piecewises.values(): + piecewise.transform_data() @register_class_for_io @@ -676,17 +676,17 @@ def has_time_dim(self, value): for piecewise in self.piecewise_shares.values(): piecewise.has_time_dim = value - def _set_flow_system(self, flow_system) -> None: + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: """Propagate flow_system reference to nested Piecewise objects.""" - super()._set_flow_system(flow_system) - self.piecewise_origin._set_flow_system(flow_system) - for piecewise in self.piecewise_shares.values(): - piecewise._set_flow_system(flow_system) - - def transform_data(self, name_prefix: str = '') -> None: - self.piecewise_origin.transform_data(f'{name_prefix}|PiecewiseEffects|origin') + super().link_to_flow_system(flow_system, prefix) + self.piecewise_origin.link_to_flow_system(flow_system, self._sub_prefix('origin')) for effect, piecewise in self.piecewise_shares.items(): - piecewise.transform_data(f'{name_prefix}|PiecewiseEffects|{effect}') + piecewise.link_to_flow_system(flow_system, self._sub_prefix(effect)) + + def transform_data(self) -> None: + self.piecewise_origin.transform_data() + for piecewise in self.piecewise_shares.values(): + piecewise.transform_data() @register_class_for_io @@ -904,27 +904,27 @@ def __init__( self.maximum_size = maximum_size if maximum_size is not None else CONFIG.Modeling.big # default maximum self.linked_periods = linked_periods - def _set_flow_system(self, flow_system) -> None: + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: """Propagate flow_system reference to nested PiecewiseEffects object if present.""" - super()._set_flow_system(flow_system) + super().link_to_flow_system(flow_system, prefix) if self.piecewise_effects_of_investment is not None: - self.piecewise_effects_of_investment._set_flow_system(flow_system) + self.piecewise_effects_of_investment.link_to_flow_system(flow_system, self._sub_prefix('PiecewiseEffects')) - def transform_data(self, name_prefix: str = '') -> None: + def transform_data(self) -> None: self.effects_of_investment = self._fit_effect_coords( - prefix=name_prefix, + prefix=self.prefix, effect_values=self.effects_of_investment, suffix='effects_of_investment', dims=['period', 'scenario'], ) self.effects_of_retirement = self._fit_effect_coords( - prefix=name_prefix, + prefix=self.prefix, effect_values=self.effects_of_retirement, suffix='effects_of_retirement', dims=['period', 'scenario'], ) self.effects_of_investment_per_size = self._fit_effect_coords( - prefix=name_prefix, + prefix=self.prefix, effect_values=self.effects_of_investment_per_size, suffix='effects_of_investment_per_size', dims=['period', 'scenario'], @@ -932,13 +932,13 @@ def transform_data(self, name_prefix: str = '') -> None: if self.piecewise_effects_of_investment is not None: self.piecewise_effects_of_investment.has_time_dim = False - self.piecewise_effects_of_investment.transform_data(f'{name_prefix}|PiecewiseEffects') + self.piecewise_effects_of_investment.transform_data() self.minimum_size = self._fit_coords( - f'{name_prefix}|minimum_size', self.minimum_size, dims=['period', 'scenario'] + f'{self.prefix}|minimum_size', self.minimum_size, dims=['period', 'scenario'] ) self.maximum_size = self._fit_coords( - f'{name_prefix}|maximum_size', self.maximum_size, dims=['period', 'scenario'] + f'{self.prefix}|maximum_size', self.maximum_size, dims=['period', 'scenario'] ) # Convert tuple (first_period, last_period) to DataArray if needed if isinstance(self.linked_periods, (tuple, list)): @@ -965,9 +965,9 @@ def transform_data(self, name_prefix: str = '') -> None: logger.debug(f'Computed {self.linked_periods=}') self.linked_periods = self._fit_coords( - f'{name_prefix}|linked_periods', self.linked_periods, dims=['period', 'scenario'] + f'{self.prefix}|linked_periods', self.linked_periods, dims=['period', 'scenario'] ) - self.fixed_size = self._fit_coords(f'{name_prefix}|fixed_size', self.fixed_size, dims=['period', 'scenario']) + self.fixed_size = self._fit_coords(f'{self.prefix}|fixed_size', self.fixed_size, dims=['period', 'scenario']) @property def minimum_or_fixed_size(self) -> Numeric_PS: @@ -1215,29 +1215,29 @@ def __init__( self.startup_limit = startup_limit self.force_startup_tracking: bool = force_startup_tracking - def transform_data(self, name_prefix: str = '') -> None: + def transform_data(self) -> None: self.effects_per_startup = self._fit_effect_coords( - prefix=name_prefix, + prefix=self.prefix, effect_values=self.effects_per_startup, suffix='per_startup', ) self.effects_per_active_hour = self._fit_effect_coords( - prefix=name_prefix, + prefix=self.prefix, effect_values=self.effects_per_active_hour, suffix='per_active_hour', ) - self.min_uptime = self._fit_coords(f'{name_prefix}|min_uptime', self.min_uptime) - self.max_uptime = self._fit_coords(f'{name_prefix}|max_uptime', self.max_uptime) - self.min_downtime = self._fit_coords(f'{name_prefix}|min_downtime', self.min_downtime) - self.max_downtime = self._fit_coords(f'{name_prefix}|max_downtime', self.max_downtime) + self.min_uptime = self._fit_coords(f'{self.prefix}|min_uptime', self.min_uptime) + self.max_uptime = self._fit_coords(f'{self.prefix}|max_uptime', self.max_uptime) + self.min_downtime = self._fit_coords(f'{self.prefix}|min_downtime', self.min_downtime) + self.max_downtime = self._fit_coords(f'{self.prefix}|max_downtime', self.max_downtime) self.active_hours_max = self._fit_coords( - f'{name_prefix}|active_hours_max', self.active_hours_max, dims=['period', 'scenario'] + f'{self.prefix}|active_hours_max', self.active_hours_max, dims=['period', 'scenario'] ) self.active_hours_min = self._fit_coords( - f'{name_prefix}|active_hours_min', self.active_hours_min, dims=['period', 'scenario'] + f'{self.prefix}|active_hours_min', self.active_hours_min, dims=['period', 'scenario'] ) self.startup_limit = self._fit_coords( - f'{name_prefix}|startup_limit', self.startup_limit, dims=['period', 'scenario'] + f'{self.prefix}|startup_limit', self.startup_limit, dims=['period', 'scenario'] ) @property diff --git a/flixopt/structure.py b/flixopt/structure.py index d00066683..da4eee0f6 100644 --- a/flixopt/structure.py +++ b/flixopt/structure.py @@ -316,14 +316,13 @@ class Interface: - Recursive handling of complex nested structures Subclasses must implement: - transform_data(name_prefix=''): Transform data to match FlowSystem dimensions + transform_data(): Transform data to match FlowSystem dimensions """ - def transform_data(self, name_prefix: str = '') -> None: + def transform_data(self) -> None: """Transform the data of the interface to match the FlowSystem's dimensions. - Args: - name_prefix: The prefix to use for the names of the variables. Defaults to '', which results in no prefix. + Uses `self._prefix` (set during `link_to_flow_system()`) to name transformed data. Raises: NotImplementedError: Must be implemented by subclasses @@ -335,20 +334,53 @@ def transform_data(self, name_prefix: str = '') -> None: """ raise NotImplementedError('Every Interface subclass needs a transform_data() method') - def _set_flow_system(self, flow_system: FlowSystem) -> None: - """Store flow_system reference and propagate to nested Interface objects. + @property + def prefix(self) -> str: + """The prefix used for naming transformed data (e.g., 'Boiler(Q_th)|status_parameters').""" + return getattr(self, '_prefix', '') + + def _sub_prefix(self, name: str) -> str: + """Build a prefix for a nested interface by appending name to current prefix.""" + return f'{self._prefix}|{name}' if self._prefix else name + + def link_to_flow_system(self, flow_system: FlowSystem, prefix: str = '') -> None: + """Link this interface and all nested interfaces to a FlowSystem. This method is called automatically during element registration to enable elements to access FlowSystem properties without passing the reference - through every method call. + through every method call. It also sets the prefix used for naming + transformed data. Subclasses with nested Interface objects should override this method - to explicitly propagate the reference to their nested interfaces. + to propagate the link to their nested interfaces by calling + `super().link_to_flow_system(flow_system, prefix)` first, then linking + nested objects with appropriate prefixes. Args: - flow_system: The FlowSystem that this interface belongs to + flow_system: The FlowSystem to link to + prefix: The prefix for naming transformed data (e.g., 'Boiler(Q_th)') + + Examples: + Override in a subclass with nested interfaces: + + ```python + def link_to_flow_system(self, flow_system, prefix: str = '') -> None: + super().link_to_flow_system(flow_system, prefix) + if self.nested_interface is not None: + self.nested_interface.link_to_flow_system(flow_system, f'{prefix}|nested' if prefix else 'nested') + ``` + + Creating an Interface dynamically during modeling: + + ```python + # In a Model class + if flow.status_parameters is None: + flow.status_parameters = StatusParameters() + flow.status_parameters.link_to_flow_system(self._model.flow_system, f'{flow.label_full}') + ``` """ self._flow_system = flow_system + self._prefix = prefix @property def flow_system(self) -> FlowSystem: diff --git a/tests/test_flow_system_locking.py b/tests/test_flow_system_locking.py new file mode 100644 index 000000000..68d3ec010 --- /dev/null +++ b/tests/test_flow_system_locking.py @@ -0,0 +1,403 @@ +""" +Tests for FlowSystem locking behavior (read-only after optimization). + +A FlowSystem becomes locked (read-only) when it has a solution. +This prevents accidental modifications to a system that has already been optimized. +""" + +import copy +import warnings + +import pytest + +import flixopt as fx + +# Note: We use simple_flow_system fixture from conftest.py + + +class TestIsLocked: + """Test the is_locked property.""" + + def test_not_locked_initially(self, simple_flow_system): + """A new FlowSystem should not be locked.""" + assert simple_flow_system.is_locked is False + + def test_not_locked_after_build_model(self, simple_flow_system): + """FlowSystem should not be locked after build_model (no solution yet).""" + simple_flow_system.build_model() + assert simple_flow_system.is_locked is False + + def test_locked_after_optimization(self, simple_flow_system, highs_solver): + """FlowSystem should be locked after optimization.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.is_locked is True + + def test_not_locked_after_reset(self, simple_flow_system, highs_solver): + """FlowSystem should not be locked after reset.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.is_locked is True + + simple_flow_system.reset() + assert simple_flow_system.is_locked is False + + +class TestAddElementsLocking: + """Test that add_elements respects locking.""" + + def test_add_elements_before_optimization(self, simple_flow_system): + """Should be able to add elements before optimization.""" + new_bus = fx.Bus('NewBus') + simple_flow_system.add_elements(new_bus) + assert 'NewBus' in simple_flow_system.buses + + def test_add_elements_raises_when_locked(self, simple_flow_system, highs_solver): + """Should raise RuntimeError when adding elements to a locked FlowSystem.""" + simple_flow_system.optimize(highs_solver) + + new_bus = fx.Bus('NewBus') + with pytest.raises(RuntimeError, match='Cannot add elements.*reset\\(\\)'): + simple_flow_system.add_elements(new_bus) + + def test_add_elements_after_reset(self, simple_flow_system, highs_solver): + """Should be able to add elements after reset.""" + simple_flow_system.optimize(highs_solver) + simple_flow_system.reset() + + new_bus = fx.Bus('NewBus') + simple_flow_system.add_elements(new_bus) + assert 'NewBus' in simple_flow_system.buses + + def test_add_elements_invalidates_model(self, simple_flow_system): + """Adding elements to a FlowSystem with a model should invalidate the model.""" + simple_flow_system.build_model() + assert simple_flow_system.model is not None + + new_bus = fx.Bus('NewBus') + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + simple_flow_system.add_elements(new_bus) + assert len(w) == 1 + assert 'model will be invalidated' in str(w[0].message) + + assert simple_flow_system.model is None + + +class TestAddCarriersLocking: + """Test that add_carriers respects locking.""" + + def test_add_carriers_before_optimization(self, simple_flow_system): + """Should be able to add carriers before optimization.""" + carrier = fx.Carrier('biogas', '#00FF00', 'kW') + simple_flow_system.add_carriers(carrier) + assert 'biogas' in simple_flow_system.carriers + + def test_add_carriers_raises_when_locked(self, simple_flow_system, highs_solver): + """Should raise RuntimeError when adding carriers to a locked FlowSystem.""" + simple_flow_system.optimize(highs_solver) + + carrier = fx.Carrier('biogas', '#00FF00', 'kW') + with pytest.raises(RuntimeError, match='Cannot add carriers.*reset\\(\\)'): + simple_flow_system.add_carriers(carrier) + + def test_add_carriers_after_reset(self, simple_flow_system, highs_solver): + """Should be able to add carriers after reset.""" + simple_flow_system.optimize(highs_solver) + simple_flow_system.reset() + + carrier = fx.Carrier('biogas', '#00FF00', 'kW') + simple_flow_system.add_carriers(carrier) + assert 'biogas' in simple_flow_system.carriers + + def test_add_carriers_invalidates_model(self, simple_flow_system): + """Adding carriers to a FlowSystem with a model should invalidate the model.""" + simple_flow_system.build_model() + assert simple_flow_system.model is not None + + carrier = fx.Carrier('biogas', '#00FF00', 'kW') + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + simple_flow_system.add_carriers(carrier) + assert len(w) == 1 + assert 'model will be invalidated' in str(w[0].message) + + assert simple_flow_system.model is None + + +class TestReset: + """Test the reset method.""" + + def test_reset_clears_solution(self, simple_flow_system, highs_solver): + """Reset should clear the solution.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.solution is not None + + simple_flow_system.reset() + assert simple_flow_system.solution is None + + def test_reset_clears_model(self, simple_flow_system, highs_solver): + """Reset should clear the model.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.model is not None + + simple_flow_system.reset() + assert simple_flow_system.model is None + + def test_reset_clears_element_submodels(self, simple_flow_system, highs_solver): + """Reset should clear element submodels.""" + simple_flow_system.optimize(highs_solver) + + # Check that elements have submodels after optimization + boiler = simple_flow_system.components['Boiler'] + assert boiler.submodel is not None + assert len(boiler._variable_names) > 0 + + simple_flow_system.reset() + + # Check that submodels are cleared + assert boiler.submodel is None + assert len(boiler._variable_names) == 0 + + def test_reset_returns_self(self, simple_flow_system, highs_solver): + """Reset should return self for method chaining.""" + simple_flow_system.optimize(highs_solver) + result = simple_flow_system.reset() + assert result is simple_flow_system + + def test_reset_allows_reoptimization(self, simple_flow_system, highs_solver): + """After reset, FlowSystem can be optimized again.""" + simple_flow_system.optimize(highs_solver) + original_cost = simple_flow_system.solution['costs'].item() + + simple_flow_system.reset() + simple_flow_system.optimize(highs_solver) + + assert simple_flow_system.solution is not None + # Cost should be the same since system structure didn't change + assert simple_flow_system.solution['costs'].item() == pytest.approx(original_cost) + + +class TestCopy: + """Test the copy method.""" + + def test_copy_creates_new_instance(self, simple_flow_system): + """Copy should create a new FlowSystem instance.""" + copy_fs = simple_flow_system.copy() + assert copy_fs is not simple_flow_system + + def test_copy_preserves_elements(self, simple_flow_system): + """Copy should preserve all elements.""" + copy_fs = simple_flow_system.copy() + + assert set(copy_fs.components.keys()) == set(simple_flow_system.components.keys()) + assert set(copy_fs.buses.keys()) == set(simple_flow_system.buses.keys()) + + def test_copy_does_not_copy_solution(self, simple_flow_system, highs_solver): + """Copy should not include the solution.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.solution is not None + + copy_fs = simple_flow_system.copy() + assert copy_fs.solution is None + + def test_copy_does_not_copy_model(self, simple_flow_system, highs_solver): + """Copy should not include the model.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.model is not None + + copy_fs = simple_flow_system.copy() + assert copy_fs.model is None + + def test_copy_is_not_locked(self, simple_flow_system, highs_solver): + """Copy should not be locked even if original is.""" + simple_flow_system.optimize(highs_solver) + assert simple_flow_system.is_locked is True + + copy_fs = simple_flow_system.copy() + assert copy_fs.is_locked is False + + def test_copy_can_be_modified(self, simple_flow_system, highs_solver): + """Copy should be modifiable even if original is locked.""" + simple_flow_system.optimize(highs_solver) + + copy_fs = simple_flow_system.copy() + new_bus = fx.Bus('NewBus') + copy_fs.add_elements(new_bus) # Should not raise + assert 'NewBus' in copy_fs.buses + + def test_copy_can_be_optimized_independently(self, simple_flow_system, highs_solver): + """Copy can be optimized independently of original.""" + simple_flow_system.optimize(highs_solver) + original_cost = simple_flow_system.solution['costs'].item() + + copy_fs = simple_flow_system.copy() + copy_fs.optimize(highs_solver) + + # Both should have solutions + assert simple_flow_system.solution is not None + assert copy_fs.solution is not None + + # Costs should be equal (same system) + assert copy_fs.solution['costs'].item() == pytest.approx(original_cost) + + def test_python_copy_uses_copy_method(self, simple_flow_system, highs_solver): + """copy.copy() should use the custom copy method.""" + simple_flow_system.optimize(highs_solver) + + copy_fs = copy.copy(simple_flow_system) + assert copy_fs.solution is None + assert copy_fs.is_locked is False + + def test_python_deepcopy_uses_copy_method(self, simple_flow_system, highs_solver): + """copy.deepcopy() should use the custom copy method.""" + simple_flow_system.optimize(highs_solver) + + copy_fs = copy.deepcopy(simple_flow_system) + assert copy_fs.solution is None + assert copy_fs.is_locked is False + + +class TestLoadedFlowSystem: + """Test that loaded FlowSystems respect locking.""" + + def test_loaded_fs_with_solution_is_locked(self, simple_flow_system, highs_solver, tmp_path): + """A FlowSystem loaded from file with solution should be locked.""" + simple_flow_system.optimize(highs_solver) + filepath = tmp_path / 'test_fs.nc' + simple_flow_system.to_netcdf(filepath) + + loaded_fs = fx.FlowSystem.from_netcdf(filepath) + assert loaded_fs.is_locked is True + + def test_loaded_fs_can_be_reset(self, simple_flow_system, highs_solver, tmp_path): + """A loaded FlowSystem can be reset to allow modifications.""" + simple_flow_system.optimize(highs_solver) + filepath = tmp_path / 'test_fs.nc' + simple_flow_system.to_netcdf(filepath) + + loaded_fs = fx.FlowSystem.from_netcdf(filepath) + loaded_fs.reset() + + assert loaded_fs.is_locked is False + new_bus = fx.Bus('NewBus') + loaded_fs.add_elements(new_bus) # Should not raise + + +class TestInvalidate: + """Test the invalidate method for manual model invalidation.""" + + def test_invalidate_resets_connected_and_transformed(self, simple_flow_system): + """Invalidate should reset the connected_and_transformed flag.""" + simple_flow_system.connect_and_transform() + assert simple_flow_system.connected_and_transformed is True + + simple_flow_system.invalidate() + assert simple_flow_system.connected_and_transformed is False + + def test_invalidate_clears_model(self, simple_flow_system): + """Invalidate should clear the model.""" + simple_flow_system.build_model() + assert simple_flow_system.model is not None + + simple_flow_system.invalidate() + assert simple_flow_system.model is None + + def test_invalidate_raises_when_locked(self, simple_flow_system, highs_solver): + """Invalidate should raise RuntimeError when FlowSystem has a solution.""" + simple_flow_system.optimize(highs_solver) + + with pytest.raises(RuntimeError, match='Cannot invalidate.*reset\\(\\)'): + simple_flow_system.invalidate() + + def test_invalidate_returns_self(self, simple_flow_system): + """Invalidate should return self for method chaining.""" + simple_flow_system.connect_and_transform() + result = simple_flow_system.invalidate() + assert result is simple_flow_system + + def test_invalidate_allows_retransformation(self, simple_flow_system, highs_solver): + """After invalidate, connect_and_transform should run again.""" + simple_flow_system.connect_and_transform() + assert simple_flow_system.connected_and_transformed is True + + simple_flow_system.invalidate() + assert simple_flow_system.connected_and_transformed is False + + # Should be able to connect_and_transform again + simple_flow_system.connect_and_transform() + assert simple_flow_system.connected_and_transformed is True + + def test_modify_element_and_invalidate(self, simple_flow_system, highs_solver): + """Test the workflow: optimize -> reset -> modify -> invalidate -> re-optimize.""" + # First optimization + simple_flow_system.optimize(highs_solver) + original_cost = simple_flow_system.solution['costs'].item() + + # Reset to unlock + simple_flow_system.reset() + + # Modify an element attribute (increase gas price, which should increase costs) + gas_tariff = simple_flow_system.components['Gastarif'] + original_effects = gas_tariff.outputs[0].effects_per_flow_hour + # Double the cost effect + gas_tariff.outputs[0].effects_per_flow_hour = {effect: value * 2 for effect, value in original_effects.items()} + + # Invalidate to trigger re-transformation + simple_flow_system.invalidate() + + # Re-optimize + simple_flow_system.optimize(highs_solver) + new_cost = simple_flow_system.solution['costs'].item() + + # Cost should have increased due to higher gas price + assert new_cost > original_cost + + def test_invalidate_needed_after_transform_before_optimize(self, simple_flow_system, highs_solver): + """Invalidate is needed to apply changes made after connect_and_transform but before optimize.""" + # Connect and transform (but don't optimize yet) + simple_flow_system.connect_and_transform() + + # Modify an attribute - double the gas costs + gas_tariff = simple_flow_system.components['Gastarif'] + original_effects = gas_tariff.outputs[0].effects_per_flow_hour + gas_tariff.outputs[0].effects_per_flow_hour = {effect: value * 2 for effect, value in original_effects.items()} + + # Call invalidate to ensure re-transformation + simple_flow_system.invalidate() + assert simple_flow_system.connected_and_transformed is False + + # Now optimize - the doubled values should take effect + simple_flow_system.optimize(highs_solver) + cost_with_doubled = simple_flow_system.solution['costs'].item() + + # Reset and use original values + simple_flow_system.reset() + gas_tariff.outputs[0].effects_per_flow_hour = { + effect: value / 2 for effect, value in gas_tariff.outputs[0].effects_per_flow_hour.items() + } + simple_flow_system.optimize(highs_solver) + cost_with_original = simple_flow_system.solution['costs'].item() + + # The doubled costs should result in higher total cost + assert cost_with_doubled > cost_with_original + + def test_reset_already_invalidates(self, simple_flow_system, highs_solver): + """Reset already invalidates, so modifications after reset take effect.""" + # First optimization + simple_flow_system.optimize(highs_solver) + original_cost = simple_flow_system.solution['costs'].item() + + # Reset - this already calls _invalidate_model() + simple_flow_system.reset() + assert simple_flow_system.connected_and_transformed is False + + # Modify an element attribute + gas_tariff = simple_flow_system.components['Gastarif'] + original_effects = gas_tariff.outputs[0].effects_per_flow_hour + gas_tariff.outputs[0].effects_per_flow_hour = {effect: value * 2 for effect, value in original_effects.items()} + + # Re-optimize - changes take effect because reset already invalidated + simple_flow_system.optimize(highs_solver) + new_cost = simple_flow_system.solution['costs'].item() + + # Cost should have increased + assert new_cost > original_cost