diff --git a/GUI/src/vast/desktop/Dockerfile b/GUI/src/vast/desktop/Dockerfile index 048ecc4ba..6281310bd 100644 --- a/GUI/src/vast/desktop/Dockerfile +++ b/GUI/src/vast/desktop/Dockerfile @@ -22,7 +22,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libxcb-randr0 && rm -rf /var/lib/apt/lists/* # # ───────── optional CA certs ───────── -COPY certs /app/certs +#COPY certs /app/certs RUN if [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ echo "Configuring NetFree certificates..."; \ cp ./certs/*.crt /usr/local/share/ca-certificates/; \ diff --git a/GUI/src/vast/gateway/Dockerfile b/GUI/src/vast/gateway/Dockerfile index 27486d095..2b5972f1f 100644 --- a/GUI/src/vast/gateway/Dockerfile +++ b/GUI/src/vast/gateway/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /app ARG USE_NETFREE=true RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY certs /app/certs +#COPY certs /app/certs # System CA + add NetFree certs RUN if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ echo "Configuring NetFree certificates..."; \ diff --git a/GUI/src/vast/runner/Dockerfile b/GUI/src/vast/runner/Dockerfile index 7b6c3330f..dfd5fc8fd 100644 --- a/GUI/src/vast/runner/Dockerfile +++ b/GUI/src/vast/runner/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /app ARG USE_NETFREE=true RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY certs /app/certs +#COPY certs /app/certs # System CA + add NetFree certs RUN if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ echo "Configuring NetFree certificates..."; \ diff --git a/GUI/src/vast/services/Dockerfile b/GUI/src/vast/services/Dockerfile index 7c7c7d4f0..75fb5d636 100644 --- a/GUI/src/vast/services/Dockerfile +++ b/GUI/src/vast/services/Dockerfile @@ -3,7 +3,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 WORKDIR /app # # System CA + NetFree RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY certs/*.crt /usr/local/share/ca-certificates/ +#COPY certs/*.crt /usr/local/share/ca-certificates/ RUN update-ca-certificates || true ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/RelDB/build_tables/schema.sql b/RelDB/build_tables/schema.sql index 87096af50..61a8d683b 100644 --- a/RelDB/build_tables/schema.sql +++ b/RelDB/build_tables/schema.sql @@ -212,6 +212,47 @@ CREATE TABLE IF NOT EXISTS inference_logs ( image_url TEXT ); +-- Ripeness predictions table +CREATE TABLE IF NOT EXISTS ripeness_predictions ( + id BIGSERIAL PRIMARY KEY, + inference_log_id BIGINT NOT NULL REFERENCES inference_logs(id) ON DELETE CASCADE, + ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + ripeness_label TEXT NOT NULL CHECK (ripeness_label IN ('ripe', 'unripe', 'overripe')), + ripeness_score DOUBLE PRECISION NOT NULL, + model_name TEXT NOT NULL, + run_id UUID NOT NULL, + device_id TEXT REFERENCES devices(device_id), + UNIQUE (inference_log_id) +); + +-- Create indexes for ripeness_predictions +CREATE INDEX IF NOT EXISTS ix_ripeness_inflog ON ripeness_predictions(inference_log_id); +CREATE INDEX IF NOT EXISTS ix_ripeness_ts ON ripeness_predictions(ts); +CREATE INDEX IF NOT EXISTS ix_ripeness_device ON ripeness_predictions(device_id); +CREATE INDEX IF NOT EXISTS ix_ripeness_run ON ripeness_predictions(run_id); + +-- Weekly ripeness rollups table +CREATE TABLE IF NOT EXISTS ripeness_weekly_rollups_ts ( + id BIGSERIAL PRIMARY KEY, + ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + window_start TIMESTAMPTZ NOT NULL, + window_end TIMESTAMPTZ NOT NULL, + fruit_type TEXT NOT NULL, + device_id TEXT REFERENCES devices(device_id), + run_id UUID NOT NULL, + cnt_total INTEGER NOT NULL, + cnt_ripe INTEGER NOT NULL, + cnt_unripe INTEGER NOT NULL, + cnt_overripe INTEGER NOT NULL, + pct_ripe DOUBLE PRECISION NOT NULL +); + +-- Create indexes for ripeness_weekly_rollups_ts +CREATE INDEX IF NOT EXISTS ix_rwrt_ts ON ripeness_weekly_rollups_ts(ts); +CREATE INDEX IF NOT EXISTS ix_rwrt_fruit_ts ON ripeness_weekly_rollups_ts(fruit_type, ts); +CREATE INDEX IF NOT EXISTS ix_rwrt_device ON ripeness_weekly_rollups_ts(device_id); +CREATE INDEX IF NOT EXISTS ix_rwrt_run ON ripeness_weekly_rollups_ts(run_id); + -- Sensor event logs table. CREATE TABLE IF NOT EXISTS event_logs_sensors( id bigserial PRIMARY KEY, @@ -252,8 +293,6 @@ CREATE TABLE IF NOT EXISTS public.sensor_anomalies ( inserted_at TIMESTAMPTZ NOT NULL DEFAULT now() ); - - CREATE TABLE IF NOT EXISTS public.sensor_zone_stats ( id BIGSERIAL PRIMARY KEY, zone VARCHAR(128) NOT NULL, @@ -372,6 +411,7 @@ CREATE INDEX IF NOT EXISTS ix_task_thresholds_updated_at ON task_thresholds (upd -- === Indexes for performance optimization === +>>>>>>> origin/main CREATE INDEX IF NOT EXISTS ix_sensor_anomalies_ts_brin ON public.sensor_anomalies USING BRIN (ts); diff --git a/docker-compose.yml b/docker-compose.yml index bbdf09714..f2d042204 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,7 @@ - # ========================== # Docker Compose - AG Cloud # ========================== -version: "3.9" # -------------------------- # Networks # -------------------------- @@ -11,7 +9,6 @@ networks: ag_cloud: driver: bridge - # -------------------------- # Volumes # -------------------------- @@ -53,7 +50,7 @@ services: - wal_archive:/var/lib/postgresql/wal_archive - backups:/var/lib/postgresql/backups healthcheck: - test: ["CMD", "pg_isready", "-U", "missions_user", "-d", "missions_db"] + test: [ "CMD", "pg_isready", "-U", "missions_user", "-d", "missions_db" ] interval: 10s timeout: 5s retries: 5 @@ -76,31 +73,75 @@ services: - "9187:9187" networks: - ag_cloud + # ------------------------- + # Sound Metrics Service + # ------------------------- - plant_stress: - build: ./services/plant_stress + sound_metrics: + build: + context: ./services/sound_metrics + dockerfile: Dockerfile environment: - - INPUT_DIR=/data/inbox - - MODEL_DIR=/models - - POSTGRES_DSN=postgresql://missions_user:pg123@postgres:5432/missions_db - - PERIOD_DAYS=0 - - CONFIDENCE_THRESHOLD=0.6 - # - TF_ENABLE_ONEDNN_OPTS=0 # optional - volumes: - - "./services/plant_stress/models:/models:ro" - - "./services/plant_stress/samples:/data/inbox:ro" + - ADDR=0.0.0.0 + - PORT=8005 + - USE_UTC=false + - WINDOW_MIN=1 + - STABLE_SEC=1 + - PYTHONUNBUFFERED=1 + + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MINIO_BUCKET=sound + - MINIO_PREFIX=sounds/ + command: [ "python", "-u", "src/metrics.py" ] + ports: + - "8005:8005" depends_on: - - postgres - command: ["python", "-u", "/app/app.py"] + - minio-hot networks: - ag_cloud + restart: unless-stopped + + # ------------------------- + # Plant Stress Daily Batch + # ------------------------- + + plant_stress_daily: + build: ./services/plant_stress + env_file: + - ./services/plant_stress/.env + restart: "no" + environment: + MODEL_DIR: /models + CONFIDENCE_THRESHOLD: "0.60" + TF_CPP_MIN_LOG_LEVEL: "2" + TIMEZONE: Asia/Jerusalem + POSTGRES_DSN: postgresql://missions_user:pg123@postgres:5432/missions_db + MINIO_ENDPOINT: minio-hot:9000 + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin123 + MINIO_BUCKET: sound + MINIO_PREFIX: plants/ + MINIO_SECURE: "false" + command: ["python","-u","/app/predict_minio_daily.py"] + volumes: + - "./services/plant_stress/models:/models:ro" + depends_on: + postgres: + condition: service_healthy + minio-hot: + condition: service_healthy + mc-bootstrap: + condition: service_started + networks: [ag_cloud] # ------------------------- # MQTT + Kafka + Connect + Init # ------------------------- kafka: - build: + build: context: ./mqtt_and_kafka/kafka dockerfile: dockerfile container_name: kafka @@ -125,15 +166,16 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD-SHELL", "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list >/dev/null 2>&1 || exit 1"] + test: [ "CMD-SHELL", "nc -z localhost 9092 || exit 1" ] interval: 10s timeout: 5s retries: 20 + mosquitto: image: eclipse-mosquitto:2.0 container_name: mosquitto - command: ["mosquitto", "-c", "/mqtt_and_kafka/mosquitto/config/mosquitto.conf"] + command: [ "mosquitto", "-c", "/mqtt_and_kafka/mosquitto/config/mosquitto.conf" ] ports: - "1883:1883" volumes: @@ -144,7 +186,7 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD", "mosquitto_sub", "-h", "localhost", "-p", "1883", "-t", "$$SYS/#", "-C", "1", "-W", "15"] + test: [ "CMD", "mosquitto_sub", "-h", "localhost", "-p", "1883", "-t", "$$SYS/#", "-C", "1", "-W", "15" ] interval: 10s timeout: 5s retries: 12 @@ -178,7 +220,7 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD", "curl", "-sf", "http://localhost:8083/connectors"] + test: [ "CMD", "curl", "-sf", "http://localhost:8083/connectors" ] interval: 10s timeout: 5s retries: 12 @@ -286,6 +328,7 @@ services: - prometheus networks: - ag_cloud + pushgateway: image: prom/pushgateway:v1.8.0 container_name: pushgateway @@ -294,7 +337,7 @@ services: networks: - ag_cloud restart: unless-stopped - + # -------------------------- # Desktop App # -------------------------- @@ -316,13 +359,12 @@ services: - notification_api - alerts-gateway volumes: - - ./GUI/src/vast:/app/src/vast - - ./templates:/app/templates:ro + - ./GUI/src/vast:/app/src/vast + - ./templates:/app/templates:ro networks: - ag_cloud restart: unless-stopped - # -------------------------- # Large Mosquitto # -------------------------- @@ -348,12 +390,46 @@ services: MINIO_PROMETHEUS_AUTH_TYPE: public MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 + + # ===== IMAGE NOTIFIERS ===== + MINIO_NOTIFY_KAFKA_ENABLE_aerial: "on" + MINIO_NOTIFY_KAFKA_BROKERS_aerial: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_aerial: "image.new.aerial" + + MINIO_NOTIFY_KAFKA_ENABLE_air: "on" + MINIO_NOTIFY_KAFKA_BROKERS_air: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_air: "image.new.air" + + MINIO_NOTIFY_KAFKA_ENABLE_fruits: "on" + MINIO_NOTIFY_KAFKA_BROKERS_fruits: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_fruits: "image.new.fruits" + + MINIO_NOTIFY_KAFKA_ENABLE_leaves: "on" + MINIO_NOTIFY_KAFKA_BROKERS_leaves: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_leaves: "image.new.leaves" + + MINIO_NOTIFY_KAFKA_ENABLE_ground: "on" + MINIO_NOTIFY_KAFKA_BROKERS_ground: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_ground: "image.new.ground" + + MINIO_NOTIFY_KAFKA_ENABLE_field: "on" + MINIO_NOTIFY_KAFKA_BROKERS_field: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_field: "image.new.field" + + # ===== SOUND NOTIFIERS ===== + MINIO_NOTIFY_KAFKA_ENABLE_plants: "on" + MINIO_NOTIFY_KAFKA_BROKERS_plants: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_plants: "sound.new.plants" + + MINIO_NOTIFY_KAFKA_ENABLE_sounds: "on" + MINIO_NOTIFY_KAFKA_BROKERS_sounds: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_sounds: "sound.new.sounds" ports: - - "9001:9000" # HOT S3 - - "9002:9001" # HOT Console - networks: [ag_cloud] + - "9001:9000" # HOT S3 + - "9002:9001" # HOT Console + networks: [ ag_cloud ] healthcheck: - test: ["CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready"] + test: [ "CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready" ] interval: 3s timeout: 2s retries: 40 @@ -369,11 +445,11 @@ services: MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 ports: - - "9101:9000" # COLD S3 - - "9102:9001" # COLD Console - networks: [ag_cloud] + - "9101:9000" # COLD S3 + - "9102:9001" # COLD Console + networks: [ ag_cloud ] healthcheck: - test: ["CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready"] + test: [ "CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready" ] interval: 3s timeout: 2s retries: 40 @@ -383,6 +459,7 @@ services: mc-bootstrap: build: context: ./storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap + container_name: mc-bootstrap volumes: - ./storage_with_mqtt/storage/combined_minio_setup/config:/config:ro - ./storage_with_mqtt/data/config:/config @@ -391,7 +468,8 @@ services: condition: service_healthy minio-cold: condition: service_healthy - command: ["/bin/bash","-lc","/entrypoint/init.sh; tail -f /dev/null"] + kafka: + condition: service_healthy environment: MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 @@ -400,8 +478,8 @@ services: MC_ALIAS_HOT: hot MC_ALIAS_COLD: cold BUCKET_IMAGERY: imagery - BUCKET_TELEMETRY: telemetry - networks: [ag_cloud] + BUCKET_SOUND: sound + networks: [ ag_cloud ] restart: unless-stopped # -------------------------- @@ -416,7 +494,7 @@ services: MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 BUCKET_IMAGERY: imagery - BUCKET_TELEMETRY: telemetry + BUCKET_SOUND: sound MQTT_BROKER: large-mosquitto MQTT_PORT: 1885 MQTT_TOPIC: MQTT/imagery/# @@ -451,12 +529,12 @@ services: MINIO_ENDPOINT: http://minio-hot:9000 MINIO_ACCESS_KEY: minioadmin MINIO_SECRET_KEY: minioadmin123 - S3_BUCKET: telemetry + S3_BUCKET: sound MQTT_BROKER: large-mosquitto MQTT_PORT: 1885 - MQTT_TOPIC: MQTT/sound/# - DEFAULT_PREFIX: MIC-01 - CAMERA_PREFIX: camera + MQTT_TOPIC: MQTT/sound/# + DEFAULT_PREFIX: MIC-01 + CAMERA_PREFIX: camera MICROPHONE_PREFIX: microphone DUMMY_DB: 0 DB_API_BASE: http://db_api_service:8001 @@ -561,7 +639,7 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8088/health').read()"] + test: [ "CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8088/health').read()" ] interval: 45s timeout: 5s retries: 10 @@ -572,7 +650,7 @@ services: # -------------------------- - + contracts-gen: build: context: ./services/db_api_service @@ -590,7 +668,6 @@ services: - ag_cloud restart: "no" - db_api_service: build: context: ./services/db_api_service @@ -619,7 +696,6 @@ services: - ag_cloud restart: unless-stopped - notification_api: build: context: ./services/API-notifications/src @@ -632,8 +708,7 @@ services: depends_on: - postgres - - + # -------------------------- # Flink JobManager & TaskManager # -------------------------- @@ -641,12 +716,12 @@ services: build: context: ./streaming/flink dockerfile: Dockerfile.flink-py - image: agcloud-flink-py:1.18 + image: agcloud-flink-py:1.18 container_name: flink-jobmanager command: jobmanager ports: - "8081:8081" - networks: [ag_cloud] + networks: [ ag_cloud ] environment: - | FLINK_PROPERTIES= @@ -672,29 +747,29 @@ services: - ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro restart: unless-stopped - + audio_compression: - build: - context: ./services/compression - dockerfile: Dockerfile - container_name: audio_compression - environment: - - RAW_MAX_AGE_DAYS=30 - - COMPRESSION_CODEC=opus - - COMPRESSED_MAX_AGE_DAYS=90 - - CHECK_INTERVAL_SECONDS=3600 - - MINIO_ENDPOINT=minio-hot:9000 - - ACCESS_KEY=minioadmin - - SECRET_KEY=minioadmin123 - - BUCKET_NAME=imagery - depends_on: - minio-hot: - condition: service_healthy - mc-bootstrap: - condition: service_started - networks: - - ag_cloud - restart: unless-stopped + build: + context: ./services/compression + dockerfile: Dockerfile + container_name: audio_compression + environment: + - RAW_MAX_AGE_DAYS=30 + - COMPRESSION_CODEC=opus + - COMPRESSED_MAX_AGE_DAYS=90 + - CHECK_INTERVAL_SECONDS=3600 + - MINIO_ENDPOINT=minio-hot:9000 + - ACCESS_KEY=minioadmin + - SECRET_KEY=minioadmin123 + - BUCKET_NAME=imagery + depends_on: + minio-hot: + condition: service_healthy + mc-bootstrap: + condition: service_started + networks: + - ag_cloud + restart: unless-stopped flink_writer_db: build: @@ -718,7 +793,6 @@ services: - ag_cloud restart: unless-stopped - flink-taskmanager: image: agcloud-flink-py:1.18 container_name: flink-taskmanager @@ -726,7 +800,7 @@ services: depends_on: flink-jobmanager: condition: service_started - networks: [ag_cloud] + networks: [ ag_cloud ] environment: - | FLINK_PROPERTIES= @@ -742,7 +816,7 @@ services: fs.s3a.connection.ssl.enabled: false python.client.executable: /usr/bin/python3 python.executable: /usr/bin/python3 - - HTTP_INFER_URL=http://fruit-inference-http:8000/infer_json + - HTTP_INFER_URL=http://fruit-inference-http:8004/infer_json volumes: - ./streaming/flink/connectors/flink-json-1.18.1.jar:/opt/flink/lib/flink-json-1.18.1.jar:ro - ./streaming/flink/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar:ro @@ -752,28 +826,49 @@ services: - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro restart: unless-stopped - - # -------------------------- # Inference HTTP Service # -------------------------- fruit-inference-http: build: - context: ./services/inference_http + context: ./services/inference_http dockerfile: Dockerfile environment: - - TEAM=fruit - - WEIGHTS_PATH=/app/weights/fruit_cls_best.ts - - MINIO_ENDPOINT=minio-hot:9000 - - MINIO_ACCESS_KEY=minioadmin - - MINIO_SECRET_KEY=minioadmin123 - - MINIO_SECURE=0 + - TEAM=fruit + - WEIGHTS_PATH=/app/weights/fruit_cls_best.ts + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MINIO_SECURE=0 volumes: - ./services/inference_http/weights:/app/weights:ro container_name: fruit-inference-http + networks: [ ag_cloud ] + ports: + - "8011:8004" + restart: unless-stopped + + + # -------------------------- + # Camera Inference HTTP + # -------------------------- + camera-inference-http: + build: + context: ./services/inference_http + dockerfile: Dockerfile + environment: + - TEAM=camera + - WEIGHTS_PATH=/app/weights/yolov8-fruits.pt + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MINIO_SECURE=0 + volumes: + - ./services/inference_http/weights:/app/weights:ro + container_name: camera-inference-http networks: [ag_cloud] ports: - - "8011:8000" + - "8012:8004" restart: unless-stopped @@ -787,12 +882,12 @@ services: flink-jobmanager: { condition: service_started } flink-taskmanager: { condition: service_started } fruit-inference-http: { condition: service_started } - networks: [ag_cloud] + networks: [ ag_cloud ] environment: - KAFKA_BOOTSTRAP=kafka:9092 - INPUT_TOPIC=imagery.new.fruit - TEAM=fruit - - HTTP_URL=http://fruit-inference-http:8000/infer_json + - HTTP_URL=http://fruit-inference-http:8004/infer_json - DLQ_TOPIC=dlq.inference.http - GROUP_ID=http-dispatcher-fruit - PARALLELISM=2 @@ -805,31 +900,32 @@ services: - ./streaming/flink/connectors/kafka-clients-3.2.3.jar:/opt/flink/lib/kafka-clients-3.2.3.jar:ro - ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro - command: [ - "bash","-lc", - "set -e; - echo 'Waiting for JobManager to accept commands...'; - until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do - echo 'still waiting...'; sleep 3; - done; - echo 'JobManager is ready!'; - /opt/flink/bin/flink run \ - -Dpython.client.executable=/usr/bin/python3 \ - -Dpython.executable=/usr/bin/python3 \ - -Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar \ - --jobmanager flink-jobmanager:8081 \ - --detached \ - --python /opt/flink/jobs/http_dispatcher.py \ - -- \ - --bootstrap kafka:9092 \ - --input-topic imagery.new.fruit \ - --team fruit \ - --http-url http://fruit-inference-http:8000/infer_json \ - --group-id http-dispatcher-fruit \ - --dlq-topic dlq.inference.http; - tail -f /dev/null" - ] + command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.fruit --team fruit --http-url http://fruit-inference-http:8004/infer_json --group-id http-dispatcher-fruit --dlq-topic dlq.inference.http; tail -f /dev/null" ] + flink-dispatcher-camera: + image: agcloud-flink-py:1.18 + container_name: flink-dispatcher-camera + depends_on: + flink-jobmanager: { condition: service_started } + flink-taskmanager: { condition: service_started } + camera-inference-http: { condition: service_started } + networks: [ag_cloud] + environment: + - KAFKA_BOOTSTRAP=kafka:9092 + - TOPIC_OUT=imagery.new.fruit + - INPUT_TOPIC=imagery.new.camera + - TEAM=camera + - HTTP_URL=http://camera-inference-http:8004/infer_json + - DLQ_TOPIC=dlq.inference.http + - GROUP_ID=http-dispatcher-camera + - PARALLELISM=2 + - PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 + volumes: + - ./streaming/flink/jobs:/opt/flink/jobs:ro + - ./streaming/flink/connectors:/opt/flink/lib/connectors:ro + command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until +/opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/connectors/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.camera --team camera --http-url http://camera-inference-http:8004/infer_json --group-id http-dispatcher-camera --dlq-topic dlq.inference.http; tail -f /dev/null" ] + restart: always flink-alerts-job: build: @@ -838,18 +934,17 @@ services: container_name: alerts-forwarder depends_on: kafka: - condition: service_healthy + condition: service_healthy alertmanager_service: - condition: service_started + condition: service_started environment: - PYTHONPATH=/opt/app - KAFKA_BROKERS=kafka:9092 - ALERTMANAGER_SERVICE_URL=http://alertmanager_service:8090/alerts - command: ["python", "/opt/app/alerts_forwarder.py"] + command: [ "python", "/opt/app/alerts_forwarder.py" ] networks: - ag_cloud restart: unless-stopped - alertmanager: image: prom/alertmanager:v0.27.0 @@ -865,7 +960,7 @@ services: networks: - ag_cloud restart: always - + alertmanager_service: build: context: ./services/alertmanager_service/src @@ -873,7 +968,7 @@ services: container_name: alertmanager_service ports: - "8090:8090" - command: ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8090"] + command: [ "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8090" ] volumes: - ./templates:/app/templates:ro environment: @@ -891,10 +986,55 @@ services: context: ./services/alertmanager_service/src dockerfile: Dockerfile container_name: alerts_gateway - command: ["uvicorn", "gateway:app", "--host", "0.0.0.0", "--port", "8000"] + command: [ "uvicorn", "gateway:app", "--host", "0.0.0.0", "--port", "8000" ] + ports: + - "8010:8000" # host:container + networks: + - ag_cloud + + image-linker-jobmanager: + build: + context: ./services/image-linker + dockerfile: Dockerfile.flink + container_name: image-linker-jobmanager + command: jobmanager ports: - - "8010:8000" # host:container + - "8084:8081" # ? unique external port (no overlap) + environment: + - JOB_MANAGER_RPC_ADDRESS=image-linker-jobmanager + - KAFKA_BROKERS=kafka:9092 + - CONFIG_PATH=/opt/app/config/topics.yaml networks: - ag_cloud - + image-linker-taskmanager: + build: + context: ./services/image-linker + dockerfile: Dockerfile.flink + container_name: image-linker-taskmanager + command: taskmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=image-linker-jobmanager + - KAFKA_BROKERS=kafka:9092 + - CONFIG_PATH=/opt/app/config/topics.yaml + depends_on: + image-linker-jobmanager: + condition: service_started + networks: + - ag_cloud + + image-linker-submitter: + build: + context: ./services/image-linker + dockerfile: Dockerfile.flink + container_name: image-linker-submit + depends_on: + image-linker-jobmanager: + condition: service_started + command: > + bash -lc "sleep 10 && + flink run -m image-linker-jobmanager:8081 -py /opt/app/job_linker.py && + echo 'Image-Linker job submitted successfully' && + sleep 1" + networks: + - ag_cloud \ No newline at end of file diff --git a/mqtt_and_kafka/kafka/dockerfile b/mqtt_and_kafka/kafka/dockerfile index 928189796..8a4e366c9 100644 --- a/mqtt_and_kafka/kafka/dockerfile +++ b/mqtt_and_kafka/kafka/dockerfile @@ -7,9 +7,14 @@ USER root RUN install_packages kafkacat # Copy scripts to a writable location for user 1001 -COPY --chmod=755 kafka-files/create-topics.sh /opt/bitnami/create-topics.sh -COPY --chmod=755 kafka-files/smoke-test.sh /opt/bitnami/smoke-test.sh -COPY --chmod=755 kafka-files/app-start.sh /opt/bitnami/app-start.sh +COPY kafka-files/create-topics.sh /opt/bitnami/create-topics.sh +RUN chmod +x /opt/bitnami/create-topics.sh + +COPY kafka-files/smoke-test.sh /opt/bitnami/smoke-test.sh +RUN chmod +x /opt/bitnami/smoke-test.sh + +COPY kafka-files/app-start.sh /opt/bitnami/app-start.sh +RUN chmod +x /opt/bitnami/app-start.sh # Expose ports: internal 9092, external 9094 (will be mapped to host 29092) EXPOSE 9092 9094 diff --git a/services/API-notifications/src/Dockerfile b/services/API-notifications/src/Dockerfile index c17f4ae4c..ca454539f 100644 --- a/services/API-notifications/src/Dockerfile +++ b/services/API-notifications/src/Dockerfile @@ -4,14 +4,19 @@ WORKDIR /app COPY requirements.txt . -COPY certs /app/certs - -RUN apt-get update && \ - apt-get install -y ca-certificates && \ - cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ - update-ca-certificates && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* +#COPY certs /app/certs + +ARG USE_NETFREE=true + +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* + +# System CA + add NetFree certs +RUN if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ + echo "Configuring NetFree certificates..."; \ + cp ./certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + fi + ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt diff --git a/services/compression/Dockerfile b/services/compression/Dockerfile index 26bbd2595..bd2a5fe40 100644 --- a/services/compression/Dockerfile +++ b/services/compression/Dockerfile @@ -1,20 +1,20 @@ FROM python:3.12-slim -# Install ffmpeg, cron, and ca-certificates -RUN apt-get update && \ - apt-get install -y ffmpeg cron ca-certificates && \ - rm -rf /var/lib/apt/lists/* - WORKDIR /app +ARG USE_NETFREE=true + # Copy certificates to container (make sure the 'certs' folder contains the necessary .crt files) -COPY certs /app/certs +#COPY certs /app/certs + +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/* -# Install necessary certificates and add custom ones -RUN cp /app/certs/*.crt /usr/local/share/ca-certificates/ && \ - update-ca-certificates && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* +# System CA + add NetFree certs +RUN if [ "$USE_NETFREE" = "true" ] && [ -d ./certs ] && [ "$(ls ./certs/*.crt 2>/dev/null)" ]; then \ + echo "Configuring NetFree certificates..."; \ + cp ./certs/*.crt /usr/local/share/ca-certificates/; \ + update-ca-certificates; \ + fi # Copy requirements and install dependencies COPY requirements.txt . diff --git a/services/db_api_service/.env.example b/services/db_api_service/.env.example deleted file mode 100644 index 9b1daeed6..000000000 --- a/services/db_api_service/.env.example +++ /dev/null @@ -1,9 +0,0 @@ -DB_DSN=postgresql+psycopg://missions_user:pg123@host.docker.internal:5432/missions_db - -PORT=8080 - -CONTRACTS_DIR=app/contracts - -ALLOWED_TABLES=["event_logs_sensors","devices","image_new_aerial_connections"] - -STRICT_UNKNOWN_FIELDS=true diff --git a/services/db_api_service/Dockerfile b/services/db_api_service/Dockerfile index f10cb2ab4..b242379d0 100644 --- a/services/db_api_service/Dockerfile +++ b/services/db_api_service/Dockerfile @@ -2,12 +2,28 @@ FROM python:3.12-slim WORKDIR /app -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential curl ca-certificates && \ +# Build argument +ARG USE_NETFREE=true + +# Install basic packages +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && \ rm -rf /var/lib/apt/lists/* -COPY *.crt /usr/local/share/ca-certificates/ -RUN chmod 644 /usr/local/share/ca-certificates/*.crt && update-ca-certificates + +# Install NetFree certificates +RUN if [ "$USE_NETFREE" = "true" ]; then \ + echo "Configuring NetFree certificates..."; \ + ls -la /app/; \ + if [ -f /app/netfree-ca.crt ]; then \ + echo "Found NetFree certificate, installing..."; \ + cp /app/netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt; \ + chmod 644 /usr/local/share/ca-certificates/netfree-ca.crt; \ + update-ca-certificates; \ + echo "Certificate installed successfully"; \ + else \ + echo "WARNING: No NetFree certificate found at /app/netfree-ca.crt, continuing without it."; \ + fi; \ + fi ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/services/db_api_service/app/contracts/Dockerfile b/services/db_api_service/app/contracts/Dockerfile index ed6d31775..bfb5eb5e5 100644 --- a/services/db_api_service/app/contracts/Dockerfile +++ b/services/db_api_service/app/contracts/Dockerfile @@ -1,12 +1,28 @@ FROM python:3.12-slim WORKDIR /app -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential curl ca-certificates && \ +# Build argument +ARG USE_NETFREE=true + +# Install basic packages +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && \ rm -rf /var/lib/apt/lists/* -COPY *.crt /usr/local/share/ca-certificates/ -RUN chmod 644 /usr/local/share/ca-certificates/*.crt && update-ca-certificates + +# Install NetFree certificates +RUN if [ "$USE_NETFREE" = "true" ]; then \ + echo "Configuring NetFree certificates..."; \ + ls -la /app/; \ + if [ -f /app/netfree-ca.crt ]; then \ + echo "Found NetFree certificate, installing..."; \ + cp /app/netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt; \ + chmod 644 /usr/local/share/ca-certificates/netfree-ca.crt; \ + update-ca-certificates; \ + echo "Certificate installed successfully"; \ + else \ + echo "WARNING: No NetFree certificate found at /app/netfree-ca.crt, continuing without it."; \ + fi; \ + fi ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/services/db_api_service/app/router.py b/services/db_api_service/app/router.py index 1e2d2db27..fa02955ed 100644 --- a/services/db_api_service/app/router.py +++ b/services/db_api_service/app/router.py @@ -1,11 +1,12 @@ # app/router.py + from fastapi import APIRouter, Depends from app.auth import require_auth from app.tables.files.router import router as files_router from app.tables.generic.router import build_generic_router from app.tables.task_thresholds.router import router as task_thresholds_router - +from app.tables.ripeness_weekly_rollups_ts.router import router as ripeness_weekly_router def build_router(contract_store) -> APIRouter: @@ -18,6 +19,7 @@ def build_router(contract_store) -> APIRouter: api.include_router(files_router) api.include_router(task_thresholds_router) + api.include_router(ripeness_weekly_router) api.include_router(build_generic_router(contract_store)) return api diff --git a/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/__init__.py b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py new file mode 100644 index 000000000..0f96921c0 --- /dev/null +++ b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py @@ -0,0 +1,40 @@ +from typing import Optional, Dict, Any, List +from sqlalchemy import text +from app.db import session_scope +from datetime import datetime + + +def list_rollups(from_ts: str | None = None, to_ts: str | None = None) -> List[Dict[str, Any]]: + q = """ + SELECT * FROM ripeness_weekly_rollups_ts + WHERE 1=1 + """ + params: Dict[str, Any] = {} + + if from_ts: + q += " AND ts >= :from_ts" + params["from_ts"] = parse_ts(from_ts) + if to_ts: + q += " AND ts <= :to_ts" + params["to_ts"] = parse_ts(to_ts) + + q += " ORDER BY ts DESC" + + with session_scope() as s: + rows = s.execute(text(q), params).mappings().all() + return [dict(r) for r in rows] + + +def get_rollup(id: int) -> Optional[Dict[str, Any]]: + """ + Retrieve a single rollup entry by ID. + """ + sql = text(""" + SELECT id, ts, window_start, window_end, fruit_type, device_id, + run_id, cnt_total, cnt_ripe, cnt_unripe, cnt_overripe, pct_ripe + FROM public.ripeness_weekly_rollups_ts + WHERE id = :id + """) + with session_scope() as s: + row = s.execute(sql, {"id": id}).mappings().first() + return dict(row) if row else None diff --git a/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py new file mode 100644 index 000000000..1c71bf1cb --- /dev/null +++ b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py @@ -0,0 +1,32 @@ + +from typing import Optional, List +from fastapi import APIRouter, HTTPException, Query +from . import schemas, repo + +router = APIRouter(prefix="/ripeness_weekly_rollups_ts", tags=["ripeness_weekly_rollups_ts"]) + +@router.get("", response_model=List[schemas.RipenessWeeklyRollupRead]) +def list_rollups( + from_ts: Optional[str] = Query(None, description="Filter from timestamp (ISO8601)"), + to_ts: Optional[str] = Query(None, description="Filter to timestamp (ISO8601)"), +): + """ + Retrieve weekly ripeness rollups by time range. + """ + try: + rows = repo.list_rollups(from_ts=from_ts, to_ts=to_ts) + return rows + except Exception as e: + print(f"[ERROR][router] list_rollups failed: {e}") + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/{id}", response_model=RipenessWeeklyRollupOut) +def get_rollup(id: int): + """ + Retrieve a specific rollup entry by ID. + """ + row = repo.get_rollup(id) + if not row: + raise HTTPException(status_code=404, detail="Rollup not found") + return row diff --git a/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/schemas.py b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/schemas.py new file mode 100644 index 000000000..0c9142cd5 --- /dev/null +++ b/services/db_api_service/app/tables/ripeness_weekly_rollups_ts/schemas.py @@ -0,0 +1,32 @@ +from datetime import datetime +from typing import Optional +from uuid import UUID +from pydantic import BaseModel, Field, conint, confloat + + +class RipenessWeeklyRollupBase(BaseModel): + """Base schema for weekly ripeness rollups table.""" + ts: Optional[datetime] = Field(None, description="Insertion timestamp") + window_start: datetime = Field(..., description="Start of weekly window") + window_end: datetime = Field(..., description="End of weekly window") + fruit_type: str = Field(..., description="Type of fruit analyzed") + device_id: Optional[str] = Field(None, description="Source device ID") + run_id: UUID = Field(..., description="Unique identifier for the run") # ? UUID instead of str + cnt_total: conint(ge=0) = Field(..., description="Total fruit count in window") + cnt_ripe: conint(ge=0) = Field(..., description="Ripe fruit count") + cnt_unripe: conint(ge=0) = Field(..., description="Unripe fruit count") + cnt_overripe: conint(ge=0) = Field(..., description="Overripe fruit count") + pct_ripe: confloat(ge=0, le=1) = Field(..., description="Ripe ratio (01)") + + +class RipenessWeeklyRollupCreate(RipenessWeeklyRollupBase): + """Schema used for POST inserts (single or batch).""" + pass + + +class RipenessWeeklyRollupRead(RipenessWeeklyRollupBase): + """Schema used for GET responses (includes DB ID).""" + id: int = Field(..., description="Primary key ID") + + class Config: + orm_mode = True diff --git a/services/flink_writer_db/Dockerfile.flink b/services/flink_writer_db/Dockerfile.flink index bd043ed8e..47016881a 100644 --- a/services/flink_writer_db/Dockerfile.flink +++ b/services/flink_writer_db/Dockerfile.flink @@ -2,7 +2,7 @@ FROM flink:1.20.0-scala_2.12-java11 USER root # Copy certs dir (may be empty) and trust *.crt if present -COPY certs/ /tmp/certs/ +#COPY certs/ /tmp/certs/ RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && \ rm -rf /var/lib/apt/lists/* && \ diff --git a/services/fruit_ripeness_alert/Dockerfile b/services/fruit_ripeness_alert/Dockerfile new file mode 100644 index 000000000..9ce14035e --- /dev/null +++ b/services/fruit_ripeness_alert/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.11-slim + +# --- System setup --- +WORKDIR /app +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + TZ=UTC + +# --- Dependencies --- +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# --- App --- +COPY . . + + +# --- Default command --- +CMD ["python", "-u", "app.py"] diff --git a/services/fruit_ripeness_alert/app.py b/services/fruit_ripeness_alert/app.py new file mode 100644 index 000000000..360c727e4 --- /dev/null +++ b/services/fruit_ripeness_alert/app.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +import os, json, uuid, requests +from datetime import datetime, timedelta, timezone +from kafka import KafkaProducer +from token_bootstrap import get_service_token +from datetime import datetime, timezone + +# === Environment === +DB_API_BASE = os.getenv("DB_API_BASE", "http://db_api_service:8001") +DB_API_TOKEN_FILE = os.getenv("DB_API_TOKEN_FILE", "/app/secret/db_api_token") +KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka:9092") +ALERT_TOPIC = os.getenv("ALERT_TOPIC", "alerts") +WINDOW_HOURS = int(os.getenv("WINDOW_HOURS", "168")) + + +def now_utc() -> datetime: + return datetime.now(timezone.utc) + +def iso(ts: datetime) -> str: + return ts.replace(tzinfo=timezone.utc).isoformat() + +def get_threshold(task_name="ripeness", headers=None): + """שולף את אחוז הסף מהטבלה task_thresholds לפי שם המשימה.""" + url = f"{DB_API_BASE}/api/tables/task_thresholds" + r = requests.get(url, headers=headers, timeout=15) + r.raise_for_status() + rows = r.json().get("rows", []) + if not rows: + print(f"[WARN] No thresholds found at all, using default 0.8") + return 0.8 + + match = next((row for row in rows if row.get("task") == task_name), None) + if not match: + print(f"[WARN] No threshold found for task={task_name}, using default 0.8") + return 0.8 + + threshold = float(match.get("threshold", 0.8)) + print(f"[INFO] Task '{task_name}' threshold: {threshold*100:.1f}%") + return threshold +from datetime import datetime, timezone + +def get_rollups(window_start, window_end, headers=None): + """ + שולפת את כל הרשומות מהטבלה ripeness_weekly_rollups_ts + ואז מסננת לפי טווח התאריכים (window_start → window_end) בפייתון. + """ + url = f"{DB_API_BASE}/api/tables/ripeness_weekly_rollups_ts" + print(f"[DEBUG] Fetching full table from {url}", flush=True) + + try: + # שולף את כל הנתונים (בלי פילטרים) + r = requests.get(url, headers=headers, timeout=60) + r.raise_for_status() + except requests.exceptions.HTTPError as e: + print(f"[ERROR] HTTP {r.status_code}: {r.text}", flush=True) + return [] + except Exception as e: + print(f"[ERROR] failed to fetch rollups: {e}", flush=True) + return [] + + data = r.json() + rows = data.get("rows", data) + + + def parse_ts(ts_str: str) -> datetime: + try: + return datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + except Exception: + return datetime.min.replace(tzinfo=timezone.utc) + + filtered = [] + for row in rows: + ts = parse_ts(row.get("ts", "")) + if window_start <= ts <= window_end: + filtered.append(row) + + print(f"[INFO] Retrieved {len(filtered)} rollups after filtering (out of {len(rows)} total)") + return filtered + +def send_kafka_alert(producer, device_id, ratio, threshold): + alert = { + "alert_id": str(uuid.uuid4()), + "alert_type": "fruit_ripeness_high", + "device_id": device_id, + "started_at": iso(now_utc()), + "confidence": float(ratio), + "severity": 3, + "threshold": threshold, + "description": f"{ratio*100:.1f}% ripe/overripe fruits", # <── וגם את זה + } + + producer.send(ALERT_TOPIC, json.dumps(alert).encode("utf-8")) + producer.flush() + print(f"[ALERT] sent for {device_id}: {ratio*100:.1f}%") + +def main(): + token = get_service_token() + headers = {"Content-Type": "application/json"} + if token: + headers["X-Service-Token"] = token + + window_end = now_utc() + window_start = window_end - timedelta(hours=WINDOW_HOURS) + print(f"[INFO] Checking rollups {window_start} → {window_end}") + + threshold = get_threshold("ripeness", headers) + rows = get_rollups(window_start, window_end, headers) + if not rows: + print("[INFO] No data found.") + return + + producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER]) + + # iterate each device + for row in rows: + device_id = row.get("device_id") + pct = row.get("pct_ripe", 0.0) + if pct >= threshold: + send_kafka_alert(producer, device_id, pct, threshold) + else: + print(f"[INFO] {device_id}: below threshold {pct:.2f} < {threshold:.2f}") + + producer.close() + print("[DONE] process complete.") + +if __name__ == "__main__": + main() diff --git a/services/fruit_ripeness_alert/docker-compose.yml b/services/fruit_ripeness_alert/docker-compose.yml new file mode 100644 index 000000000..7b38bd72c --- /dev/null +++ b/services/fruit_ripeness_alert/docker-compose.yml @@ -0,0 +1,23 @@ +services: + fruit_ripeness_alert: + build: . + container_name: fruit_ripeness_alert + environment: + - DB_API_BASE=http://db_api_service:8001 + - DB_API_SERVICE_NAME=fruit_ripeness_alert + - DB_ADMIN_USER=admin + - DB_ADMIN_PASS=admin123 + - DB_API_TOKEN_FILE=/app/secret/db_api_token + - KAFKA_BROKER=kafka:9092 + - ALERT_TOPIC=alerts + - WINDOW_HOURS=168 + volumes: + - .:/app + - ./secret:/app/secret + command: ["sleep", "infinity"] + networks: + - ag_cloud + +networks: + ag_cloud: + external: true diff --git a/services/fruit_ripeness_alert/requirements.txt b/services/fruit_ripeness_alert/requirements.txt new file mode 100644 index 000000000..ff4cc1b07 --- /dev/null +++ b/services/fruit_ripeness_alert/requirements.txt @@ -0,0 +1,2 @@ +requests +kafka-python diff --git a/services/fruit_ripeness_alert/secret/db_api_token b/services/fruit_ripeness_alert/secret/db_api_token new file mode 100644 index 000000000..348d369ac --- /dev/null +++ b/services/fruit_ripeness_alert/secret/db_api_token @@ -0,0 +1 @@ +e1bee6be-521c-42ba-989f-b36a319208c9 \ No newline at end of file diff --git a/services/fruit_ripeness_alert/token_bootstrap.py b/services/fruit_ripeness_alert/token_bootstrap.py new file mode 100644 index 000000000..2bcd5ab40 --- /dev/null +++ b/services/fruit_ripeness_alert/token_bootstrap.py @@ -0,0 +1,62 @@ +import os, pathlib, time, requests + +DB_API_BASE = os.getenv("DB_API_BASE", "").strip() +DB_API_TOKEN_FILE = os.getenv("DB_API_TOKEN_FILE", "/app/secret/db_api_token") +DB_API_SERVICE_NAME = os.getenv("DB_API_SERVICE_NAME", "fruit_ripeness_alert").strip() or "fruit_ripeness_alert" + +def _safe_join_url(base: str, path: str) -> str: + return f"{base.rstrip('/')}/{path.lstrip('/')}" + +def _read_token(path: str) -> str | None: + p = pathlib.Path(path) + if p.exists(): + t = p.read_text(encoding="utf-8").strip() + if t and "***" not in t: + return t + return None + +def _write_token(path: str, token: str) -> None: + p = pathlib.Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(token, encoding="utf-8") + +def _try_dev_bootstrap(): + """Try to get token using /auth/_dev_bootstrap (new API).""" + url = _safe_join_url(DB_API_BASE, "/auth/_dev_bootstrap") + payload = {"service_name": DB_API_SERVICE_NAME, "rotate_if_exists": True} + try: + r = requests.post(url, json=payload, timeout=10) + if r.status_code in (200, 201): + data = r.json() + sa = data.get("service_account") or {} + token = sa.get("raw_token") or sa.get("token") + if token and "***" not in token: + print("[BOOTSTRAP] obtained token via /auth/_dev_bootstrap") + return token.strip() + print(f"[BOOTSTRAP][WARN] _dev_bootstrap returned {r.status_code}: {r.text[:100]}") + except Exception as e: + print(f"[BOOTSTRAP][ERROR] {e}") + return None + +def get_service_token() -> str | None: + """Get or create a service token automatically.""" + if not DB_API_BASE: + print("[BOOTSTRAP][WARN] DB_API_BASE not set") + return None + + # Try existing file + token = _read_token(DB_API_TOKEN_FILE) + if token: + print(f"[BOOTSTRAP] using existing token from {DB_API_TOKEN_FILE}") + return token + + # Try bootstrap (new unified API) + print(f"[BOOTSTRAP] fetching new service token from {DB_API_BASE}") + token = _try_dev_bootstrap() + if token: + _write_token(DB_API_TOKEN_FILE, token) + print(f"[BOOTSTRAP] wrote token to {DB_API_TOKEN_FILE}") + return token + + print("[BOOTSTRAP][ERROR] Could not obtain service token.") + return None diff --git a/services/inference_http/Dockerfile b/services/inference_http/Dockerfile index ba9055325..256d6cc49 100644 --- a/services/inference_http/Dockerfile +++ b/services/inference_http/Dockerfile @@ -1,4 +1,42 @@ + +# FROM python:3.11-slim + + +# pip +ENV PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 \ + PIP_DEFAULT_TIMEOUT=1200 + +# +WORKDIR /app + +# ------------------------------------------------------------ +# 1: (OpenCV, Torch, Numpy ) +# ------------------------------------------------------------ +RUN apt-get update && apt-get install -y --no-install-recommends \ + libglib2.0-0 \ + libglib2.0-dev \ + libsm6 \ + libxrender1 \ + libxext6 \ + libgl1 \ + libopenblas-dev \ + liblapack-dev \ + && rm -rf /var/lib/apt/lists/* + +# ------------------------------------------------------------ +# 2: pip NumPy Torch +# ------------------------------------------------------------ +RUN python -m pip install --upgrade pip setuptools wheel --only-binary=:all: && \ + pip install --no-cache-dir numpy==1.26.4 --only-binary=:all: && \ + pip install --no-cache-dir --extra-index-url https://download.pytorch.org/whl/cpu \ + torch==2.1.2 torchvision==0.16.2 --only-binary=:all: --upgrade-strategy only-if-needed + +# ------------------------------------------------------------ +# 3: +# ------------------------------------------------------------ + ENV PIP_NO_CACHE_DIR=1 \ PIP_DEFAULT_TIMEOUT=1200 \ PIP_DISABLE_PIP_VERSION_CHECK=1 @@ -6,17 +44,17 @@ ENV PIP_NO_CACHE_DIR=1 \ WORKDIR /app # Copy certs dir (may be empty) and trust *.crt if present -COPY certs/ /tmp/certs/ +# COPY certs/ /tmp/certs/ RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && \ rm -rf /var/lib/apt/lists/* && \ if [ -d /tmp/certs ] && ls /tmp/certs/*.crt >/dev/null 2>&1; then \ - cp /tmp/certs/*.crt /usr/local/share/ca-certificates/ && \ - chmod 644 /usr/local/share/ca-certificates/*.crt && \ - update-ca-certificates; \ + cp /tmp/certs/*.crt /usr/local/share/ca-certificates/ && \ + chmod 644 /usr/local/share/ca-certificates/*.crt && \ + update-ca-certificates; \ else \ - echo "No extra CA certs found. Skipping CA update."; \ + echo "No extra CA certs found. Skipping CA update."; \ fi - + ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ @@ -31,11 +69,34 @@ RUN pip install --no-cache-dir --extra-index-url https://download.pytorch.org/wh torch==2.1.2 torchvision==0.16.2 --only-binary=:all: --upgrade-strategy only-if-needed COPY requirements.txt /app/requirements.txt -RUN pip install --no-cache-dir -r /app/requirements.txt --only-binary=:all: --upgrade-strategy only-if-needed +RUN pip install kafka-python + + +# : +# boto3, ultralytics, opencv, pillow, requests +RUN pip install --no-cache-dir -r /app/requirements.txt && \ + pip install --no-cache-dir \ + opencv-python-headless \ + ultralytics==8.2.34 \ + boto3 \ + pillow \ + requests \ + "numpy<2" \ + && rm -rf /root/.cache/pip + +# ------------------------------------------------------------ +# 4: +# ------------------------------------------------------------ COPY app.py model_registry.py /app/ COPY adapters /app/adapters -COPY models /app/models +COPY models /app/models +COPY weights /app/weights + + +# ------------------------------------------------------------ +# 5: +# ------------------------------------------------------------ +EXPOSE 8004 -EXPOSE 8000 -CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8004"] diff --git a/services/inference_http/adapters/fruit_segmentation_runner.py b/services/inference_http/adapters/fruit_segmentation_runner.py new file mode 100644 index 000000000..a5f79eac9 --- /dev/null +++ b/services/inference_http/adapters/fruit_segmentation_runner.py @@ -0,0 +1,110 @@ +import os, io, tempfile, hashlib, cv2, numpy as np, boto3, torch + +def allow_unrestricted_torch_load(): + _original_load = torch.load + def patched_load(*args, **kwargs): + kwargs["weights_only"] = False + return _original_load(*args, **kwargs) + torch.load = patched_load + +allow_unrestricted_torch_load() +# === End Patch === + +import time +from typing import Any, Dict, Optional +from datetime import datetime, timezone +from ultralytics import YOLO + +def sha256_hex(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + +class FruitSegmentationRunner: + def __init__(self, weights_path: Optional[str] = None, model_tag: Optional[str] = None): + self.weights_path = weights_path or os.getenv("WEIGHTS_PATH", "/app/weights/yolov8-fruits.pt") + self.model = YOLO(self.weights_path) + raw_endpoint = os.getenv("MINIO_ENDPOINT", "minio-hot:9000").strip() + if not raw_endpoint.startswith(("http://", "https://")): + endpoint = f"http://{raw_endpoint}" + else: + endpoint = raw_endpoint + self.s3 = boto3.client( + "s3", + endpoint_url=endpoint, + aws_access_key_id=os.getenv("MINIO_ACCESS_KEY", "minioadmin"), + aws_secret_access_key=os.getenv("MINIO_SECRET_KEY", "minioadmin123") + ) + self.kafka_bootstrap = os.getenv("KAFKA_BOOTSTRAP", "kafka:9092") + self.topic_out = os.getenv("TOPIC_OUT", "imagery.new.fruit") + + # Kafka producer for next-stage notification + self.producer = KafkaProducer( + bootstrap_servers=self.kafka_bootstrap, + value_serializer=lambda v: json.dumps(v).encode("utf-8") + ) + + + def run(self, image_bytes: bytes | None = None, model_tag=None, extra=None) -> Dict[str, Any]: + """Main inference entrypoint for HTTP""" + bucket_in = extra.get("bucket") if extra else "imagery" + key = extra.get("key") if extra else None + if not key: + return {"error": "missing key"} + + + if image_bytes: + img_array = np.frombuffer(image_bytes, np.uint8) + img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) + if img is None: + return {"error": "failed to decode image from bytes"} + else: + with tempfile.TemporaryDirectory() as tmpdir: + local_path = os.path.join(tmpdir, os.path.basename(key)) + self.s3.download_file(bucket_in, key, local_path) + img = cv2.imread(local_path) + if img is None: + return {"error": "failed to read image"} + + t0 = time.time() + results = self.model.predict(img, conf=0.3, iou=0.45, verbose=False) + latency_ms = int((time.time() - t0) * 1000) + boxes = results[0].boxes + count = 0 + + if boxes: + with tempfile.TemporaryDirectory() as tmpdir: + for i, box in enumerate(boxes): + label = results[0].names[int(box.cls[0])] + if label.lower() not in [ + "apple", "banana", "orange", "pear", "peach", "plum", + "mango", "grape", "cherry", "pomegranate" + ]: + continue + x1, y1, x2, y2 = map(int, box.xyxy[0]) + crop = img[y1:y2, x1:x2] + if crop.size == 0: + continue + out_name = f"{os.path.splitext(os.path.basename(key))[0]}_fruit_{i+1}.jpg" + out_key = f"segments/{out_name}" + out_path = os.path.join(tmpdir, out_name) + cv2.imwrite(out_path, crop) + self.s3.upload_file(out_path, bucket_in, out_key) + count += 1 + # ? Send Kafka message to notify next stage + msg = { + "bucket": bucket_in, + "key": out_key, + "source": "fruit_segmentation", + } + self.producer.send(self.topic_out, msg) + self.producer.flush() # optional, ensures its sent immediately + + return { + "label": "fruit", + "count": count, + "latency_ms_model": latency_ms, + "bucket_out": bucket_in + } diff --git a/services/inference_http/app.py b/services/inference_http/app.py index 3a490493d..ec6b984e9 100644 --- a/services/inference_http/app.py +++ b/services/inference_http/app.py @@ -60,12 +60,14 @@ def infer_json( obj.close() obj.release_conn() + # Attempt to run the model with bytes input first # Attempt to run the model with bytes input first try: - result = runner.run(image_bytes) + result = runner.run(image_bytes, extra={"bucket": req.bucket, "key": req.key}) except TypeError: # If the function does not accept bytes, try with URI instead - result = runner.run(s3_uri) + result = runner.run(s3_uri, extra={"bucket": req.bucket, "key": req.key}) + latency_ms = int((time.perf_counter() - started) * 1000) return { diff --git a/services/inference_http/model_registry.py b/services/inference_http/model_registry.py index 3c5c0d6df..39891d8b6 100644 --- a/services/inference_http/model_registry.py +++ b/services/inference_http/model_registry.py @@ -1,15 +1,27 @@ -from typing import Any, Dict -from adapters.fruit_defect_runner import FruitDefectRunner +# from typing import Any, Dict +# from adapters.fruit_defect_runner import FruitDefectRunner + +# class FruitRunner: +# def __init__(self): +# self.impl = FruitDefectRunner() + +# def run(self, image_bytes: bytes, model_tag=None, extra=None) -> Dict[str, Any]: +# return self.impl.run(image_bytes, model_tag=model_tag, extra=extra) -class FruitRunner: - def __init__(self): - self.impl = FruitDefectRunner() +# def get_model_runner(team: str): +# t = (team or "").lower() +# if t == "fruit": +# return FruitRunner() +# raise ValueError(f"unknown TEAM {t}") - def run(self, image_bytes: bytes, model_tag=None, extra=None) -> Dict[str, Any]: - return self.impl.run(image_bytes, model_tag=model_tag, extra=extra) + +from adapters.fruit_defect_runner import FruitDefectRunner +from adapters.fruit_segmentation_runner import FruitSegmentationRunner def get_model_runner(team: str): t = (team or "").lower() - if t == "fruit": - return FruitRunner() + if t == "fruit_defect": + return FruitDefectRunner() + if t == "camera": + return FruitSegmentationRunner() raise ValueError(f"unknown TEAM {t}") diff --git a/services/inference_http/requirements.txt b/services/inference_http/requirements.txt index a7d58b826..69006df50 100644 --- a/services/inference_http/requirements.txt +++ b/services/inference_http/requirements.txt @@ -1,7 +1,13 @@ + fastapi -uvicorn[standard] +uvicorn +pydantic minio +requests +torch +numpy<2 +opencv-python-headless +ultralytics==8.2.34 +boto3 pillow numpy==1.26.4 -pydantic -numpy==1.26.4 \ No newline at end of file diff --git a/services/inference_http/weights/yolov8-fruits.pt b/services/inference_http/weights/yolov8-fruits.pt new file mode 100644 index 000000000..d61ef50d3 Binary files /dev/null and b/services/inference_http/weights/yolov8-fruits.pt differ diff --git a/services/plant_stress/Dockerfile b/services/plant_stress/Dockerfile index 69a7a6d3d..ab9f5e3d5 100644 --- a/services/plant_stress/Dockerfile +++ b/services/plant_stress/Dockerfile @@ -9,7 +9,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ WORKDIR /app -COPY certs /app/certs + #COPY certs /app/certs RUN if [ -f /app/certs/*.crt ]; then \ echo "Installing NetFree certificate..."; \ diff --git a/services/ripeness-ml/Dockerfile b/services/ripeness-ml/Dockerfile index a59d5c84d..2a76fd59e 100644 --- a/services/ripeness-ml/Dockerfile +++ b/services/ripeness-ml/Dockerfile @@ -10,7 +10,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates openssl libpq-dev build-essential gcc \ && rm -rf /var/lib/apt/lists/* -COPY certs/ /usr/local/share/ca-certificates/ +#COPY certs/ /usr/local/share/ca-certificates/ RUN set -eux; \ for f in /usr/local/share/ca-certificates/*.cer; do \ [ -f "$f" ] && openssl x509 -inform der -in "$f" -out "${f%.cer}.crt" && rm -f "$f" || true; \ @@ -18,7 +18,7 @@ RUN set -eux; \ update-ca-certificates RUN printf "[global]\n\ -cert = /etc/ssl/certs/ca-certificates.crt\n\ +#cert = /etc/ssl/certs/ca-certificates.crt\n\ index-url = https://pypi.org/simple\n\ trusted-host =\n\ pypi.org\n\ diff --git a/services/sound_metrics/Dockerfile b/services/sound_metrics/Dockerfile index 03d69f007..2445fc89f 100644 --- a/services/sound_metrics/Dockerfile +++ b/services/sound_metrics/Dockerfile @@ -7,7 +7,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ WORKDIR /app # --- Copy and trust all *.crt files (if any) --- -COPY certs/ /tmp/certs/ +#COPY certs/ /tmp/certs/ RUN if [ -d /tmp/certs ] && ls /tmp/certs/*.crt >/dev/null 2>&1; then \ cp /tmp/certs/*.crt /usr/local/share/ca-certificates/ && \ chmod 644 /usr/local/share/ca-certificates/*.crt && \ diff --git a/services/sounds_classifier/Dockerfile.classifier-svc b/services/sounds_classifier/Dockerfile.classifier-svc index 54efca366..6e096533d 100644 --- a/services/sounds_classifier/Dockerfile.classifier-svc +++ b/services/sounds_classifier/Dockerfile.classifier-svc @@ -15,8 +15,8 @@ WORKDIR /app # ---- Corporate CAs ---- # Place your CA files under classify/certs/*.crt before build -COPY certs/*.crt /usr/local/share/ca-certificates/ -RUN update-ca-certificates +#COPY certs/*.crt /usr/local/share/ca-certificates/ +#RUN update-ca-certificates ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ diff --git a/templates/templates.yml b/templates/templates.yml index aa10b6fba..6bc8b034c 100644 --- a/templates/templates.yml +++ b/templates/templates.yml @@ -8,3 +8,9 @@ templates: category: security summary: "Person wearing a mask detected by ${device_id} at ${timestamp}" recommendation: "Verify the person’s authorization using the live feed." + + + fruit_ripeness_high: + category: agriculture + summary: "🍓 High fruit ripeness detected by ${device_id} (${confidence} ≥ ${threshold})" + recommendation: "Harvest or inspect the plantation area. Ripe/overripe fruits ratio: ${description}" \ No newline at end of file