Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/yamcs/pymdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
from yamcs.pymdb.parameters import * # noqa
from yamcs.pymdb.systems import * # noqa
from yamcs.pymdb.verifiers import * # noqa
from yamcs.pymdb.headers import *
161 changes: 161 additions & 0 deletions src/yamcs/pymdb/headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from __future__ import annotations

from collections.abc import Mapping
from typing import TYPE_CHECKING, TextIO
from enum import Enum

from functools import total_ordering
import re

class ValidationStatus(str, Enum):
UNKNOWN = "Unknown"
WORKING = "Working"
DRAFT = "Draft"
TEST = "Test"
VALIDATED = "Validated"
RELEASED = "Released"
WITHDRAWN = "Withdrawn"

@total_ordering
class History:
__slots__ = ("version", "date", "message", "author")

def __init__(
self,
version: str,
date: str | None = None,
message: str | None = None,
author: str | None = None,
):
if version is None:
raise ValueError("Version can not be null")

if not re.match(r"[0-9]+.*", version):
raise ValueError(f"Invalid version format '{version}'")

self.version: str = version
self.date: str | None = date
self.message: str | None = message
self.author: str | None = author

def __str__(self) -> str:
base = f"{self.version}"
base = f"{base}; {self.date}" if self.date else f"{base}; "
base = f"{base}; {self.message}" if self.message else f"{base}; "
base = f"{base}; {self.author}" if self.author else f"{base}; "

return base

def _version_parts(self) -> list[str]:
return self.version.split(".")

def __eq__(self, other: object) -> bool:
if not isinstance(other, History):
return NotImplemented
return self._compare_versions(other) == 0

def __lt__(self, other: History) -> bool:
if other is None:
return False
return self._compare_versions(other) < 0

def _compare_versions(self, other: History) -> int:
parts = self._version_parts()
oparts = other._version_parts()

length = max(len(parts), len(oparts))

for i in range(length):
p = parts[i] if i < len(parts) else "0"
o = oparts[i] if i < len(oparts) else "0"

try:
pi = int(p)
oi = int(o)
if pi != oi:
return -1 if pi < oi else 1
except ValueError:
if p != o:
return -1 if p < o else 1

return 0

class Header:
__slots__ = ("_version", "_date", "_history_list", "_author_list", "_validation_status", "_classification", "_classification_instructions")

def __init__(self, validation_status: str | ValidationStatus) -> None:
self._version: str | None = None
self._date: str | None = None
self._history_list: list[History] = []
self._author_list: list[str] = []
self._classification: str = "NotClassified"
self._classification_instructions: str | None = None

# Mandatory field
self.validation_status: ValidationStatus = validation_status

# setters
def set_version(self, version: str | None) -> None:
self._version = version

def set_date(self, date: str | None) -> None:
self._date = date

def set_classification(self, classification: str | None) -> None:
if classification:
self._classification = classification

def set_classification_instructions(self, instructions: str | None) -> None:
self._classification_instructions = instructions

@property
def validation_status(self) -> str:
return self._validation_status.value

@validation_status.setter
def validation_status(self, value: str | ValidationStatus) -> None:

if isinstance(value, ValidationStatus):
self._validation_status = value
return

if isinstance(value, str):
try:
self._validation_status = ValidationStatus(value)
return
except ValueError:
pass

raise ValueError(
f"Invalid validation status: {value!r}. "
f"Allowed: {[s.value for s in ValidationStatus]}"
)

# getters
def get_classification(self) -> str:
return self._classification

def get_classification_instructions(self) -> str | None:
return self._classification_instructions

def get_version(self) -> str | None:
return self._version

def get_date(self) -> str | None:
return self._date

def get_history_list(self) -> list[History]:
return self._history_list

def get_author_list(self) -> list[str]:
return self._author_list

def add_history(self, history: History) -> None:
self._history_list.append(history)

def add_author(self, author: str) -> None:
self._author_list.append(author)


def __str__(self) -> str:
return f"version: {self._version}, date: {self._date}"
9 changes: 9 additions & 0 deletions src/yamcs/pymdb/systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from yamcs.pymdb.commands import Command
from yamcs.pymdb.containers import Container
from yamcs.pymdb.parameters import Parameter
from yamcs.pymdb.headers import Header


class System:
Expand All @@ -32,6 +33,7 @@ def __init__(
short_description: str | None = None,
long_description: str | None = None,
extra: Mapping[str, str] | None = None,
header: Header | None = None
):
self.name: str = name
"""Short name of this system"""
Expand All @@ -48,12 +50,19 @@ def __init__(
self.extra: dict[str, str] = dict(extra or {})
"""Arbitrary information, keyed by name"""

self._header: Header | None = header
"""Header content of thee SpaceSystem"""

self._algorithms_by_name: dict[str, Algorithm] = {}
self._commands_by_name: dict[str, Command] = {}
self._containers_by_name: dict[str, Container] = {}
self._parameters_by_name: dict[str, Parameter] = {}
self._subsystems_by_name: dict[str, Subsystem] = {}

@property
def header(self) -> Header | None:
return self._header

@property
def root(self) -> System:
"""
Expand Down
39 changes: 39 additions & 0 deletions src/yamcs/pymdb/xtce.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
OrExpression,
ParameterMember,
)
from yamcs.pymdb.headers import (
Header,
History
)
from yamcs.pymdb.parameters import (
AbsoluteTimeParameter,
AggregateParameter,
Expand Down Expand Up @@ -2389,6 +2393,38 @@ def add_space_systems(self, parent: ET.Element, system: System):
for subsystem in system.subsystems:
self.generate_space_system(subsystem, parent)

def add_header(self, parent: ET.Element, header: Header):
header_el = ET.SubElement(parent, "Header")

history_list: list[History] = header.get_history_list()
author_list: list[str] = header.get_author_list()

date: str | None = header.get_date()
version: str | None = header.get_version()
validation_status: str = header.validation_status
classification: str = header.get_classification()
classification_instructions: str | None = header.get_classification_instructions()

header_el.attrib["validationStatus"] = validation_status
header_el.attrib["classification"] = classification

if classification_instructions:
header_el.attrib["classificationInstructions"] = classification_instructions
if version:
header_el.attrib["version"] = version
if date:
header_el.attrib["date"] = date

if history_list:
history_set_el = ET.SubElement(header_el, "HistorySet")
for history in history_list:
ET.SubElement(history_set_el, "History").text = str(history)

if author_list:
author_set_el = ET.SubElement(header_el, "AuthorSet")
for author in author_list:
ET.SubElement(author_set_el, "Author").text = author

def generate_space_system(
self,
system: System,
Expand All @@ -2415,6 +2451,9 @@ def generate_space_system(
if system.long_description:
ET.SubElement(el, "LongDescription").text = system.long_description

if system.header:
self.add_header(el, system.header)

if system.aliases:
self.add_aliases(el, system.aliases)

Expand Down