Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
198acb7
139: Moving big chunks of code places.
lfse-slafleur Oct 28, 2025
7af8771
139: Finish up first draft before executing it.
lfse-slafleur Oct 31, 2025
69ebb06
139: Get async mostly to work, started on getting sync to work.
lfse-slafleur Oct 31, 2025
8578468
Merge branch 'main' into 139-s2connection-should-have-an-async-capabl…
lfse-slafleur Nov 27, 2025
e1c98b8
139: Add a bunch of stuff. Threading/task management is now clear for…
lfse-slafleur Mar 9, 2026
bfaaab0
139: Some fixes.
lfse-slafleur Mar 9, 2026
18b8919
139: Fix all linting issues.
lfse-slafleur Mar 9, 2026
ad677fc
139: Fix all linting and typing issues.
lfse-slafleur Mar 10, 2026
6b4fa58
Merge branch 'main' into 139-s2connection-should-have-an-async-capabl…
lfse-slafleur Mar 10, 2026
fb15159
139: Propagate asset details from ResourceManagerHandler to all under…
lfse-slafleur Mar 10, 2026
d1d2231
139: Fix missing return value in func sig.
lfse-slafleur Mar 10, 2026
2e43cb9
Merge branch 'main' into 139-s2connection-should-have-an-async-capabl…
lfse-slafleur Mar 10, 2026
fb96895
139: Add examples on how to perform another task after the connection…
lfse-slafleur Mar 18, 2026
9aff094
139: Add functionality to let ws_medium disconnect.
lfse-slafleur Mar 18, 2026
4831386
139: websockets import should be behind the try import.
lfse-slafleur Mar 18, 2026
a2ce80d
139: Fix typing and linting issues and add docs regarding the verify_…
lfse-slafleur Mar 18, 2026
6537af3
139: Document the send_and_await_reception_status functions for both …
lfse-slafleur Mar 18, 2026
5c0f1dd
139: Add DDBC control type handlers in class based approach.
lfse-slafleur Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ ignore-paths=src/s2python/generated/
# avoid hangs.
jobs=1

disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long
disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long,duplicate-code
202 changes: 202 additions & 0 deletions examples/async_frbc_rm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import argparse
import asyncio
import logging
import sys
import uuid
import signal
import datetime

from s2python.connection.types import S2ConnectionEventsAndMessages, SendOkayRunAsync
from s2python.common import (
Duration,
Role,
RoleType,
Commodity,
Currency,
NumberRange,
PowerRange,
CommodityQuantity,
)
from s2python.frbc import (
FRBCInstruction,
FRBCSystemDescription,
FRBCActuatorDescription,
FRBCStorageDescription,
FRBCOperationMode,
FRBCOperationModeElement,
FRBCFillLevelTargetProfile,
FRBCFillLevelTargetProfileElement,
FRBCStorageStatus,
FRBCActuatorStatus,
)
from s2python.connection import AssetDetails
from s2python.connection.async_ import S2AsyncConnection, WebsocketClientMedium
from s2python.connection.async_.control_type.class_based import (
FRBCControlType,
NoControlControlType,
ResourceManagerHandler,
)

logger = logging.getLogger("s2python")
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)


class MyFRBCControlType(FRBCControlType):
async def handle_instruction(
self,
connection: S2AsyncConnection,
msg: S2ConnectionEventsAndMessages,
send_okay: SendOkayRunAsync,
) -> None:
if not isinstance(msg, FRBCInstruction):
raise RuntimeError(
f"Expected an FRBCInstruction but received a message of type {type(msg)}."
)
print(f"I have received the message {msg} from {connection}")

async def activate(self, connection: S2AsyncConnection) -> None:
print("The control type FRBC is now activated.")

print("Time to send a FRBC SystemDescription")
actuator_id = uuid.uuid4()
operation_mode_id = uuid.uuid4()
await connection.send_msg_and_await_reception_status(
FRBCSystemDescription(
message_id=uuid.uuid4(),
valid_from=datetime.datetime.now(tz=datetime.timezone.utc),
actuators=[
FRBCActuatorDescription(
id=actuator_id,
operation_modes=[
FRBCOperationMode(
id=operation_mode_id,
elements=[
FRBCOperationModeElement(
fill_level_range=NumberRange(
start_of_range=0.0, end_of_range=100.0
),
fill_rate=NumberRange(
start_of_range=-5.0, end_of_range=5.0
),
power_ranges=[
PowerRange(
start_of_range=-200.0,
end_of_range=200.0,
commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1,
)
],
)
],
diagnostic_label="Load & unload battery",
abnormal_condition_only=False,
)
],
transitions=[],
timers=[],
supported_commodities=[Commodity.ELECTRICITY],
)
],
storage=FRBCStorageDescription(
fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0),
fill_level_label="%",
diagnostic_label="Imaginary battery",
provides_fill_level_target_profile=True,
provides_leakage_behaviour=False,
provides_usage_forecast=False,
),
)
)
print("Also send the target profile")

await connection.send_msg_and_await_reception_status(
FRBCFillLevelTargetProfile(
message_id=uuid.uuid4(),
start_time=datetime.datetime.now(tz=datetime.timezone.utc),
elements=[
FRBCFillLevelTargetProfileElement(
duration=Duration.from_milliseconds(30_000),
fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0),
),
FRBCFillLevelTargetProfileElement(
duration=Duration.from_milliseconds(300_000),
fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0),
),
],
)
)

print("Also send the storage status.")
await connection.send_msg_and_await_reception_status(
FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0)
)

print("Also send the actuator status.")
await connection.send_msg_and_await_reception_status(
FRBCActuatorStatus(
message_id=uuid.uuid4(),
actuator_id=actuator_id,
active_operation_mode_id=operation_mode_id,
operation_mode_factor=0.5,
)
)

async def deactivate(self, connection: S2AsyncConnection) -> None:
print("The control type FRBC is now deactivated.")


class MyNoControlControlType(NoControlControlType):
async def activate(self, connection: S2AsyncConnection) -> None:
print("The control type NoControl is now activated.")

async def deactivate(self, connection: S2AsyncConnection) -> None:
print("The control type NoControl is now deactivated.")


async def start_s2_session(url, rm_id: uuid.UUID):
# Configure a resource manager
rm_handler = ResourceManagerHandler(
asset_details=AssetDetails(
resource_id=rm_id,
name="Some asset",
instruction_processing_delay=Duration.from_milliseconds(20),
roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)],
currency=Currency.EUR,
provides_forecast=False,
provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1],
),
control_types=[MyFRBCControlType(), MyNoControlControlType()],
)

# Setup the underlying websocket connection
ws_medium = WebsocketClientMedium(url=url, verify_certificate=False)
await ws_medium.connect()

# Configure the S2 connection on top of the websocket connection
s2_conn = S2AsyncConnection(medium=ws_medium)
rm_handler.register_handlers(s2_conn)

eventloop = asyncio.get_running_loop()

async def stop():
print("Received signal. Will stop S2 connection.")
await s2_conn.stop()

eventloop.add_signal_handler(signal.SIGINT, lambda: eventloop.create_task(stop()))
eventloop.add_signal_handler(signal.SIGTERM, lambda: eventloop.create_task(stop()))
await s2_conn.run()
Comment thread
lfse-slafleur marked this conversation as resolved.
Outdated


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.")
RM_ID = uuid.uuid4()
parser.add_argument(
"--endpoint",
type=str,
required=False,
help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}",
default=f"ws://localhost:8003/ws/{RM_ID}",
)
args = parser.parse_args()

asyncio.run(start_s2_session(args.endpoint, RM_ID))
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import argparse
from functools import partial
import logging
import sys
import uuid
import signal
import datetime
from typing import Callable
from typing import Callable, Optional

from s2python.common import (
EnergyManagementRole,
Duration,
Role,
RoleType,
Expand All @@ -18,6 +16,7 @@
PowerRange,
CommodityQuantity,
)
from s2python.connection.types import S2ConnectionEventsAndMessages
from s2python.frbc import (
FRBCInstruction,
FRBCSystemDescription,
Expand All @@ -30,9 +29,9 @@
FRBCStorageStatus,
FRBCActuatorStatus,
)
from s2python.s2_connection import S2Connection, AssetDetails
from s2python.s2_control_type import FRBCControlType, NoControlControlType
from s2python.message import S2Message
from s2python.connection import AssetDetails, BlockingWebsocketClientRM
from s2python.connection.sync import S2SyncConnection
from s2python.connection.sync.control_type.class_based import FRBCControlType, NoControlControlType

logger = logging.getLogger("s2python")
logger.addHandler(logging.StreamHandler(sys.stdout))
Expand All @@ -41,21 +40,23 @@

class MyFRBCControlType(FRBCControlType):
def handle_instruction(
self, conn: S2Connection, msg: S2Message, send_okay: Callable[[], None]
self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]]
) -> None:
assert send_okay
if not isinstance(msg, FRBCInstruction):
raise RuntimeError(
f"Expected an FRBCInstruction but received a message of type {type(msg)}."
)
print(f"I have received the message {msg} from {conn}")
print(f"I have received the message {msg} from {connection}")
send_okay()

def activate(self, conn: S2Connection) -> None:
def activate(self, connection: S2SyncConnection) -> None:
print("The control type FRBC is now activated.")

print("Time to send a FRBC SystemDescription")
actuator_id = uuid.uuid4()
operation_mode_id = uuid.uuid4()
conn.send_msg_and_await_reception_status_sync(
connection.send_msg_and_await_reception_status(
FRBCSystemDescription(
message_id=uuid.uuid4(),
valid_from=datetime.datetime.now(tz=datetime.timezone.utc),
Expand Down Expand Up @@ -103,7 +104,7 @@ def activate(self, conn: S2Connection) -> None:
)
print("Also send the target profile")

conn.send_msg_and_await_reception_status_sync(
connection.send_msg_and_await_reception_status(
FRBCFillLevelTargetProfile(
message_id=uuid.uuid4(),
start_time=datetime.datetime.now(tz=datetime.timezone.utc),
Expand All @@ -121,12 +122,12 @@ def activate(self, conn: S2Connection) -> None:
)

print("Also send the storage status.")
conn.send_msg_and_await_reception_status_sync(
connection.send_msg_and_await_reception_status(
FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0)
)

print("Also send the actuator status.")
conn.send_msg_and_await_reception_status_sync(
connection.send_msg_and_await_reception_status(
FRBCActuatorStatus(
message_id=uuid.uuid4(),
actuator_id=actuator_id,
Expand All @@ -135,54 +136,56 @@ def activate(self, conn: S2Connection) -> None:
)
)

def deactivate(self, conn: S2Connection) -> None:
def deactivate(self, connection: S2SyncConnection) -> None:
print("The control type FRBC is now deactivated.")


class MyNoControlControlType(NoControlControlType):
def activate(self, conn: S2Connection) -> None:
def activate(self, connection: S2SyncConnection) -> None:
print("The control type NoControl is now activated.")

def deactivate(self, conn: S2Connection) -> None:
def deactivate(self, connection: S2SyncConnection) -> None:
print("The control type NoControl is now deactivated.")


def stop(s2_connection, signal_num, _current_stack_frame):
print(f"Received signal {signal_num}. Will stop S2 connection.")
s2_connection.stop()


def start_s2_session(url, client_node_id=str(uuid.uuid4())):
s2_conn = S2Connection(
url=url,
role=EnergyManagementRole.RM,
control_types=[MyFRBCControlType(), MyNoControlControlType()],
asset_details=AssetDetails(
resource_id=client_node_id,
def start_s2_session(url, rm_id: uuid.UUID):
# Configure a resource manager
asset_details = AssetDetails(
resource_id=rm_id,
name="Some asset",
instruction_processing_delay=Duration.from_milliseconds(20),
roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)],
currency=Currency.EUR,
provides_forecast=False,
provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1],
),
reconnect=True,
verify_certificate=False,
)
signal.signal(signal.SIGINT, partial(stop, s2_conn))
signal.signal(signal.SIGTERM, partial(stop, s2_conn))
)

# Configure the S2 connection on top of the websocket connection
s2_conn = BlockingWebsocketClientRM(url=url, asset_details=asset_details, control_types=[MyFRBCControlType(), MyNoControlControlType()])

def stop(signal_num, _current_stack_frame):
print(f"Received signal {signal_num}. Will stop S2 connection.")
s2_conn.stop()

signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

s2_conn.start_as_rm()
print('Starting s2 connection')
s2_conn.start()
print('S2 connection stopped')
s2_conn.wait_till_done()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.")
RM_ID = uuid.uuid4()
parser.add_argument(
"endpoint",
"--endpoint",
type=str,
help="WebSocket endpoint uri for the server (CEM) e.g. "
"ws://localhost:8080/backend/rm/s2python-frbc/cem/dummy_model/ws",
required=False,
help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}",
default=f"ws://localhost:8003/ws/{RM_ID}",
)
args = parser.parse_args()

start_s2_session(args.endpoint)
start_s2_session(args.endpoint, RM_ID)
Loading