From 2b6aeb26d362b8bbc105152f92e94776dd5d3eba Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 2 Jun 2025 12:16:46 +0200 Subject: [PATCH 001/141] attempt of streaming audio to gui --- .../src/components/dynamic_components.vue | 14 ++++- .../src/components/grid/grid_audio.vue | 61 +++++++++++++++++++ freedata_gui/src/js/audioStreamHandler.js | 12 ++++ freedata_gui/src/js/event_sock.js | 6 ++ freedata_gui/src/js/waterfallHandler.js | 1 + freedata_gui/src/store/audioStore.js | 2 + freedata_server/api/websocket.py | 17 ++++++ freedata_server/context.py | 2 + freedata_server/modem.py | 6 ++ freedata_server/websocket_manager.py | 42 ++++++++++++- 10 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 freedata_gui/src/components/grid/grid_audio.vue create mode 100644 freedata_gui/src/js/audioStreamHandler.js diff --git a/freedata_gui/src/components/dynamic_components.vue b/freedata_gui/src/components/dynamic_components.vue index 092d058de..cb5743ffe 100644 --- a/freedata_gui/src/components/dynamic_components.vue +++ b/freedata_gui/src/components/dynamic_components.vue @@ -28,6 +28,7 @@ import grid_tune from "./grid/grid_tune.vue"; import grid_CQ_btn from "./grid/grid_CQ.vue"; import grid_ping from "./grid/grid_ping.vue"; import grid_freq from "./grid/grid_frequency.vue"; +import grid_audio from "./grid/grid_audio.vue"; import grid_beacon from "./grid/grid_beacon.vue"; import grid_mycall_small from "./grid/grid_mycall small.vue"; import grid_scatter from "./grid/grid_scatter.vue"; @@ -325,8 +326,19 @@ const gridWidgets = [ 18, false, { x: 16, y: 8, w: 2, h: 8 } + ), + new gridWidget( + grid_audio, + { x: 16, y: 8, w: 2, h: 8 }, + "Audio widget", + false, + true, + "Audio", + 24, + false, + { x: 16, y: 8, w: 2, h: 8 } ) - //Next new widget ID should be 23 + //Next new widget ID should be 24 ]; function updateFrequencyAndApply(frequency) { diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue new file mode 100644 index 000000000..ddd395334 --- /dev/null +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -0,0 +1,61 @@ + + + diff --git a/freedata_gui/src/js/audioStreamHandler.js b/freedata_gui/src/js/audioStreamHandler.js new file mode 100644 index 000000000..a27684f15 --- /dev/null +++ b/freedata_gui/src/js/audioStreamHandler.js @@ -0,0 +1,12 @@ +import { setActivePinia } from "pinia"; +import pinia from "../store/index"; +setActivePinia(pinia); + +import { useAudioStore } from "../store/audioStore.js"; +const audio = useAudioStore(pinia); + +export function addDataToAudio(data) { + const int16 = new Int16Array(data); // ArrayBuffer → Int16 PCM + const copied = new Int16Array(int16); + audio.rxStream.push(copied); +} diff --git a/freedata_gui/src/js/event_sock.js b/freedata_gui/src/js/event_sock.js index ef03ec746..87d800408 100644 --- a/freedata_gui/src/js/event_sock.js +++ b/freedata_gui/src/js/event_sock.js @@ -5,6 +5,7 @@ import { loadAllData, } from "../js/eventHandler.js"; import { addDataToWaterfall } from "../js/waterfallHandler.js"; +import { addDataToAudio } from "../js/audioStreamHandler.js"; // ----------------- init pinia stores ------------- import { setActivePinia } from "pinia"; @@ -22,6 +23,10 @@ function connect(endpoint, dispatcher) { `${wsProtocol}//${hostname}:${adjustedPort}/${endpoint}`, ); + if (endpoint.includes("audio")){ + socket.binaryType = "arraybuffer"; + } + // handle opening socket.addEventListener("open", function () { console.log(`Connected to the WebSocket server: ${endpoint}`); @@ -56,4 +61,5 @@ export function initConnections() { connect("states", stateDispatcher); connect("events", eventDispatcher); connect("fft", addDataToWaterfall); + connect("audio_rx", addDataToAudio); } diff --git a/freedata_gui/src/js/waterfallHandler.js b/freedata_gui/src/js/waterfallHandler.js index 6ea51afaf..8636b7f40 100644 --- a/freedata_gui/src/js/waterfallHandler.js +++ b/freedata_gui/src/js/waterfallHandler.js @@ -28,6 +28,7 @@ export function addDataToWaterfall(data) { }); //window.dispatchEvent(new CustomEvent("wf-data-avail", {bubbles:true, detail: data })); } + /** * Setwaterfall colormap array by index * @param {number} index colormap index to use diff --git a/freedata_gui/src/store/audioStore.js b/freedata_gui/src/store/audioStore.js index cc62888bc..38118f072 100644 --- a/freedata_gui/src/store/audioStore.js +++ b/freedata_gui/src/store/audioStore.js @@ -15,6 +15,7 @@ const skel = [ export const useAudioStore = defineStore("audioStore", () => { const audioInputs = ref([]); const audioOutputs = ref([]); + const rxStream = ref([]); const loadAudioDevices = async () => { try { @@ -35,5 +36,6 @@ export const useAudioStore = defineStore("audioStore", () => { audioInputs, audioOutputs, loadAudioDevices, + rxStream, }; }); diff --git a/freedata_server/api/websocket.py b/freedata_server/api/websocket.py index b586f800b..cab682569 100644 --- a/freedata_server/api/websocket.py +++ b/freedata_server/api/websocket.py @@ -47,3 +47,20 @@ async def websocket_states( ctx.websocket_manager.states_client_list, ctx.state_queue ) + +@router.websocket("/audio_rx") +async def websocket_audio_rx( + websocket: WebSocket, + ctx: AppContext = Depends(get_ctx) +): + """ + WebSocket endpoint for state updates. + """ + await websocket.accept() + await ctx.websocket_manager.handle_connection( + websocket, + ctx.websocket_manager.audio_rx_client_list, + ctx.state_queue + ) + #while True: + # await websocket.send_bytes(b"\x00" * 1024) diff --git a/freedata_server/context.py b/freedata_server/context.py index d0ccf6b25..a81686e36 100644 --- a/freedata_server/context.py +++ b/freedata_server/context.py @@ -19,6 +19,8 @@ def __init__(self, config_file: str): self.modem_events = Queue() self.modem_fft = Queue() self.modem_service = Queue() + self.audio_rx_queue = Queue(maxsize=1) + self.event_manager = EventManager(self, [self.modem_events]) self.state_manager = StateManager(self.state_queue) self.schedule_manager = ScheduleManager(self) diff --git a/freedata_server/modem.py b/freedata_server/modem.py index d2a213fe6..bcbad2cf2 100644 --- a/freedata_server/modem.py +++ b/freedata_server/modem.py @@ -374,6 +374,8 @@ def sd_output_audio_callback(self, outdata: np.ndarray, frames: int, time, statu audio.calculate_fft(audio_8k, self.ctx.modem_fft, self.ctx.state_manager) outdata[:] = chunk.reshape(outdata.shape) + + else: # reset transmitting state only, if we are not actively processing audio # for avoiding a ptt toggle state bug @@ -409,6 +411,10 @@ def sd_input_audio_callback(self, indata: np.ndarray, frames: int, time, status) try: audio_48k = np.frombuffer(indata, dtype=np.int16) audio_8k = self.resampler.resample48_to_8(audio_48k) + + #self.ctx.audio_rx_queue.put({"audio": audio_8k}) + self.ctx.audio_rx_queue.put(audio_8k) + if self.ctx.config_manager.config['AUDIO'].get('rx_auto_audio_level'): audio_8k = audio.normalize_audio(audio_8k) diff --git a/freedata_server/websocket_manager.py b/freedata_server/websocket_manager.py index f31b9249e..a49e65cd0 100644 --- a/freedata_server/websocket_manager.py +++ b/freedata_server/websocket_manager.py @@ -1,6 +1,9 @@ import threading import json import asyncio +from asyncio import run_coroutine_threadsafe + +import numpy as np import structlog @@ -27,10 +30,12 @@ def __init__(self, ctx): self.events_client_list = set() self.fft_client_list = set() self.states_client_list = set() + self.audio_rx_client_list = set() self.events_thread = None self.states_thread = None self.fft_thread = None + self.audio_rx_thread = None async def handle_connection(self, websocket, client_list, event_queue): """Handles a WebSocket connection. @@ -45,6 +50,7 @@ async def handle_connection(self, websocket, client_list, event_queue): event_queue (queue.Queue): The event queue. Currently unused. """ client_list.add(websocket) + self.log.info(f"Client websocket connection established", ws=websocket) while not self.shutdown_flag.is_set(): try: await websocket.receive_text() @@ -70,7 +76,6 @@ def transmit_sock_data_worker(self, client_list, event_queue): while not self.shutdown_flag.is_set(): try: event = event_queue.get(timeout=1) - if event: json_event = json.dumps(event) clients = client_list.copy() @@ -82,6 +87,32 @@ def transmit_sock_data_worker(self, client_list, event_queue): except Exception: continue + def transmit_sock_audio_worker(self, client_list, audio_queue): + """Worker thread function for transmitting data to WebSocket clients. + + This method continuously retrieves events from the provided queue and + sends them as JSON strings to all connected clients in the specified + list. It handles client disconnections gracefully. + + Args: + client_list (set): The set of connected WebSocket clients. + event_queue (queue.Queue): The queue containing events to be transmitted. + """ + while not self.shutdown_flag.is_set(): + #loop = asyncio.get_event_loop() + try: + audio = audio_queue.get(timeout=1) + if isinstance(audio, np.ndarray): + audio = audio.tobytes() + clients = client_list.copy() + for client in clients: + try: + asyncio.run(client.send_bytes(audio)) + + except Exception: + client_list.remove(client) + except Exception: + continue def startWorkerThreads(self, app): @@ -103,7 +134,10 @@ def startWorkerThreads(self, app): self.fft_thread = threading.Thread(target=self.transmit_sock_data_worker, daemon=True, args=(self.fft_client_list, self.ctx.modem_fft)) self.fft_thread.start() - + + self.audio_rx_thread = threading.Thread(target=self.transmit_sock_audio_worker, daemon=True, args=(self.audio_rx_client_list, self.ctx.audio_rx_queue)) + self.audio_rx_thread.start() + def shutdown(self): """Shuts down the WebSocket manager. @@ -117,6 +151,8 @@ def shutdown(self): self.events_thread.join(0.5) if self.states_thread: self.states_thread.join(0.5) - if self.states_thread: + if self.fft_thread: self.fft_thread.join(0.5) + if self.audio_rx_thread: + self.audio_rx_thread.join(0.5) self.log.warning("[SHUTDOWN] websockets closed") \ No newline at end of file From c32e409d527042f0312e104531092e3d1c1ee47a Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 2 Jun 2025 12:33:56 +0200 Subject: [PATCH 002/141] attempt of streaming audio to gui --- freedata_gui/src/components/grid/grid_audio.vue | 2 +- freedata_gui/src/js/audioStreamHandler.js | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index ddd395334..12e0c6510 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -12,7 +12,7 @@ let isPlaying = false; function playRxStream() { if (isPlaying) return; - console.log("Starte PLayback"); + console.log("Start Playback"); audioCtx = new (window.AudioContext || window.webkitAudioContext)({ sampleRate: 8000 }); isPlaying = true; diff --git a/freedata_gui/src/js/audioStreamHandler.js b/freedata_gui/src/js/audioStreamHandler.js index a27684f15..70427a6f5 100644 --- a/freedata_gui/src/js/audioStreamHandler.js +++ b/freedata_gui/src/js/audioStreamHandler.js @@ -5,8 +5,17 @@ setActivePinia(pinia); import { useAudioStore } from "../store/audioStore.js"; const audio = useAudioStore(pinia); +const MAX_BLOCKS = 10; + export function addDataToAudio(data) { - const int16 = new Int16Array(data); // ArrayBuffer → Int16 PCM - const copied = new Int16Array(int16); - audio.rxStream.push(copied); + const int16 = new Int16Array(data); + const copy = new Int16Array(int16); // Kopie für Sicherheit + + const stream = audio.rxStream; + + if (stream.length >= MAX_BLOCKS) { + stream.shift(); + } + + stream.push(copy); } From 15603046015bf9b163a3dd517307d0cd85c705ed Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 2 Jun 2025 15:36:19 +0200 Subject: [PATCH 003/141] adjusted streaming --- freedata_gui/src/components/grid/grid_audio.vue | 6 +++++- freedata_server/modem.py | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index 12e0c6510..ba30407f3 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -24,9 +24,13 @@ function playRxStream() { //const BLOCK_DURATION_MS = 1024 / 8000 * 1000; const BLOCK_DURATION_MS = 10 + const MIN_BLOCKS_TO_START = 5 function loop() { if (!isPlaying) return; - + if (audio.rxStream.length < MIN_BLOCKS_TO_START){ + setTimeout(loop, 5); + return; + } if (audio.rxStream.length > 0) { const block = audio.rxStream.shift(); // Nächstes Audioblock holen const float32 = Float32Array.from(block, s => s / 32768); diff --git a/freedata_server/modem.py b/freedata_server/modem.py index bcbad2cf2..e26d3b858 100644 --- a/freedata_server/modem.py +++ b/freedata_server/modem.py @@ -75,6 +75,7 @@ def __init__(self, ctx) -> None: self.MODE = 0 self.rms_counter = 0 + self.AUDIO_STREAMING_CHUNK_SIZE = 600 self.audio_out_queue = queue.Queue() # Make sure our resampler will work @@ -351,6 +352,16 @@ def enqueue_audio_out(self, audio_48k) -> None: return + CHUNK_SIZE = 600 # z. B. 600 Samples = 75ms @ 8kHz + + def enqueue_streaming_audio_chunks(self, audio_block, queue): + total_samples = len(audio_block) + for start in range(0, total_samples, self.AUDIO_STREAMING_CHUNK_SIZE): + end = start + self.AUDIO_STREAMING_CHUNK_SIZE + chunk = audio_block[start:end] + queue.put(chunk.tobytes()) + + def sd_output_audio_callback(self, outdata: np.ndarray, frames: int, time, status) -> None: """Callback function for the audio output stream. @@ -412,8 +423,7 @@ def sd_input_audio_callback(self, indata: np.ndarray, frames: int, time, status) audio_48k = np.frombuffer(indata, dtype=np.int16) audio_8k = self.resampler.resample48_to_8(audio_48k) - #self.ctx.audio_rx_queue.put({"audio": audio_8k}) - self.ctx.audio_rx_queue.put(audio_8k) + self.enqueue_streaming_audio_chunks(audio_8k, self.ctx.audio_rx_queue) if self.ctx.config_manager.config['AUDIO'].get('rx_auto_audio_level'): audio_8k = audio.normalize_audio(audio_8k) From afafd71fb5dbef97a295c0391020bcd9d47abed1 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 2 Jun 2025 15:58:37 +0200 Subject: [PATCH 004/141] adjusted streaming --- .../src/components/grid/grid_audio.vue | 6 +++-- freedata_gui/src/js/audioStreamHandler.js | 1 - freedata_server/modem.py | 22 +++++++++++++------ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index ba30407f3..27d6f2886 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -22,13 +22,15 @@ function playRxStream() { }); } - //const BLOCK_DURATION_MS = 1024 / 8000 * 1000; - const BLOCK_DURATION_MS = 10 + const BLOCK_DURATION_MS = 600 / 8000 * 1000; + //const BLOCK_DURATION_MS = 10 const MIN_BLOCKS_TO_START = 5 function loop() { if (!isPlaying) return; + console.log(audio.rxStream.length) if (audio.rxStream.length < MIN_BLOCKS_TO_START){ setTimeout(loop, 5); + console.log("timeout....") return; } if (audio.rxStream.length > 0) { diff --git a/freedata_gui/src/js/audioStreamHandler.js b/freedata_gui/src/js/audioStreamHandler.js index 70427a6f5..3ac288fbd 100644 --- a/freedata_gui/src/js/audioStreamHandler.js +++ b/freedata_gui/src/js/audioStreamHandler.js @@ -10,7 +10,6 @@ const MAX_BLOCKS = 10; export function addDataToAudio(data) { const int16 = new Int16Array(data); const copy = new Int16Array(int16); // Kopie für Sicherheit - const stream = audio.rxStream; if (stream.length >= MAX_BLOCKS) { diff --git a/freedata_server/modem.py b/freedata_server/modem.py index e26d3b858..5f85378de 100644 --- a/freedata_server/modem.py +++ b/freedata_server/modem.py @@ -352,14 +352,22 @@ def enqueue_audio_out(self, audio_48k) -> None: return - CHUNK_SIZE = 600 # z. B. 600 Samples = 75ms @ 8kHz - def enqueue_streaming_audio_chunks(self, audio_block, queue): - total_samples = len(audio_block) - for start in range(0, total_samples, self.AUDIO_STREAMING_CHUNK_SIZE): - end = start + self.AUDIO_STREAMING_CHUNK_SIZE - chunk = audio_block[start:end] - queue.put(chunk.tobytes()) + #total_samples = len(audio_block) + #for start in range(0, total_samples, self.AUDIO_STREAMING_CHUNK_SIZE): + # end = start + self.AUDIO_STREAMING_CHUNK_SIZE + # chunk = audio_block[start:end] + # queue.put(chunk.tobytes()) + + block_size = self.AUDIO_STREAMING_CHUNK_SIZE + + pad_length = -len(audio_block) % block_size + padded_data = np.pad(audio_block, (0, pad_length), mode='constant') + sliced_audio_data = padded_data.reshape(-1, block_size) + # add each block to audio out queue + for block in sliced_audio_data: + queue.put(block) + def sd_output_audio_callback(self, outdata: np.ndarray, frames: int, time, status) -> None: From 2834089bd42af319cdaa46f1a9964e22e9993cad Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 3 Jun 2025 13:09:50 +0200 Subject: [PATCH 005/141] buffer adjustments --- freedata_gui/src/components/grid/grid_audio.vue | 4 ++-- freedata_server/context.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index 27d6f2886..b16c42304 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -22,8 +22,8 @@ function playRxStream() { }); } - const BLOCK_DURATION_MS = 600 / 8000 * 1000; - //const BLOCK_DURATION_MS = 10 + //const BLOCK_DURATION_MS = 600 / 8000 * 1000; + const BLOCK_DURATION_MS = 10 const MIN_BLOCKS_TO_START = 5 function loop() { if (!isPlaying) return; diff --git a/freedata_server/context.py b/freedata_server/context.py index a81686e36..1bac7aa03 100644 --- a/freedata_server/context.py +++ b/freedata_server/context.py @@ -19,7 +19,7 @@ def __init__(self, config_file: str): self.modem_events = Queue() self.modem_fft = Queue() self.modem_service = Queue() - self.audio_rx_queue = Queue(maxsize=1) + self.audio_rx_queue = Queue(maxsize=10) self.event_manager = EventManager(self, [self.modem_events]) self.state_manager = StateManager(self.state_queue) From 0b74baf4f6fa62051951ead55d27090d4645787b Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 3 Jun 2025 13:29:18 +0200 Subject: [PATCH 006/141] buffer adjustments --- .../src/components/grid/grid_audio.vue | 50 ++++++++++--------- freedata_gui/src/js/audioStreamHandler.js | 5 ++ freedata_gui/src/store/audioStore.js | 49 ++++++++++++++++++ 3 files changed, 81 insertions(+), 23 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index b16c42304..4360b2850 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -12,44 +12,48 @@ let isPlaying = false; function playRxStream() { if (isPlaying) return; - console.log("Start Playback"); - audioCtx = new (window.AudioContext || window.webkitAudioContext)({ sampleRate: 8000 }); + const SAMPLE_RATE = 8000; + const BLOCK_SIZE = 600; + const BLOCK_DURATION_MS = (BLOCK_SIZE / SAMPLE_RATE) * 1000; // z. B. 75ms + + audioCtx = new (window.AudioContext || window.webkitAudioContext)({ sampleRate: SAMPLE_RATE }); isPlaying = true; + scheduledTime = audioCtx.currentTime; + - if (audioCtx.state === 'suspended') { - audioCtx.resume().then(() => { - console.log("AudioContext resumed"); - }); - } - //const BLOCK_DURATION_MS = 600 / 8000 * 1000; - const BLOCK_DURATION_MS = 10 - const MIN_BLOCKS_TO_START = 5 function loop() { if (!isPlaying) return; - console.log(audio.rxStream.length) - if (audio.rxStream.length < MIN_BLOCKS_TO_START){ - setTimeout(loop, 5); - console.log("timeout....") - return; - } - if (audio.rxStream.length > 0) { - const block = audio.rxStream.shift(); // Nächstes Audioblock holen + + const block = audio.getNextBlock(); + + if (block) { const float32 = Float32Array.from(block, s => s / 32768); - const buffer = audioCtx.createBuffer(1, float32.length, 8000); + const buffer = audioCtx.createBuffer(1, float32.length, SAMPLE_RATE); buffer.copyToChannel(float32, 0); const source = audioCtx.createBufferSource(); source.buffer = buffer; source.connect(audioCtx.destination); - source.start(); + + const now = audioCtx.currentTime; + const playAt = Math.max(now, scheduledTime); + source.start(playAt); + scheduledTime = playAt + buffer.duration; + + } else { - console.log("Buffer empty, waiting..."); + console.warn("Buffer underrun"); } - setTimeout(loop, BLOCK_DURATION_MS); + + setTimeout(loop, 4); } - loop(); + if (audioCtx.state === 'suspended') { + audioCtx.resume().then(loop); + } else { + loop(); + } } function stopRxStream() { diff --git a/freedata_gui/src/js/audioStreamHandler.js b/freedata_gui/src/js/audioStreamHandler.js index 3ac288fbd..dea67dcad 100644 --- a/freedata_gui/src/js/audioStreamHandler.js +++ b/freedata_gui/src/js/audioStreamHandler.js @@ -10,6 +10,7 @@ const MAX_BLOCKS = 10; export function addDataToAudio(data) { const int16 = new Int16Array(data); const copy = new Int16Array(int16); // Kopie für Sicherheit +/* const stream = audio.rxStream; if (stream.length >= MAX_BLOCKS) { @@ -17,4 +18,8 @@ export function addDataToAudio(data) { } stream.push(copy); + */ + audio.addBlock(copy); + + } diff --git a/freedata_gui/src/store/audioStore.js b/freedata_gui/src/store/audioStore.js index 38118f072..c90f7a29b 100644 --- a/freedata_gui/src/store/audioStore.js +++ b/freedata_gui/src/store/audioStore.js @@ -17,6 +17,47 @@ export const useAudioStore = defineStore("audioStore", () => { const audioOutputs = ref([]); const rxStream = ref([]); + const BUFFER_SIZE = 64; + const rxStreamBuffer = new Array(BUFFER_SIZE).fill(null); + + let writePtr = 0; + let readPtr = 0; + let readyBlocks = 0; + + function addBlock(block) { + buffer[writePtr] = block; + writePtr = (writePtr + 1) % BUFFER_SIZE; + + if (readyBlocks < BUFFER_SIZE) { + readyBlocks++; + } else { + readPtr = (readPtr + 1) % BUFFER_SIZE; + } + } + + function getNextBlock() { + if (readyBlocks === 0) return null; + + const block = buffer[readPtr]; + readPtr = (readPtr + 1) % BUFFER_SIZE; + readyBlocks--; + return block; + } + + function resetBuffer() { + writePtr = 0; + readPtr = 0; + readyBlocks = 0; + for (let i = 0; i < BUFFER_SIZE; i++) { + buffer[i] = null; + } + } + + + + + + const loadAudioDevices = async () => { try { const devices = await getAudioDevices(); @@ -37,5 +78,13 @@ export const useAudioStore = defineStore("audioStore", () => { audioOutputs, loadAudioDevices, rxStream, + + addBlock, + getNextBlock, + resetBuffer, + get bufferedBlockCount() { + return readyBlocks; + }, + }; }); From c283197bd6e1ce6f1e70eb1aa280edbba6ba6826 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Wed, 4 Jun 2025 11:18:30 +0200 Subject: [PATCH 007/141] adjusted streaming --- freedata_gui/src/store/audioStore.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/freedata_gui/src/store/audioStore.js b/freedata_gui/src/store/audioStore.js index c90f7a29b..6f7945b22 100644 --- a/freedata_gui/src/store/audioStore.js +++ b/freedata_gui/src/store/audioStore.js @@ -25,7 +25,7 @@ export const useAudioStore = defineStore("audioStore", () => { let readyBlocks = 0; function addBlock(block) { - buffer[writePtr] = block; + rxStreamBuffer[writePtr] = block; writePtr = (writePtr + 1) % BUFFER_SIZE; if (readyBlocks < BUFFER_SIZE) { @@ -38,7 +38,7 @@ export const useAudioStore = defineStore("audioStore", () => { function getNextBlock() { if (readyBlocks === 0) return null; - const block = buffer[readPtr]; + const block = rxStreamBuffer[readPtr]; readPtr = (readPtr + 1) % BUFFER_SIZE; readyBlocks--; return block; @@ -49,7 +49,7 @@ export const useAudioStore = defineStore("audioStore", () => { readPtr = 0; readyBlocks = 0; for (let i = 0; i < BUFFER_SIZE; i++) { - buffer[i] = null; + rxStreamBuffer[i] = null; } } @@ -78,7 +78,6 @@ export const useAudioStore = defineStore("audioStore", () => { audioOutputs, loadAudioDevices, rxStream, - addBlock, getNextBlock, resetBuffer, From 9f17b2bfc3d11de84ed7741b15a677d30a2b9546 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Wed, 4 Jun 2025 11:39:21 +0200 Subject: [PATCH 008/141] adjusted streaming --- .../src/components/grid/grid_audio.vue | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index 4360b2850..079674de6 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -6,27 +6,26 @@ setActivePinia(pinia); import { useAudioStore } from '@/store/audioStore'; const audio = useAudioStore(pinia); -let audioCtx = null; -let isPlaying = false; +var audioCtx = null; +var isPlaying = false; function playRxStream() { if (isPlaying) return; const SAMPLE_RATE = 8000; const BLOCK_SIZE = 600; - const BLOCK_DURATION_MS = (BLOCK_SIZE / SAMPLE_RATE) * 1000; // z. B. 75ms + const BLOCK_DURATION_MS = (BLOCK_SIZE / SAMPLE_RATE) * 1000; // ≈75ms audioCtx = new (window.AudioContext || window.webkitAudioContext)({ sampleRate: SAMPLE_RATE }); + let scheduledTime = audioCtx.currentTime; isPlaying = true; - scheduledTime = audioCtx.currentTime; - + console.log("▶️ Echtzeit-Wiedergabe gestartet"); function loop() { if (!isPlaying) return; const block = audio.getNextBlock(); - if (block) { const float32 = Float32Array.from(block, s => s / 32768); const buffer = audioCtx.createBuffer(1, float32.length, SAMPLE_RATE); @@ -35,18 +34,13 @@ function playRxStream() { const source = audioCtx.createBufferSource(); source.buffer = buffer; source.connect(audioCtx.destination); - - const now = audioCtx.currentTime; - const playAt = Math.max(now, scheduledTime); - source.start(playAt); - scheduledTime = playAt + buffer.duration; - + source.start(0); } else { - console.warn("Buffer underrun"); + console.warn("⛔ Audio buffer underrun"); } - setTimeout(loop, 4); + setTimeout(loop, 4); } if (audioCtx.state === 'suspended') { From 5de53bfadf00dd6050bbb9bd116a75bbfa8f326e Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Wed, 4 Jun 2025 14:06:26 +0200 Subject: [PATCH 009/141] adjusted streaming --- freedata_gui/src/components/grid/grid_audio.vue | 4 ++-- freedata_gui/src/store/audioStore.js | 2 +- freedata_server/modem.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index 079674de6..51ba491d2 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -13,7 +13,7 @@ function playRxStream() { if (isPlaying) return; const SAMPLE_RATE = 8000; - const BLOCK_SIZE = 600; + const BLOCK_SIZE = 300; const BLOCK_DURATION_MS = (BLOCK_SIZE / SAMPLE_RATE) * 1000; // ≈75ms audioCtx = new (window.AudioContext || window.webkitAudioContext)({ sampleRate: SAMPLE_RATE }); @@ -37,7 +37,7 @@ function playRxStream() { source.start(0); } else { - console.warn("⛔ Audio buffer underrun"); + //console.warn("⛔ Audio buffer underrun"); } setTimeout(loop, 4); diff --git a/freedata_gui/src/store/audioStore.js b/freedata_gui/src/store/audioStore.js index 6f7945b22..6a94f80e8 100644 --- a/freedata_gui/src/store/audioStore.js +++ b/freedata_gui/src/store/audioStore.js @@ -17,7 +17,7 @@ export const useAudioStore = defineStore("audioStore", () => { const audioOutputs = ref([]); const rxStream = ref([]); - const BUFFER_SIZE = 64; + const BUFFER_SIZE = 1024; const rxStreamBuffer = new Array(BUFFER_SIZE).fill(null); let writePtr = 0; diff --git a/freedata_server/modem.py b/freedata_server/modem.py index 5f85378de..86ca708ed 100644 --- a/freedata_server/modem.py +++ b/freedata_server/modem.py @@ -75,7 +75,7 @@ def __init__(self, ctx) -> None: self.MODE = 0 self.rms_counter = 0 - self.AUDIO_STREAMING_CHUNK_SIZE = 600 + self.AUDIO_STREAMING_CHUNK_SIZE = 2400 self.audio_out_queue = queue.Queue() # Make sure our resampler will work From e73a8ca6f6d568d836ddc23ddd43daa392a2f842 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Wed, 4 Jun 2025 20:27:39 +0200 Subject: [PATCH 010/141] adjusted streaming --- .../src/components/grid/grid_audio.vue | 51 ++++++++++++++----- freedata_gui/src/locales/de_Deutsch.json | 1 + freedata_gui/src/locales/en_English.json | 1 + 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/freedata_gui/src/components/grid/grid_audio.vue b/freedata_gui/src/components/grid/grid_audio.vue index 51ba491d2..0ec7599b7 100644 --- a/freedata_gui/src/components/grid/grid_audio.vue +++ b/freedata_gui/src/components/grid/grid_audio.vue @@ -1,29 +1,33 @@ diff --git a/freedata_gui/src/locales/de_Deutsch.json b/freedata_gui/src/locales/de_Deutsch.json index 2976ea333..ddee74bf4 100644 --- a/freedata_gui/src/locales/de_Deutsch.json +++ b/freedata_gui/src/locales/de_Deutsch.json @@ -85,6 +85,7 @@ "downloadpreset": "Einstell. Herunterladen", "downloadpreset_help": "Lade die GUI-Einstellungen herunter um sie zu speichern und zu teilen", "components": { + "audiostream": "Audio Stream", "tune": "Tune", "stop_help": "Sitzung abbrechen und Aussendung beenden", "transmissioncharts": "Übertragungs-Diagramme", diff --git a/freedata_gui/src/locales/en_English.json b/freedata_gui/src/locales/en_English.json index 5a34e612c..e05d3361e 100644 --- a/freedata_gui/src/locales/en_English.json +++ b/freedata_gui/src/locales/en_English.json @@ -85,6 +85,7 @@ "downloadpreset": "Download Preset", "downloadpreset_help": "Download preset file for sharing or saving", "components": { + "audiostream": "Audio Stream", "tune": "Tune", "stop_help": "Abort session and stop transmissions", "transmissioncharts": "Transmission charts", From 20ed1ad501a18e23ee69c9171295f2f0e36c358f Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Wed, 4 Jun 2025 20:34:07 +0200 Subject: [PATCH 011/141] adjusted streaming --- freedata_gui/src/components/dynamic_components.vue | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/freedata_gui/src/components/dynamic_components.vue b/freedata_gui/src/components/dynamic_components.vue index cb5743ffe..c966023e7 100644 --- a/freedata_gui/src/components/dynamic_components.vue +++ b/freedata_gui/src/components/dynamic_components.vue @@ -329,14 +329,14 @@ const gridWidgets = [ ), new gridWidget( grid_audio, - { x: 16, y: 8, w: 2, h: 8 }, - "Audio widget", + { x: 16, y: 8, w: 4, h: 24 }, + "Audio Stream", false, true, "Audio", 24, false, - { x: 16, y: 8, w: 2, h: 8 } + { x: 16, y: 8, w: 4, h: 24 } ) //Next new widget ID should be 24 ]; From 5433a001b47fed2bc3d7c5450b9f9146c7a65126 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Sun, 22 Jun 2025 21:49:36 +0200 Subject: [PATCH 012/141] first basic work on NORM implementation --- freedata_server/command_norm.py | 47 +++++ freedata_server/data_frame_factory.py | 98 +++++++++- freedata_server/frame_dispatcher.py | 4 + freedata_server/frame_handler.py | 20 +- freedata_server/frame_handler_norm.py | 22 +++ freedata_server/modem_frametypes.py | 4 + freedata_server/norm/__init__.py | 0 freedata_server/norm/norm_transmission.py | 123 +++++++++++++ freedata_server/norm/norm_transmission_irs.py | 42 +++++ freedata_server/norm/norm_transmission_iss.py | 83 +++++++++ .../norm/norm_transmission_resend.py | 1 + tests/test_norm_protocol.py | 174 ++++++++++++++++++ 12 files changed, 606 insertions(+), 12 deletions(-) create mode 100644 freedata_server/command_norm.py create mode 100644 freedata_server/frame_handler_norm.py create mode 100644 freedata_server/norm/__init__.py create mode 100644 freedata_server/norm/norm_transmission.py create mode 100644 freedata_server/norm/norm_transmission_irs.py create mode 100644 freedata_server/norm/norm_transmission_iss.py create mode 100644 freedata_server/norm/norm_transmission_resend.py create mode 100644 tests/test_norm_protocol.py diff --git a/freedata_server/command_norm.py b/freedata_server/command_norm.py new file mode 100644 index 000000000..4947f4e0c --- /dev/null +++ b/freedata_server/command_norm.py @@ -0,0 +1,47 @@ +import queue +from command import TxCommand +import api_validations +import base64 +from queue import Queue +import numpy as np +import threading +from norm.norm_transmission_iss import NormTransmissionISS + +class Norm(TxCommand): + def set_params_from_api(self, apiParams): + self.origin = apiParams['origin'] + if not api_validations.validate_freedata_callsign(self.origin): + self.origin = f"{self.origin}-0" + + self.domain = apiParams['domain'] + if not api_validations.validate_freedata_callsign(self.domain): + self.domain = f"{self.domain}-0" + + self.data = base64.b64decode(apiParams['data']) + + if 'priority' not in apiParams: + self.priority = 1 + else: + self.priority = apiParams['priority'] + + self.msgtype = apiParams['type'] + self.gridsquare = apiParams['gridsquare'] + + + def run(self): + try: + self.emit_event() + self.logger.info(self.log_message()) + + # wait some random time and wait if we have an ongoing codec2 transmission + # on our channel. This should prevent some packet collision + random_delay = np.random.randint(0, 6) + threading.Event().wait(random_delay) + self.ctx.state_manager.channel_busy_condition_codec2.wait(0.5) + + NormTransmissionISS(self.ctx, self.origin, self.domain, self.gridsquare, self.data, self.priority, self.msgtype).prepare_and_transmit() + + except Exception as e: + self.log(f"Error starting NORM transmission: {e}", isWarning=True) + + return False \ No newline at end of file diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 1c2d4435c..344a4eaf3 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -8,6 +8,7 @@ class DataFrameFactory: LENGTH_SIG0_FRAME = 14 LENGTH_SIG1_FRAME = 14 LENGTH_ACK_FRAME = 3 + LENGTH_NORM_FRAME = 126 """ helpers.set_flag(byte, 'DATA-ACK-NACK', True, FLAG_POSITIONS) @@ -28,6 +29,10 @@ class DataFrameFactory: 'ANNOUNCE_ARQ': 1, # Bit-position for announcing an ARQ session } + NORM_FLAGS = { + 'LAST_DATA': 0, # Bit-position for indicating the LAST DATA state + } + def __init__(self, ctx): self.ctx = ctx @@ -41,6 +46,7 @@ def __init__(self, ctx): self._load_ping_templates() self._load_arq_templates() self._load_p2p_connection_templates() + self._load_norm_templates() def _load_broadcast_templates(self): # cq frame @@ -224,7 +230,50 @@ def _load_p2p_connection_templates(self): "session_id": 1, } + def _load_norm_templates(self): + # data frame + self.template_list[FR_TYPE.NORM_DATA.value] = { + "frame_length": self.LENGTH_NORM_FRAME, + "origin": 6, + "domain": 6, + "gridsquare": 4, + "flag": 1, + "timestamp": 4, + "burst_info": 1, + "payload_size": 1, + "payload_data": 30 + } + # repair frame + # FIXME + self.template_list[FR_TYPE.NORM_REPAIR.value] = { + "frame_length": self.LENGTH_NORM_FRAME, + "origin": 6, + "domain": 6, + "flag": 1, + "timestamp": 4, + "burst_info": 1, + "payload_size": 1, + "payload_data": 34 + } + + # nack frame + # FIXME + self.template_list[FR_TYPE.NORM_NACK.value] = { + "frame_length": self.LENGTH_NORM_FRAME, + "origin": 6, + "domain": 4, + "flag": 2 + } + + # cmd frame + # FIXME + self.template_list[FR_TYPE.NORM_CMD.value] = { + "frame_length": self.LENGTH_NORM_FRAME, + "origin": 6, + "domain": 4, + "flag": 1 + } def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME): frame_template = self.template_list[frametype.value] @@ -250,14 +299,13 @@ def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME): #print(item_length) #print(content) if buffer_position + item_length > frame_length: - raise OverflowError("Frame data overflow!") + raise OverflowError(f"Frame data overflow! {buffer_position + item_length} of max {frame_length}") frame[buffer_position: buffer_position + item_length] = content[key] buffer_position += item_length return frame def deconstruct(self, frame, mode_name=None): - buffer_position = 1 # Handle the case where the frame type is not recognized #raise ValueError(f"Unknown frame type: {frametype}") @@ -266,6 +314,9 @@ def deconstruct(self, frame, mode_name=None): frame_template = self.template_list.get(frametype) frame = bytes([frametype]) + frame else: + print("------------------------") + print(frame) + print(type(frame)) # Extract frametype and get the corresponding template frametype = int.from_bytes(frame[:1], "big") frame_template = self.template_list.get(frametype) @@ -284,9 +335,12 @@ def deconstruct(self, frame, mode_name=None): data = frame[buffer_position: buffer_position + item_length] # Process the data based on the key - if key in ["origin", "destination"]: + if key in ["origin", "destination", "domain"]: extracted_data[key] = helpers.bytes_to_callsign(data).decode() + elif key in ["payload_data"]: + extracted_data[key] = data + elif key in ["origin_crc", "destination_crc", "total_crc"]: extracted_data[key] = data.hex() @@ -295,9 +349,9 @@ def deconstruct(self, frame, mode_name=None): elif key in ["session_id", "speed_level", "frames_per_burst", "version", - "offset", "total_length", "state", "type", "maximum_bandwidth", "protocol_version"]: + "offset", "total_length", "state", "type", "maximum_bandwidth", "protocol_version", "burst_info", "timestamp", "payload_size"]: extracted_data[key] = int.from_bytes(data, 'big') - + print(key, data) elif key in ["snr"]: extracted_data[key] = helpers.snr_from_bytes(data) @@ -327,6 +381,14 @@ def deconstruct(self, frame, mode_name=None): # get_flag returns True or False based on the bit value at the flag's position extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict) + if frametype in [FR_TYPE.NORM_DATA.value, FR_TYPE.NORM_NACK.value, FR_TYPE.NORM_REPAIR.value, FR_TYPE.NORM_CMD.value]: + extracted_data[key] = data + # flag_dict = self.NORM_FLAGS + # for flag in flag_dict: + # # Update extracted_data with the status of each flag + # # get_flag returns True or False based on the bit value at the flag's position + # extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict) + else: extracted_data[key] = data @@ -604,3 +666,29 @@ def build_p2p_connection_disconnect_ack(self, session_id): "session_id": session_id.to_bytes(1, 'big'), } return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK, payload) + + def build_norm_data(self, origin, domain, gridsquare, timestamp, burst_info, payload_size, payload_data, flag): + + payload = { + "origin": helpers.callsign_to_bytes(origin), + "domain": helpers.callsign_to_bytes(domain), + "gridsquare": helpers.encode_grid(gridsquare), + "flag": flag.to_bytes(1, 'big'), + "timestamp": timestamp.to_bytes(4, 'big'), + "burst_info": burst_info.to_bytes(1, 'big'), + "payload_size": payload_size.to_bytes(1, 'big'), + "payload_data": payload_data, + } + return self.construct(FR_TYPE.NORM_DATA, payload) + + + + + def build_norm_nack(self): + pass + + def build_norm_repair(self, origin, domain, timestamp, burst_info, payload_size, payload, flag=None): + pass + + def build_norm_cmd(self): + pass \ No newline at end of file diff --git a/freedata_server/frame_dispatcher.py b/freedata_server/frame_dispatcher.py index 37439ca91..0a9251f19 100644 --- a/freedata_server/frame_dispatcher.py +++ b/freedata_server/frame_dispatcher.py @@ -15,6 +15,7 @@ from frame_handler_arq_session import ARQFrameHandler from frame_handler_p2p_connection import P2PConnectionFrameHandler from frame_handler_beacon import BeaconFrameHandler +from frame_handler_norm import NORMFrameHandler @@ -54,6 +55,9 @@ class DISPATCHER: FR_TYPE.PING_ACK.value: {"class": FrameHandler, "name": "PING ACK"}, FR_TYPE.PING.value: {"class": PingFrameHandler, "name": "PING"}, FR_TYPE.QRV.value: {"class": FrameHandler, "name": "QRV"}, + FR_TYPE.NORM_DATA.value: {"class": NORMFrameHandler, "name": "NORM DATA"}, + + #FR_TYPE.IS_WRITING.value: {"class": FrameHandler, "name": "IS_WRITING"}, #FR_TYPE.FEC.value: {"class": FrameHandler, "name": "FEC"}, #FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"}, diff --git a/freedata_server/frame_handler.py b/freedata_server/frame_handler.py index 70ff03e86..3a5b87e9a 100644 --- a/freedata_server/frame_handler.py +++ b/freedata_server/frame_handler.py @@ -7,7 +7,7 @@ from message_system_db_manager import DatabaseManager from message_system_db_station import DatabaseManagerStations from message_system_db_messages import DatabaseManagerMessages - +from collections.abc import Iterable import maidenhead TESTMODE = False @@ -81,6 +81,13 @@ def is_frame_for_me(self): if session_id in self.ctx.state_manager.arq_iss_sessions: valid = True + # check for NORM data + elif ft in ['NORM_DATA']: + # TODO + # maybe we can add a list of domains, we are listening to in state manager? + valid = True + + # check for p2p connection elif ft in ['P2P_CONNECTION_CONNECT']: #Need to make sure this does not affect any other features in FreeDATA. @@ -184,7 +191,7 @@ def add_to_activity_list(self): if "session_id" in frame: activity["session_id"] = frame["session_id"] - if "flag" in frame: + if "flag" in frame and isinstance(frame["flag"], (list, dict, Iterable)): if "AWAY_FROM_KEY" in frame["flag"]: activity["away_from_key"] = frame["flag"]["AWAY_FROM_KEY"] @@ -216,7 +223,7 @@ def add_to_heard_stations(self): distance_miles = distance_dict['miles'] away_from_key = False - if "flag" in self.details['frame']: + if "flag" in self.details['frame'] and isinstance(frame["flag"], (list, dict, Iterable)): if "AWAY_FROM_KEY" in self.details['frame']["flag"]: away_from_key = self.details['frame']["flag"]["AWAY_FROM_KEY"] @@ -265,7 +272,9 @@ def make_event(self): event['distance_kilometers'] = 0 event['distance_miles'] = 0 - if "flag" in self.details['frame'] and "AWAY_FROM_KEY" in self.details['frame']["flag"]: + + + if "flag" in self.details and isinstance(self.details["flag"], (list, dict, Iterable)) and "AWAY_FROM_KEY" in self.details['frame']["flag"]: event['away_from_key'] = self.details['frame']["flag"]["AWAY_FROM_KEY"] return event @@ -279,7 +288,6 @@ def emit_event(self): broadcasts this event through the event manager. """ event_data = self.make_event() - print(event_data) self.ctx.event_manager.broadcast(event_data) def get_tx_mode(self): @@ -347,8 +355,6 @@ def handle(self, frame, snr, frequency_offset, freedv_inst, bytes_per_frame): self.details['freedv_inst'] = freedv_inst self.details['bytes_per_frame'] = bytes_per_frame - print(self.details) - if 'origin' not in self.details['frame'] and 'session_id' in self.details['frame']: dxcall = self.ctx.state_manager.get_dxcall_by_session_id(self.details['frame']['session_id']) if dxcall: diff --git a/freedata_server/frame_handler_norm.py b/freedata_server/frame_handler_norm.py new file mode 100644 index 000000000..d058eb103 --- /dev/null +++ b/freedata_server/frame_handler_norm.py @@ -0,0 +1,22 @@ +import threading + +import frame_handler_ping +import helpers +import data_frame_factory +import frame_handler +from message_system_db_messages import DatabaseManagerMessages +import numpy as np + +from norm.norm_transmission_irs import NormTransmissionIRS + + +class NORMFrameHandler(frame_handler.FrameHandler): + + def follow_protocol(self): + #self.logger.debug(f"[NORM] handling burst:{self.details}") + + #origin = self.details["frame"]["origin"] + #print(origin) + + + NormTransmissionIRS(self.details["frame"]) \ No newline at end of file diff --git a/freedata_server/modem_frametypes.py b/freedata_server/modem_frametypes.py index 5b524dbfa..6f802b43f 100644 --- a/freedata_server/modem_frametypes.py +++ b/freedata_server/modem_frametypes.py @@ -25,6 +25,10 @@ class FRAME_TYPE(Enum): #MESH_BROADCAST = 100 #MESH_SIGNALLING_PING = 101 #MESH_SIGNALLING_PING_ACK = 102 + NORM_DATA = 100 + NORM_NACK = 101 + NORM_REPAIR = 102 + NORM_CMD = 103 CQ = 200 QRV = 201 PING = 210 diff --git a/freedata_server/norm/__init__.py b/freedata_server/norm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/freedata_server/norm/norm_transmission.py b/freedata_server/norm/norm_transmission.py new file mode 100644 index 000000000..c7cef287b --- /dev/null +++ b/freedata_server/norm/norm_transmission.py @@ -0,0 +1,123 @@ +# base class for norm transmission + + +import structlog +import time +import data_frame_factory +from enum import IntEnum + +class NORMMsgType(IntEnum): + UNDEFINED = 0 + MESSAGE = 1 # Generic text/data message + POSITION = 2 # GPS or grid locator info + SITREP = 3 # Situation report + PING = 4 # Ping or keepalive + ACK = 5 # Acknowledgement + COMMAND = 6 # Control or remote command + STATUS = 7 # System or device status + ALERT = 8 # High-priority broadcast + + +class NORMMsgPriority(IntEnum): + LOW = 0 + NORMAL = 1 + HIGH = 2 + CRITICAL = 3 + ALERT = 4 + EMERGENCY = 5 + +class NormTransmission: + def __init__(self, ctx, origin, domain): + self.logger = structlog.get_logger(type(self).__name__) + self.ctx = ctx + self.origin = origin + self.domain = domain + + self.frame_factory = data_frame_factory.DataFrameFactory(self.ctx) + + def log(self, message, isWarning=False): + """Logs a message with session context. + + Logs a message, including the class name, session ID, and current state, + using the appropriate log level (warning or info). + + Args: + message: The message to be logged. + isWarning: A boolean indicating whether the message should be logged as a warning. + """ + msg = f"[{type(self).__name__}][origin={self.origin},domain={self.domain}][state={self.state.name}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def set_state(self, state): + + self.last_state_change_timestamp = time.time() + if self.state == state: + self.log(f"{type(self).__name__} state {self.state.name} unchanged.") + else: + self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name} at {self.last_state_change_timestamp}") + self.state = state + + def on_frame_received(self, frame): + """Handles received frames based on the current session state. + + This method processes incoming frames, triggering state transitions and + data handling based on the frame type and current session state. + It logs received frame types and ignores unknown state transitions. + + Args: + frame: The received frame. + """ + self.event_frame_received.set() + self.log(f"Received {frame['frame_type']}") + frame_type = frame['frame_type_int'] + if self.state in self.STATE_TRANSITION and frame_type in self.STATE_TRANSITION[self.state]: + action_name = self.STATE_TRANSITION[self.state][frame_type] + received_data, type_byte = getattr(self, action_name)(frame) + + if isinstance(received_data, bytearray) and isinstance(type_byte, int): + self.arq_data_type_handler.dispatch(type_byte, received_data, + self.update_histograms(len(received_data), len(received_data))) + return + + self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") + + def encode_flags(self, msg_type, priority, is_last): + """ + Encodes message type, priority and 'last burst' flag into a single byte. + + Bit layout: + Bit 7 → is_last (1 = letzter Burst) + Bits 6–3 → msg_type (0–15) + Bits 2–0 → priority (0–7) + """ + if isinstance(msg_type, IntEnum): # e.g., MsgType + msg_type = int(msg_type) + + assert 0 <= msg_type <= 15, "msg_type must be 0–15" + assert 0 <= priority <= 7, "priority must be 0–7" + + return ((1 if is_last else 0) << 7) | ((msg_type & 0x0F) << 3) | (priority & 0x07) + + + def decode_flags(self, flags): + """ + Decodes a flags byte into (is_last, msg_type, priority). + + Bit layout: + Bit 7 → is_last + Bits 6–3 → msg_type (0–15) + Bits 2–0 → priority (0–7) + """ + is_last = bool((flags >> 7) & 0x01) + msg_type = (flags >> 3) & 0x0F + priority = flags & 0x07 + return is_last, msg_type, priority + + def encode_burst_info(self, burst_number, total_bursts): + return ((burst_number & 0x0F) << 4) | (total_bursts & 0x0F) + + def decode_burst_info(self, burst_info): + burst_number = (burst_info >> 4) & 0x0F + burst_total = burst_info & 0x0F + return burst_number, burst_total \ No newline at end of file diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py new file mode 100644 index 000000000..c2aa938f5 --- /dev/null +++ b/freedata_server/norm/norm_transmission_irs.py @@ -0,0 +1,42 @@ +# file for handling received data +from norm.norm_transmission import NormTransmission + +class NormTransmissionIRS(NormTransmission): + MAX_PAYLOAD_SIZE = 96 + + def __init__(self, frame): + print("burst:", frame) + + is_last, msg_type, priority = self.decode_flags(frame["flag"]) + burst_number, total_bursts = self.decode_burst_info(frame["burst_info"]) + payload_size = frame["payload_size"] + payload_data = frame["payload_data"] + self.origin = frame["origin"] + self.domain = frame["domain"] + self.gridsquare = frame["gridsquare"] + + # FIXME + #if payload_size > len(frame["payload_data"]) and total_bursts > 1: + # payload_data = frame["payload_data"] + #else: + # payload_data = frame["payload_data"][:self.MAX_PAYLOAD_SIZE * burst_number] + + + + + print("####################################") + + print("payload_size:", payload_size) + print("payload_data:", payload_data) + + print("origin", self.origin) + print("domain", self.domain) + print("gridsquare", self.gridsquare) + print("is_last", is_last) + print("msg_type", msg_type) + print("priority", priority) + print("burst_number", burst_number) + print("total_bursts", total_bursts) + + # TODO: + # add data to database or update it \ No newline at end of file diff --git a/freedata_server/norm/norm_transmission_iss.py b/freedata_server/norm/norm_transmission_iss.py new file mode 100644 index 000000000..564e14533 --- /dev/null +++ b/freedata_server/norm/norm_transmission_iss.py @@ -0,0 +1,83 @@ +# file for handling transmitting data +from norm.norm_transmission import NormTransmission +from norm.norm_transmission import NORMMsgType, NORMMsgPriority +from enum import Enum +import time +from codec2 import FREEDV_MODE + +class NORM_ISS_State(Enum): + NEW = 0 + TRANSMITTING = 1 + ENDED = 2 + FAILED = 3 + ABORTING = 4 + ABORTED = 5 + +class NormTransmissionISS(NormTransmission): + MAX_PAYLOAD_SIZE = 96 + + def __init__(self, ctx, origin, domain, gridsquare, data, priority=NORMMsgPriority.NORMAL, message_type=NORMMsgType.UNDEFINED): + + super().__init__(ctx, origin, domain) + self.ctx = ctx + self.origin = origin + self.domain = domain + self.gridsquare = gridsquare + self.data = data + self.priority = priority + self.message_type = message_type + self.payload_size = len(data) + + self.timestamp = int(time.time()) + + self.state = NORM_ISS_State.NEW + + self.log("Initialized") + + def prepare_and_transmit(self): + bursts = self.create_bursts() + self.transmit_bursts(bursts) + + def create_bursts(self): + self.message_type = NORMMsgType.MESSAGE + self.message_priority = NORMMsgPriority.NORMAL + + + full_data = self.data + + total_bursts = (len(full_data) + self.MAX_PAYLOAD_SIZE - 1) // self.MAX_PAYLOAD_SIZE + bursts = [] + + for burst_number in range(1, total_bursts + 1): + offset = (burst_number-1) * self.MAX_PAYLOAD_SIZE + payload = full_data[offset: offset + self.MAX_PAYLOAD_SIZE] + + burst_info = self.encode_burst_info(burst_number, total_bursts) + + # set flag for last burst + is_last = (burst_number == total_bursts - 1) + flags = self.encode_flags( + msg_type=self.message_type, + priority=self.message_priority, + is_last=is_last + ) + + burst_frame = self.frame_factory.build_norm_data( + origin=self.origin, + domain=self.domain, + gridsquare=self.gridsquare, + timestamp=self.timestamp, + burst_info=burst_info, + payload_size=len(payload), + payload_data=payload, + flag=flags + ) + print(burst_frame) + bursts.append(burst_frame) + + return bursts + + def transmit_bursts(self, bursts): + + for burst in bursts: + self.ctx.rf_modem.transmit(FREEDV_MODE.datac4, 1, 100, burst) \ No newline at end of file diff --git a/freedata_server/norm/norm_transmission_resend.py b/freedata_server/norm/norm_transmission_resend.py new file mode 100644 index 000000000..cba14075d --- /dev/null +++ b/freedata_server/norm/norm_transmission_resend.py @@ -0,0 +1 @@ +# file for "healing requested transmissions... \ No newline at end of file diff --git a/tests/test_norm_protocol.py b/tests/test_norm_protocol.py new file mode 100644 index 000000000..1654bcb03 --- /dev/null +++ b/tests/test_norm_protocol.py @@ -0,0 +1,174 @@ +import sys +import time +import unittest +import unittest.mock +import queue +import threading +import random +import structlog +import base64 +import numpy as np + +sys.path.append('freedata_server') + +from config import CONFIG +from context import AppContext +from event_manager import EventManager +from state_manager import StateManager +from data_frame_factory import DataFrameFactory +from frame_dispatcher import DISPATCHER +import codec2 +import command_norm + + +class TestModem: + def __init__(self, event_q, state_q): + self.data_queue_received = queue.Queue() + self.demodulator = unittest.mock.Mock() + self.event_manager = EventManager([event_q]) + self.logger = structlog.get_logger('Modem') + self.states = StateManager(state_q) + + def getFrameTransmissionTime(self, mode): + samples = 0 + c2instance = codec2.open_instance(mode.value) + samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance) + time = samples / 8000 + return time + + def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: + tx_time = self.getFrameTransmissionTime(mode) + 0.1 + self.logger.info(f"TX {tx_time} seconds...") + threading.Event().wait(tx_time) + + transmission = { + 'mode': mode, + 'bytes': frames, + } + self.data_queue_received.put(transmission) + + +class TestMessageProtocol(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.logger = structlog.get_logger("TESTS") + + # ISS + + cls.ctx_ISS = AppContext('freedata_server/config.ini.example') + cls.ctx_ISS.TESTMODE = True + cls.ctx_ISS.startup() + + + # IRS + cls.ctx_IRS = AppContext('freedata_server/config.ini.example') + cls.ctx_IRS.TESTMODE = True + cls.ctx_IRS.startup() + + # simulate a busy condition + cls.ctx_IRS.state_manager.channel_busy_slot = [True, False, False, False, False] + # Frame loss probability in % + cls.loss_probability = 0 + + + cls.channels_running = False + + @classmethod + def tearDownClass(cls): + cls.ctx_IRS.shutdown() + cls.ctx_ISS.shutdown() + + + def channelWorker(self, ctx_a, ctx_b): + while self.channels_running: + try: + # Station A gets the data from its transmit queue + transmission = ctx_a.TESTMODE_TRANSMIT_QUEUE.get(timeout=1) + print(f"Station A sending: {transmission[1]}", len(transmission[1]), transmission[0]) + + transmission[1] += bytes(2) # 2bytes crc simulation + + if random.randint(0, 100) < self.loss_probability: + self.logger.info(f"[{threading.current_thread().name}] Frame lost...") + continue + + # Forward data from Station A to Station B's receive queue + if ctx_b: + for burst in transmission: + ctx_b.TESTMODE_RECEIVE_QUEUE.put(burst) + self.logger.info(f"Data forwarded to Station B") + + frame_bytes = transmission[1] + if len(frame_bytes) == 5: + mode_name = "SIGNALLING_ACK" + else: + mode_name = transmission[0] + + snr = 15 + ctx_b.service_manager.frame_dispatcher.process_data( + frame_bytes, None, len(frame_bytes), snr, 0, mode_name=mode_name + ) + + except queue.Empty: + continue + + self.logger.info(f"[{threading.current_thread().name}] Channel closed.") + + def waitForSession(self, event_queue, outbound=False): + key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' + while self.channels_running: + try: + ev = event_queue.get(timeout=2) + if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]): + self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") + break + except queue.Empty: + continue + + def establishChannels(self): + self.channels_running = True + self.channelA = threading.Thread(target=self.channelWorker,args=[self.ctx_ISS, self.ctx_IRS],name = "channelA") + self.channelA.start() + + self.channelB = threading.Thread(target=self.channelWorker,args=[self.ctx_IRS, self.ctx_ISS],name = "channelB") + self.channelB.start() + + def waitAndCloseChannels(self): + self.waitForSession(self.ctx_ISS.modem_events, True) + self.channels_running = False + self.waitForSession(self.ctx_IRS.modem_events, False) + self.channels_running = False + + def testNormBroadcast(self): + self.loss_probability = 0 # no loss + self.establishChannels() + + params = { + 'origin': "AA1AAA-1", + 'domain': "BB1BBB-1", + 'gridsquare': "JN48ea", + 'type': 'MESSAGE', + 'priority': '1', + 'data': str(base64.b64encode(b"hello world!"), 'utf-8') + } + try: + command = command_norm.Norm(self.ctx_ISS, params) + command.run() + except Exception as e: + print(e) + + + #del cmd + #print(self.ctx_ISS.TESTMODE_EVENTS.empty()) + + while not self.ctx_ISS.TESTMODE_EVENTS.empty(): + event = self.ctx_ISS.TESTMODE_EVENTS.get() + success = event.get('arq-transfer-outbound', {}).get('success', None) + if success is not None: + self.assertTrue(success, f"Test failed because of wrong success: {success}") + +if __name__ == '__main__': + unittest.main() From 870e6e000168a8ecad0eb68802616a057a9d37fc Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Sun, 22 Jun 2025 23:20:57 +0200 Subject: [PATCH 013/141] save data to database --- freedata_server/constants.py | 2 +- freedata_server/data_frame_factory.py | 6 +- freedata_server/frame_handler_norm.py | 2 +- .../message_system_db_broadcasts.py | 117 ++++++++++++++++++ freedata_server/message_system_db_model.py | 47 +++++++ freedata_server/norm/norm_transmission.py | 10 +- freedata_server/norm/norm_transmission_irs.py | 57 +++++++-- freedata_server/norm/norm_transmission_iss.py | 7 +- 8 files changed, 227 insertions(+), 21 deletions(-) create mode 100644 freedata_server/message_system_db_broadcasts.py diff --git a/freedata_server/constants.py b/freedata_server/constants.py index b40a436c9..e73bf3dd3 100644 --- a/freedata_server/constants.py +++ b/freedata_server/constants.py @@ -8,4 +8,4 @@ DOCUMENTATION_URL = 'https://wiki.freedata.app' STATS_API_URL = 'https://api.freedata.app/stats.php' EXPLORER_API_URL = 'https://api.freedata.app/explorer.php' -MESSAGE_SYSTEM_DATABASE_VERSION = 0 \ No newline at end of file +MESSAGE_SYSTEM_DATABASE_VERSION = 1 \ No newline at end of file diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 344a4eaf3..25ab93343 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -240,6 +240,7 @@ def _load_norm_templates(self): "flag": 1, "timestamp": 4, "burst_info": 1, + "checksum": 3, "payload_size": 1, "payload_data": 30 } @@ -341,7 +342,7 @@ def deconstruct(self, frame, mode_name=None): elif key in ["payload_data"]: extracted_data[key] = data - elif key in ["origin_crc", "destination_crc", "total_crc"]: + elif key in ["origin_crc", "destination_crc", "total_crc", "checksum"]: extracted_data[key] = data.hex() elif key == "gridsquare": @@ -667,7 +668,7 @@ def build_p2p_connection_disconnect_ack(self, session_id): } return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK, payload) - def build_norm_data(self, origin, domain, gridsquare, timestamp, burst_info, payload_size, payload_data, flag): + def build_norm_data(self, origin, domain, gridsquare, timestamp, burst_info, payload_size, payload_data, flag, checksum): payload = { "origin": helpers.callsign_to_bytes(origin), @@ -678,6 +679,7 @@ def build_norm_data(self, origin, domain, gridsquare, timestamp, burst_info, pay "burst_info": burst_info.to_bytes(1, 'big'), "payload_size": payload_size.to_bytes(1, 'big'), "payload_data": payload_data, + "checksum": checksum } return self.construct(FR_TYPE.NORM_DATA, payload) diff --git a/freedata_server/frame_handler_norm.py b/freedata_server/frame_handler_norm.py index d058eb103..eb293cf29 100644 --- a/freedata_server/frame_handler_norm.py +++ b/freedata_server/frame_handler_norm.py @@ -19,4 +19,4 @@ def follow_protocol(self): #print(origin) - NormTransmissionIRS(self.details["frame"]) \ No newline at end of file + NormTransmissionIRS(self.ctx, self.details["frame"]) \ No newline at end of file diff --git a/freedata_server/message_system_db_broadcasts.py b/freedata_server/message_system_db_broadcasts.py new file mode 100644 index 000000000..6b6bc8138 --- /dev/null +++ b/freedata_server/message_system_db_broadcasts.py @@ -0,0 +1,117 @@ +from message_system_db_manager import DatabaseManager +from message_system_db_attachments import DatabaseManagerAttachments +from message_system_db_model import Status, BroadcastMessage +from message_system_db_station import DatabaseManagerStations +from sqlalchemy.exc import IntegrityError +from datetime import datetime, timedelta +import json +import os +from exceptions import MessageStatusError +import helpers +class DatabaseManagerBroadcasts(DatabaseManager): + + def __init__(self, ctx): + super().__init__(ctx) + + self.stations_manager = DatabaseManagerStations(self.ctx) + + def process_broadcast_message( + self, + id: str, + origin: str, + burst_index: int, + burst_data: str, + total_bursts: int, + checksum: str, + repairing_callsigns: dict = None, + domain: str = None, + gridsquare: str = None, + msg_type: str = None, + received_at: datetime = None, + expires_at: datetime = None, + priority: int = 1, + is_read: bool = True, + status: str = "queued", + error_reason: str = None + ) -> bool: + """ + Handles both creation of a new broadcast message and addition of bursts. + + If the message does not exist, it will be created. + If it exists, the burst will be added. + When all bursts are present, the final payload will be assembled and CRC checked. + """ + session = self.get_thread_scoped_session() + try: + # Try to find existing message + msg = session.query(BroadcastMessage).filter_by(id=id).first() + + if not msg: + # Create station and status + origin_station = self.stations_manager.get_or_create_station(origin, session) + status_obj = self.get_or_create_status(session, status) if status else None + received_at = received_at or datetime.utcnow() + + # New message + msg = BroadcastMessage( + id=id, + origin=origin_station.callsign, + repairing_callsigns=repairing_callsigns, + domain=domain, + gridsquare=gridsquare, + priority=priority, + is_read=is_read, + payload_size=0, + payload_data={"bursts": {str(burst_index): burst_data}}, + msg_type=msg_type, + total_bursts=total_bursts, + checksum=checksum, + received_at=received_at, + expires_at=expires_at, + status_id=status_obj.id if status_obj else None, + error_reason=error_reason + ) + session.add(msg) + self.log(f"Created new broadcast message {id}") + + else: + # Add burst to existing message + if not msg.payload_data: + msg.payload_data = {} + + if "bursts" not in msg.payload_data: + msg.payload_data["bursts"] = {} + + msg.payload_data["bursts"][str(burst_index)] = burst_data + self.log(f"Added burst {burst_index} to message {id}") + + # Check for final assembly + received = msg.payload_data["bursts"] + total = msg.total_bursts or 0 + + if total > 0 and len(received) == total and all(str(i) in received for i in range(total)): + ordered = [received[str(i)] for i in range(total)] + final = "".join(ordered) + + # CRC check + crc = helpers.get_crc_24(final) + if msg.checksum is None: + self.log(f"Missing checksum for {id}", isWarning=True) + elif crc != msg.checksum: + self.log(f"Checksum mismatch for {id}: expected {msg.checksum}, got {crc}", isWarning=True) + else: + msg.payload_data["final"] = final + msg.payload_size = len(final.encode("utf-8")) + self.log(f"Final payload assembled and verified for {id}") + + session.commit() + self.ctx.event_manager.freedata_message_db_change(message_id=msg.id) + return True + + except Exception as e: + session.rollback() + self.log(f"Error processing broadcast message {id}: {e}", isWarning=True) + return False + + finally: + session.remove() diff --git a/freedata_server/message_system_db_model.py b/freedata_server/message_system_db_model.py index 8c962f205..41066502c 100644 --- a/freedata_server/message_system_db_model.py +++ b/freedata_server/message_system_db_model.py @@ -2,6 +2,7 @@ from sqlalchemy import Index, Boolean, Column, String, Integer, JSON, ForeignKey, DateTime from sqlalchemy.orm import declarative_base, relationship +from datetime import datetime Base = declarative_base() @@ -195,3 +196,49 @@ def to_dict(self): 'checksum_crc32': self.checksum_crc32, 'hash_sha512' : self.hash_sha512 } + + +class BroadcastMessage(Base): + __tablename__ = 'broadcast_messages' + + id = Column(String, primary_key=True) + origin = Column(String, ForeignKey('station.callsign')) + repairing_callsigns = Column(JSON, nullable=True) + domain = Column(String) + gridsquare = Column(String) + frequency = Column(Integer, default=0) + priority = Column(Integer, default=1) + is_read = Column(Boolean, default=True) + payload_size = Column(Integer, default=0) + payload_data = Column(JSON, nullable=True) + msg_type = Column(String) + total_bursts = Column(Integer, default=0) + checksum = Column(String) + received_at = Column(DateTime, default=datetime.utcnow) + expires_at = Column(DateTime, nullable=True) + status_id = Column(Integer, ForeignKey('status.id'), nullable=True) + status = relationship('Status', backref='broadcast_messages') + error_reason = Column(String, nullable=True) + + Index('idx_broadcast_domain_received', 'domain', 'received_at') + + def to_dict(self): + return { + 'id': self.id, + 'origin': self.origin, + 'repairing_callsigns': self.repairing_callsigns, + 'domain': self.domain, + 'gridsquare': self.gridsquare, + 'frequency': self.frequency, + 'priority': self.priority, + 'is_read': self.is_read, + 'payload_size': self.payload_size, + 'payload_data': self.payload_data, + 'msg_type': self.msg_type, + 'total_bursts': self.total_bursts, + 'checksum': self.checksum, + 'received_at': self.received_at.isoformat() if self.received_at else None, + 'expires_at': self.expires_at.isoformat() if self.expires_at else None, + 'status': self.status.name if self.status else None, + 'error_reason': self.error_reason + } diff --git a/freedata_server/norm/norm_transmission.py b/freedata_server/norm/norm_transmission.py index c7cef287b..04ad95a0b 100644 --- a/freedata_server/norm/norm_transmission.py +++ b/freedata_server/norm/norm_transmission.py @@ -5,6 +5,8 @@ import time import data_frame_factory from enum import IntEnum +import hashlib + class NORMMsgType(IntEnum): UNDEFINED = 0 @@ -120,4 +122,10 @@ def encode_burst_info(self, burst_number, total_bursts): def decode_burst_info(self, burst_info): burst_number = (burst_info >> 4) & 0x0F burst_total = burst_info & 0x0F - return burst_number, burst_total \ No newline at end of file + return burst_number, burst_total + + + def create_broadcast_id(self, timestamp: int, domain: str, checksum: str, length: int = 10) -> str: + raw = f"{self}|{timestamp}|{domain}|{checksum}" + digest = hashlib.blake2s(raw.encode(), digest_size=6).hexdigest() + return f"bc_{digest[:max(8, min(length, 12))]}" diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py index c2aa938f5..3d5d8b4f9 100644 --- a/freedata_server/norm/norm_transmission_irs.py +++ b/freedata_server/norm/norm_transmission_irs.py @@ -1,34 +1,40 @@ # file for handling received data from norm.norm_transmission import NormTransmission +from message_system_db_broadcasts import DatabaseManagerBroadcasts +import base64 + class NormTransmissionIRS(NormTransmission): MAX_PAYLOAD_SIZE = 96 - def __init__(self, frame): + def __init__(self, ctx, frame): + self.ctx = ctx print("burst:", frame) is_last, msg_type, priority = self.decode_flags(frame["flag"]) burst_number, total_bursts = self.decode_burst_info(frame["burst_info"]) payload_size = frame["payload_size"] payload_data = frame["payload_data"] - self.origin = frame["origin"] - self.domain = frame["domain"] - self.gridsquare = frame["gridsquare"] - # FIXME - #if payload_size > len(frame["payload_data"]) and total_bursts > 1: - # payload_data = frame["payload_data"] - #else: - # payload_data = frame["payload_data"][:self.MAX_PAYLOAD_SIZE * burst_number] + if total_bursts == 1: + payload_data = payload_data[:payload_size] + else: + start = (burst_number -1) * self.MAX_PAYLOAD_SIZE + end = min(start + self.MAX_PAYLOAD_SIZE, payload_size) + payload_data = payload_data[start:end] + self.origin = frame["origin"] + self.domain = frame["domain"] + self.gridsquare = frame["gridsquare"] + self.checksum = frame["checksum"] + self.timestamp = frame["timestamp"] + print("####################################") - print("payload_size:", payload_size) print("payload_data:", payload_data) - print("origin", self.origin) print("domain", self.domain) print("gridsquare", self.gridsquare) @@ -37,6 +43,31 @@ def __init__(self, frame): print("priority", priority) print("burst_number", burst_number) print("total_bursts", total_bursts) + print("checksum", self.checksum) + + + payload_b64 = base64.b64encode(payload_data).decode("ascii") + print("payload_b64", payload_b64) + + self.id = self.create_broadcast_id(self.timestamp, self.domain, self.checksum) + + db = DatabaseManagerBroadcasts(self.ctx) + success = db.process_broadcast_message( + id=self.id, + origin=self.origin, + burst_index=burst_number, + burst_data=payload_b64, + total_bursts=total_bursts, + checksum=self.checksum, + repairing_callsigns=frame.get("repairing_callsigns"), + domain=self.domain, + gridsquare=self.gridsquare, + msg_type=msg_type, + priority=priority, + received_at=self.timestamp, + is_read=True, + status="received" + ) - # TODO: - # add data to database or update it \ No newline at end of file + if not success: + print("Failed to process burst in database.") diff --git a/freedata_server/norm/norm_transmission_iss.py b/freedata_server/norm/norm_transmission_iss.py index 564e14533..4838b853c 100644 --- a/freedata_server/norm/norm_transmission_iss.py +++ b/freedata_server/norm/norm_transmission_iss.py @@ -4,7 +4,7 @@ from enum import Enum import time from codec2 import FREEDV_MODE - +import helpers class NORM_ISS_State(Enum): NEW = 0 TRANSMITTING = 1 @@ -53,7 +53,7 @@ def create_bursts(self): payload = full_data[offset: offset + self.MAX_PAYLOAD_SIZE] burst_info = self.encode_burst_info(burst_number, total_bursts) - + checksum = helpers.get_crc_24(full_data) # set flag for last burst is_last = (burst_number == total_bursts - 1) flags = self.encode_flags( @@ -70,7 +70,8 @@ def create_bursts(self): burst_info=burst_info, payload_size=len(payload), payload_data=payload, - flag=flags + flag=flags, + checksum=checksum ) print(burst_frame) bursts.append(burst_frame) From 76518ca9c48a49e2cf09f122f07569dd310fceb0 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 23 Jun 2025 10:04:42 +0200 Subject: [PATCH 014/141] save data to database --- freedata_server/api/freedata.py | 17 +++++ freedata_server/data_frame_factory.py | 2 +- .../message_system_db_broadcasts.py | 65 +++++++++++++++++-- freedata_server/message_system_db_model.py | 6 +- freedata_server/norm/norm_transmission.py | 18 ++++- freedata_server/norm/norm_transmission_irs.py | 13 ++-- freedata_server/norm/norm_transmission_iss.py | 2 +- tests/test_norm_protocol.py | 3 +- 8 files changed, 107 insertions(+), 19 deletions(-) diff --git a/freedata_server/api/freedata.py b/freedata_server/api/freedata.py index fe816e005..374f00889 100644 --- a/freedata_server/api/freedata.py +++ b/freedata_server/api/freedata.py @@ -5,6 +5,8 @@ from message_system_db_attachments import DatabaseManagerAttachments from message_system_db_beacon import DatabaseManagerBeacon from message_system_db_station import DatabaseManagerStations +from message_system_db_broadcasts import DatabaseManagerBroadcasts + import command_message_send import adif_udp_logger import wavelog_api_logger @@ -25,6 +27,9 @@ def _mgr_beacon(ctx: AppContext): def _mgr_stations(ctx: AppContext): return DatabaseManagerStations(ctx) +def _mgr_broadcasts(ctx: AppContext): + return DatabaseManagerBroadcasts(ctx) + @router.get("/messages/{message_id}", summary="Get Message by ID", tags=["FreeDATA"], responses={ @@ -699,3 +704,15 @@ async def set_station_info( if result is None: api_abort("Station not found", 404) return api_response(result) + +@router.get("/broadcast", summary="Get All Broadcast Messages", tags=["FreeDATA"], responses={}) +async def get_freedata_broadcasts( + ctx: AppContext = Depends(get_ctx) +): + #filters = {k: v for k, v in ctx.config_manager.read().get('FILTERS', {}).items()} + # use query params if needed + # filters = dict(ctx.request.query_params) + result = _mgr_broadcasts(ctx).get_all_broadcasts_json() + return api_response(result) + + diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 25ab93343..51378abd4 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -242,7 +242,7 @@ def _load_norm_templates(self): "burst_info": 1, "checksum": 3, "payload_size": 1, - "payload_data": 30 + "payload_data": 99 } # repair frame diff --git a/freedata_server/message_system_db_broadcasts.py b/freedata_server/message_system_db_broadcasts.py index 6b6bc8138..7304f8e1f 100644 --- a/freedata_server/message_system_db_broadcasts.py +++ b/freedata_server/message_system_db_broadcasts.py @@ -2,12 +2,16 @@ from message_system_db_attachments import DatabaseManagerAttachments from message_system_db_model import Status, BroadcastMessage from message_system_db_station import DatabaseManagerStations -from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm.attributes import flag_modified from datetime import datetime, timedelta import json import os from exceptions import MessageStatusError import helpers +import base64 + + + class DatabaseManagerBroadcasts(DatabaseManager): def __init__(self, ctx): @@ -19,6 +23,7 @@ def process_broadcast_message( self, id: str, origin: str, + timestamp: datetime, burst_index: int, burst_data: str, total_bursts: int, @@ -46,16 +51,18 @@ def process_broadcast_message( # Try to find existing message msg = session.query(BroadcastMessage).filter_by(id=id).first() + self.log(f"Broadcast ID: {id}, Burst: {burst_index}, Exists: {'yes' if msg else 'no'}") + if not msg: # Create station and status origin_station = self.stations_manager.get_or_create_station(origin, session) status_obj = self.get_or_create_status(session, status) if status else None - received_at = received_at or datetime.utcnow() # New message msg = BroadcastMessage( id=id, origin=origin_station.callsign, + timestamp=timestamp, repairing_callsigns=repairing_callsigns, domain=domain, gridsquare=gridsquare, @@ -83,23 +90,29 @@ def process_broadcast_message( msg.payload_data["bursts"] = {} msg.payload_data["bursts"][str(burst_index)] = burst_data + flag_modified(msg, "payload_data") + self.log(f"Added burst {burst_index} to message {id}") # Check for final assembly received = msg.payload_data["bursts"] - total = msg.total_bursts or 0 + total = msg.total_bursts - if total > 0 and len(received) == total and all(str(i) in received for i in range(total)): - ordered = [received[str(i)] for i in range(total)] + if total > 0 and len(received) == total and all(str(i) in received for i in range(1, total + 1)): + ordered = [received[str(i)] for i in range(1, total + 1)] final = "".join(ordered) # CRC check - crc = helpers.get_crc_24(final) + + final_bytes = base64.b64decode(final) + crc = helpers.get_crc_24(final_bytes).hex() + if msg.checksum is None: self.log(f"Missing checksum for {id}", isWarning=True) elif crc != msg.checksum: self.log(f"Checksum mismatch for {id}: expected {msg.checksum}, got {crc}", isWarning=True) else: + print("ja?=!") msg.payload_data["final"] = final msg.payload_size = len(final.encode("utf-8")) self.log(f"Final payload assembled and verified for {id}") @@ -115,3 +128,43 @@ def process_broadcast_message( finally: session.remove() + + def get_all_broadcasts_json(self) -> list: + """ + Returns all broadcast messages in JSON-serializable dict format. + """ + session = self.get_thread_scoped_session() + try: + messages = session.query(BroadcastMessage).all() + result = [] + + for msg in messages: + result.append({ + "id": msg.id, + "origin": msg.origin, + "timestamp": msg.timestamp.isoformat() if msg.timestamp else None, + "repairing_callsigns": msg.repairing_callsigns, + "domain": msg.domain, + "gridsquare": msg.gridsquare, + "frequency": msg.frequency, + "priority": msg.priority, + "is_read": msg.is_read, + "payload_size": msg.payload_size, + "payload_data": msg.payload_data, + "msg_type": msg.msg_type, + "total_bursts": msg.total_bursts, + "checksum": msg.checksum, + "received_at": msg.received_at.isoformat() if msg.received_at else None, + "expires_at": msg.expires_at.isoformat() if msg.expires_at else None, + "status": msg.status.name if msg.status else None, + "error_reason": msg.error_reason + }) + + return result + + except Exception as e: + self.log(f"Error fetching broadcasts: {e}", isWarning=True) + return [] + + finally: + session.remove() diff --git a/freedata_server/message_system_db_model.py b/freedata_server/message_system_db_model.py index 41066502c..8d153491a 100644 --- a/freedata_server/message_system_db_model.py +++ b/freedata_server/message_system_db_model.py @@ -203,6 +203,7 @@ class BroadcastMessage(Base): id = Column(String, primary_key=True) origin = Column(String, ForeignKey('station.callsign')) + timestamp = Column(DateTime) repairing_callsigns = Column(JSON, nullable=True) domain = Column(String) gridsquare = Column(String) @@ -214,8 +215,8 @@ class BroadcastMessage(Base): msg_type = Column(String) total_bursts = Column(Integer, default=0) checksum = Column(String) - received_at = Column(DateTime, default=datetime.utcnow) - expires_at = Column(DateTime, nullable=True) + received_at = Column(DateTime, default=0) + expires_at = Column(DateTime, default=0) status_id = Column(Integer, ForeignKey('status.id'), nullable=True) status = relationship('Status', backref='broadcast_messages') error_reason = Column(String, nullable=True) @@ -226,6 +227,7 @@ def to_dict(self): return { 'id': self.id, 'origin': self.origin, + 'timestamp': self.timestamp, 'repairing_callsigns': self.repairing_callsigns, 'domain': self.domain, 'gridsquare': self.gridsquare, diff --git a/freedata_server/norm/norm_transmission.py b/freedata_server/norm/norm_transmission.py index 04ad95a0b..b024ce81e 100644 --- a/freedata_server/norm/norm_transmission.py +++ b/freedata_server/norm/norm_transmission.py @@ -126,6 +126,18 @@ def decode_burst_info(self, burst_info): def create_broadcast_id(self, timestamp: int, domain: str, checksum: str, length: int = 10) -> str: - raw = f"{self}|{timestamp}|{domain}|{checksum}" - digest = hashlib.blake2s(raw.encode(), digest_size=6).hexdigest() - return f"bc_{digest[:max(8, min(length, 12))]}" + """ + Creates a deterministic broadcast ID using BLAKE2b. + + Args: + timestamp (int): UNIX timestamp (int). + domain (str): Domain/context string. + checksum (str): Checksum string. + length (int): Number of hex characters in result (max 128). + + Returns: + str: Broadcast ID, e.g., 'bc_8d12fa3b4e' + """ + base_str = f"{timestamp}:{domain}:{checksum}".encode("utf-8") + digest = hashlib.blake2b(base_str, digest_size=length // 2).hexdigest() # 1 hex char = 4 bits + return f"bc_{digest}" diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py index 3d5d8b4f9..f856ed537 100644 --- a/freedata_server/norm/norm_transmission_irs.py +++ b/freedata_server/norm/norm_transmission_irs.py @@ -2,10 +2,10 @@ from norm.norm_transmission import NormTransmission from message_system_db_broadcasts import DatabaseManagerBroadcasts import base64 - +from datetime import datetime, timezone class NormTransmissionIRS(NormTransmission): - MAX_PAYLOAD_SIZE = 96 + MAX_PAYLOAD_SIZE = 99 def __init__(self, ctx, frame): self.ctx = ctx @@ -29,8 +29,7 @@ def __init__(self, ctx, frame): self.domain = frame["domain"] self.gridsquare = frame["gridsquare"] self.checksum = frame["checksum"] - self.timestamp = frame["timestamp"] - + self.timestamp = datetime.fromtimestamp(frame["timestamp"], tz=timezone.utc) print("####################################") print("payload_size:", payload_size) @@ -44,17 +43,20 @@ def __init__(self, ctx, frame): print("burst_number", burst_number) print("total_bursts", total_bursts) print("checksum", self.checksum) + print("timestamp", self.timestamp) payload_b64 = base64.b64encode(payload_data).decode("ascii") print("payload_b64", payload_b64) self.id = self.create_broadcast_id(self.timestamp, self.domain, self.checksum) + print("id", self.id) db = DatabaseManagerBroadcasts(self.ctx) success = db.process_broadcast_message( id=self.id, origin=self.origin, + timestamp=self.timestamp, burst_index=burst_number, burst_data=payload_b64, total_bursts=total_bursts, @@ -64,7 +66,8 @@ def __init__(self, ctx, frame): gridsquare=self.gridsquare, msg_type=msg_type, priority=priority, - received_at=self.timestamp, + received_at=datetime.now(timezone.utc), + expires_at=datetime.now(timezone.utc), is_read=True, status="received" ) diff --git a/freedata_server/norm/norm_transmission_iss.py b/freedata_server/norm/norm_transmission_iss.py index 4838b853c..140b02c92 100644 --- a/freedata_server/norm/norm_transmission_iss.py +++ b/freedata_server/norm/norm_transmission_iss.py @@ -14,7 +14,7 @@ class NORM_ISS_State(Enum): ABORTED = 5 class NormTransmissionISS(NormTransmission): - MAX_PAYLOAD_SIZE = 96 + MAX_PAYLOAD_SIZE = 99 def __init__(self, ctx, origin, domain, gridsquare, data, priority=NORMMsgPriority.NORMAL, message_type=NORMMsgType.UNDEFINED): diff --git a/tests/test_norm_protocol.py b/tests/test_norm_protocol.py index 1654bcb03..f57aca785 100644 --- a/tests/test_norm_protocol.py +++ b/tests/test_norm_protocol.py @@ -152,7 +152,8 @@ def testNormBroadcast(self): 'gridsquare': "JN48ea", 'type': 'MESSAGE', 'priority': '1', - 'data': str(base64.b64encode(b"hello world!"), 'utf-8') + #'data': str(base64.b64encode(b"hello world!"), 'utf-8') + 'data': str(base64.b64encode(b"hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!hello world!"), 'utf-8') } try: command = command_norm.Norm(self.ctx_ISS, params) From 04efe8707560a9abc6f6ae4a6b459fcc20e03d7a Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 23 Jun 2025 10:29:57 +0200 Subject: [PATCH 015/141] work on multi burst --- freedata_server/data_frame_factory.py | 6 +++--- freedata_server/norm/norm_transmission_irs.py | 13 +++++++------ freedata_server/norm/norm_transmission_iss.py | 13 +++++++++---- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 51378abd4..c196ee971 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -241,8 +241,8 @@ def _load_norm_templates(self): "timestamp": 4, "burst_info": 1, "checksum": 3, - "payload_size": 1, - "payload_data": 99 + "payload_size": 2, + "payload_data": 98 } # repair frame @@ -677,7 +677,7 @@ def build_norm_data(self, origin, domain, gridsquare, timestamp, burst_info, pay "flag": flag.to_bytes(1, 'big'), "timestamp": timestamp.to_bytes(4, 'big'), "burst_info": burst_info.to_bytes(1, 'big'), - "payload_size": payload_size.to_bytes(1, 'big'), + "payload_size": payload_size.to_bytes(2, 'big'), "payload_data": payload_data, "checksum": checksum } diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py index f856ed537..1b3b843a4 100644 --- a/freedata_server/norm/norm_transmission_irs.py +++ b/freedata_server/norm/norm_transmission_irs.py @@ -5,7 +5,7 @@ from datetime import datetime, timezone class NormTransmissionIRS(NormTransmission): - MAX_PAYLOAD_SIZE = 99 + MAX_PAYLOAD_SIZE = 98 def __init__(self, ctx, frame): self.ctx = ctx @@ -18,12 +18,13 @@ def __init__(self, ctx, frame): if total_bursts == 1: - payload_data = payload_data[:payload_size] - else: - start = (burst_number -1) * self.MAX_PAYLOAD_SIZE - end = min(start + self.MAX_PAYLOAD_SIZE, payload_size) - payload_data = payload_data[start:end] + payload_data = payload_data[:self.MAX_PAYLOAD_SIZE] + if is_last:# + + payload_data = payload_data.strip(b'\x00') + + payload_data = payload_data[:payload_size] self.origin = frame["origin"] self.domain = frame["domain"] diff --git a/freedata_server/norm/norm_transmission_iss.py b/freedata_server/norm/norm_transmission_iss.py index 140b02c92..2c4f93e59 100644 --- a/freedata_server/norm/norm_transmission_iss.py +++ b/freedata_server/norm/norm_transmission_iss.py @@ -14,7 +14,7 @@ class NORM_ISS_State(Enum): ABORTED = 5 class NormTransmissionISS(NormTransmission): - MAX_PAYLOAD_SIZE = 99 + MAX_PAYLOAD_SIZE = 98 def __init__(self, ctx, origin, domain, gridsquare, data, priority=NORMMsgPriority.NORMAL, message_type=NORMMsgType.UNDEFINED): @@ -44,8 +44,13 @@ def create_bursts(self): full_data = self.data - + print(self.data) total_bursts = (len(full_data) + self.MAX_PAYLOAD_SIZE - 1) // self.MAX_PAYLOAD_SIZE + print("total_bursts: ", total_bursts) + print("MAX_PAYLOAD_SIZE: ", self.MAX_PAYLOAD_SIZE) + print("len full data: ", len(full_data)) + + bursts = [] for burst_number in range(1, total_bursts + 1): @@ -55,7 +60,7 @@ def create_bursts(self): burst_info = self.encode_burst_info(burst_number, total_bursts) checksum = helpers.get_crc_24(full_data) # set flag for last burst - is_last = (burst_number == total_bursts - 1) + is_last = (burst_number == total_bursts) flags = self.encode_flags( msg_type=self.message_type, priority=self.message_priority, @@ -68,7 +73,7 @@ def create_bursts(self): gridsquare=self.gridsquare, timestamp=self.timestamp, burst_info=burst_info, - payload_size=len(payload), + payload_size=len(full_data), payload_data=payload, flag=flags, checksum=checksum From ecc4ea462d6415f8509306375f66cdcac53770ad Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 23 Jun 2025 11:03:41 +0200 Subject: [PATCH 016/141] fixing crc failure --- freedata_server/message_system_db_broadcasts.py | 10 ++++------ freedata_server/norm/norm_transmission_irs.py | 7 ++++--- freedata_server/norm/norm_transmission_iss.py | 1 + 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/freedata_server/message_system_db_broadcasts.py b/freedata_server/message_system_db_broadcasts.py index 7304f8e1f..8c4f8fcc6 100644 --- a/freedata_server/message_system_db_broadcasts.py +++ b/freedata_server/message_system_db_broadcasts.py @@ -99,12 +99,11 @@ def process_broadcast_message( total = msg.total_bursts if total > 0 and len(received) == total and all(str(i) in received for i in range(1, total + 1)): + ordered = [received[str(i)] for i in range(1, total + 1)] - final = "".join(ordered) + final_bytes = b"".join(base64.b64decode(b64part) for b64part in ordered) # CRC check - - final_bytes = base64.b64decode(final) crc = helpers.get_crc_24(final_bytes).hex() if msg.checksum is None: @@ -112,9 +111,8 @@ def process_broadcast_message( elif crc != msg.checksum: self.log(f"Checksum mismatch for {id}: expected {msg.checksum}, got {crc}", isWarning=True) else: - print("ja?=!") - msg.payload_data["final"] = final - msg.payload_size = len(final.encode("utf-8")) + msg.payload_data["final"] = base64.b64encode(final_bytes).decode() + msg.payload_size = len(final_bytes) self.log(f"Final payload assembled and verified for {id}") session.commit() diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py index 1b3b843a4..90078fa83 100644 --- a/freedata_server/norm/norm_transmission_irs.py +++ b/freedata_server/norm/norm_transmission_irs.py @@ -21,10 +21,11 @@ def __init__(self, ctx, frame): payload_data = payload_data[:self.MAX_PAYLOAD_SIZE] if is_last:# + end = self.MAX_PAYLOAD_SIZE - ((total_bursts * self.MAX_PAYLOAD_SIZE) - payload_size) + print(end) + payload_data = payload_data[:end] - payload_data = payload_data.strip(b'\x00') - - payload_data = payload_data[:payload_size] + #payload_data = payload_data[:payload_size] self.origin = frame["origin"] self.domain = frame["domain"] diff --git a/freedata_server/norm/norm_transmission_iss.py b/freedata_server/norm/norm_transmission_iss.py index 2c4f93e59..cdccd5d7f 100644 --- a/freedata_server/norm/norm_transmission_iss.py +++ b/freedata_server/norm/norm_transmission_iss.py @@ -56,6 +56,7 @@ def create_bursts(self): for burst_number in range(1, total_bursts + 1): offset = (burst_number-1) * self.MAX_PAYLOAD_SIZE payload = full_data[offset: offset + self.MAX_PAYLOAD_SIZE] + print("payload: ", len(payload)) burst_info = self.encode_burst_info(burst_number, total_bursts) checksum = helpers.get_crc_24(full_data) From 6b42e3d382de85d80f98d1a4b199f0b93f4ec322 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 23 Jun 2025 11:10:39 +0200 Subject: [PATCH 017/141] update status if crc failure or received --- freedata_server/message_system_db_broadcasts.py | 8 ++++++++ freedata_server/message_system_db_manager.py | 5 +++-- freedata_server/norm/norm_transmission_irs.py | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/freedata_server/message_system_db_broadcasts.py b/freedata_server/message_system_db_broadcasts.py index 8c4f8fcc6..071c4d44f 100644 --- a/freedata_server/message_system_db_broadcasts.py +++ b/freedata_server/message_system_db_broadcasts.py @@ -110,9 +110,17 @@ def process_broadcast_message( self.log(f"Missing checksum for {id}", isWarning=True) elif crc != msg.checksum: self.log(f"Checksum mismatch for {id}: expected {msg.checksum}, got {crc}", isWarning=True) + status_obj = self.get_or_create_status(session, "failed_checksum") + msg.status_id = status_obj.id + else: msg.payload_data["final"] = base64.b64encode(final_bytes).decode() msg.payload_size = len(final_bytes) + + status_obj = self.get_or_create_status(session, "received") + msg.status_id = status_obj.id + + self.log(f"Final payload assembled and verified for {id}") session.commit() diff --git a/freedata_server/message_system_db_manager.py b/freedata_server/message_system_db_manager.py index ddd3f6046..0fc827fc1 100644 --- a/freedata_server/message_system_db_manager.py +++ b/freedata_server/message_system_db_manager.py @@ -76,8 +76,9 @@ def initialize_default_values(self): "failed", "failed_checksum", "aborted", - "queued" - ] + "queued", + "receiving", + "assembling" ] # Add default statuses if they don't exist for status_name in statuses: diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py index 90078fa83..828a99ff9 100644 --- a/freedata_server/norm/norm_transmission_irs.py +++ b/freedata_server/norm/norm_transmission_irs.py @@ -71,7 +71,7 @@ def __init__(self, ctx, frame): received_at=datetime.now(timezone.utc), expires_at=datetime.now(timezone.utc), is_read=True, - status="received" + status="assembling" ) if not success: From 1ff0da0fbbb3d49102837b38a009363f8d391d2f Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 23 Jun 2025 12:40:16 +0200 Subject: [PATCH 018/141] first parts of implementing gui stuff --- .../src/components/broadcast_domains.vue | 84 +++++++++++ .../src/components/broadcasts_screen.vue | 134 ++++++++++++++++++ .../src/components/main_left_navbar.vue | 19 ++- freedata_gui/src/components/main_modals.vue | 56 ++++++++ freedata_gui/src/components/main_screen.vue | 13 ++ freedata_gui/src/js/api.js | 19 +++ freedata_gui/src/js/broadcastsHandler.js | 3 + freedata_gui/src/store/broadcastStore.js | 24 ++++ freedata_server/b64.py | 6 + 9 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 freedata_gui/src/components/broadcast_domains.vue create mode 100644 freedata_gui/src/components/broadcasts_screen.vue create mode 100644 freedata_gui/src/js/broadcastsHandler.js create mode 100644 freedata_gui/src/store/broadcastStore.js create mode 100644 freedata_server/b64.py diff --git a/freedata_gui/src/components/broadcast_domains.vue b/freedata_gui/src/components/broadcast_domains.vue new file mode 100644 index 000000000..e7afd4456 --- /dev/null +++ b/freedata_gui/src/components/broadcast_domains.vue @@ -0,0 +1,84 @@ + + + diff --git a/freedata_gui/src/components/broadcasts_screen.vue b/freedata_gui/src/components/broadcasts_screen.vue new file mode 100644 index 000000000..95c0be218 --- /dev/null +++ b/freedata_gui/src/components/broadcasts_screen.vue @@ -0,0 +1,134 @@ + + + + + diff --git a/freedata_gui/src/components/main_left_navbar.vue b/freedata_gui/src/components/main_left_navbar.vue index 71ef57ee9..61fb602f9 100644 --- a/freedata_gui/src/components/main_left_navbar.vue +++ b/freedata_gui/src/components/main_left_navbar.vue @@ -2,7 +2,7 @@ import { computed } from 'vue'; import { getOverallHealth } from '../js/eventHandler.js'; -import { getFreedataMessages } from '../js/api'; +import { getFreedataMessages, getFreedataBroadcasts } from '../js/api'; import { loadAllData } from '../js/eventHandler'; import { setActivePinia } from 'pinia'; @@ -89,7 +89,7 @@ const isNetworkTraffic = computed(() => state.is_network_traffic); aria-controls="list-chat" :title="$t('navbar.chat_help')" :class="{ disabled: isNetworkDisconnected }" - @click="isNetworkDisconnected ? null : getFreedataMessages" + @click="isNetworkDisconnected ? null : getFreedataMessages()" > state.is_network_traffic); + + + + + ({ + + + + +
+ +
+ +
+
{ + + // Indicator if we are loading data + var loading = ref(false); + + /* ------------------------------------------------ */ + // Scroll to bottom functions + const scrollTrigger = ref(0); + + function triggerScrollToBottom() { + scrollTrigger.value++; + } + + + return { + scrollTrigger, + triggerScrollToBottom, + loading, + }; +}); diff --git a/freedata_server/b64.py b/freedata_server/b64.py new file mode 100644 index 000000000..54e9deacc --- /dev/null +++ b/freedata_server/b64.py @@ -0,0 +1,6 @@ +import base64 + + +string = b'aGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQhaGVsbG8gd29ybGQh' + +print(base64.b64decode(string)) \ No newline at end of file From 5631f1ffb05b6c93a131d60a40629852f5b3eec8 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 24 Jun 2025 10:24:35 +0200 Subject: [PATCH 019/141] adjust database --- freedata_server/api/freedata.py | 12 +++++- .../message_system_db_broadcasts.py | 39 +++++++++++++++++++ freedata_server/message_system_db_model.py | 2 + freedata_server/norm/norm_transmission_irs.py | 1 + 4 files changed, 53 insertions(+), 1 deletion(-) diff --git a/freedata_server/api/freedata.py b/freedata_server/api/freedata.py index 374f00889..05fb1a4a2 100644 --- a/freedata_server/api/freedata.py +++ b/freedata_server/api/freedata.py @@ -705,7 +705,7 @@ async def set_station_info( api_abort("Station not found", 404) return api_response(result) -@router.get("/broadcast", summary="Get All Broadcast Messages", tags=["FreeDATA"], responses={}) +@router.get("/broadcasts", summary="Get All Broadcast Messages", tags=["FreeDATA"], responses={}) async def get_freedata_broadcasts( ctx: AppContext = Depends(get_ctx) ): @@ -715,4 +715,14 @@ async def get_freedata_broadcasts( result = _mgr_broadcasts(ctx).get_all_broadcasts_json() return api_response(result) +@router.get("/broadcasts/domains", summary="Get All Broadcast Messages", tags=["FreeDATA"], responses={}) +async def get_freedata_broadcasts( + ctx: AppContext = Depends(get_ctx) +): + #filters = {k: v for k, v in ctx.config_manager.read().get('FILTERS', {}).items()} + # use query params if needed + # filters = dict(ctx.request.query_params) + result = _mgr_broadcasts(ctx).get_broadcast_domains_json() + return api_response(result) + diff --git a/freedata_server/message_system_db_broadcasts.py b/freedata_server/message_system_db_broadcasts.py index 071c4d44f..491441663 100644 --- a/freedata_server/message_system_db_broadcasts.py +++ b/freedata_server/message_system_db_broadcasts.py @@ -36,6 +36,7 @@ def process_broadcast_message( expires_at: datetime = None, priority: int = 1, is_read: bool = True, + direction: str = None, status: str = "queued", error_reason: str = None ) -> bool: @@ -68,6 +69,7 @@ def process_broadcast_message( gridsquare=gridsquare, priority=priority, is_read=is_read, + direction=direction, payload_size=0, payload_data={"bursts": {str(burst_index): burst_data}}, msg_type=msg_type, @@ -155,6 +157,7 @@ def get_all_broadcasts_json(self) -> list: "frequency": msg.frequency, "priority": msg.priority, "is_read": msg.is_read, + "direction": msg.direction, "payload_size": msg.payload_size, "payload_data": msg.payload_data, "msg_type": msg.msg_type, @@ -174,3 +177,39 @@ def get_all_broadcasts_json(self) -> list: finally: session.remove() + + def get_broadcast_domains_json(self) -> dict: + """ + Returns a JSON-compatible dictionary where each key is a domain (e.g. 'BB1AA-2'), + and each value is a dict containing message stats for that domain. + """ + session = self.get_thread_scoped_session() + try: + messages = ( + session.query(BroadcastMessage) + .filter(BroadcastMessage.domain.isnot(None)) + .order_by(BroadcastMessage.timestamp.desc()) + .all() + ) + + result = {} + for msg in messages: + domain = msg.domain + if domain not in result: + result[domain] = { + "message_count": 1, + "last_message_id": msg.id, + "last_message_timestamp": msg.timestamp.isoformat() if msg.timestamp else None, + "last_origin": msg.origin + } + else: + result[domain]["message_count"] += 1 + + return result + + except Exception as e: + self.log(f"Error fetching domain summary: {e}", isWarning=True) + return {} + + finally: + session.remove() diff --git a/freedata_server/message_system_db_model.py b/freedata_server/message_system_db_model.py index 8d153491a..1cf473a3b 100644 --- a/freedata_server/message_system_db_model.py +++ b/freedata_server/message_system_db_model.py @@ -210,6 +210,7 @@ class BroadcastMessage(Base): frequency = Column(Integer, default=0) priority = Column(Integer, default=1) is_read = Column(Boolean, default=True) + direction = Column(String, nullable=True) payload_size = Column(Integer, default=0) payload_data = Column(JSON, nullable=True) msg_type = Column(String) @@ -234,6 +235,7 @@ def to_dict(self): 'frequency': self.frequency, 'priority': self.priority, 'is_read': self.is_read, + 'direction': self.direction, 'payload_size': self.payload_size, 'payload_data': self.payload_data, 'msg_type': self.msg_type, diff --git a/freedata_server/norm/norm_transmission_irs.py b/freedata_server/norm/norm_transmission_irs.py index 828a99ff9..0288c5279 100644 --- a/freedata_server/norm/norm_transmission_irs.py +++ b/freedata_server/norm/norm_transmission_irs.py @@ -71,6 +71,7 @@ def __init__(self, ctx, frame): received_at=datetime.now(timezone.utc), expires_at=datetime.now(timezone.utc), is_read=True, + direction="receive", status="assembling" ) From 93190180664d6e0d5582f0774e86f53ec346663d Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 24 Jun 2025 11:03:23 +0200 Subject: [PATCH 020/141] get broadcasts per domain --- freedata_server/api/freedata.py | 9 ++++ .../message_system_db_broadcasts.py | 44 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/freedata_server/api/freedata.py b/freedata_server/api/freedata.py index 05fb1a4a2..1aec291a8 100644 --- a/freedata_server/api/freedata.py +++ b/freedata_server/api/freedata.py @@ -715,6 +715,15 @@ async def get_freedata_broadcasts( result = _mgr_broadcasts(ctx).get_all_broadcasts_json() return api_response(result) +@router.get("/broadcasts/{domain}/", summary="Get Broadcats per Domain", tags=["FreeDATA"], responses={}) +async def get_freedata_broadcasts_per_domain( + domain: str, + ctx: AppContext = Depends(get_ctx) +): + result = _mgr_broadcasts(ctx).get_broadcasts_per_domain_json(domain) + return api_response(result) + + @router.get("/broadcasts/domains", summary="Get All Broadcast Messages", tags=["FreeDATA"], responses={}) async def get_freedata_broadcasts( ctx: AppContext = Depends(get_ctx) diff --git a/freedata_server/message_system_db_broadcasts.py b/freedata_server/message_system_db_broadcasts.py index 491441663..9e7a0ee68 100644 --- a/freedata_server/message_system_db_broadcasts.py +++ b/freedata_server/message_system_db_broadcasts.py @@ -213,3 +213,47 @@ def get_broadcast_domains_json(self) -> dict: finally: session.remove() + + def get_broadcasts_per_domain_json(self, domain: str = None) -> dict: + + session = self.get_thread_scoped_session() + try: + query = session.query(BroadcastMessage).order_by(BroadcastMessage.timestamp.desc()) + + if domain: + query = query.filter(BroadcastMessage.domain == domain) + + messages = query.all() + result = {} + + for msg in messages: + d = msg.domain or "unknown" + + if domain and d != domain: + continue # just in case + + if d not in result: + result[d] = [] + + result[d].append({ + "id": msg.id, + "timestamp": msg.timestamp.isoformat() if msg.timestamp else None, + "origin": msg.origin, + "gridsquare": msg.gridsquare, + "msg_type": msg.msg_type, + "payload_size": msg.payload_size, + "direction": msg.direction, + "status": msg.status.name if msg.status else None, + "error_reason": msg.error_reason, + "received_at": msg.received_at.isoformat() if msg.received_at else None, + "expires_at": msg.expires_at.isoformat() if msg.expires_at else None + }) + + return result + + except Exception as e: + self.log(f"Error collecting broadcasts for domain '{domain}': {e}", isWarning=True) + return {} + + finally: + session.remove() From 6f57ade23c598da45112026fa128256d55a65c17 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 24 Jun 2025 11:59:46 +0200 Subject: [PATCH 021/141] adjusted gui --- .../src/components/broadcast_domains.vue | 102 ++++++++---- .../components/broadcast_message_received.vue | 109 +++++++++++++ .../src/components/broadcast_message_sent.vue | 127 +++++++++++++++ .../src/components/broadcast_messages.vue | 88 +++++++++++ .../src/components/broadcast_new_message.vue | 148 ++++++++++++++++++ .../src/components/broadcasts_screen.vue | 54 ++++--- .../src/components/main_left_navbar.vue | 4 +- freedata_gui/src/components/main_modals.vue | 57 +++++++ freedata_gui/src/js/api.js | 66 +++++++- freedata_gui/src/js/broadcastsHandler.js | 52 +++++- freedata_gui/src/store/broadcastStore.js | 25 ++- 11 files changed, 771 insertions(+), 61 deletions(-) create mode 100644 freedata_gui/src/components/broadcast_message_received.vue create mode 100644 freedata_gui/src/components/broadcast_message_sent.vue create mode 100644 freedata_gui/src/components/broadcast_messages.vue create mode 100644 freedata_gui/src/components/broadcast_new_message.vue diff --git a/freedata_gui/src/components/broadcast_domains.vue b/freedata_gui/src/components/broadcast_domains.vue index e7afd4456..46fbe8fb2 100644 --- a/freedata_gui/src/components/broadcast_domains.vue +++ b/freedata_gui/src/components/broadcast_domains.vue @@ -34,46 +34,52 @@
- {{ $t('chat.noConversations') }} + {{ $t('broadcast.noDomains') }}
+ -
diff --git a/freedata_gui/src/components/broadcast_message_received.vue b/freedata_gui/src/components/broadcast_message_received.vue new file mode 100644 index 000000000..e4d8dfb69 --- /dev/null +++ b/freedata_gui/src/components/broadcast_message_received.vue @@ -0,0 +1,109 @@ + + + diff --git a/freedata_gui/src/components/broadcast_message_sent.vue b/freedata_gui/src/components/broadcast_message_sent.vue new file mode 100644 index 000000000..4b362b9b0 --- /dev/null +++ b/freedata_gui/src/components/broadcast_message_sent.vue @@ -0,0 +1,127 @@ + + + diff --git a/freedata_gui/src/components/broadcast_messages.vue b/freedata_gui/src/components/broadcast_messages.vue new file mode 100644 index 000000000..ba7561984 --- /dev/null +++ b/freedata_gui/src/components/broadcast_messages.vue @@ -0,0 +1,88 @@ + + + + + + diff --git a/freedata_gui/src/components/broadcast_new_message.vue b/freedata_gui/src/components/broadcast_new_message.vue new file mode 100644 index 000000000..85f2342cc --- /dev/null +++ b/freedata_gui/src/components/broadcast_new_message.vue @@ -0,0 +1,148 @@ + + +