diff --git a/decorators/ccc/__init__.py b/decorators/ccc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/decorators/ccc/ccc_frame.py b/decorators/ccc/ccc_frame.py new file mode 100644 index 0000000..c6850dd --- /dev/null +++ b/decorators/ccc/ccc_frame.py @@ -0,0 +1,204 @@ +""" +E2SM-CCC REPORT decorator for the xDevSM-dApp framework. + +Implements the periodic REPORT Style 2 (cell-level) flow: + - subscribe(): builds Event Trigger Definition Format 3 + Action + Definition Format 2 as JSON and ships them via the OSC subscription + manager. + - decode_message(): parses the JSON indication header/message bytes + received from the FlexRIC ccc_sm plugin and dispatches them to the + user-registered callback. +""" +from typing import List, Optional, Tuple + +from ricxappframe.xapp_frame import rmr + +from decorators.report import xAppReportService + +from utils.constants import Values + +from sm_framework.py_oran.ccc.constants import ( + SM_CCC_ID, + STRUCT_O_NRCELLDU, + REPORT_TYPE_ALL, + REPORT_STYLE_CELL_LEVEL, +) +from sm_framework.py_oran.ccc.ccc_encoder import ( + encode_event_trigger_periodic, + encode_action_definition_cell_level, +) +from sm_framework.py_oran.ccc.ccc_decoder import ( + CccIndicationHeader, + CccIndicationMessage, +) + + +class XappCccFrame(xAppReportService): + """ + CCC REPORT decorator. Mirrors XappKpmFrame's lifecycle but speaks + JSON instead of the ASN.1-backed flexric helpers. + """ + + def __init__( + self, + xapp_handler, + logger, + server, + xapp_name, + rmr_port, + http_port, + pltnamespace, + app_namespace, + ): + super().__init__( + xapp_handler, logger, server, xapp_name, rmr_port, http_port, pltnamespace, app_namespace + ) + # FlexRIC SM identifier — used to filter the incoming indication + # messages that come back over RMR. The RIC will assign a runtime + # RAN Function ID matching this plugin. + self.function_id = SM_CCC_ID + + # ------------------------------------------------------------------ + # Inbound: indication dispatch + # ------------------------------------------------------------------ + + def handle(self, xapp, summary, sbuf): + xapp.logger.debug("[XappCccFrame] received: {}".format(summary)) + if summary[rmr.RMR_MS_MSG_TYPE] == Values.RIC_INDICATION: + self._handle_indication(xapp, summary) + elif summary[rmr.RMR_MS_MSG_TYPE] == Values.RIC_ERROR_INDICATION: + xapp.logger.error("[XappCccFrame] Error indication received") + else: + xapp.logger.debug( + "[XappCccFrame] unhandled message type: {}".format(summary[rmr.RMR_MS_MSG_TYPE]) + ) + self._xapp_handler.handle(xapp, summary, sbuf) + + def decode_message(self, function_id, ba_ind_header, ba_ind_msg, meid): + self.logger.info("[XappCccFrame] E2AP function id: {}".format(function_id)) + if function_id != self.function_id: + self.logger.info( + "[XappCccFrame] indication for different function id: {}".format(function_id) + ) + return + + # The base xAppReportService calls get_c_byte_array_from_py_byte_string + # which returns a ctypes uint8 array (or None). Convert it to a Python + # bytes object so the pure-Python CCC JSON decoders can consume it. + hdr_bytes = bytes(ba_ind_header) if ba_ind_header is not None else b"" + msg_bytes = bytes(ba_ind_msg) if ba_ind_msg is not None else b"" + + hdr = CccIndicationHeader(hdr_bytes) + msg = CccIndicationMessage(msg_bytes) + + self.logger.info( + "[XappCccFrame] decoded header: reason={} eventTime={}".format( + hdr.indication_reason, hdr.event_time + ) + ) + + cb = self.get_indication_msg_callback() + if cb is None: + self._log_default(hdr, msg) + else: + cb(hdr, msg, meid) + + def _log_default(self, hdr: CccIndicationHeader, msg: CccIndicationMessage) -> None: + for cell, struct in msg.iter_structures(): + cgi = cell.get("cellGlobalId", {}) + name = struct.get("ranConfigurationStructureName") + vs = (struct.get("valuesOfAttributes") or {}).get("ranConfigurationStructure") or {} + arfcn_dl = vs.get("arfcnDL") + bw_dl = vs.get("bSChannelBwDL") + bwps = vs.get("bWPList") or [] + self.logger.info( + "[XappCccFrame] cell={} struct={} arfcnDL={} bSChannelBwDL={} bWPs={}".format( + cgi, name, arfcn_dl, bw_dl, len(bwps) + ) + ) + for i, b in enumerate(bwps): + self.logger.info( + "[XappCccFrame] BWP[{}] scs={} numberOfRBs={} startRB={} ctx={}".format( + i, + b.get("subCarrierSpacing"), + b.get("numberOfRBs"), + b.get("startRB"), + b.get("bwpContext"), + ) + ) + + # ------------------------------------------------------------------ + # Outbound: subscription + # ------------------------------------------------------------------ + + def subscribe( + self, + gnb, + period_ms: int = 1000, + ran_cfg_structure_name: str = STRUCT_O_NRCELLDU, + attributes: Optional[List[str]] = None, + report_type: str = REPORT_TYPE_ALL, + cell_global_id: Optional[dict] = None, + action_type: str = Values.ACTION_TYPE, + ): + """Subscribe the dApp to a periodic CCC REPORT. + + Defaults to REPORT Style 2 (cell-level) on O-NRCellDU with the + three attributes targeted by this implementation iteration: + arfcnDL, bSChannelBwDL, bWPList. + """ + if attributes is None: + attributes = ["arfcnDL", "bSChannelBwDL", "bWPList"] + + self.logger.info( + "[XappCccFrame] preparing subscription for gnb={} period_ms={} attrs={}".format( + getattr(gnb, "inventory_name", "?"), period_ms, attributes + ) + ) + + if self.subscriber.ResponseHandler(self.subs_response_cb, self.server) is not True: + self.logger.error("Error when trying to set the subscription response callback") + + ev_trig = encode_event_trigger_periodic(period_ms) + act_def = encode_action_definition_cell_level( + ran_cfg_structure_name=ran_cfg_structure_name, + attribute_names=attributes, + cell_global_id=cell_global_id, + report_type=report_type, + ric_style_type=REPORT_STYLE_CELL_LEVEL, + ) + + action = self.subscriber.ActionToBeSetup( + action_id=1, + action_type=action_type, + action_definition=act_def.byte_array_to_tuple(), + subsequent_action=self.subscriber.SubsequentAction( + subsequent_action_type="continue", time_to_wait="w5ms" + ), + ) + + return self.send_subscription(gnb, ev_trig, [action]) + + # ------------------------------------------------------------------ + # Convenience: ad-hoc subscription helper + # ------------------------------------------------------------------ + + def subscribe_arfcn_bw_bwps(self, gnb, period_ms: int = 1000): + """One-call helper for the three target metrics of this iteration.""" + return self.subscribe( + gnb=gnb, + period_ms=period_ms, + ran_cfg_structure_name=STRUCT_O_NRCELLDU, + attributes=["arfcnDL", "bSChannelBwDL", "bWPList"], + ) + + def terminate(self, signum, frame): + self.logger.info("[XappCccFrame] received termination signal") + if not self.subscription_id: + self.logger.info("[XappCccFrame] not subscribed - terminating") + else: + for key, sid in self.subscription_id.items(): + self.logger.info( + "[XappCccFrame] unsubscribing gnb={} subid={}".format(key, sid) + ) + self._xapp_handler.terminate(signum, frame) diff --git a/handlers/xDevSM_rmr_xapp.py b/handlers/xDevSM_rmr_xapp.py index 7ddb67c..ced00c6 100644 --- a/handlers/xDevSM_rmr_xapp.py +++ b/handlers/xDevSM_rmr_xapp.py @@ -186,7 +186,7 @@ def get_selected_e2node_info(self, e2node_inventory_name=None): gnb_info = self.get_ran_info(e2node=gnb) - if gnb_info["connectionStatus"] != "CONNECTED": + if gnb_info.get("connectionStatus") != "CONNECTED": self.logger.info("[xDevSMRMRXapp] E2 node {} not connected! Skipping...".format(gnb.inventory_name)) continue diff --git a/sm_framework/py_oran/ccc/__init__.py b/sm_framework/py_oran/ccc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sm_framework/py_oran/ccc/ccc_decoder.py b/sm_framework/py_oran/ccc/ccc_decoder.py new file mode 100644 index 0000000..1e95de1 --- /dev/null +++ b/sm_framework/py_oran/ccc/ccc_decoder.py @@ -0,0 +1,74 @@ +""" +Pure-Python JSON decoders for the E2SM-CCC IEs received from the RIC. + +The FlexRIC ccc_sm plugin forwards the on-wire JSON bytes verbatim into +the indication header / message OCTET STRINGs, so on the dApp side we +just need `json.loads`. Two thin wrappers expose the decoded dicts and +a few helpers to drill into Indication Message Format 2 (cell-level). +""" +import json +from typing import Iterator, List, Optional + + +class CccIndicationHeader: + """Wrapper around an E2SM-CCC Indication Header Format 1 payload.""" + + def __init__(self, raw: bytes): + self.raw = raw + try: + outer = json.loads(raw.decode("utf-8")) if raw else {} + except (UnicodeDecodeError, json.JSONDecodeError): + outer = {} + self.payload = outer.get("indicationHeaderFormat", outer) or {} + + @property + def indication_reason(self) -> Optional[str]: + return self.payload.get("indicationReason") + + @property + def event_time(self) -> Optional[str]: + return self.payload.get("eventTime") + + def __repr__(self): + return ( + f"CccIndicationHeader(reason={self.indication_reason!r}, " + f"event_time={self.event_time!r})" + ) + + +class CccIndicationMessage: + """Wrapper around an E2SM-CCC Indication Message Format 2 payload. + + Provides convenience accessors for iterating over reported cells and + the configuration structures they carry. + """ + + def __init__(self, raw: bytes): + self.raw = raw + try: + outer = json.loads(raw.decode("utf-8")) if raw else {} + except (UnicodeDecodeError, json.JSONDecodeError): + outer = {} + self.payload = outer.get("indicationMessageFormat", outer) or {} + + def cells(self) -> List[dict]: + return self.payload.get("listOfCellsReported", []) or [] + + def iter_structures(self) -> Iterator[dict]: + """Yield (cell, structure) tuples (Indication Message Format 2).""" + for c in self.cells(): + for s in c.get("listOfConfigurationStructuresReported", []) or []: + yield c, s + + def find_o_nr_cell_du(self) -> List[dict]: + """Return all O-NRCellDU `valuesOfAttributes.ranConfigurationStructure` + dicts reported across all cells in this indication.""" + out = [] + for _, s in self.iter_structures(): + if s.get("ranConfigurationStructureName") == "O-NRCellDU": + vs = (s.get("valuesOfAttributes") or {}).get("ranConfigurationStructure") or {} + out.append(vs) + return out + + def __repr__(self): + return f"CccIndicationMessage(cells={len(self.cells())})" diff --git a/sm_framework/py_oran/ccc/ccc_encoder.py b/sm_framework/py_oran/ccc/ccc_encoder.py new file mode 100644 index 0000000..72e8415 --- /dev/null +++ b/sm_framework/py_oran/ccc/ccc_encoder.py @@ -0,0 +1,89 @@ +""" +Pure-Python JSON encoders for the E2SM-CCC IEs that ride inside the +E2AP OCTET STRINGs. The dApp builds Python dicts that mirror the JSON +Schema in O-RAN.WG3.TS.E2SM-CCC §9.4.2 and ships them to the SM as bytes. +""" +import ctypes +import json +from typing import List, Optional + +from sm_framework.py_oran.ccc.constants import ( + EVENT_TRIGGER_FORMAT_PERIODIC, + ACTION_DEF_FORMAT_CELL_LEVEL, + REPORT_STYLE_CELL_LEVEL, + REPORT_TYPE_ALL, +) + + +class CCCByteArray: + """Drop-in replacement for ByteArray with a `byte_array_to_tuple` + method matching the xAppReportService.send_subscription contract. + + Holds the raw bytes in a ctypes uint8 array so the lifetime survives + the subscription POST call. + """ + + def __init__(self, raw: bytes): + self._raw = raw + arr_type = ctypes.c_uint8 * len(raw) + self._buf = arr_type.from_buffer_copy(raw) + self.len = len(raw) + self.buf = ctypes.cast(self._buf, ctypes.POINTER(ctypes.c_uint8)) + + def byte_array_to_tuple(self): + return tuple(self._raw) + + def to_bytes(self) -> bytes: + return bytes(self._raw) + + +def encode_event_trigger_periodic(period_ms: int) -> CCCByteArray: + """E2SM-CCC Event Trigger Definition Format 3 (periodic, §9.2.1.1.3). + + The single field `period` is in milliseconds (range 10..4294967295). + """ + obj = { + "eventTriggerDefinitionFormat": { + "period": int(period_ms), + } + } + return CCCByteArray(json.dumps(obj, separators=(",", ":")).encode("utf-8")) + + +def encode_action_definition_cell_level( + ran_cfg_structure_name: str, + attribute_names: List[str], + cell_global_id: Optional[dict] = None, + report_type: str = REPORT_TYPE_ALL, + ric_style_type: int = REPORT_STYLE_CELL_LEVEL, +) -> CCCByteArray: + """E2SM-CCC Action Definition Format 2 (cell-level, §9.2.1.2.2). + + Parameters + ---------- + ran_cfg_structure_name : e.g. "O-NRCellDU" + attribute_names : e.g. ["arfcnDL", "bSChannelBwDL", "bWPList"] + cell_global_id : optional dict matching NR-CGI ({"plmnIdentity":{...}, + "nRCellIdentity":"<9-hex>"}); when omitted the action + applies to all cells of the E2 Node. + """ + one_struct = { + "reportType": report_type, + "ranConfigurationStructureName": ran_cfg_structure_name, + "listOfAttributes": [ + {"attributeName": n} for n in attribute_names + ], + } + one_cell = { + "listOfCellLevelRANConfigurationStructuresForADF": [one_struct] + } + if cell_global_id is not None: + one_cell["cellGlobalId"] = cell_global_id + + obj = { + "ricStyleType": int(ric_style_type), + "actionDefinitionFormat": { + "listOfCellConfigurationsToBeReportedForADF": [one_cell] + }, + } + return CCCByteArray(json.dumps(obj, separators=(",", ":")).encode("utf-8")) diff --git a/sm_framework/py_oran/ccc/constants.py b/sm_framework/py_oran/ccc/constants.py new file mode 100644 index 0000000..c181407 --- /dev/null +++ b/sm_framework/py_oran/ccc/constants.py @@ -0,0 +1,43 @@ +""" +E2SM-CCC constants (O-RAN.WG3.TS.E2SM-CCC-R004-v06.00). + +Function ID is assigned at runtime by the RIC when registering the RAN +function, but we keep the FlexRIC plugin-side SM identifier here for the +dApp to filter inbound indications. +""" + +# Must match flexric/src/sm/ccc_sm/ccc_sm_id.h:SM_CCC_ID. +SM_CCC_ID = 149 + +SM_CCC_OID = "1.3.6.1.4.1.53148.1.1.2.4" +SM_CCC_NAME = "ORAN-E2SM-CCC" + +# RIC Service Styles (§7.4) +REPORT_STYLE_NODE_LEVEL = 1 +REPORT_STYLE_CELL_LEVEL = 2 + +# IE Formats (§7.8) +EVENT_TRIGGER_FORMAT_NODE_CHANGE = 1 +EVENT_TRIGGER_FORMAT_CELL_CHANGE = 2 +EVENT_TRIGGER_FORMAT_PERIODIC = 3 + +ACTION_DEF_FORMAT_NODE_LEVEL = 1 +ACTION_DEF_FORMAT_CELL_LEVEL = 2 + +IND_HDR_FORMAT_1 = 1 +IND_MSG_FORMAT_NODE_LEVEL = 1 +IND_MSG_FORMAT_CELL_LEVEL = 2 + +# Indication reasons (§9.2.1.3.1) +IND_REASON_UPON_SUBSCRIPTION = "uponSubscription" +IND_REASON_UPON_CHANGE = "uponChange" +IND_REASON_PERIODIC = "periodic" + +# Report types (§9.3.9) +REPORT_TYPE_ALL = "all" +REPORT_TYPE_CHANGE = "change" + +# Cell-level RAN Configuration Structure names (§8.2.2). +STRUCT_O_NRCELLDU = "O-NRCellDU" +STRUCT_O_NRCELLCU = "O-NRCellCU" +STRUCT_O_BWP = "O-BWP"