Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added decorators/ccc/__init__.py
Empty file.
204 changes: 204 additions & 0 deletions decorators/ccc/ccc_frame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""
E2SM-CCC REPORT decorator for the xDevSM-dApp framework.

Implements the periodic REPORT Style 2 (cell-level) flow:
- subscribe(): builds Event Trigger Definition Format 3 + Action
Definition Format 2 as JSON and ships them via the OSC subscription
manager.
- decode_message(): parses the JSON indication header/message bytes
received from the FlexRIC ccc_sm plugin and dispatches them to the
user-registered callback.
"""
Comment on lines +1 to +11
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to delete some comments here. I'll do it

from typing import List, Optional, Tuple

from ricxappframe.xapp_frame import rmr

from decorators.report import xAppReportService

from utils.constants import Values

from sm_framework.py_oran.ccc.constants import (
SM_CCC_ID,
STRUCT_O_NRCELLDU,
REPORT_TYPE_ALL,
REPORT_STYLE_CELL_LEVEL,
)
from sm_framework.py_oran.ccc.ccc_encoder import (
encode_event_trigger_periodic,
encode_action_definition_cell_level,
)
from sm_framework.py_oran.ccc.ccc_decoder import (
CccIndicationHeader,
CccIndicationMessage,
)

Comment on lines +20 to +34
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can leave the same notation as above


class XappCccFrame(xAppReportService):
"""
CCC REPORT decorator. Mirrors XappKpmFrame's lifecycle but speaks
JSON instead of the ASN.1-backed flexric helpers.
"""

def __init__(
self,
xapp_handler,
logger,
server,
xapp_name,
rmr_port,
http_port,
pltnamespace,
app_namespace,
):
super().__init__(
xapp_handler, logger, server, xapp_name, rmr_port, http_port, pltnamespace, app_namespace
)
# FlexRIC SM identifier — used to filter the incoming indication
# messages that come back over RMR. The RIC will assign a runtime
# RAN Function ID matching this plugin.
self.function_id = SM_CCC_ID

# ------------------------------------------------------------------
# Inbound: indication dispatch
# ------------------------------------------------------------------

def handle(self, xapp, summary, sbuf):
xapp.logger.debug("[XappCccFrame] received: {}".format(summary))
if summary[rmr.RMR_MS_MSG_TYPE] == Values.RIC_INDICATION:
self._handle_indication(xapp, summary)
elif summary[rmr.RMR_MS_MSG_TYPE] == Values.RIC_ERROR_INDICATION:
xapp.logger.error("[XappCccFrame] Error indication received")
else:
xapp.logger.debug(
"[XappCccFrame] unhandled message type: {}".format(summary[rmr.RMR_MS_MSG_TYPE])
)
self._xapp_handler.handle(xapp, summary, sbuf)

def decode_message(self, function_id, ba_ind_header, ba_ind_msg, meid):
self.logger.info("[XappCccFrame] E2AP function id: {}".format(function_id))
if function_id != self.function_id:
self.logger.info(
"[XappCccFrame] indication for different function id: {}".format(function_id)
)
return

# The base xAppReportService calls get_c_byte_array_from_py_byte_string
# which returns a ctypes uint8 array (or None). Convert it to a Python
# bytes object so the pure-Python CCC JSON decoders can consume it.
hdr_bytes = bytes(ba_ind_header) if ba_ind_header is not None else b""
msg_bytes = bytes(ba_ind_msg) if ba_ind_msg is not None else b""

hdr = CccIndicationHeader(hdr_bytes)
msg = CccIndicationMessage(msg_bytes)

self.logger.info(
"[XappCccFrame] decoded header: reason={} eventTime={}".format(
hdr.indication_reason, hdr.event_time
)
)

cb = self.get_indication_msg_callback()
if cb is None:
self._log_default(hdr, msg)
else:
cb(hdr, msg, meid)

def _log_default(self, hdr: CccIndicationHeader, msg: CccIndicationMessage) -> None:
for cell, struct in msg.iter_structures():
cgi = cell.get("cellGlobalId", {})
name = struct.get("ranConfigurationStructureName")
vs = (struct.get("valuesOfAttributes") or {}).get("ranConfigurationStructure") or {}
arfcn_dl = vs.get("arfcnDL")
bw_dl = vs.get("bSChannelBwDL")
bwps = vs.get("bWPList") or []
self.logger.info(
"[XappCccFrame] cell={} struct={} arfcnDL={} bSChannelBwDL={} bWPs={}".format(
cgi, name, arfcn_dl, bw_dl, len(bwps)
)
)
for i, b in enumerate(bwps):
self.logger.info(
"[XappCccFrame] BWP[{}] scs={} numberOfRBs={} startRB={} ctx={}".format(
i,
b.get("subCarrierSpacing"),
b.get("numberOfRBs"),
b.get("startRB"),
b.get("bwpContext"),
)
)

# ------------------------------------------------------------------
# Outbound: subscription
# ------------------------------------------------------------------

def subscribe(
self,
gnb,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Noemi2001 can you tell me what passes the xapp here? Is it the json object or the inventory name? In case I change it to a JSON object (just to use the same notation as kpm).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The xApp invokes the subscribe method as follows:

gnb, gnb_info = xapp_gen.get_selected_e2node_info(args.gnb_target)
....
ccc_xapp.subscribe(
    gnb=gnb,
    period_ms=args.event_trigger,
    attributes=all_o_nr_cell_du_attrs,
)

So, the arguments passed by the xapp are:

  • gnb: the gNB object retrieved via xapp_gen.get_selected_e2node_info(args.gnb_target)
  • period_ms: the reporting period (ms) taken from the --event-trigger CLI argument.
  • attributes: the list all_o_nr_cell_du_attrs, containing the O-RAN O-NRCellDU attributes to be monitored.

period_ms: int = 1000,
ran_cfg_structure_name: str = STRUCT_O_NRCELLDU,
attributes: Optional[List[str]] = None,
report_type: str = REPORT_TYPE_ALL,
cell_global_id: Optional[dict] = None,
action_type: str = Values.ACTION_TYPE,
):
"""Subscribe the dApp to a periodic CCC REPORT.

Defaults to REPORT Style 2 (cell-level) on O-NRCellDU with the
three attributes targeted by this implementation iteration:
arfcnDL, bSChannelBwDL, bWPList.
"""
if attributes is None:
attributes = ["arfcnDL", "bSChannelBwDL", "bWPList"]

self.logger.info(
"[XappCccFrame] preparing subscription for gnb={} period_ms={} attrs={}".format(
getattr(gnb, "inventory_name", "?"), period_ms, attributes
)
)

if self.subscriber.ResponseHandler(self.subs_response_cb, self.server) is not True:
self.logger.error("Error when trying to set the subscription response callback")

ev_trig = encode_event_trigger_periodic(period_ms)
act_def = encode_action_definition_cell_level(
ran_cfg_structure_name=ran_cfg_structure_name,
attribute_names=attributes,
cell_global_id=cell_global_id,
report_type=report_type,
ric_style_type=REPORT_STYLE_CELL_LEVEL,
)

action = self.subscriber.ActionToBeSetup(
action_id=1,
action_type=action_type,
action_definition=act_def.byte_array_to_tuple(),
subsequent_action=self.subscriber.SubsequentAction(
subsequent_action_type="continue", time_to_wait="w5ms"
),
)

return self.send_subscription(gnb, ev_trig, [action])

# ------------------------------------------------------------------
# Convenience: ad-hoc subscription helper
# ------------------------------------------------------------------

def subscribe_arfcn_bw_bwps(self, gnb, period_ms: int = 1000):
"""One-call helper for the three target metrics of this iteration."""
return self.subscribe(
gnb=gnb,
period_ms=period_ms,
ran_cfg_structure_name=STRUCT_O_NRCELLDU,
attributes=["arfcnDL", "bSChannelBwDL", "bWPList"],
)
Comment on lines +186 to +193
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Noemi2001 how is this used in the xApp? So I can add a better description :)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a wrapper around subscribe() that hardcodes the three "minimal" attributes, but currently my xApp does not use it anymore (it directly calls subscribe(...) with all 20 O-NRCellDU attributes)


def terminate(self, signum, frame):
self.logger.info("[XappCccFrame] received termination signal")
if not self.subscription_id:
self.logger.info("[XappCccFrame] not subscribed - terminating")
else:
for key, sid in self.subscription_id.items():
self.logger.info(
"[XappCccFrame] unsubscribing gnb={} subid={}".format(key, sid)
)
self._xapp_handler.terminate(signum, frame)
2 changes: 1 addition & 1 deletion handlers/xDevSM_rmr_xapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def get_selected_e2node_info(self, e2node_inventory_name=None):

gnb_info = self.get_ran_info(e2node=gnb)

if gnb_info["connectionStatus"] != "CONNECTED":
if gnb_info.get("connectionStatus") != "CONNECTED":
self.logger.info("[xDevSMRMRXapp] E2 node {} not connected! Skipping...".format(gnb.inventory_name))
continue

Expand Down
Empty file.
74 changes: 74 additions & 0 deletions sm_framework/py_oran/ccc/ccc_decoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Pure-Python JSON decoders for the E2SM-CCC IEs received from the RIC.

The FlexRIC ccc_sm plugin forwards the on-wire JSON bytes verbatim into
the indication header / message OCTET STRINGs, so on the dApp side we
just need `json.loads`. Two thin wrappers expose the decoded dicts and
a few helpers to drill into Indication Message Format 2 (cell-level).
"""
Comment on lines +1 to +8
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment should be deleted

import json
from typing import Iterator, List, Optional


class CccIndicationHeader:
"""Wrapper around an E2SM-CCC Indication Header Format 1 payload."""

def __init__(self, raw: bytes):
self.raw = raw
try:
outer = json.loads(raw.decode("utf-8")) if raw else {}
except (UnicodeDecodeError, json.JSONDecodeError):
outer = {}
self.payload = outer.get("indicationHeaderFormat", outer) or {}

@property
def indication_reason(self) -> Optional[str]:
return self.payload.get("indicationReason")

@property
def event_time(self) -> Optional[str]:
return self.payload.get("eventTime")

def __repr__(self):
return (
f"CccIndicationHeader(reason={self.indication_reason!r}, "
f"event_time={self.event_time!r})"
)


class CccIndicationMessage:
"""Wrapper around an E2SM-CCC Indication Message Format 2 payload.

Provides convenience accessors for iterating over reported cells and
the configuration structures they carry.
"""

def __init__(self, raw: bytes):
self.raw = raw
try:
outer = json.loads(raw.decode("utf-8")) if raw else {}
except (UnicodeDecodeError, json.JSONDecodeError):
outer = {}
self.payload = outer.get("indicationMessageFormat", outer) or {}

def cells(self) -> List[dict]:
return self.payload.get("listOfCellsReported", []) or []

def iter_structures(self) -> Iterator[dict]:
"""Yield (cell, structure) tuples (Indication Message Format 2)."""
for c in self.cells():
for s in c.get("listOfConfigurationStructuresReported", []) or []:
yield c, s

def find_o_nr_cell_du(self) -> List[dict]:
"""Return all O-NRCellDU `valuesOfAttributes.ranConfigurationStructure`
dicts reported across all cells in this indication."""
out = []
for _, s in self.iter_structures():
if s.get("ranConfigurationStructureName") == "O-NRCellDU":
vs = (s.get("valuesOfAttributes") or {}).get("ranConfigurationStructure") or {}
out.append(vs)
return out

def __repr__(self):
return f"CccIndicationMessage(cells={len(self.cells())})"
89 changes: 89 additions & 0 deletions sm_framework/py_oran/ccc/ccc_encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
Pure-Python JSON encoders for the E2SM-CCC IEs that ride inside the
E2AP OCTET STRINGs. The dApp builds Python dicts that mirror the JSON
Schema in O-RAN.WG3.TS.E2SM-CCC §9.4.2 and ships them to the SM as bytes.
"""
Comment on lines +1 to +5
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments to be removed

import ctypes
import json
from typing import List, Optional

from sm_framework.py_oran.ccc.constants import (
EVENT_TRIGGER_FORMAT_PERIODIC,
ACTION_DEF_FORMAT_CELL_LEVEL,
REPORT_STYLE_CELL_LEVEL,
REPORT_TYPE_ALL,
)


class CCCByteArray:
"""Drop-in replacement for ByteArray with a `byte_array_to_tuple`
method matching the xAppReportService.send_subscription contract.

Holds the raw bytes in a ctypes uint8 array so the lifetime survives
the subscription POST call.
"""

def __init__(self, raw: bytes):
self._raw = raw
arr_type = ctypes.c_uint8 * len(raw)
self._buf = arr_type.from_buffer_copy(raw)
self.len = len(raw)
self.buf = ctypes.cast(self._buf, ctypes.POINTER(ctypes.c_uint8))

def byte_array_to_tuple(self):
return tuple(self._raw)

def to_bytes(self) -> bytes:
return bytes(self._raw)


def encode_event_trigger_periodic(period_ms: int) -> CCCByteArray:
"""E2SM-CCC Event Trigger Definition Format 3 (periodic, §9.2.1.1.3).

The single field `period` is in milliseconds (range 10..4294967295).
"""
obj = {
"eventTriggerDefinitionFormat": {
"period": int(period_ms),
}
}
return CCCByteArray(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


def encode_action_definition_cell_level(
ran_cfg_structure_name: str,
attribute_names: List[str],
cell_global_id: Optional[dict] = None,
report_type: str = REPORT_TYPE_ALL,
ric_style_type: int = REPORT_STYLE_CELL_LEVEL,
) -> CCCByteArray:
"""E2SM-CCC Action Definition Format 2 (cell-level, §9.2.1.2.2).

Parameters
----------
ran_cfg_structure_name : e.g. "O-NRCellDU"
attribute_names : e.g. ["arfcnDL", "bSChannelBwDL", "bWPList"]
cell_global_id : optional dict matching NR-CGI ({"plmnIdentity":{...},
"nRCellIdentity":"<9-hex>"}); when omitted the action
applies to all cells of the E2 Node.
"""
one_struct = {
"reportType": report_type,
"ranConfigurationStructureName": ran_cfg_structure_name,
"listOfAttributes": [
{"attributeName": n} for n in attribute_names
],
}
one_cell = {
"listOfCellLevelRANConfigurationStructuresForADF": [one_struct]
}
if cell_global_id is not None:
one_cell["cellGlobalId"] = cell_global_id

obj = {
"ricStyleType": int(ric_style_type),
"actionDefinitionFormat": {
"listOfCellConfigurationsToBeReportedForADF": [one_cell]
},
}
return CCCByteArray(json.dumps(obj, separators=(",", ":")).encode("utf-8"))
Loading