Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions configs/mapping.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
topics:
imagery.new.fruit:
name: fruit_defect
runner: http

http:
url_template: "http://inference-http:8000/infer_json/{name}"

kafka:
group_id: "http-dispatcher"
dlq_topic: "dlq.inference.http"
File renamed without changes.
70 changes: 51 additions & 19 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,6 @@ services:
fs.s3a.connection.ssl.enabled: false
python.client.executable: /usr/bin/python3
python.executable: /usr/bin/python3
- HTTP_INFER_URL=http://fruit-inference-http:8000/infer_json
volumes:
- ./streaming/flink/jobs:/opt/flink/jobs:ro
- ./streaming/flink/connectors/flink-json-1.18.1.jar:/opt/flink/lib/flink-json-1.18.1.jar:ro
Expand All @@ -770,6 +769,7 @@ services:
- ./streaming/flink/connectors/kafka-clients-3.2.3.jar:/opt/flink/lib/kafka-clients-3.2.3.jar:ro
- ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro
- ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro
- ./configs/mapping.yml:/etc/dispatcher/mapping.yml:ro
restart: unless-stopped

audio_compression:
Expand Down Expand Up @@ -840,58 +840,64 @@ services:
fs.s3a.connection.ssl.enabled: false
python.client.executable: /usr/bin/python3
python.executable: /usr/bin/python3
- HTTP_INFER_URL=http://fruit-inference-http:8000/infer_json
volumes:
- ./streaming/flink/jobs:/opt/flink/jobs:ro
- ./streaming/flink/connectors/flink-json-1.18.1.jar:/opt/flink/lib/flink-json-1.18.1.jar:ro
- ./streaming/flink/connectors/flink-sql-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar:ro
- ./streaming/flink/connectors/flink-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar:ro
- ./streaming/flink/connectors/kafka-clients-3.2.3.jar:/opt/flink/lib/kafka-clients-3.2.3.jar:ro
- ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro
- ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro
- ./configs/mapping.yml:/etc/dispatcher/mapping.yml:ro
restart: unless-stopped

# --------------------------
# Inference HTTP Service
# --------------------------
fruit-inference-http:
inference-http:
build:
context: ./services/inference_http
dockerfile: Dockerfile
environment:
- TEAM=fruit
- WEIGHTS_PATH=/app/weights/fruit_cls_best.ts
- WEIGHTS_ROOT=/app/services/inference_http/weights
- MINIO_ENDPOINT=minio-hot:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin123
- MINIO_SECURE=0
volumes:
- ./services/inference_http/weights:/app/weights:ro
container_name: fruit-inference-http
networks: [ ag_cloud ]
- ./services:/app/services:ro
networks: [ag_cloud]
ports:
- "8011:8000"
restart: unless-stopped



# --------------------------
# Flink Jobs
# --------------------------
flink-dispatcher-fruit:
flink-dispatcher:
build:
context: ./streaming/flink
dockerfile: Dockerfile.flink-py
image: agcloud-flink-py:1.18
container_name: flink-dispatcher-fruit
container_name: flink-dispatcher
depends_on:
kafka: {condition: service_started}
flink-jobmanager: { condition: service_started }
flink-taskmanager: { condition: service_started }
fruit-inference-http: { condition: service_started }
networks: [ ag_cloud ]
inference-http: { condition: service_started }
networks: [ag_cloud]
environment:
- KAFKA_BOOTSTRAP=kafka:9092
- INPUT_TOPIC=imagery.new.fruit
- TEAM=fruit
- HTTP_URL=http://fruit-inference-http:8000/infer_json
- DLQ_TOPIC=dlq.inference.http
- GROUP_ID=http-dispatcher-fruit
- PARALLELISM=2
- INPUT_TOPICS=imagery.new.fruit
- CONFIG_PATH=/etc/dispatcher/mapping.yml
- RUNNER=http
- HTTP_URL_TEMPLATE=http://inference-http:8000/infer_json/{name}
- PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3
- GROUP_ID=http-dispatcher
- DLQ_TOPIC=dlq.inference.http
- OK_TOPIC=inference.dispatched
volumes:
- ./streaming/flink/jobs:/opt/flink/jobs:ro
- ./streaming/flink/connectors/flink-connector-kafka-3.2.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar:ro
Expand All @@ -900,7 +906,33 @@ services:
- ./streaming/flink/connectors/kafka-clients-3.2.3.jar:/opt/flink/lib/kafka-clients-3.2.3.jar:ro
- ./streaming/flink/connectors/lz4-java-1.8.0.jar:/opt/flink/lib/lz4-java-1.8.0.jar:ro
- ./streaming/flink/connectors/snappy-java-1.1.10.5.jar:/opt/flink/lib/snappy-java-1.1.10.5.jar:ro
command: [ "bash", "-lc", "set -e; echo 'Waiting for JobManager to accept commands...'; until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do echo 'still waiting...'; sleep 3; done; echo 'JobManager is ready!'; /opt/flink/bin/flink run -Dpython.client.executable=/usr/bin/python3 -Dpython.executable=/usr/bin/python3 -Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar --jobmanager flink-jobmanager:8081 --detached --python /opt/flink/jobs/http_dispatcher.py -- --bootstrap kafka:9092 --input-topic imagery.new.fruit --team fruit --http-url http://fruit-inference-http:8000/infer_json --group-id http-dispatcher-fruit --dlq-topic dlq.inference.http; tail -f /dev/null" ]
- ./configs/mapping.yml:/etc/dispatcher/mapping.yml:ro # << חשוב!
command: >
bash -lc '
set -euo pipefail;
echo "Waiting for JobManager to accept commands...";
until /opt/flink/bin/flink list --jobmanager flink-jobmanager:8081 >/dev/null 2>&1; do
echo "still waiting..."; sleep 3;
done;
echo "JobManager is ready!";
/opt/flink/bin/flink run \
-Dpython.client.executable=/usr/bin/python3 \
-Dpython.executable=/usr/bin/python3 \
-Dpipeline.jars=file:///opt/flink/lib/flink-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-sql-connector-kafka-3.2.0-1.18.jar,file:///opt/flink/lib/flink-json-1.18.1.jar \
--jobmanager flink-jobmanager:8081 \
--detached \
--python /opt/flink/jobs/http_dispatcher.py \
-- \
--bootstrap "$$KAFKA_BOOTSTRAP" \
--input-topics "$$INPUT_TOPICS" \
--config "$$CONFIG_PATH" \
--runner "$$RUNNER" \
--http-url-template "$$HTTP_URL_TEMPLATE" \
--group-id "$$GROUP_ID" \
--dlq-topic "$$DLQ_TOPIC" \
--ok-topic "$$OK_TOPIC";
tail -f /dev/null
'

flink-alerts-job:
build:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,27 @@
except Exception:
TQDM_AVAILABLE = False

# --- קונפיג דרך ENV (ברירות מחדל מותאמות לסטאק שלך) ---
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "localhost:9001")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin123")
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"

BUCKET_INPUT = os.getenv("MINIO_BUCKET_INPUT", "imagery")
BUCKET_OUTPUT = os.getenv("MINIO_BUCKET_OUTPUT", "telemetry")
INPUT_PREFIX = os.getenv("MINIO_INPUT_PREFIX", "inputs/batch1/") # מאיפה לקרוא תמונות
OUTPUT_PREFIX = os.getenv("MINIO_OUTPUT_PREFIX", "results/batch1/") # לאן להעלות תוצאות
INPUT_PREFIX = os.getenv("MINIO_INPUT_PREFIX", "inputs/batch1/")
OUTPUT_PREFIX = os.getenv("MINIO_OUTPUT_PREFIX", "results/batch1/")

WEIGHTS_BUCKET = os.getenv("MINIO_BUCKET_WEIGHTS","imagery")
WEIGHTS_PREFIX = os.getenv("MINIO_WEIGHTS_PREFIX","models/")
LOCAL_WEIGHTS_TS = os.getenv("MODEL_TS_LOCAL", "./outputs/fruit_cls_best.ts")
LOCAL_WEIGHTS_PT = os.getenv("MODEL_PT_LOCAL", "./outputs/fruit_cls_best.pt")

IMG_SIZE = int(os.getenv("IMG_SIZE", "192"))
THRESHOLD = float(os.getenv("CLS_THRESHOLD", "0.5")) # סף בינארי defect/ok
THRESHOLD = float(os.getenv("CLS_THRESHOLD", "0.5"))

DL_WORKERS = int(os.getenv("DL_WORKERS", "8"))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "8"))
HEARTBEAT_PERIOD = int(os.getenv("HEARTBEAT_PERIOD", "30")) # שניות
HEARTBEAT_PERIOD = int(os.getenv("HEARTBEAT_PERIOD", "30"))

# --- logging config ---
logging.basicConfig(
Expand Down Expand Up @@ -152,21 +151,25 @@ def fetch_weights_if_missing() -> Path:
log.info(f"Downloaded PT weights: {pt_local}")
return pt_local

def load_model(weights_path: Path):
log.info(f"Loading model from: {weights_path}")
if weights_path.suffix == ".ts":
model = torch.jit.load(str(weights_path), map_location="cpu")
else:
obj = torch.load(str(weights_path), map_location="cpu")
if hasattr(obj, "state_dict"):
model = obj
elif isinstance(obj, dict):
raise RuntimeError("Loaded a state_dict dict but no model class is defined here. Please export TorchScript (.ts).")
else:
model = obj
model.eval()
log.info("Model loaded and set to eval()")
return model
def load_model(weights_path, device=None):
p = Path(weights_path)
device = device or ("cuda" if torch.cuda.is_available() else "cpu")

print(f"Loading model from: {p}")

if p.suffix == ".ts":
model = torch.jit.load(str(p), map_location=device)
model.eval()
return model

if p.suffix == ".pt":
model = build_model_architecture()
state = torch.load(str(p), map_location=device)
model.load_state_dict(state)
model.eval()
return model

raise ValueError(f"Unsupported weights suffix: {p.suffix} for {p}")

def get_preprocess():
return transforms.Compose([
Expand All @@ -180,7 +183,7 @@ def infer_single(model, img: Image.Image, preprocess, device="cpu") -> Dict:
t0 = time.perf_counter()
with torch.no_grad():
y = model(x)
dt = (time.perf_counter() - t0) * 1000.0 # ms
dt = (time.perf_counter() - t0) * 1000.0

if isinstance(y, (list, tuple)):
y = y[0]
Expand Down
51 changes: 12 additions & 39 deletions services/inference_http/adapters/fruit_defect_runner.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,12 @@
import os, io
from pathlib import Path
from typing import Any, Dict, Optional

from PIL import Image
import torch

# Core code imported from your fruit-defect module
from models.fruit_defect.inference.infer_fruit_defect import (
load_model, get_preprocess, infer_single
)

# Local weights only
WEIGHTS_PATH = Path(os.getenv("WEIGHTS_PATH", "/app/weights/fruit_cls_best.ts"))

def _ensure_local_weights(p: Path) -> Path:
if not p.exists():
raise FileNotFoundError(f"Local weights not found at: {p}")
return p

class FruitDefectRunner:
def __init__(self, model_tag: Optional[str] = None):
# Allows selecting a different weights file in future via extra/model_tag
weights_path = _ensure_local_weights(WEIGHTS_PATH)
self.model = load_model(weights_path)
self.preprocess = get_preprocess()
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model = self.model.to(self.device).eval()

def run(self, image_bytes: bytes, model_tag=None, extra=None) -> Dict[str, Any]:
img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
result = infer_single(self.model, img, self.preprocess, device=self.device)
# Normalize to standard HTTP response structure
return {
"label": result.get("status"),
"score": result.get("prob_defect"),
"confidence": result.get("confidence"),
"latency_ms_model": result.get("latency_ms_model"),
}
from typing import Dict, Type
from adapters.fruit_defect_runner import FruitDefectRunner

REGISTRY: Dict[str, Type] = {
"fruit_defect": FruitDefectRunner,
}

def get_adapter(name: str):
key = (name or "").lower()
if key not in REGISTRY:
raise ValueError(f"Unknown model name: {key}")
return REGISTRY[key]()
Loading