Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions bec_lib/bec_lib/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,86 @@ class MessageEndpoints:
Class for message endpoints.
"""

@staticmethod
def shared_memory_info():
"""
Endpoint for shared memory information. This endpoint is used to publish the shared memory information using
a messages.SharedMemAllocationInfo message.

Returns:
EndpointInfo: Endpoint for shared memory information.
"""
endpoint = f"{EndpointType.INFO.value}/shared_memory/info/"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.SharedMemAllocationInfo,
message_op=MessageOp.SET_PUBLISH,
)

@staticmethod
def shared_memory_allocate():
"""
Endpoint for shared memory allocation. This endpoint is used to request the allocation of a shared memory object using
a messages.SharedMemAllocationRequest message.

Returns:
EndpointInfo: Endpoint for shared memory allocation.
"""
endpoint = f"{EndpointType.INFO.value}/shared_memory/allocate"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.SharedMemAllocationRequest,
message_op=MessageOp.STREAM,
)

@staticmethod
def shared_memory_deallocate():
"""
Endpoint for shared memory deallocation. This endpoint is used to request the deallocation of a shared memory object using
a messages.SharedMemDeallocationRequest message.

Returns:
EndpointInfo: Endpoint for shared memory deallocation.
"""
endpoint = f"{EndpointType.INFO.value}/shared_memory/deallocate"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.SharedMemDeallocationRequest,
message_op=MessageOp.STREAM,
)

@staticmethod
def shared_memory_slot_written():
"""
Endpoint for writer-complete shared memory events. This endpoint is used when a writer has finished writing
one shared memory slot.

Returns:
EndpointInfo: Endpoint for shared memory slot written events.
"""
endpoint = f"{EndpointType.INFO.value}/shared_memory/slot_written"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.SharedMemSlotWritten,
message_op=MessageOp.STREAM,
)

@staticmethod
def shared_memory_slot_processed():
"""
Endpoint for reader-result shared memory events. This endpoint is used when a reader has finished processing
one shared memory slot.

Returns:
EndpointInfo: Endpoint for shared memory slot processed events.
"""
endpoint = f"{EndpointType.INFO.value}/shared_memory/slot_processed"
return EndpointInfo(
endpoint=endpoint,
message_type=messages.SharedMemSlotProcessed,
message_op=MessageOp.STREAM,
)

# devices feedback
@staticmethod
def device_status(device: str):
Expand Down
57 changes: 57 additions & 0 deletions bec_lib/bec_lib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

from bec_lib.metadata_schema import get_metadata_schema_for_scan

# TODO remove bec_server depencency..
from bec_server.shared_memory.models import PayloadDescriptor, SharedMemInfo

Comment on lines +20 to +22

class ProcedureWorkerStatus(Enum):
RUNNING = auto()
Expand Down Expand Up @@ -94,6 +97,60 @@ def __hash__(self) -> int:
return self.model_dump_json().__hash__()


class SharedMemAllocationInfo(BECMessage):
"""
This message is published by the shared memory manager and contains a list of all currently allocated shared memory objects.
Once shared memory objects are created or destroyed, this message will publish the updated list of shared memory objects.
"""

msg_type: ClassVar[str] = "shared_mem_allocation_info"

# Consider structure with dict[str, SharedMemInfo]. signal dotted name as key, which allows to identify this directly
# Alternatively, dict[str, dict[str, SharedMemInfo]] with device name as key, and then signal name as nested key
info: dict[str, dict[str, SharedMemInfo]]


class SharedMemAllocationRequest(BECMessage):
"""Message to request information about a shared memory object."""

msg_type: ClassVar[str] = "shared_mem_allocation_request"

client_id: str
slots: int
payload_desc: PayloadDescriptor
signal: str | None = None


class SharedMemDeallocationRequest(BECMessage):
"""Message to request deallocation of a shared memory object."""

msg_type: ClassVar[str] = "shared_mem_deallocation_request"

client_id: str
signal: str | None = None


class SharedMemSlotWritten(BECMessage):
"""Event emitted after a writer finished writing one shared-memory slot."""

msg_type: ClassVar[str] = "shared_mem_slot_written"

client_id: str
signal: str
slot_index: int


class SharedMemSlotProcessed(BECMessage):
"""Event emitted after a reader finished processing one shared-memory slot."""

msg_type: ClassVar[str] = "shared_mem_slot_processed"

client_id: str
signal: str
slot_index: int
result: dict[str, Any]


class BundleMessage(BECMessage):
"""Message type to send a bundle of BECMessages.

Expand Down
71 changes: 71 additions & 0 deletions bec_lib/tests/test_bec_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@
import pytest

from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints, MessageOp
from bec_lib.serialization import MsgpackSerialization
from bec_server.shared_memory.models import (
DTypeDescriptor,
PayloadDescriptor,
RingBufferDescriptor,
SharedMemInfo,
)
Comment on lines +8 to +15


@pytest.mark.parametrize("version", [1.0, 1.1, 1.2, None])
Expand Down Expand Up @@ -704,3 +711,67 @@ def test_feedback_message():
assert res_loaded == msg
assert res_loaded.username == getpass.getuser()
assert res_loaded.versions == messages.ServiceVersions._get_version_numbers()


def test_shared_memory_allocation_messages_round_trip():
payload = PayloadDescriptor(
nbytes=16, shape=(4,), dtype=DTypeDescriptor(kind="float", itemsize=4, byte_order="little")
)
descriptor = RingBufferDescriptor(
name="bec_psm_abcdef",
reader_count_name="bec_psm_abcdef_cnt",
data_lock_ids=("bec_psm_abcdef_d_0",),
reader_gate_ids=("bec_psm_abcdef_g_0",),
reader_count_lock_ids=("bec_psm_abcdef_c_0",),
slots=1,
payload=payload,
)
info = SharedMemInfo(client_id="client", buffer_desc=descriptor, signal="detector.data")

request = messages.SharedMemAllocationRequest(
client_id="client", slots=1, payload_desc=payload, signal="detector.data"
)
allocation_info = messages.SharedMemAllocationInfo(info={"client": {"detector.data": info}})
deallocation = messages.SharedMemDeallocationRequest(client_id="client", signal="detector.data")

for msg in (request, allocation_info, deallocation):
assert MsgpackSerialization.loads(MsgpackSerialization.dumps(msg)) == msg


def test_shared_memory_endpoints_match_message_contracts():
assert MessageEndpoints.shared_memory_info().message_type is messages.SharedMemAllocationInfo
assert MessageEndpoints.shared_memory_info().message_op is MessageOp.SET_PUBLISH
assert (
MessageEndpoints.shared_memory_allocate().message_type
is messages.SharedMemAllocationRequest
)
assert MessageEndpoints.shared_memory_allocate().message_op is MessageOp.STREAM
assert (
MessageEndpoints.shared_memory_deallocate().message_type
is messages.SharedMemDeallocationRequest
)
assert MessageEndpoints.shared_memory_deallocate().message_op is MessageOp.STREAM


def test_shared_memory_slot_event_messages_round_trip():
written = messages.SharedMemSlotWritten(
client_id="client", signal="detector.data", slot_index=1
)
processed = messages.SharedMemSlotProcessed(
client_id="client", signal="detector.data", slot_index=1, result={"sum": 10.0}
)

for msg in (written, processed):
assert MsgpackSerialization.loads(MsgpackSerialization.dumps(msg)) == msg


def test_shared_memory_slot_event_endpoints_match_message_contracts():
assert (
MessageEndpoints.shared_memory_slot_written().message_type is messages.SharedMemSlotWritten
)
assert MessageEndpoints.shared_memory_slot_written().message_op is MessageOp.STREAM
assert (
MessageEndpoints.shared_memory_slot_processed().message_type
is messages.SharedMemSlotProcessed
)
assert MessageEndpoints.shared_memory_slot_processed().message_op is MessageOp.STREAM
61 changes: 61 additions & 0 deletions bec_server/bec_server/shared_memory/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Shared Memory Ring Buffer

The shared-memory ring buffer keeps payload storage and control-plane policy separate.

## Memory Layout

The payload shared-memory object contains only slot bytes:

```text
[ slot 0 payload ][ slot 1 payload ] ... [ slot N payload ]
```

The payload shape, dtype, slot count, and synchronization resource names are distributed through
`RingBufferDescriptor`. This keeps attachment explicit and avoids a mutable metadata header in the
payload memory.

Reader counts live in a second, small shared-memory object:

```text
[ reader_count[0] ][ reader_count[1] ] ... [ reader_count[N] ]
```

The counter memory stores only synchronization state. It does not store write position, slot
availability, processing state, or scheduling policy.

## Locking

Each slot has one logical readers/writer lock built from three named POSIX semaphores:

- `data_lock`: held by a writer exclusively, or collectively by active readers.
- `reader_gate`: lets a waiting writer block new readers from entering the slot.
- `reader_count_lock`: protects updates to `reader_count[index]`.

Readers briefly pass through the gate, increment the shared counter, copy the payload, and decrement
the counter. The first reader acquires the data lock, and the last reader releases it.

Writers acquire the gate first, then the data lock. This allows existing readers to finish, prevents
new readers from entering while the writer waits, and guarantees that no reader observes a partial
write.

## Ownership

`RingBuffer` owns the operating-system resources. It creates and unlinks the payload memory, reader
counter memory, and all named semaphores.

`RingBufferView` only attaches to existing resources. It closes local handles during shutdown and
never unlinks resources.

## Write Position

The ring buffer assumes one writer service per buffer. The writer handle keeps a local circular
cursor and returns the written slot index from `write_data(...)`. Shared memory does not contain a
global write cursor.

Slot reuse, FIFO/LIFO ordering, release decisions, and processing results belong to the broker/event
control layer rather than the shared-memory implementation.

## Timeout Behavior

On macOS, positive semaphore timeouts are not reliable for this code path. Use `timeout=0` for a
non-blocking acquire or `timeout=None` to wait indefinitely.
Empty file.
Empty file.
36 changes: 36 additions & 0 deletions bec_server/bec_server/shared_memory/cli/launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Description: Launch the shared memory manager server.
# This script is the entry point for the Shared Memory Manager Server. It is called either
# by the bec-shared-mem-manager entry point or directly from the command line.
import threading

from bec_lib.bec_service import parse_cmdline_args
from bec_lib.logger import bec_logger
from bec_lib.redis_connector import RedisConnector
from bec_server.shared_memory.manager import SharedMemoryManager

logger = bec_logger.logger
bec_logger.level = bec_logger.LOGLEVEL.INFO


def main():
"""
Launch the shared memory manager server.
"""
_, _, config = parse_cmdline_args()

bec_server = SharedMemoryManager(config=config, connector_cls=RedisConnector)
bec_server.start()

try:
event = threading.Event()
logger.success(
f"Started Shared Memory Manager server (id: {bec_server._service_id}). Press Ctrl+C to stop."
)
event.wait()
except KeyboardInterrupt:
bec_server.shutdown()
event.set()


if __name__ == "__main__":
main()
Loading
Loading