Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/arinc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from rich.progress import track
from database import DbConfig
from record_maps import record_maps


class ArincRecord:
def __init__(self, record_map: dict):
self.section: int | None = record_map.get("section_code")
self.subsection: int | None = record_map.get("subsection_code")
self.section_pos: int | None = record_map.get("section_pos")
self.subsection_pos: int | None = record_map.get("subsection_pos")
self.cont_rec_pos: int | None = record_map.get("cont_rec_pos")
self.cont_rec_vals: list[str] = record_map.get("cont_rec_vals", [])
self.name: str = record_map.get("name", "")
self.columns: list[dict] = record_map.get("columns", [])
self.column_names: list[str] = [c["name"] for c in record_map["columns"]]


class ArincParser:
def __init__(self, db: DbConfig, file: str):
self.db = db
self.file = file
self.lines = self.read_file()
self.cycle = self.get_cycle()
self.schema = f"cycle{self.cycle}"

def read_file(self) -> list[str]:
with open(self.file) as file:
return file.readlines()

def parse(self) -> None:
self.create_schema()
for record in record_maps:
self.create_arinc_record(record)

def get_cycle(self) -> str:
return self.lines[0][35:39]

def create_schema(self) -> None:
self.db.create_schema(self.schema)

def create_table(self, record: ArincRecord) -> None:
self.db.create_table(self.schema, record.name, record.column_names)

def add_row(self, name: str, values: list, cycle: str) -> None:
self.db.add_row(self.schema, name, values)

def create_arinc_record(self, record_map) -> None:
record = ArincRecord(record_map)

self.create_table(record)

for line in track(self.lines, description=f"{record.name.rjust(26)}"):
if (
record.section_pos is not None
and record.subsection_pos is not None
and line[record.section_pos] == record.section
and line[record.subsection_pos] == record.subsection
):
if (
not record.cont_rec_pos
or line[record.cont_rec_pos] in record.cont_rec_vals
):
row = [f"{line[i['start']:i['end']]}" for i in record.columns]
self.add_row(record.name, row, self.cycle)
3 changes: 0 additions & 3 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ def __init__(self):
self.dbtype = "sqlite"
self.dbname = parser["sqlite"]["dbname"]

if not parser.has_section("cifp_file"):
raise ValueError("No cifp_file configuration found in config.ini")

self.file_loc = parser["cifp_file"]["file_loc"]


Expand Down
4 changes: 2 additions & 2 deletions src/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from config import UserConfigs


class DbConfig(Protocol):
class DbConfig(Protocol): # pragma: no cover
def connect(self):
pass

Expand Down Expand Up @@ -81,7 +81,7 @@ def connect(self) -> Generator[sqlite3.Cursor, None, None]:
conn.commit()
conn.close()

def create_schema(self, _) -> None:
def create_schema(self, _) -> None: # pragma: no cover
# SQLite does not support schemas in the same way as PostgreSQL.
# This method is intentionally a no-op.
pass
Expand Down
76 changes: 6 additions & 70 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,17 @@
from rich.progress import track
from database import get_db, DbConfig
from arinc import ArincParser
from config import UserConfigs
from record_maps import record_maps


configs: UserConfigs = UserConfigs()


class ArincRecord:
def __init__(self, record_map: dict):
self.section: int | None = record_map.get("section_code")
self.subsection: int | None = record_map.get("subsection_code")
self.section_pos: int | None = record_map.get("section_pos")
self.subsection_pos: int | None = record_map.get("subsection_pos")
self.cont_rec_pos: int | None = record_map.get("cont_rec_pos")
self.cont_rec_vals: list[str] = record_map.get("cont_rec_vals", [])
self.name: str = record_map.get("name", "")
self.columns: list[dict] = record_map.get("columns", [])
self.column_names: list[str] = [c["name"] for c in record_map["columns"]]


class ArincParser:
def __init__(self, db: DbConfig, file=configs.file_loc):
self.db = db
self.file = file
self.lines = self.read_file()
self.cycle = self.get_cycle()
self.schema = f"cycle{self.cycle}"

def read_file(self) -> list[str]:
with open(self.file) as file:
return file.readlines()

def parse(self) -> None:
self.create_schema()
for record in record_maps:
self.create_arinc_record(record)

def get_cycle(self) -> str:
return self.lines[0][35:39]

def create_schema(self) -> None:
self.db.create_schema(self.schema)

def create_table(self, record: ArincRecord) -> None:
self.db.create_table(self.schema, record.name, record.column_names)

def add_row(self, name: str, values: list, cycle: str) -> None:
self.db.add_row(self.schema, name, values)

def create_arinc_record(self, record_map) -> None:
record = ArincRecord(record_map)

self.create_table(record)

for line in track(self.lines, description=f"{record.name.rjust(26)}"):
if (
record.section_pos is not None
and record.subsection_pos is not None
and line[record.section_pos] == record.section
and line[record.subsection_pos] == record.subsection
):
if (
not record.cont_rec_pos
or line[record.cont_rec_pos] in record.cont_rec_vals
):
row = [f"{line[i['start']:i['end']]}" for i in record.columns]
self.add_row(record.name, row, self.cycle)
from database import DbConfig, get_db


def main() -> None:
configs: UserConfigs = UserConfigs()

db: DbConfig = get_db(configs)

with db.connect():
parser = ArincParser(db)
parser = ArincParser(db, configs.file_loc)
parser.parse()


if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
main()
23 changes: 19 additions & 4 deletions src/record_maps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Store of mappings of each ARINC-424 record in FAA CIFP output.

record_maps = [
record_maps = [ # pragma: no cover
{
"name": "airport",
"section_code": "P",
Expand Down Expand Up @@ -648,8 +648,18 @@
{"id": 11, "start": 27, "end": 28, "name": "Route_Indicator"},
{"id": 12, "start": 28, "end": 30, "name": "SBAS_Service_Provider"},
{"id": 13, "start": 30, "end": 32, "name": "Reference_Path_Data_Selector"},
{"id": 14, "start": 32, "end": 36, "name": "Reference_Path_Data_Identifier"},
{"id": 15, "start": 36, "end": 37, "name": "Approach_Performance_Designator"},
{
"id": 14,
"start": 32,
"end": 36,
"name": "Reference_Path_Data_Identifier",
},
{
"id": 15,
"start": 36,
"end": 37,
"name": "Approach_Performance_Designator",
},
{"id": 16, "start": 37, "end": 48, "name": "LTP_Latitude"},
{"id": 17, "start": 48, "end": 60, "name": "LTP_Longitude"},
{"id": 18, "start": 60, "end": 66, "name": "LTP_Ellipsoid_Height"},
Expand Down Expand Up @@ -802,7 +812,12 @@
{"id": 20, "start": 81, "end": 85, "name": "LOC__MLS__GLS_Identifier"},
{"id": 21, "start": 85, "end": 86, "name": "Category__Class"},
{"id": 22, "start": 86, "end": 90, "name": "Stopway"},
{"id": 23, "start": 90, "end": 94, "name": "Secondary_LOC_MLS_GLS_Identifier"},
{
"id": 23,
"start": 90,
"end": 94,
"name": "Secondary_LOC_MLS_GLS_Identifier",
},
{"id": 24, "start": 94, "end": 95, "name": "Category__Class_2"},
{"id": 25, "start": 101, "end": 123, "name": "Runway_Description"},
{"id": 26, "start": 123, "end": 128, "name": "File_Record_Number"},
Expand Down
92 changes: 92 additions & 0 deletions tests/test_arinc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import os
import tempfile

import arinc # type: ignore


class MockDbConfig:
def __init__(self):
self.schemas_created = []
self.tables_created = []
self.rows_added = []

def create_schema(self, schema_name: str) -> None:
self.schemas_created.append(schema_name)

def create_table(
self, schema_name: str, table_name: str, columns: list[str]
) -> None:
self.tables_created.append((schema_name, table_name, columns))

def add_row(self, schema_name: str, table_name: str, values: list[str]) -> None:
self.rows_added.append((schema_name, table_name, values))


def test_arinc_parser_parse():
test_record_map = {
"section_code": "A",
"subsection_code": "B",
"section_pos": 0,
"subsection_pos": 1,
"cont_rec_pos": 2,
"cont_rec_vals": ["C"],
"name": "test_record_parser",
"columns": [
{"name": "col1", "start": 3, "end": 5},
{"name": "col2", "start": 5, "end": 7},
],
}
arinc.record_maps = [test_record_map]

cycle_line = "X" * 35 + "2023\n"
line_match = "ABCDEFAB\n"
line_nomatch = "ZZZZZZZZ\n"
file_content = cycle_line + line_match + line_nomatch + line_match

with tempfile.NamedTemporaryFile("w+", delete=False) as tmp_file:
tmp_file.write(file_content)
tmp_file_path = tmp_file.name

try:
mock_db = MockDbConfig()
parser = arinc.ArincParser(mock_db, tmp_file_path)
parser.parse()

expected_schema = "cycle2023"
assert expected_schema in mock_db.schemas_created

expected_table = (expected_schema, "test_record_parser", ["col1", "col2"])
assert expected_table in mock_db.tables_created

expected_row = ["DE", "FA"]
matching_rows = [
row for _, tbl, row in mock_db.rows_added if tbl == "test_record_parser"
]
assert len(matching_rows) == 2
for row in matching_rows:
assert row == expected_row
finally:
os.unlink(tmp_file_path)


def test_arinc_record():
record_map = {
"section_code": 1,
"subsection_code": 2,
"section_pos": 3,
"subsection_pos": 4,
"cont_rec_pos": 5,
"cont_rec_vals": ["val1", "val2"],
"name": "test_record",
"columns": [{"name": "col1"}, {"name": "col2"}],
}
record = arinc.ArincRecord(record_map)

assert record.section == 1
assert record.subsection == 2
assert record.section_pos == 3
assert record.subsection_pos == 4
assert record.cont_rec_pos == 5
assert record.cont_rec_vals == ["val1", "val2"]
assert record.name == "test_record"
assert record.column_names == ["col1", "col2"]
Loading