Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bec_lib/bec_lib/bl_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from bec_lib.device import DeviceBase, Signal
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.endpoints import MessageEndpoints
from bec_lib.messaging_hooks import MessagingEvent
from bec_lib.redis_connector import MessageObject, RedisConnector


Expand Down Expand Up @@ -204,6 +205,9 @@ def _handle_state_exception(self, exc: Exception) -> None:
compact_error_message=info,
)
self.connector.raise_alarm(severity=Alarms.WARNING, info=error_info)
self.connector.notify(
MessagingEvent.INVALID_STATE, f"Beamline state invalid: {self.config.name}"
)

out = messages.BeamlineStateMessage(name=self.config.name, status="unknown", label=info)
self._emit_state(out)
Expand Down
32 changes: 32 additions & 0 deletions bec_lib/bec_lib/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -1957,6 +1957,38 @@ def available_messaging_services():
message_op=MessageOp.STREAM,
)

@staticmethod
def notification(event_type: str):
"""
Endpoint for transient notification events that SciHub can route to
configured messaging services.

Args:
event_type (str): Notification event name such as ``new_scan``.

Returns:
EndpointInfo: Endpoint for notification events.
"""
endpoint = f"{EndpointType.INTERNAL.value}/messaging_services/notification/{event_type}"
return EndpointInfo(
endpoint=endpoint, message_type=messages.NotificationMessage, message_op=MessageOp.SEND
)

@staticmethod
def notification_config():
"""
Endpoint for persisted notification routing configuration.

Returns:
EndpointInfo: Endpoint for notification routing config.
"""
endpoint = f"{EndpointType.USER.value}/messaging_services/notification_config"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.NotificationConfigMessage,
message_op=MessageOp.SET_PUBLISH,
)

@staticmethod
def message_service_ingest(deployment_name: str):
"""
Expand Down
33 changes: 33 additions & 0 deletions bec_lib/bec_lib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,6 +1728,39 @@ class MessagingConfig(BaseModel):
scilog: MessagingServiceScopeConfig


class NotificationServiceTarget(BaseModel):
service_name: Literal["signal", "teams", "scilog"]
scope: str | list[str] | None = None


class NotificationMessage(BECMessage):
"""
Message for notification events that should be routed through configured
messaging services.

Args:
message (str): Notification body text.
"""

msg_type: ClassVar[str] = "notification_message"
message: str


class NotificationConfigMessage(BECMessage):
"""
Routing configuration for notification events.

Args:
routes: Mapping of event name to messaging service targets.
"""

msg_type: ClassVar[str] = "notification_config_message"
routes: dict[
Literal["new_scan", "scan_completed", "alarm", "invalid_state"],
list[NotificationServiceTarget],
] = Field(default_factory=dict)


AvailableMessagingServices = Annotated[
Union[SignalServiceInfo, SciLogServiceInfo, TeamsServiceInfo],
Field(discriminator="service_type"),
Expand Down
101 changes: 101 additions & 0 deletions bec_lib/bec_lib/messaging_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

import enum
from typing import TYPE_CHECKING, cast

from bec_lib import messages
from bec_lib.connector import MessageObject
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.messaging_services import (
SciLogMessagingService,
SignalMessagingService,
TeamsMessagingService,
)

if TYPE_CHECKING:
from bec_lib.redis_connector import RedisConnector

logger = bec_logger.logger


class MessagingEvent(str, enum.Enum):
"""
Enumeration of messaging events that can trigger configured hooks.
"""

SCAN = "new_scan"
SCAN_COMPLETED = "scan_completed"
ALARM = "alarm"
INVALID_STATE = "invalid_state"


class MessagingManager:
"""
Manage notification routing from internal events to concrete messaging
services.
"""

def __init__(self, connector: RedisConnector):
self.connector = connector
self.config: dict[MessagingEvent, list[messages.NotificationServiceTarget]] = {}
self.signal = SignalMessagingService(self.connector)
self.scilog = SciLogMessagingService(self.connector)
self.teams = TeamsMessagingService(self.connector)
self._service_by_name = {"signal": self.signal, "scilog": self.scilog, "teams": self.teams}

self.connector.register(
patterns=MessageEndpoints.notification("*"), cb=self._handle_notification
)
self.connector.register(
topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config
)

config_msg = self.connector.get(MessageEndpoints.notification_config())
if config_msg is not None:
self.on_notification_config(config_msg)

def _handle_notification(self, msg_obj: MessageObject[messages.NotificationMessage], **_kwargs):
prefix = MessageEndpoints.notification("").endpoint
event_type_str = msg_obj.topic.removeprefix(prefix)
try:
event_type = MessagingEvent(event_type_str)
except ValueError:
logger.warning(f"Unknown notification event received on topic {msg_obj.topic}")
return
self.on_notification(event_type, cast(messages.NotificationMessage, msg_obj.value))

def _handle_notification_config(
self, msg_obj: MessageObject[messages.NotificationConfigMessage], **_kwargs
):
self.on_notification_config(cast(messages.NotificationConfigMessage, msg_obj.value))

def on_notification(
self, event_type: MessagingEvent, message: messages.NotificationMessage
) -> None:
routes = self.config.get(event_type, [])
for route in routes:
service = self._service_by_name.get(route.service_name)
if service is None:
logger.warning(f"Unknown messaging service: {route.service_name}")
continue
try:
service.new().add_text(message.message).send(scope=route.scope)
except RuntimeError as exc:
logger.warning(
f"Failed to send notification for {event_type.value} via {route.service_name}: {exc}"
)

def on_notification_config(self, message: messages.NotificationConfigMessage) -> None:
config: dict[MessagingEvent, list[messages.NotificationServiceTarget]] = {}
for event_name, targets in message.routes.items():
config[MessagingEvent(event_name)] = targets
self.config = config

def shutdown(self) -> None:
self.connector.unregister(
patterns=MessageEndpoints.notification("*"), cb=self._handle_notification
)
self.connector.unregister(
topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config
)
18 changes: 18 additions & 0 deletions bec_lib/bec_lib/redis_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
DynamicMetricMessage,
ErrorInfo,
)
from bec_lib.messaging_hooks import MessagingEvent
from bec_lib.serialization import MsgpackSerialization

logger = bec_logger.logger
Expand Down Expand Up @@ -655,6 +656,23 @@ def raise_alarm(self, severity: Alarms, info: ErrorInfo, metadata: dict | None =
"""
alarm_msg = AlarmMessage(severity=severity, info=info, metadata=metadata or {})
self.set_and_publish(MessageEndpoints.alarm(), alarm_msg)
compact_message = info.compact_error_message or info.error_message or info.exception_type
self.notify(MessagingEvent.ALARM, compact_message)

def notify(self, event: MessagingEvent, message: str, pipe: Pipeline | None = None) -> None:
"""
Publish a notification event for downstream routing by SciHub.

Args:
event: Notification event type.
message: Notification body text.
pipe: Optional pipeline to enqueue the publish operation into.
"""
self.send(
MessageEndpoints.notification(event.value),
messages.NotificationMessage(message=message),
pipe=pipe,
)

def pipeline(self) -> redis.client.Pipeline:
"""Create a new pipeline"""
Expand Down
7 changes: 7 additions & 0 deletions bec_lib/bec_lib/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,13 @@ def send(self, topic, msg, pipe=None):
raise TypeError("Message must be a BECMessage")
return self.raw_send(topic, msg, pipe)

def notify(self, event, message: str, pipe=None):
return self.send(
MessageEndpoints.notification(event.value),
messages.NotificationMessage(message=message),
pipe=pipe,
)

def set_and_publish(self, topic, msg, pipe=None, expire: int = None):
if pipe:
pipe._pipe_buffer.append(("set_and_publish", (topic.endpoint, msg), {"expire": expire}))
Expand Down
22 changes: 22 additions & 0 deletions bec_lib/tests/test_beamline_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,28 @@ def test_update_device_state_publishes_when_state_changes(
assert out is not None
assert out[0]["data"].status == "valid"

def test_state_exception_publishes_invalid_state_notification(
self, connected_connector, dm_with_devices
):
state = bl_states.ShutterState(
name="shutter_open",
device="samx",
signal="samx",
redis_connector=connected_connector,
device_manager=dm_with_devices,
)

with (
mock.patch.object(connected_connector, "raise_alarm") as raise_alarm,
mock.patch.object(connected_connector, "notify") as notify,
):
state._handle_state_exception(RuntimeError("device state unavailable"))

raise_alarm.assert_called_once()
notify.assert_called_once_with(
bl_states.MessagingEvent.INVALID_STATE, "Beamline state invalid: shutter_open"
)


class TestConcreteStates:
def test_shutter_state_open_and_closed(self, connected_connector, dm_with_devices):
Expand Down
25 changes: 25 additions & 0 deletions bec_lib/tests/test_bec_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ def test_ClientInfoMessage_raises():
)


def test_NotificationMessage():
msg = messages.NotificationMessage(message="Scan started")
res = MsgpackSerialization.dumps(msg)
res_loaded = MsgpackSerialization.loads(res)
assert res_loaded == msg


def test_NotificationConfigMessage():
msg = messages.NotificationConfigMessage(
routes={
"new_scan": [
messages.NotificationServiceTarget(service_name="scilog", scope="logbook")
],
"alarm": [
messages.NotificationServiceTarget(
service_name="signal", scope=["+41791234567", "+41797654321"]
)
],
}
)
res = MsgpackSerialization.dumps(msg)
res_loaded = MsgpackSerialization.loads(res)
assert res_loaded == msg


def test_DeviceRPCMessage():
msg = messages.DeviceRPCMessage(
device="samx", return_val=1, out="done", success=True, metadata={"RID": "1234"}
Expand Down
86 changes: 86 additions & 0 deletions bec_lib/tests/test_messaging_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints, MessageOp
from bec_lib.messaging_hooks import MessagingEvent, MessagingManager


def _available_services_message():
return messages.AvailableMessagingServicesMessage(
config=messages.MessagingConfig(
signal=messages.MessagingServiceScopeConfig(enabled=True, default=None),
teams=messages.MessagingServiceScopeConfig(enabled=False, default=None),
scilog=messages.MessagingServiceScopeConfig(enabled=True, default=None),
),
deployment_services=[
messages.SciLogServiceInfo(
id="scilog-default", scope="logbook", enabled=True, logbook_id="lb-1"
)
],
session_services=[],
)


def test_notification_endpoints():
event_endpoint = MessageEndpoints.notification("new_scan")
config_endpoint = MessageEndpoints.notification_config()

assert event_endpoint.endpoint == "internal/messaging_services/notification/new_scan"
assert event_endpoint.message_type is messages.NotificationMessage
assert event_endpoint.message_op == MessageOp.SEND

assert config_endpoint.endpoint == "user/messaging_services/notification_config"
assert config_endpoint.message_type is messages.NotificationConfigMessage
assert config_endpoint.message_op == MessageOp.SET_PUBLISH


def test_messaging_manager_loads_initial_config(connected_connector):
config_msg = messages.NotificationConfigMessage(
routes={
"new_scan": [messages.NotificationServiceTarget(service_name="scilog", scope="logbook")]
}
)
connected_connector.set_and_publish(MessageEndpoints.notification_config(), config_msg)

manager = MessagingManager(connected_connector)
try:
assert manager.config == {
MessagingEvent.SCAN: [
messages.NotificationServiceTarget(service_name="scilog", scope="logbook")
]
}
finally:
manager.shutdown()


def test_messaging_manager_routes_notifications_to_message_service_queue(connected_connector):
manager = MessagingManager(connected_connector)
try:
available_services = _available_services_message()
manager.scilog._on_new_scope_change_msg({"data": available_services})
manager.signal._on_new_scope_change_msg({"data": available_services})
manager.teams._on_new_scope_change_msg({"data": available_services})

manager.on_notification_config(
messages.NotificationConfigMessage(
routes={
"new_scan": [
messages.NotificationServiceTarget(service_name="scilog", scope="logbook")
]
}
)
)

manager.on_notification(
MessagingEvent.SCAN, messages.NotificationMessage(message="Scan started")
)

out = connected_connector.xread(MessageEndpoints.message_service_queue(), from_start=True)
assert len(out) == 1
sent_message = out[0]["data"]
assert sent_message.service_name == "scilog"
assert sent_message.scope == "logbook"
assert isinstance(sent_message.message[0], messages.MessagingServiceTextContent)
assert sent_message.message[0].content == "Scan started"
assert isinstance(sent_message.message[1], messages.MessagingServiceTagsContent)
assert sent_message.message[1].tags == ["bec"]
finally:
manager.shutdown()
Loading
Loading