Skip to content

Commit 9ca1409

Browse files
139: Split up S2Connection into a sync and async interface as well as decoupling the underlying medium e.g. websockets, mqtt and d-bus interfaces (#144)
* 139: Moving big chunks of code places. * 139: Finish up first draft before executing it. * 139: Get async mostly to work, started on getting sync to work. * 139: Add a bunch of stuff. Threading/task management is now clear for sync and async S2 connections and quickstart for BlockingWebsocketRM is created. * 139: Some fixes. * 139: Fix all linting issues. * 139: Fix all linting and typing issues. * 139: Propagate asset details from ResourceManagerHandler to all underlying control types. * 139: Fix missing return value in func sig. * 139: Add examples on how to perform another task after the connection is live. * 139: Add functionality to let ws_medium disconnect. * 139: websockets import should be behind the try import. * 139: Fix typing and linting issues and add docs regarding the verify_certificate and how it prevents security. * 139: Document the send_and_await_reception_status functions for both sync and async. * 139: Add DDBC control type handlers in class based approach.
1 parent 2652c8c commit 9ca1409

27 files changed

+2054
-729
lines changed

.pylintrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ ignore-paths=src/s2python/generated/
1010
# avoid hangs.
1111
jobs=1
1212

13-
disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long
13+
disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long,duplicate-code

examples/async_frbc_rm.py

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
import argparse
2+
import asyncio
3+
import logging
4+
import random
5+
import sys
6+
import uuid
7+
import signal
8+
import datetime
9+
from typing import Optional
10+
11+
from s2python.connection.types import S2ConnectionEventsAndMessages, SendOkayRunAsync
12+
from s2python.common import (
13+
Duration,
14+
Role,
15+
RoleType,
16+
Commodity,
17+
Currency,
18+
NumberRange,
19+
PowerRange,
20+
CommodityQuantity,
21+
PowerValue,
22+
PowerMeasurement,
23+
)
24+
from s2python.frbc import (
25+
FRBCInstruction,
26+
FRBCSystemDescription,
27+
FRBCActuatorDescription,
28+
FRBCStorageDescription,
29+
FRBCOperationMode,
30+
FRBCOperationModeElement,
31+
FRBCFillLevelTargetProfile,
32+
FRBCFillLevelTargetProfileElement,
33+
FRBCStorageStatus,
34+
FRBCActuatorStatus,
35+
)
36+
from s2python.connection import AssetDetails
37+
from s2python.connection.async_ import S2AsyncConnection, WebsocketClientMedium
38+
from s2python.connection.async_.control_type.class_based import (
39+
FRBCControlType,
40+
NoControlControlType,
41+
ResourceManagerHandler,
42+
)
43+
44+
logger = logging.getLogger("s2python")
45+
logger.addHandler(logging.StreamHandler(sys.stdout))
46+
logger.setLevel(logging.DEBUG)
47+
48+
49+
class SendPowerMeasurementPeriodically:
50+
_connection: S2AsyncConnection
51+
_period: datetime.timedelta
52+
_task: Optional[asyncio.Task]
53+
54+
def __init__(self, connection: S2AsyncConnection, period: datetime.timedelta):
55+
self._connection = connection
56+
self._period = period
57+
self._task = None
58+
59+
async def _send_power_measurement(self):
60+
while True:
61+
# Grab the value from an API or anywhere else. Using a random value in this example.
62+
value = random.uniform(10.0, 100.0)
63+
print(f"Sending a power measurement message with value={value}")
64+
await self._connection.send_msg_and_await_reception_status(
65+
PowerMeasurement(
66+
message_id=uuid.uuid4(),
67+
values=[
68+
PowerValue(
69+
value=value,
70+
commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC,
71+
)
72+
],
73+
measurement_timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
74+
)
75+
)
76+
print("Sent a power measurement message.")
77+
await asyncio.sleep(self._period.total_seconds())
78+
79+
async def start(self):
80+
if self._task is not None:
81+
raise RuntimeError("Already started")
82+
print("Start sending power measurements periodically")
83+
self._task = asyncio.create_task(self._send_power_measurement())
84+
85+
async def stop(self):
86+
if self._task is None:
87+
raise RuntimeError("Not started yet")
88+
self._task.cancel()
89+
try:
90+
await self._task
91+
except asyncio.CancelledError:
92+
pass
93+
self._task = None
94+
print("Stopped sending power measurements periodically")
95+
96+
97+
class MyFRBCControlType(FRBCControlType):
98+
_power_measurement_task: Optional[SendPowerMeasurementPeriodically]
99+
100+
def __init__(self):
101+
super().__init__()
102+
self._power_measurement_task = None
103+
104+
async def handle_instruction(
105+
self,
106+
connection: S2AsyncConnection,
107+
msg: S2ConnectionEventsAndMessages,
108+
send_okay: SendOkayRunAsync,
109+
) -> None:
110+
if not isinstance(msg, FRBCInstruction):
111+
raise RuntimeError(
112+
f"Expected an FRBCInstruction but received a message of type {type(msg)}."
113+
)
114+
print(f"I have received the message {msg} from {connection}")
115+
116+
async def activate(self, connection: S2AsyncConnection) -> None:
117+
print("The control type FRBC is now activated.")
118+
119+
print("Time to send a FRBC SystemDescription")
120+
actuator_id = uuid.uuid4()
121+
operation_mode_id = uuid.uuid4()
122+
await connection.send_msg_and_await_reception_status(
123+
FRBCSystemDescription(
124+
message_id=uuid.uuid4(),
125+
valid_from=datetime.datetime.now(tz=datetime.timezone.utc),
126+
actuators=[
127+
FRBCActuatorDescription(
128+
id=actuator_id,
129+
operation_modes=[
130+
FRBCOperationMode(
131+
id=operation_mode_id,
132+
elements=[
133+
FRBCOperationModeElement(
134+
fill_level_range=NumberRange(
135+
start_of_range=0.0, end_of_range=100.0
136+
),
137+
fill_rate=NumberRange(
138+
start_of_range=-5.0, end_of_range=5.0
139+
),
140+
power_ranges=[
141+
PowerRange(
142+
start_of_range=-200.0,
143+
end_of_range=200.0,
144+
commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC,
145+
)
146+
],
147+
)
148+
],
149+
diagnostic_label="Load & unload battery",
150+
abnormal_condition_only=False,
151+
)
152+
],
153+
transitions=[],
154+
timers=[],
155+
supported_commodities=[Commodity.ELECTRICITY],
156+
)
157+
],
158+
storage=FRBCStorageDescription(
159+
fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0),
160+
fill_level_label="%",
161+
diagnostic_label="Imaginary battery",
162+
provides_fill_level_target_profile=True,
163+
provides_leakage_behaviour=False,
164+
provides_usage_forecast=False,
165+
),
166+
)
167+
)
168+
print("Also send the target profile")
169+
170+
await connection.send_msg_and_await_reception_status(
171+
FRBCFillLevelTargetProfile(
172+
message_id=uuid.uuid4(),
173+
start_time=datetime.datetime.now(tz=datetime.timezone.utc),
174+
elements=[
175+
FRBCFillLevelTargetProfileElement(
176+
duration=Duration.from_milliseconds(30_000),
177+
fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0),
178+
),
179+
FRBCFillLevelTargetProfileElement(
180+
duration=Duration.from_milliseconds(300_000),
181+
fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0),
182+
),
183+
],
184+
)
185+
)
186+
187+
print("Also send the storage status.")
188+
await connection.send_msg_and_await_reception_status(
189+
FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0)
190+
)
191+
192+
print("Also send the actuator status.")
193+
await connection.send_msg_and_await_reception_status(
194+
FRBCActuatorStatus(
195+
message_id=uuid.uuid4(),
196+
actuator_id=actuator_id,
197+
active_operation_mode_id=operation_mode_id,
198+
operation_mode_factor=0.5,
199+
)
200+
)
201+
202+
self._power_measurement_task = SendPowerMeasurementPeriodically(
203+
connection, datetime.timedelta(seconds=3)
204+
)
205+
await self._power_measurement_task.start()
206+
207+
async def deactivate(self, connection: S2AsyncConnection) -> None:
208+
print("The control type FRBC is now deactivated.")
209+
if self._power_measurement_task is not None:
210+
await self._power_measurement_task.stop()
211+
self._power_measurement_task = None
212+
213+
214+
class MyNoControlControlType(NoControlControlType):
215+
async def activate(self, connection: S2AsyncConnection) -> None:
216+
print("The control type NoControl is now activated.")
217+
218+
async def deactivate(self, connection: S2AsyncConnection) -> None:
219+
print("The control type NoControl is now deactivated.")
220+
221+
222+
async def start_s2_session(url, rm_id: uuid.UUID):
223+
# Configure a resource manager
224+
rm_handler = ResourceManagerHandler(
225+
asset_details=AssetDetails(
226+
resource_id=rm_id,
227+
name="Some asset",
228+
instruction_processing_delay=Duration.from_milliseconds(20),
229+
roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)],
230+
currency=Currency.EUR,
231+
provides_forecast=False,
232+
provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC],
233+
),
234+
control_types=[MyFRBCControlType(), MyNoControlControlType()],
235+
)
236+
237+
# Setup the underlying websocket connection
238+
async with WebsocketClientMedium(url=url, verify_certificate=False) as ws_medium:
239+
# Configure the S2 connection on top of the websocket connection
240+
s2_conn = S2AsyncConnection(medium=ws_medium)
241+
rm_handler.register_handlers(s2_conn)
242+
243+
eventloop = asyncio.get_running_loop()
244+
245+
async def stop():
246+
print("Received signal. Will stop S2 connection.")
247+
await s2_conn.stop()
248+
249+
eventloop.add_signal_handler(signal.SIGINT, lambda: eventloop.create_task(stop()))
250+
eventloop.add_signal_handler(signal.SIGTERM, lambda: eventloop.create_task(stop()))
251+
await s2_conn.run()
252+
253+
254+
if __name__ == "__main__":
255+
parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.")
256+
RM_ID = uuid.uuid4()
257+
parser.add_argument(
258+
"--endpoint",
259+
type=str,
260+
required=False,
261+
help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}",
262+
default=f"ws://localhost:8003/ws/{RM_ID}",
263+
)
264+
args = parser.parse_args()
265+
266+
asyncio.run(start_s2_session(args.endpoint, RM_ID))

0 commit comments

Comments
 (0)