diff --git a/GUI/src/vast/main_window.py b/GUI/src/vast/main_window.py index 16c273e4a..29bf9d788 100644 --- a/GUI/src/vast/main_window.py +++ b/GUI/src/vast/main_window.py @@ -235,8 +235,6 @@ def reposition_badge(): "Ground Image": self.ground_view, "Auth": self.auth_status - -main } for view in self.views.values(): self.stack.addWidget(view) diff --git a/GUI/src/vast/views/ground_view.py b/GUI/src/vast/views/ground_view.py index 3056bfaf7..09db68187 100644 --- a/GUI/src/vast/views/ground_view.py +++ b/GUI/src/vast/views/ground_view.py @@ -4,7 +4,7 @@ from typing import Optional, Any, Dict, List from PyQt6.QtCore import Qt, QTimer, QSize, QRectF -from PyQt6.QtGui import QPixmap, QKeyEvent, QPainter, QColor, QPen, QBrush +from PyQt6.QtGui import QPixmap, QKeyEvent, QPainter, QColor, QPen, QFont from PyQt6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QProgressBar, QMessageBox, QSizePolicy, QFrame @@ -13,11 +13,9 @@ # GUI never touches MinIO directly – it uses DashboardApi only. from vast.dashboard_api import DashboardApi - GROUND_BUCKET = os.getenv("GROUND_BUCKET", "ground") GROUND_PREFIX = os.getenv("GROUND_PREFIX", "") - # ---------------------------- # PHI data model # ---------------------------- @@ -29,7 +27,7 @@ class PhiSnapshot: severity_avg: Optional[float] # usually 0..1 (clamped) trend: Optional[float] week_start: Optional[str] - source: str = "" # textual hint of data source + source: str = "" # textual hint of data source (NOT shown in UI) def _phi_band_color(v: float) -> str: @@ -51,37 +49,44 @@ def _safe_float(x) -> Optional[float]: # ---------------------------- -# Visual PHI circle +# Visual PHI circle (pie) # ---------------------------- class PhiCircleWidget(QWidget): """ - Draws a full circle: - - green background (healthy part) - - red pie slice for the infected part (severity in 0..1) + Draws a pie: + - red slice = severity in [0..1] + - green slice = 1 - severity (healthy remainder) + Always draws red on top so it's never hidden. + Also draws the severity percentage text centered on the pie. """ def __init__(self, parent=None): super().__init__(parent) self._severity = 0.0 # 0..1 - self.setMinimumHeight(140) - self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + self.setAttribute(Qt.WidgetAttribute.WA_OpaquePaintEvent, True) + self.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed) self.setToolTip("Red slice = severity (0..1). Green = healthy remainder.") + def sizeHint(self): + return QSize(120, 120) + + def minimumSizeHint(self): + return QSize(100, 100) + def setSeverity(self, value: float) -> None: - # Clamp to [0,1] and repaint try: v = float(value) except Exception: v = 0.0 self._severity = max(0.0, min(1.0, v)) - self.update() + self.update() # ensure repaint def paintEvent(self, event) -> None: - # Guard rendering so drawing can never crash the app try: painter = QPainter(self) painter.setRenderHint(QPainter.RenderHint.Antialiasing, True) + painter.setPen(Qt.PenStyle.NoPen) - pad = 12 + pad = 10 size = min(self.width(), self.height()) - 2 * pad if size <= 0: return @@ -89,17 +94,28 @@ def paintEvent(self, event) -> None: cy = (self.height() - size) / 2 rect = QRectF(cx, cy, size, size) - # Healthy background (green) - painter.setPen(Qt.PenStyle.NoPen) - painter.setBrush(QBrush(QColor("#16a34a"))) - painter.drawEllipse(rect) + start = 90 * 16 # 12 o'clock (Qt: 0° is 3 o'clock) + full = -360 * 16 + + s = self._severity + # Degenerate cases + if s <= 1e-6: + painter.setBrush(QColor("#16a34a")) + painter.drawEllipse(rect) + elif s >= 1 - 1e-6: + painter.setBrush(QColor("#dc2626")) + painter.drawEllipse(rect) + else: + span_red = int(round(full * s)) # negative (clockwise) + span_green = full - span_red # the remainder - # Infected slice (red) from 12 o'clock clockwise - if self._severity > 0.0: - start_angle = 90 * 16 # 12 o'clock - span_angle = -360 * float(self._severity) * 16 # clockwise negative - painter.setBrush(QBrush(QColor("#dc2626"))) - painter.drawPie(rect, start_angle, span_angle) + # Draw green remainder first + painter.setBrush(QColor("#16a34a")) + painter.drawPie(rect, start + span_red, span_green) + + # Draw red slice on top (so it's always visible) + painter.setBrush(QColor("#dc2626")) + painter.drawPie(rect, start, span_red) # Outline pen = QPen(QColor("#334155")) @@ -107,6 +123,21 @@ def paintEvent(self, event) -> None: painter.setPen(pen) painter.setBrush(Qt.BrushStyle.NoBrush) painter.drawEllipse(rect) + + # Percentage text (centered) + percent_text = f"{int(round(s * 100))}%" + font = QFont() + font.setBold(True) + font.setPointSize(int(size * 0.22)) # responsive sizing + painter.setFont(font) + + # Soft shadow for readability + painter.setPen(QColor(0, 0, 0, 160)) + painter.drawText(rect, Qt.AlignmentFlag.AlignCenter, percent_text) + # Foreground text + painter.setPen(QColor("#ffffff")) + painter.drawText(rect, Qt.AlignmentFlag.AlignCenter, percent_text) + except Exception as e: print(f"[PhiCircleWidget] paintEvent error: {e}") @@ -191,26 +222,38 @@ def __init__(self, api: DashboardApi, parent=None): phi_layout.setContentsMargins(12, 12, 12, 12) phi_layout.setSpacing(8) + # Row with headline + (trimmed) details row = QHBoxLayout() self.phi_label = QLabel("PHI: –") self.phi_label.setStyleSheet("font-size:16px;font-weight:700;color:#0f172a;") row.addWidget(self.phi_label) row.addStretch(1) - self.phi_details = QLabel("") + self.phi_details = QLabel("") # will show severity/coverage/trend (without src) self.phi_details.setStyleSheet("color:#475569;font-size:12px;") row.addWidget(self.phi_details) phi_layout.addLayout(row) + # PHI progress (axis-like) + pie at its side self.phi_bar = QProgressBar() self.phi_bar.setRange(0, 100) self.phi_bar.setValue(0) self.phi_bar.setFormat("%v") self._style_phi_bar(None) - phi_layout.addWidget(self.phi_bar) - # PHI circle (green background, red severity slice) + phi_row2 = QHBoxLayout() + phi_row2.setContentsMargins(0, 0, 0, 0) + phi_row2.setSpacing(10) + phi_row2.addWidget(self.phi_bar, stretch=1) + self.phi_circle = PhiCircleWidget() - phi_layout.addWidget(self.phi_circle) + self.phi_circle.setFixedSize(120, 120) + phi_row2.addWidget(self.phi_circle) + + phi_layout.addLayout(phi_row2) + + legend = QLabel("אדום = Severity | ירוק = Healthy") + legend.setStyleSheet("color:#64748b;font-size:11px;") + phi_layout.addWidget(legend) root.addWidget(phi_frame, stretch=1) @@ -297,6 +340,7 @@ def _lm(o): self._keys = keys self._idx = 0 if self._keys else -1 self._update_counter() + self._update_nav_buttons() if self._idx >= 0: self.load_current_image() else: @@ -312,6 +356,11 @@ def _update_counter(self) -> None: pos = (self._idx + 1) if self._idx >= 0 else 0 self.counter_label.setText(f"({pos} / {total})") + def _update_nav_buttons(self) -> None: + has = bool(self._keys) + for b in (self.btn_prev, self.btn_next, self.btn_show_phi): + b.setEnabled(has) + def prev_image(self) -> None: if not self._keys: return @@ -348,6 +397,7 @@ def _set_image(self, pix: Optional[QPixmap]) -> None: target_size: QSize = self.image_label.size() if target_size.width() <= 4 or target_size.height() <= 4: self.image_label.setPixmap(pix) + self.image_label.setText("") return scaled = pix.scaled( target_size.width(), @@ -377,14 +427,14 @@ def load_current_image(self) -> None: getter = getattr(self.api, "get_image_bytes_from_minio", None) if not callable(getter): self._warn("DashboardApi.get_image_bytes_from_minio is missing.") + self._set_image(None) + self._render_phi_none() return data = None try: - # Prefer signature with bucket param data = getter(key, bucket=GROUND_BUCKET) except TypeError: - # Older signature without bucket data = getter(key) except Exception as e: self._warn(f"Failed fetching image bytes: {e}") @@ -411,6 +461,7 @@ def load_current_image(self) -> None: except Exception as e: self._warn(f"load_current_image error: {e}") + self._render_phi_none() # ---------------------------- # PHI flow @@ -496,6 +547,7 @@ def _render_phi(self, snap: PhiSnapshot) -> None: val = max(0, min(100, int(round(snap.phi)))) self.phi_label.setText(f"PHI: {val}") parts = [] + # keep useful metrics, but DO NOT show 'src=' anymore if snap.density is not None: parts.append(f"density={snap.density:.2f}") if snap.coverage is not None: @@ -506,9 +558,7 @@ def _render_phi(self, snap: PhiSnapshot) -> None: parts.append(f"trend={snap.trend:+.2f}") if snap.week_start: parts.append(f"week={snap.week_start}") - if snap.source: - parts.append(f"src={snap.source}") - self.phi_details.setText(" | ".join(parts)) + self.phi_details.setText(" | ".join(parts)) # no src here self.phi_bar.setValue(val) self._style_phi_bar(val) diff --git a/airflow_bundle/leaf-pipeline/airflow/dags/leaf_pipeline_dag.py b/airflow_bundle/leaf-pipeline/airflow/dags/leaf_pipeline_dag.py old mode 100755 new mode 100644 diff --git a/mqtt_and_kafka/mqtt-router/Dockerfile b/mqtt_and_kafka/mqtt-router/Dockerfile index 5f0779ad7..3ec5173ee 100644 --- a/mqtt_and_kafka/mqtt-router/Dockerfile +++ b/mqtt_and_kafka/mqtt-router/Dockerfile @@ -1,39 +1,39 @@ -FROM python:3.12-slim - -# ---- Build-time toggle for NetFree CA injection ---- -ARG USE_NETFREE=false - -# ---- System deps (CA, curl). librdkafka1 helps if confluent-kafka wheel is not fully static on your base ---- -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates curl librdkafka1 \ - && rm -rf /var/lib/apt/lists/* - -WORKDIR /app - -# ---- Optional NetFree certificates (mount or COPY your certs/*.crt alongside the Dockerfile) ---- -# If you keep certs in repo, uncomment the next line: -# COPY certs/*.crt /app/certs/ -RUN if [ "$USE_NETFREE" = "true" ] && [ -d /app/certs ] && ls /app/certs/*.crt >/dev/null 2>&1; then \ - echo "Configuring NetFree certificates..."; \ - cp /app/certs/*.crt /usr/local/share/ca-certificates/ && update-ca-certificates; \ - else \ - echo "No NetFree certs applied (USE_NETFREE=$USE_NETFREE)."; \ - fi - -# ---- Make requests/libs use system CA (works both with and without NetFree) ---- -ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ - REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ - PIP_CERT=/etc/ssl/certs/ca-certificates.crt \ - PYTHONUNBUFFERED=1 - -# ---- Install Python deps ---- -COPY requirements.txt . -# When behind NetFree, trusted-host can help even אם אין צורך זה לא מזיק: -RUN python -m pip install --no-cache-dir \ - --trusted-host pypi.org --trusted-host files.pythonhosted.org \ - -r requirements.txt - -# ---- App code ---- -COPY app.py . - -ENTRYPOINT ["python", "app.py"] +FROM python:3.12-slim + +# ---- Build-time toggle for NetFree CA injection ---- +ARG USE_NETFREE=false + +# ---- System deps (CA, curl). librdkafka1 helps if confluent-kafka wheel is not fully static on your base ---- +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates curl librdkafka1 \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# ---- Optional NetFree certificates (mount or COPY your certs/*.crt alongside the Dockerfile) ---- +# If you keep certs in repo, uncomment the next line: +# COPY certs/*.crt /app/certs/ +RUN if [ "$USE_NETFREE" = "true" ] && [ -d /app/certs ] && ls /app/certs/*.crt >/dev/null 2>&1; then \ + echo "Configuring NetFree certificates..."; \ + cp /app/certs/*.crt /usr/local/share/ca-certificates/ && update-ca-certificates; \ + else \ + echo "No NetFree certs applied (USE_NETFREE=$USE_NETFREE)."; \ + fi + +# ---- Make requests/libs use system CA (works both with and without NetFree) ---- +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ + REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ + PIP_CERT=/etc/ssl/certs/ca-certificates.crt \ + PYTHONUNBUFFERED=1 + +# ---- Install Python deps ---- +COPY requirements.txt . +# When behind NetFree, trusted-host can help even אם אין צורך זה לא מזיק: +RUN python -m pip install --no-cache-dir \ + --trusted-host pypi.org --trusted-host files.pythonhosted.org \ + -r requirements.txt + +# ---- App code ---- +COPY app.py . + +ENTRYPOINT ["python", "app.py"] diff --git a/mqtt_and_kafka/mqtt-router/app.py b/mqtt_and_kafka/mqtt-router/app.py index ac32bef86..3383671aa 100644 --- a/mqtt_and_kafka/mqtt-router/app.py +++ b/mqtt_and_kafka/mqtt-router/app.py @@ -1,154 +1,154 @@ -import os -import re -import signal -import sys -from typing import Optional - -import paho.mqtt.client as mqtt -from confluent_kafka import Producer, KafkaException, KafkaError -from confluent_kafka.admin import AdminClient, NewTopic - -# ---------- Env ---------- -MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto") -MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) -MQTT_USERNAME = os.getenv("MQTT_USERNAME", "") -MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "") -MQTT_TOPIC_FILTER = os.getenv("MQTT_TOPIC_FILTER", "mqtt/#") - -KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "kafka:9092") -KAFKA_CLIENT_ID = os.getenv("KAFKA_CLIENT_ID", "mqtt-router") -CREATE_TOPICS = os.getenv("CREATE_TOPICS", "false").lower() == "true" -DEFAULT_PARTITIONS = int(os.getenv("DEFAULT_PARTITIONS", "1")) -DEFAULT_REPLICATION = int(os.getenv("DEFAULT_REPLICATION", "1")) - -# Optional security (set via env if needed) -KAFKA_SECURITY_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", "") # e.g. "SASL_PLAINTEXT", "SASL_SSL", "SSL" -KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", "") # e.g. "PLAIN" -KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", "") -KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", "") - -# ---------- Topic mapping ---------- -# Allow arbitrary depth after "mqtt/" → replace "/" with "." -VALID_CHARS = re.compile(r'[^A-Za-z0-9._-]') - -def map_mqtt_to_kafka_topic(mqtt_topic: str) -> Optional[str]: - prefix = "mqtt/" - if not mqtt_topic.startswith(prefix): - return None - tail = mqtt_topic[len(prefix):].strip("/") - if not tail: - return None - parts = [seg for seg in tail.split("/") if seg] - dotted = "_".join(parts) - dotted = VALID_CHARS.sub("_", dotted) - return dotted[:249] if dotted else None - -# ---------- Kafka clients ---------- -producer_conf = { - "bootstrap.servers": KAFKA_BOOTSTRAP, - "client.id": KAFKA_CLIENT_ID, - - # Strong delivery semantics - "acks": "all", - "enable.idempotence": True, - - # Throughput tuning - "compression.type": os.getenv("KAFKA_COMPRESSION", "lz4"), - "linger.ms": int(os.getenv("KAFKA_LINGER_MS", "5")), - "batch.size": int(os.getenv("KAFKA_BATCH_SIZE", str(64 * 1024))), # bytes - - # Resilience - "socket.keepalive.enable": True, - "delivery.timeout.ms": int(os.getenv("KAFKA_DELIVERY_TIMEOUT_MS", "120000")), - "request.timeout.ms": int(os.getenv("KAFKA_REQUEST_TIMEOUT_MS", "30000")), -} - -# Optional security -if KAFKA_SECURITY_PROTOCOL: - producer_conf["security.protocol"] = KAFKA_SECURITY_PROTOCOL -if KAFKA_SASL_MECHANISM: - producer_conf["sasl.mechanism"] = KAFKA_SASL_MECHANISM -if KAFKA_SASL_USERNAME: - producer_conf["sasl.username"] = KAFKA_SASL_USERNAME -if KAFKA_SASL_PASSWORD: - producer_conf["sasl.password"] = KAFKA_SASL_PASSWORD - -p = Producer(producer_conf) -admin = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP}) # kept for CREATE_TOPICS toggle - -def ensure_topic(topic: str): - if not CREATE_TOPICS: - return - try: - fs = admin.create_topics([NewTopic(topic, num_partitions=DEFAULT_PARTITIONS, - replication_factor=DEFAULT_REPLICATION)]) - fs[topic].result() - print(f"[router] Created topic: {topic}", flush=True) - except Exception as e: - msg = str(e) - if "exists" in msg.lower() or "TopicExistsError" in msg or "TOPIC_ALREADY_EXISTS" in msg: - return - print(f"[router] create_topics warning for {topic}: {e}", flush=True) - -def delivery_report(err, msg): - if err is not None: - print(f"[router] Delivery failed for {msg.topic()}: {err}", flush=True) - else: - print(f"[router] Delivered to {msg.topic()} [partition {msg.partition()} offset {msg.offset()}]", flush=True) - -# ---------- MQTT callbacks ---------- -def on_connect(client, userdata, flags, rc, properties=None): - if rc == 0: - print(f"[router] Connected MQTT {MQTT_HOST}:{MQTT_PORT}, subscribe: {MQTT_TOPIC_FILTER}", flush=True) - client.subscribe(MQTT_TOPIC_FILTER, qos=0) - else: - print(f"[router] MQTT connect failed: rc={rc}", flush=True) - -def on_message(client, userdata, msg): - src = msg.topic - dst = map_mqtt_to_kafka_topic(src) - if not dst: - print(f"[router] Skipping topic (no match): {src}", flush=True) - return - try: - ensure_topic(dst) - p.produce(dst, value=msg.payload, on_delivery=delivery_report) - # Poll to serve delivery callbacks; small 0 keeps loop snappy - p.poll(0) - except KafkaException as e: - # Helpful message when topics are not pre-created - kafka_err = e.args[0] if e.args else None - if isinstance(kafka_err, KafkaError) and kafka_err.code() == KafkaError.UNKNOWN_TOPIC_OR_PART: - print(f"[router] ERROR UnknownTopicOrPartition for '{dst}'. " - f"CREATE_TOPICS=false → please pre-create this topic.", flush=True) - else: - print(f"[router] Kafka produce error: {e}", flush=True) - -# ---------- Main ---------- -def main(): - client = mqtt.Client(client_id="mqtt-router", protocol=mqtt.MQTTv5) - if MQTT_USERNAME or MQTT_PASSWORD: - client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) - - # Gentle reconnect backoff - client.reconnect_delay_set(min_delay=1, max_delay=30) - - client.on_connect = on_connect - client.on_message = on_message - - def handle_sigterm(signum, frame): - print("[router] SIGTERM received, flushing producer...", flush=True) - p.flush(10) - sys.exit(0) - - signal.signal(signal.SIGTERM, handle_sigterm) - signal.signal(signal.SIGINT, handle_sigterm) - - client.connect(MQTT_HOST, MQTT_PORT, keepalive=30) - print(f"[router] Boot: MQTT={MQTT_HOST}:{MQTT_PORT} Kafka={KAFKA_BOOTSTRAP} " - f"CREATE_TOPICS={CREATE_TOPICS}", flush=True) - client.loop_forever() - -if __name__ == "__main__": - main() - +import os +import re +import signal +import sys +from typing import Optional + +import paho.mqtt.client as mqtt +from confluent_kafka import Producer, KafkaException, KafkaError +from confluent_kafka.admin import AdminClient, NewTopic + +# ---------- Env ---------- +MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto") +MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) +MQTT_USERNAME = os.getenv("MQTT_USERNAME", "") +MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "") +MQTT_TOPIC_FILTER = os.getenv("MQTT_TOPIC_FILTER", "mqtt/#") + +KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "kafka:9092") +KAFKA_CLIENT_ID = os.getenv("KAFKA_CLIENT_ID", "mqtt-router") +CREATE_TOPICS = os.getenv("CREATE_TOPICS", "false").lower() == "true" +DEFAULT_PARTITIONS = int(os.getenv("DEFAULT_PARTITIONS", "1")) +DEFAULT_REPLICATION = int(os.getenv("DEFAULT_REPLICATION", "1")) + +# Optional security (set via env if needed) +KAFKA_SECURITY_PROTOCOL = os.getenv("KAFKA_SECURITY_PROTOCOL", "") # e.g. "SASL_PLAINTEXT", "SASL_SSL", "SSL" +KAFKA_SASL_MECHANISM = os.getenv("KAFKA_SASL_MECHANISM", "") # e.g. "PLAIN" +KAFKA_SASL_USERNAME = os.getenv("KAFKA_SASL_USERNAME", "") +KAFKA_SASL_PASSWORD = os.getenv("KAFKA_SASL_PASSWORD", "") + +# ---------- Topic mapping ---------- +# Allow arbitrary depth after "mqtt/" → replace "/" with "." +VALID_CHARS = re.compile(r'[^A-Za-z0-9._-]') + +def map_mqtt_to_kafka_topic(mqtt_topic: str) -> Optional[str]: + prefix = "mqtt/" + if not mqtt_topic.startswith(prefix): + return None + tail = mqtt_topic[len(prefix):].strip("/") + if not tail: + return None + parts = [seg for seg in tail.split("/") if seg] + dotted = "_".join(parts) + dotted = VALID_CHARS.sub("_", dotted) + return dotted[:249] if dotted else None + +# ---------- Kafka clients ---------- +producer_conf = { + "bootstrap.servers": KAFKA_BOOTSTRAP, + "client.id": KAFKA_CLIENT_ID, + + # Strong delivery semantics + "acks": "all", + "enable.idempotence": True, + + # Throughput tuning + "compression.type": os.getenv("KAFKA_COMPRESSION", "lz4"), + "linger.ms": int(os.getenv("KAFKA_LINGER_MS", "5")), + "batch.size": int(os.getenv("KAFKA_BATCH_SIZE", str(64 * 1024))), # bytes + + # Resilience + "socket.keepalive.enable": True, + "delivery.timeout.ms": int(os.getenv("KAFKA_DELIVERY_TIMEOUT_MS", "120000")), + "request.timeout.ms": int(os.getenv("KAFKA_REQUEST_TIMEOUT_MS", "30000")), +} + +# Optional security +if KAFKA_SECURITY_PROTOCOL: + producer_conf["security.protocol"] = KAFKA_SECURITY_PROTOCOL +if KAFKA_SASL_MECHANISM: + producer_conf["sasl.mechanism"] = KAFKA_SASL_MECHANISM +if KAFKA_SASL_USERNAME: + producer_conf["sasl.username"] = KAFKA_SASL_USERNAME +if KAFKA_SASL_PASSWORD: + producer_conf["sasl.password"] = KAFKA_SASL_PASSWORD + +p = Producer(producer_conf) +admin = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP}) # kept for CREATE_TOPICS toggle + +def ensure_topic(topic: str): + if not CREATE_TOPICS: + return + try: + fs = admin.create_topics([NewTopic(topic, num_partitions=DEFAULT_PARTITIONS, + replication_factor=DEFAULT_REPLICATION)]) + fs[topic].result() + print(f"[router] Created topic: {topic}", flush=True) + except Exception as e: + msg = str(e) + if "exists" in msg.lower() or "TopicExistsError" in msg or "TOPIC_ALREADY_EXISTS" in msg: + return + print(f"[router] create_topics warning for {topic}: {e}", flush=True) + +def delivery_report(err, msg): + if err is not None: + print(f"[router] Delivery failed for {msg.topic()}: {err}", flush=True) + else: + print(f"[router] Delivered to {msg.topic()} [partition {msg.partition()} offset {msg.offset()}]", flush=True) + +# ---------- MQTT callbacks ---------- +def on_connect(client, userdata, flags, rc, properties=None): + if rc == 0: + print(f"[router] Connected MQTT {MQTT_HOST}:{MQTT_PORT}, subscribe: {MQTT_TOPIC_FILTER}", flush=True) + client.subscribe(MQTT_TOPIC_FILTER, qos=0) + else: + print(f"[router] MQTT connect failed: rc={rc}", flush=True) + +def on_message(client, userdata, msg): + src = msg.topic + dst = map_mqtt_to_kafka_topic(src) + if not dst: + print(f"[router] Skipping topic (no match): {src}", flush=True) + return + try: + ensure_topic(dst) + p.produce(dst, value=msg.payload, on_delivery=delivery_report) + # Poll to serve delivery callbacks; small 0 keeps loop snappy + p.poll(0) + except KafkaException as e: + # Helpful message when topics are not pre-created + kafka_err = e.args[0] if e.args else None + if isinstance(kafka_err, KafkaError) and kafka_err.code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + print(f"[router] ERROR UnknownTopicOrPartition for '{dst}'. " + f"CREATE_TOPICS=false → please pre-create this topic.", flush=True) + else: + print(f"[router] Kafka produce error: {e}", flush=True) + +# ---------- Main ---------- +def main(): + client = mqtt.Client(client_id="mqtt-router", protocol=mqtt.MQTTv5) + if MQTT_USERNAME or MQTT_PASSWORD: + client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) + + # Gentle reconnect backoff + client.reconnect_delay_set(min_delay=1, max_delay=30) + + client.on_connect = on_connect + client.on_message = on_message + + def handle_sigterm(signum, frame): + print("[router] SIGTERM received, flushing producer...", flush=True) + p.flush(10) + sys.exit(0) + + signal.signal(signal.SIGTERM, handle_sigterm) + signal.signal(signal.SIGINT, handle_sigterm) + + client.connect(MQTT_HOST, MQTT_PORT, keepalive=30) + print(f"[router] Boot: MQTT={MQTT_HOST}:{MQTT_PORT} Kafka={KAFKA_BOOTSTRAP} " + f"CREATE_TOPICS={CREATE_TOPICS}", flush=True) + client.loop_forever() + +if __name__ == "__main__": + main() + diff --git a/mqtt_and_kafka/mqtt-router/requirements.txt b/mqtt_and_kafka/mqtt-router/requirements.txt index 40450e3b7..3d853480c 100644 --- a/mqtt_and_kafka/mqtt-router/requirements.txt +++ b/mqtt_and_kafka/mqtt-router/requirements.txt @@ -1,2 +1,2 @@ -paho-mqtt==2.1.0 -confluent-kafka>=2.4 +paho-mqtt==2.1.0 +confluent-kafka>=2.4 diff --git a/services/fruit_ripeness_alert/token_bootstrap.py b/services/fruit_ripeness_alert/token_bootstrap.py index 2bcd5ab40..a1dee593b 100644 --- a/services/fruit_ripeness_alert/token_bootstrap.py +++ b/services/fruit_ripeness_alert/token_bootstrap.py @@ -1,62 +1,62 @@ -import os, pathlib, time, requests - -DB_API_BASE = os.getenv("DB_API_BASE", "").strip() -DB_API_TOKEN_FILE = os.getenv("DB_API_TOKEN_FILE", "/app/secret/db_api_token") -DB_API_SERVICE_NAME = os.getenv("DB_API_SERVICE_NAME", "fruit_ripeness_alert").strip() or "fruit_ripeness_alert" - -def _safe_join_url(base: str, path: str) -> str: - return f"{base.rstrip('/')}/{path.lstrip('/')}" - -def _read_token(path: str) -> str | None: - p = pathlib.Path(path) - if p.exists(): - t = p.read_text(encoding="utf-8").strip() - if t and "***" not in t: - return t - return None - -def _write_token(path: str, token: str) -> None: - p = pathlib.Path(path) - p.parent.mkdir(parents=True, exist_ok=True) - p.write_text(token, encoding="utf-8") - -def _try_dev_bootstrap(): - """Try to get token using /auth/_dev_bootstrap (new API).""" - url = _safe_join_url(DB_API_BASE, "/auth/_dev_bootstrap") - payload = {"service_name": DB_API_SERVICE_NAME, "rotate_if_exists": True} - try: - r = requests.post(url, json=payload, timeout=10) - if r.status_code in (200, 201): - data = r.json() - sa = data.get("service_account") or {} - token = sa.get("raw_token") or sa.get("token") - if token and "***" not in token: - print("[BOOTSTRAP] obtained token via /auth/_dev_bootstrap") - return token.strip() - print(f"[BOOTSTRAP][WARN] _dev_bootstrap returned {r.status_code}: {r.text[:100]}") - except Exception as e: - print(f"[BOOTSTRAP][ERROR] {e}") - return None - -def get_service_token() -> str | None: - """Get or create a service token automatically.""" - if not DB_API_BASE: - print("[BOOTSTRAP][WARN] DB_API_BASE not set") - return None - - # Try existing file - token = _read_token(DB_API_TOKEN_FILE) - if token: - print(f"[BOOTSTRAP] using existing token from {DB_API_TOKEN_FILE}") - return token - - # Try bootstrap (new unified API) - print(f"[BOOTSTRAP] fetching new service token from {DB_API_BASE}") - token = _try_dev_bootstrap() - if token: - _write_token(DB_API_TOKEN_FILE, token) - print(f"[BOOTSTRAP] wrote token to {DB_API_TOKEN_FILE}") - return token - - print("[BOOTSTRAP][ERROR] Could not obtain service token.") - return None +import os, pathlib, time, requests + +DB_API_BASE = os.getenv("DB_API_BASE", "").strip() +DB_API_TOKEN_FILE = os.getenv("DB_API_TOKEN_FILE", "/app/secret/db_api_token") +DB_API_SERVICE_NAME = os.getenv("DB_API_SERVICE_NAME", "fruit_ripeness_alert").strip() or "fruit_ripeness_alert" + +def _safe_join_url(base: str, path: str) -> str: + return f"{base.rstrip('/')}/{path.lstrip('/')}" + +def _read_token(path: str) -> str | None: + p = pathlib.Path(path) + if p.exists(): + t = p.read_text(encoding="utf-8").strip() + if t and "***" not in t: + return t + return None + +def _write_token(path: str, token: str) -> None: + p = pathlib.Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(token, encoding="utf-8") + +def _try_dev_bootstrap(): + """Try to get token using /auth/_dev_bootstrap (new API).""" + url = _safe_join_url(DB_API_BASE, "/auth/_dev_bootstrap") + payload = {"service_name": DB_API_SERVICE_NAME, "rotate_if_exists": True} + try: + r = requests.post(url, json=payload, timeout=10) + if r.status_code in (200, 201): + data = r.json() + sa = data.get("service_account") or {} + token = sa.get("raw_token") or sa.get("token") + if token and "***" not in token: + print("[BOOTSTRAP] obtained token via /auth/_dev_bootstrap") + return token.strip() + print(f"[BOOTSTRAP][WARN] _dev_bootstrap returned {r.status_code}: {r.text[:100]}") + except Exception as e: + print(f"[BOOTSTRAP][ERROR] {e}") + return None + +def get_service_token() -> str | None: + """Get or create a service token automatically.""" + if not DB_API_BASE: + print("[BOOTSTRAP][WARN] DB_API_BASE not set") + return None + + # Try existing file + token = _read_token(DB_API_TOKEN_FILE) + if token: + print(f"[BOOTSTRAP] using existing token from {DB_API_TOKEN_FILE}") + return token + + # Try bootstrap (new unified API) + print(f"[BOOTSTRAP] fetching new service token from {DB_API_BASE}") + token = _try_dev_bootstrap() + if token: + _write_token(DB_API_TOKEN_FILE, token) + print(f"[BOOTSTRAP] wrote token to {DB_API_TOKEN_FILE}") + return token + + print("[BOOTSTRAP][ERROR] Could not obtain service token.") + return None diff --git a/services/sounds_flink/config.py b/services/sounds_flink/config.py index 0d30cdf02..989f8677e 100644 --- a/services/sounds_flink/config.py +++ b/services/sounds_flink/config.py @@ -1,23 +1,23 @@ -import os - -# Kafka / topics -KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "kafka:9092") -SOURCE_TOPIC = os.getenv("SOURCE_TOPIC", "sound_new_sounds_connections") -SINK_TOPIC = os.getenv("SINK_TOPIC", "") # empty = print to stdout only -GROUP_ID = os.getenv("GROUP_ID", "flink-classifier-sounds") -KAFKA_START = os.getenv("KAFKA_START", "earliest") # earliest|latest - -# HTTP classifier -CLASSIFIER_HTTP_URL = os.getenv("CLASSIFIER_HTTP_URL", "http://sounds_classifier:8088/classify") -REQUEST_TIMEOUT = float(os.getenv("REQUEST_TIMEOUT", "5.0")) -RETRIES_TOTAL = int(os.getenv("RETRIES_TOTAL", "3")) -BACKOFF_FACTOR = float(os.getenv("BACKOFF_FACTOR", "0.5")) - -# Flink -DEFAULT_PARALLELISM = int(os.getenv("DEFAULT_PARALLELISM", "1")) -CHECKPOINT_MS = int(os.getenv("CHECKPOINT_MS", "10000")) # 10s -DELIVERY_GUARANTEE = os.getenv("DELIVERY_GUARANTEE", "AT_LEAST_ONCE") # AT_LEAST_ONCE|NONE -TRANSACTION_TIMEOUT_MS = os.getenv("TRANSACTION_TIMEOUT_MS", "600000") # 10 min - -# Optional default bucket to use when input only carries an object key +import os + +# Kafka / topics +KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "kafka:9092") +SOURCE_TOPIC = os.getenv("SOURCE_TOPIC", "sound_new_sounds_connections") +SINK_TOPIC = os.getenv("SINK_TOPIC", "") # empty = print to stdout only +GROUP_ID = os.getenv("GROUP_ID", "flink-classifier-sounds") +KAFKA_START = os.getenv("KAFKA_START", "earliest") # earliest|latest + +# HTTP classifier +CLASSIFIER_HTTP_URL = os.getenv("CLASSIFIER_HTTP_URL", "http://sounds_classifier:8088/classify") +REQUEST_TIMEOUT = float(os.getenv("REQUEST_TIMEOUT", "5.0")) +RETRIES_TOTAL = int(os.getenv("RETRIES_TOTAL", "3")) +BACKOFF_FACTOR = float(os.getenv("BACKOFF_FACTOR", "0.5")) + +# Flink +DEFAULT_PARALLELISM = int(os.getenv("DEFAULT_PARALLELISM", "1")) +CHECKPOINT_MS = int(os.getenv("CHECKPOINT_MS", "10000")) # 10s +DELIVERY_GUARANTEE = os.getenv("DELIVERY_GUARANTEE", "AT_LEAST_ONCE") # AT_LEAST_ONCE|NONE +TRANSACTION_TIMEOUT_MS = os.getenv("TRANSACTION_TIMEOUT_MS", "600000") # 10 min + +# Optional default bucket to use when input only carries an object key DEFAULT_BUCKET = os.getenv("DEFAULT_BUCKET", "sound") \ No newline at end of file diff --git a/services/sounds_flink/flink_job.py b/services/sounds_flink/flink_job.py index 7b42b0799..de2429cc9 100644 --- a/services/sounds_flink/flink_job.py +++ b/services/sounds_flink/flink_job.py @@ -1,83 +1,83 @@ -""" -Flink Python DataStream job: -- Kafka source (JSON notifications) -- Per-record HTTP classification via pooled Session (processor.process_json_line) -- Optional Kafka sink; if SINK_TOPIC is empty -> print to stdout -""" - -from pyflink.datastream import StreamExecutionEnvironment -from pyflink.datastream.connectors.kafka import ( - KafkaSource, KafkaSink, KafkaRecordSerializationSchema, DeliveryGuarantee -) -from pyflink.common.serialization import SimpleStringSchema -from pyflink.common.watermark_strategy import WatermarkStrategy -from pyflink.datastream.checkpointing_mode import CheckpointingMode -from pyflink.common import Types -from processor import process_json_line - -from config import ( - KAFKA_BROKERS, - SOURCE_TOPIC, - SINK_TOPIC, - GROUP_ID, - KAFKA_START, - DEFAULT_PARALLELISM, - CHECKPOINT_MS, - DELIVERY_GUARANTEE, - TRANSACTION_TIMEOUT_MS, -) - -def main(): - env = StreamExecutionEnvironment.get_execution_environment() - env.set_parallelism(DEFAULT_PARALLELISM) - env.enable_checkpointing(CHECKPOINT_MS, CheckpointingMode.EXACTLY_ONCE) - - source = ( - KafkaSource.builder() - .set_bootstrap_servers(KAFKA_BROKERS) - .set_topics(SOURCE_TOPIC) - .set_group_id(GROUP_ID) - .set_property("auto.offset.reset", KAFKA_START) - .set_value_only_deserializer(SimpleStringSchema()) - .build() - ) - - stream = env.from_source( - source, - WatermarkStrategy.no_watermarks(), - f"source-{SOURCE_TOPIC}", - ) - - mapped = stream.map(process_json_line, output_type=Types.STRING()) - filtered = mapped.filter(lambda s: bool(s and s.strip())) - - # Always print for quick debugging - filtered.name("stdout-preview").print() - - # Optional Kafka sink - if SINK_TOPIC: - guarantee = ( - DeliveryGuarantee.AT_LEAST_ONCE - if DELIVERY_GUARANTEE.upper() == "AT_LEAST_ONCE" - else DeliveryGuarantee.NONE - ) - sink = ( - KafkaSink.builder() - .set_bootstrap_servers(KAFKA_BROKERS) - .set_record_serializer( - KafkaRecordSerializationSchema.builder() - .set_topic(SINK_TOPIC) - .set_value_serialization_schema(SimpleStringSchema()) - .build() - ) - .set_delivery_guarantee(guarantee) - .set_property("transaction.timeout.ms", TRANSACTION_TIMEOUT_MS) - .build() - ) - filtered.sink_to(sink).name(f"sink-{SINK_TOPIC}") - - env.execute("flink-http-classifier") - - -if __name__ == "__main__": - main() +""" +Flink Python DataStream job: +- Kafka source (JSON notifications) +- Per-record HTTP classification via pooled Session (processor.process_json_line) +- Optional Kafka sink; if SINK_TOPIC is empty -> print to stdout +""" + +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.connectors.kafka import ( + KafkaSource, KafkaSink, KafkaRecordSerializationSchema, DeliveryGuarantee +) +from pyflink.common.serialization import SimpleStringSchema +from pyflink.common.watermark_strategy import WatermarkStrategy +from pyflink.datastream.checkpointing_mode import CheckpointingMode +from pyflink.common import Types +from processor import process_json_line + +from config import ( + KAFKA_BROKERS, + SOURCE_TOPIC, + SINK_TOPIC, + GROUP_ID, + KAFKA_START, + DEFAULT_PARALLELISM, + CHECKPOINT_MS, + DELIVERY_GUARANTEE, + TRANSACTION_TIMEOUT_MS, +) + +def main(): + env = StreamExecutionEnvironment.get_execution_environment() + env.set_parallelism(DEFAULT_PARALLELISM) + env.enable_checkpointing(CHECKPOINT_MS, CheckpointingMode.EXACTLY_ONCE) + + source = ( + KafkaSource.builder() + .set_bootstrap_servers(KAFKA_BROKERS) + .set_topics(SOURCE_TOPIC) + .set_group_id(GROUP_ID) + .set_property("auto.offset.reset", KAFKA_START) + .set_value_only_deserializer(SimpleStringSchema()) + .build() + ) + + stream = env.from_source( + source, + WatermarkStrategy.no_watermarks(), + f"source-{SOURCE_TOPIC}", + ) + + mapped = stream.map(process_json_line, output_type=Types.STRING()) + filtered = mapped.filter(lambda s: bool(s and s.strip())) + + # Always print for quick debugging + filtered.name("stdout-preview").print() + + # Optional Kafka sink + if SINK_TOPIC: + guarantee = ( + DeliveryGuarantee.AT_LEAST_ONCE + if DELIVERY_GUARANTEE.upper() == "AT_LEAST_ONCE" + else DeliveryGuarantee.NONE + ) + sink = ( + KafkaSink.builder() + .set_bootstrap_servers(KAFKA_BROKERS) + .set_record_serializer( + KafkaRecordSerializationSchema.builder() + .set_topic(SINK_TOPIC) + .set_value_serialization_schema(SimpleStringSchema()) + .build() + ) + .set_delivery_guarantee(guarantee) + .set_property("transaction.timeout.ms", TRANSACTION_TIMEOUT_MS) + .build() + ) + filtered.sink_to(sink).name(f"sink-{SINK_TOPIC}") + + env.execute("flink-http-classifier") + + +if __name__ == "__main__": + main() diff --git a/services/sounds_flink/processor.py b/services/sounds_flink/processor.py index b65784e9d..1b4c77814 100644 --- a/services/sounds_flink/processor.py +++ b/services/sounds_flink/processor.py @@ -1,136 +1,136 @@ -import json -import logging -from datetime import datetime -from typing import Tuple, Optional, Dict -from urllib.parse import unquote, unquote_plus - -import requests -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -from config import ( - CLASSIFIER_HTTP_URL, - REQUEST_TIMEOUT, - RETRIES_TOTAL, - BACKOFF_FACTOR, - DEFAULT_BUCKET, -) - -# Reusable HTTP session with retries/backoff -_session = requests.Session() -_retries = Retry( - total=RETRIES_TOTAL, - backoff_factor=BACKOFF_FACTOR, - status_forcelist=(429, 500, 502, 503, 504), - allowed_methods=["GET", "POST"], - respect_retry_after_header=True, -) -_session.mount("http://", HTTPAdapter(max_retries=_retries)) -_session.mount("https://", HTTPAdapter(max_retries=_retries)) - - -def _try_json(raw: str) -> Optional[Dict]: - try: - return json.loads(raw) - except Exception: - return None - - -def _extract_bucket_key(event: Dict) -> Tuple[Optional[str], Optional[str]]: - """ - Extract (bucket, key) from multiple possible MinIO/S3 event shapes. - Supports: - - short link format: {"file_name": "...", "key": "/", "linked_time": "..."} - - flat: {"Bucket": "...", "Key": "..."} - - Records[0].s3.bucket.name / Records[0].s3.object.key - """ - bucket: Optional[str] = None - key: Optional[str] = None - - # 1) Short link format (from *_connections topics): key="/" - if isinstance(event.get("key"), str): - k = event["key"].strip() - k = unquote_plus(unquote(k)) - if "/" in k: - bucket, key = k.split("/", 1) - else: - key = k # no bucket provided here - - # 2) Flat shape - if (bucket is None or key is None) and event.get("Bucket") and event.get("Key"): - bucket = bucket or event.get("Bucket") - key = key or event.get("Key") - - # 3) Records[...] S3-style - if bucket is None or key is None: - records = event.get("Records") or [] - if records: - r0 = records[0] - s3 = r0.get("s3", {}) - b = s3.get("bucket", {}) - o = s3.get("object", {}) - bucket = bucket or b.get("name") - key = key or o.get("key") - - # Normalize/URL-decode - if isinstance(key, str) and key: - key = unquote_plus(unquote(key)) - - return bucket, key - - -def _classify(bucket: Optional[str], key: Optional[str]) -> Optional[Dict]: - """ - Call the classifier service with the resolved (bucket, key). - The classifier expects: - { "s3_bucket": "...", "s3_key": "..." } - """ - if not key: - return None - - # Prefer provided bucket, otherwise fallback to DEFAULT_BUCKET if configured - eff_bucket = bucket or (DEFAULT_BUCKET if DEFAULT_BUCKET else None) - if not eff_bucket: - # Without a bucket we cannot call the classifier - return None - - payload = { - "s3_bucket": eff_bucket, - "s3_key": key, - } - - try: - resp = _session.post(CLASSIFIER_HTTP_URL, json=payload, timeout=REQUEST_TIMEOUT) - if resp.status_code >= 400: - logging.warning("Classifier returned %s for key=%s", resp.status_code, key) - return None - return resp.json() - except Exception as e: - logging.warning("Classifier request failed for key=%s: %s", key, e) - return None - - -def process_json_line(raw: str) -> str: - """ - Map function: input raw JSON string -> output JSON string or "" to skip. - 1) Parse JSON - 2) Extract (bucket, key) - 3) Call classifier (payload: s3_bucket/s3_key) - 4) Return compact JSON result or "" to drop - """ - event = _try_json(raw) - if not event: - return "" - - bucket, key = _extract_bucket_key(event) - result = _classify(bucket, key) - if not result: - return "" - - out = { - "s3_bucket": bucket or DEFAULT_BUCKET or "", - "s3_key": key, - "result": result, - "received_at": datetime.utcnow().isoformat(timespec="seconds") + "Z", - } - return json.dumps(out, separators=(",", ":")) +import json +import logging +from datetime import datetime +from typing import Tuple, Optional, Dict +from urllib.parse import unquote, unquote_plus + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +from config import ( + CLASSIFIER_HTTP_URL, + REQUEST_TIMEOUT, + RETRIES_TOTAL, + BACKOFF_FACTOR, + DEFAULT_BUCKET, +) + +# Reusable HTTP session with retries/backoff +_session = requests.Session() +_retries = Retry( + total=RETRIES_TOTAL, + backoff_factor=BACKOFF_FACTOR, + status_forcelist=(429, 500, 502, 503, 504), + allowed_methods=["GET", "POST"], + respect_retry_after_header=True, +) +_session.mount("http://", HTTPAdapter(max_retries=_retries)) +_session.mount("https://", HTTPAdapter(max_retries=_retries)) + + +def _try_json(raw: str) -> Optional[Dict]: + try: + return json.loads(raw) + except Exception: + return None + + +def _extract_bucket_key(event: Dict) -> Tuple[Optional[str], Optional[str]]: + """ + Extract (bucket, key) from multiple possible MinIO/S3 event shapes. + Supports: + - short link format: {"file_name": "...", "key": "/", "linked_time": "..."} + - flat: {"Bucket": "...", "Key": "..."} + - Records[0].s3.bucket.name / Records[0].s3.object.key + """ + bucket: Optional[str] = None + key: Optional[str] = None + + # 1) Short link format (from *_connections topics): key="/" + if isinstance(event.get("key"), str): + k = event["key"].strip() + k = unquote_plus(unquote(k)) + if "/" in k: + bucket, key = k.split("/", 1) + else: + key = k # no bucket provided here + + # 2) Flat shape + if (bucket is None or key is None) and event.get("Bucket") and event.get("Key"): + bucket = bucket or event.get("Bucket") + key = key or event.get("Key") + + # 3) Records[...] S3-style + if bucket is None or key is None: + records = event.get("Records") or [] + if records: + r0 = records[0] + s3 = r0.get("s3", {}) + b = s3.get("bucket", {}) + o = s3.get("object", {}) + bucket = bucket or b.get("name") + key = key or o.get("key") + + # Normalize/URL-decode + if isinstance(key, str) and key: + key = unquote_plus(unquote(key)) + + return bucket, key + + +def _classify(bucket: Optional[str], key: Optional[str]) -> Optional[Dict]: + """ + Call the classifier service with the resolved (bucket, key). + The classifier expects: + { "s3_bucket": "...", "s3_key": "..." } + """ + if not key: + return None + + # Prefer provided bucket, otherwise fallback to DEFAULT_BUCKET if configured + eff_bucket = bucket or (DEFAULT_BUCKET if DEFAULT_BUCKET else None) + if not eff_bucket: + # Without a bucket we cannot call the classifier + return None + + payload = { + "s3_bucket": eff_bucket, + "s3_key": key, + } + + try: + resp = _session.post(CLASSIFIER_HTTP_URL, json=payload, timeout=REQUEST_TIMEOUT) + if resp.status_code >= 400: + logging.warning("Classifier returned %s for key=%s", resp.status_code, key) + return None + return resp.json() + except Exception as e: + logging.warning("Classifier request failed for key=%s: %s", key, e) + return None + + +def process_json_line(raw: str) -> str: + """ + Map function: input raw JSON string -> output JSON string or "" to skip. + 1) Parse JSON + 2) Extract (bucket, key) + 3) Call classifier (payload: s3_bucket/s3_key) + 4) Return compact JSON result or "" to drop + """ + event = _try_json(raw) + if not event: + return "" + + bucket, key = _extract_bucket_key(event) + result = _classify(bucket, key) + if not result: + return "" + + out = { + "s3_bucket": bucket or DEFAULT_BUCKET or "", + "s3_key": key, + "result": result, + "received_at": datetime.utcnow().isoformat(timespec="seconds") + "Z", + } + return json.dumps(out, separators=(",", ":")) diff --git a/services/sounds_flink/requirements.txt b/services/sounds_flink/requirements.txt index bb811ad65..f5de43b68 100644 --- a/services/sounds_flink/requirements.txt +++ b/services/sounds_flink/requirements.txt @@ -1,6 +1,6 @@ -apache-flink==1.19.3 -requests==2.32.3 -urllib3==2.2.3 -protobuf==4.25.3 -googleapis-common-protos==1.63.0 -grpcio==1.60.0 +apache-flink==1.19.3 +requests==2.32.3 +urllib3==2.2.3 +protobuf==4.25.3 +googleapis-common-protos==1.63.0 +grpcio==1.60.0 diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251003T120500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251003T120500Z.json index 904c0e9d7..43ce7d2a8 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251003T120500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251003T120500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251003T120500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-10-03T12:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251003T120500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-10-03T12:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251101T120500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251101T120500Z.json index ea64becee..d6ca0d3fc 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251101T120500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251101T120500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251101T120500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-11-01T12:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251101T120500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-11-01T12:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251102T120500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251102T120500Z.json index aa691f67c..d5a7b2360 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251102T120500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251102T120500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251102T120500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-11-02T12:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251102T120500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-11-02T12:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251102T140500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251102T140500Z.json index b2175772f..fdae67d8b 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251102T140500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251102T140500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251102T140500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-11-02T14:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251102T140500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-11-02T14:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251103T120500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251103T120500Z.json index 4c9194165..51251b6f5 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251103T120500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251103T120500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251103T120500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-11-03T12:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251103T120500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-11-03T12:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251104T120500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251104T120500Z.json index ec582d87f..268fa727e 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251104T120500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251104T120500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251104T120500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-11-04T12:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251104T120500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-11-04T12:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file diff --git a/simulators/data/ultra-sound/metadata/mic-u-2_20251105T120500Z.json b/simulators/data/ultra-sound/metadata/mic-u-2_20251105T120500Z.json index 47e76c56f..41fc118b5 100644 --- a/simulators/data/ultra-sound/metadata/mic-u-2_20251105T120500Z.json +++ b/simulators/data/ultra-sound/metadata/mic-u-2_20251105T120500Z.json @@ -1,14 +1,14 @@ -{ - "file_name": "mic-u-2_20251105T120500Z.wav", - "device_id": "mic-u-2", - "capture_time": "2025-11-05T12:05:00Z", - "duration_sec": 0.002, - "done": false, - "sample_rate_hz": 500000, - "channels": 1, - "content_type": "audio/wav", - "gis_origin": { - "latitude": 32.89561, - "longitude": 30.9681 - } +{ + "file_name": "mic-u-2_20251105T120500Z.wav", + "device_id": "mic-u-2", + "capture_time": "2025-11-05T12:05:00Z", + "duration_sec": 0.002, + "done": false, + "sample_rate_hz": 500000, + "channels": 1, + "content_type": "audio/wav", + "gis_origin": { + "latitude": 32.89561, + "longitude": 30.9681 + } } \ No newline at end of file