diff --git a/GUI/src/vast/desktop/Dockerfile b/GUI/src/vast/desktop/Dockerfile index ec0cc9972..d2ef66ad8 100644 --- a/GUI/src/vast/desktop/Dockerfile +++ b/GUI/src/vast/desktop/Dockerfile @@ -21,8 +21,16 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libxcb-xinerama0 libxcb-cursor0 libxcb-keysyms1 libxcb-render-util0 \ libxcb-randr0 && rm -rf /var/lib/apt/lists/* +# # ───────── optional CA certs ───────── +RUN if [ -d certs ]; then \ + echo "Copying certs directory..."; \ + tar -cf - certs | tar -xf -; \ + else \ + echo "No certs directory, skipping copy."; \ + fi + # ───────── optional CA certs ───────── -COPY certs /app/certs + RUN if [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ echo "Configuring NetFree certificates..."; \ cp ./certs/*.crt /usr/local/share/ca-certificates/; \ diff --git a/GUI/src/vast/gateway/Dockerfile b/GUI/src/vast/gateway/Dockerfile index 745b58831..6da42492c 100644 --- a/GUI/src/vast/gateway/Dockerfile +++ b/GUI/src/vast/gateway/Dockerfile @@ -6,7 +6,13 @@ WORKDIR /app # ARG USE_NETFREE=true RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY certs /app/certs +RUN if [ -d certs ]; then \ + echo "Copying certs directory..."; \ + tar -cf - certs | tar -xf -; \ + else \ + echo "No certs directory, skipping copy."; \ + fi + # System CA + add NetFree certs RUN if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ echo "Configuring NetFree certificates..."; \ diff --git a/GUI/src/vast/runner/Dockerfile b/GUI/src/vast/runner/Dockerfile index 1307a1d59..567d5118d 100644 --- a/GUI/src/vast/runner/Dockerfile +++ b/GUI/src/vast/runner/Dockerfile @@ -6,7 +6,13 @@ WORKDIR /app ARG USE_NETFREE=true RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY certs /app/certs +RUN if [ -d certs ]; then \ + echo "Copying certs 
directory..."; \ + tar -cf - certs | tar -xf -; \ + else \ + echo "No certs directory, skipping copy."; \ + fi + # System CA + add NetFree certs RUN if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ echo "Configuring NetFree certificates..."; \ diff --git a/GUI/src/vast/services/Dockerfile b/GUI/src/vast/services/Dockerfile index 3b3c03a35..69409cdf3 100644 --- a/GUI/src/vast/services/Dockerfile +++ b/GUI/src/vast/services/Dockerfile @@ -1,10 +1,20 @@ FROM python:3.11-slim ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 WORKDIR /app -# # System CA + NetFree + +# System CA + NetFree + RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY certs/*.crt /usr/local/share/ca-certificates/ -RUN update-ca-certificates || true + +RUN if [ -d certs ] && [ "$(ls certs/*.crt 2>/dev/null)" ]; then \ + echo "Installing local certificates..."; \ + cp certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + else \ + echo "No certificates found in certs directory - skipping."; \ + fi + + ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ PIP_CERT=/etc/ssl/certs/ca-certificates.crt diff --git a/RelDB/build_tables/schema.sql b/RelDB/build_tables/schema.sql index 419b95067..51b5b7a74 100644 --- a/RelDB/build_tables/schema.sql +++ b/RelDB/build_tables/schema.sql @@ -511,6 +511,41 @@ CREATE TABLE IF NOT EXISTS public.aerial_images_complete_metadata ( created_at TIMESTAMP DEFAULT NOW() ); +-- === fruit_defect_predictions === +CREATE TABLE IF NOT EXISTS public.fruit_defect_predictions ( + id BIGSERIAL PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL DEFAULT now(), + bucket TEXT NOT NULL, + object_key TEXT NOT NULL, + image_uri TEXT, -- s3://bucket/key + label TEXT NOT NULL CHECK (label IN ('ok','defect')), + score DOUBLE PRECISION, + confidence DOUBLE PRECISION, + latency_ms_http INTEGER, + 
latency_ms_model INTEGER, + device_id TEXT, + idem_key TEXT, -- Idempotency-Key (for UPSERT) + corr_id TEXT, -- X-Correlation-ID + extra JSONB NOT NULL DEFAULT '{}'::jsonb +); +CREATE UNIQUE INDEX IF NOT EXISTS ux_fd_idem + ON public.fruit_defect_predictions (idem_key); +CREATE INDEX IF NOT EXISTS ix_fd_ts + ON public.fruit_defect_predictions (ts); +CREATE INDEX IF NOT EXISTS ix_fd_label + ON public.fruit_defect_predictions (label); +CREATE INDEX IF NOT EXISTS ix_fd_bucket_object + ON public.fruit_defect_predictions (bucket, object_key); +CREATE INDEX IF NOT EXISTS ix_fd_device + ON public.fruit_defect_predictions (device_id); +CREATE INDEX IF NOT EXISTS ix_fd_extra_gin + ON public.fruit_defect_predictions USING GIN (extra); +CREATE INDEX IF NOT EXISTS ix_fd_defects + ON public.fruit_defect_predictions (ts) WHERE label = 'defect'; + + + + CREATE INDEX IF NOT EXISTS idx_aerial_metadata_device_id ON public.aerial_images_complete_metadata (device_id); diff --git a/docker-compose.yml b/docker-compose.yml index 4ab011a41..14355a324 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,8 @@ networks: ag_cloud: name: ag_cloud driver: bridge + flink-net: + driver: bridge # -------------------------- # Volumes @@ -851,7 +853,7 @@ services: container_name: flink_writer_db environment: - KAFKA_BROKERS=kafka:9092 - - TOPICS=sensor_zone_stats,sensor_anomalies,image_new_security_connections,alerts,image_new_aerial_connections,aerial_images_metadata,aerial_image_object_detections,aerial_image_anomaly_detections,aerial_images_complete_metadata,field_polygons,aerial_image_segmentation,sound_new_sounds_connections,sound_new_plants_connections + - TOPICS=sensor_zone_stats,sensor_anomalies,image_new_security_connections,alerts,image_new_aerial_connections,aerial_images_metadata,aerial_image_object_detections,aerial_image_anomaly_detections,aerial_images_complete_metadata,aerial_image_segmentation,sound_new_sounds_connections,sound_new_plants_connections - 
DB_API_BASE=http://db_api_service:8001 - DB_API_AUTH_MODE=service - DB_API_SERVICE_NAME=flink-writer-db @@ -879,9 +881,9 @@ services: FLINK_PROPERTIES= jobmanager.rpc.address: flink-jobmanager parallelism.default: 2 - taskmanager.numberOfTaskSlots: 2 + taskmanager.numberOfTaskSlots: 4 jobmanager.memory.process.size: 1600m - taskmanager.memory.process.size: 1728m + taskmanager.memory.process.size: 2048m s3.endpoint: http://minio-hot:9000 s3.path.style.access: true s3.access.key: minioadmin @@ -954,7 +956,7 @@ services: networks: [ ag_cloud ] environment: - KAFKA_BOOTSTRAP=kafka:9092 - - INPUT_TOPIC=imagery.new.fruit + - INPUT_TOPIC=inference.dispatched.camera - TEAM=fruit - HTTP_URL=http://fruit-inference-http:8004/infer_json - DLQ_TOPIC=dlq.inference.http @@ -962,14 +964,14 @@ services: - PARALLELISM=2 - PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 volumes: - - ./streaming/flink/jobs:/opt/flink/jobs:ro + - ./streaming/flink/jobs:/opt/flink/jobs - ./streaming/flink/connectors/flink-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar:ro - ./streaming/flink/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar:ro - ./streaming/flink/connectors/flink-json-1.18.1.jar:/opt/flink/lib/flink-json-1.18.1.jar:ro - ./streaming/flink/connectors/kafka-clients-3.2.3.jar:/opt/flink/lib/kafka-clients-3.2.3.jar:ro - ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro - command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 
-Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.fruit --team fruit --http-url http://fruit-inference-http:8004/infer_json --group-id http-dispatcher-fruit --dlq-topic dlq.inference.http; tail -f /dev/null" ] + command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic inference.dispatched.camera --team fruit --http-url http://fruit-inference-http:8004/infer_json --group-id http-dispatcher-fruit --dlq-topic dlq.inference.http; tail -f /dev/null" ] restart: always flink-dispatcher-camera: @@ -982,7 +984,7 @@ services: networks: [ag_cloud] environment: - KAFKA_BOOTSTRAP=kafka:9092 - - INPUT_TOPIC=imagery.new.camera + - INPUT_TOPIC=image.new.fruits - TEAM=camera - HTTP_URL=http://camera-inference-http:8004/infer_json - DLQ_TOPIC=dlq.inference.http @@ -990,9 +992,9 @@ services: - PARALLELISM=2 - PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 volumes: - - ./streaming/flink/jobs:/opt/flink/jobs:ro + - ./streaming/flink/jobs:/opt/flink/jobs - ./streaming/flink/connectors:/opt/flink/lib/connectors:ro - command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink 
list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/connectors/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.camera --team camera --http-url http://camera-inference-http:8004/infer_json --group-id http-dispatcher-camera --dlq-topic dlq.inference.http; tail -f /dev/null" ] + command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/connectors/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic image.new.fruits --team camera --http-url http://camera-inference-http:8004/infer_json --group-id http-dispatcher-camera --dlq-topic dlq.inference.http; tail -f /dev/null" ] restart: always flink-alerts-job: @@ -1181,3 +1183,155 @@ services: FLINK_PYTHON: /opt/venv/bin/python networks: - ag_cloud + + + # -------------------------- + # Flink Air Processing + # -------------------------- + air-jobmanager: + build: + context: ./services/air + dockerfile: Dockerfile.flink + container_name: air-jobmanager + command: jobmanager + ports: + - 
"8085:8081" + environment: + - JOB_MANAGER_RPC_ADDRESS=air-jobmanager + - KAFKA_BROKERS=kafka:9092 + - IN_TOPIC=image.new.aerial + - KAFKA_GROUP_ID=flink-air-device-pipeline + networks: + - flink-net + - ag_cloud + restart: unless-stopped + + air-taskmanager: + build: + context: ./services/air + dockerfile: Dockerfile.flink + container_name: air-taskmanager + command: taskmanager -D taskmanager.numberOfTaskSlots=4 + depends_on: + air-jobmanager: + condition: service_started + infer-api: + condition: service_healthy + anomaly-api: + condition: service_healthy + segmentation-api: + condition: service_healthy + environment: + - JOB_MANAGER_RPC_ADDRESS=air-jobmanager + - KAFKA_BROKERS=kafka:9092 + - IN_TOPIC=image.new.aerial + - OUT_TOPIC_OBJECT=aerial_image_object_detections + - OUT_TOPIC_ANOMALY=aerial_image_anomaly_detections + - OUT_TOPIC_SEGMENTATION=aerial_image_segmentation + - taskmanager.numberOfTaskSlots=4 + - KAFKA_GROUP_ID=flink-air-device-pipeline + - SEGMENTATION_URL=http://segmentation-api:8500/infer + - INFER_URL=http://infer-api:8000/infer + - ANOMALY_URL=http://anomaly-api:8010/predict + - INFER_CONF=0.25 + - INFER_IOU=0.45 + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + networks: + - flink-net + - ag_cloud + restart: unless-stopped + + infer-api: + build: + context: ./services/air/object_detection_api + dockerfile: Dockerfile.infer + container_name: infer-api + environment: + - WEIGHTS_PATH=/app/best.pt + volumes: + - ./services/air/object_detection_api/best.pt:/app/best.pt:ro + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8000/health"] + interval: 10s + timeout: 3s + retries: 15 + networks: + - flink-net + - ag_cloud + + anomaly-api: + build: + context: ./services/air/anomaly_detection_api + dockerfile: Dockerfile.anomaly + container_name: anomaly-api + environment: + - MODEL_PATH=/app/models/best.pt + ports: + - "8020:8010" + healthcheck: + test: ["CMD", "curl", "-sf", 
"http://localhost:8010/health"] + interval: 10s + timeout: 3s + retries: 15 + networks: + - flink-net + - ag_cloud + + segmentation-api: + build: + context: ./services/air/segmentation_api + dockerfile: dockerfile.segmentation + container_name: segmentation-api + environment: + - MODEL_PATH=/app/model/best_model.pth + ports: + - "8500:8500" + volumes: + - ./services/air/segmentation_api/model:/app/model:ro + - ./services/air/segmentation_api/certs:/usr/local/share/ca-certificates/netfree:ro + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8500/health"] + interval: 10s + timeout: 3s + retries: 10 + networks: + - flink-net + - ag_cloud + + air-submit: + build: + context: ./services/air + dockerfile: Dockerfile.flink + depends_on: + air-jobmanager: + condition: service_started + command: > + bash -c " + sleep 10 && + flink run -m air-jobmanager:8081 -py /opt/app/job.py" + + fruit-defect-sink: + build: + context: ./services/fruit_defect_sink + dockerfile: Dockerfile + environment: + - KAFKA_BOOTSTRAP=kafka:9092 + - INPUT_TOPIC=inference.dispatched.fruit + - ALERTS_TOPIC=alerts + - GROUP_ID=fruit-defect-sink + - AUTO_OFFSET_RESET=earliest + - MAX_POLL_INTERVAL_MS=900000 + - SESSION_TIMEOUT_MS=45000 + - HEARTBEAT_INTERVAL_MS=3000 + - PGHOST=postgres + - PGPORT=5432 + - PGDATABASE=missions_db + - PGUSER=missions_user + - PGPASSWORD=pg123 + depends_on: + kafka: { condition: service_healthy } + postgres: { condition: service_healthy } + networks: [ ag_cloud ] + restart: unless-stopped diff --git a/mqtt_and_kafka/kafka/kafka-files/create-topics.sh b/mqtt_and_kafka/kafka/kafka-files/create-topics.sh index 5a5107bde..008591b93 100644 --- a/mqtt_and_kafka/kafka/kafka-files/create-topics.sh +++ b/mqtt_and_kafka/kafka/kafka-files/create-topics.sh @@ -71,6 +71,7 @@ TOPICS=( sound_new_plants_connections sound_new_sounds_connections + inference.dispatched.fruit inference.dispatched.sounds dlq.inference.http ) diff --git a/services/API-notifications/src/Dockerfile 
b/services/API-notifications/src/Dockerfile index c17f4ae4c..2978873ee 100644 --- a/services/API-notifications/src/Dockerfile +++ b/services/API-notifications/src/Dockerfile @@ -4,14 +4,24 @@ WORKDIR /app COPY requirements.txt . -COPY certs /app/certs - -RUN apt-get update && \ - apt-get install -y ca-certificates && \ - cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ - update-ca-certificates && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* +RUN if [ -d certs ]; then \ + echo "Copying certs directory..."; \ + tar -cf - certs | tar -xf -; \ + else \ + echo "No certs directory, skipping copy."; \ + fi + + +RUN apt-get update && apt-get install -y ca-certificates && \ + if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ + echo "Configuring NetFree certificates..."; \ + cp ./certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + else \ + echo "Skipping certificate configuration (USE_NETFREE=$USE_NETFREE)"; \ + fi && \ + apt-get clean && rm -rf /var/lib/apt/lists/* + ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt diff --git a/services/air/Dockerfile.flink b/services/air/Dockerfile.flink new file mode 100644 index 000000000..ce2df1855 --- /dev/null +++ b/services/air/Dockerfile.flink @@ -0,0 +1,57 @@ +FROM flink:1.19.3-scala_2.12-java11 + +USER root + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt +ENV LANG=C.UTF-8 +ENV LC_ALL=C.UTF-8 + +RUN set -eux; \ + apt-get update; \ + apt-get install -y --no-install-recommends \ + python3 python3-venv python3-pip \ + ca-certificates wget curl libgomp1; \ + apt-get clean; \ + rm -rf /var/lib/apt/lists/* + +COPY certs /app/certs +RUN cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ + update-ca-certificates && \ + echo "✅ NetFree certificates installed successfully" + +RUN printf "[global]\ntrusted-host = 
pypi.org\n\tfiles.pythonhosted.org\ncert = /etc/ssl/certs/ca-certificates.crt\n" > /etc/pip.conf + +RUN mkdir -p /opt/flink/lib && \ + curl -fSL https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar \ + -o /opt/flink/lib/kafka-clients-3.7.0.jar && \ + curl -fSL https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.19/flink-connector-kafka-3.2.0-1.19.jar \ + -o /opt/flink/lib/flink-connector-kafka-3.2.0-1.19.jar + +WORKDIR /opt/app +RUN mkdir -p /opt/app/tmp && chmod 777 /opt/app/tmp + +RUN python3 -m venv /opt/venv +ENV PATH="/opt/venv/bin:${PATH}" +ENV PYFLINK_PYTHON=/opt/venv/bin/python +ENV PYFLINK_CLIENT_EXECUTABLE=/opt/venv/bin/python3 +ENV PYTHONPATH="/opt/venv/lib/python3.10/site-packages:${PYTHONPATH}" + +RUN /opt/venv/bin/pip install --no-cache-dir --default-timeout=1000 \ + apache-flink==1.19.3 \ + requests \ + Pillow \ + minio \ + kafka-python \ + "protobuf<=3.20.3" \ + google-cloud-storage \ + numpy + +COPY job.py /opt/app/job.py + +ENV KAFKA_BROKERS=kafka:9092 \ + IN_TOPIC=image.new.aerial \ + OUT_TOPIC_OBJECT=aerial_image_object_detections \ + OUT_TOPIC_ANOMALY=aerial_image_anomaly_detections \ + OUT_TOPIC_SEGMENTATION=aerial_image_segmentation \ + KAFKA_GROUP_ID=flink-air-device-pipeline diff --git a/services/air/anomaly_detection_api/Dockerfile.anomaly b/services/air/anomaly_detection_api/Dockerfile.anomaly new file mode 100644 index 000000000..3555e0296 --- /dev/null +++ b/services/air/anomaly_detection_api/Dockerfile.anomaly @@ -0,0 +1,27 @@ +FROM python:3.11-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + libgl1 libglib2.0-0 ca-certificates curl && \ + rm -rf /var/lib/apt/lists/* + +COPY certs /app/certs +RUN cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ + update-ca-certificates && \ + echo "✅ NetFree certificates installed successfully" + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV 
REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt + +RUN printf "[global]\ntrusted-host = pypi.org\n\tfiles.pythonhosted.org\n\tpytorch.org\ncert = /etc/ssl/certs/ca-certificates.crt\n" > /etc/pip.conf + +WORKDIR /app + +COPY service.py . +COPY models ./models + +RUN pip install --no-cache-dir \ + torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu && \ + pip install --no-cache-dir ultralytics fastapi uvicorn pillow python-multipart + +EXPOSE 8010 +CMD ["uvicorn", "service:app", "--host", "0.0.0.0", "--port", "8010"] diff --git a/services/air/anomaly_detection_api/models/best.pt b/services/air/anomaly_detection_api/models/best.pt new file mode 100644 index 000000000..6cbebaff7 Binary files /dev/null and b/services/air/anomaly_detection_api/models/best.pt differ diff --git a/services/air/anomaly_detection_api/readme.md b/services/air/anomaly_detection_api/readme.md new file mode 100644 index 000000000..8483208a7 --- /dev/null +++ b/services/air/anomaly_detection_api/readme.md @@ -0,0 +1,7 @@ +docker build -t anomaly-api . 
+ +docker run -d -p 8010:8010 anomaly-api + +docker compose up -d + +http://localhost:8010/docs diff --git a/services/air/anomaly_detection_api/service.py b/services/air/anomaly_detection_api/service.py new file mode 100644 index 000000000..000a66197 --- /dev/null +++ b/services/air/anomaly_detection_api/service.py @@ -0,0 +1,62 @@ +from fastapi import FastAPI, File, UploadFile +from fastapi.responses import JSONResponse +from ultralytics import YOLO +from PIL import Image +import io, time + +# =================================================== +# Load YOLO model once +# =================================================== +MODEL_PATH = "models/best.pt" +model = YOLO(MODEL_PATH) + +app = FastAPI(title="Anomaly Detection API", version="1.0") + + +def run_inference(file: UploadFile): + """Run YOLO inference in memory and return results.""" + image = Image.open(io.BytesIO(file.file.read())) + return model.predict(image, conf=0.4, iou=0.25, save=False) + + +@app.post("/predict") +async def predict(file: UploadFile = File(...)): + """Accepts an image, runs inference, and returns JSON with detections.""" + t0 = time.time() + results = run_inference(file) + + if not results or len(results[0].boxes) == 0: + return JSONResponse({ + "anomaly": False, + "description": "No anomaly detected.", + "inference_time_sec": round(time.time() - t0, 3) + }) + + h, w = results[0].orig_shape + detections = [] + for box in results[0].boxes: + cls = int(box.cls[0]) + detections.append({ + "label": model.names[cls], + "confidence": round(float(box.conf[0]), 3), + "bbox": [round(float(x), 1) for x in box.xyxy[0].tolist()], + "center_xy": [round(float(x), 1) for x in box.xywh[0][:2].tolist()], + "image_size": [w, h] + }) + + return JSONResponse({ + "anomaly": True, + "detections": detections, + "inference_time_sec": round(time.time() - t0, 3) + }) + + +@app.get("/health") +async def health(): + """Simple healthcheck endpoint.""" + return {"status": "ok", "model": MODEL_PATH} + + +if __name__ 
== "__main__": + import uvicorn + uvicorn.run("service:app", host="0.0.0.0", port=8010) diff --git a/services/air/docker-compose.yml b/services/air/docker-compose.yml new file mode 100644 index 000000000..e0c9c7867 --- /dev/null +++ b/services/air/docker-compose.yml @@ -0,0 +1,124 @@ +services: + air-jobmanager: + build: + context: . + dockerfile: Dockerfile.flink + container_name: flink-air-jobmanager + command: jobmanager + depends_on: + kafka: + condition: service_healthy + ports: + - "8085:8081" + environment: + - JOB_MANAGER_RPC_ADDRESS=air-jobmanager + - KAFKA_BROKERS=kafka:9092 + - IN_TOPIC=image.new.aerial + - KAFKA_GROUP_ID=flink-air-device-pipeline + networks: + - flink-net + - ag_cloud + + air-taskmanager: + build: + context: . + dockerfile: Dockerfile.flink + container_name: flink-air-taskmanager + command: taskmanager -D taskmanager.numberOfTaskSlots=4 + depends_on: + air-jobmanager: + condition: service_started + infer-api: + condition: service_healthy + anomaly-api: + condition: service_healthy + segmentation-api: + condition: service_healthy + environment: + - JOB_MANAGER_RPC_ADDRESS=air-jobmanager + - KAFKA_BROKERS=kafka:9092 + - IN_TOPIC=image.new.aerial + - OUT_TOPIC_OBJECT=aerial_image_object_detections + - OUT_TOPIC_ANOMALY=aerial_image_anomaly_detections + - OUT_TOPIC_SEGMENTATION=aerial_image_segmentation + - taskmanager.numberOfTaskSlots=4 + - KAFKA_GROUP_ID=flink-air-device-pipeline + + - SEGMENTATION_URL=http://segmentation-api:8500/infer + - INFER_URL=http://infer-api:8000/infer + - ANOMALY_URL=http://anomaly-api:8010/predict + + - INFER_CONF=0.25 + - INFER_IOU=0.45 + + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + networks: + - flink-net + - ag_cloud + + infer-api: + build: + context: ./object_detection_api + dockerfile: Dockerfile.infer + container_name: infer-api + environment: + - WEIGHTS_PATH=/app/best.pt + volumes: + - ./object_detection_api/best.pt:/app/best.pt:ro + 
healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8000/health"] + interval: 10s + timeout: 3s + retries: 15 + networks: + - flink-net + - ag_cloud + + anomaly-api: + build: + context: ./anomaly_detection_api + dockerfile: Dockerfile.anomaly + container_name: anomaly-api + environment: + - MODEL_PATH=/app/models/best.pt + ports: + - "8020:8010" + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8010/health"] + interval: 10s + timeout: 3s + retries: 15 + networks: + - flink-net + - ag_cloud + + segmentation-api: + build: + context: ./segmentation_api + dockerfile: dockerfile.segmentation + container_name: segmentation-api + environment: + - MODEL_PATH=/app/model/best_model.pth + ports: + - "8500:8500" + volumes: + - ./segmentation_api/model:/app/model:ro + - ./segmentation_api/certs:/usr/local/share/ca-certificates/netfree:ro + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8500/health"] + interval: 10s + timeout: 3s + retries: 10 + networks: + - flink-net + - ag_cloud + +networks: + flink-net: + driver: bridge + ag_cloud: + external: true + + diff --git a/services/air/job.py b/services/air/job.py new file mode 100644 index 000000000..f4bee1213 --- /dev/null +++ b/services/air/job.py @@ -0,0 +1,397 @@ +import os +import io +import json +import time +import logging +import imghdr +import requests +from requests.adapters import HTTPAdapter, Retry +from minio import Minio +from PIL import Image + +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.common.serialization import SimpleStringSchema +from pyflink.common.typeinfo import Types +from pyflink.common import WatermarkStrategy +from pyflink.datastream.connectors.kafka import ( + KafkaSource, + KafkaSink, + KafkaRecordSerializationSchema, + KafkaOffsetsInitializer +) +from pyflink.datastream.functions import RuntimeContext, ProcessFunction + + +# =============================================================== +# LOGGER CONFIGURATION +# 
=============================================================== +def setup_logger(): + logger = logging.getLogger("FlinkJob") + logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s", + "%Y-%m-%d %H:%M:%S")) + if logger.hasHandlers(): + logger.handlers.clear() + logger.addHandler(handler) + logger.propagate = False + return logger + +def upload_to_minio(file_path, bucket_name, object_name): + try: + endpoint = os.getenv("MINIO_ENDPOINT", "minio-hot:9000") + access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin") + secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin123") + use_ssl = False + client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl) + if not client.bucket_exists(bucket_name): + client.make_bucket(bucket_name) + logging.info(f"🪣 Created bucket '{bucket_name}'") + client.fput_object(bucket_name, object_name, file_path) + logging.info(f"✅ Uploaded {object_name} to bucket '{bucket_name}'") + except Exception as e: + logging.error(f"❌ MinIO upload failed: {e}") + + + +logger = setup_logger() + + +# =============================================================== +# DownloadAndInfer Process Function +# =============================================================== +class DownloadAndInfer(ProcessFunction): + """Kafka → MinIO → Segmentation API → Infer API → Anomaly API → Kafka""" + + def open(self, runtime_context: RuntimeContext): + self.minio_client = Minio( + endpoint=os.getenv("MINIO_ENDPOINT", "minio-hot:9000"), + access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin123"), + secure=False + ) + + self.segmentation_url = os.getenv("SEGMENTATION_URL", "http://segmentation-api:8500/infer") + self.infer_url = os.getenv("INFER_URL", "http://infer-api:8000/infer") + self.anomaly_url = os.getenv("ANOMALY_URL", "http://anomaly-api:8010/predict") + + self.conf = 
float(os.getenv("INFER_CONF", "0.25")) + self.iou = float(os.getenv("INFER_IOU", "0.45")) + self.tmp_dir = "/opt/app/tmp" + os.makedirs(self.tmp_dir, exist_ok=True) + + self.session = requests.Session() + retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[502, 503, 504]) + self.session.mount("http://", HTTPAdapter(max_retries=retries)) + self.timeout = (5, 180) + + logger.info("✅ DownloadAndInfer ready (Segmentation + Object + Anomaly)") + + def process_element(self, value, ctx): + infer_results, anomaly_results, segmentation_results = [], [], [] + logger.info("\n" + "=" * 70) + logger.info(f"📥 Received message: {value}") + + total_start = time.time() + + # === Parse Kafka message === + try: + data = json.loads(value) + image_key = data.get("img_key") + if not image_key: + logger.warning("⚠️ Missing 'img_key' in Kafka message") + return [] + except Exception as e: + logger.warning(f"⚠️ Invalid JSON: {e}") + return [] + + bucket, *path_parts = image_key.strip("/").split("/") + object_path = "/".join(path_parts) + local_filename = os.path.basename(object_path) + local_path = os.path.join(self.tmp_dir, local_filename) + + # === Step 1: Download from MinIO === + try: + self.minio_client.fget_object(bucket, object_path, local_path) + logger.info(f"✅ Downloaded: {bucket}/{object_path}") + except Exception as e: + logger.error(f"❌ Failed to download from MinIO: {e}") + return [] + + # === Step 2: Detect MIME === + try: + with open(local_path, "rb") as f: + img_bytes = f.read() + image_type = imghdr.what(None, h=img_bytes) + if not image_type: + img = Image.open(io.BytesIO(img_bytes)) + image_type = img.format.lower() + mime_type = f"image/{'jpeg' if image_type == 'jpg' else image_type}" + file_name = f"image.{image_type}" + logger.info(f"🧾 Detected image type: {image_type.upper()}") + except Exception as e: + logger.error(f"❌ Invalid image: {e}") + return [] + + # === Step 3: Segmentation API === + try: + files = {"file": (file_name, img_bytes, mime_type)} + 
logger.info(f"🛰️ Sending to Segmentation API: {self.segmentation_url}") + t0 = time.time() + r_seg = self.session.post(self.segmentation_url, files=files, timeout=self.timeout) + t1 = time.time() - t0 + r_seg.raise_for_status() + + base_name = os.path.splitext(local_filename)[0] + mask_filename = f"{base_name}_mask.png" + mask_path = os.path.join(self.tmp_dir, mask_filename) + mask_bytes = r_seg.content + if not r_seg.content: + logger.warning(f"⚠️ Segmentation API returned empty mask for {image_key}") + with open(mask_path, "wb") as f: + f.write(mask_bytes) + logger.info(f"🖼️ Saved segmentation mask: {mask_path}") + + # === Step 3.1: Upload mask to MinIO === + try: + bucket_name = os.getenv("MINIO_BUCKET", "imagery") + mask_object_key = f"air_mask/{mask_filename}" + upload_to_minio(mask_path, bucket_name, mask_object_key) + logger.info(f"☁️ Uploaded mask to MinIO at {bucket_name}/{mask_object_key}") + except Exception as e: + logger.error(f"❌ Failed to upload mask to MinIO: {e}") + + header_data = r_seg.headers.get("X-Class-Distribution") + if header_data: + dist = json.loads(header_data.replace("'", '"')) + else: + dist = {} + + row = { + "img_key": image_key, + "mask_path": f"{bucket_name}/{mask_object_key}", + "other": dist.get("Other", 0), + "bareland": dist.get("Bareland", 0), + "rangeland": dist.get("Rangeland", 0), + "developed_space": dist.get("Developed space", 0), + "road": dist.get("Road", 0), + "tree": dist.get("Tree", 0), + "water": dist.get("Water", 0), + "agriculture": dist.get("Agriculture land", 0), + "building": dist.get("Building", 0), + } + + segmentation_results.append(row) + logger.info(f"🧩 Segmentation done for {image_key}: {json.dumps(row, indent=2)}") + + except Exception as e: + logger.exception(f"❌ Segmentation API error: {e}") + mask_path = None + + # === Step 4: Object Detection (Infer API) === + try: + files = {"image": (file_name, img_bytes, mime_type)} + + if mask_path and os.path.exists(mask_path): + with open(mask_path, "rb") as 
f: + mask_bytes = f.read() + files["mask"] = (os.path.basename(mask_path), mask_bytes, "image/png") + logger.info(f"🧠 Using segmentation mask for Infer API: {mask_path}") + + params = {"conf": self.conf, "iou": self.iou} + t0 = time.time() + r = self.session.post(self.infer_url, files=files, params=params, timeout=self.timeout) + t1 = time.time() - t0 + + r.raise_for_status() + infer_data = r.json() if r.status_code != 204 else {} + infer_detections = infer_data.get("result", {}).get("detections", []) or infer_data.get("detections", []) + for det in infer_detections: + x1, y1, x2, y2 = det.get("bbox", [0, 0, 0, 0]) + infer_results.append({ + "img_key": image_key, + "label": det.get("class_name"), + "confidence": float(det.get("confidence", 0)), + "bbox_x1": float(x1), + "bbox_y1": float(y1), + "bbox_x2": float(x2), + "bbox_y2": float(y2) + }) + logger.info(f"🧠 {image_key}: {len(infer_detections)} detections ({t1:.2f}s)") + except Exception as e: + logger.exception(f"❌ Infer API error: {e}") + + # === Step 5: Anomaly API === + res = {} + try: + files = {"file": (file_name, img_bytes, mime_type)} + t2 = time.time() + r2 = self.session.post(self.anomaly_url, files=files, timeout=self.timeout) + t3 = time.time() - t2 + + r2.raise_for_status() + res = r2.json() + if res.get("anomaly", False): + detections = res.get("detections", []) + for det in detections: + x1, y1, x2, y2 = det.get("bbox", [0, 0, 0, 0]) + anomaly_results.append({ + "img_key": image_key, + "label": det.get("label", "anomaly"), + "confidence": float(det.get("confidence", 0)), + "bbox_x1": float(x1), + "bbox_y1": float(y1), + "bbox_x2": float(x2), + "bbox_y2": float(y2) + }) + logger.info(f"⚠️ Anomaly detection: {len(detections)} anomalies ({t3:.2f}s)") + else: + logger.info("✅ No anomalies detected.") + except Exception as e: + logger.exception(f"❌ Anomaly API error: {e}") + + # === step 6: alerts ==== + alert_events = [] + + if res.get("anomaly", False): + detections = res.get("detections", []) + + # 
Extract device_id from filename + filename = os.path.basename(image_key) + device_id = filename.split("_")[0] + + alert = { + "alert_id": f"{time.time_ns()}", + "alert_type": "aerial_anomaly_detected", + "device_id": device_id, + "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "confidence": max(d.get("confidence", 0) for d in detections), + "severity": 3, + "image_url": image_key, # key = url אצלך + "meta": { + "anomalies": detections + } + } + + alert_events.append((json.dumps(alert), "alert")) + logger.info(f"🚨 ALERT created: {json.dumps(alert, indent=2)}") + + # === Step 7: Clean temp === + try: + os.remove(local_path) + if mask_path and os.path.exists(mask_path): + os.remove(mask_path) + except Exception: + pass + + total_time = time.time() - total_start + + logger.info(f"📊 Summary: " + f"{len(segmentation_results)} segmentations, " + f"{len(infer_results)} detections, " + f"{len(anomaly_results)} anomalies") + + logger.info(f"⏱️ Total processing time for {image_key}: {total_time:.2f}s") + + return ([(json.dumps(r), "segmentation") for r in segmentation_results] + + [(json.dumps(r), "object") for r in infer_results] + + [(json.dumps(r), "anomaly") for r in anomaly_results]+ + alert_events) + + +# =============================================================== +# MAIN EXECUTION FUNCTION +# =============================================================== +def main(): + logger.info("🚀 Starting Kafka→MinIO→Segmentation→Infer→Anomaly→Kafka Job") + + bootstrap = os.getenv("KAFKA_BROKERS", "kafka:9092") + topic_in = os.getenv("IN_TOPIC", "image.new.aerial") + topic_out_segmentation = os.getenv("OUT_TOPIC_SEGMENTATION", "aerial_image_segmentation") + topic_out_objects = os.getenv("OUT_TOPIC_OBJECT", "aerial_image_object_detections") + topic_out_anomaly = os.getenv("OUT_TOPIC_ANOMALY", "aerial_image_anomaly_detections") + topic_out_alerts = os.getenv("OUT_TOPIC_ALERTS", "alerts") + group_id = os.getenv("KAFKA_GROUP_ID", f"flink-air-device-pipeline") + 
+ env = StreamExecutionEnvironment.get_execution_environment() + env.set_parallelism(1) + + source = ( + KafkaSource.builder() + .set_bootstrap_servers(bootstrap) + .set_topics(topic_in) + .set_group_id(group_id) + .set_starting_offsets(KafkaOffsetsInitializer.earliest()) + .set_value_only_deserializer(SimpleStringSchema()) + .build() + ) + + sink_segmentation = ( + KafkaSink.builder() + .set_bootstrap_servers(bootstrap) + .set_record_serializer( + KafkaRecordSerializationSchema.builder() + .set_topic(topic_out_segmentation) + .set_value_serialization_schema(SimpleStringSchema()) + .build() + ) + .build() + ) + + sink_objects = ( + KafkaSink.builder() + .set_bootstrap_servers(bootstrap) + .set_record_serializer( + KafkaRecordSerializationSchema.builder() + .set_topic(topic_out_objects) + .set_value_serialization_schema(SimpleStringSchema()) + .build() + ) + .build() + ) + + sink_anomalies = ( + KafkaSink.builder() + .set_bootstrap_servers(bootstrap) + .set_record_serializer( + KafkaRecordSerializationSchema.builder() + .set_topic(topic_out_anomaly) + .set_value_serialization_schema(SimpleStringSchema()) + .build() + ) + .build() + ) + + sink_alerts = ( + KafkaSink.builder() + .set_bootstrap_servers(bootstrap) + .set_record_serializer( + KafkaRecordSerializationSchema.builder() + .set_topic(topic_out_alerts) + .set_value_serialization_schema(SimpleStringSchema()) + .build() + ) + .build() + ) + + + stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source") + + processed = stream.process(DownloadAndInfer(), output_type=Types.TUPLE([Types.STRING(), Types.STRING()])) + + segmentation_stream = processed.filter(lambda x: x[1] == "segmentation").map(lambda x: x[0], output_type=Types.STRING()) + objects_stream = processed.filter(lambda x: x[1] == "object").map(lambda x: x[0], output_type=Types.STRING()) + anomalies_stream = processed.filter(lambda x: x[1] == "anomaly").map(lambda x: x[0], output_type=Types.STRING()) + alerts_stream = 
processed.filter(lambda x: x[1] == "alert").map(lambda x: x[0], output_type=Types.STRING()) + + segmentation_stream.sink_to(sink_segmentation) + objects_stream.sink_to(sink_objects) + anomalies_stream.sink_to(sink_anomalies) + alerts_stream.sink_to(sink_alerts) + + + env.execute("Kafka-MinIO-Segmentation-Infer-Anomaly-Job") + + +if __name__ == "__main__": + main() diff --git a/services/air/object_detection_api/Dockerfile.infer b/services/air/object_detection_api/Dockerfile.infer new file mode 100644 index 000000000..83b7ecb43 --- /dev/null +++ b/services/air/object_detection_api/Dockerfile.infer @@ -0,0 +1,35 @@ +FROM python:3.10-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + libgl1 libglib2.0-0 libsm6 libxext6 libxrender1 \ + curl ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +COPY certs /app/certs + +RUN apt-get update && \ + apt-get install -y ca-certificates && \ + cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ + update-ca-certificates && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt + +RUN printf "[global]\ntrusted-host = pypi.org\n\tfiles.pythonhosted.org\ncert = /etc/ssl/certs/ca-certificates.crt\n" > /etc/pip.conf + +WORKDIR /app + +RUN pip install --no-cache-dir --index-url https://download.pytorch.org/whl/cpu \ + torch torchvision torchaudio + +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir --default-timeout=1000 -r /app/requirements.txt + +COPY app.py model_wrapper.py /app/ + +ENV WEIGHTS_PATH=/app/best.pt + +EXPOSE 8000 +CMD ["uvicorn","app:app","--host","0.0.0.0","--port","8000","--workers","1"] diff --git a/services/air/object_detection_api/README.md b/services/air/object_detection_api/README.md new file mode 100644 index 000000000..78bd5bda6 --- /dev/null +++ b/services/air/object_detection_api/README.md @@ -0,0 +1,203 @@ +# Aerial Object Counter API + 
+A lightweight FastAPI service that runs an object-detection model on aerial images and returns **counts per category** plus a **human‑readable summary**. +Optionally, the service can **save annotated images** (with bounding boxes) to disk. + +> **Outputs are English-only.** Every response includes: +> +> - `counts` — a JSON dictionary of `{class_name: count}` +> - `summary_text` — a compact sentence (e.g., `177 buildings, 19 vehicles`) + +--- + +## Overview + +- **Model**: Ultralytics YOLO (weights file: `best.pt`). +- **Wrapper**: `model_wrapper.py` normalizes class names to singular, clean keys. +- **API**: `app.py` exposes two endpoints: + - `POST /infer` — single image + - `POST /infer_dir` — process an entire folder of images + +Class set (12): `agri equipment, agri infra, building, rail, vessel, aviation, construction site, crane, tower, vehicle, container, yard`. + +--- + +## Project Structure + +``` +object_counter_api/ +├─ app.py # FastAPI server + endpoints +├─ model_wrapper.py # YOLOv8 loader + inference + draw/save utils +└─ requirements.txt # dependencies +``` + +--- + +## Installation + +> Recommended on Windows inside a virtual environment. + +```bat +python -m venv .venv +.venv\Scripts\activate +pip install -r requirements.txt +``` + +Place your trained weights file `best.pt` next to `app.py` (or change `WEIGHTS_PATH` at the top of `app.py`). 
+--- + +## Run the Server + +```bat +uvicorn app:app --host 0.0.0.0 --port 8000 --reload +``` + +Health check (should return `{"status":"ok", ...}`): +``` +http://127.0.0.1:8000/health +``` + +--- + +## Endpoints + +### 1) `POST /infer` — Single Image + +- **Always returns**: `counts` and `summary_text` +- **Optional**: save an annotated image to disk (`outputs/` by default) + +**Query parameters** + +| Name | Type | Default | Description | +|-------------|---------|---------|-------------------------------------------| +| `conf` | float | 0.25 | Confidence threshold | +| `iou` | float | 0.45 | NMS IoU threshold | +| `min_area` | int? | null | Filter out tiny detections by pixel area | +| `save_image`| bool | false | Save annotated image to disk | +| `save_name` | string? | null | Optional filename for saved image (PNG) | + +**Request (Git Bash)** +```bash +curl -s -X POST "http://127.0.0.1:8000/infer?save_image=true&save_name=annotated_test.png" \ + -F "file=@C:/Users/USER/Desktop/only_yolo/dataset/xView_12cls_en/images/test/img_99_640_0.jpg" +``` + +**Response** +```json +{ + "counts": { "building": 177, "vehicle": 19 }, + "summary_text": "177 buildings, 19 vehicles" +} +``` + +If `save_image=true`, the annotated PNG is saved on the server (default folder `outputs/`) under the filename given by `save_name`, or under an auto-generated `annotated_*.png` name if `save_name` is omitted. + +--- + +### 2) `POST /infer_dir` — Entire Folder + +Runs inference over all supported images in a directory. +**Always returns**: aggregated `counts` and `summary_text`. +**Optional**: save annotated images for each file. 
+**Body (JSON)** +```json +{ + "dir_path": "C:/path/to/images", + "recursive": true, + "save_images": true, + "save_dir": "C:/path/to/outputs/batch1", + "conf": 0.25, + "iou": 0.45, + "min_area": 400 +} +``` + +**Example (Windows CMD)** +```bat +curl -s -X POST "http://127.0.0.1:8000/infer_dir" ^ + -H "Content-Type: application/json" ^ + -d "{\"dir_path\":\"C:/Users/USER/Desktop/only_yolo/dataset/images\",\"recursive\":true,\"save_images\":true,\"save_dir\":\"C:/Users/USER/Desktop/only_yolo/dataset/outputs/batch1\"}" +``` + +**Response** +```json +{ + "counts": { "building": 5200, "vehicle": 430 }, + "summary_text": "5200 buildings, 430 vehicles" +} +``` + +**Notes** + +- Allowed extensions: `.jpg .jpeg .png .tif .tiff` +- If `save_images=true` and no detections are found for a file, no image is saved for that file. + +--- + +## Labels + +``` +GET /labels +``` + +Returns the model’s id→name mapping (normalized to clean, singular names). + +--- + +## Configuration + +At the top of `app.py`: + +```python +WEIGHTS_PATH = "best.pt" # path to your trained weights +OUTPUT_DIR = Path("outputs") # where annotated images are stored +``` + +You can change these to absolute paths if preferred. + +--- + +## Troubleshooting + +- **`ModuleNotFoundError: ultralytics`** + Ensure you run the server with the same Python where packages were installed. On Windows, check: + ```bat + where python + python --version + ``` + Then reinstall if needed: + ```bat + python -m pip install ultralytics + ``` + +- **Different Python versions** (e.g., installed under Python 3.10 but server runs under 3.11) + Create a virtual env (see Installation) and run everything inside it. + +- **Weights not found** + Verify that the `best.pt` path matches `WEIGHTS_PATH` or place the file next to `app.py`. + +--- + +## Security / Safety + +- The API accepts local file paths (for `/infer_dir`). Do **not** expose this server publicly without proper authentication and sandboxing. 
+- For public deployments, add authentication, request size limits, and path validation. + +--- + +## License + +This repository contains example code and is provided “as is”, without warranty. +Your trained model weights remain your property under your chosen license. + +--- + +## Quick Checklist + +- [ ] Put `best.pt` next to `app.py` (or change `WEIGHTS_PATH`). +- [ ] `pip install -r requirements.txt` in a virtual environment. +- [ ] `uvicorn app:app --host 0.0.0.0 --port 8000 --reload` +- [ ] Test single image with `/infer`. +- [ ] Test folder mode with `/infer_dir`. diff --git a/services/air/object_detection_api/app.py b/services/air/object_detection_api/app.py new file mode 100644 index 000000000..8607c2760 --- /dev/null +++ b/services/air/object_detection_api/app.py @@ -0,0 +1,130 @@ +from fastapi import FastAPI, UploadFile, File, Query +from fastapi.responses import JSONResponse +from typing import Dict +from io import BytesIO +from PIL import Image +import numpy as np +import logging, os +from pathlib import Path + +from model_wrapper import ( + ModelWrapper, + mask_to_detections, + merge_detections, +) + +# ----------------------------------------------------------- +# Logger setup +# ----------------------------------------------------------- +logger = logging.getLogger("fusion_api") +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") + +# ----------------------------------------------------------- +# App initialization +# ----------------------------------------------------------- +app = FastAPI(title="YOLO + Segmentation Fusion API", version="2.2") + +WEIGHTS_PATH = os.getenv("WEIGHTS_PATH", "/app/best.pt") +model = ModelWrapper(weights_path=WEIGHTS_PATH, conf=0.25, iou=0.45) + +OUTPUT_DIR = Path("outputs") +OUTPUT_DIR.mkdir(exist_ok=True) + + +# ----------------------------------------------------------- +# Utility: convert NumPy types to JSON serializable types +# 
----------------------------------------------------------- +def convert_numpy(obj): + """Recursively convert NumPy data types to native Python types.""" + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, (list, tuple)): + return [convert_numpy(i) for i in obj] + elif isinstance(obj, dict): + return {k: convert_numpy(v) for k, v in obj.items()} + else: + return obj + + +# ----------------------------------------------------------- +# Count detections by class +# ----------------------------------------------------------- +def count_by_class(detections): + counts = {} + for _, name, _, *_ in detections: + counts[name] = counts.get(name, 0) + 1 + return counts + + +# ----------------------------------------------------------- +# Build readable summary text +# ----------------------------------------------------------- +def build_summary(counts: Dict[str, int]) -> str: + total = sum(counts.values()) + details = ", ".join([f"{v} {k}" for k, v in counts.items()]) + return f"Total: {total} detections — {details}" + + +# ----------------------------------------------------------- +# Health check +# ----------------------------------------------------------- +@app.get("/health") +def health(): + return {"status": "ok", "weights": str(WEIGHTS_PATH)} + + +# ----------------------------------------------------------- +# Inference route +# ----------------------------------------------------------- +@app.post("/infer") +async def infer( + image: UploadFile = File(...), + mask: UploadFile = File(...), + conf: float = Query(0.25, ge=0.0, le=1.0), + iou: float = Query(0.45, ge=0.0, le=1.0), +): + try: + img_data = await image.read() + mask_data = await mask.read() + pil_image = Image.open(BytesIO(img_data)).convert("RGB") + mask_image = Image.open(BytesIO(mask_data)).convert("RGB") + + logger.info(f"📸 Loaded image: {image.filename}, mask size: 
{mask_image.size}") + + model.conf = conf + model.iou = iou + yolo_dets = model.predict(pil_image) + logger.info(f"🔹 YOLO detections: {len(yolo_dets)}") + + mask_dets = mask_to_detections(mask_image) + logger.info(f"🔹 Mask detections: {len(mask_dets)}") + + final_dets = merge_detections(yolo_dets, mask_dets) + logger.info(f"🔸 After merge: {len(final_dets)} total detections") + + counts = count_by_class(final_dets) + summary_text = build_summary(counts) + + result = { + "summary": summary_text, + "counts_per_class": counts, + "detections": [ + { + "class_id": d[0], + "class_name": d[1], + "confidence": d[2], + "bbox": [d[3], d[4], d[5], d[6]] + } + for d in final_dets + ], + } + + return JSONResponse(content=convert_numpy(result)) + + except Exception as e: + logger.exception(f"❌ Error: {e}") + return JSONResponse(status_code=500, content={"error": str(e)}) \ No newline at end of file diff --git a/services/air/object_detection_api/best.pt b/services/air/object_detection_api/best.pt new file mode 100644 index 000000000..dea764744 Binary files /dev/null and b/services/air/object_detection_api/best.pt differ diff --git a/services/air/object_detection_api/model_wrapper.py b/services/air/object_detection_api/model_wrapper.py new file mode 100644 index 000000000..b1f4d7f9f --- /dev/null +++ b/services/air/object_detection_api/model_wrapper.py @@ -0,0 +1,154 @@ +from __future__ import annotations +from typing import Dict, List, Tuple +from pathlib import Path +import numpy as np +from ultralytics import YOLO +import cv2 + +# ============================================================== +# STRUCTURE AND MAPPINGS +# ============================================================== + +Detection = Tuple[int, str, float, int, int, int, int] + +RAW_ID2NAME: Dict[int, str] = { + 0: "Agri equipment", 1: "Agri infra", 2: "Buildings", 3: "Rail", + 4: "Vessels", 5: "Aviation", 6: "Construction site", 7: "Cranes", + 8: "Towers", 9: "Vehicles", 10: "Containers", 11: "Yards", +} + 
# RGB colour of a segmentation-mask pixel -> (class id, class name).
PALETTE = {
    (210, 180, 140): (12, "Bareland"),
    (152, 251, 152): (13, "Rangeland"),
    (128, 128, 128): (14, "Developed space"),
    (255, 255, 255): (15, "Road"),
    (0, 100, 0): (16, "Tree"),
    (30, 144, 255): (17, "Water"),
    (255, 215, 0): (18, "Agriculture"),
    (178, 34, 34): (19, "Buildings"),
    (0, 0, 0): (20, "Other"),
}

# Lower-cased class name -> which source ("yolo" or "mask") is preferred
# for that class.
MODEL_PRIORITY = {
    **dict.fromkeys(
        ("vehicles", "cranes", "towers", "containers", "yards", "buildings"),
        "yolo",
    ),
    **dict.fromkeys(
        (
            "road", "tree", "water", "agriculture",
            "bareland", "rangeland", "developed space", "other",
        ),
        "mask",
    ),
}

# ==============================================================
# YOLO MODEL WRAPPER
# ==============================================================

class ModelWrapper:
    """Thin wrapper around a YOLO model that returns plain-tuple detections."""

    def __init__(self, weights_path: str = "best.pt", conf: float = 0.25, iou: float = 0.45):
        """Load the weights file and remember the default thresholds.

        Raises:
            FileNotFoundError: if ``weights_path`` does not exist.
        """
        weights_file = Path(weights_path)
        if not weights_file.exists():
            raise FileNotFoundError(f"Weights not found: {weights_file.resolve()}")

        self.model = YOLO(str(weights_file))
        # Use the model's own id->name table when it has one; otherwise fall
        # back to the static mapping declared in this module.
        if hasattr(self.model, "names"):
            self.id2name = {int(idx): str(label) for idx, label in self.model.names.items()}
        else:
            self.id2name = RAW_ID2NAME
        self.conf = conf
        self.iou = iou

    def labels(self) -> Dict[int, str]:
        """Return a shallow copy of the class-id -> class-name mapping."""
        return dict(self.id2name)

    def predict(self, image: "Image.Image") -> "List[Detection]":
        """Run the model on ``image`` with the current ``conf``/``iou``.

        Returns one ``(class_id, class_name, confidence, x1, y1, x2, y2)``
        tuple per predicted box; box coordinates are truncated to ``int``.
        """
        outputs = self.model.predict(
            source=np.array(image),
            conf=self.conf,
            iou=self.iou,
            verbose=False,
        )

        found: "List[Detection]" = []
        for output in outputs:
            boxes = output.boxes
            if boxes is None:
                continue
            corners = boxes.xyxy.cpu().numpy()
            scores = boxes.conf.cpu().numpy()
            class_ids = boxes.cls.cpu().numpy().astype(int)
            for box, score, class_id in zip(corners, scores, class_ids):
                left, top, right, bottom = box
                # Ids missing from the table get a synthetic "class_<id>" name.
                label = self.id2name.get(int(class_id), f"class_{class_id}")
                found.append(
                    (int(class_id), label, float(score),
                     int(left), int(top), int(right), int(bottom))
                )
        return found


# ==============================================================
# SEGFORMER MASK PROCESSING
# 
============================================================== + +def color_mask_to_class_map(mask_rgb: Image.Image) -> np.ndarray: + mask = np.array(mask_rgb.convert("RGB")) + class_map = np.zeros(mask.shape[:2], dtype=np.uint8) + for color, (cid, _) in PALETTE.items(): + match = np.all(mask == color, axis=-1) + class_map[match] = cid + return class_map + + +def mask_to_detections(mask_rgb: Image.Image) -> List[Detection]: + mask = color_mask_to_class_map(mask_rgb) + dets: List[Detection] = [] + GENERAL_MIN = 100 + ROAD_MIN = 1000 + for cid in np.unique(mask): + if cid == 0: + continue + binary = (mask == cid).astype(np.uint8) * 255 + contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + _, cname = next(v for v in PALETTE.values() if v[0] == cid) + for c in contours: + x, y, w, h = cv2.boundingRect(c) + area = w * h + if cname.lower() == "road" and area < ROAD_MIN: + continue + if area < GENERAL_MIN: + continue + dets.append((cid, cname, 1.0, x, y, x + w, y + h)) + return dets + + +# ============================================================== +# MERGING LOGIC (Smart) +# ============================================================== + +def iou(a: Detection, b: Detection) -> float: + xA, yA = max(a[3], b[3]), max(a[4], b[4]) + xB, yB = min(a[5], b[5]), min(a[6], b[6]) + inter = max(0, xB - xA) * max(0, yB - yA) + areaA = (a[5] - a[3]) * (a[6] - a[4]) + areaB = (b[5] - b[3]) * (b[6] - b[4]) + union = areaA + areaB - inter + return inter / union if union > 0 else 0.0 + + +def merge_detections(yolo_dets: List[Detection], mask_dets: List[Detection], iou_thresh=0.4) -> List[Detection]: + final = list(yolo_dets) + + for md in mask_dets: + m_name = md[1].lower() + x1, y1, x2, y2 = md[3:7] + area = (x2 - x1) * (y2 - y1) + + if m_name == "tree" and area > 10000: + md = (md[0], "Forest", md[2], x1, y1, x2, y2) + + overlap = False + + for yd in list(final): + y_name = yd[1].lower() + ov = iou(md, yd) + if ov > iou_thresh: + if y_name in 
("building", "buildings") and m_name != "building": + final.remove(yd) + continue + + y_pri = MODEL_PRIORITY.get(y_name, "yolo") + m_pri = MODEL_PRIORITY.get(m_name, "mask") + if m_pri == "mask" and y_pri == "yolo": + final.remove(yd) + final.append(md) + overlap = True + break + else: + overlap = True + + if not overlap: + final.append(md) + + return final diff --git a/services/air/object_detection_api/requirements.txt b/services/air/object_detection_api/requirements.txt new file mode 100644 index 000000000..2b7d12168 --- /dev/null +++ b/services/air/object_detection_api/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.115.0 +uvicorn==0.30.6 +pillow==10.4.0 +numpy==2.1.1 +pydantic==1.10.15 +python-multipart==0.0.9 +ultralytics==8.3.29 +opencv-python-headless diff --git a/services/air/segmentation_api/.gitignore b/services/air/segmentation_api/.gitignore new file mode 100644 index 000000000..98ea3005b Binary files /dev/null and b/services/air/segmentation_api/.gitignore differ diff --git a/services/air/segmentation_api/dockerfile.segmentation b/services/air/segmentation_api/dockerfile.segmentation new file mode 100644 index 000000000..f1098aa8d --- /dev/null +++ b/services/air/segmentation_api/dockerfile.segmentation @@ -0,0 +1,69 @@ +# ========================================================= +# 1️⃣ Base image – Lightweight Python 3.11 +# ========================================================= +FROM python:3.11-slim + +# ========================================================= +# 2️⃣ Environment setup +# ========================================================= +WORKDIR /app +ENV DEBIAN_FRONTEND=noninteractive +ENV PYTHONUNBUFFERED=1 + +# ========================================================= +# 3️⃣ Install system dependencies +# ========================================================= +RUN apt-get update && apt-get install -y \ + ca-certificates \ + libgl1 \ + libglib2.0-0 \ + && rm -rf /var/lib/apt/lists/* + +RUN apt-get update && apt-get install -y curl && 
rm -rf /var/lib/apt/lists/* + + +# ========================================================= +# 4️⃣ Copy NetFree SSL certificates (אם יש) +# ========================================================= +COPY certs /app/certs +RUN cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ + update-ca-certificates && \ + echo "✅ NetFree certificates installed successfully" + +# ========================================================= +# 5️⃣ Copy project files +# ========================================================= +COPY . /app + +# ========================================================= +# 6️⃣ Install Python dependencies +# ========================================================= +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt + +RUN pip install --no-cache-dir --upgrade pip && \ + pip config set global.cert /etc/ssl/certs/ca-certificates.crt && \ + pip install --no-cache-dir \ + fastapi \ + uvicorn[standard] \ + opencv-python-headless \ + numpy \ + pillow \ + transformers \ + scipy \ + python-multipart # ← הוסיפי את זה כאן ✅ + +# שלב 2: התקנת PyTorch בלבד עם אינדקס ייעודי +RUN pip install --no-cache-dir \ + torch==2.3.0+cpu \ + torchvision==0.18.0+cpu \ + --index-url https://download.pytorch.org/whl/cpu + +# ========================================================= +# 7️⃣ Expose port +# ========================================================= +EXPOSE 8500 + +# ========================================================= +# 8️⃣ Run the API +# ========================================================= +CMD ["uvicorn", "infer_api:app", "--host", "0.0.0.0", "--port", "8500"] diff --git a/services/air/segmentation_api/infer_api.py b/services/air/segmentation_api/infer_api.py new file mode 100644 index 000000000..15bab0613 --- /dev/null +++ b/services/air/segmentation_api/infer_api.py @@ -0,0 +1,324 @@ +from fastapi import FastAPI, UploadFile, File +from fastapi.responses import Response, JSONResponse +import torch, torch.nn.functional as F +import cv2, numpy as 
np, tempfile, math +from transformers import SegformerForSemanticSegmentation, SegformerConfig +from oem_palette import NUM_CLASSES, PALETTE +from scipy import ndimage +import json +import logging + +logger = logging.getLogger("segformer_api") +logger.setLevel(logging.INFO) +formatter = logging.Formatter("%(asctime)s | %(levelname)-8s | %(message)s") +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) +if not logger.hasHandlers(): + logger.addHandler(console_handler) + +logger.propagate = False + +# ========================================================= +# ⚙️ General Settings +# ========================================================= +app = FastAPI(title="🛰️ SegFormer-B3 Smart Inference API (Enhanced Road Logic)") + +@app.get("/health") +def health(): + return {"status": "ok"} + +MODEL_PATH = "model/best_model.pth" +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +logger.info(f"✅ Using device: {device}") + +# ========================================================= +# 🧠 Model Loading +# ========================================================= +config = SegformerConfig( + backbone="mit_b3", + num_labels=NUM_CLASSES, + hidden_sizes=[64, 128, 320, 512], + depths=[3, 4, 18, 3], + decoder_hidden_size=768, + ignore_mismatched_sizes=True +) +model = SegformerForSemanticSegmentation(config) +state_dict = torch.load(MODEL_PATH, map_location=device) +model.load_state_dict(state_dict, strict=False) +model.to(device).eval() +logger.info("✅ SegFormer-B3 model loaded successfully!") + + +# ========================================================= +# 🎨 Helper Functions +# ========================================================= +def preprocess_image(img: np.ndarray): + img = cv2.resize(img, (512, 512)) + img = img.astype(np.float32) / 255.0 + img = img.transpose(2, 0, 1) + return torch.from_numpy(img).unsqueeze(0).to(device) + + +def predict_probs(img: np.ndarray): + inputs = preprocess_image(img) + with 
torch.no_grad(): + outputs = model(pixel_values=inputs) + logits = outputs.logits + logits = F.interpolate(logits, size=img.shape[:2], mode="bilinear", align_corners=False) + probs = torch.softmax(logits, dim=1)[0].cpu().numpy() + return probs + + +def compute_class_distribution(mask): + h, w = mask.shape + total = h * w + counts = {} + index_to_name = {v[0]: v[1] for v in PALETTE.values()} + for cls_idx, cls_name in index_to_name.items(): + cls_pixels = np.sum(mask == cls_idx) + percent = round(100 * cls_pixels / total, 2) + if percent > 0: + counts[cls_name] = percent + return counts + + +def colorize_mask(mask): + color_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8) + for color, (cls_idx, _) in PALETTE.items(): + color_mask[mask == cls_idx] = color + return color_mask + + +def refine_water_only(mask, probs, water_conf_thresh=0.8): + refined_mask = mask.copy() + top2 = np.argsort(-probs, axis=2) + second_best = top2[:, :, 1] + best_conf = np.max(probs, axis=2) + + WATER_CLASS = next((v[0] for k, v in PALETTE.items() if v[1].lower() == "water"), None) + if WATER_CLASS is not None: + low_conf_water = (mask == WATER_CLASS) & (best_conf < water_conf_thresh) + refined_mask[low_conf_water] = second_best[low_conf_water] + logger.info(f"💧 Replaced {np.sum(low_conf_water)} low-confidence water pixels") + + return refined_mask + + +def remove_small_roads(mask, probs, min_road_area=600, debug_visualize=True): + refined_mask = mask.copy() + ROAD_CLASS = next((v[0] for k, v in PALETTE.items() if v[1].lower() == "road"), None) + if ROAD_CLASS is None: + logger.warning("⚠️ ROAD_CLASS not found in PALETTE") + return refined_mask + + road_mask = (refined_mask == ROAD_CLASS).astype(np.uint8) + num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(road_mask, connectivity=8) + + logger.info(f"🛣️ Found {num_labels - 1} road regions (min_road_area={min_road_area})") + + if debug_visualize: + color_debug = colorize_mask(mask).copy() + + removed = 0 
+ for i in range(1, num_labels): + area = stats[i, cv2.CC_STAT_AREA] + x, y = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP] + w, h = stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT] + cx, cy = centroids[i] + region_mask = (labels == i) + + logger.info(f"🚗 Road #{i:02d} | area={area:7.1f}px | bbox=({x},{y},{w},{h})") + + color = (0, 255, 0) if area >= min_road_area else (0, 0, 255) + if debug_visualize: + cv2.rectangle(color_debug, (x, y), (x + w, y + h), color, 2) + cv2.putText(color_debug, f"{i}", (x + 5, y + 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2, cv2.LINE_AA) + + if area < min_road_area: + dilated = ndimage.binary_dilation(region_mask, iterations=10) + neighbors = refined_mask[dilated & (~region_mask)] + + if len(neighbors) > 0: + dominant_class = np.bincount(neighbors).argmax() + else: + dominant_class = 0 + + refined_mask[region_mask] = dominant_class + logger.info(f" 🧭 Replaced with surrounding class: {dominant_class}") + removed += 1 + + logger.info(f"✅ Finished checking all roads — removed {removed}/{num_labels - 1}") + + return refined_mask + +def connect_roads_perfect( + color_mask, + road_color=(255, 255, 255), + max_distance=200, + angle_threshold=35, + connection_angle_limit=45, + line_thickness=6, + min_area=50, + connect_extend=15, + debug=True +): + tolerance = 20 + low = np.array([max(0, c - tolerance) for c in road_color]) + high = np.array([min(255, c + tolerance) for c in road_color]) + road_mask = cv2.inRange(color_mask, low, high) + + kernel = np.ones((5, 5), np.uint8) + road_mask = cv2.morphologyEx(road_mask, cv2.MORPH_CLOSE, kernel, iterations=2) + + contours, _ = cv2.findContours(road_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + connected_mask = color_mask.copy() + directions = [] + + if debug: + logger.info(f"✅ Found {len(contours)} road segments") + + for idx, cnt in enumerate(contours): + area = cv2.contourArea(cnt) + if area < min_area: + directions.append(None) + continue + + data_pts = np.array(cnt[:, 
0, :], dtype=np.float64) + _, eigenvectors = cv2.PCACompute(data_pts, mean=np.array([])) + vx, vy = eigenvectors[0] + angle = math.degrees(math.atan2(vy, vx)) + directions.append((vx, vy, angle)) + if debug: + logger.info(f" 🟡 Segment {idx}: area={area:.1f}, angle={angle:.1f}°") + + connections = 0 + + for i in range(len(contours)): + if directions[i] is None: + continue + for j in range(i + 1, len(contours)): + if directions[j] is None: + continue + + cnt1, cnt2 = contours[i], contours[j] + min_dist = 1e9 + pt1_min, pt2_min = None, None + + for p1 in cnt1: + for p2 in cnt2: + d = np.linalg.norm(p1[0] - p2[0]) + if d < min_dist: + min_dist = d + pt1_min, pt2_min = tuple(p1[0]), tuple(p2[0]) + + if min_dist > max_distance: + continue + + (vx1, vy1, angle1) = directions[i] + (vx2, vy2, angle2) = directions[j] + avg_angle = (angle1 + angle2) / 2 + diff_angle = abs(angle1 - angle2) + diff_angle = min(diff_angle, 180 - diff_angle) + + dx, dy = pt2_min[0] - pt1_min[0], pt2_min[1] - pt1_min[1] + length = math.hypot(dx, dy) + if length == 0: + continue + ux, uy = dx / length, dy / length + connection_angle = math.degrees(math.atan2(uy, ux)) + + def angle_between(vx, vy, ux, uy): + dot = vx * ux + vy * uy + cross = vx * uy - vy * ux + ang = abs(math.degrees(math.atan2(cross, dot))) + return min(ang, 180 - ang) + + ang_to_road1 = angle_between(vx1, vy1, ux, uy) + ang_to_road2 = angle_between(vx2, vy2, ux, uy) + + if debug: + logger.info(f"🔹 {i}↔{j} | dist={min_dist:.1f}px | Δdir={diff_angle:.1f}° | " + f"Δconn1={ang_to_road1:.1f}° | Δconn2={ang_to_road2:.1f}°") + + if ( + diff_angle < angle_threshold and + ang_to_road1 < connection_angle_limit and + ang_to_road2 < connection_angle_limit + ): + p1, p2 = np.array(pt1_min, np.float32), np.array(pt2_min, np.float32) + p1_ext = (p1 - np.array([ux, uy]) * connect_extend).astype(int) + p2_ext = (p2 + np.array([ux, uy]) * connect_extend).astype(int) + + cv2.line(connected_mask, tuple(p1_ext), tuple(p2_ext), + road_color, 
@app.post("/infer")
async def infer_image(file: UploadFile = File(...), threshold: float = 0.3):
    """Segment an uploaded image and return the colorized class mask as PNG.

    Pipeline: decode the upload, run the model, zero out pixels whose best
    class probability is below ``threshold``, reassign water pixels, bridge
    nearby road segments, drop small road blobs, and colorize the result.

    Args:
        file: Uploaded image in any format OpenCV can decode.
        threshold: Minimum per-pixel confidence; pixels below it become
            class 0. Previously this parameter was accepted but a hard-coded
            0.3 was used instead.

    Returns:
        A PNG ``Response`` carrying the per-class percentage breakdown in the
        ``X-Class-Distribution`` header, a 400 ``JSONResponse`` when the
        upload is empty or cannot be decoded, or a 500 ``JSONResponse`` on any
        other failure.
    """
    try:
        payload = await file.read()
        if not payload:
            return JSONResponse(content={"error": "Uploaded file is empty"}, status_code=400)

        # Decode from memory. The old temp file was created with delete=False
        # and never removed, leaking one file per request.
        bgr = cv2.imdecode(np.frombuffer(payload, dtype=np.uint8), cv2.IMREAD_COLOR)
        if bgr is None:
            return JSONResponse(
                content={"error": "Uploaded file is not a decodable image"},
                status_code=400,
            )

        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        probs = predict_probs(img)
        # The class axis comes first here; the helpers below index it last.
        mask = np.argmax(probs, axis=0)
        probs = np.transpose(probs, (1, 2, 0))
        mask = apply_confidence_threshold(probs, mask, threshold=threshold)

        # A threshold of 1 reassigns every water pixel whose confidence is
        # below 1.0 to its second-best class.
        mask = refine_water_only(mask, probs, water_conf_thresh=1)

        color_mask = colorize_mask(mask)

        logger.info("🚗 Connecting roads before removing small ones...")
        connected_color = connect_roads_perfect(
            color_mask,
            max_distance=120,
            angle_threshold=35,
            debug=True
        )

        # Convert the colour image (now including the drawn road links) back
        # to class indices, using the same +/-20 tolerance per channel.
        connected_mask = np.zeros(mask.shape, dtype=np.uint8)

        for color, (cls_idx, _) in PALETTE.items():
            lower = np.clip(np.array(color) - 20, 0, 255).astype(np.uint8)
            upper = np.clip(np.array(color) + 20, 0, 255).astype(np.uint8)
            mask_area = cv2.inRange(connected_color, lower, upper)
            connected_mask[mask_area > 0] = cls_idx

        logger.info(f"🖼️ mask shape: {mask.shape}, unique values: {np.unique(mask)}")
        logger.info(f"🖼️ connected_mask shape: {connected_mask.shape}, unique values: {np.unique(connected_mask)}")
        logger.info("🚗 sending to remove_small_roads() ...")

        refined_mask = remove_small_roads(connected_mask, probs, min_road_area=1000)

        final_color_mask = colorize_mask(refined_mask)
        dist = compute_class_distribution(refined_mask)

        _, buffer = cv2.imencode(".png", cv2.cvtColor(final_color_mask, cv2.COLOR_RGB2BGR))
        return Response(content=buffer.tobytes(), media_type="image/png",
                        headers={"X-Class-Distribution": json.dumps(dist)})

    except Exception as e:
        # Top-level boundary of the endpoint: keep the traceback in the logs
        # and return the message to the client as before.
        logger.exception("Inference failed")
        return JSONResponse(content={"error": str(e)}, status_code=500)
a/services/compression/Dockerfile b/services/compression/Dockerfile index 44deed463..ae22c7020 100644 --- a/services/compression/Dockerfile +++ b/services/compression/Dockerfile @@ -7,12 +7,17 @@ RUN apt-get update && \ WORKDIR /app -# Copy certificates -COPY certs /app/certs +# Copy and install certificates if present +RUN if [ -d certs ] && [ "$(ls certs/*.crt 2>/dev/null)" ]; then \ + echo "Copying and installing certificates..."; \ + mkdir -p /app/certs; \ + tar -cf - certs | tar -xf -; \ + cp certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + else \ + echo "No certs directory or .crt files found - skipping."; \ + fi -# Install certificates -RUN cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ - update-ca-certificates # Copy requirements and install COPY requirements.txt . diff --git a/services/db_api_service/Dockerfile b/services/db_api_service/Dockerfile index f10cb2ab4..3d40c917b 100644 --- a/services/db_api_service/Dockerfile +++ b/services/db_api_service/Dockerfile @@ -6,8 +6,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential curl ca-certificates && \ rm -rf /var/lib/apt/lists/* -COPY *.crt /usr/local/share/ca-certificates/ -RUN chmod 644 /usr/local/share/ca-certificates/*.crt && update-ca-certificates +RUN if [ "$(ls *.crt 2>/dev/null)" ]; then \ + echo "Installing local root certificates..."; \ + cp *.crt /usr/local/share/ca-certificates/; \ + chmod 644 /usr/local/share/ca-certificates/*.crt; \ + update-ca-certificates; \ + else \ + echo "No .crt files found - skipping certificate installation."; \ + fi ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/services/db_api_service/app/contracts/Dockerfile b/services/db_api_service/app/contracts/Dockerfile index ed6d31775..ae10a912b 100644 --- a/services/db_api_service/app/contracts/Dockerfile +++ b/services/db_api_service/app/contracts/Dockerfile @@ -5,8 +5,14 
# ===============================
# Unified Dockerfile for Flink JobManager & TaskManager
# ===============================

FROM flink:1.19.3-scala_2.12-java11

USER root

# ---------- SSL (NetFree) ----------
COPY netfree-ca.crt /usr/local/share/ca-certificates/corp-ca.crt
RUN chmod 644 /usr/local/share/ca-certificates/corp-ca.crt && update-ca-certificates

ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt
ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt

# ---------- System Dependencies ----------
RUN apt-get update && apt-get install -y --no-install-recommends \
    python3 python3-venv python3-pip ca-certificates curl libgomp1 && \
    rm -rf /var/lib/apt/lists/*

# ---------- Python Virtual Environment ----------
RUN python3 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
ENV PYFLINK_CLIENT_EXECUTABLE=/opt/venv/bin/python3
ENV PYFLINK_PYTHON=/opt/venv/bin/python3
ENV PYTHONPATH="/opt/venv/lib/python3.10/site-packages:$PYTHONPATH"

# ---------- pip SSL setup ----------
RUN printf "[global]\ntrusted-host = pypi.org\n\tfiles.pythonhosted.org\ncert = /etc/ssl/certs/ca-certificates.crt\n" > /etc/pip.conf

# ---------- Flink + Kafka connectors ----------
RUN mkdir -p /opt/flink/lib
RUN curl -fSL https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar \
    -o /opt/flink/lib/kafka-clients-3.7.0.jar
RUN curl -fSL https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.19/flink-connector-kafka-3.2.0-1.19.jar \
    -o /opt/flink/lib/flink-connector-kafka-3.2.0-1.19.jar

# ---------- Python Dependencies ----------
RUN pip install --no-cache-dir --default-timeout=1000 \
    apache-flink==1.19.3 \
    apache-beam==2.48.0 \
    minio \
    kafka-python \
    protobuf \
    google-cloud-storage \
    opencv-python-headless==4.8.0.76 \
    numpy==1.24.4 \
    pandas==2.1.4 \
    pyarrow==10.0.1 \
    shapely==2.0.2 \
    cloudpickle==2.2.1

# ---------- App Files ----------
WORKDIR /opt/app
RUN mkdir -p /opt/app/tmp /opt/app/secrets && chmod -R 777 /opt/app
COPY script.py job_with_stitch.py /opt/app/

# ---------- Default Environment Variables ----------
ENV KAFKA_BROKERS=kafka:9092 \
    IN_TOPIC=aerial_images_metadata \
    OUT_TOPIC=aerial_images_complete_metadata \
    KAFKA_GROUP_ID=flink-device-pipeline \
    PYTHONUNBUFFERED=1

# ---------- Entrypoint ----------
# Supports two modes: jobmanager or taskmanager.
# The base image's ENTRYPOINT and CMD are deliberately inherited. Overriding
# the entrypoint with ["/bin/bash", "-c"] made docker-compose commands such as
# "taskmanager -D ..." run as `bash -c taskmanager`, with the remaining words
# taken as $0, $1, ..., so neither the jobmanager nor the taskmanager started
# as intended.
/usr/local/share/ca-certificates/corp-ca.crt +RUN chmod 644 /usr/local/share/ca-certificates/corp-ca.crt && update-ca-certificates + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt + +# ------------------------------- +# System Dependencies +# ------------------------------- +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + python3 python3-venv python3-pip ca-certificates curl libgomp1 && \ + rm -rf /var/lib/apt/lists/* + +# ------------------------------- +# Python Virtual Environment +# ------------------------------- +RUN python3 -m venv /opt/venv +ENV PATH="/opt/venv/bin:$PATH" +ENV PYFLINK_CLIENT_EXECUTABLE=/opt/venv/bin/python3 +ENV PYFLINK_PYTHON=/opt/venv/bin/python3 +ENV PYTHONPATH="/opt/venv/lib/python3.10/site-packages:$PYTHONPATH" + +# ------------------------------- +# pip config for NetFree SSL +# ------------------------------- +RUN printf "[global]\ntrusted-host = pypi.org\n\tfiles.pythonhosted.org\ncert = /etc/ssl/certs/ca-certificates.crt\n" > /etc/pip.conf + +# ------------------------------- +# Flink + Kafka Connectors +# ------------------------------- +RUN mkdir -p /opt/flink/lib +RUN curl -fSL https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar \ + -o /opt/flink/lib/kafka-clients-3.7.0.jar +RUN curl -fSL https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.19/flink-connector-kafka-3.2.0-1.19.jar \ + -o /opt/flink/lib/flink-connector-kafka-3.2.0-1.19.jar + +# ------------------------------- +# Python Dependencies (Stable Versions) +# ------------------------------- +RUN pip install --no-cache-dir --default-timeout=1000 \ + apache-flink==1.19.3 \ + apache-beam==2.48.0 \ + minio \ + kafka-python \ + protobuf \ + google-cloud-storage \ + opencv-python-headless==4.8.0.76 \ + numpy==1.24.4 \ + pandas==2.1.4 \ + pyarrow==10.0.1 \ + shapely==2.0.2 \ + cloudpickle==2.2.1 + +# 
version: "3.9"

services:
  # The service name must match the taskmanager's depends_on entry and the
  # JOB_MANAGER_RPC_ADDRESS hostname below. It was declared as "jobmanager"
  # while both references pointed at "jobmanager-img", an undefined service.
  jobmanager-img:
    build:
      context: .
      dockerfile: Dockerfile.flink
    container_name: flink-jobmanager-img
    command:
      - /opt/flink/bin/jobmanager.sh
      - start-foreground
    ports:
      - "8055:8081"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager-img
      - KAFKA_BROKERS=kafka:9092
    networks:
      - agcloud_ag_cloud

  taskmanager-img:
    build:
      context: .
      dockerfile: Dockerfile.flink
    container_name: flink-taskmanager-img
    command: taskmanager -D taskmanager.numberOfTaskSlots=4
    depends_on:
      - jobmanager-img
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager-img
      - KAFKA_BROKERS=kafka:9092
      - MINIO_ENDPOINT=minio-hot:9000
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin123
      - MINIO_BUCKET=imagery
    networks:
      - agcloud_ag_cloud

networks:
  agcloud_ag_cloud:
    external: true
def process_message(message: str):
    """Handle one Kafka record: look up metadata, fetch the image, run the script.

    The record must be a JSON object with ``file_name`` and ``key``. The
    function queries the DB API for the metadata row matching ``file_name``,
    downloads the object ``key`` from MinIO into a per-file directory under
    ``/opt/app/tmp`` and runs the script named by ``BATCH_SCRIPT_PATH``
    (default ``/opt/app/script.py``) on that directory.

    Args:
        message: Raw record value, as ``str`` or UTF-8 encoded ``bytes``.

    Returns:
        Always ``None``; every failure is reported on stdout and the record
        is skipped instead of raising.
    """
    print("\n" + "=" * 80)
    print(f"[DEBUG] הודעה: {message}")

    # --- Parse the record into a dict ---
    try:
        if isinstance(message, bytes):
            message = message.decode("utf-8")
        data = json.loads(message)
    except (ValueError, TypeError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        print(f"[ERROR] JSON שגוי: {e}")
        return None

    # Valid JSON that is not an object (list, number, ...) has no fields.
    if not isinstance(data, dict):
        print("[WARN] הודעה חסרה שדות חובה (file_name/key)")
        return None

    file_name = data.get("file_name")
    image_key = data.get("key")

    if not isinstance(file_name, str) or not isinstance(image_key, str) \
            or not file_name or not image_key:
        print("[WARN] הודעה חסרה שדות חובה (file_name/key)")
        return None

    # === Fetch the metadata row from the DB API ===
    try:
        url = _safe_join_url(DB_API_BASE, "/api/tables/aerial_images_metadata/query")
        payload = {"filters": {"file_name": file_name}, "limit": 1}
        r = _http.post(url, json=payload, timeout=10)

        if r.status_code != 200:
            print(f"[ERROR] שליפה נכשלה: {r.status_code} {r.text[:150]}")
            return None

        records = r.json().get("rows", [])
        if not records:
            print(f"[WARN] לא נמצא מטא־דאטא עבור {file_name}")
            return None

        meta = records[0]
        print(f"[OK] ✅ נמצא מטא־דאטא: {json.dumps(meta, ensure_ascii=False, indent=2)}")

    except Exception as e:
        print(f"[ERROR] שגיאה בשליפה מה־DB API: {e}")
        return None

    # === Download the image from MinIO ===
    base_dir = "/opt/app/tmp"
    os.makedirs(base_dir, exist_ok=True)

    # file_name comes from the message. Keep only its last path component so
    # an absolute or nested value cannot make os.path.join leave base_dir.
    dir_name = os.path.basename(file_name).replace(".", "_")
    object_name = os.path.basename(image_key)
    if not dir_name or not object_name:
        print(f"[ERROR] invalid file_name/key in message: {file_name!r} / {image_key!r}")
        return None

    batch_dir = os.path.join(base_dir, dir_name)
    os.makedirs(batch_dir, exist_ok=True)

    minio_client = Minio(
        os.getenv("MINIO_ENDPOINT", "minio-hot:9000"),
        access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin123"),
        secure=False
    )

    local_path = os.path.join(batch_dir, object_name)
    try:
        minio_client.fget_object(os.getenv("MINIO_BUCKET", "imagery"), image_key, local_path)
        print(f"[OK] ✅ התמונה נשמרה: {local_path}")
    except Exception as e:
        print(f"[ERROR] הורדת {image_key} נכשלה: {e}")
        # Without the image there is nothing for the script to process.
        return None

    # === Run the batch script on the downloaded image ===
    script_path = os.getenv("BATCH_SCRIPT_PATH", "/opt/app/script.py")
    cmd = ["python", script_path, "--batch-dir", batch_dir]

    try:
        subprocess.run(cmd, check=True)
        print(f"[OK] ✅ הסקריפט {script_path} הושלם בהצלחה")
    except Exception as e:
        print(f"[ERROR] הסקריפט נכשל: {e}")

    print("=" * 80 + "\n")
    return None
isinstance(message, bytes): +# try: +# message = message.decode("utf-8") +# except Exception as e: +# print(f"[ERROR] המרת bytes נכשלה: {e}") +# return None + +# try: +# data = json.loads(message) +# except Exception as e: +# print(f"[ERROR] JSON שגוי: {e}") +# return None + +# index = data.get("id") +# done = data.get("done", False) +# image_key = data.get("image_key") +# batch_id = data.get("batch_id", "default") + +# if index is None or image_key is None: +# print("[WARN] הודעה חסרה שדות חובה.") +# return None + +# base_dir = "/opt/app/tmp" +# os.makedirs(base_dir, exist_ok=True) + +# if batch_id not in active_batches or index == 0: +# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +# batch_dir = os.path.join(base_dir, f"{batch_id}_{timestamp}") +# os.makedirs(batch_dir, exist_ok=True) +# active_batches[batch_id] = batch_dir +# print(f"[INFO] 🆕 נוצרה תיקייה חדשה עבור batch '{batch_id}': {batch_dir}") +# else: +# batch_dir = active_batches[batch_id] + +# minio_client = Minio( +# os.getenv("MINIO_ENDPOINT", "minio-hot:9000"), +# os.getenv("MINIO_ACCESS_KEY", "minioadmin"), +# os.getenv("MINIO_SECRET_KEY", "minioadmin123"), +# secure=False +# ) + +# local_path = os.path.join(batch_dir, os.path.basename(image_key)) +# try: +# minio_client.fget_object(os.getenv("MINIO_BUCKET", "imagery"), image_key, local_path) +# print(f"[OK] ✅ התמונה נשמרה: {local_path}") +# except Exception as e: +# print(f"[ERROR] הורדת {image_key} נכשלה: {e}") + +# if done: +# print(f"[INFO] 🏁 batch '{batch_id}' הושלם. 
def main():
    """Build and run the Flink job that feeds Kafka records to ``process_message``.

    Brokers, topics and consumer group are read from the KAFKA_BROKERS,
    IN_TOPIC, OUT_TOPIC and KAFKA_GROUP_ID environment variables, each with a
    built-in default. Records flow from the input topic through
    ``process_message``; results that are not ``None`` go to the output topic.
    """
    brokers = os.environ.get("KAFKA_BROKERS", "kafka:9092")
    source_topic = os.environ.get("IN_TOPIC", "image_new_aerial_connections")
    sink_topic = os.environ.get("OUT_TOPIC", "aerial_images_complete_metadata")
    consumer_group = os.environ.get("KAFKA_GROUP_ID", "flink-device-pipeline")

    print("🚀 Starting Flink Kafka-MinIO Collector")
    print(f"[CONFIG] Kafka: {brokers}, topic_in={source_topic}, topic_out={sink_topic}, group_id={consumer_group}")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    kafka_source = FlinkKafkaConsumer(
        [source_topic],
        SimpleStringSchema(),
        {
            "bootstrap.servers": brokers,
            "group.id": consumer_group,
            "auto.offset.reset": "earliest",
        },
    )
    kafka_sink = FlinkKafkaProducer(
        sink_topic,
        SimpleStringSchema(),
        {"bootstrap.servers": brokers},
    )

    def _has_payload(record):
        # Only records with a result are forwarded to the sink.
        return record is not None

    processed = env.add_source(kafka_source).map(process_message)
    processed.filter(_has_payload).add_sink(kafka_sink)

    env.execute("Kafka-MinIO-Collector")
logging.basicConfig(level=logging.INFO, format=" %(message)s") + + +# def load_metadata(img_path): +# base, _ = os.path.splitext(img_path) +# json_path = base + ".json" +# if os.path.exists(json_path): +# with open(json_path, "r") as f: +# return json.load(f) +# return None + + +# def compute_connection(img1, img2, min_matches=10, ratio=0.5): +# sift = cv2.SIFT_create() +# kp1, des1 = sift.detectAndCompute(img1, None) +# kp2, des2 = sift.detectAndCompute(img2, None) +# if des1 is None or des2 is None: +# return False, None +# bf = cv2.BFMatcher() +# matches = bf.knnMatch(des1, des2, k=2) +# good = [m for m, n in matches if m.distance < ratio * n.distance] +# if len(good) < min_matches: +# return False, None +# pts1 = np.float32([kp1[m.queryIdx].pt for m in good]) +# pts2 = np.float32([kp2[m.trainIdx].pt for m in good]) +# M, mask = cv2.estimateAffinePartial2D(pts2, pts1, method=cv2.RANSAC) +# return True, M + + +# def transform_polygon(polygon_points, transform): +# pts = np.array(polygon_points, dtype=np.float32).reshape(-1, 1, 2) +# warped = cv2.perspectiveTransform(pts, transform) +# return warped.reshape(-1, 2).tolist() + + +# def upload_to_minio(file_path, bucket_name, object_name): +# """מעלה קובץ ל-MinIO לפי משתני סביבה""" +# try: +# endpoint = os.getenv("MINIO_ENDPOINT", "minio-hot:9000") +# access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin") +# secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin123") +# use_ssl = False + +# client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=use_ssl) + +# if not client.bucket_exists(bucket_name): +# client.make_bucket(bucket_name) +# logging.info(f"🪣 Created bucket '{bucket_name}'") + +# client.fput_object(bucket_name, object_name, file_path) +# logging.info(f"✅ Uploaded {object_name} to bucket '{bucket_name}'") +# except S3Error as e: +# logging.error(f"❌ MinIO upload failed: {e}") + + +# def stitch_with_checks_and_polygons(folder, output_img, output_poly, min_matches=10): +# 
logging.info(f"🧩 Running stitching on folder: {folder}") +# files = sorted([f for f in os.listdir(folder) if f.lower().endswith((".jpg", ".jpeg", ".png"))]) +# images = [cv2.imread(os.path.join(folder, f)) for f in files] +# n = len(images) +# graph = {i: {} for i in range(n)} + +# # Step 1: Build graph +# for i in range(n): +# for j in range(i + 1, n): +# connected, M = compute_connection(images[i], images[j], min_matches=min_matches) +# if not connected: +# continue +# graph[i][j] = M +# M_inv = np.linalg.inv(np.vstack([M, [0, 0, 1]]))[:2] +# graph[j][i] = M_inv + +# # Step 2: BFS positions +# positions = {0: np.eye(3, dtype=np.float32)} +# queue = deque([0]) +# visited = set() +# while queue: +# i = queue.popleft() +# if i in visited: +# continue +# visited.add(i) +# for j, M in graph[i].items(): +# if j not in positions: +# M3 = np.vstack([M, [0, 0, 1]]) +# positions[j] = positions[i] @ M3 +# queue.append(j) + +# # Step 3: Warp images +# corners_all = [] +# polygons = [] +# for i, img in enumerate(images): +# if i not in positions: +# continue +# h2, w2 = img.shape[:2] +# corners = np.float32([[0, 0], [0, h2], [w2, h2], [w2, 0]]).reshape(-1, 1, 2) +# warped = cv2.perspectiveTransform(corners, positions[i]) +# corners_all.append(warped) +# meta = load_metadata(os.path.join(folder, files[i])) +# if meta and "polygon" in meta: +# poly_warped = transform_polygon(meta["polygon"], positions[i]) +# polygons.append(Polygon(poly_warped)) + +# all_corners = np.concatenate(corners_all, axis=0) +# [xmin, ymin] = np.int32(all_corners.min(axis=0).ravel()) +# [xmax, ymax] = np.int32(all_corners.max(axis=0).ravel()) +# translate = [-xmin, -ymin] +# canvas = np.zeros((ymax - ymin, xmax - xmin, 3), dtype=np.uint8) + +# for i, img in enumerate(images): +# if i not in positions: +# continue +# M = positions[i] +# T = np.array([[1, 0, translate[0]], [0, 1, translate[1]], [0, 0, 1]], dtype=np.float32) +# M_final = (T @ M)[:2] +# cv2.warpAffine(img, M_final, (xmax - xmin, ymax - 
def compute_connection(img1, img2, min_matches=10, ratio=0.5):
    """Estimate the partial affine transform that maps ``img2`` onto ``img1``.

    SIFT keypoints of both images are matched with a brute-force 2-nearest
    neighbour search and filtered with the ratio test; the surviving point
    pairs are passed to a RANSAC partial-affine fit.

    Args:
        img1: Reference image, or ``None`` if it could not be loaded.
        img2: Image to align to ``img1``, or ``None`` if it could not be loaded.
        min_matches: Minimum number of ratio-test survivors required.
        ratio: Ratio-test factor; a match is kept when its distance is below
            ``ratio`` times the distance of the second-best match.

    Returns:
        ``(True, M)`` with the 2x3 matrix returned by
        ``cv2.estimateAffinePartial2D`` on success, otherwise
        ``(False, None)``.
    """
    # cv2.imread yields None for unreadable files; treat that as "no link".
    if img1 is None or img2 is None:
        return False, None

    sift = cv2.SIFT_create()
    kp1, des1 = sift.detectAndCompute(img1, None)
    kp2, des2 = sift.detectAndCompute(img2, None)
    # k=2 matching needs at least two candidate descriptors in img2.
    if des1 is None or des2 is None or len(des2) < 2:
        return False, None

    bf = cv2.BFMatcher()
    matches = bf.knnMatch(des1, des2, k=2)
    # Skip entries with fewer than two neighbours so the ratio test never
    # fails on unpacking.
    good = [
        pair[0]
        for pair in matches
        if len(pair) == 2 and pair[0].distance < ratio * pair[1].distance
    ]
    if len(good) < min_matches:
        return False, None

    pts1 = np.float32([kp1[m.queryIdx].pt for m in good])
    pts2 = np.float32([kp2[m.trainIdx].pt for m in good])
    M, _ = cv2.estimateAffinePartial2D(pts2, pts1, method=cv2.RANSAC)
    # The estimator returns None when no model fits; reporting that as a
    # connection would crash callers that stack the matrix into a 3x3.
    if M is None:
        return False, None
    return True, M
images[j], min_matches=min_matches) + if not connected: + continue + graph[i][j] = M + M_inv = np.linalg.inv(np.vstack([M, [0, 0, 1]]))[:2] + graph[j][i] = M_inv + + # Step 2: BFS positions + positions = {0: np.eye(3, dtype=np.float32)} + queue = deque([0]) + visited = set() + while queue: + i = queue.popleft() + if i in visited: + continue + visited.add(i) + for j, M in graph[i].items(): + if j not in positions: + M3 = np.vstack([M, [0, 0, 1]]) + positions[j] = positions[i] @ M3 + queue.append(j) + + # Step 3: Warp images + corners_all = [] + polygons = [] + all_gis = [] + device_id = "unknown" + + for i, img in enumerate(images): + if i not in positions: + continue + h2, w2 = img.shape[:2] + corners = np.float32([[0, 0], [0, h2], [w2, h2], [w2, 0]]).reshape(-1, 1, 2) + warped = cv2.perspectiveTransform(corners, positions[i]) + corners_all.append(warped) + + meta = load_metadata(os.path.join(folder, files[i])) + if meta: + # שומרים את device_id הראשון שנמצא + if "Device_id" in meta and device_id == "unknown": + device_id = meta["Device_id"] + # שומרים את נקודות ה-GIS + if "GIS" in meta: + all_gis.append(meta["GIS"]) + # אם יש פוליגון – מוסיפים אותו + if "polygon" in meta: + poly_warped = transform_polygon(meta["polygon"], positions[i]) + polygons.append(Polygon(poly_warped)) + + all_corners = np.concatenate(corners_all, axis=0) + [xmin, ymin] = np.int32(all_corners.min(axis=0).ravel()) + [xmax, ymax] = np.int32(all_corners.max(axis=0).ravel()) + translate = [-xmin, -ymin] + canvas = np.zeros((ymax - ymin, xmax - xmin, 3), dtype=np.uint8) + + for i, img in enumerate(images): + if i not in positions: + continue + M = positions[i] + T = np.array([[1, 0, translate[0]], [0, 1, translate[1]], [0, 0, 1]], dtype=np.float32) + M_final = (T @ M)[:2] + cv2.warpAffine(img, M_final, (xmax - xmin, ymax - ymin), + dst=canvas, borderMode=cv2.BORDER_TRANSPARENT) + + # Merge polygons + if polygons: + union_poly = unary_union(polygons) + coords = 
np.array(list(union_poly.exterior.coords), dtype=np.int32) + cv2.polylines(canvas, [coords], isClosed=True, color=(0, 255, 0), thickness=3) + with open(output_poly, "w") as f: + json.dump({"field_polygon": coords.tolist()}, f, indent=2) + + # Save output locally + cv2.imwrite(output_img, canvas) + logging.info(f"✅ Saved stitched image: {output_img}") + + # --- אם הכל הצליח נחשב GIS ונעלה --- + stitched_gis = None + if all_gis: + # ממוצע של lat/lon מהתמונות הקטנות + avg_lat = sum(p["lat"] for p in all_gis) / len(all_gis) + avg_lon = sum(p["lon"] for p in all_gis) / len(all_gis) + stitched_gis = {"lat": avg_lat, "lon": avg_lon} + else: + logging.warning("⚠️ No GIS data found in source images.") + + # יצירת שם קובץ חדש + timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") + file_name = f"compleat_{device_id}_{timestamp}" + + # שמירת מטא-דאטה חדש + metadata = { + "file_name": file_name, + "device_id": device_id, + "gis": stitched_gis, + "img_key": f"air/compleat/{os.path.basename(output_img)}", + "timestamp_utc": timestamp + } + metadata_path = os.path.join(folder, f"{file_name}.json") + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + logging.info(f"📄 Metadata saved: {metadata_path}") + + # העלאה ל-MinIO + bucket_name = os.getenv("MINIO_BUCKET", "imagery") + upload_to_minio(output_img, bucket_name, f"air/compleat/{os.path.basename(output_img)}") + # upload_to_minio(metadata_path, bucket_name, f"air/compleat/{os.path.basename(metadata_path)}") + + # שליחה ל-Kafka + kafka_broker = os.getenv("KAFKA_BROKERS", "kafka:9092") + out_topic = os.getenv("OUT_TOPIC", "aerial_images_complete_metadata") + producer = KafkaProducer( + bootstrap_servers=[kafka_broker], + value_serializer=lambda v: json.dumps(v).encode("utf-8") + ) + metadata["bucket"] = bucket_name + metadata["image_key"] = f"air/compleat/{os.path.basename(output_img)}" + producer.send(out_topic, metadata) + producer.flush() + logging.info(f"📤 Sent metadata to Kafka topic '{out_topic}'") + 
+ return canvas + + except Exception as e: + logging.error("❌ Stitching process failed:") + logging.error(traceback.format_exc()) + return None + + +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--batch-dir", type=str, required=True, help="Path to the batch folder containing images") + args = parser.parse_args() + + folder_path = args.batch_dir + print(f"📂 Processing folder: {folder_path}") + + stitch_with_checks_and_polygons( + folder_path, + os.path.join(folder_path, "stitched_result.jpg"), + os.path.join(folder_path, "field_polygon.json") + ) diff --git a/services/flink_writer_db/Dockerfile.flink b/services/flink_writer_db/Dockerfile.flink index bd043ed8e..ef3fc709a 100644 --- a/services/flink_writer_db/Dockerfile.flink +++ b/services/flink_writer_db/Dockerfile.flink @@ -2,7 +2,13 @@ FROM flink:1.20.0-scala_2.12-java11 USER root # Copy certs dir (may be empty) and trust *.crt if present -COPY certs/ /tmp/certs/ +RUN if [ -d certs ]; then \ + echo "Copying certs directory..."; \ + tar -cf - certs | tar -xf -; \ + else \ + echo "No certs directory, skipping copy."; \ + fi + RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && \ rm -rf /var/lib/apt/lists/* && \ diff --git a/services/fruit_defect_sink/Dockerfile b/services/fruit_defect_sink/Dockerfile new file mode 100644 index 000000000..072b9202b --- /dev/null +++ b/services/fruit_defect_sink/Dockerfile @@ -0,0 +1,5 @@ +FROM python:3.11-slim +RUN pip install --no-cache-dir confluent-kafka psycopg2-binary +WORKDIR /app +COPY fruit_defect_sink.py ./ +CMD ["python","-u","fruit_defect_sink.py"] diff --git a/services/fruit_defect_sink/fruit_defect_sink.py b/services/fruit_defect_sink/fruit_defect_sink.py new file mode 100644 index 000000000..48328a9ad --- /dev/null +++ b/services/fruit_defect_sink/fruit_defect_sink.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 +import os +import sys +import json +import signal +import logging 
+from datetime import datetime, timezone +from uuid import uuid4 + +from confluent_kafka import Consumer, Producer +import psycopg2 +import psycopg2.extras + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + stream=sys.stdout, +) + +# ========================== +# Config from environment +# ========================== + +KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092") +CONSUME_TOPIC = os.getenv("FRUIT_DISPATCHED_TOPIC", "inference.dispatched.fruit") +ALERTS_TOPIC = os.getenv("ALERTS_TOPIC", "alerts") +KAFKA_GROUP_ID = os.getenv("KAFKA_GROUP_ID", "fruit-defect-sink") + +PG_HOST = os.getenv("PGHOST", "postgres") +PG_PORT = int(os.getenv("PGPORT", "5432")) +PG_DB = os.getenv("PGDATABASE", "missions_db") +PG_USER = os.getenv("PGUSER", "missions_user") +PG_PASSWORD = os.getenv("PGPASSWORD", "pg123") + + +# ========================== +# Postgres helpers +# ========================== + +def get_pg_conn(): + logging.info( + "connecting to Postgres: host=%s db=%s user=%s", + PG_HOST, + PG_DB, + PG_USER, + ) + conn = psycopg2.connect( + host=PG_HOST, + port=PG_PORT, + dbname=PG_DB, + user=PG_USER, + password=PG_PASSWORD, + ) + conn.autocommit = True + return conn + + +INSERT_SQL = """ +INSERT INTO public.fruit_defect_predictions ( + ts, + bucket, + object_key, + image_uri, + label, + score, + confidence, + latency_ms_http, + latency_ms_model, + device_id, + idem_key, + corr_id, + extra +) +VALUES ( + %(ts)s, + %(bucket)s, + %(object_key)s, + %(image_uri)s, + %(label)s, + %(score)s, + %(confidence)s, + %(latency_ms_http)s, + %(latency_ms_model)s, + %(device_id)s, + %(idem_key)s, + %(corr_id)s, + %(extra)s +) +ON CONFLICT (idem_key) DO UPDATE +SET + ts = EXCLUDED.ts, + bucket = EXCLUDED.bucket, + object_key = EXCLUDED.object_key, + image_uri = EXCLUDED.image_uri, + label = EXCLUDED.label, + score = EXCLUDED.score, + confidence = EXCLUDED.confidence, + latency_ms_http = EXCLUDED.latency_ms_http, + 
latency_ms_model = EXCLUDED.latency_ms_model, + device_id = EXCLUDED.device_id, + corr_id = EXCLUDED.corr_id, + extra = EXCLUDED.extra; +""" + + +# ========================== +# Kafka + processing +# ========================== + +def parse_timestamp(ts_str: str | None) -> datetime: + """ + Try to parse an ISO timestamp from the message. + Falls back to 'now' in UTC if not provided / invalid. + """ + if not ts_str: + return datetime.now(timezone.utc) + try: + # handle "Z" + if ts_str.endswith("Z"): + ts_str = ts_str.replace("Z", "+00:00") + return datetime.fromisoformat(ts_str) + except Exception: + return datetime.now(timezone.utc) + + +def handle_message(envelope: dict, conn, producer: Producer): + """ + Process a single message envelope: upsert to Postgres and maybe send alert. + """ + # basic sanity + if not envelope.get("ok", True): + logging.warning("skipping message with ok=false: %s", envelope) + return + + status = envelope.get("status") + if status and status != 200: + logging.warning("skipping message with non-200 status: %s", envelope) + return + + body_raw = envelope.get("body") + if not body_raw: + logging.warning("no 'body' field in message, skipping") + return + + # Support dict or string JSON + if isinstance(body_raw, dict): + body = body_raw + else: + try: + body = json.loads(body_raw) + except Exception as e: + logging.error("failed to parse body JSON: %s; error=%s", body_raw, e) + return + # Parse result — can be missing (fruit-ok detection) + result = body.get("result", {}) or {} + + event = envelope.get("event", {}) or {} + + # Extract bucket/key from body first + bucket = body.get("bucket") + object_key = body.get("key") + + # If missing — try event (legacy) + if not bucket: + bucket = event.get("bucket") + if not object_key: + object_key = event.get("key") + + # If still missing — try MinIO Key format: "imagery/fruit/tree/...jpg" + minio_key = event.get("Key") + if minio_key and (not bucket or not object_key): + parts = minio_key.split("/", 
1) + if len(parts) == 2: + bucket = bucket or parts[0] + object_key = object_key or parts[1] + + image_uri = body.get("image_uri") + if not image_uri and bucket and object_key: + image_uri = f"s3://{bucket}/{object_key}" + + label = result.get("label") or body.get("label") + score = result.get("score") + confidence = result.get("confidence") + + try: + score = float(score) if score is not None else None + except (TypeError, ValueError): + score = None + + try: + confidence = float(confidence) if confidence is not None else None + except (TypeError, ValueError): + confidence = None + + latency_ms_model = result.get("latency_ms_model") or body.get("latency_ms_model") + latency_ms_http = body.get("latency_ms") + + device_id = ( + event.get("device_id") + or body.get("device_id") + or envelope.get("device_id") + ) + + idem_key = body.get("idempotency_key") or envelope.get("event_id") or str(uuid4()) + corr_id = body.get("correlation_id") or envelope.get("event_id") + + ts_str = body.get("timestamp") or envelope.get("timestamp") + ts = parse_timestamp(ts_str) + + extra = { + "envelope": envelope, + "body_parsed": body, + } + + params = { + "ts": ts, + "bucket": bucket, + "object_key": object_key, + "image_uri": image_uri, + "label": label, + "score": score, + "confidence": confidence, + "latency_ms_http": latency_ms_http, + "latency_ms_model": latency_ms_model, + "device_id": device_id, + "idem_key": idem_key, + "corr_id": corr_id, + "extra": json.dumps(extra), + } + + # ========================== + # Insert / upsert to Postgres + # ========================== + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute(INSERT_SQL, params) + + logging.info( + "[sink] upserted fruit_defect_predictions idem_key=%s label=%s bucket=%s key=%s", + idem_key, + label, + bucket, + object_key, + ) + + # ========================== + # Send alert if needed + # ========================== + maybe_send_alert( + producer=producer, + label=label, + 
confidence=confidence, + ts=ts, + device_id=device_id, + bucket=bucket, + object_key=object_key, + image_uri=image_uri, + latency_ms_model=latency_ms_model, + idem_key=idem_key, + ) + + +def maybe_send_alert( + producer: Producer, + label: str | None, + confidence: float | None, + ts: datetime, + device_id: str | None, + bucket: str | None, + object_key: str | None, + image_uri: str | None, + latency_ms_model: float | None, + idem_key: str, +): + """ + Send an alert to the alerts topic if the label indicates a defect. + """ + + if label != "defect": + return + + alert_id = idem_key or str(uuid4()) + + alert = { + # --- Required fields --- + "alert_id": alert_id, + "alert_type": "fruit_defect_detected", + "device_id": device_id or "unknown-device", + "started_at": ts.astimezone(timezone.utc).isoformat(), + + # --- Optional / dynamic fields --- + "ended_at": None, + "confidence": confidence, + # "severity": 3, + # "area": None, + # "lat": None, + # "lon": None, + "image_url": image_uri, + # "vod": None, + # "hls": None, + "meta": { + "bucket": bucket, + "object_key": object_key, + "latency_ms_model": latency_ms_model, + "alert_source": "fruit_defect_sink", + }, + } + + payload = json.dumps(alert).encode("utf-8") + producer.produce(ALERTS_TOPIC, payload) + producer.flush() + + logging.info( + "[sink] sent alert to topic '%s' alert_id=%s device_id=%s label=%s", + ALERTS_TOPIC, + alert_id, + device_id, + label, + ) + + +# ========================== +# Main +# ========================== + +running = True + + +def _handle_sig(signum, frame): + global running + logging.info("received signal %s, shutting down...", signum) + running = False + + +def main(): + global running + + signal.signal(signal.SIGINT, _handle_sig) + signal.signal(signal.SIGTERM, _handle_sig) + + consumer_conf = { + "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS, + "group.id": KAFKA_GROUP_ID, + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + } + + consumer = Consumer(consumer_conf) + 
producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}) + + conn = get_pg_conn() + + logging.info( + "[sink] listening on %s → Postgres.fruit_defect_predictions + alerts", + CONSUME_TOPIC, + ) + consumer.subscribe([CONSUME_TOPIC]) + + try: + while running: + msg = consumer.poll(1.0) + if msg is None: + continue + + if msg.error(): + logging.error("Kafka error: %s", msg.error()) + continue + + try: + envelope = json.loads(msg.value().decode("utf-8")) + except Exception as e: + logging.error("failed to decode message value: %s", e) + continue + + try: + handle_message(envelope, conn, producer) + except psycopg2.Error as e: + logging.error("Postgres error: %s", e) + try: + conn.close() + except Exception: + pass + conn = get_pg_conn() + except Exception as e: + logging.exception("processing error: %s", e) + + finally: + logging.info("closing consumer and postgres connection...") + try: + consumer.close() + except Exception: + pass + try: + conn.close() + except Exception: + pass + + +if __name__ == "__main__": + main() + diff --git a/services/image-linker/Dockerfile.flink b/services/image-linker/Dockerfile.flink index dd05fc18d..e44e00a2d 100644 --- a/services/image-linker/Dockerfile.flink +++ b/services/image-linker/Dockerfile.flink @@ -5,8 +5,14 @@ FROM flink:1.19.3-scala_2.12-java11 USER root # Add local CA (place netfree-ca.crt next to this Dockerfile before building) -COPY netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt -RUN chmod 644 /usr/local/share/ca-certificates/netfree-ca.crt && update-ca-certificates +RUN if [ -f netfree-ca.crt ]; then \ + echo "Installing netfree-ca.crt..."; \ + cp netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt; \ + chmod 644 /usr/local/share/ca-certificates/netfree-ca.crt; \ + update-ca-certificates; \ + else \ + echo "No netfree-ca.crt found, skipping."; \ + fi ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt diff --git 
a/services/inference_http/adapters/fruit_defect_runner.py b/services/inference_http/adapters/fruit_defect_runner.py index 5a1cd00eb..6d5fa9d4f 100644 --- a/services/inference_http/adapters/fruit_defect_runner.py +++ b/services/inference_http/adapters/fruit_defect_runner.py @@ -26,12 +26,38 @@ def __init__(self, model_tag: Optional[str] = None): self.preprocess = get_preprocess() self.device = "cuda" if torch.cuda.is_available() else "cpu" self.model = self.model.to(self.device).eval() - + def run(self, image_bytes: bytes, model_tag=None, extra=None) -> Dict[str, Any]: + # Decode image img = Image.open(io.BytesIO(image_bytes)).convert("RGB") result = infer_single(self.model, img, self.preprocess, device=self.device) - # Normalize to standard HTTP response structure + + # Extract bucket/key as received from upstream (camera inference) + bucket = extra.get("bucket") if extra else None + key = extra.get("key") if extra else None + + # Extract device_id + timestamp from filename + base = os.path.basename(key) if key else "" + device_id = None + timestamp = None + + m = re.match(r"([a-zA-Z0-9-]+)_(\d{8}T\d{6}Z)", base) + if m: + device_id, timestamp = m.groups() + + # Build image_uri + image_uri = f"s3://{bucket}/{key}" if bucket and key else None + + # Normalize output return { + "ok": True, + "team": "fruit", + "bucket": bucket, + "key": key, + "image_uri": image_uri, + "device_id": device_id, + "timestamp": timestamp, + "label": result.get("status"), "score": result.get("prob_defect"), "confidence": result.get("confidence"), diff --git a/services/inference_http/adapters/fruit_segmentation_runner.py b/services/inference_http/adapters/fruit_segmentation_runner.py index 7df623a8d..86d44bb82 100644 --- a/services/inference_http/adapters/fruit_segmentation_runner.py +++ b/services/inference_http/adapters/fruit_segmentation_runner.py @@ -1,4 +1,6 @@ import os, io, tempfile, hashlib, cv2, numpy as np, boto3, torch +import re +from datetime import datetime def 
allow_unrestricted_torch_load(): _original_load = torch.load @@ -77,16 +79,36 @@ def run(self, image_bytes: bytes | None = None, model_tag=None, extra=None) -> D crop = img[y1:y2, x1:x2] if crop.size == 0: continue - out_name = f"{os.path.splitext(os.path.basename(key))[0]}_fruit_{i+1}.jpg" - out_key = f"segments/{out_name}" + + base_name = os.path.splitext(os.path.basename(key))[0] + match = re.match(r"([a-zA-Z0-9-]+)_(\d{8}T\d{6}Z)", base_name) + if match: + device_id, timestamp_str = match.groups() + timestamp = datetime.strptime(timestamp_str, "%Y%m%dT%H%M%SZ") + date_part = timestamp.strftime("%Y-%m-%d") + time_part = timestamp_str + else: + device_id = "unknown_device" + date_part = "unknown_date" + time_part = "unknown_time" + out_name = f"{base_name}.jpg" + out_key = f"fruit/fruits/{device_id}/{date_part}/{time_part}/{out_name}" out_path = os.path.join(tmpdir, out_name) cv2.imwrite(out_path, crop) + self.s3.upload_file(out_path, bucket_in, out_key) count += 1 - return { - "label": "fruit", - "count": count, - "latency_ms_model": latency_ms, - "bucket_out": bucket_in - } + return { + "ok": True, + "team": "camera", + "bucket": bucket_in, + "key": out_key, + "label": "fruit", + "device_id": device_id, + "timestamp": timestamp_str, + "latency_ms_model": latency_ms, + "bucket_out": bucket_in + } + + diff --git a/services/inference_http/app.py b/services/inference_http/app.py index ec6b984e9..fc3c5fecd 100644 --- a/services/inference_http/app.py +++ b/services/inference_http/app.py @@ -71,13 +71,13 @@ def infer_json( latency_ms = int((time.perf_counter() - started) * 1000) return { - "ok": True, - "team": TEAM, - "result": result, - "image_uri": s3_uri, - "latency_ms": latency_ms, - "idempotency_key": idem_key, - "correlation_id": corr_id, + "ok": True, + "team": TEAM, + **result, + "image_uri": s3_uri, + "latency_ms": latency_ms, + "idempotency_key": idem_key, + "correlation_id": corr_id, } except Exception as e: diff --git 
a/services/inference_http/model_registry.py b/services/inference_http/model_registry.py index 12cf00e6b..4ef2a88ef 100644 --- a/services/inference_http/model_registry.py +++ b/services/inference_http/model_registry.py @@ -3,7 +3,7 @@ def get_model_runner(team: str): t = (team or "").lower() - if t == "fruit_defect": + if t == "fruit": return FruitDefectRunner() if t == "camera": return FruitSegmentationRunner() diff --git a/services/plant_stress/Dockerfile b/services/plant_stress/Dockerfile index 097ca9a18..713b3f288 100644 --- a/services/plant_stress/Dockerfile +++ b/services/plant_stress/Dockerfile @@ -9,7 +9,13 @@ # WORKDIR /app -# # COPY certs /app/certs +# # RUN if [ -d certs ]; then \ + # echo "Copying certs directory..."; \ + # tar -cf - certs | tar -xf -; \ + # else \ + # echo "No certs directory, skipping copy."; \ + # fi + # # RUN if [ -f /app/certs/netfree-ca.crt ]; then \ # # echo "Installing NetFree certificate..."; \ @@ -50,15 +56,20 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ WORKDIR /app -# COPY certs /app/certs - -# RUN if [ -f /app/certs/netfree-ca.crt ]; then \ -# echo "Installing NetFree certificate..."; \ -# cp /app/certs/netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt && \ -# update-ca-certificates; \ -# else \ -# echo "⚠️ WARNING: netfree-ca.crt not found, continuing without it."; \ -# fi +RUN if [ -d certs ]; then \ + echo "Copying certs directory..."; \ + tar -cf - certs | tar -xf -; \ + else \ + echo "No certs directory, skipping copy."; \ + fi + +RUN if [ -f /app/certs/netfree-ca.crt ]; then \ + echo "Installing NetFree certificate..."; \ + cp /app/certs/netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt && \ + update-ca-certificates; \ + else \ + echo "⚠️ WARNING: netfree-ca.crt not found, continuing without it."; \ + fi ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/services/ripeness-ml/deploy/Dockerfile 
b/services/ripeness-ml/deploy/Dockerfile index a232874d7..2cf0ecb91 100644 --- a/services/ripeness-ml/deploy/Dockerfile +++ b/services/ripeness-ml/deploy/Dockerfile @@ -10,12 +10,17 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates openssl libpq-dev build-essential gcc \ && rm -rf /var/lib/apt/lists/* -COPY deploy/certs/ /usr/local/share/ca-certificates/ -RUN set -eux; \ - for f in /usr/local/share/ca-certificates/*.cer; do \ - [ -f "$f" ] && openssl x509 -inform der -in "$f" -out "${f%.cer}.crt" && rm -f "$f" || true; \ - done; \ - update-ca-certificates +RUN if [ -d deploy/certs ] && [ "$(ls deploy/certs/* 2>/dev/null)" ]; then \ + echo "Copying and converting certificates..."; \ + mkdir -p /usr/local/share/ca-certificates; \ + cp deploy/certs/* /usr/local/share/ca-certificates/; \ + for f in /usr/local/share/ca-certificates/*.cer; do \ + [ -f "$f" ] && openssl x509 -inform der -in "$f" -out "${f%.cer}.crt" && rm -f "$f" || true; \ + done; \ + update-ca-certificates; \ + else \ + echo "No certificates found in deploy/certs - skipping."; \ + fi RUN printf "[global]\n\ cert = /etc/ssl/certs/ca-certificates.crt\n\ diff --git a/services/sounds_classifier/Dockerfile.classifier-svc b/services/sounds_classifier/Dockerfile.classifier-svc index 54efca366..4386e8e49 100644 --- a/services/sounds_classifier/Dockerfile.classifier-svc +++ b/services/sounds_classifier/Dockerfile.classifier-svc @@ -14,9 +14,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ WORKDIR /app # ---- Corporate CAs ---- -# Place your CA files under classify/certs/*.crt before build -COPY certs/*.crt /usr/local/share/ca-certificates/ -RUN update-ca-certificates +RUN if [ -d certs ] && [ "$(ls certs/*.crt 2>/dev/null)" ]; then \ + echo "Installing local certificates..."; \ + cp certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + else \ + echo "No certs found, skipping update-ca-certificates."; \ + fi ENV 
SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/services/sounds_flink/Dockerfile b/services/sounds_flink/Dockerfile index 9fc4a5a5c..1736d512d 100644 --- a/services/sounds_flink/Dockerfile +++ b/services/sounds_flink/Dockerfile @@ -16,8 +16,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Optional NetFree CAs (empty dir is OK) # If you have custom CAs, put *.crt in ./certs and they will be added. # ----------------------------- -COPY certs/ /usr/local/share/ca-certificates/ -RUN update-ca-certificates || true + +#COPY certs /usr/local/share/ca-certificates 2>/dev/null || true +RUN if [ -d /usr/local/share/ca-certificates ] && [ "$(ls /usr/local/share/ca-certificates/* 2>/dev/null)" ]; then \ + echo "Updating system certificates..."; \ + update-ca-certificates; \ + else \ + echo "No certs directory found - skipping."; \ + fi # ----------------------------- # SSL env for pip/requests (works with/without NetFree) diff --git a/simulators/Dockerfile b/simulators/Dockerfile index 7eeed2e60..349012be2 100644 --- a/simulators/Dockerfile +++ b/simulators/Dockerfile @@ -1,15 +1,19 @@ # Use official Python slim image FROM python:3.12-slim -# Copy the NetFree certificate into the container -COPY certs/*.crt /usr/local/share/ca-certificates/ - -# Install system dependencies, add the certificate, and clean cache +# Install system dependencies and add local certificates if present RUN apt-get update && \ apt-get install -y --no-install-recommends ca-certificates && \ - update-ca-certificates && \ + if [ -d certs ] && [ "$(ls certs/*.crt 2>/dev/null)" ]; then \ + echo "Installing local certificates..."; \ + cp certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + else \ + echo "No certs found - skipping update-ca-certificates."; \ + fi && \ rm -rf /var/lib/apt/lists/* + # Install Python dependencies RUN pip install --no-cache-dir paho-mqtt diff --git 
a/simulators/data/fruit/images/0074691e66aa85c0_jpg.rf.363957b75c8f2a3ba887e82f1f695174.jpg b/simulators/data/fruit/images/0074691e66aa85c0_jpg.rf.363957b75c8f2a3ba887e82f1f695174.jpg new file mode 100644 index 000000000..60cc00adb Binary files /dev/null and b/simulators/data/fruit/images/0074691e66aa85c0_jpg.rf.363957b75c8f2a3ba887e82f1f695174.jpg differ diff --git a/simulators/data/fruit/images/100283_jpg.rf.d746e1de0fb9478dee8044eacfff367a.jpg b/simulators/data/fruit/images/100283_jpg.rf.d746e1de0fb9478dee8044eacfff367a.jpg new file mode 100644 index 000000000..91ff5dca3 Binary files /dev/null and b/simulators/data/fruit/images/100283_jpg.rf.d746e1de0fb9478dee8044eacfff367a.jpg differ diff --git a/simulators/data/fruit/images/31535e8b9ec3c325_jpg.rf.4bcc90f53aaee35ea6da9f68bcf5d86b.jpg b/simulators/data/fruit/images/31535e8b9ec3c325_jpg.rf.4bcc90f53aaee35ea6da9f68bcf5d86b.jpg new file mode 100644 index 000000000..fce68dd87 Binary files /dev/null and b/simulators/data/fruit/images/31535e8b9ec3c325_jpg.rf.4bcc90f53aaee35ea6da9f68bcf5d86b.jpg differ diff --git a/simulators/data/fruit/images/apple-32-_jpg.rf.2c3206f933adce572e7533c890732139.jpg b/simulators/data/fruit/images/apple-32-_jpg.rf.2c3206f933adce572e7533c890732139.jpg new file mode 100644 index 000000000..bd09193ec Binary files /dev/null and b/simulators/data/fruit/images/apple-32-_jpg.rf.2c3206f933adce572e7533c890732139.jpg differ diff --git a/simulators/docker-compose.yml b/simulators/docker-compose.yml index 36098b6b1..d55ced401 100644 --- a/simulators/docker-compose.yml +++ b/simulators/docker-compose.yml @@ -26,6 +26,16 @@ services: - ./data/ultra/metadata:/data/metadata:ro command: ["python", "-u", "/app/data_publisher.py"] + + fruit-publisher: + build: . 
+ container_name: fruit-publisher + env_file: .env.fruit + volumes: + - ./data/fruit/images:/data/fruit/images:ro + - ./data/fruit/metadata:/data/metadata:ro + command: ["python", "-u", "/app/data_publisher.py"] + networks: default: external: true diff --git a/storage_with_mqtt/mqtt_images/mqtt_ingest/app.py b/storage_with_mqtt/mqtt_images/mqtt_ingest/app.py index 8fc9344d4..d0ed74c08 100644 --- a/storage_with_mqtt/mqtt_images/mqtt_ingest/app.py +++ b/storage_with_mqtt/mqtt_images/mqtt_ingest/app.py @@ -142,21 +142,41 @@ def parse_topic(topic: str) -> dict: ns, idx = "sounds", parts_lower.index("sounds") if ns == "imagery": - # format: MQTT/imagery//// - if len(parts) > idx + 1 and parts[idx + 1]: - result["camera"] = parts[idx + 1] - if len(parts) > idx + 2 and parts[idx + 2]: - try: - ts = int(parts[idx + 2]) - if ts > 0: - result["publish_ts_ms"] = ts - except ValueError: - pass - if len(parts) > idx + 3 and parts[idx + 3]: - result["content_type"] = parts[idx + 3].replace("_", "/") - if len(parts) > idx + 4 and parts[idx + 4]: - result["filename"] = parts[idx + 4] + tail = parts[-3:] if len(parts) >= 3 else parts + head = parts[idx + 1 : len(parts) - 3] if len(parts) > idx + 4 else parts[idx + 1 : idx + 2] + if head: + if FORCE_DEVICE_ID: + result["camera"] = FORCE_DEVICE_ID + prefix = "/".join(head) + else: + result["camera"] = head[-1] + prefix = "/".join(head[:-1]) + else: + prefix = "" + print(f"[DEBUG] result['camera']={result['camera']}, prefix={prefix}, FORCE_DEVICE_ID={FORCE_DEVICE_ID}", flush=True) + try: + result["publish_ts_ms"] = int(tail[0]) + except Exception: + pass + if len(tail) >= 2: + result["content_type"] = tail[1].replace("_", "/") + if len(tail) >= 3: + result["filename"] = tail[2] + + # --- DEBUG + try to detect device from filename --- + filename_base = os.path.splitext(result["filename"])[0] + if "-" in filename_base: + possible_device = filename_base.split("_")[0] + print(f"[DEBUG] filename_base={filename_base}, 
possible_device={possible_device}", flush=True) + if possible_device.lower().startswith("fruit") or possible_device.lower().startswith("camera"): + result["camera"] = possible_device + print(f"[DEBUG] DETECTED device from filename -> {result['camera']}", flush=True) + else: + print(f"[DEBUG] filename does not match expected pattern", flush=True) + else: + print(f"[DEBUG] filename_base has no '-': {filename_base}", flush=True) + result["extra_prefix"] = prefix elif ns in ("sounds", "sounds_ultra"): if len(parts) > idx + 1 and parts[idx + 1]: try: @@ -190,6 +210,8 @@ def parse_topic(topic: str) -> dict: date_part = datetime.fromtimestamp(result["publish_ts_ms"] / 1000, tz=timezone.utc).strftime("%Y-%m-%d") device_id = result["camera"] + if FORCE_DEVICE_ID: + device_id = FORCE_DEVICE_ID if result["media_type"] == "sounds": if device_id.startswith(f"{CAMERA_PREFIX}-"): @@ -206,11 +228,16 @@ def parse_topic(topic: str) -> dict: else: device_name = f"{CAMERA_PREFIX}-{device_id}" - # key = f"{result['media_type']}/{device_name}/{date_part}/{result['publish_ts_ms']}/{result['filename']}" is_ultra = ns == "sounds_ultra" - topdir = ULTRA_DIR_PREFIX if is_ultra else result["media_type"] - key = f"{topdir}/{device_name}/{date_part}/{result['publish_ts_ms']}/{result['filename']}" - + topdir = ULTRA_DIR_PREFIX if is_ultra else result["media_type"] + if ns == "imagery": + subpath = "/".join(parts[idx + 1 : -3]) + if subpath: + key = f"{subpath}/{device_name}/{date_part}/{result['publish_ts_ms']}/{result['filename']}" + else: + key = f"{topdir}/{device_name}/{date_part}/{result['publish_ts_ms']}/{result['filename']}" + else: + key = f"{topdir}/{device_name}/{date_part}/{result['publish_ts_ms']}/{result['filename']}" result["key"] = key result["device_id"] = device_name result["image_id"] = stem(result["filename"]) or uuid.uuid4().hex diff --git a/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/Dockerfile 
b/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/Dockerfile index d5b4d94f9..0bb44c4fa 100644 --- a/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/Dockerfile +++ b/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/Dockerfile @@ -11,8 +11,16 @@ FROM alpine:3.19 RUN apk add --no-cache bash curl ca-certificates netcat-openbsd dos2unix && \ update-ca-certificates # ===== Add NetFree CA ===== -COPY certs/*.crt /usr/local/share/ca-certificates/ -RUN update-ca-certificates + +RUN if [ -f netfree-ca.crt ]; then \ + echo "Installing netfree-ca.crt..."; \ + cp netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt; \ + chmod 644 /usr/local/share/ca-certificates/netfree-ca.crt; \ + update-ca-certificates; \ + else \ + echo "No netfree-ca.crt found, skipping."; \ + fi + # ===== Copy mc from the official image ===== COPY --from=mc-source /usr/bin/mc /usr/local/bin/mc RUN chmod +x /usr/local/bin/mc diff --git a/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/entrypoint/init.sh b/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/entrypoint/init.sh index 16b388157..6e932f65b 100644 --- a/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/entrypoint/init.sh +++ b/storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap/entrypoint/init.sh @@ -169,9 +169,9 @@ mc event add "${MC_ALIAS_HOT}/imagery" \ --prefix "image/camera-air/" mc event add "${MC_ALIAS_HOT}/imagery" \ - arn:minio:sqs::fruits:kafka \ + arn:minio:sqs::fruit/tree:kafka \ --event put \ - --prefix "fruits/" + --prefix "fruit/tree" mc event add "${MC_ALIAS_HOT}/imagery" \ arn:minio:sqs::leaves:kafka \ diff --git a/streaming/flink/jobs/http_dispatcher.py b/streaming/flink/jobs/http_dispatcher.py index 5fcdfb553..e0aff4912 100644 --- a/streaming/flink/jobs/http_dispatcher.py +++ b/streaming/flink/jobs/http_dispatcher.py @@ -85,7 +85,11 @@ async def _make_session(self, timeout: aiohttp.ClientTimeout) -> aiohttp.ClientS async def _post_once(self, url: str, 
payload: Dict[str, Any], headers: Dict[str, str]): async with self.session.post(url, json=payload, headers=headers) as resp: - return resp.status, await resp.text() + try: + data = await resp.json() + except Exception: + data = await resp.text() + return resp.status, data async def _post_with_retry(self, url: str, payload: Dict[str, Any], headers: Dict[str, str]): attempt = 0 @@ -111,33 +115,58 @@ def map(self, s: str) -> str: # 1) Parse JSON try: event = json.loads(s) + # Handle double-encoded JSON strings (common in Kafka/MinIO) + if isinstance(event, str): + event = json.loads(event) except Exception as e: return json.dumps( {"ok": False, "status": 422, "body": f"bad json: {e}", "raw": s, "stage": "parse"}, ensure_ascii=False ) - # 2) Generate idempotency key event_id = event.get("event_id") or str(uuid.uuid4()) + + # 3) Extract bucket/key – support both native and MinIO S3-event formats + import sys, urllib.parse - # 3) Validate fields: must have bucket+key; image_uri not allowed - if "image_uri" in event: - return json.dumps( - {"ok": False, "status": 422, - "body": "image_uri not supported; use {bucket,key} only", - "event": event, "stage": "validate"}, - ensure_ascii=False - ) + def dbg(msg: str): + sys.stdout.write(f"[DEBUG] {msg}\n") + sys.stdout.flush() bucket = event.get("bucket") key = event.get("key") - if not bucket or not key: - return json.dumps( - {"ok": False, "status": 422, - "body": "missing required fields: bucket and key", - "event": event, "stage": "validate"}, - ensure_ascii=False - ) + + # Try to detect MinIO S3 event format + if (not bucket or not key): + # Case A: nested Records structure + records = event.get("Records") + if isinstance(records, list) and len(records) > 0: + try: + rec = records[0] + s3_block = rec.get("s3", {}) + bucket = s3_block.get("bucket", {}).get("name") + raw_key = s3_block.get("object", {}).get("key") + if raw_key: + key = urllib.parse.unquote(raw_key) + # Strip a duplicated imagery/ prefix if present + if
key.startswith("imagery/"): + key = key[len("imagery/"):] + dbg(f"Parsed from S3 event: bucket={bucket}, key={key}") + except Exception as e: + dbg(f"Error parsing S3 event: {e}") + # Case B: flat event with Key field + elif "Key" in event and isinstance(event["Key"], str): + key = event["Key"] + if key.startswith("imagery/"): + key = key[len("imagery/"):] + bucket = "imagery" + dbg(f"Parsed flat event: bucket={bucket}, key={key}") + # Case C: nested body from previous inference + if (not bucket or not key) and isinstance(event.get("body"), dict): + body = event["body"] + bucket = body.get("bucket") or body.get("bucket_out") + key = body.get("key") + dbg(f"Parsed nested body: bucket={bucket}, key={key}") # 4) Prepare headers and payload: send only bucket/key to the inference service headers = { diff --git a/templates/templates.yml b/templates/templates.yml index aae4b76db..c421cca3d 100644 --- a/templates/templates.yml +++ b/templates/templates.yml @@ -18,3 +18,9 @@ templates: category: agriculture summary: "🌿 Plant drought detected by ${device_id} in ${area} (severity ${severity}, confidence ${confidence})" recommendation: "Check irrigation in ${area}. Status: ${watering_status}. Audio file: ${file}" + + fruit_disease_detected: + category: agriculture + summary: "🍎 Fruit defect or disease detected by ${device_id} (confidence ${confidence})" + recommendation: "Inspect the detected fruit. Separate defective produce to prevent spoilage or disease spread. Image: ${image_uri}" +