diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 53ce7d0d..2a3af4a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -108,6 +108,7 @@ jobs: id-token: write steps: - uses: actions/checkout@v3 + with: {fetch-depth: 0} # deep clone for setuptools-scm - uses: actions/setup-python@v4 with: {python-version: "3.11"} - name: Retrieve pre-built distribution files diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7d409690..89455e67 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -41,7 +41,7 @@ repos: - id: isort - repo: https://github.com/psf/black - rev: 23.3.0 + rev: 24.8.0 hooks: - id: black language_version: python3 diff --git a/setup.cfg b/setup.cfg index c586ddba..0acf6227 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,10 +49,10 @@ python_requires >= 3.9 # For more information, check out https://semver.org/. install_requires = importlib-metadata; python_version<"3.8" - aiohttp - pandas - pydantic>=1.10.8,<2.0 - s2-python==0.2.0.dev2 + aiohttp<=3.9.1 + pandas>=2.1.4 + pydantic>=1.10.8 + s2-python>=0.3.1 # minimum version adding PPBC classes async_timeout [options.packages.find] diff --git a/setup.py b/setup.py index 8a3f4305..3696d25f 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,7 @@ PyScaffold helps you to put up the scaffold of your new Python project. Learn more under: https://pyscaffold.org/ """ + from setuptools import setup if __name__ == "__main__": diff --git a/src/flexmeasures_client/client.py b/src/flexmeasures_client/client.py index d62c2f9d..667521c6 100644 --- a/src/flexmeasures_client/client.py +++ b/src/flexmeasures_client/client.py @@ -7,7 +7,7 @@ import socket from dataclasses import dataclass from datetime import datetime, timedelta -from typing import Any +from typing import Any, cast import async_timeout import pandas as pd @@ -59,6 +59,9 @@ class FlexMeasuresClient: session: ClientSession | None = None def __post_init__(self): + if self.session is None: + self.session = ClientSession() + if not re.match(r".+\@.+\..+", self.email): raise EmailValidationError( f"{self.email} is not an email address format string" @@ -102,7 +105,7 @@ def determine_port(self): async def close(self): """Function to close FlexMeasuresClient session when all requests are done""" - await self.session.close() + await cast(ClientSession, self.session).close() async def request( self, @@ -198,7 +201,7 @@ async def request_once( """Sends a single request to FlexMeasures and checks the response""" self.ensure_session() - response = await self.session.request( # type: ignore + response = await cast(ClientSession, self.session).request( method=method, url=url, params=params, diff --git a/src/flexmeasures_client/s2/__init__.py b/src/flexmeasures_client/s2/__init__.py index 42f68135..66117f5a 100644 --- a/src/flexmeasures_client/s2/__init__.py +++ b/src/flexmeasures_client/s2/__init__.py @@ -40,15 +40,15 @@ def wrap(*args, **kwargs): # TODO: implement function __hash__ in ID that returns # the value of __root__, this way we would be able to use # the ID as key directly - self.incoming_messages[ - get_message_id(incoming_message) - ] = incoming_message + self.incoming_messages[get_message_id(incoming_message)] = ( + incoming_message + ) outgoing_message = func(self, incoming_message) - self.outgoing_messages[ - get_message_id(outgoing_message) - ] = outgoing_message + self.outgoing_messages[get_message_id(outgoing_message)] = ( + outgoing_message + ) return outgoing_message @@ -80,6 +80,8 @@ class Handler: outgoing_messages_status: SizeLimitOrderedDict + background_tasks: set + def __init__(self, max_size: int = 100) -> None: """ Handler diff --git a/src/flexmeasures_client/s2/cem.py b/src/flexmeasures_client/s2/cem.py index c64bb828..8d40280a 100644 --- a/src/flexmeasures_client/s2/cem.py +++ b/src/flexmeasures_client/s2/cem.py @@ -63,9 +63,13 @@ def __init__( def supports_control_type(self, control_type: ControlType): return control_type in self._resource_manager_details.available_control_types - def close(self): + async def close(self): self._is_closed = True + for control_type, handler in self._control_types_handlers.items(): + print(control_type, handler) + await handler.close() + def is_closed(self): return self._is_closed @@ -92,9 +96,9 @@ def register_control_type(self, control_type_handler: ControlTypeHandler): control_type_handler._sending_queue = self._sending_queue # store control_type_handler - self._control_types_handlers[ - control_type_handler._control_type - ] = control_type_handler + self._control_types_handlers[control_type_handler._control_type] = ( + control_type_handler + ) async def handle_message(self, message: Dict | pydantic.BaseModel | str): """ @@ -273,3 +277,6 @@ def handle_revoke_object(self, message: RevokeObject): ) return get_reception_status(message, ReceptionStatusValues.OK) + + async def send_message(self, message): + await self._sending_queue.put(message) diff --git a/src/flexmeasures_client/s2/control_types/FRBC/__init__.py b/src/flexmeasures_client/s2/control_types/FRBC/__init__.py index e61e61a9..ad2b8c39 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/__init__.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/__init__.py @@ -32,6 +32,7 @@ class FRBC(ControlTypeHandler): _timer_status_history: SizeLimitOrderedDict[str, FRBCTimerStatus] _actuator_status_history: SizeLimitOrderedDict[str, FRBCActuatorStatus] _storage_status_history: SizeLimitOrderedDict[str, FRBCStorageStatus] + background_tasks: set def __init__(self, max_size: int = 100) -> None: super().__init__(max_size) @@ -51,6 +52,7 @@ def __init__(self, max_size: int = 100) -> None: self._system_description_history = SizeLimitOrderedDict(max_size=max_size) self._leakage_behaviour_history = SizeLimitOrderedDict(max_size=max_size) self._usage_forecast_history = SizeLimitOrderedDict(max_size=max_size) + self.background_tasks = set() @register(FRBCSystemDescription) def handle_system_description( @@ -62,15 +64,25 @@ def handle_system_description( self._system_description_history[system_description_id] = message # schedule trigger_schedule to run soon concurrently - asyncio.create_task(self.trigger_schedule(system_description_id)) - + task = asyncio.create_task(self.trigger_schedule(system_description_id)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) return get_reception_status(message, status=ReceptionStatusValues.OK) - async def send_storage_status(self, status: FRBCStorageStatus): - raise NotImplementedError() + @register(FRBCUsageForecast) + def handle_usage_forecast(self, message: FRBCUsageForecast) -> pydantic.BaseModel: + message_id = str(message.message_id) - async def send_actuator_status(self, status: FRBCActuatorStatus): - raise NotImplementedError() + self._usage_forecast_history[message_id] = message + + task = asyncio.create_task(self.send_usage_forecast(message)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + return get_reception_status(message, status=ReceptionStatusValues.OK) @register(FRBCStorageStatus) def handle_storage_status(self, message: FRBCStorageStatus) -> pydantic.BaseModel: @@ -78,8 +90,11 @@ def handle_storage_status(self, message: FRBCStorageStatus) -> pydantic.BaseMode self._storage_status_history[message_id] = message - asyncio.create_task(self.send_storage_status(message)) - + task = asyncio.create_task(self.send_storage_status(message)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) return get_reception_status(message, status=ReceptionStatusValues.OK) @register(FRBCActuatorStatus) @@ -88,29 +103,64 @@ def handle_actuator_status(self, message: FRBCActuatorStatus) -> pydantic.BaseMo self._actuator_status_history[message_id] = message - asyncio.create_task(self.send_actuator_status(message)) - + task = asyncio.create_task(self.send_actuator_status(message)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) return get_reception_status(message, status=ReceptionStatusValues.OK) @register(FRBCLeakageBehaviour) def handle_leakage_behaviour( self, message: FRBCLeakageBehaviour ) -> pydantic.BaseModel: - # return get_reception_status(message, status=ReceptionStatusValues.OK) - raise NotImplementedError() + message_id = str(message.message_id) - @register(FRBCUsageForecast) - def handle_usage_forecast(self, message: FRBCUsageForecast) -> pydantic.BaseModel: - # return get_reception_status(message, status=ReceptionStatusValues.OK) - raise NotImplementedError() + self._leakage_behaviour_history[message_id] = message - async def trigger_schedule(self, system_description_id: str): - raise NotImplementedError() + task = asyncio.create_task(self.send_leakage_behaviour(message)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + return get_reception_status(message, status=ReceptionStatusValues.OK) + + @register(FRBCFillLevelTargetProfile) + def handle_fill_level_target_profile( + self, message: FRBCFillLevelTargetProfile + ) -> pydantic.BaseModel: + message_id = str(message.message_id) + + self._fill_level_target_profile_history[message_id] = message + + task = asyncio.create_task(self.send_fill_level_target_profile(message)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + return get_reception_status(message, status=ReceptionStatusValues.OK) @register(FRBCTimerStatus) def handle_frbc_timer_status(self, message: FRBCTimerStatus) -> pydantic.BaseModel: return get_reception_status(message, status=ReceptionStatusValues.OK) + async def send_storage_status(self, status: FRBCStorageStatus): + raise NotImplementedError() + + async def send_actuator_status(self, status: FRBCActuatorStatus): + raise NotImplementedError() + + async def send_leakage_behaviour(self, leakage_behaviour: FRBCLeakageBehaviour): + raise NotImplementedError() + + async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): + raise NotImplementedError() + + async def send_fill_level_target_profile( + self, fill_level_target_profile: FRBCFillLevelTargetProfile + ): + raise NotImplementedError() + class FRBCTest(FRBC): """Dummy class to simulate the triggering of a schedule.""" diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py index 85acb557..896c2816 100644 --- a/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_simple.py @@ -75,23 +75,6 @@ async def send_actuator_status(self, status: FRBCActuatorStatus): duration=timedelta(minutes=15), ) - # await self._fm_client.post_measurements( - # self._soc_sensor_id - # ) - - # system_description = self.find_system_description_from_actuator() - - # if system_description is None: - # return - - # #for a - # if system_description is not None: - - # self._system_description_history[] - # status.active_operation_mode_id - # status.actuator_id - # status.operation_mode_factor - async def trigger_schedule(self, system_description_id: str): """Translates S2 System Description into FM API calls""" @@ -108,24 +91,21 @@ async def trigger_schedule(self, system_description_id: str): return # call schedule - schedule_id = await self._fm_client.trigger_storage_schedule( + schedule = await self._fm_client.trigger_and_get_schedule( start=system_description.valid_from + self._valid_from_shift, # TODO: localize datetime sensor_id=self._power_sensor_id, - production_price_sensor=self._price_sensor_id, - consumption_price_sensor=self._price_sensor_id, - soc_unit="MWh", - soc_at_start=soc_at_start, # TODO: use forecast of the SOC instead + flex_context=dict( + production_price_sensor=self._price_sensor_id, + consumption_price_sensor=self._price_sensor_id, + ), + flex_model=dict( + soc_unit="MWh", + soc_at_start=soc_at_start, # TODO: use forecast of the SOC instead + ), duration=self._schedule_duration, # next 12 hours # TODO: add SOC MAX AND SOC MIN FROM fill_level_range, - # this needs chages on the client - ) - - # wait for the schedule to finish - schedule = await self._fm_client.get_schedule( - sensor_id=self._power_sensor_id, - schedule_id=schedule_id, - duration=self._schedule_duration, + # this needs changes on the client ) # translate FlexMeasures schedule into instructions. SOC -> Power -> PowerFactor diff --git a/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py new file mode 100644 index 00000000..e4b64ce3 --- /dev/null +++ b/src/flexmeasures_client/s2/control_types/FRBC/frbc_tunes.py @@ -0,0 +1,385 @@ +# flake8: noqa +""" +This control type is in a very EXPERIMENTAL stage. +Used it at your own risk :) +""" + +import asyncio +from datetime import datetime, timedelta + +import pandas as pd +import pydantic +import pytz +from s2python.common import NumberRange, ReceptionStatus, ReceptionStatusValues +from s2python.frbc import ( + FRBCActuatorStatus, + FRBCFillLevelTargetProfile, + FRBCInstruction, + FRBCStorageStatus, + FRBCSystemDescription, + FRBCUsageForecast, +) + +from flexmeasures_client.s2 import register +from flexmeasures_client.s2.control_types.FRBC import FRBC +from flexmeasures_client.s2.control_types.translations import ( + translate_fill_level_target_profile, + translate_usage_forecast_to_fm, +) +from flexmeasures_client.s2.utils import get_reception_status, get_unique_id + +RESOLUTION = "15min" +ENERGY_UNIT = "MWh" +POWER_UNIT = "MW" +DIMENSIONLESS = "dimensionless" +PERCENTAGE = "%" +TASK_PERIOD_SECONDS = 2 +CONVERSION_EFFICIENCY_DURATION = "PT24H" + + +class FillRateBasedControlTUNES(FRBC): + _fill_level_sensor_id: int | None + + _fill_rate_sensor_id: int | None + _thp_fill_rate_sensor_id: int | None + _thp_efficiency_sensor_id: int | None + _nes_fill_rate_sensor_id: int | None + _nes_efficiency_sensor_id: int | None + + _schedule_duration: timedelta + + _usage_forecast_sensor_id: int | None + _soc_minima_sensor_id: int | None + _soc_maxima_sensor_id: int | None + # _rm_discharge_sensor_id: int | None + + def __init__( + self, + soc_minima_sensor_id: int | None = None, + soc_maxima_sensor_id: int | None = None, + fill_level_sensor_id: int | None = None, + usage_forecast_sensor_id: int | None = None, + thp_fill_rate_sensor_id: int | None = None, + thp_efficiency_sensor_id: int | None = None, + nes_fill_rate_sensor_id: int | None = None, + nes_efficiency_sensor_id: int | None = None, + fill_rate_sensor_id: int | None = None, + rm_discharge_sensor_id: int | None = None, + timezone: str = "UTC", + schedule_duration: timedelta = timedelta(hours=12), + max_size: int = 100, + valid_from_shift: timedelta = timedelta(days=1), + **kwargs + ) -> None: + super().__init__(max_size) + + self._fill_level_sensor_id = fill_level_sensor_id + + self._fill_rate_sensor_id = fill_rate_sensor_id + self._thp_fill_rate_sensor_id = thp_fill_rate_sensor_id + self._thp_efficiency_sensor_id = thp_efficiency_sensor_id + self._nes_fill_rate_sensor_id = nes_fill_rate_sensor_id + self._nes_efficiency_sensor_id = nes_efficiency_sensor_id + + self._schedule_duration = schedule_duration + + self._usage_forecast_sensor_id = usage_forecast_sensor_id + self._soc_minima_sensor_id = soc_minima_sensor_id + self._soc_maxima_sensor_id = soc_maxima_sensor_id + self._rm_discharge_sensor_id = rm_discharge_sensor_id + + self._timezone = pytz.timezone(timezone) + + # delay the start of the schedule from the time `valid_from` + # of the FRBC.SystemDescritption + self._valid_from_shift = valid_from_shift + + self._active_recurring_schedule = False + + def now(self): + return self._timezone.localize(datetime.now()) + + async def send_storage_status(self, status: FRBCStorageStatus): + try: + await self._fm_client.post_measurements( + self._fill_level_sensor_id, + start=self.now(), + values=[status.present_fill_level], + unit=POWER_UNIT, + duration=timedelta(minutes=15), # INSTANTANEOUS + ) + except Exception as e: + response = ReceptionStatus( + subject_message_id=status.get("message_id"), + status=ReceptionStatusValues.PERMANENT_ERROR, + ) + await self._sending_queue.put(response) + + async def send_actuator_status(self, status: FRBCActuatorStatus): + factor = status.operation_mode_factor + system_description: FRBCSystemDescription = list( + self._system_description_history.values() + )[-1] + + # find the active FRBCOperationMode + for op_pos, operation_mode in enumerate( + system_description.actuators[0].operation_modes + ): + if operation_mode.id == status.active_operation_mode_id: + break + + dt = status.transition_timestamp # self.now() + + # Assume that THP is op_pos = 0 and NES = op_pos = 1. + # TODO: should we rely on a sensor_id? For example, "nes-actuator-mode", "thp-actuator-mode" + if op_pos == 0: + active_operation_mode_fill_rate_sensor_id = self._thp_fill_rate_sensor_id + else: + active_operation_mode_fill_rate_sensor_id = self._nes_fill_rate_sensor_id + + # Operation Mode Factor to fill rate + fill_rate = operation_mode.elements[0].fill_rate + fill_rate = ( + fill_rate.start_of_range + + (fill_rate.end_of_range - fill_rate.start_of_range) * factor + ) + + # Send data to the sensor of the fill rate corresponding to the active operation mode + await self._fm_client.post_measurements( + sensor_id=active_operation_mode_fill_rate_sensor_id, + start=dt, + values=[fill_rate], + unit=POWER_UNIT, + duration=timedelta(minutes=15), + ) + + # Send data to the sensor of the input fill_rate to the storage device + await self._fm_client.post_measurements( + sensor_id=self._fill_rate_sensor_id, + start=dt, + values=[fill_rate], + unit=POWER_UNIT, + duration=timedelta(minutes=15), + ) + + async def start_trigger_schedule(self): + """ + Start a recurring task to create new schedules. + + This function ensures that the scheduling task is started only once. + """ + + if not self._active_recurring_schedule: + self._active_recurring_schedule = True + self._recurrent_task = asyncio.create_task(self.trigger_schedule_task()) + self.background_tasks.add( + self._recurrent_task + ) # important to avoid a task disappearing mid-execution. + self._recurrent_task.add_done_callback(self.background_tasks.discard) + + async def stop_trigger_schedule(self): + """ + Stop the recurring task that creates new schedules. + + This function ensures that the scheduling task is stopped gracefully. + """ + + if self._active_recurring_schedule: + self._active_recurring_schedule = False + self._recurrent_task.cancel() + + async def trigger_schedule_task(self): + """ + Recurring task to trigger the schedule creation process. + + This task runs continuously while the active recurring schedule is enabled. + """ + + while self._active_recurring_schedule: + await self.trigger_scehdule() + await asyncio.sleep(TASK_PERIOD_SECONDS) + + async def trigger_scehdule(self): + """ + Ask FlexMeasures for a new schedule and create FRBC.Instructions to send back to the ResourceManager + """ + + # Retrieve the latest system description from history + system_description: FRBCSystemDescription = list( + self._system_description_history.values() + )[-1] + + actuator = system_description.actuators[0] + fill_level_range: NumberRange = system_description.storage.fill_level_range + + # get SOC Max and Min to be sent on the Flex Model + soc_min = fill_level_range.end_of_range + soc_max = fill_level_range.start_of_range + + operation_mode = actuator.operation_modes[0] + operation_mode_factor = 0.1 + + # TODO: 1) Call FlexMeasures + # TODO: 2) Select with which actuator to send the instruction + # TODO: 3) Create operation_mode_factor from power (we have a function for that) + + instruction = FRBCInstruction( + message_id=get_unique_id(), + id=get_unique_id(), + actuator_id=actuator.id, + operation_mode=operation_mode.id, # Based on the expeted fill_level, select the best actuator (most efficient) to fulfill a certain fill_rate + operation_mode_factor=operation_mode_factor, + execution_time=self.now(), + abnormal_condition=False, + ) + + # Put the instruction in the sending queue + await self._sending_queue.put(instruction) + + @register(FRBCSystemDescription) + def handle_system_description( + self, message: FRBCSystemDescription + ) -> pydantic.BaseModel: + """ + Handle FRBC.SystemDescription messages. + + Process: + 1) Store system_description message for later. + 2) Send conversion efficiencies (COP) to FlexMeasures. + 3) Start a recurring tasks to trigger the scehduler. + """ + + system_description_id = str(message.message_id) + + # store system_description message for later + self._system_description_history[system_description_id] = message + + # send conversion efficiencies + task = asyncio.create_task(self.send_conversion_efficiencies(message)) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + + # schedule trigger_schedule to run soon concurrently + task = asyncio.create_task(self.start_trigger_schedule()) + self.background_tasks.add( + task + ) # important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + + return get_reception_status(message, status=ReceptionStatusValues.OK) + + async def send_conversion_efficiencies( + self, system_description: FRBCSystemDescription + ): + """ + Send conversion efficiencies to FlexMeasures. + + Args: + system_description (FRBCSystemDescription): The system description containing actuator details. + """ + + start = system_description.valid_from + actuator = system_description.actuators[0] + + # Calculate the number of samples based on the conversion efficiency duration + N_SAMPLES = int( + pd.Timedelta(CONVERSION_EFFICIENCY_DURATION) / pd.Timedelta(RESOLUTION) + ) + + thp_op_mode_element = actuator.operation_modes[0].elements[-1] + nes_op_mode_element = actuator.operation_modes[1].elements[-1] + + # THP efficiencies: Calculate and post measurements for THP efficiencies + await self._fm_client.post_measurements( + sensor_id=self._thp_efficiency_sensor_id, + start=start, + values=[ + 100 + * thp_op_mode_element.fill_rate.end_of_range + / thp_op_mode_element.power_ranges[0].end_of_range + ] + * N_SAMPLES, + unit=PERCENTAGE, + duration=CONVERSION_EFFICIENCY_DURATION, + ) + + # NES efficiencies: Calculate and post measurements for NES efficiencies + await self._fm_client.post_measurements( + sensor_id=self._nes_efficiency_sensor_id, + start=start, + values=[ + 100 + * nes_op_mode_element.fill_rate.end_of_range + / nes_op_mode_element.power_ranges[0].end_of_range + ] + * N_SAMPLES, + unit=PERCENTAGE, + duration=CONVERSION_EFFICIENCY_DURATION, + ) + + async def close(self): + """ + Closing procedure: + 1) Stop recurrent task + """ + + await self.stop_trigger_schedule() + + async def send_usage_forecast(self, usage_forecast: FRBCUsageForecast): + """ + Send FRBC.UsageForecast to FlexMeasures. + + Args: + usage_forecast (FRBCUsageForecast): The usage forecast to be translated and sent. + """ + start_time = usage_forecast.start_time + # todo: floor to RESOLUTION + + usage_forecast = translate_usage_forecast_to_fm( + usage_forecast, RESOLUTION, strategy="mean" + ) + + await self._fm_client.post_measurements( + sensor_id=self._usage_forecast_sensor_id, + start=start_time, + values=usage_forecast.tolist(), + unit=POWER_UNIT, + duration=str(pd.Timedelta(RESOLUTION) * len(usage_forecast)), + ) + + async def send_fill_level_target_profile( + self, fill_level_target_profile: FRBCFillLevelTargetProfile + ): + """ + Send FRBC.FillLevelTargetProfile to FlexMeasures. + + Args: + fill_level_target_profile (FRBCFillLevelTargetProfile): The fill level target profile to be translated and sent. + """ + + soc_minima, soc_maxima = translate_fill_level_target_profile( + fill_level_target_profile, + resolution=RESOLUTION, + ) + + duration = str(pd.Timedelta(RESOLUTION) * len(soc_maxima)) + + # POST SOC Minima measurements to FlexMeasures + await self._fm_client.post_measurements( + sensor_id=self._soc_minima_sensor_id, + start=fill_level_target_profile.start_time, + values=soc_minima, + unit=POWER_UNIT, + duration=duration, + ) + + # POST SOC Maxima measurements to FlexMeasures + await self._fm_client.post_measurements( + sensor_id=self._soc_maxima_sensor_id, + start=fill_level_target_profile.start_time, + values=soc_maxima, + unit=POWER_UNIT, + duration=duration, + ) diff --git a/src/flexmeasures_client/s2/control_types/PPBC/__init__.py b/src/flexmeasures_client/s2/control_types/PPBC/__init__.py new file mode 100644 index 00000000..93101832 --- /dev/null +++ b/src/flexmeasures_client/s2/control_types/PPBC/__init__.py @@ -0,0 +1,68 @@ +import asyncio + +import pydantic +from s2python.common import ControlType, ReceptionStatusValues +from s2python.ppbc import PPBCPowerProfileDefinition, PPBCPowerProfileStatus + +from flexmeasures_client.s2 import SizeLimitOrderedDict, register +from flexmeasures_client.s2.control_types import ControlTypeHandler +from flexmeasures_client.s2.utils import get_reception_status + +# from flexmeasures_client.s2.utils import get_reception_status, get_unique_id + + +class PPBC(ControlTypeHandler): + _control_type = ControlType.POWER_PROFILE_BASED_CONTROL + + _power_profile_definition_history: SizeLimitOrderedDict[ + str, PPBCPowerProfileDefinition + ] + _power_profile_status_history: SizeLimitOrderedDict[str, PPBCPowerProfileStatus] + + def __init__(self, max_size: int = 100) -> None: + super().__init__(max_size) + + self._power_profile_definition_history = SizeLimitOrderedDict(max_size=max_size) + self._power_profile_status_history = SizeLimitOrderedDict(max_size=max_size) + # Keep track of the tasks that are running asynchronously + self.background_tasks = set() + + @register(PPBCPowerProfileDefinition) + def handle_power_profile_definition( + self, message: PPBCPowerProfileDefinition + ) -> pydantic.BaseModel: + power_profile_id = str(message.message_id) + + # Store the power profile definition + self._power_profile_definition_history[power_profile_id] = message + + task = asyncio.create_task(self.send_power_profile_definition(message)) + self.background_tasks.add( + task + ) # Important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + + return get_reception_status(message, status=ReceptionStatusValues.OK) + + @register(PPBCPowerProfileStatus) + def handle_power_profile_status( + self, message: PPBCPowerProfileStatus + ) -> pydantic.BaseModel: + power_profile_status_message_id = str(message.message_id) + + # Store the power profile status + self._power_profile_status_history[power_profile_status_message_id] = message + + task = asyncio.create_task(self.send_power_profile_status(message)) + self.background_tasks.add( + task + ) # Important to avoid a task disappearing mid-execution. + task.add_done_callback(self.background_tasks.discard) + + return get_reception_status(message, status=ReceptionStatusValues.OK) + + async def send_power_profile_definition(self, message: PPBCPowerProfileDefinition): + raise NotImplementedError() + + async def send_power_profile_status(self, message: PPBCPowerProfileStatus): + raise NotImplementedError() diff --git a/src/flexmeasures_client/s2/control_types/PPBC/ppbc_simple.py b/src/flexmeasures_client/s2/control_types/PPBC/ppbc_simple.py new file mode 100644 index 00000000..42caae85 --- /dev/null +++ b/src/flexmeasures_client/s2/control_types/PPBC/ppbc_simple.py @@ -0,0 +1,75 @@ +""" +This module contains the PPBC simple control type. +""" + +from datetime import datetime, timedelta + +import pytz +from s2python.ppbc import PPBCScheduleInstruction + +from flexmeasures_client.s2.control_types.PPBC import PPBC + + +class PPBCSimple(PPBC): + _power_sensor_id: int + _price_sensor_id: int + _schedule_duration: timedelta + _valid_from_shift: timedelta + + def __init__( + self, + power_sensor_id: int, + price_sensor_id: int, + timezone: str = "UTC", + schedule_duration: timedelta = timedelta(hours=12), + max_size: int = 100, + valid_from_shift: timedelta = timedelta(days=1), + ) -> None: + super().__init__(max_size) + self._power_sensor_id = power_sensor_id + self._price_sensor_id = price_sensor_id + self._schedule_duration = schedule_duration + self._timezone = pytz.timezone(timezone) + + # delay the start of the schedule from the time `valid_from` + # of the PPBC.SystemDescription. + self._valid_from_shift = valid_from_shift + + def now(self): + return self._timezone.localize(datetime.now()) + + # todo: let's make this more like FRBCSimple.trigger_schedule: + # a) call self._fm_client.trigger_and_get_schedule + # b) put instructions to sending queue + async def send_schedule_instruction(self, instruction: PPBCScheduleInstruction): + await self._fm_client.post_schedule( + self._power_sensor_id, + start=self.now(), + values=instruction.power_values, + unit="MW", + duration=self._schedule_duration, + price_sensor_id=self._price_sensor_id, + price_values=instruction.price_values, + price_unit="EUR/MWh", + valid_from=self.now() + self._valid_from_shift, + ) + + async def trigger_schedule(self, definition_id: str): + definition: PPBCScheduleInstruction = self._power_profile_definition_history[ + definition_id + ] + + if len(self._power_profile_status_history) == 0: + print("Can't trigger schedule without knowing the status of the profile...") + return + + # Call schedule + # schedule = await self._fm_client.trigger_and_get_schedule( + # start=definition.start_time + self._valid_from_shift, + # sensor_id=self._power_sensor_id, + # flex_context=dict( + # production_price_sensor=self._price_sensor_id, + # consumption_price_sensor=self._price_sensor_id, + # ), + + # ) diff --git a/src/flexmeasures_client/s2/control_types/PPBC/utils.py b/src/flexmeasures_client/s2/control_types/PPBC/utils.py new file mode 100644 index 00000000..e69de29b diff --git a/src/flexmeasures_client/s2/control_types/translations.py b/src/flexmeasures_client/s2/control_types/translations.py new file mode 100644 index 00000000..db22bcb4 --- /dev/null +++ b/src/flexmeasures_client/s2/control_types/translations.py @@ -0,0 +1,194 @@ +# flake8: noqa + +from datetime import timedelta + +import numpy as np +import pandas as pd +from s2python.frbc import ( + FRBCFillLevelTargetProfile, + FRBCLeakageBehaviour, + FRBCUsageForecast, +) + + +def leakage_behaviour_to_storage_efficieny( + message: FRBCLeakageBehaviour, resolution=timedelta(minutes=15) +) -> float: + """ + Convert a FRBC.LeakeageBehaviour message into a FlexMeasures compatible storage efficiency. + + Definitions: + + LeakageBehaviour: how fast the momentary fill level will decrease per second + due to leakage within the given range of the fill level. This is defined as a function of the + fill level. + + Storage Efficiency: percentage of the storage that remains after one time period. + + Example: + + { + ..., + "elements" : [ + { + "fill_level_range" : {"start_of_range" : 0, "end_of_range" : 5}, + "leakage_rate" : 0 + }, + { + "fill_level_range" : {"start_of_range" : 5, "end_of_range" : 95}, + "leakage_rate" : 1/3600 + } + { + "fill_level_range" : {"start_of_range" : 95, "end_of_range" : 100}, + "leakage_rate" : 2/3600 + } + ] + } + + """ + + last_element = message.elements[-1] + return ( + 1 + - (resolution / timedelta(seconds=1)) + * last_element.leakage_rate + / last_element.fill_level_range.end_of_range + ) + + +def unevenly_ts_to_evenly( + start: pd.Timestamp, + values: list[float], + durations: list[pd.Timedelta], + target_resolution: str, + strategy="mean", +) -> pd.Series: + """ + Convert unevenly spaced time series data into evenly spaced data. + + The function will: + - Floor the start time to align with the target resolution. + - Ceil the end time to align with the target resolution. + - Interpolate and resample the data based on the chosen aggregation strategy. + + Args: + start (pd.Timestamp): The starting timestamp of the time series data. + values (list[float]): The list of values for each time period. + durations (list[pd.Timedelta]): The list of durations for each value. + target_resolution (str): The target time resolution for resampling. + strategy (str): Aggregation strategy ("mean", "min", "max", etc.) for resampling. + + Returns: + pd.Series: A Pandas Series with evenly spaced timestamps and interpolated values. + """ + + # Calculate the time from the absolute start of each event + deltas = pd.TimedeltaIndex(np.cumsum([timedelta(0)] + durations)) + + # Ceil the end time to align with the target resolution + end = pd.Timestamp(start + deltas[-1]).ceil(target_resolution) + + # Floor the start time to align with the target resolution + start = start.floor(target_resolution) + + # Create an index for the time series based on the start time and deltas + index = start + deltas + + # Make a copy of the values list and append a NaN to handle the end boundary + values = values.copy() + values.append(np.nan) + series = pd.Series(values, index) + + # Reindex the series with a regular time grid and forward-fill missing values + series = series.reindex( + pd.date_range( + start=start, + end=end, + freq=min(min(durations), pd.Timedelta(target_resolution)), + inclusive="left", + ) + ).ffill() + + # Resample the series to the target resolution using the specified aggregation strategy and forward-fill + series = series.resample(target_resolution).agg(strategy).ffill() + + return series + + +def translate_usage_forecast_to_fm( + usage_forecast: FRBCUsageForecast, + resolution: str = "1h", + strategy: str = "mean", +) -> pd.Series: + """ + Translate a FRBC.UsageForecast into a FlexMeasures compatible format with evenly spaced data. + + Args: + usage_forecast (FRBCUsageForecast): The usage forecast message with start time and elements. + resolution (str): The target time resolution for resampling (e.g., "1h"). + + Returns: + pd.Series: A Pandas Series with evenly spaced timestamps and usage forecast values. + """ + + start = pd.Timestamp(usage_forecast.start_time) + + durations = [element.duration.to_timedelta() for element in usage_forecast.elements] + values = [element.usage_rate_expected for element in usage_forecast.elements] + + return unevenly_ts_to_evenly( + start=start, + values=values, + durations=durations, + target_resolution=resolution, + strategy=strategy, + ) + + +def translate_fill_level_target_profile( + fill_level_target_profile: FRBCFillLevelTargetProfile, resolution: str = "1h" +) -> tuple[pd.Series, pd.Series]: + """ + Translate a FRBC.FillLevelTargetProfile into SOC minima and maxima compatible with FlexMeasures. + + Args: + fill_level_target_profile (FRBCFillLevelTargetProfile): The target profile message with start time and elements. + resolution (str): The target time resolution for resampling (e.g., "1h"). + + Returns: + tuple[pd.Series, pd.Series]: A tuple containing SOC minima and maxima as Pandas Series. + """ + + start = pd.Timestamp(fill_level_target_profile.start_time) + + durations = [ + element.duration.to_timedelta() + for element in fill_level_target_profile.elements + ] + + soc_minima_values = [ + element.fill_level_range.start_of_range + for element in fill_level_target_profile.elements + ] + soc_maxima_values = [ + element.fill_level_range.end_of_range + for element in fill_level_target_profile.elements + ] + + soc_minima = unevenly_ts_to_evenly( + start=start, + values=soc_minima_values, + durations=durations, + target_resolution=resolution, + strategy="min", + ) + + soc_maxima = unevenly_ts_to_evenly( + start=start, + values=soc_maxima_values, + durations=durations, + target_resolution=resolution, + strategy="max", + ) + + return soc_minima, soc_maxima diff --git a/src/flexmeasures_client/s2/wrapper.py b/src/flexmeasures_client/s2/wrapper.py new file mode 100644 index 00000000..09003bba --- /dev/null +++ b/src/flexmeasures_client/s2/wrapper.py @@ -0,0 +1,13 @@ +from datetime import datetime + +from pydantic import BaseModel, Field +from s2python.message import S2Message + + +class MetaData(BaseModel): + dt: datetime + + +class S2Wrapper(BaseModel): + message: S2Message = Field(discriminator="message_type") + metadata: MetaData diff --git a/tests/conftest.py b/tests/conftest.py index e0657022..8d49a3e4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,237 @@ -""" - Dummy conftest.py for flexmeasures_client. - - If you don't know what this is for, just leave it empty. - Read more about conftest.py under: - - https://docs.pytest.org/en/stable/fixture.html - - https://docs.pytest.org/en/stable/writing_plugins.html -""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest +from s2python.common import ( + Commodity, + CommodityQuantity, + ControlType, + Duration, + EnergyManagementRole, + Handshake, + NumberRange, + PowerForecastValue, + PowerRange, + ResourceManagerDetails, + Role, + RoleType, +) +from s2python.frbc import ( + FRBCActuatorDescription, + FRBCOperationMode, + FRBCOperationModeElement, + FRBCStorageDescription, + FRBCSystemDescription, +) +from s2python.ppbc import ( + PPBCPowerProfileDefinition, + PPBCPowerSequence, + PPBCPowerSequenceContainer, + PPBCPowerSequenceElement, +) + +from flexmeasures_client.s2.utils import get_unique_id + + +@pytest.fixture(scope="session") +def frbc_system_description(): + ######## + # FRBC # + ######## + + thp_operation_mode_element = FRBCOperationModeElement( + fill_level_range=NumberRange(start_of_range=0, end_of_range=80), + fill_rate=NumberRange(start_of_range=0, end_of_range=2), + power_ranges=[ + PowerRange( + start_of_range=10, + end_of_range=1000, + commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC, + ) + ], + ) + + thp_operation_mode = FRBCOperationMode( + id=get_unique_id(), + elements=[thp_operation_mode_element], + abnormal_condition_only=False, + ) + + nes_operation_mode_element = FRBCOperationModeElement( + fill_level_range=NumberRange(start_of_range=0, end_of_range=100), + fill_rate=NumberRange(start_of_range=0, end_of_range=1), + power_ranges=[ + PowerRange( + start_of_range=10, + end_of_range=1000, + commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC, + ) + ], + ) + + nes_operation_mode = FRBCOperationMode( + id=get_unique_id(), + elements=[nes_operation_mode_element], + abnormal_condition_only=False, + ) + + actuator = FRBCActuatorDescription( + id=get_unique_id(), + supported_commodities=[Commodity.ELECTRICITY], + operation_modes=[thp_operation_mode, nes_operation_mode], + transitions=[], + timers=[], + ) + + storage = FRBCStorageDescription( + provides_leakage_behaviour=True, + provides_fill_level_target_profile=True, + provides_usage_forecast=True, + fill_level_range=NumberRange(start_of_range=0, end_of_range=1), + ) + + system_description_message = FRBCSystemDescription( + message_id=get_unique_id(), + valid_from=datetime(2024, 1, 1, tzinfo=timezone.utc), # Attach UTC timezone + actuators=[actuator], + storage=storage, + ) + + return system_description_message + + +@pytest.fixture(scope="session") +def ppbc_power_profile_definition(): + forecast1 = PowerForecastValue( + value_expected=100.0, commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1 + ) + forecast2 = PowerForecastValue( + value_expected=200.0, commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1 + ) + forecast3 = PowerForecastValue( + value_expected=300.0, commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1 + ) + + element1 = PPBCPowerSequenceElement( + duration=Duration(1), power_values=[forecast1, forecast2] + ) + element2 = PPBCPowerSequenceElement( + duration=Duration(1), power_values=[forecast2, forecast3, forecast1] + ) + + power_sequence1 = PPBCPowerSequence( + id=get_unique_id(), + elements=[element1, element2], + is_interruptible=False, + max_pause_before=Duration(0), + abnormal_condition_only=False, + ) + + power_sequence2 = PPBCPowerSequence( + id=get_unique_id(), + elements=[element2, element1], + is_interruptible=True, + max_pause_before=Duration(0), + abnormal_condition_only=True, + ) + + power_sequence3 = PPBCPowerSequence( + id=get_unique_id(), + elements=[element2], + is_interruptible=False, + max_pause_before=Duration(10000), + abnormal_condition_only=False, + ) + + power_sequence4 = PPBCPowerSequence( + id=get_unique_id(), + elements=[element1], + is_interruptible=True, + max_pause_before=Duration(10000), + abnormal_condition_only=True, + ) + + power_sequence_container1 = PPBCPowerSequenceContainer( + id=get_unique_id(), + power_sequences=[ + power_sequence1, + power_sequence2, + ], + ) + + power_sequence_container2 = PPBCPowerSequenceContainer( + id=get_unique_id(), + power_sequences=[ + power_sequence3, + ], + ) + + power_sequence_container3 = PPBCPowerSequenceContainer( + id=get_unique_id(), + power_sequences=[ + power_sequence4, + ], + ) + + power_profile_definition = PPBCPowerProfileDefinition( + message_id=get_unique_id(), + id=get_unique_id(), + start_time=datetime.now(timezone.utc), + end_time=datetime.now(timezone.utc) + timedelta(hours=4), + power_sequences_containers=[ + power_sequence_container1, + power_sequence_container2, + power_sequence_container3, + ], + ) + + return power_profile_definition + + +@pytest.fixture(scope="session") +def resource_manager_details_frbc(): + + return ResourceManagerDetails( + message_id=get_unique_id(), + resource_id=get_unique_id(), + roles=[Role(role=RoleType.ENERGY_STORAGE, commodity=Commodity.ELECTRICITY)], + instruction_processing_delay=Duration(1), + available_control_types=[ + ControlType.FILL_RATE_BASED_CONTROL, + ControlType.NO_SELECTION, + ], + provides_forecast=True, + provides_power_measurement_types=[ + CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC + ], + ) + + +@pytest.fixture(scope="session") +def resource_manager_details_ppbc(): + return ResourceManagerDetails( + message_id=get_unique_id(), + resource_id=get_unique_id(), + roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)], + instruction_processing_delay=Duration(1), + available_control_types=[ + ControlType.POWER_PROFILE_BASED_CONTROL, + ControlType.NO_SELECTION, + ], + provides_forecast=True, + provides_power_measurement_types=[ + CommodityQuantity.ELECTRIC_POWER_L1, + CommodityQuantity.ELECTRIC_POWER_L2, + CommodityQuantity.ELECTRIC_POWER_L3, + ], + ) + + +@pytest.fixture(scope="session") +def rm_handshake(): + return Handshake( + message_id=get_unique_id(), + role=EnergyManagementRole.RM, + supported_protocol_versions=["1.0.0"], + ) diff --git a/tests/test_cem.py b/tests/test_cem.py new file mode 100644 index 00000000..36283428 --- /dev/null +++ b/tests/test_cem.py @@ -0,0 +1,379 @@ +from __future__ import annotations + +import pytest +from s2python.common import ControlType, ReceptionStatus, ReceptionStatusValues + +from flexmeasures_client.s2.cem import CEM +from flexmeasures_client.s2.control_types.FRBC import FRBCTest +from flexmeasures_client.s2.control_types.PPBC import PPBC + + +@pytest.mark.asyncio +async def test_handshake(rm_handshake): + cem = CEM(fm_client=None) + frbc = FRBCTest() + ppbc = PPBC() + + cem.register_control_type(frbc) + cem.register_control_type(ppbc) + + ############# + # Handshake # + ############# + + # RM sends HandShake + await cem.handle_message(rm_handshake) + + assert ( + cem._sending_queue.qsize() == 1 + ) # check that message is put to the outgoing queue + + # CEM response + response = await cem.get_message() + + assert ( + response["message_type"] == "HandshakeResponse" + ), "response message_type should be HandshakeResponse" + assert ( + response["selected_protocol_version"] == "0.1.0" + ), "CEM selected protocol version should be supported by the Resource Manager" + + +# FRBC +@pytest.mark.asyncio +async def test_resource_manager_details_frbc( + resource_manager_details_frbc, rm_handshake +): + + cem = CEM(fm_client=None) + frbc = FRBCTest() + + cem.register_control_type(frbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + + assert ( + cem._sending_queue.qsize() == 1 + ) # check that message is put to the outgoing queue + + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + + # RM sends ResourceManagerDetails + await cem.handle_message(resource_manager_details_frbc) + + response = await cem.get_message() + + # CEM response is ReceptionStatus with an OK status + assert response["message_type"] == "ReceptionStatus" + assert response["status"] == "OK" + + assert ( + cem._resource_manager_details == resource_manager_details_frbc + ), "CEM should store the resource_manager_details" + assert cem.control_type == ControlType.NO_SELECTION, ( + "CEM control type should switch to ControlType.NO_SELECTION," + "independently of the original type" + ) + + +@pytest.mark.asyncio +async def test_activate_control_type_frbc( + frbc_system_description, resource_manager_details_frbc, rm_handshake +): + cem = CEM(fm_client=None) + frbc = FRBCTest() + + cem.register_control_type(frbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + await cem.handle_message(resource_manager_details_frbc) + + response = await cem.get_message() + + ######################### + # Activate control type # + ######################### + + # CEM sends a request to change the control type + await cem.activate_control_type(ControlType.FILL_RATE_BASED_CONTROL) + message = await cem.get_message() + + assert cem.control_type == ControlType.NO_SELECTION, ( + "the control type should still be NO_SELECTION (rather than FRBC)," + " because the RM has not yet confirmed FRBC activation" + ) + + response = ReceptionStatus( + subject_message_id=message.get("message_id"), status=ReceptionStatusValues.OK + ) + + await cem.handle_message(response) + + assert ( + cem.control_type == ControlType.FILL_RATE_BASED_CONTROL + ), "after a positive ResponseStatus, the status changes from NO_SELECTION to FRBC" + + +@pytest.mark.asyncio +async def test_messages_route_to_control_type_handler_frbc( + frbc_system_description, resource_manager_details_frbc, rm_handshake +): + cem = CEM(fm_client=None) + frbc = FRBCTest() + + cem.register_control_type(frbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + await cem.handle_message(resource_manager_details_frbc) + + response = await cem.get_message() + + ######################### + # Activate control type # + ######################### + + await cem.activate_control_type(ControlType.FILL_RATE_BASED_CONTROL) + message = await cem.get_message() + + response = ReceptionStatus( + subject_message_id=message.get("message_id"), status=ReceptionStatusValues.OK + ) + + await cem.handle_message(response) + + ######## + # FRBC # + ######## + + await cem.handle_message(frbc_system_description) + response = await cem.get_message() + + # checking that FRBC handler is being called + assert ( + cem._control_types_handlers[ + ControlType.FILL_RATE_BASED_CONTROL + ]._system_description_history[str(frbc_system_description.message_id)] + == frbc_system_description + ), ( + "the FRBC.SystemDescription message should be stored" + "in the frbc.system_description_history variable" + ) + + # change of control type is not performed in case that the RM answers + # with a negative response + await cem.activate_control_type(ControlType.NO_SELECTION) + response = await cem.get_message() + assert ( + cem._control_type == ControlType.FILL_RATE_BASED_CONTROL + ), "control type should not change, confirmation still pending" + + await cem.handle_message( + ReceptionStatus( + subject_message_id=response.get("message_id"), + status=ReceptionStatusValues.INVALID_CONTENT, + ) + ) + + assert ( + cem._control_type == ControlType.FILL_RATE_BASED_CONTROL + ), "control type should not change, confirmation state is not 'OK'" + assert ( + response.get("message_id") + not in cem._control_types_handlers[ + ControlType.FILL_RATE_BASED_CONTROL + ].success_callbacks + ), "success callback should be deleted" + + +# PPBC +@pytest.mark.asyncio +async def test_resource_manager_details_ppbc( + resource_manager_details_ppbc, rm_handshake +): + cem = CEM(fm_client=None) + ppbc = PPBC() + + cem.register_control_type(ppbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + + assert cem._sending_queue.qsize() == 1 + + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + + # RM sends ResourceManagerDetails + await cem.handle_message(resource_manager_details_ppbc) + response = await cem.get_message() + + # CEM response is ReceptionStatus with an OK status + assert response["message_type"] == "ReceptionStatus" + assert response["status"] == "OK" + + assert ( + cem._resource_manager_details == resource_manager_details_ppbc + ), "CEM should store the resource_manager_details" + assert cem.control_type == ControlType.NO_SELECTION, ( + "CEM control type should switch to ControlType.NO_SELECTION," + "independently of the original type" + ) + + +@pytest.mark.asyncio +async def test_activate_control_type_ppbc( + ppbc_power_profile_definition, resource_manager_details_ppbc, rm_handshake +): + cem = CEM(fm_client=None) + ppbc = PPBC() + + cem.register_control_type(ppbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + await cem.handle_message(resource_manager_details_ppbc) + response = await cem.get_message() + + ######################### + # Activate control type # + ######################### + + # CEM sends a request to change the control type + await cem.activate_control_type(ControlType.POWER_PROFILE_BASED_CONTROL) + message = await cem.get_message() + + assert cem.control_type == ControlType.NO_SELECTION, ( + "the control type should still be NO_SELECTION (rather than PPBC)," + " because the RM has not yet confirmed PPBC activation" + ) + + response = ReceptionStatus( + subject_message_id=message.get("message_id"), status=ReceptionStatusValues.OK + ) + + await cem.handle_message(response) + + assert ( + cem.control_type == ControlType.POWER_PROFILE_BASED_CONTROL + ), "after a positive ResponseStatus, the status changes from NO_SELECTION to PPBC" + + +@pytest.mark.asyncio +async def test_messages_route_to_control_type_handler_ppbc( + ppbc_power_profile_definition, resource_manager_details_ppbc, rm_handshake +): + cem = CEM(fm_client=None) + ppbc = PPBC() + + cem.register_control_type(ppbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + await cem.handle_message(resource_manager_details_ppbc) + response = await cem.get_message() + + ######################### + # Activate control type # + ######################### + + await cem.activate_control_type(ControlType.POWER_PROFILE_BASED_CONTROL) + message = await cem.get_message() + + response = ReceptionStatus( + subject_message_id=message.get("message_id"), status=ReceptionStatusValues.OK + ) + + await cem.handle_message(response) + + ######## + # PPBC # + ######## + + await cem.handle_message(ppbc_power_profile_definition) + response = await cem.get_message() + + # checking that PPBC handler is being called + assert ( + cem._control_types_handlers[ + ControlType.POWER_PROFILE_BASED_CONTROL + ]._power_profile_definition_history[ + str(ppbc_power_profile_definition.message_id) + ] + == ppbc_power_profile_definition + ), ( + "the PPBC.power_profile_definition message should be stored" + "in the ppbc._power_profile_definition_history variable" + ) + + # change of control type is not performed in case that the RM answers + # with a negative response + await cem.activate_control_type(ControlType.NO_SELECTION) + response = await cem.get_message() + assert ( + cem._control_type == ControlType.POWER_PROFILE_BASED_CONTROL + ), "control type should not change, confirmation still pending" + + await cem.handle_message( + ReceptionStatus( + subject_message_id=response.get("message_id"), + status=ReceptionStatusValues.INVALID_CONTENT, + ) + ) + + assert ( + cem._control_type == ControlType.POWER_PROFILE_BASED_CONTROL + ), "control type should not change, confirmation state is not 'OK'" + assert ( + response.get("message_id") + not in cem._control_types_handlers[ + ControlType.POWER_PROFILE_BASED_CONTROL + ].success_callbacks + ), "success callback should be deleted" diff --git a/tests/test_frbc_tunes.py b/tests/test_frbc_tunes.py new file mode 100644 index 00000000..a2223d3a --- /dev/null +++ b/tests/test_frbc_tunes.py @@ -0,0 +1,295 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock + +import numpy as np +import pandas as pd +import pytest +from s2python.common import ControlType, ReceptionStatus, ReceptionStatusValues + +from flexmeasures_client.client import FlexMeasuresClient +from flexmeasures_client.s2.cem import CEM +from flexmeasures_client.s2.control_types.FRBC.frbc_tunes import ( + FillRateBasedControlTUNES, +) +from flexmeasures_client.s2.utils import get_unique_id + + +@pytest.fixture(scope="function") +async def setup_cem(resource_manager_details_frbc, rm_handshake): + fm_client = AsyncMock(FlexMeasuresClient) + cem = CEM(fm_client=fm_client) + frbc = FillRateBasedControlTUNES( + soc_minima_sensor_id=2, + soc_maxima_sensor_id=3, + rm_discharge_sensor_id=4, + fill_level_sensor_id=7, + thp_fill_rate_sensor_id=8, + thp_efficiency_sensor_id=9, + nes_fill_rate_sensor_id=10, + nes_efficiency_sensor_id=11, + usage_forecast_sensor_id=12, + fill_rate_sensor_id=13, + timezone="UTC", + schedule_duration=timedelta(hours=12), + max_size=100, + valid_from_shift=timedelta(days=1), + ) + + cem.register_control_type(frbc) + + ############# + # Handshake # + ############# + + await cem.handle_message(rm_handshake) + response = await cem.get_message() + + ########################## + # ResourceManagerDetails # + ########################## + await cem.handle_message(resource_manager_details_frbc) + response = await cem.get_message() + + ######################### + # Activate control type # + ######################### + + await cem.activate_control_type(ControlType.FILL_RATE_BASED_CONTROL) + message = await cem.get_message() + + response = ReceptionStatus( + subject_message_id=message.get("message_id"), status=ReceptionStatusValues.OK + ) + + await cem.handle_message(response) + + return cem, fm_client + + +@pytest.fixture(scope="function") +async def cem_in_frbc_control_type(setup_cem, frbc_system_description): + cem, fm_client = await setup_cem + + ######## + # FRBC # + ######## + + await cem.handle_message(frbc_system_description) + await cem.get_message() + + return cem, fm_client, frbc_system_description + + +@pytest.mark.asyncio +async def test_system_description(cem_in_frbc_control_type, frbc_system_description): + cem, fm_client, frbc_system_description = await cem_in_frbc_control_type + + ######## + # FRBC # + ######## + + await cem.handle_message(frbc_system_description) + frbc = cem._control_types_handlers[cem.control_type] + + tasks = get_pending_tasks() + + # check that we are sending the conversion efficiencies + await tasks["send_conversion_efficiencies"] + from flexmeasures_client.s2.control_types.FRBC.frbc_tunes import ( + CONVERSION_EFFICIENCY_DURATION, + RESOLUTION, + ) + + N_SAMPLES = int( + pd.Timedelta(CONVERSION_EFFICIENCY_DURATION) / pd.Timedelta(RESOLUTION) + ) + + # first call of post_measurements which corresponds to the THP efficiency + first_call = fm_client.post_measurements.call_args_list[0][1] + first_call_expected = { + "sensor_id": frbc._thp_efficiency_sensor_id, + "start": datetime(2024, 1, 1, tzinfo=timezone.utc), + "values": [0.2] * N_SAMPLES, + "unit": "%", + "duration": "PT24H", + } + for key in first_call.keys(): + assert first_call[key] == first_call_expected[key] + + # second call of post_measurements which corresponds to the NES efficiency + second_call = fm_client.post_measurements.call_args_list[1][1] + + second_call_expected = { + "sensor_id": frbc._nes_efficiency_sensor_id, + "start": datetime(2024, 1, 1, tzinfo=timezone.utc), + "values": [0.1] * N_SAMPLES, + "unit": "%", + "duration": "PT24H", + } + for key in second_call.keys(): + assert second_call[key] == second_call_expected[key] + + await cem.close() + get_pending_tasks() + + +def get_pending_tasks(): + pending = asyncio.all_tasks() + + tasks = {} + + # get all pending tasks + for task in pending: + func_name = task.get_coro().cr_code.co_name + tasks[func_name] = task + + return tasks + + +@pytest.mark.asyncio +async def test_fill_level_target_profile(cem_in_frbc_control_type): + cem, fm_client, frbc_system_description = await cem_in_frbc_control_type + + fill_level_target_profile = { + "start_time": "2024-01-01T00:00:00+01:00", + "message_type": "FRBC.FillLevelTargetProfile", + "message_id": get_unique_id(), + "elements": [ + { + "duration": 1e3 * 3600, + "fill_level_range": {"start_of_range": 0, "end_of_range": 100}, + }, + { + "duration": 1e3 * 2 * 3600, + "fill_level_range": {"start_of_range": 10, "end_of_range": 90}, + }, + { + "duration": 1e3 * 3 * 3600, + "fill_level_range": {"start_of_range": 20, "end_of_range": 80}, + }, + ], + } + + await cem.handle_message(fill_level_target_profile) + + tasks = get_pending_tasks() + + # clear mock state because it contains previous such as + # the ones used to process the system description + fm_client.reset_mock() + + # wait for the task send_fill_level_target_profile to finish + await tasks["send_fill_level_target_profile"] + + start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone(timedelta(seconds=3600))) + + first_call = fm_client.post_measurements.call_args_list[0][1] + assert first_call["sensor_id"] == 2 + assert first_call["start"] == start + assert np.isclose(first_call["values"].values, [0] * 4 + [10] * 8 + [20] * 12).all() + + second_call = fm_client.post_measurements.call_args_list[1][1] + assert second_call["sensor_id"] == 3 + assert second_call["start"] == start + assert np.isclose( + second_call["values"].values, [100] * 4 + [90] * 8 + [80] * 12 + ).all() + + await cem.close() + get_pending_tasks() + + +@pytest.mark.asyncio +async def test_fill_rate_relay(cem_in_frbc_control_type): + """Check whether the fill rate from the Tarnoc or Nestor is relayed + to the overall heating system's fill rate sensor, and the fill rate sensor ID + corresponds correctly to the Tarnoc fill rate sensor or the Nestor fill rate sensor. + """ + + cem, fm_client, frbc_system_description = await cem_in_frbc_control_type + frbc = cem._control_types_handlers[cem.control_type] + + actuator_status = { + "active_operation_mode_id": frbc_system_description.actuators[0] + .operation_modes[0] + .id, # ID representing Tarnoc operation mode + "actuator_id": frbc_system_description.actuators[0].id, # ID of the actuator + "message_type": "FRBC.ActuatorStatus", + "message_id": get_unique_id(), + "operation_mode_factor": 0.0, + } + + await cem.handle_message(actuator_status) + + tasks = get_pending_tasks() + + # clear mock state because it contains previous such as + # the ones used to process the system description + fm_client.reset_mock() + + # wait for the task send_actuator_status to finish + await tasks["send_actuator_status"] + + first_call = fm_client.post_measurements.call_args_list[0][1] + assert first_call["sensor_id"] == frbc._thp_fill_rate_sensor_id + + second_call = fm_client.post_measurements.call_args_list[1][1] + assert second_call["sensor_id"] == frbc._fill_rate_sensor_id + + # Switch operation mode to Nestore + actuator_status["active_operation_mode_id"] = ( + frbc_system_description.actuators[0].operation_modes[1].id + ) # ID representing NEStore operation mode + + await cem.handle_message(actuator_status) + tasks = get_pending_tasks() + + # clear mock state because it contains previous such as + # the ones used to process the system description + fm_client.reset_mock() + + # wait for the task send_actuator_status to finish + await tasks["send_actuator_status"] + + first_call = fm_client.post_measurements.call_args_list[0][1] + assert first_call["sensor_id"] == frbc._nes_fill_rate_sensor_id + + second_call = fm_client.post_measurements.call_args_list[1][1] + assert second_call["sensor_id"] == frbc._fill_rate_sensor_id + + await cem.close() + + +@pytest.mark.asyncio +async def test_trigger_schedule(cem_in_frbc_control_type): + """Work in progress. + + # todo: add steps + + Steps + + 1. Check whether the task starts and stops + 2. Check call arguments + 3. Check queue for results mocking the results of FM client + + # todo consider splitting up test + S2 2 FM: converging system description to flex config + FM 2 S2: schedules to instructions + """ + cem, fm_client, frbc_system_description = await cem_in_frbc_control_type + # frbc = cem._control_types_handlers[cem.control_type] + + tasks = get_pending_tasks() + + assert tasks["trigger_schedule_task"]._state == "PENDING" + await cem.close() + + tasks = get_pending_tasks() + + assert tasks["trigger_schedule_task"]._state == "PENDING" + + await cem.close() + get_pending_tasks() diff --git a/tests/test_s2_coordinator.py b/tests/test_s2_coordinator.py deleted file mode 100644 index 9903e4ac..00000000 --- a/tests/test_s2_coordinator.py +++ /dev/null @@ -1,201 +0,0 @@ -from __future__ import annotations - -from datetime import datetime - -import pytest -from s2python.common import ( - Commodity, - CommodityQuantity, - ControlType, - Duration, - EnergyManagementRole, - Handshake, - NumberRange, - PowerRange, - ReceptionStatus, - ReceptionStatusValues, - ResourceManagerDetails, - Role, - RoleType, -) -from s2python.frbc import ( - FRBCActuatorDescription, - FRBCOperationMode, - FRBCOperationModeElement, - FRBCStorageDescription, - FRBCSystemDescription, -) - -from flexmeasures_client.s2.cem import CEM -from flexmeasures_client.s2.control_types.FRBC import FRBCTest -from flexmeasures_client.s2.utils import get_unique_id - - -@pytest.mark.asyncio -async def test_cem(): # TODO: move into different test functions - cem = CEM(fm_client=None) - frbc = FRBCTest() - - cem.register_control_type(frbc) - - ############# - # Handshake # - ############# - - handshake_message = Handshake( - message_id=get_unique_id(), - role=EnergyManagementRole.RM, - supported_protocol_versions=["0.1.0"], - ) - - await cem.handle_message(handshake_message) - - assert ( - cem._sending_queue.qsize() == 1 - ) # check that message is put to the outgoing queue - - response = await cem.get_message() - - assert ( - response["message_type"] == "HandshakeResponse" - ), "response message_type should be HandshakeResponse" - assert ( - response["selected_protocol_version"] == "0.1.0" - ), "CEM selected protocol version should be supported by the Resource Manager" - - ########################## - # ResourceManagerDetails # - ########################## - - resource_manager_details_message = ResourceManagerDetails( - message_id=get_unique_id(), - resource_id=get_unique_id(), - roles=[Role(role=RoleType.ENERGY_STORAGE, commodity=Commodity.ELECTRICITY)], - instruction_processing_delay=Duration(__root__=1.0), - available_control_types=[ - ControlType.FILL_RATE_BASED_CONTROL, - ControlType.NO_SELECTION, - ], - provides_forecast=True, - provides_power_measurement_types=[ - CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC - ], - ) - - await cem.handle_message(resource_manager_details_message) - response = await cem.get_message() - - assert response["message_type"] == "ReceptionStatus" - assert response["status"] == "OK" - assert ( - cem._resource_manager_details == resource_manager_details_message - ), "CEM should store the resource_manager_details" - assert cem.control_type == ControlType.NO_SELECTION, ( - "CEM control type should switch to ControlType.NO_SELECTION," - "independently of the original type" - ) - - ######################### - # Activate control type # - ######################### - - await cem.activate_control_type(ControlType.FILL_RATE_BASED_CONTROL) - message = await cem.get_message() - - assert cem.control_type == ControlType.NO_SELECTION, ( - "the control type should still be NO_SELECTION (rather than FRBC)," - " because the RM has not yet confirmed FRBC activation" - ) - - response = ReceptionStatus( - subject_message_id=message.get("message_id"), status=ReceptionStatusValues.OK - ) - - await cem.handle_message(response) - - assert ( - cem.control_type == ControlType.FILL_RATE_BASED_CONTROL - ), "after a positive ResponseStatus, the status changes from NO_SELECTION to FRBC" - - ######## - # FRBC # - ######## - - operation_mode_element = FRBCOperationModeElement( - fill_level_range=NumberRange(start_of_range=0, end_of_range=1), - fill_rate=NumberRange(start_of_range=0, end_of_range=1), - power_ranges=[ - PowerRange( - start_of_range=10, - end_of_range=1000, - commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC, - ) - ], - ) - - operation_mode = FRBCOperationMode( - id=get_unique_id(), - elements=[operation_mode_element], - abnormal_condition_only=False, - ) - - actuator = FRBCActuatorDescription( - id=get_unique_id(), - supported_commodities=[Commodity.ELECTRICITY], - operation_modes=[operation_mode], - transitions=[], - timers=[], - ) - - storage = FRBCStorageDescription( - provides_leakage_behaviour=False, - provides_fill_level_target_profile=False, - provides_usage_forecast=False, - fill_level_range=NumberRange(start_of_range=0, end_of_range=1), - ) - - system_description_message = FRBCSystemDescription( - message_id=get_unique_id(), - valid_from=datetime.now(), - actuators=[actuator], - storage=storage, - ) - - await cem.handle_message(system_description_message) - response = await cem.get_message() - - # checking that FRBC handler is being called - assert ( - cem._control_types_handlers[ - ControlType.FILL_RATE_BASED_CONTROL - ]._system_description_history[str(system_description_message.message_id)] - == system_description_message - ), ( - "the FRBC.SystemDescription message should be stored" - "in the frbc.system_description_history variable" - ) - - # change of control type is not performed in case that the RM answers - # with a negative response - await cem.activate_control_type(ControlType.NO_SELECTION) - response = await cem.get_message() - assert ( - cem._control_type == ControlType.FILL_RATE_BASED_CONTROL - ), "control type should not change, confirmation still pending" - - await cem.handle_message( - ReceptionStatus( - subject_message_id=response.get("message_id"), - status=ReceptionStatusValues.INVALID_CONTENT, - ) - ) - - assert ( - cem._control_type == ControlType.FILL_RATE_BASED_CONTROL - ), "control type should not change, confirmation state is not 'OK'" - assert ( - response.get("message_id") - not in cem._control_types_handlers[ - ControlType.FILL_RATE_BASED_CONTROL - ].success_callbacks - ), "success callback should be deleted" diff --git a/tests/test_s2_models.py b/tests/test_s2_models.py new file mode 100644 index 00000000..905094a6 --- /dev/null +++ b/tests/test_s2_models.py @@ -0,0 +1,33 @@ +from flexmeasures_client.s2.utils import get_unique_id +from flexmeasures_client.s2.wrapper import S2Wrapper +from datetime import datetime +import pytz + + +def test_simple_model(): + wrapped_message = { + "message": { + "message_id": get_unique_id(), + "resource_id": get_unique_id(), + "roles": [{"role": "ENERGY_STORAGE", "commodity": "ELECTRICITY"}], + "instruction_processing_delay": 1.0, + "available_control_types": ["FILL_RATE_BASED_CONTROL", "NO_SELECTION"], + "provides_forecast": True, + "provides_power_measurement_types": ["ELECTRIC.POWER.3_PHASE_SYMMETRIC"], + "message_type": "ResourceManagerDetails", + }, + "metadata": {"dt": "2023-01-01T00:00:00+00:00"}, + } + + S2Wrapper.validate(wrapped_message) + + wrapped_message_2 = { + "message": { + "message_id": get_unique_id(), + "message_type": "Handshake", + "role": "CEM", + }, + "metadata": {"dt": "2024-01-01T00:00:00+00:00"}, + } + + S2Wrapper.validate(wrapped_message_2) diff --git a/tests/test_s2_translations.py b/tests/test_s2_translations.py new file mode 100644 index 00000000..e634375e --- /dev/null +++ b/tests/test_s2_translations.py @@ -0,0 +1,78 @@ +import pytest +from s2python.frbc import FRBCUsageForecast + +from flexmeasures_client.s2.control_types.translations import ( + translate_usage_forecast_to_fm, +) +from flexmeasures_client.s2.utils import get_unique_id + + +@pytest.mark.parametrize( + "start, resolution, values", + [ + ("2024-01-01T00:00:00+01:00", "1h", [100, 100]), + ("2024-01-01T00:00:00+01:00", "15min", [100] * 4 * 2), + ("2024-01-01T00:30:00+01:00", "1h", [100, 100, 100]), + ], +) +def test_resampling_one_block(start, resolution, values): + message = { + "elements": [ + {"duration": 2 * 3600 * 1e3, "usage_rate_expected": 100}, + ], + "message_id": get_unique_id(), + "message_type": "FRBC.UsageForecast", + "start_time": start, + } + + usage_forecast = FRBCUsageForecast.from_dict(message) + + s = translate_usage_forecast_to_fm(usage_forecast, resolution=resolution) + assert all(abs(s.values - values) < 1e-5) + + +@pytest.mark.parametrize( + "start, resolution, values", + [ + ("2024-01-01T00:00:00+01:00", "1h", [100, 200, 200, 350, 450, 600]), + ("2024-01-01T00:45:00+01:00", "1h", [100, 200, 200, 350, 450, 600, 600]), + ( + "2024-01-01T00:00:00+01:00", + "30min", + [100] * 2 + [200] * 2 * 2 + [300] * 1 + [400] * 2 + [500] * 1 + [600] * 1, + ), + ( + "2024-01-01T00:00:00+01:00", + "15min", + [100] * 4 + [200] * 4 * 2 + [300] * 2 + [400] * 4 + [500] * 2 + [600] * 2, + ), + ], +) +def test_usage_forecast(start, resolution, values): + """ + - 100 for 1h + - 200 for 2h + - 300 for 30min + - 400 for 1h + - 500 for 30min + - 600 for 30min + + """ + message = { + "elements": [ + {"duration": 3600 * 1e3, "usage_rate_expected": 100}, + {"duration": 2 * 3600 * 1e3, "usage_rate_expected": 200}, + {"duration": 0.5 * 3600 * 1e3, "usage_rate_expected": 300}, + {"duration": 3600 * 1e3, "usage_rate_expected": 400}, + {"duration": 0.5 * 3600 * 1e3, "usage_rate_expected": 500}, + {"duration": 0.5 * 3600 * 1e3, "usage_rate_expected": 600}, + ], + "message_id": get_unique_id(), + "message_type": "FRBC.UsageForecast", + "start_time": start, + } + + usage_forecast = FRBCUsageForecast.from_dict(message) + + s = translate_usage_forecast_to_fm(usage_forecast, resolution=resolution) + assert all(abs(s.values - values) < 1e-5)