diff --git a/PLAN.md b/PLAN.md index bf51cca..c2204d2 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,86 +1,36 @@ -# PLAN: MES Core — Week 2 (Modbus Machine State Reader) +# PLAN: MES Core — Week 3 (OEE Calculator Service) -**Branch:** `feat/mes-week2-state-reader` -**Issue:** Mikecranesync/MIRA#320 +**Branch:** `feat/mes-week3-oee-calculator` +**Issue:** Mikecranesync/MIRA#321 **PRD:** `docs/PRD-MES-CORE.md` -**Date:** 2026-04-15 -**Depends on:** Week 1 (feat/mes-week1-db-schema) merged +**Date:** 2026-04-16 +**Depends on:** Weeks 1+2 merged to main ✓ --- ## Objective -Build the machine state reader: a background poller that reads the plc-modbus HTTP API every 5 seconds per configured line, detects state transitions (RUNNING/DOWN/IDLE/OFFLINE), writes them to `machine_states`, and exposes `GET /api/mes/lines` and `GET /api/mes/lines/{id}/state` REST endpoints. +60-second tick OEE calculator. Reads ItemCount delta from plc-modbus, +computes Availability/Performance/Quality/OEE/TEEP from machine_states, +writes to oee_snapshots, exposes REST endpoints, and alerts when OEE < 60%. -## Affected Files +## OEE Formula -**New:** -- `services/mes/backend/services/__init__.py` -- `services/mes/backend/services/plc_client.py` — async HTTP client wrapping plc-modbus -- `services/mes/backend/services/state_machine.py` — pure state detection from IO snapshot -- `services/mes/backend/services/state_poller.py` — asyncio background poll loop -- `services/mes/backend/routes/lines.py` — GET /api/mes/lines, GET /lines/{id}/state -- `services/mes/tests/test_machine_states.py` — 10 unit tests, all mocked + Availability = run_time_sec / planned_time_sec + Performance = (ideal_cycle_sec x total_count) / max(run_time_sec, 1) + Quality = good_count / max(total_count, 1) + OEE = Availability x Performance x Quality + TEEP = OEE (no schedule yet; Week 4 wires utilization) -**Modified:** -- `services/mes/requirements.txt` — add httpx -- `services/mes/backend/config.py` — add plc_modbus_url setting -- `services/mes/backend/main.py` — wire poller into lifespan, add lines router -- `docker-compose.yml` — add PLC_MODBUS_URL env to mes container +Clamp all values to [0.0, 1.0]. -## Approach +## New Endpoints -1. `plc_client.py` — thin async wrapper around `GET /api/plc/io` (httpx). Raises `PLCOfflineError` on timeout/connection failure so caller can set OFFLINE state. -2. `state_machine.py` — pure function `detect_state(io_data)` → `(MachineStateEnum, reason_code | None)`. Derived from `VFDStatus` and `ErrorCode` registers. No DB or network calls — fully testable without mocks. -3. `state_poller.py` — asyncio task, one iteration per line every 5s. Maintains in-memory cache to avoid DB reads on every tick. Writes to `machine_states` only on transition. -4. `lines.py` routes — two endpoints: list all lines (from DB), get current state (from in-memory cache + last DB row). -5. `main.py` lifespan — start poller task on startup, cancel on shutdown. - -State transition write: close open row (`ended_at = NOW()`), insert new row. - -## State Machine - -``` -IO: VFDStatus=1, ErrorCode=0 → RUNNING -IO: VFDStatus=2 OR ErrorCode>0 → DOWN (reason_code from ErrorCode map) -IO: VFDStatus=0, ErrorCode=0 → IDLE -HTTP failure / timeout → OFFLINE -``` - -## ErrorCode → reason_code map - -```python -{1: "OVERLOAD", 2: "OVERHEAT", 3: "SENSOR_FAIL", 4: "JAM", 7: "E_STOP"} -``` - -## Risks - -- plc-modbus in mock mode returns VFDStatus=0 at rest — poller sees IDLE immediately (expected) -- Multiple lines share one plc-modbus service currently — same io_data, different `line_id` rows + GET /api/mes/lines/{id}/oee + GET /api/mes/lines/{id}/oee/history?hours=8 + GET /api/mes/oee/summary + GET /api/mes/kpis ## Rollback -```bash -git checkout feat/mes-week1-db-schema -``` - -## Verification Steps - -```bash -# Unit tests (no docker needed) -cd services/mes && pytest tests/test_machine_states.py -v - -# Integration: start stack, check state endpoint -docker compose up mes-db mes plc-modbus -d -curl localhost:8300/api/mes/lines -curl localhost:8300/api/mes/lines//state - -# Inject a fault and verify DB transition -curl -X POST localhost:8001/api/plc/mock/fault -H "Content-Type: application/json" -d '{"fault_type":"jam"}' -sleep 8 -curl localhost:8300/api/mes/lines//state # should show DOWN / JAM -``` - -## Note on Active Focus Window - -Explicitly authorized by Mike (2026-04-15 session). + git checkout main diff --git a/services/mes/backend/config.py b/services/mes/backend/config.py index f76f6b6..a563e8e 100644 --- a/services/mes/backend/config.py +++ b/services/mes/backend/config.py @@ -24,7 +24,10 @@ class Settings(BaseSettings): # Polling interval in seconds (default 5, set lower in tests) plc_poll_interval_sec: int = 5 - # Set True to skip poller startup (useful in unit tests) + # OEE calculator tick interval in seconds (default 60) + oee_tick_sec: int = 60 + + # Set True to skip background task startup (useful in unit tests) plc_use_mock: bool = False diff --git a/services/mes/backend/main.py b/services/mes/backend/main.py index 477dbfc..6240a6c 100644 --- a/services/mes/backend/main.py +++ b/services/mes/backend/main.py @@ -1,12 +1,14 @@ """FactoryLM MES API — FastAPI entry point. Lifespan: - startup → seed state cache, launch background state poller - shutdown → signal poller to stop cleanly + startup → launch state poller + OEE calculator background tasks + shutdown → stop both tasks cleanly Routes (cumulative by week): Week 1: /api/health Week 2: /api/mes/lines, /api/mes/lines/{id}/state + Week 3: /api/mes/lines/{id}/oee, /api/mes/lines/{id}/oee/history + /api/mes/oee/summary, /api/mes/kpis """ import asyncio @@ -19,7 +21,8 @@ from backend.config import settings from backend.routes.health import router as health_router from backend.routes.lines import router as lines_router -from backend.services import state_poller +from backend.routes.oee import router as oee_router +from backend.services import oee_calculator, state_poller logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -30,25 +33,34 @@ async def lifespan(app: FastAPI): db_host = settings.database_url.split("@")[-1] logger.info("MES service starting — DB: %s PLC: %s", db_host, settings.plc_modbus_url) - poller_task = None + poller_task = oee_task = None + if not settings.plc_use_mock: poller_task = asyncio.create_task( state_poller.run(poll_interval_sec=settings.plc_poll_interval_sec), name="state_poller", ) - logger.info("State poller started (interval=%ds)", settings.plc_poll_interval_sec) + oee_task = asyncio.create_task( + oee_calculator.run(tick_sec=settings.oee_tick_sec), + name="oee_calculator", + ) + logger.info( + "Background tasks started — poller=%ds oee_tick=%ds", + settings.plc_poll_interval_sec, settings.oee_tick_sec, + ) else: - logger.info("PLC mock mode — state poller disabled") + logger.info("PLC mock mode — background tasks disabled") yield logger.info("MES service shutting down") - if poller_task: - state_poller.stop() + state_poller.stop() + oee_calculator.stop() + for task in [t for t in [poller_task, oee_task] if t]: try: - await asyncio.wait_for(poller_task, timeout=8.0) - except asyncio.TimeoutError: - poller_task.cancel() + await asyncio.wait_for(task, timeout=8.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + task.cancel() app = FastAPI( @@ -67,6 +79,7 @@ async def lifespan(app: FastAPI): app.include_router(health_router, prefix=settings.api_prefix) app.include_router(lines_router, prefix=settings.api_prefix) +app.include_router(oee_router, prefix=settings.api_prefix) if __name__ == "__main__": diff --git a/services/mes/backend/routes/oee.py b/services/mes/backend/routes/oee.py new file mode 100644 index 0000000..75d882e --- /dev/null +++ b/services/mes/backend/routes/oee.py @@ -0,0 +1,225 @@ +"""OEE routes — snapshots, history, fleet summary, KPIs. + +Week 3 endpoints: + GET /api/mes/lines/{id}/oee + GET /api/mes/lines/{id}/oee/history + GET /api/mes/oee/summary + GET /api/mes/kpis +""" + +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import text +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.models.db_models import Line, MachineStateEnum, OEESnapshot +from backend.services import oee_calculator, state_poller + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/mes", tags=["oee"]) + + +# ── Response models ─────────────────────────────────────────────────────────── + +class OEESnapshotResponse(BaseModel): + line_id: str + line_name: str + ts: datetime + availability: float + performance: float + quality: float + oee: float + teep: Optional[float] + run_time_sec: int + planned_time_sec: int + total_count: int + good_count: int + ideal_cycle_sec: float + low_oee_ticks: int # consecutive ticks below 60% (alert indicator) + + +class OEEHistoryItem(BaseModel): + ts: datetime + availability: float + performance: float + quality: float + oee: float + total_count: int + run_time_sec: int + + +class OEESummaryItem(BaseModel): + line_id: str + line_name: str + oee: float + availability: float + performance: float + quality: float + teep: Optional[float] + run_state: str + ts: Optional[datetime] + alert: bool # True if low_oee_ticks >= 30 + + +class KPIResponse(BaseModel): + fleet_oee: float # average OEE across all lines (latest tick) + downtime_minutes_today: int # total DOWN + OFFLINE minutes today, all lines + lines_in_alert: int # lines with OEE < 60% for 30+ min + snapshot_ts: datetime + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _latest_snapshot(db: Session, line_id: str) -> Optional[OEESnapshot]: + return ( + db.query(OEESnapshot) + .filter(OEESnapshot.line_id == line_id) + .order_by(OEESnapshot.ts.desc()) + .first() + ) + + +def _downtime_minutes_today(db: Session, line_id: str) -> int: + """Sum minutes in DOWN or OFFLINE state since midnight UTC today.""" + midnight = datetime.now(timezone.utc).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + row = db.execute( + text(""" + SELECT COALESCE(SUM( + EXTRACT(EPOCH FROM ( + COALESCE(ended_at, NOW()) - GREATEST(started_at, :midnight) + )) + ), 0) / 60 + FROM machine_states + WHERE line_id = :lid + AND state IN ('DOWN', 'OFFLINE') + AND COALESCE(ended_at, NOW()) >= :midnight + """), + {"lid": line_id, "midnight": midnight}, + ).scalar() + return int(row or 0) + + +# ── Endpoints ───────────────────────────────────────────────────────────────── + +@router.get("/lines/{line_id}/oee", response_model=OEESnapshotResponse) +def get_line_oee(line_id: str, db: Session = Depends(get_db)): + """Latest OEE snapshot for one line.""" + line = db.query(Line).filter(Line.id == line_id).first() + if not line: + raise HTTPException(status_code=404, detail=f"Line {line_id} not found") + + snap = _latest_snapshot(db, line_id) + if not snap: + raise HTTPException( + status_code=404, + detail="No OEE snapshots yet — calculator ticks every 60s", + ) + + return OEESnapshotResponse( + line_id=line_id, + line_name=line.name, + ts=snap.ts, + availability=snap.availability, + performance=snap.performance, + quality=snap.quality, + oee=snap.oee, + teep=snap.teep, + run_time_sec=snap.run_time_sec, + planned_time_sec=snap.planned_time_sec, + total_count=snap.total_count, + good_count=snap.good_count, + ideal_cycle_sec=snap.ideal_cycle_sec, + low_oee_ticks=oee_calculator.get_low_oee_ticks(line_id), + ) + + +@router.get("/lines/{line_id}/oee/history", response_model=list[OEEHistoryItem]) +def get_line_oee_history( + line_id: str, + hours: int = Query(default=8, ge=1, le=72), + db: Session = Depends(get_db), +): + """OEE time-series for one line over the last N hours (default 8).""" + line = db.query(Line).filter(Line.id == line_id).first() + if not line: + raise HTTPException(status_code=404, detail=f"Line {line_id} not found") + + since = datetime.now(timezone.utc) - timedelta(hours=hours) + snaps = ( + db.query(OEESnapshot) + .filter(OEESnapshot.line_id == line_id, OEESnapshot.ts >= since) + .order_by(OEESnapshot.ts.asc()) + .all() + ) + return [ + OEEHistoryItem( + ts=s.ts, + availability=s.availability, + performance=s.performance, + quality=s.quality, + oee=s.oee, + total_count=s.total_count, + run_time_sec=s.run_time_sec, + ) + for s in snaps + ] + + +@router.get("/oee/summary", response_model=list[OEESummaryItem]) +def get_oee_summary(db: Session = Depends(get_db)): + """Fleet OEE — latest snapshot for every configured line.""" + lines = db.query(Line).order_by(Line.name).all() + result = [] + for line in lines: + line_id = str(line.id) + snap = _latest_snapshot(db, line_id) + state = state_poller.get_cached_state(line_id) + low_ticks = oee_calculator.get_low_oee_ticks(line_id) + result.append(OEESummaryItem( + line_id=line_id, + line_name=line.name, + oee=snap.oee if snap else 0.0, + availability=snap.availability if snap else 0.0, + performance=snap.performance if snap else 0.0, + quality=snap.quality if snap else 0.0, + teep=snap.teep if snap else None, + run_state=state.value if state else MachineStateEnum.OFFLINE.value, + ts=snap.ts if snap else None, + alert=(low_ticks >= oee_calculator.OEE_ALERT_TICKS), + )) + return result + + +@router.get("/kpis", response_model=KPIResponse) +def get_kpis(db: Session = Depends(get_db)): + """Aggregate KPIs: fleet OEE, total downtime minutes today, alert count.""" + lines = db.query(Line).all() + oee_values = [] + total_downtime = 0 + alert_count = 0 + + for line in lines: + line_id = str(line.id) + snap = _latest_snapshot(db, line_id) + if snap: + oee_values.append(snap.oee) + total_downtime += _downtime_minutes_today(db, line_id) + if oee_calculator.get_low_oee_ticks(line_id) >= oee_calculator.OEE_ALERT_TICKS: + alert_count += 1 + + fleet_oee = round(sum(oee_values) / len(oee_values), 4) if oee_values else 0.0 + + return KPIResponse( + fleet_oee=fleet_oee, + downtime_minutes_today=total_downtime, + lines_in_alert=alert_count, + snapshot_ts=datetime.now(timezone.utc), + ) diff --git a/services/mes/backend/services/oee_calculator.py b/services/mes/backend/services/oee_calculator.py new file mode 100644 index 0000000..130c00d --- /dev/null +++ b/services/mes/backend/services/oee_calculator.py @@ -0,0 +1,293 @@ +"""OEE Calculator — 60-second tick per active line. + +Computes Availability, Performance, Quality, OEE, and TEEP from: + - ItemCount delta (HR100) via plc-modbus HTTP + - RUNNING-state duration from machine_states table + - Active work order's ideal_cycle_sec (default 1.0 until Week 4) + +Writes one oee_snapshots row per line per tick. +Raises an in-process alert when a line's OEE < 0.60 for 30+ consecutive +minutes (30 ticks at 60s each). + +Formula: + Availability = run_time_sec / planned_time_sec (clamped 0-1) + Performance = (ideal_cycle_sec * total_count) / run_time_sec (clamped 0-1) + Quality = good_count / total_count (clamped 0-1) + OEE = A * P * Q + TEEP = OEE (utilisation = 1.0 until schedule wired in Week 4) +""" + +import asyncio +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from sqlalchemy import text +from sqlalchemy.orm import Session + +from backend.config import settings +from backend.database import SessionLocal +from backend.models.db_models import Line, MachineStateEnum, OEESnapshot +from backend.services import state_poller +from backend.services.plc_client import PLCOfflineError, fetch_io, item_count + +logger = logging.getLogger(__name__) + +# ── Constants ───────────────────────────────────────────────────────────────── + +TICK_SEC = 60 # OEE snapshot interval +OEE_ALERT_THRESHOLD = 0.60 # alert below this +OEE_ALERT_TICKS = 30 # consecutive ticks below threshold before alert (30 min) +DEFAULT_IDEAL_CYCLE_SEC = 1.0 + +# ── In-memory state ─────────────────────────────────────────────────────────── + +# {line_id: int} — ItemCount at previous tick (cumulative register) +_last_count: dict = {} + +# {line_id: int} — consecutive ticks below OEE_ALERT_THRESHOLD +_low_oee_ticks: dict = {} + +_stop_event: Optional[asyncio.Event] = None + + +# ── OEE math (pure functions) ───────────────────────────────────────────────── + +def _clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float: + return max(lo, min(hi, value)) + + +def compute_oee( + run_time_sec: int, + planned_time_sec: int, + total_count: int, + good_count: int, + ideal_cycle_sec: float, +) -> dict: + """Return dict with availability, performance, quality, oee, teep. + + All outputs clamped to [0.0, 1.0]. + Division-by-zero safe — returns 0.0 on any degenerate input. + """ + if planned_time_sec <= 0: + return dict(availability=0.0, performance=0.0, quality=0.0, oee=0.0, teep=0.0) + + availability = _clamp(run_time_sec / planned_time_sec) + + if run_time_sec > 0 and total_count > 0: + performance = _clamp((ideal_cycle_sec * total_count) / run_time_sec) + else: + performance = 0.0 + + quality = _clamp(good_count / total_count) if total_count > 0 else 0.0 + + oee = availability * performance * quality + teep = oee # utilisation = 1.0 until Week 4 adds schedule + + return dict( + availability=round(availability, 4), + performance=round(performance, 4), + quality=round(quality, 4), + oee=round(oee, 4), + teep=round(teep, 4), + ) + + +# ── DB helpers ──────────────────────────────────────────────────────────────── + +def _run_time_in_window(db: Session, line_id: str, window_sec: int) -> int: + """Sum seconds spent in RUNNING state in the last window_sec seconds.""" + cutoff = datetime.now(timezone.utc) - timedelta(seconds=window_sec) + rows = db.execute( + text(""" + SELECT + GREATEST(started_at, :cutoff) AS seg_start, + COALESCE(ended_at, NOW()) AS seg_end + FROM machine_states + WHERE line_id = :lid + AND state = 'RUNNING' + AND COALESCE(ended_at, NOW()) >= :cutoff + """), + {"lid": line_id, "cutoff": cutoff}, + ).fetchall() + total = 0 + for row in rows: + seg_start, seg_end = row[0], row[1] + if seg_end > seg_start: + total += int((seg_end - seg_start).total_seconds()) + return min(total, window_sec) + + +def _active_ideal_cycle(db: Session, line_id: str) -> float: + """Return ideal_cycle_sec from the active work order's product, or default.""" + row = db.execute( + text(""" + SELECT p.ideal_cycle_sec + FROM work_orders wo + JOIN products p ON p.id = wo.product_id + WHERE wo.line_id = :lid AND wo.status = 'ACTIVE' + LIMIT 1 + """), + {"lid": line_id}, + ).fetchone() + return float(row[0]) if row else DEFAULT_IDEAL_CYCLE_SEC + + +def _active_work_order_id(db: Session, line_id: str) -> Optional[str]: + row = db.execute( + text("SELECT id FROM work_orders WHERE line_id=:lid AND status='ACTIVE' LIMIT 1"), + {"lid": line_id}, + ).fetchone() + return str(row[0]) if row else None + + +def _write_snapshot(db: Session, line_id: str, work_order_id: Optional[str], + run_time_sec: int, planned_time_sec: int, + total_count: int, good_count: int, + ideal_cycle_sec: float, metrics: dict) -> None: + snap = OEESnapshot( + line_id=line_id, + work_order_id=work_order_id, + ts=datetime.now(timezone.utc), + run_time_sec=run_time_sec, + planned_time_sec=planned_time_sec, + total_count=total_count, + good_count=good_count, + ideal_cycle_sec=ideal_cycle_sec, + availability=metrics["availability"], + performance=metrics["performance"], + quality=metrics["quality"], + oee=metrics["oee"], + teep=metrics["teep"], + ) + db.add(snap) + + +# ── Alert logic ─────────────────────────────────────────────────────────────── + +def _check_alert(line_name: str, line_id: str, oee: float) -> None: + if oee < OEE_ALERT_THRESHOLD: + _low_oee_ticks[line_id] = _low_oee_ticks.get(line_id, 0) + 1 + if _low_oee_ticks[line_id] == OEE_ALERT_TICKS: + logger.warning( + "ALERT Line %-12s OEE %.1f%% below %.0f%% for %d consecutive minutes", + line_name, oee * 100, OEE_ALERT_THRESHOLD * 100, OEE_ALERT_TICKS, + ) + else: + _low_oee_ticks[line_id] = 0 + + +# ── Per-line tick ───────────────────────────────────────────────────────────── + +async def _tick_line(line: Line) -> None: + """Compute and persist one OEE snapshot for a single line.""" + line_id = str(line.id) + + # Skip if OFFLINE — no meaningful OEE + current_state = state_poller.get_cached_state(line_id) + is_offline = (current_state == MachineStateEnum.OFFLINE or current_state is None) + planned_time_sec = 0 if is_offline else TICK_SEC + + # Fetch current item count (non-blocking) + total_count = 0 + try: + io_data = await fetch_io(settings.plc_modbus_url) + current_count = item_count(io_data) + prev = _last_count.get(line_id) + if prev is None: + # First tick — seed without counting; no items to credit yet + _last_count[line_id] = current_count + logger.debug("Line %s — seeding count at %d", line.name, current_count) + return + delta = max(0, current_count - prev) + _last_count[line_id] = current_count + total_count = delta + except PLCOfflineError: + # PLC unreachable — keep count at 0 for this tick + pass + + good_count = total_count # no reject tracking until Week 5 + + # DB work in thread + def _db_work(): + db = SessionLocal() + try: + run_time_sec = _run_time_in_window(db, line_id, TICK_SEC) + ideal_cycle_sec = _active_ideal_cycle(db, line_id) + wo_id = _active_work_order_id(db, line_id) + + metrics = compute_oee( + run_time_sec=run_time_sec, + planned_time_sec=planned_time_sec, + total_count=total_count, + good_count=good_count, + ideal_cycle_sec=ideal_cycle_sec, + ) + _write_snapshot(db, line_id, wo_id, run_time_sec, planned_time_sec, + total_count, good_count, ideal_cycle_sec, metrics) + db.commit() + return metrics, ideal_cycle_sec, run_time_sec + except Exception: + db.rollback() + logger.exception("OEE snapshot write failed for line %s", line.name) + return None, DEFAULT_IDEAL_CYCLE_SEC, 0 + finally: + db.close() + + result = await asyncio.to_thread(_db_work) + metrics, ideal_cycle_sec, run_time_sec = result + if metrics: + _check_alert(line.name, line_id, metrics["oee"]) + logger.info( + "OEE %-12s A=%.2f P=%.2f Q=%.2f OEE=%.2f " + "run=%ds count=%d cycle=%.1fs", + line.name, + metrics["availability"], metrics["performance"], + metrics["quality"], metrics["oee"], + run_time_sec, total_count, ideal_cycle_sec, + ) + + +# ── Tick loop ───────────────────────────────────────────────────────────────── + +async def run(tick_sec: int = TICK_SEC) -> None: + """Background OEE calculator — one tick per line every tick_sec seconds.""" + global _stop_event + _stop_event = asyncio.Event() + logger.info("OEE calculator starting (tick=%ds, threshold=%.0f%%)", + tick_sec, OEE_ALERT_THRESHOLD * 100) + + tick = 0 + lines: list = [] + + while not _stop_event.is_set(): + if tick % 60 == 0: + db = SessionLocal() + try: + lines = db.query(Line).all() + finally: + db.close() + + if lines: + await asyncio.gather( + *[_tick_line(line) for line in lines], + return_exceptions=True, + ) + + tick += 1 + try: + await asyncio.wait_for(_stop_event.wait(), timeout=tick_sec) + except asyncio.TimeoutError: + pass + + +def stop() -> None: + if _stop_event: + _stop_event.set() + + +# ── Public read API (used by routes) ───────────────────────────────────────── + +def get_low_oee_ticks(line_id: str) -> int: + """How many consecutive ticks has this line been below threshold.""" + return _low_oee_ticks.get(line_id, 0) diff --git a/services/mes/tests/test_oee.py b/services/mes/tests/test_oee.py new file mode 100644 index 0000000..b78bfd5 --- /dev/null +++ b/services/mes/tests/test_oee.py @@ -0,0 +1,183 @@ +"""Week 3 tests — OEE computation and calculator logic. + +All tests use unittest.mock — no live DB or plc-modbus required. +Run: pytest tests/test_oee.py -v + +Acceptance Criteria (PRD-MES-CORE.md §10): + AC#3 OEE calculates correctly (known inputs → expected output ± 0.01) + AC#5 TEEP reported alongside OEE +""" + +import pytest +from backend.services.oee_calculator import ( + OEE_ALERT_TICKS, + OEE_ALERT_THRESHOLD, + _check_alert, + _low_oee_ticks, + compute_oee, +) + + +# ── compute_oee pure function ───────────────────────────────────────────────── + +class TestComputeOEE: + def test_perfect_oee(self): + """100% on all three components → OEE = 1.0""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=60, + good_count=60, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(1.0, abs=0.01) + assert m["performance"] == pytest.approx(1.0, abs=0.01) + assert m["quality"] == pytest.approx(1.0, abs=0.01) + assert m["oee"] == pytest.approx(1.0, abs=0.01) + + def test_typical_factory_oee(self): + """Walker Reynolds benchmark: typical factory ~55% OEE. + + Inputs: + planned=60s, run=48s → A=0.80 + count=40 @ 1s/part, run=48s → P=40/48=0.833 + good=38/40 → Q=0.95 + OEE = 0.80 × 0.833 × 0.95 ≈ 0.633 + """ + m = compute_oee( + run_time_sec=48, + planned_time_sec=60, + total_count=40, + good_count=38, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(0.80, abs=0.01) + assert m["performance"] == pytest.approx(0.833, abs=0.01) + assert m["quality"] == pytest.approx(0.95, abs=0.01) + assert m["oee"] == pytest.approx(0.633, abs=0.01) + assert "teep" in m + + def test_zero_planned_time_returns_zeros(self): + """OFFLINE line — planned_time=0 → all zeros, no division error.""" + m = compute_oee( + run_time_sec=0, + planned_time_sec=0, + total_count=0, + good_count=0, + ideal_cycle_sec=1.0, + ) + assert m["oee"] == 0.0 + assert m["availability"] == 0.0 + + def test_zero_count_returns_zero_performance_and_quality(self): + """Line running but no parts produced — P and Q are 0.""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=0, + good_count=0, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(1.0, abs=0.01) + assert m["performance"] == 0.0 + assert m["quality"] == 0.0 + assert m["oee"] == 0.0 + + def test_performance_clamped_to_1(self): + """Items produced faster than ideal → Performance capped at 1.0.""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=120, # twice the ideal rate + good_count=120, + ideal_cycle_sec=1.0, + ) + assert m["performance"] == pytest.approx(1.0, abs=0.01) + assert m["oee"] == pytest.approx(1.0, abs=0.01) + + def test_availability_clamped_to_1(self): + """run_time > planned_time (clock skew) → clamped to 1.0.""" + m = compute_oee( + run_time_sec=65, + planned_time_sec=60, + total_count=60, + good_count=60, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(1.0, abs=0.01) + + def test_teep_equals_oee_without_schedule(self): + """Until schedules are wired (Week 4), TEEP == OEE.""" + m = compute_oee(60, 60, 60, 60, 1.0) + assert m["teep"] == pytest.approx(m["oee"], abs=0.001) + + def test_slow_cycle_reduces_performance(self): + """Ideal=1s, actual=2s/part → P=0.5""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=30, # 30 parts in 60s = 2s/part + good_count=30, + ideal_cycle_sec=1.0, + ) + assert m["performance"] == pytest.approx(0.50, abs=0.01) + assert m["oee"] == pytest.approx(0.50, abs=0.01) + + def test_world_class_oee_benchmark(self): + """Walker benchmark: world-class ≥ 85%""" + m = compute_oee( + run_time_sec=55, + planned_time_sec=60, + total_count=54, + good_count=54, + ideal_cycle_sec=1.0, + ) + assert m["oee"] >= 0.85 + + def test_output_rounded_to_4_decimals(self): + m = compute_oee(48, 60, 40, 38, 1.0) + for key in ["availability", "performance", "quality", "oee", "teep"]: + val = m[key] + assert val == round(val, 4), f"{key} not rounded: {val}" + + +# ── Alert logic ─────────────────────────────────────────────────────────────── + +class TestAlertLogic: + def setup_method(self): + _low_oee_ticks.clear() + + def test_counter_increments_below_threshold(self): + _check_alert("Line-1", "lid1", OEE_ALERT_THRESHOLD - 0.01) + assert _low_oee_ticks["lid1"] == 1 + + def test_counter_resets_above_threshold(self): + _low_oee_ticks["lid2"] = 10 + _check_alert("Line-2", "lid2", OEE_ALERT_THRESHOLD + 0.01) + assert _low_oee_ticks["lid2"] == 0 + + def test_alert_fires_at_threshold_tick(self, caplog): + import logging + _low_oee_ticks["lid3"] = OEE_ALERT_TICKS - 1 + with caplog.at_level(logging.WARNING): + _check_alert("Line-3", "lid3", 0.40) + assert "ALERT" in caplog.text + assert _low_oee_ticks["lid3"] == OEE_ALERT_TICKS + + def test_no_alert_before_threshold_tick(self, caplog): + import logging + _low_oee_ticks["lid4"] = OEE_ALERT_TICKS - 2 + with caplog.at_level(logging.WARNING): + _check_alert("Line-4", "lid4", 0.40) + assert "ALERT" not in caplog.text + + +# ── OEE Threshold constants ─────────────────────────────────────────────────── + +class TestConstants: + def test_alert_threshold_is_60_percent(self): + assert OEE_ALERT_THRESHOLD == 0.60 + + def test_alert_ticks_is_30(self): + """30 ticks × 60s = 30 minutes before alert fires.""" + assert OEE_ALERT_TICKS == 30