diff --git a/RelDB/build_tables/schema.sql b/RelDB/build_tables/schema.sql index ade97f4a0..ffc834c45 100644 --- a/RelDB/build_tables/schema.sql +++ b/RelDB/build_tables/schema.sql @@ -267,9 +267,9 @@ CREATE TABLE IF NOT EXISTS public.sensor_zone_stats ( inserted_at TIMESTAMPTZ NOT NULL DEFAULT now() ); ---- Alerts table +--- Alerts_leaves table -CREATE TABLE IF NOT EXISTS public.alerts ( +CREATE TABLE IF NOT EXISTS public.alerts_leaves ( id bigserial PRIMARY KEY, entity_id text NOT NULL, rule text NOT NULL, @@ -282,12 +282,15 @@ CREATE TABLE IF NOT EXISTS public.alerts ( meta_json jsonb ); +CREATE INDEX IF NOT EXISTS ix_alerts_leaves_entity_rule ON public.alerts_leaves(entity_id, rule); +CREATE INDEX IF NOT EXISTS ix_alerts_leaves_status ON public.alerts_leaves(status); + --- === Soil moisture irrigation tables === CREATE TABLE IF NOT EXISTS soil_moisture_events ( id SERIAL PRIMARY KEY, - zone_id TEXT NOT NULL, + device_id TEXT NOT NULL REFERENCES devices(device_id), ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), dry_ratio REAL NOT NULL, decision TEXT NOT NULL, @@ -300,7 +303,8 @@ CREATE TABLE IF NOT EXISTS soil_moisture_events ( CREATE UNIQUE INDEX IF NOT EXISTS idx_events_idem ON soil_moisture_events (idempotency_key); CREATE TABLE IF NOT EXISTS irrigation_schedule ( - zone_id TEXT PRIMARY KEY, + device_id TEXT PRIMARY KEY REFERENCES devices(device_id), + next_run_at TIMESTAMPTZ NOT NULL, duration_min INT NOT NULL, updated_by TEXT NOT NULL, @@ -310,7 +314,7 @@ CREATE TABLE IF NOT EXISTS irrigation_schedule ( CREATE TABLE IF NOT EXISTS irrigation_schedule_audit ( id SERIAL PRIMARY KEY, - zone_id TEXT NOT NULL, + device_id TEXT NOT NULL, prev_next_run_at TIMESTAMPTZ, prev_duration_min INT, next_run_at TIMESTAMPTZ NOT NULL, @@ -320,6 +324,21 @@ CREATE TABLE IF NOT EXISTS irrigation_schedule_audit ( updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); +CREATE TABLE IF NOT EXISTS irrigation_policies ( + device_id TEXT NOT NULL, + prev_state TEXT, + dry_ratio_high REAL, + dry_ratio_low REAL, + 
min_patches INT, + duration_min INT, + updated_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (device_id), + CONSTRAINT fk_device + FOREIGN KEY (device_id) REFERENCES devices(device_id) + ON DELETE CASCADE +); + + -- === Task thresholds (enum + table) === DO $$ BEGIN @@ -412,5 +431,4 @@ CREATE INDEX IF NOT EXISTS ix_event_logs_sensors_start_brin ON event_logs_sens CREATE INDEX IF NOT EXISTS ix_event_logs_sensors_details_gin ON event_logs_sensors USING GIN (details jsonb_path_ops); -CREATE INDEX IF NOT EXISTS ix_alerts_entity_rule ON public.alerts(entity_id, rule); -CREATE INDEX IF NOT EXISTS ix_alerts_status ON public.alerts(status); + diff --git a/docker-compose.yml b/docker-compose.yml index ea690204e..56fb0f7a1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,17 +1,16 @@ - # ========================== # Docker Compose - AG Cloud # ========================== -version: "3.9" +# version: "3.9" # -------------------------- # Networks # -------------------------- networks: ag_cloud: + name: ag_cloud driver: bridge - # -------------------------- # Volumes # -------------------------- @@ -53,7 +52,7 @@ services: - wal_archive:/var/lib/postgresql/wal_archive - backups:/var/lib/postgresql/backups healthcheck: - test: ["CMD", "pg_isready", "-U", "missions_user", "-d", "missions_db"] + test: [ "CMD", "pg_isready", "-U", "missions_user", "-d", "missions_db" ] interval: 10s timeout: 5s retries: 5 @@ -76,7 +75,6 @@ services: - "9187:9187" networks: - ag_cloud - # ------------------------- # Sound Metrics Service # ------------------------- @@ -96,72 +94,72 @@ services: - MINIO_ENDPOINT=minio-hot:9000 - MINIO_ACCESS_KEY=minioadmin - MINIO_SECRET_KEY=minioadmin123 - - MINIO_BUCKET=telemetry - - MINIO_PREFIX=sounds/ + - MINIO_BUCKET=sound + - MINIO_PREFIX=sounds/ - command: ["python","-u","src/metrics.py"] + command: [ "python", "-u", "src/metrics.py" ] ports: - "8005:8005" depends_on: - minio-hot networks: - - ag_cloud + - ag_cloud restart: unless-stopped + # 
------------------------- - # Plant Stress Detector + # Plant Stress Daily Batch # ------------------------- - plant_stress: + plant_stress_daily: build: ./services/plant_stress + env_file: + - ./services/plant_stress/.env + restart: "no" environment: - - INPUT_DIR=/data/inbox - - MODEL_DIR=/models - - POSTGRES_DSN=postgresql://missions_user:pg123@postgres:5432/missions_db - - PERIOD_DAYS=0 - - CONFIDENCE_THRESHOLD=0.6 - # - TF_ENABLE_ONEDNN_OPTS=0 # Disable oneDNN optimizations (for CPU compatibility) + MODEL_DIR: /models + CONFIDENCE_THRESHOLD: "0.60" + TF_CPP_MIN_LOG_LEVEL: "2" + TIMEZONE: Asia/Jerusalem + POSTGRES_DSN: postgresql://missions_user:pg123@postgres:5432/missions_db + MINIO_ENDPOINT: minio-hot:9000 + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin123 + MINIO_BUCKET: sound + MINIO_PREFIX: plants/ + MINIO_SECURE: "false" + + DEFAULT_AREA: unknown + DEFAULT_LAT: 0.0 + DEFAULT_LON: 0.0 + DEFAULT_IMAGE_URL: https://example.com/placeholder.jpg + DEFAULT_VOD: https://example.com/placeholder.mp4 + DEFAULT_HLS: https://example.com/placeholder.m3u8 + # ==== Alerts → Kafka ==== + ENABLE_ALERTS: "true" + KAFKA_BOOTSTRAP: "kafka:9092" + ALERT_TOPIC: "alerts" + ALERT_TYPE: "plant_drought_detected" + KAFKA_CLIENT_ID: "plant-stress-producer" + command: ["python","-u","/app/predict_minio_daily.py"] volumes: - "./services/plant_stress/models:/models:ro" - - "./services/plant_stress/samples:/data/inbox:ro" - depends_on: - - postgres - command: ["python", "-u", "/app/app.py"] - networks: - - ag_cloud - - - flink_writer_db: - build: - context: ./services/flink_writer_db - dockerfile: Dockerfile.flink - container_name: flink_writer_db - environment: - - KAFKA_BROKERS=kafka:9092 - - TOPICS=sensor_zone_stats,sensor_anomalies - - DB_API_BASE=http://db_api_service:8001 - - DB_API_AUTH_MODE=service - - DB_API_SERVICE_NAME=flink-writer-db - - DB_API_TOKEN_FILE=/opt/app/secrets/db_api_token - - FLINK_PARALLELISM=1 - depends_on: - kafka: - condition: service_healthy - 
db_api_service: - condition: service_started - networks: - - ag_cloud - restart: unless-stopped - - - + postgres: + condition: service_healthy + minio-hot: + condition: service_healthy + mc-bootstrap: + condition: service_started + kafka: + condition: service_healthy + networks: [ag_cloud] # ------------------------- - # MQTT + Kafka + Connect + Init + # MQTT + Kafka + MQTT-router # ------------------------- kafka: - build: + build: context: ./mqtt_and_kafka/kafka dockerfile: dockerfile container_name: kafka @@ -186,7 +184,7 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD-SHELL", "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list >/dev/null 2>&1 || exit 1"] + test: [ "CMD-SHELL", "/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list >/dev/null 2>&1 || exit 1" ] interval: 10s timeout: 5s retries: 20 @@ -194,7 +192,7 @@ services: mosquitto: image: eclipse-mosquitto:2.0 container_name: mosquitto - command: ["mosquitto", "-c", "/mqtt_and_kafka/mosquitto/config/mosquitto.conf"] + command: [ "mosquitto", "-c", "/mqtt_and_kafka/mosquitto/config/mosquitto.conf" ] ports: - "1883:1883" volumes: @@ -205,60 +203,37 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD", "mosquitto_sub", "-h", "localhost", "-p", "1883", "-t", "$$SYS/#", "-C", "1", "-W", "15"] + test: [ "CMD", "mosquitto_sub", "-h", "localhost", "-p", "1883", "-t", "$$SYS/#", "-C", "1", "-W", "15" ] interval: 10s timeout: 5s retries: 12 - connect: - build: - context: ./mqtt_and_kafka - dockerfile: connect.Dockerfile - image: local/connect-with-mqtt:1.0.0 - container_name: connect - depends_on: - kafka: - condition: service_healthy - mosquitto: - condition: service_healthy - ports: - - "8083:8083" - environment: - - CONNECT_BOOTSTRAP_SERVERS=kafka:9092 - - CONNECT_GROUP_ID=agcloud-connect - - CONNECT_CONFIG_STORAGE_TOPIC=_connect_configs - - CONNECT_OFFSET_STORAGE_TOPIC=_connect_offsets - - CONNECT_STATUS_STORAGE_TOPIC=_connect_status - 
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 - - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 - - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 - - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter - - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.storage.StringConverter - - CONNECT_REST_ADVERTISED_HOST_NAME=localhost - - CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components - networks: - - ag_cloud - healthcheck: - test: ["CMD", "curl", "-sf", "http://localhost:8083/connectors"] - interval: 10s - timeout: 5s - retries: 12 + mqtt-router: + build: + context: ./mqtt_and_kafka/mqtt-router + image: local/mqtt-router:1.0.0 + depends_on: + kafka: + condition: service_healthy + mosquitto: + condition: service_healthy + environment: + - MQTT_HOST=mosquitto + - MQTT_PORT=1883 + - MQTT_TOPIC_FILTER=mqtt/# - init-connector: - image: curlimages/curl:8.7.1 - depends_on: - connect: - condition: service_healthy - volumes: - - ./mqtt_and_kafka/connectors:/connectors - networks: - - ag_cloud - entrypoint: > - sh -c " - echo '==> Creating MQTT connector...'; - curl -X POST -H 'Content-Type: application/json' --data @/connectors/mqtt-source.json http://connect:8083/connectors; - echo '==> Done.'; - " + - KAFKA_BOOTSTRAP=kafka:9092 + - CREATE_TOPICS=false + - DEFAULT_PARTITIONS=1 + - DEFAULT_REPLICATION=1 + networks: + - ag_cloud + restart: unless-stopped + healthcheck: + test: ["CMD", "python", "-c", "import socket; socket.create_connection(('mosquitto',1883),3); socket.create_connection(('kafka',9092),3)"] + interval: 15s + timeout: 5s + retries: 5 # -------------------------- # GUI / Runner / Gateway @@ -348,6 +323,15 @@ services: networks: - ag_cloud + pushgateway: + image: prom/pushgateway:v1.8.0 + container_name: pushgateway + ports: + - "9091:9091" + networks: + - ag_cloud + restart: unless-stopped + # -------------------------- # Desktop App # -------------------------- @@ -361,19 +345,24 @@ services: - DISPLAY=host.docker.internal:0.0 - 
GATEWAY_URL=http://sensors_metrics:8000 - NOTIFICATION_API_URL=http://notification_api:5000 + + - API_BASE_URL=http://db_api_service:8001 + - AUTH_BOOTSTRAP_URL=http://db_api_service:8001/auth/_dev_bootstrap + - ALERTS_WS_URL=ws://alerts-gateway:8000/ws/alerts ports: - "5900:5900" - "8080:8080" depends_on: - db_api_service - notification_api + - alerts-gateway volumes: - - ./GUI/src/vast:/app/src/vast + - ./GUI/src/vast:/app/src/vast + - ./templates:/app/templates:ro networks: - ag_cloud restart: unless-stopped - # -------------------------- # Large Mosquitto # -------------------------- @@ -399,12 +388,51 @@ services: MINIO_PROMETHEUS_AUTH_TYPE: public MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 + + # ===== IMAGE NOTIFIERS ===== + MINIO_NOTIFY_KAFKA_ENABLE_aerial: "on" + MINIO_NOTIFY_KAFKA_BROKERS_aerial: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_aerial: "image.new.aerial" + + MINIO_NOTIFY_KAFKA_ENABLE_air: "on" + MINIO_NOTIFY_KAFKA_BROKERS_air: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_air: "image.new.air" + + MINIO_NOTIFY_KAFKA_ENABLE_fruits: "on" + MINIO_NOTIFY_KAFKA_BROKERS_fruits: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_fruits: "image.new.fruits" + + MINIO_NOTIFY_KAFKA_ENABLE_leaves: "on" + MINIO_NOTIFY_KAFKA_BROKERS_leaves: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_leaves: "image.new.leaves" + + MINIO_NOTIFY_KAFKA_ENABLE_ground: "on" + MINIO_NOTIFY_KAFKA_BROKERS_ground: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_ground: "image.new.ground" + + MINIO_NOTIFY_KAFKA_ENABLE_field: "on" + MINIO_NOTIFY_KAFKA_BROKERS_field: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_field: "image.new.field" + + # ===== SOUND NOTIFIERS ===== + MINIO_NOTIFY_KAFKA_ENABLE_plants: "on" + MINIO_NOTIFY_KAFKA_BROKERS_plants: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_plants: "sound.new.plants" + + MINIO_NOTIFY_KAFKA_ENABLE_sounds: "on" + MINIO_NOTIFY_KAFKA_BROKERS_sounds: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_sounds: "sound.new.sounds" + + # ===== SECURITY NOTIFIER ===== + 
MINIO_NOTIFY_KAFKA_ENABLE_security: "on" + MINIO_NOTIFY_KAFKA_BROKERS_security: "kafka:9092" + MINIO_NOTIFY_KAFKA_TOPIC_security: "image.new.security" ports: - - "9001:9000" # HOT S3 - - "9002:9001" # HOT Console - networks: [ag_cloud] + - "9001:9000" # HOT S3 + - "9002:9001" # HOT Console + networks: [ ag_cloud ] healthcheck: - test: ["CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready"] + test: [ "CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready" ] interval: 3s timeout: 2s retries: 40 @@ -420,11 +448,11 @@ services: MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 ports: - - "9101:9000" # COLD S3 - - "9102:9001" # COLD Console - networks: [ag_cloud] + - "9101:9000" # COLD S3 + - "9102:9001" # COLD Console + networks: [ ag_cloud ] healthcheck: - test: ["CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready"] + test: [ "CMD", "curl", "-fsS", "http://localhost:9000/minio/health/ready" ] interval: 3s timeout: 2s retries: 40 @@ -434,6 +462,7 @@ services: mc-bootstrap: build: context: ./storage_with_mqtt/storage/Lifecycle_rules/minio-bootstrap + container_name: mc-bootstrap volumes: - ./storage_with_mqtt/storage/combined_minio_setup/config:/config:ro - ./storage_with_mqtt/data/config:/config @@ -442,7 +471,8 @@ services: condition: service_healthy minio-cold: condition: service_healthy - command: ["/bin/bash","-lc","/entrypoint/init.sh; tail -f /dev/null"] + kafka: + condition: service_healthy environment: MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 @@ -451,8 +481,8 @@ services: MC_ALIAS_HOT: hot MC_ALIAS_COLD: cold BUCKET_IMAGERY: imagery - BUCKET_TELEMETRY: telemetry - networks: [ag_cloud] + BUCKET_SOUND: sound + networks: [ ag_cloud ] restart: unless-stopped # -------------------------- @@ -467,7 +497,7 @@ services: MINIO_ROOT_USER: minioadmin MINIO_ROOT_PASSWORD: minioadmin123 BUCKET_IMAGERY: imagery - BUCKET_TELEMETRY: telemetry + BUCKET_SOUND: sound MQTT_BROKER: large-mosquitto MQTT_PORT: 1885 
MQTT_TOPIC: MQTT/imagery/# @@ -494,6 +524,83 @@ services: - ag_cloud restart: unless-stopped + mqtt_ingest_sound: + build: + context: ./storage_with_mqtt/mqtt_images/mqtt_ingest + container_name: mqtt_ingest_sound + environment: + MINIO_ENDPOINT: http://minio-hot:9000 + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin123 + S3_BUCKET: sound + MQTT_BROKER: large-mosquitto + MQTT_PORT: 1885 + MQTT_TOPIC: MQTT/sounds/# + MQTT_PUB_TOPIC: sound/sounds/ingested + DEFAULT_PREFIX: MIC-01 + CAMERA_PREFIX: camera + MICROPHONE_PREFIX: microphone + DUMMY_DB: 0 + DB_API_BASE: http://db_api_service:8001 + DB_API_TOKEN: auto + OUTBOX_DIR: /app/outbox + DB_API_AUTH_MODE: service + DB_API_SERVICE_NAME: mqtt_ingest_sound + INGEST_WORKERS: 8 + volumes: + - ./storage_with_mqtt/mqtt_images/outbox:/app/outbox + depends_on: + large-mosquitto: + condition: service_started + minio-hot: + condition: service_healthy + mc-bootstrap: + condition: service_started + db_api_service: + condition: service_started + networks: + - ag_cloud + restart: unless-stopped + + mqtt_ingest_sounds_ultra: + build: + context: ./storage_with_mqtt/mqtt_images/mqtt_ingest + container_name: mqtt_ingest_sounds_ultra + environment: + MINIO_ENDPOINT: http://minio-hot:9000 + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin123 + S3_BUCKET: sound + MQTT_BROKER: large-mosquitto + MQTT_PORT: 1885 + MQTT_TOPIC: MQTT/sounds_ultra/# + MQTT_PUB_TOPIC: sound/sounds_ultra/ingested + DEFAULT_PREFIX: MIC-02 + CAMERA_PREFIX: camera + MICROPHONE_PREFIX: microphone + DUMMY_DB: 0 + DB_API_BASE: http://db_api_service:8001 + DB_API_TOKEN: auto + OUTBOX_DIR: /app/outbox + DB_API_AUTH_MODE: service + DB_API_SERVICE_NAME: mqtt_ingest_sounds_ultra + INGEST_WORKERS: 8 + ULTRA_DIR_PREFIX: plants + volumes: + - ./storage_with_mqtt/mqtt_images/outbox:/app/outbox + depends_on: + large-mosquitto: + condition: service_started + minio-hot: + condition: service_healthy + mc-bootstrap: + condition: service_started + 
db_api_service: + condition: service_started + networks: + - ag_cloud + restart: unless-stopped + mqtt_publisher: build: context: ./storage_with_mqtt/mqtt_images/mqtt_publisher @@ -521,14 +628,10 @@ services: # ------------------------ sounds_classifier: build: - context: ./services/sounds/sounds_classifier + context: ./services/sounds_classifier dockerfile: Dockerfile.classifier-svc - # args: - # CHECKPOINT_URL: "CHECKPOINT=/app/classification/models/panns_data/Cnn14_mAP=0.431.pth" container_name: sounds_classifier restart: unless-stopped - # env_file: - # - ./services/sounds/sounds_classifier/src/classification/.env environment: # Runtime mode - DEVICE=cpu @@ -547,7 +650,8 @@ services: # Kafka - KAFKA_BROKERS=kafka:9092 - - ALERTS_TOPIC=dev-robot-alerts + - ALERTS_TOPIC=alerts + - ENABLE_ALERTS=true # MinIO - MINIO_ENDPOINT=minio-hot:9000 @@ -556,7 +660,7 @@ services: - MINIO_SECURE=false # Request validation - - ALLOWED_BUCKETS=imagery + - ALLOWED_BUCKETS=sound - ALLOWED_CONTENT_TYPES=audio/wav,audio/x-wav,audio/mpeg,audio/flac,audio/ogg,audio/mp4 - MAX_BYTES=104857600 @@ -578,7 +682,7 @@ services: networks: - ag_cloud healthcheck: - test: ["CMD", "curl", "-fsS", "http://localhost:8088/health"] + test: [ "CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8088/health').read()" ] interval: 45s timeout: 5s retries: 10 @@ -588,8 +692,6 @@ services: # DB API Service # -------------------------- - - contracts-gen: build: context: ./services/db_api_service @@ -607,7 +709,6 @@ services: - ag_cloud restart: "no" - db_api_service: build: context: ./services/db_api_service @@ -616,16 +717,18 @@ services: env_file: - ./services/db_api_service/.env environment: - DB_DSN: postgresql+psycopg://missions_user:pg123@host.docker.internal:5432/missions_db + DB_DSN: postgresql+psycopg://missions_user:pg123@postgres:5432/missions_db ENV: dev JWT_SECRET: change-me-please-very-secret JWT_ALGO: HS256 ACCESS_TTL_MIN: 15 REFRESH_TTL_DAYS: 14 
DEV_SA_NAME: my-ingest-service + ADDR: 0.0.0.0 ports: - "8001:8001" volumes: + - ./services/db_api_service/app:/app/app - contracts:/app/app/contracts depends_on: contracts-gen: @@ -636,10 +739,9 @@ services: - ag_cloud restart: unless-stopped - notification_api: build: - context: ./services/sounds/API-development/src + context: ./services/API-notifications/src dockerfile: Dockerfile container_name: notification_api environment: @@ -649,20 +751,53 @@ services: depends_on: - postgres - + ripeness-api: + build: + context: ./services/ripeness-ml + dockerfile: deploy/Dockerfile + image: ripeness-api:latest + environment: + - PGHOST=postgres + - PGPORT=5432 + - PGDATABASE=missions_db + - PGUSER=missions_user + - PGPASSWORD=pg123 + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_SECURE=false + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MODEL_NAME=best_conditional + - BATCH_LIMIT=500 + - FRUITS=Apple,Banana,Orange + depends_on: + - postgres + - minio-hot + volumes: + - ./services/ripeness-ml/checkpoints:/app/checkpoints + - ./services/ripeness-ml/configs:/app/configs + - ./services/ripeness-ml/model:/app/model + container_name: ripeness-api + networks: [ ag_cloud ] + ports: + - "8091:8088" + restart: unless-stopped + # -------------------------- # Flink JobManager & TaskManager # -------------------------- + + + flink-jobmanager: build: context: ./streaming/flink dockerfile: Dockerfile.flink-py - image: agcloud-flink-py:1.18 + image: agcloud-flink-py:1.18 container_name: flink-jobmanager command: jobmanager ports: - "8081:8081" - networks: [ag_cloud] + networks: [ ag_cloud ] environment: - | FLINK_PROPERTIES= @@ -678,7 +813,7 @@ services: fs.s3a.connection.ssl.enabled: false python.client.executable: /usr/bin/python3 python.executable: /usr/bin/python3 - - HTTP_INFER_URL=http://fruit-inference-http:8000/infer_json + - HTTP_INFER_URL=http://fruit-inference-http:8004/infer_json volumes: - ./streaming/flink/jobs:/opt/flink/jobs:ro - 
./streaming/flink/connectors/flink-json-1.18.1.jar:/opt/flink/lib/flink-json-1.18.1.jar:ro @@ -689,6 +824,50 @@ services: - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro restart: unless-stopped + audio_compression: + build: + context: ./services/compression + dockerfile: Dockerfile + container_name: audio_compression + environment: + - RAW_MAX_AGE_DAYS=30 + - COMPRESSION_CODEC=opus + - COMPRESSED_MAX_AGE_DAYS=90 + - CHECK_INTERVAL_SECONDS=3600 + - MINIO_ENDPOINT=minio-hot:9000 + - ACCESS_KEY=minioadmin + - SECRET_KEY=minioadmin123 + - BUCKET_NAME=imagery + depends_on: + minio-hot: + condition: service_healthy + mc-bootstrap: + condition: service_started + networks: + - ag_cloud + restart: unless-stopped + + flink_writer_db: + build: + context: ./services/flink_writer_db + dockerfile: Dockerfile.flink + container_name: flink_writer_db + environment: + - KAFKA_BROKERS=kafka:9092 + - TOPICS=sensor_zone_stats,sensor_anomalies,image_new_security_connections,alerts,image_new_aerial_connections,aerial_images_metadata,aerial_image_object_detections,aerial_image_anomaly_detections,aerial_images_complete_metadata,field_polygons,aerial_image_segmentation,sound_new_sounds_connections,sound_new_plants_connections + - DB_API_BASE=http://db_api_service:8001 + - DB_API_AUTH_MODE=service + - DB_API_SERVICE_NAME=flink-writer-db + - DB_API_TOKEN_FILE=/opt/app/secrets/db_api_token + - FLINK_PARALLELISM=1 + depends_on: + kafka: + condition: service_healthy + db_api_service: + condition: service_started + networks: + - ag_cloud + restart: unless-stopped flink-taskmanager: image: agcloud-flink-py:1.18 @@ -697,11 +876,12 @@ services: depends_on: flink-jobmanager: condition: service_started - networks: [ag_cloud] + networks: [ ag_cloud ] environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: flink-jobmanager + rpc.ask.timeout: 30s parallelism.default: 2 taskmanager.numberOfTaskSlots: 2 jobmanager.memory.process.size: 1600m @@ -713,7 
+893,7 @@ services: fs.s3a.connection.ssl.enabled: false python.client.executable: /usr/bin/python3 python.executable: /usr/bin/python3 - - HTTP_INFER_URL=http://fruit-inference-http:8000/infer_json + - HTTP_INFER_URL=http://fruit-inference-http:8004/infer_json volumes: - ./streaming/flink/connectors/flink-json-1.18.1.jar:/opt/flink/lib/flink-json-1.18.1.jar:ro - ./streaming/flink/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar:ro @@ -723,31 +903,81 @@ services: - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro restart: unless-stopped - - # -------------------------- # Inference HTTP Service # -------------------------- fruit-inference-http: build: - context: ./services/inference_http + context: ./services/inference_http dockerfile: Dockerfile environment: - - TEAM=fruit - - WEIGHTS_PATH=/app/weights/fruit_cls_best.ts - - MINIO_ENDPOINT=minio-hot:9000 - - MINIO_ACCESS_KEY=minioadmin - - MINIO_SECRET_KEY=minioadmin123 - - MINIO_SECURE=0 + - TEAM=fruit + - WEIGHTS_PATH=/app/weights/fruit_cls_best.ts + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MINIO_SECURE=0 volumes: - ./services/inference_http/weights:/app/weights:ro container_name: fruit-inference-http + networks: [ ag_cloud ] + ports: + - "8011:8004" + restart: unless-stopped + + + soil-inference-http: + build: + context: ./services/inference_http + dockerfile: Dockerfile + environment: + + - TEAM=soil_moisture + - WEIGHTS_PATH=/app/weights/soil_moisture_best.onnx + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MINIO_SECURE=0 + - PG_DSN=postgresql://missions_user:pg123@postgres:5432/missions_db + - KAFKA_BROKERS=kafka:9092 + - KAFKA_TOPIC=irrigation.control + - KAFKA_DLT=irrigation.control.dlq + + + volumes: + - ./services/inference_http/weights:/app/weights:ro + - 
./services/inference_http/adapters:/app/adapters + - ./services/inference_http/soil_moisture:/app/soil_moisture + depends_on: + - minio-hot + - postgres + ports: + - "8013:8004" + networks: [ag_cloud] + restart: unless-stopped + + + camera-inference-http: + + build: + context: ./services/inference_http + dockerfile: Dockerfile + environment: + + - TEAM=camera + - WEIGHTS_PATH=/app/weights/yolov8-fruits.pt + - MINIO_ENDPOINT=minio-hot:9000 + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin123 + - MINIO_SECURE=0 + volumes: + - ./services/inference_http/weights:/app/weights:ro + container_name: camera-inference-http networks: [ag_cloud] ports: - - "8011:8000" + - "8012:8004" restart: unless-stopped - # -------------------------- # Flink Jobs # -------------------------- @@ -758,12 +988,12 @@ services: flink-jobmanager: { condition: service_started } flink-taskmanager: { condition: service_started } fruit-inference-http: { condition: service_started } - networks: [ag_cloud] + networks: [ ag_cloud ] environment: - KAFKA_BOOTSTRAP=kafka:9092 - INPUT_TOPIC=imagery.new.fruit - TEAM=fruit - - HTTP_URL=http://fruit-inference-http:8000/infer_json + - HTTP_URL=http://fruit-inference-http:8004/infer_json - DLQ_TOPIC=dlq.inference.http - GROUP_ID=http-dispatcher-fruit - PARALLELISM=2 @@ -776,27 +1006,237 @@ services: - ./streaming/flink/connectors/kafka-clients-3.2.3.jar:/opt/flink/lib/kafka-clients-3.2.3.jar:ro - ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro - ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro - command: [ - "bash","-lc", - "set -e; - echo 'Waiting for JobManager to accept commands...'; - until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do - echo 'still waiting...'; sleep 3; - done; - echo 'JobManager is ready!'; - /opt/flink/bin/flink run \ - -Dpython.client.executable=/usr/bin/python3 \ - -Dpython.executable=/usr/bin/python3 \ - 
-Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar \ - --jobmanager flink-jobmanager:8081 \ - --detached \ - --python /opt/flink/jobs/http_dispatcher.py \ - -- \ - --bootstrap kafka:9092 \ - --input-topic imagery.new.fruit \ - --team fruit \ - --http-url http://fruit-inference-http:8000/infer_json \ - --group-id http-dispatcher-fruit \ - --dlq-topic dlq.inference.http; - tail -f /dev/null" - ] + + command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.fruit --team fruit --http-url http://fruit-inference-http:8004/infer_json --group-id http-dispatcher-fruit --dlq-topic dlq.inference.http; tail -f /dev/null" ] + restart: always + flink-dispatcher-soil: + image: agcloud-flink-py:1.18 + depends_on: + flink-jobmanager: { condition: service_started } + flink-taskmanager: { condition: service_started } + soil-inference-http: { condition: service_started } + networks: [ag_cloud] + environment: + - KAFKA_BOOTSTRAP=kafka:9092 + - INPUT_TOPIC=image.new.ground + - TEAM=soil_moisture + - HTTP_URL=http://soil-inference-http:8004/infer_json + - DLQ_TOPIC=dlq.inference.http + - GROUP_ID=http-dispatcher-soil + - PARALLELISM=1 + - PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 + volumes: + - ./streaming/flink/jobs:/opt/flink/jobs:ro + - 
./streaming/flink/connectors:/opt/flink/lib/connectors:ro + command: [ "bash", "-lc", "set -e; echo 'Waiting...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/connectors/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic image.new.ground --team soil_moisture --http-url http://soil-inference-http:8004/infer_json --group-id http-dispatcher-soil --dlq-topic dlq.inference.http; tail -f /dev/null" ] + + flink-dispatcher-camera: + image: agcloud-flink-py:1.18 + container_name: flink-dispatcher-camera + depends_on: + flink-jobmanager: { condition: service_started } + flink-taskmanager: { condition: service_started } + camera-inference-http: { condition: service_started } + networks: [ag_cloud] + environment: + - KAFKA_BOOTSTRAP=kafka:9092 + - INPUT_TOPIC=imagery.new.camera + - TEAM=camera + - HTTP_URL=http://camera-inference-http:8004/infer_json + - DLQ_TOPIC=dlq.inference.http + - GROUP_ID=http-dispatcher-camera + - PARALLELISM=2 + + - PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 + volumes: + - ./streaming/flink/jobs:/opt/flink/jobs:ro + - ./streaming/flink/connectors:/opt/flink/lib/connectors:ro + command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 
-Dpipeline.jars=file:///opt/flink/lib/connectors/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/connectors/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.camera --team camera --http-url http://camera-inference-http:8004/infer_json --group-id http-dispatcher-camera --dlq-topic dlq.inference.http; tail -f /dev/null" ] + restart: always + + flink-alerts-job: + build: + context: ./services/alerts_forwarder + dockerfile: Dockerfile.flink + container_name: alerts-forwarder + depends_on: + kafka: + condition: service_healthy + alertmanager_service: + condition: service_started + environment: + - PYTHONPATH=/opt/app + - KAFKA_BROKERS=kafka:9092 + - ALERTMANAGER_SERVICE_URL=http://alertmanager_service:8090/alerts + command: [ "python", "/opt/app/alerts_forwarder.py" ] + networks: + - ag_cloud + restart: unless-stopped + + alertmanager: + image: prom/alertmanager:v0.27.0 + container_name: alertmanager + command: + - "--config.file=/etc/alertmanager/alertmanager.yml" + - "--storage.path=/alertmanager" + - "--log.level=debug" + volumes: + - ./services/alertmanager_service/compose/alertmanager.yml:/etc/alertmanager/alertmanager.yml:ro + ports: + - "9093:9093" + networks: + - ag_cloud + restart: always + + alertmanager_service: + build: + context: ./services/alertmanager_service/src + dockerfile: Dockerfile + container_name: alertmanager_service + ports: + - "8090:8090" + command: [ "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8090" ] + volumes: + - ./templates:/app/templates:ro + environment: + - CFG_PATH=/app/templates/templates.yml + - ALERTMANAGER_URL=http://alertmanager:9093 + - GATEWAY_URL=http://alerts-gateway:8000/internal/alert + depends_on: + - alertmanager + - alerts-gateway + networks: + - ag_cloud + + alerts-gateway: + build: + context: 
./services/alertmanager_service/src + dockerfile: Dockerfile + container_name: alerts_gateway + command: [ "uvicorn", "gateway:app", "--host", "0.0.0.0", "--port", "8000" ] + ports: + - "8010:8000" + networks: + - ag_cloud + + image-linker-jobmanager: + build: + context: ./services/image-linker + dockerfile: Dockerfile.flink + container_name: image-linker-jobmanager + command: jobmanager + ports: + - "8084:8081" + environment: + - JOB_MANAGER_RPC_ADDRESS=image-linker-jobmanager + - KAFKA_BROKERS=kafka:9092 + - CONFIG_PATH=/opt/app/config/topics.yaml + networks: + - ag_cloud + + image-linker-taskmanager: + build: + context: ./services/image-linker + dockerfile: Dockerfile.flink + container_name: image-linker-taskmanager + command: taskmanager + environment: + - JOB_MANAGER_RPC_ADDRESS=image-linker-jobmanager + - KAFKA_BROKERS=kafka:9092 + - CONFIG_PATH=/opt/app/config/topics.yaml + depends_on: + image-linker-jobmanager: + condition: service_started + networks: + - ag_cloud + + image-linker-submitter: + build: + context: ./services/image-linker + dockerfile: Dockerfile.flink + container_name: image-linker-submit + depends_on: + image-linker-jobmanager: + condition: service_started + command: > + bash -lc "sleep 10 && + flink run -m image-linker-jobmanager:8081 -py /opt/app/job_linker.py && + echo 'Image-Linker job submitted successfully' && + sleep 1" + networks: + - ag_cloud + + flink-sounds-http-jobmanager: + build: + context: ./services/sounds_flink + dockerfile: Dockerfile + container_name: flink-sounds-http-jobmanager + command: jobmanager + ports: + - "8083:8081" + environment: + JOB_MANAGER_RPC_ADDRESS: flink-sounds-http-jobmanager + KAFKA_BROKERS: kafka:9092 + SOURCE_TOPIC: sound_new_sounds_connections + SINK_TOPIC: "" + GROUP_ID: flink-classifier-sounds + CLASSIFIER_HTTP_URL: http://sounds_classifier:8088/classify + DEFAULT_PARALLELISM: 2 + KAFKA_START: earliest + PYTHON: /opt/venv/bin/python + FLINK_PYTHON: /opt/venv/bin/python + networks: + - ag_cloud + + 
flink-sounds-http-taskmanager: + build: + context: ./services/sounds_flink + dockerfile: Dockerfile + container_name: flink-sounds-http-taskmanager + command: taskmanager + depends_on: + flink-sounds-http-jobmanager: + condition: service_started + environment: + JOB_MANAGER_RPC_ADDRESS: flink-sounds-http-jobmanager + PYTHON: /opt/venv/bin/python + FLINK_PYTHON: /opt/venv/bin/python + FLINK_PROPERTIES: |- + jobmanager.rpc.address: flink-sounds-http-jobmanager + taskmanager.numberOfTaskSlots: 2 + networks: + - ag_cloud + + flink-sounds-http-submit: + build: + context: ./services/sounds_flink + dockerfile: Dockerfile + container_name: flink-sounds-http-submit + depends_on: + flink-sounds-http-jobmanager: + condition: service_started + flink-sounds-http-taskmanager: + condition: service_started + command: + - /opt/flink/bin/flink + - run + - -d + - -m + - flink-sounds-http-jobmanager:8081 + - -Dpython.client.executable=/opt/venv/bin/python + - -Dpython.executable=/opt/venv/bin/python + - -py + - /opt/app/flink_job.py + environment: + JOB_MANAGER_RPC_ADDRESS: flink-sounds-http-jobmanager + KAFKA_BROKERS: kafka:9092 + SOURCE_TOPIC: sound_new_sounds_connections + SINK_TOPIC: "" + GROUP_ID: flink-classifier-sounds + CLASSIFIER_HTTP_URL: http://sounds_classifier:8088/classify + DEFAULT_PARALLELISM: 2 + KAFKA_START: earliest + PYTHON: /opt/venv/bin/python + FLINK_PYTHON: /opt/venv/bin/python + networks: + - ag_cloud diff --git a/mqtt_and_kafka/kafka/kafka-files/create-topics.sh b/mqtt_and_kafka/kafka/kafka-files/create-topics.sh index 0ec1b6b6b..5c6c0f9ba 100644 --- a/mqtt_and_kafka/kafka/kafka-files/create-topics.sh +++ b/mqtt_and_kafka/kafka/kafka-files/create-topics.sh @@ -42,6 +42,7 @@ TOPICS=( dev-aerial-images-keys image.new.aerial image.new.aerial.connections + imagery.new.soil_moisture ) # Idempotent creation with retention.ms diff --git a/services/db_api_service/.env.example b/services/db_api_service/.env.example deleted file mode 100644 index a2c4fe7d0..000000000 
--- a/services/db_api_service/.env.example +++ /dev/null @@ -1,9 +0,0 @@ -DB_DSN=postgresql+psycopg://missions_user:pg123@host.docker.internal:5432/missions_db - -PORT=8080 - -CONTRACTS_DIR=app/contracts - -ALLOWED_TABLES=["event_logs_sensors","devices"] - -STRICT_UNKNOWN_FIELDS=true diff --git a/services/inference_http/.dockerignore b/services/inference_http/.dockerignore new file mode 100644 index 000000000..e2ae0b5d4 --- /dev/null +++ b/services/inference_http/.dockerignore @@ -0,0 +1,9 @@ +models/soil_moisture/samples/ +models/soil_moisture/tests/ +.venv +__pycache__ +data/ +samples/ +*.jpg +*.png +*.mp4 \ No newline at end of file diff --git a/services/inference_http/Dockerfile b/services/inference_http/Dockerfile index 764a286ed..a01fa34b4 100644 --- a/services/inference_http/Dockerfile +++ b/services/inference_http/Dockerfile @@ -1,22 +1,82 @@ +# ============================================================ +# Unified Inference HTTP Dockerfile (Fruit + Camera + YOLO + Soil Moisture) +# ============================================================ FROM python:3.11-slim -ENV PIP_NO_CACHE_DIR=1 PIP_DEFAULT_TIMEOUT=1200 PIP_DISABLE_PIP_VERSION_CHECK=1 + +ENV PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 \ + PIP_DEFAULT_TIMEOUT=1200 + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + libglib2.0-0 \ + libglib2.0-dev \ + libsm6 \ + libxrender1 \ + libxext6 \ + libgl1 \ + libopenblas-dev \ + liblapack-dev \ + && rm -rf /var/lib/apt/lists/* + +RUN python -m pip install --upgrade pip setuptools wheel --only-binary=:all: && \ + pip install --no-cache-dir numpy==1.26.4 --only-binary=:all: && \ + pip install --no-cache-dir --extra-index-url https://download.pytorch.org/whl/cpu \ + torch==2.1.2 torchvision==0.16.2 --only-binary=:all: --upgrade-strategy only-if-needed + + +ENV PIP_NO_CACHE_DIR=1 \ + PIP_DEFAULT_TIMEOUT=1200 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + WORKDIR /app -RUN python -m pip install --upgrade pip setuptools wheel 
&& \ +# Copy certs dir (may be empty) and trust *.crt if present +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && \ + rm -rf /var/lib/apt/lists/* && \ + if [ -d /tmp/certs ] && ls /tmp/certs/*.crt >/dev/null 2>&1; then \ + cp /tmp/certs/*.crt /usr/local/share/ca-certificates/ && \ + chmod 644 /usr/local/share/ca-certificates/*.crt && \ + update-ca-certificates; \ + else \ + echo "No extra CA certs found. Skipping CA update."; \ + fi + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt \ + REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ + CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt \ + PIP_CERT=/etc/ssl/certs/ca-certificates.crt +RUN printf "[global]\ncert = /etc/ssl/certs/ca-certificates.crt\n" > /etc/pip.conf + +# Python deps +RUN python -m pip install --upgrade pip setuptools wheel certifi && \ pip install --no-cache-dir numpy==1.26.4 --only-binary=:all: RUN pip install --no-cache-dir --extra-index-url https://download.pytorch.org/whl/cpu \ torch==2.1.2 torchvision==0.16.2 --only-binary=:all: --upgrade-strategy only-if-needed - COPY requirements.txt /app/requirements.txt -RUN pip install --no-cache-dir -r /app/requirements.txt --only-binary=:all: --upgrade-strategy only-if-needed +RUN pip install --no-cache-dir -r /app/requirements.txt && \ + pip install --no-cache-dir \ + opencv-python-headless \ + ultralytics==8.2.34 \ + boto3 \ + pillow \ + requests \ + "numpy<2" \ + && rm -rf /root/.cache/pip + COPY app.py model_registry.py /app/ COPY adapters /app/adapters -COPY models /app/models -EXPOSE 8000 -CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] +COPY models /app/models +COPY weights /app/weights +COPY models/soil_moisture/artifacts /app/artifacts + +EXPOSE 8004 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8004"] diff --git a/services/inference_http/adapters/soil_moisture_runner.py b/services/inference_http/adapters/soil_moisture_runner.py new file mode 100644 index 
000000000..7c1624c70 --- /dev/null +++ b/services/inference_http/adapters/soil_moisture_runner.py @@ -0,0 +1,158 @@ + +""" +Adapter for soil moisture inference in the generic HTTP inference flow. +Uses the shared inference logic from the soil-moisture service. +""" + +import os +import base64 +import logging +import sys +from typing import Any, Dict, Optional +from PIL import Image +from io import BytesIO +import numpy as np +import cv2 +import time +import re + +logger = logging.getLogger(__name__) + + +class SoilMoistureRunner: + """ + Adapter that wraps the soil moisture inference logic. + """ + + def __init__(self, weights_path: Optional[str] = None, model_tag: Optional[str] = None): + self.model_tag = model_tag + self.weights_path = weights_path + + try: + # Add models directory to path + models_dir = os.path.join(os.path.dirname(__file__), '..', 'models') + if models_dir not in sys.path: + sys.path.insert(0, models_dir) + + # Import soil moisture components + from soil_moisture.src.app.config import Settings, load_zones + from soil_moisture.src.app.inference import Inferencer + from soil_moisture.src.app.db import DB + from soil_moisture.src.app.inference_logic import SoilMoistureInferenceLogic + + logger.info("Initializing SoilMoistureRunner...") + + # Initialize components + self.settings = Settings() + + # Load zones config if available + if hasattr(self.settings, 'zones_file') and self.settings.zones_file: + if os.path.exists(self.settings.zones_file): + self.zones_cfg = load_zones(self.settings.zones_file) + else: + logger.warning(f"zones_file not found: {self.settings.zones_file}") + self.zones_cfg = {} + else: + self.zones_cfg = {} + + self.db = DB(self.settings.pg_dsn) + self.inferencer = Inferencer(self.settings, self.db) + + # Initialize Kafka producer (optional) + producer = None + try: + from soil_moisture.src.app.kafka_producer import ControlProducer + producer = ControlProducer( + self.settings.kafka_brokers, + self.settings.kafka_topic, + 
self.settings.kafka_dlt + ) + except Exception as e: + logger.warning(f"Kafka producer init failed: {e}") + + # Initialize shared inference logic + self.inference_logic = SoilMoistureInferenceLogic( + settings=self.settings, + db=self.db, + inferencer=self.inferencer, + producer=producer + ) + + logger.info("SoilMoistureRunner initialized successfully!") + + except Exception as e: + logger.error(f"Failed to initialize SoilMoistureRunner: {e}", exc_info=True) + raise + + def run(self, image_bytes: Any, model_tag: Optional[str] = None, + extra: Optional[Dict] = None) -> Dict: + """ + Run soil moisture inference using the shared inference logic. + """ + start_time = time.time() + + try: + bucket_in = extra.get("bucket") if extra else "imagery" + key = extra.get("key") if extra else None + if not key: + return {"error": "missing key"} + + # --- Extract device_id from the key (pattern: path/to/image/dev-id_ts.jpg) --- + def extract_device_id_from_key(key: str) -> str: + filename = key.split("/")[-1] # get "dev-id_ts.jpg" + match = re.match(r"([^_]+)_", filename) # capture part before "_" + if match: + return match.group(1) + return "unknown" + + # --- Validate that raw bytes decode; str / PIL inputs are handled by the conversion below --- + img_array = np.frombuffer(image_bytes, np.uint8) if isinstance(image_bytes, bytes) else None + img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img_array is not None else image_bytes + if img is None: + return {"error": "failed to decode image from bytes"} + + # --- Determine device_id --- + device_id = "unknown" + if extra: + if "device_id" in extra: + device_id = extra["device_id"] + elif "filename" in extra: + device_id = self.inference_logic.extract_device_id(extra["filename"]) + elif "key" in extra: + device_id = extract_device_id_from_key(extra["key"]) + + # --- Convert input to PIL Image --- + if isinstance(image_bytes, bytes): + img = Image.open(BytesIO(image_bytes)) + elif isinstance(image_bytes, str): + img_bytes = base64.b64decode(image_bytes) + img = Image.open(BytesIO(img_bytes)) + elif isinstance(image_bytes, Image.Image): + img = image_bytes + else: + raise 
ValueError(f"Unsupported input type: {type(image_bytes)}") + + # --- Run inference --- + result = self.inference_logic.infer_from_image(img, device_id) + + return { + "device_id": result["device_id"], + "dry_ratio": result["dry_ratio"], + "decision": result["decision"], + "confidence": result["confidence"], + "patch_count": result["patch_count"], + "duration_min": result.get("duration_min", 0), + "latency_ms_model": result.get("latency_ms", 0), + "ts": result.get("ts"), + "idempotency_key": result.get("idempotency_key"), + "debug": result.get("debug") + } + + except Exception as e: + logger.error(f"Inference failed: {e}", exc_info=True) + latency_ms = int((time.time() - start_time) * 1000) + return { + "error": str(e), + "device_id": locals().get("device_id", "unknown"), + "latency_ms_model": latency_ms + } \ No newline at end of file diff --git a/services/inference_http/app.py b/services/inference_http/app.py index 3a490493d..ba1c20535 100644 --- a/services/inference_http/app.py +++ b/services/inference_http/app.py @@ -62,10 +62,10 @@ def infer_json( # Attempt to run the model with bytes input first try: - result = runner.run(image_bytes) + result = runner.run(image_bytes, extra={"bucket": req.bucket, "key": req.key}) except TypeError: # If the function does not accept bytes, try with URI instead - result = runner.run(s3_uri) + result = runner.run(s3_uri, extra={"bucket": req.bucket, "key": req.key}) latency_ms = int((time.perf_counter() - started) * 1000) return { diff --git a/services/inference_http/model_registry.py b/services/inference_http/model_registry.py index 3c5c0d6df..5c89e1589 100644 --- a/services/inference_http/model_registry.py +++ b/services/inference_http/model_registry.py @@ -1,5 +1,6 @@ -from typing import Any, Dict from adapters.fruit_defect_runner import FruitDefectRunner +from adapters.soil_moisture_runner import SoilMoistureRunner +from adapters.fruit_segmentation_runner import FruitSegmentationRunner class FruitRunner: def __init__(self): 
@@ -12,4 +13,10 @@ def get_model_runner(team: str): t = (team or "").lower() if t == "fruit": return FruitRunner() + if t == "soil_moisture": + return SoilMoistureRunner() + if t == "fruit_defect": + return FruitDefectRunner() + if t == "camera": + return FruitSegmentationRunner() raise ValueError(f"unknown TEAM {t}") diff --git a/services/inference_http/models/soil_moisture/.gitignore b/services/inference_http/models/soil_moisture/.gitignore new file mode 100644 index 000000000..62d87daf0 --- /dev/null +++ b/services/inference_http/models/soil_moisture/.gitignore @@ -0,0 +1 @@ +samples/ \ No newline at end of file diff --git a/services/inference_http/models/soil_moisture/Dockerfile b/services/inference_http/models/soil_moisture/Dockerfile new file mode 100644 index 000000000..9b1bb644d --- /dev/null +++ b/services/inference_http/models/soil_moisture/Dockerfile @@ -0,0 +1,44 @@ + +FROM python:3.10-slim + +WORKDIR /app + +# --- 1) installing ca-certificates --- +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates curl \ + && rm -rf /var/lib/apt/lists/* + +# --- 2) copying NetFree certificate and adding it to the system --- +COPY netfree-ca.crt /usr/local/share/ca-certificates/netfree-ca.crt +RUN chmod 644 /usr/local/share/ca-certificates/netfree-ca.crt && \ + update-ca-certificates + +# Setting to ensure the updated certificate is used +ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV PIP_CERT=/etc/ssl/certs/ca-certificates.crt + +# --- 3) System dependencies required for FastAPI etc. --- +RUN apt-get update && apt-get install -y --no-install-recommends \ + libglib2.0-0 libsm6 libxrender1 libxext6 \ + && rm -rf /var/lib/apt/lists/* + +# --- 4) Installing dependencies --- +COPY requirements-api.txt . 
+# RUN pip install --trusted-host pypi.org --trusted-host pypi.python.org \ +# --trusted-host files.pythonhosted.org --no-cache-dir -r requirements-api.txt + +RUN pip config set global.require-hashes false && \ + pip install --trusted-host pypi.org --trusted-host pypi.python.org \ + --trusted-host files.pythonhosted.org --no-cache-dir -r requirements-api.txt +# --- 5) Copying code --- +COPY src ./src +COPY configs ./configs +COPY artifacts ./artifacts +COPY src/sql/init_db.sql /initdb/init_db.sql + +ENV PYTHONPATH=/app +ENV SCHEDULE_UPDATE=1 +RUN pip install python-multipart + +CMD ["uvicorn", "src.app.service:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/services/inference_http/models/soil_moisture/README.md b/services/inference_http/models/soil_moisture/README.md new file mode 100644 index 000000000..0aacb3f61 --- /dev/null +++ b/services/inference_http/models/soil_moisture/README.md @@ -0,0 +1,119 @@ +# Soil Moisture DL Pipeline – Real-Time Irrigation Control (ONNX Inference) + +This repository delivers an end-to-end **deep learning** pipeline to detect soil moisture state +(**wet / dry**) from ground-level RGB images and trigger **real-time irrigation** actions. + +## Highlights +- **Training (PyTorch)**: MobileNetV3-small (transfer learning) + augmentations. +- **Export** to **ONNX** for light-weight **CPU/Jetson** inference. 
+- **Inference Service (FastAPI)**: + - Tiling into patches + - Per-patch ONNX inference + - Per-device policy with hysteresis (dry_ratio_high / dry_ratio_low / min_patches) + - **Kafka** publish to `irrigation.control` (idempotent) + DLQ + - **Postgres** persistence in `soil_moisture_events` (+ optional schedule UPSERT + audit) + - **Prometheus** metrics + health/ready endpoints + +--- + +## Run + +```bash +docker compose up -d api +``` + +The API will be available at: [http://localhost:8000](http://localhost:8000) + +--- + +## Endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/health` | Basic health check | +| `GET` | `/ready` | Checks DB connectivity | +| `GET` | `/metrics` | Prometheus metrics | +| `POST` | `/infer` | Run inference on uploaded image | + +### Example request + +```bash +curl -X POST "http://localhost:8000/infer" -F "zone_id=zone1" -F "image=@sample.jpg" +``` + +Response: +```json +{ + "device_id": "zone1", + "dry_ratio": 0.42, + "decision": "stop", + "confidence": 0.87, + "patch_count": 48, + "ts": "2025-10-29T09:41:00Z", + "idempotency_key": "zone1:345621" +} +``` + +--- + +## Environment Variables + +| Name | Description | Example | +|------|--------------|----------| +| `PG_DSN` | Postgres connection string | `postgresql://user:pass@host.docker.internal:5432/missions_db` | +| `KAFKA_BROKERS` | Kafka brokers | `kafka:9092` | +| `KAFKA_TOPIC` | Kafka topic for irrigation control | `irrigation.control` | +| `KAFKA_DLT` | Kafka DLQ topic | `irrigation.control.dlq` | +| `ZONES_FILE` | Path to zone configuration | `/app/configs/zones.yaml` | +| `SCHEDULE_UPDATE` | Enables schedule table update | `1` | +| `DECISION_WINDOW_SEC` | Width in seconds of the time bucket used to build the idempotency key | `3` | + +--- + +## Notes +- The service depends on Postgres and Kafka within the `ag_cloud` Docker network. +- If Kafka is unreachable, messages are logged but not published. 
+- Duplicate inferences are prevented using an idempotency key per decision window. +- Metrics exposed for Prometheus under `/metrics`. + +--- + +## Example Compose Context + +```yaml +services: + api: + build: + context: . + dockerfile: Dockerfile + environment: + PG_DSN: postgresql://missions_user:pg123@host.docker.internal:5432/missions_db + KAFKA_BROKERS: kafka:9092 + KAFKA_TOPIC: irrigation.control + KAFKA_DLT: irrigation.control.dlq + ZONES_FILE: /app/configs/zones.yaml + DECISION_WINDOW_SEC: 3 + PATCH_SIZE: 256 + PATCH_STRIDE: 256 + SCHEDULE_UPDATE: 1 + volumes: + - ./configs:/app/configs + - ./artifacts:/app/artifacts + ports: + - "8000:8000" + networks: + - ag_cloud +``` + +--- + +## Testing + +```bash +pytest -v +``` + +--- + +## License +Internal AgCloud component – for research and development use only. diff --git a/services/inference_http/models/soil_moisture/artifacts/best.pt b/services/inference_http/models/soil_moisture/artifacts/best.pt new file mode 100644 index 000000000..4fbed017d Binary files /dev/null and b/services/inference_http/models/soil_moisture/artifacts/best.pt differ diff --git a/services/inference_http/models/soil_moisture/artifacts/label_mapping.json b/services/inference_http/models/soil_moisture/artifacts/label_mapping.json new file mode 100644 index 000000000..7b688603a --- /dev/null +++ b/services/inference_http/models/soil_moisture/artifacts/label_mapping.json @@ -0,0 +1,4 @@ +{ + "0": "dry", + "1": "wet" +} \ No newline at end of file diff --git a/services/inference_http/models/soil_moisture/artifacts/model.onnx b/services/inference_http/models/soil_moisture/artifacts/model.onnx new file mode 100644 index 000000000..6052e846e Binary files /dev/null and b/services/inference_http/models/soil_moisture/artifacts/model.onnx differ diff --git a/services/inference_http/models/soil_moisture/configs/zones.yaml b/services/inference_http/models/soil_moisture/configs/zones.yaml new file mode 100644 index 000000000..fa76afbb7 --- /dev/null 
+++ b/services/inference_http/models/soil_moisture/configs/zones.yaml @@ -0,0 +1,12 @@ +zones: + ZONE_A: + dry_ratio_high: 0.35 + dry_ratio_low: 0.25 + min_patches: 2 + duration_min: 10 + + ZONE_B: + dry_ratio_high: 0.40 + dry_ratio_low: 0.30 + min_patches: 2 + duration_min: 12 \ No newline at end of file diff --git a/services/inference_http/models/soil_moisture/docker-compose.yml b/services/inference_http/models/soil_moisture/docker-compose.yml new file mode 100644 index 000000000..c23e6b3e3 --- /dev/null +++ b/services/inference_http/models/soil_moisture/docker-compose.yml @@ -0,0 +1,30 @@ +networks: + worktree-main_ag_cloud: + external: true + +services: + api: + build: + context: . + dockerfile: Dockerfile + container_name: soil_api + environment: + PG_DSN: postgresql://missions_user:pg123@host.docker.internal:5432/missions_db + KAFKA_BROKERS: kafka:9092 # host.docker.internal:29092 + KAFKA_TOPIC: irrigation.control + KAFKA_DLT: irrigation.control.dlq + ZONES_FILE: /app/configs/zones.yaml + DECISION_WINDOW_SEC: 3 + PATCH_SIZE: 256 + PATCH_STRIDE: 256 + SCHEDULE_UPDATE: 1 + volumes: + - ./configs:/app/configs + - ./artifacts:/app/artifacts + ports: + - "8000:8000" + + networks: + - worktree-main_ag_cloud + + diff --git a/services/inference_http/models/soil_moisture/requirements-api.txt b/services/inference_http/models/soil_moisture/requirements-api.txt new file mode 100644 index 000000000..6e3ae713a --- /dev/null +++ b/services/inference_http/models/soil_moisture/requirements-api.txt @@ -0,0 +1,14 @@ +fastapi==0.114.2 +uvicorn==0.30.6 +onnxruntime==1.20.0 +numpy==2.1.1 +Pillow==10.4.0 +opencv-python==4.10.0.84 +kafka-python==2.0.2 +psycopg2-binary==2.9.10 +prometheus_client==0.21.0 +PyYAML==6.0.2 +python-dotenv==1.0.1 +requests==2.32.3 +python-multipart==0.0.6 +confluent_kafka==2.12.0 \ No newline at end of file diff --git a/services/inference_http/models/soil_moisture/requirements-train.txt b/services/inference_http/models/soil_moisture/requirements-train.txt 
new file mode 100644 index 000000000..21a78fde4 --- /dev/null +++ b/services/inference_http/models/soil_moisture/requirements-train.txt @@ -0,0 +1,9 @@ +torch>=2.2.0 +torchvision==0.23.0 +numpy==2.1.1 +Pillow==10.4.0 +opencv-python==4.10.0.84 +scikit-learn==1.5.2 +tqdm==4.66.5 +PyYAML==6.0.2 +onnx==1.19.0 diff --git a/services/inference_http/models/soil_moisture/src/.dockerignore b/services/inference_http/models/soil_moisture/src/.dockerignore new file mode 100644 index 000000000..f5bddaa38 --- /dev/null +++ b/services/inference_http/models/soil_moisture/src/.dockerignore @@ -0,0 +1,2 @@ +models/soil_moisture/samples/ +models/soil_moisture/tests/ \ No newline at end of file diff --git a/services/inference_http/models/soil_moisture/src/app/__init__.py b/services/inference_http/models/soil_moisture/src/app/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/services/inference_http/models/soil_moisture/src/app/config.py b/services/inference_http/models/soil_moisture/src/app/config.py new file mode 100644 index 000000000..719e03fa3 --- /dev/null +++ b/services/inference_http/models/soil_moisture/src/app/config.py @@ -0,0 +1,21 @@ +import os +import yaml +from dataclasses import dataclass +from typing import Dict, Any +from dotenv import load_dotenv +load_dotenv() + +@dataclass +class Settings: + kafka_brokers: str = os.getenv("KAFKA_BROKERS", "localhost:9092") + kafka_topic: str = os.getenv("KAFKA_TOPIC", "irrigation.control") + kafka_dlt: str = os.getenv("KAFKA_DLT", "irrigation.control.dlq") + pg_dsn: str = os.getenv("PG_DSN", "postgresql://postgres:postgres@localhost:5432/soil") + zones_file: str = os.getenv("ZONES_FILE", "configs/zones.yaml") + decision_window_sec: int = int(os.getenv("DECISION_WINDOW_SEC", "1")) + patch_size: int = int(os.getenv("PATCH_SIZE", "256")) + patch_stride: int = int(os.getenv("PATCH_STRIDE", "256")) + +def load_zones(path: str) -> Dict[str, Any]: + with open(path, "r", encoding="utf-8") as f: + return 
yaml.safe_load(f) diff --git a/services/inference_http/models/soil_moisture/src/app/db.py b/services/inference_http/models/soil_moisture/src/app/db.py new file mode 100644 index 000000000..356f523b2 --- /dev/null +++ b/services/inference_http/models/soil_moisture/src/app/db.py @@ -0,0 +1,134 @@ + +import json +from typing import Optional, Dict, Any +import psycopg2 +import psycopg2.extras +from contextlib import contextmanager + +class DB: + def __init__(self, dsn: str): + self.dsn = dsn + + @contextmanager + def conn(self): + conn = psycopg2.connect(self.dsn) + try: + yield conn + finally: + conn.close() + + def init_ok(self) -> bool: + try: + with self.conn() as c: + with c.cursor() as cur: + cur.execute("SELECT 1") + return True + except Exception: + return False + + def log_event(self, device_id: str, ts_iso: str, dry_ratio: float, + decision: str, confidence: float, patch_count: int, + idem_key: str, extra: Optional[Dict[str, Any]]=None) -> bool: + q = ''' + INSERT INTO soil_moisture_events + (device_id, ts, dry_ratio, decision, confidence, patch_count, idempotency_key, extra) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (idempotency_key) DO NOTHING + ''' + with self.conn() as c: + with c.cursor() as cur: + cur.execute(q, ( + device_id, + ts_iso, + dry_ratio, + decision, + confidence, + patch_count, + idem_key, + json.dumps(extra or {}) + )) + c.commit() + return cur.rowcount > 0 + + def load_device_policy(self, device_id: str) -> dict: + try: + with self.conn() as c: + with c.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute(""" + SELECT prev_state, dry_ratio_high, dry_ratio_low, + min_patches, duration_min + FROM irrigation_policies + WHERE device_id = %s + """, (device_id,)) + row = cur.fetchone() + if not row: + print(f"No row found for device_id={device_id}") + raise ValueError("not found") + print(f"Loaded from DB: {dict(row)}") + return dict(row) + except Exception as e: + print(f"Falling back to defaults because: {e}") 
+ # fallback defaults + return { + "prev_state": "stop", + "dry_ratio_high": 0.35, + "dry_ratio_low": 0.25, + "min_patches": 2, + "duration_min": 10 + } + + + def upsert_schedule(self, device_id: str, next_run_at: str, duration_min: int, + updated_by: str, update_reason: str) -> None: + with self.conn() as c: + with c.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("SELECT next_run_at, duration_min FROM irrigation_schedule WHERE device_id=%s", (device_id,)) + prev = cur.fetchone() + cur.execute(''' + INSERT INTO irrigation_schedule(device_id, next_run_at, duration_min, updated_by, update_reason) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (device_id) DO UPDATE SET + next_run_at=EXCLUDED.next_run_at, + duration_min=EXCLUDED.duration_min, + updated_by=EXCLUDED.updated_by, + update_reason=EXCLUDED.update_reason, + updated_at=NOW() + ''', (device_id, next_run_at, duration_min, updated_by, update_reason)) + cur.execute(''' + INSERT INTO irrigation_schedule_audit(device_id, prev_next_run_at, prev_duration_min, + next_run_at, duration_min, updated_by, update_reason) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ''', (device_id, + prev["next_run_at"] if prev else None, + prev["duration_min"] if prev else None, + next_run_at, duration_min, updated_by, update_reason)) + c.commit() + + def update_prev_state(self, device_id: str, new_state: str) -> None: + # default values for other new fields + default_policy = { + "dry_ratio_high": 0.35, + "dry_ratio_low": 0.25, + "min_patches": 2, + "duration_min": 10 + } + + with self.conn() as c: + with c.cursor() as cur: + cur.execute(""" + INSERT INTO irrigation_policies + (device_id, prev_state, dry_ratio_high, dry_ratio_low, min_patches, duration_min) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (device_id) DO UPDATE + SET prev_state = EXCLUDED.prev_state, + updated_at = NOW() + """, ( + device_id, + new_state, + default_policy["dry_ratio_high"], + default_policy["dry_ratio_low"], + default_policy["min_patches"], 
+ default_policy["duration_min"] + )) + c.commit() + diff --git a/services/inference_http/models/soil_moisture/src/app/inference.py b/services/inference_http/models/soil_moisture/src/app/inference.py new file mode 100644 index 000000000..95c4d948c --- /dev/null +++ b/services/inference_http/models/soil_moisture/src/app/inference.py @@ -0,0 +1,106 @@ +from typing import Dict, Any, Tuple +from PIL import Image +import numpy as np +import os, time, logging +from .config import Settings +from .utils import normalize_lighting, tile_image, preprocess_onnx +from .metrics import METRICS +from .onnx_model import ONNXMoistureModel +from .db import DB + +logger = logging.getLogger("soil_api") + +DRY_LABEL = "dry" + +class Inferencer: + def __init__(self, settings: Settings, db: DB, + model_path: str = "artifacts/model.onnx", + label_map_path: str = "artifacts/label_mapping.json"): + self.settings = settings + self.db = db + self.model = ONNXMoistureModel(model_path, label_map_path) + self.classes = [self.model.label_map[str(i)] for i in range(len(self.model.label_map))] + + def decision_window_bucket(self, ts: float) -> int: + return int(ts // self.settings.decision_window_sec) * self.settings.decision_window_sec + + def infer_image(self, img: Image.Image, device_id: str) -> Tuple[Dict[str, Any], Dict[str, Any]]: + + t0 = time.time() + img_n = normalize_lighting(img) + patches = tile_image(img_n, self.settings.patch_size, self.settings.patch_stride) + + dry_votes = 0 + probs = [] + + logger.info("infer_image start device_id=%s patch_size=%d stride=%d total_patches=%d",device_id, self.settings.patch_size, self.settings.patch_stride, len(patches)) + + try: + dry_idx = self.classes.index(DRY_LABEL) + except ValueError: + logger.warning("DRY_LABEL '%s' not found in classes %s", DRY_LABEL, self.classes) + dry_idx = None + + for idx, p in enumerate(patches): + try: + proba = self.model.predict_proba_patch(p) + except Exception as e: + logger.exception("model.predict_proba_patch 
failed for patch idx=%d: %s", idx, e) + proba = np.zeros(len(self.classes), dtype=float) + + probs.append(proba) + arg = int(proba.argmax()) if proba.any() else -1  # all-zero vector = failed/empty patch; argmax would be 0, i.e. a false "dry" vote + arg_label = self.classes[arg] if (arg >= 0 and arg < len(self.classes)) else "unknown" + maxp = float(proba.max()) if proba.size else 0.0 + logger.debug("patch idx=%d arg=%d label=%s maxp=%.4f", idx, arg, arg_label, maxp) + + if dry_idx is not None and arg == dry_idx: + dry_votes += 1 + + mean_confidence = float(np.mean([max(x) for x in probs])) if probs else 0.0 + + patch_count = len(patches) + dry_ratio = dry_votes / max(1, patch_count) + + # Policy / hysteresis + policy = self.db.load_device_policy(device_id) + prev_state = policy.get("prev_state") or "stop" + high = policy.get("dry_ratio_high") or 0.35 + low = policy.get("dry_ratio_low") or 0.25 + min_patches = policy.get("min_patches") or 2 + duration_min = policy.get("duration_min") or 10 + + logger.info("decision inputs prev_state=%s dry_votes=%d patch_count=%d dry_ratio=%.4f high=%.4f low=%.4f min_patches=%d", + prev_state, dry_votes, patch_count, dry_ratio, high, low, min_patches) + + decision = "noop" + if patch_count >= min_patches: + if prev_state != "run" and dry_ratio >= high: + decision = "run" + elif prev_state != "stop" and dry_ratio <= low: + decision = "stop" + else: + logger.debug("hysteresis conditions not met (prev_state=%s)", prev_state) + else: + logger.debug("not enough patches for decision: patch_count=%d min_patches=%d", patch_count, min_patches) + + new_state = decision if decision in ("run", "stop") else prev_state + logger.info("decision result=%s updated_state=%s duration_min=%d confidence=%.4f", + decision, new_state, duration_min, mean_confidence) + + METRICS["inference_latency_ms"].observe((time.time() - t0) * 1000.0) + + result = { + "dry_ratio": float(dry_ratio), + "decision": decision, + "confidence": float(mean_confidence), + "patch_count": int(patch_count), + "duration_min": duration_min + } + debug = { + 
class SoilMoistureInferenceLogic:
    """
    Encapsulates the inference logic for soil moisture detection.

    This can be used by both the FastAPI service and the Flink adapter.
    All collaborators are injected:

    - ``db`` must provide ``log_event(...)`` and ``upsert_schedule(...)``.
    - ``inferencer`` must provide ``infer_image(img, device_id)`` and
      ``decision_window_bucket(ts_unix)``.
    - ``producer`` is optional; when given it must provide ``publish(payload)``.
    """

    def __init__(self, settings, db, inferencer, producer=None):
        self.settings = settings
        self.db = db
        self.inferencer = inferencer
        self.producer = producer

    def extract_device_id(self, filename: str) -> str:
        """
        Extract the device ID from a filename in the format ``<device>_<ts>``
        (for example ``dev-f_20251106T1030.jpg`` -> ``dev-f``).

        Raises:
            ValueError: if ``filename`` is empty/None, has no ``_`` separator,
                or has nothing before the separator.
        """
        import os

        # Reject missing names up front; os.path.basename(None) would raise TypeError.
        if not filename:
            raise ValueError(f"Invalid filename format: {filename}")

        base = os.path.basename(filename)
        device_id, separator, _ = base.partition("_")
        # Without the separator the whole basename (extension included) would
        # otherwise be returned as a bogus device id.
        if not separator or not device_id:
            raise ValueError(f"Invalid filename format: {filename}")
        return device_id

    def build_idem_key(self, device_id: str, ts_unix: float) -> str:
        """Build the idempotency key ``<device_id>:<decision-window bucket>``."""
        bucket = self.inferencer.decision_window_bucket(ts_unix)
        return f"{device_id}:{int(bucket)}"

    def publish_and_persist(
        self,
        device_id: str,
        decision: str,
        duration_min: int,
        confidence: float,
        dry_ratio: float,
        patch_count: int,
        idem: str,
        ts_iso: str
    ) -> bool:
        """
        Persist the event, optionally update the schedule, and publish to Kafka.

        The event is logged first; when ``db.log_event`` reports that nothing
        was saved the call is treated as a duplicate and nothing else happens.

        Returns:
            True if the event was saved, False if it was a duplicate.
        """
        import json
        import os

        payload = {
            "device_id": device_id,
            # Anything other than run/stop is normalised to "noop".
            "command": decision if decision in ("run", "stop") else "noop",
            "reason": "soil_dry",
            # A duration only makes sense for a "run" command.
            "duration_min": duration_min if decision == "run" else None,
            "confidence": confidence,
            "ts": ts_iso,
            "idempotency_key": idem
        }

        saved = self.db.log_event(
            device_id, ts_iso, dry_ratio, payload["command"],
            confidence, patch_count, idem,
            extra={"dry_ratio": dry_ratio}
        )

        if not saved:
            logger.info(json.dumps({
                "msg": "duplicate_idempotency",
                "device_id": device_id,
                "idem": idem
            }))
            return False

        # Schedule update (enabled unless SCHEDULE_UPDATE is set to something other than '1').
        schedule_update = os.getenv('SCHEDULE_UPDATE', '1') == '1'
        if schedule_update and decision == 'run':
            try:
                self.db.upsert_schedule(
                    device_id, ts_iso, duration_min,
                    updated_by='soil_api',
                    update_reason='soil_dry'
                )
            except Exception as e:
                # Best effort: a failed schedule write must not block the command.
                logger.warning('schedule update failed: %s', e)

        # Publish to Kafka
        if self.producer:
            self.producer.publish(payload)
        else:
            logger.warning(
                "Kafka producer unavailable; skipping publish. payload=%s",
                payload
            )

        return True

    def infer_from_image(
        self,
        img: "Image.Image",
        device_id: str,
        ts_unix: Optional[float] = None
    ) -> Dict[str, Any]:
        """
        Run inference on an image, persist/publish the decision, and return results.

        Args:
            img: PIL Image object
            device_id: Device identifier
            ts_unix: Unix timestamp (optional, defaults to current time)

        Returns:
            Dictionary with the keys: device_id, dry_ratio, decision,
            confidence, patch_count, duration_min, ts, idempotency_key,
            latency_ms, saved, debug.
        """
        if ts_unix is None:
            ts_unix = time.time()

        # perf_counter is monotonic, so latency is immune to wall-clock adjustments.
        started = time.perf_counter()

        # Same "naive UTC + Z" text as the deprecated utcfromtimestamp() produced.
        ts_iso = (
            dt.datetime.fromtimestamp(ts_unix, tz=dt.timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
            + "Z"
        )

        # Run inference
        result, debug = self.inferencer.infer_image(img, device_id)

        # Build idempotency key
        idem = self.build_idem_key(device_id, ts_unix)

        # Publish and persist
        saved = self.publish_and_persist(
            device_id=device_id,
            decision=result["decision"],
            duration_min=result["duration_min"],
            confidence=result["confidence"],
            dry_ratio=result["dry_ratio"],
            patch_count=result["patch_count"],
            idem=idem,
            ts_iso=ts_iso
        )

        latency_ms = int((time.perf_counter() - started) * 1000)

        return {
            "device_id": device_id,
            "dry_ratio": result["dry_ratio"],
            "decision": result["decision"],
            "confidence": result["confidence"],
            "patch_count": result["patch_count"],
            "duration_min": result.get("duration_min", 0),
            "ts": ts_iso,
            "idempotency_key": idem,
            "latency_ms": latency_ms,
            "saved": saved,
            "debug": debug
        }
def publish(self, payload: dict) -> None:
    """
    Publish a control payload to the main topic, falling back to the DLT.

    The payload is JSON-encoded and produced to ``self.topic``; the producer
    is then flushed with a 2 second timeout. The success metric is only
    incremented when the flush reports an empty queue. If producing raises,
    the error metric is incremented and a copy of the payload (with an
    ``error`` field) is sent to ``self.dlt``.

    Args:
        payload: JSON-serialisable dict; its ``command`` value labels the
            success metric.
    """
    try:
        self.producer.produce(
            self.topic,
            value=json.dumps(payload).encode("utf-8"),
            on_delivery=self._delivery_report
        )
        # flush() returns how many messages are still queued after the timeout.
        pending = self.producer.flush(2)
        if pending:
            # The message is still in the client queue and may yet be delivered,
            # so do not duplicate it to the DLT; just don't count it as sent.
            logger.warning(
                "Kafka flush timed out with %d message(s) still queued", pending
            )
            METRICS["kafka_publish_errors_total"].labels(reason="flush_timeout").inc()
            return
        METRICS["alerts_sent_total"].labels(decision=payload.get("command", "unknown")).inc()
    except Exception as e:
        logger.warning("Kafka publish failed: %s", e)
        METRICS["kafka_publish_errors_total"].labels(reason=type(e).__name__).inc()
        # try send to DLT
        try:
            dlt_payload = dict(payload)
            dlt_payload["error"] = str(e)
            self.producer.produce(self.dlt, value=json.dumps(dlt_payload).encode("utf-8"))
            if self.producer.flush(2):
                logger.warning("DLT flush timed out; message still queued")
        except Exception as e2:
            logger.error("DLT publish failed: %s", e2)
class ONNXMoistureModel:
    """ONNX Runtime wrapper that turns one image patch into class probabilities."""

    def __init__(self, model_path: str, label_map_path: str):
        """
        Open a CPU inference session and load the label mapping.

        Args:
            model_path: path to the ``.onnx`` model file.
            label_map_path: path to a JSON file mapping index -> label.
        """
        session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.sess = session

        with open(label_map_path, "r", encoding="utf-8") as handle:
            mapping = json.load(handle)
        self.label_map = mapping  # index -> label

        # Only the first declared input/output of the graph is used.
        first_input = session.get_inputs()[0]
        self.input_name = first_input.name
        first_output = session.get_outputs()[0]
        self.output_name = first_output.name

    def predict_proba_patch(self, patch: Image.Image):
        """
        Return the softmax probabilities for a single patch.

        The patch is preprocessed with ``preprocess_onnx(size=224)``, run
        through the session, and the first row of the softmax over axis 1
        is returned.
        """
        batch = preprocess_onnx(patch, size=224)
        outputs = self.sess.run([self.output_name], {self.input_name: batch})
        logits = outputs[0]

        # Subtract the row maximum before exponentiating for numerical stability.
        shifted = logits - logits.max(axis=1, keepdims=True)
        weights = np.exp(shifted)
        normalised = weights / weights.sum(axis=1, keepdims=True)
        return normalised[0]
@app.post("/infer", response_model=InferResponse)
async def infer(image: UploadFile = File(None), body: InferRequest | None = None):
    """
    Run inference on a soil moisture image.

    Accepts either multipart form data (file upload) or JSON with base64 image.

    - JSON: the device id is taken from ``body.device_id``.
    - Multipart: the device id is parsed from the uploaded filename via
      ``inference_logic.extract_device_id``.

    Raises:
        HTTPException(400): when neither input is provided, the upload has no
            filename, the filename cannot be parsed, or the image cannot be decoded.
    """
    try:
        if body is not None:
            # InferRequest declares `device_id` and `image_b64`; it has no `filename`.
            device_id = body.device_id
            img = load_image_from_b64(body.image_b64)
        else:
            if image is None:
                raise HTTPException(
                    status_code=400,
                    detail="Provide multipart (file) or JSON (image_b64)"
                )
            if not image.filename:
                raise HTTPException(
                    status_code=400,
                    detail="Uploaded file must have a filename encoding the device id"
                )
            device_id = inference_logic.extract_device_id(image.filename)
            content = await image.read()
            img = load_image_from_b64(base64.b64encode(content).decode("utf-8"))
    except (ValueError, OSError) as exc:
        # ValueError: bad filename format or invalid base64.
        # OSError: payload could not be opened as an image.
        raise HTTPException(status_code=400, detail=f"Invalid input: {exc}") from exc

    # Run inference using shared logic
    result = inference_logic.infer_from_image(img, device_id)

    # Return response
    return InferResponse(
        device_id=result["device_id"],
        dry_ratio=result["dry_ratio"],
        decision=result["decision"],
        confidence=result["confidence"],
        patch_count=result["patch_count"],
        ts=result["ts"],
        idempotency_key=result["idempotency_key"]
    )
def preprocess_onnx(pil_img: "Image.Image", size: int = 224) -> np.ndarray:
    """
    Convert an image into a float32 NCHW batch of one for the ONNX model.

    The image is forced to 3-channel RGB, resized to ``size`` x ``size``,
    scaled to the range [0, 1] and rearranged from HWC to CHW.

    Args:
        pil_img: source image (any mode that ``convert("RGB")`` accepts).
        size: output height and width in pixels.

    Returns:
        Array of shape ``(1, 3, size, size)`` and dtype float32.
    """
    # Grayscale/palette/RGBA inputs would otherwise yield a 2-D or 4-channel
    # array and break the transpose or the model's expected channel count.
    img = pil_img.convert("RGB").resize((size, size))
    arr = np.asarray(img).astype("float32") / 255.0
    arr = arr.transpose(2, 0, 1)  # HWC -> CHW
    return arr[None, :, :, :]  # NCHW
def main():
    """
    Post every image under ``--images-dir`` to the ``/infer`` endpoint of ``--api``.

    Images (jpg/jpeg/png/bmp) are discovered recursively and sent in sorted
    order as multipart uploads, with a 0.4 second pause between requests.
    The JSON response, or the error, is printed for each file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--images-dir", required=True)
    parser.add_argument("--api", default="http://localhost:8000")
    opts = parser.parse_args()

    # Collect all images
    patterns = ("*.jpg", "*.jpeg", "*.png", "*.bmp")
    image_paths = sorted(
        match
        for pattern in patterns
        for match in glob.glob(os.path.join(opts.images_dir, '**', pattern), recursive=True)
    )

    if not image_paths:
        print("No images found in", opts.images_dir)
        return

    endpoint = opts.api + "/infer"
    for image_path in image_paths:
        upload_name = os.path.basename(image_path)

        # IMPORTANT: The device_id must be encoded inside the filename,
        # e.g. device123_20250101T1030.jpg
        print(f"Sending {upload_name} ...")

        try:
            with open(image_path, "rb") as handle:
                response = requests.post(
                    endpoint,
                    files={"image": (upload_name, handle)},
                    timeout=60
                )

            if response.status_code == 200:
                print(json.dumps(response.json(), indent=2))
            else:
                print("Error:", response.status_code, response.text)

        except Exception as exc:
            print("Request failed:", exc)

        time.sleep(0.4)
def main():
    """
    Print the most recent rows of ``soil_moisture_events`` as indented JSON.

    Options:
        --dsn    PostgreSQL DSN. Defaults to the ``PG_DSN`` environment
                 variable when set, otherwise to the local development DSN.
        --limit  Maximum number of rows to print (newest first, default 10).
    """
    import os  # local import: only needed for the optional PG_DSN override

    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--dsn",
        # Allow credentials to come from the environment rather than the
        # command line; the literal remains the local-dev fallback.
        default=os.getenv(
            "PG_DSN",
            "postgresql://missions_user:pg123@127.0.0.1:5432/missions_db",
        ),
    )
    ap.add_argument("--limit", type=int, default=10)
    args = ap.parse_args()

    # PostgreSQL rejects a negative LIMIT; fail with a usage error instead.
    if args.limit < 0:
        ap.error("--limit must be >= 0")

    conn = psycopg2.connect(args.dsn)
    try:
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        try:
            cur.execute("""
                SELECT id, device_id, ts, dry_ratio, decision, confidence, patch_count, idempotency_key
                FROM soil_moisture_events
                ORDER BY id DESC
                LIMIT %s
            """, (args.limit,))
            rows = cur.fetchall()
        finally:
            cur.close()
        # default=str renders timestamps and other non-JSON types.
        print(json.dumps(rows, indent=2, default=str))
    finally:
        conn.close()
-- Per-device irrigation policy: one row per device (device_id is the primary key).
-- Rows are removed automatically when the referenced device is deleted.
-- The threshold/tuning columns are nullable.
-- NOTE(review): the inference code appears to fall back to built-in defaults
-- when a policy value is NULL -- confirm against the policy loader.
--
-- IF NOT EXISTS keeps this script re-runnable, like the other tables in this file.
CREATE TABLE IF NOT EXISTS irrigation_policies (
    device_id      TEXT NOT NULL,
    prev_state     TEXT,
    dry_ratio_high REAL,
    dry_ratio_low  REAL,
    min_patches    INT,
    duration_min   INT,
    -- TIMESTAMPTZ to match the updated_at columns of the other tables here.
    updated_at     TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (device_id),
    CONSTRAINT fk_device
        FOREIGN KEY (device_id) REFERENCES devices(device_id)
        ON DELETE CASCADE
);
def export_onnx(model, out_path, device):
    """
    Export ``model`` to ONNX at ``out_path``.

    The model is switched to eval mode and traced with a random
    1x3x224x224 input on ``device``. The graph uses the fixed names
    ``input`` / ``logits``, opset 17 and no dynamic axes.

    Args:
        model: the torch module to export.
        out_path: destination file; parent directories are created if needed.
        device: device on which the dummy input is allocated.
    """
    model.eval()
    dummy = torch.randn(1, 3, 224, 224, device=device)

    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError -- only create the directory when there is one.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    torch.onnx.export(
        model, dummy, out_path,
        input_names=["input"], output_names=["logits"],
        opset_version=17, dynamic_axes=None
    )
    print("Exported ONNX to", out_path)
import os
import sys


# Make the `app` package (which lives under src/) importable for the test run.
TEST_DIR = os.path.dirname(__file__)
SRC_DIR = os.path.abspath(os.path.join(TEST_DIR, "..", "src"))
_already_on_path = SRC_DIR in sys.path
if not _already_on_path:
    # Put src/ first on the module search path.
    sys.path.insert(0, SRC_DIR)
import sys
import types
import numpy as np
from PIL import Image

# Pre-inject a lightweight stub for app.onnx_model to avoid importing onnxruntime
stub_mod = types.ModuleType("app.onnx_model")
class _StubONNX:
    def __init__(self, *a, **k):
        # minimal surface to satisfy Inferencer init
        self.label_map = {"0": "dry", "1": "wet"}
    def predict_proba_patch(self, patch):
        return np.array([0.5, 0.5], dtype=float)
stub_mod.ONNXMoistureModel = _StubONNX
sys.modules.setdefault("app.onnx_model", stub_mod)

from app.config import Settings
from app import inference as infmod


DEVICE_ID = "dev-a"


class _FakeDB:
    """
    In-memory stand-in for the DB object the Inferencer is constructed with.

    Implements only the two calls infer_image makes: load_device_policy()
    and update_prev_state().
    """

    def __init__(self, policy):
        self.policy = dict(policy)
        self.state_updates = []

    def load_device_policy(self, device_id):
        return dict(self.policy)

    def update_prev_state(self, device_id, new_state):
        self.state_updates.append((device_id, new_state))
        self.policy["prev_state"] = new_state


class _FakeDryModel:
    def __init__(self, *args, **kwargs):
        # emulates label_map: index->label
        self.label_map = {"0": "dry", "1": "wet"}

    def predict_proba_patch(self, patch):
        # Always predict class 0 (dry) with high confidence
        return np.array([0.9, 0.1], dtype=float)


class _FakeWetModel:
    def __init__(self, *args, **kwargs):
        self.label_map = {"0": "dry", "1": "wet"}

    def predict_proba_patch(self, patch):
        # Always predict class 1 (wet)
        return np.array([0.1, 0.9], dtype=float)


def _make_inferencer(monkeypatch, model_cls, policy=None):
    """Build an Inferencer with a fake model and fake DB; returns (inferencer, db)."""
    # Replace ONNX model with a lightweight fake
    monkeypatch.setattr(infmod, "ONNXMoistureModel", lambda *a, **k: model_cls())

    s = Settings()
    s.patch_size = 10
    s.patch_stride = 10
    s.decision_window_sec = 300

    db = _FakeDB(policy or {})
    # The service constructs the Inferencer as Inferencer(settings, db).
    return infmod.Inferencer(s, db), db


def _make_image(w=20, h=10):
    return Image.new("RGB", (w, h), color=(128, 128, 128))


def test_decision_run_when_dry_ratio_high(monkeypatch):
    policy = {"prev_state": "stop", "dry_ratio_high": 0.5, "dry_ratio_low": 0.3, "min_patches": 2, "duration_min": 7}
    inf, db = _make_inferencer(monkeypatch, _FakeDryModel, policy)
    # 20x10 with 10x10 patches & stride 10 => 2 patches
    img = _make_image(20, 10)

    result, debug = inf.infer_image(img, DEVICE_ID)
    assert result["patch_count"] == 2
    assert result["dry_ratio"] == 1.0
    assert result["decision"] == "run"
    assert result["duration_min"] == 7
    # State transition stop -> run is persisted through the DB
    assert db.state_updates == [(DEVICE_ID, "run")]


def test_decision_stop_when_dry_ratio_low_and_prev_run(monkeypatch):
    policy = {"prev_state": "run", "dry_ratio_high": 0.6, "dry_ratio_low": 0.25, "min_patches": 2, "duration_min": 5}
    inf, db = _make_inferencer(monkeypatch, _FakeWetModel, policy)
    img = _make_image(20, 10)  # 2 patches

    result, _ = inf.infer_image(img, DEVICE_ID)
    assert result["patch_count"] == 2
    assert result["dry_ratio"] == 0.0
    assert result["decision"] == "stop"
    assert db.state_updates == [(DEVICE_ID, "stop")]


def test_noop_when_not_enough_patches(monkeypatch):
    policy = {"prev_state": "stop", "dry_ratio_high": 0.5, "dry_ratio_low": 0.3, "min_patches": 3, "duration_min": 7}
    inf, db = _make_inferencer(monkeypatch, _FakeDryModel, policy)
    img = _make_image(20, 10)  # 2 patches

    result, _ = inf.infer_image(img, DEVICE_ID)
    assert result["patch_count"] == 2
    assert result["decision"] == "noop"
    # State remains unchanged, so nothing is written back
    assert db.state_updates == []


def test_decision_window_bucket_rounds_down(monkeypatch):
    inf, _ = _make_inferencer(monkeypatch, _FakeDryModel)
    # With window 300s, 1234 -> bucket start 1200
    bucket = inf.decision_window_bucket(1234.0)
    assert bucket == 1200
patches) + + +def test_tile_image_small_image_resizes_to_single_patch(): + img = make_rgb_image(2, 2, (0, 0, 0)) + patches = tile_image(img, patch_size=4, stride=4) + assert len(patches) == 1 + assert patches[0].size == (4, 4) + + +def test_preprocess_onnx_output_shape_and_range(): + img = make_rgb_image(6, 6, (255, 128, 0)) + arr = preprocess_onnx(img, size=8) + assert arr.shape == (1, 3, 8, 8) + assert arr.dtype == np.float32 + assert np.isfinite(arr).all() + assert arr.min() >= 0.0 and arr.max() <= 1.0 + diff --git a/services/inference_http/requirements.txt b/services/inference_http/requirements.txt index 2e65f4b67..0a6125300 100644 --- a/services/inference_http/requirements.txt +++ b/services/inference_http/requirements.txt @@ -4,3 +4,13 @@ minio pillow numpy==1.26.4 pydantic +onnxruntime==1.20.0 +opencv-python==4.10.0.84 +kafka-python==2.0.2 +psycopg2-binary==2.9.10 +prometheus_client==0.21.0 +PyYAML==6.0.2 +python-dotenv==1.0.1 +requests==2.32.3 +python-multipart==0.0.6 +confluent_kafka==2.12.0 \ No newline at end of file diff --git a/services/inference_http/weights/soil_moisture_best.onnx b/services/inference_http/weights/soil_moisture_best.onnx new file mode 100644 index 000000000..6052e846e Binary files /dev/null and b/services/inference_http/weights/soil_moisture_best.onnx differ diff --git a/services/inference_http/weights/soil_moisture_best.pt b/services/inference_http/weights/soil_moisture_best.pt new file mode 100644 index 000000000..4fbed017d Binary files /dev/null and b/services/inference_http/weights/soil_moisture_best.pt differ diff --git a/services/inference_http/weights/soil_moisture_label_mapping.json b/services/inference_http/weights/soil_moisture_label_mapping.json new file mode 100644 index 000000000..7b688603a --- /dev/null +++ b/services/inference_http/weights/soil_moisture_label_mapping.json @@ -0,0 +1,4 @@ +{ + "0": "dry", + "1": "wet" +} \ No newline at end of file diff --git a/streaming/flink/Dockerfile.flink-py 
#!/bin/bash
# Submit the PyFlink HTTP dispatcher job once the JobManager answers,
# then keep the container alive.
#
# Required environment variables:
#   KAFKA_BOOTSTRAP, INPUT_TOPIC, TEAM, HTTP_URL, GROUP_ID, DLQ_TOPIC

# Fail fast with a clear message instead of handing the job a malformed
# argument list when a variable is unset or empty.
: "${KAFKA_BOOTSTRAP:?KAFKA_BOOTSTRAP must be set}"
: "${INPUT_TOPIC:?INPUT_TOPIC must be set}"
: "${TEAM:?TEAM must be set}"
: "${HTTP_URL:?HTTP_URL must be set}"
: "${GROUP_ID:?GROUP_ID must be set}"
: "${DLQ_TOPIC:?DLQ_TOPIC must be set}"

# Wait until the JobManager accepts CLI requests.
until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do
  sleep 3
done

# Values are quoted so that spaces or glob characters cannot split or expand them.
/opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 \
  --jobmanager flink-jobmanager:8081 --detached \
  --python /opt/flink/jobs/http_dispatcher.py \
  -- --bootstrap "$KAFKA_BOOTSTRAP" --input-topic "$INPUT_TOPIC" --team "$TEAM" \
  --http-url "$HTTP_URL" --group-id "$GROUP_ID" --dlq-topic "$DLQ_TOPIC"

# The job was submitted detached; block forever so the container stays up.
tail -f /dev/null