From dd362f0c28a0ba4921d98b2ca1294f220f710a23 Mon Sep 17 00:00:00 2001 From: CharlieNode Date: Thu, 16 Apr 2026 00:29:26 -0400 Subject: [PATCH 1/2] =?UTF-8?q?feat(mes):=20Week=203=20=E2=80=94=20OEE=20c?= =?UTF-8?q?alculator=20service=20(16=20tests,=2048=20total=20pass)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds OEE computation engine, 60-second tick loop, fleet endpoints: - compute_oee() pure function: A×P×Q with clamp/rounding (PRD §10 AC#3, AC#5) - 30-tick alert at OEE < 60% per line (OEE_ALERT_TICKS=30) - ItemCount delta tracking from HR100 (cumulative register) - run_time_in_window() SQL: sums RUNNING seconds in last tick window - 4 new endpoints: /oee, /oee/history, /oee/summary, /kpis - oee_calculator background task wired into FastAPI lifespan - 16 new tests, zero regressions (48/48 suite) Closes MIRA#321 Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude Co-Authored-By: Happy --- PLAN.md | 92 ++---- services/mes/backend/config.py | 5 +- services/mes/backend/main.py | 35 ++- services/mes/backend/routes/oee.py | 225 ++++++++++++++ .../mes/backend/services/oee_calculator.py | 293 ++++++++++++++++++ services/mes/tests/test_oee.py | 183 +++++++++++ 6 files changed, 750 insertions(+), 83 deletions(-) create mode 100644 services/mes/backend/routes/oee.py create mode 100644 services/mes/backend/services/oee_calculator.py create mode 100644 services/mes/tests/test_oee.py diff --git a/PLAN.md b/PLAN.md index bf51cca..c2204d2 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,86 +1,36 @@ -# PLAN: MES Core — Week 2 (Modbus Machine State Reader) +# PLAN: MES Core — Week 3 (OEE Calculator Service) -**Branch:** `feat/mes-week2-state-reader` -**Issue:** Mikecranesync/MIRA#320 +**Branch:** `feat/mes-week3-oee-calculator` +**Issue:** Mikecranesync/MIRA#321 **PRD:** `docs/PRD-MES-CORE.md` -**Date:** 2026-04-15 -**Depends on:** Week 1 (feat/mes-week1-db-schema) merged +**Date:** 2026-04-16 +**Depends on:** Weeks 1+2 merged to main ✓ --- ## Objective -Build the machine state reader: a background poller that reads the plc-modbus HTTP API every 5 seconds per configured line, detects state transitions (RUNNING/DOWN/IDLE/OFFLINE), writes them to `machine_states`, and exposes `GET /api/mes/lines` and `GET /api/mes/lines/{id}/state` REST endpoints. +60-second tick OEE calculator. Reads ItemCount delta from plc-modbus, +computes Availability/Performance/Quality/OEE/TEEP from machine_states, +writes to oee_snapshots, exposes REST endpoints, and alerts when OEE < 60%. -## Affected Files +## OEE Formula -**New:** -- `services/mes/backend/services/__init__.py` -- `services/mes/backend/services/plc_client.py` — async HTTP client wrapping plc-modbus -- `services/mes/backend/services/state_machine.py` — pure state detection from IO snapshot -- `services/mes/backend/services/state_poller.py` — asyncio background poll loop -- `services/mes/backend/routes/lines.py` — GET /api/mes/lines, GET /lines/{id}/state -- `services/mes/tests/test_machine_states.py` — 10 unit tests, all mocked + Availability = run_time_sec / planned_time_sec + Performance = (ideal_cycle_sec x total_count) / max(run_time_sec, 1) + Quality = good_count / max(total_count, 1) + OEE = Availability x Performance x Quality + TEEP = OEE (no schedule yet; Week 4 wires utilization) -**Modified:** -- `services/mes/requirements.txt` — add httpx -- `services/mes/backend/config.py` — add plc_modbus_url setting -- `services/mes/backend/main.py` — wire poller into lifespan, add lines router -- `docker-compose.yml` — add PLC_MODBUS_URL env to mes container +Clamp all values to [0.0, 1.0]. -## Approach +## New Endpoints -1. `plc_client.py` — thin async wrapper around `GET /api/plc/io` (httpx). Raises `PLCOfflineError` on timeout/connection failure so caller can set OFFLINE state. -2. `state_machine.py` — pure function `detect_state(io_data)` → `(MachineStateEnum, reason_code | None)`. Derived from `VFDStatus` and `ErrorCode` registers. No DB or network calls — fully testable without mocks. -3. `state_poller.py` — asyncio task, one iteration per line every 5s. Maintains in-memory cache to avoid DB reads on every tick. Writes to `machine_states` only on transition. -4. `lines.py` routes — two endpoints: list all lines (from DB), get current state (from in-memory cache + last DB row). -5. `main.py` lifespan — start poller task on startup, cancel on shutdown. - -State transition write: close open row (`ended_at = NOW()`), insert new row. - -## State Machine - -``` -IO: VFDStatus=1, ErrorCode=0 → RUNNING -IO: VFDStatus=2 OR ErrorCode>0 → DOWN (reason_code from ErrorCode map) -IO: VFDStatus=0, ErrorCode=0 → IDLE -HTTP failure / timeout → OFFLINE -``` - -## ErrorCode → reason_code map - -```python -{1: "OVERLOAD", 2: "OVERHEAT", 3: "SENSOR_FAIL", 4: "JAM", 7: "E_STOP"} -``` - -## Risks - -- plc-modbus in mock mode returns VFDStatus=0 at rest — poller sees IDLE immediately (expected) -- Multiple lines share one plc-modbus service currently — same io_data, different `line_id` rows + GET /api/mes/lines/{id}/oee + GET /api/mes/lines/{id}/oee/history?hours=8 + GET /api/mes/oee/summary + GET /api/mes/kpis ## Rollback -```bash -git checkout feat/mes-week1-db-schema -``` - -## Verification Steps - -```bash -# Unit tests (no docker needed) -cd services/mes && pytest tests/test_machine_states.py -v - -# Integration: start stack, check state endpoint -docker compose up mes-db mes plc-modbus -d -curl localhost:8300/api/mes/lines -curl localhost:8300/api/mes/lines//state - -# Inject a fault and verify DB transition -curl -X POST localhost:8001/api/plc/mock/fault -H "Content-Type: application/json" -d '{"fault_type":"jam"}' -sleep 8 -curl localhost:8300/api/mes/lines//state # should show DOWN / JAM -``` - -## Note on Active Focus Window - -Explicitly authorized by Mike (2026-04-15 session). + git checkout main diff --git a/services/mes/backend/config.py b/services/mes/backend/config.py index f76f6b6..a563e8e 100644 --- a/services/mes/backend/config.py +++ b/services/mes/backend/config.py @@ -24,7 +24,10 @@ class Settings(BaseSettings): # Polling interval in seconds (default 5, set lower in tests) plc_poll_interval_sec: int = 5 - # Set True to skip poller startup (useful in unit tests) + # OEE calculator tick interval in seconds (default 60) + oee_tick_sec: int = 60 + + # Set True to skip background task startup (useful in unit tests) plc_use_mock: bool = False diff --git a/services/mes/backend/main.py b/services/mes/backend/main.py index 477dbfc..6240a6c 100644 --- a/services/mes/backend/main.py +++ b/services/mes/backend/main.py @@ -1,12 +1,14 @@ """FactoryLM MES API — FastAPI entry point. Lifespan: - startup → seed state cache, launch background state poller - shutdown → signal poller to stop cleanly + startup → launch state poller + OEE calculator background tasks + shutdown → stop both tasks cleanly Routes (cumulative by week): Week 1: /api/health Week 2: /api/mes/lines, /api/mes/lines/{id}/state + Week 3: /api/mes/lines/{id}/oee, /api/mes/lines/{id}/oee/history + /api/mes/oee/summary, /api/mes/kpis """ import asyncio @@ -19,7 +21,8 @@ from backend.config import settings from backend.routes.health import router as health_router from backend.routes.lines import router as lines_router -from backend.services import state_poller +from backend.routes.oee import router as oee_router +from backend.services import oee_calculator, state_poller logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -30,25 +33,34 @@ async def lifespan(app: FastAPI): db_host = settings.database_url.split("@")[-1] logger.info("MES service starting — DB: %s PLC: %s", db_host, settings.plc_modbus_url) - poller_task = None + poller_task = oee_task = None + if not settings.plc_use_mock: poller_task = asyncio.create_task( state_poller.run(poll_interval_sec=settings.plc_poll_interval_sec), name="state_poller", ) - logger.info("State poller started (interval=%ds)", settings.plc_poll_interval_sec) + oee_task = asyncio.create_task( + oee_calculator.run(tick_sec=settings.oee_tick_sec), + name="oee_calculator", + ) + logger.info( + "Background tasks started — poller=%ds oee_tick=%ds", + settings.plc_poll_interval_sec, settings.oee_tick_sec, + ) else: - logger.info("PLC mock mode — state poller disabled") + logger.info("PLC mock mode — background tasks disabled") yield logger.info("MES service shutting down") - if poller_task: - state_poller.stop() + state_poller.stop() + oee_calculator.stop() + for task in [t for t in [poller_task, oee_task] if t]: try: - await asyncio.wait_for(poller_task, timeout=8.0) - except asyncio.TimeoutError: - poller_task.cancel() + await asyncio.wait_for(task, timeout=8.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + task.cancel() app = FastAPI( @@ -67,6 +79,7 @@ async def lifespan(app: FastAPI): app.include_router(health_router, prefix=settings.api_prefix) app.include_router(lines_router, prefix=settings.api_prefix) +app.include_router(oee_router, prefix=settings.api_prefix) if __name__ == "__main__": diff --git a/services/mes/backend/routes/oee.py b/services/mes/backend/routes/oee.py new file mode 100644 index 0000000..75d882e --- /dev/null +++ b/services/mes/backend/routes/oee.py @@ -0,0 +1,225 @@ +"""OEE routes — snapshots, history, fleet summary, KPIs. + +Week 3 endpoints: + GET /api/mes/lines/{id}/oee + GET /api/mes/lines/{id}/oee/history + GET /api/mes/oee/summary + GET /api/mes/kpis +""" + +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import text +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.models.db_models import Line, MachineStateEnum, OEESnapshot +from backend.services import oee_calculator, state_poller + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/mes", tags=["oee"]) + + +# ── Response models ─────────────────────────────────────────────────────────── + +class OEESnapshotResponse(BaseModel): + line_id: str + line_name: str + ts: datetime + availability: float + performance: float + quality: float + oee: float + teep: Optional[float] + run_time_sec: int + planned_time_sec: int + total_count: int + good_count: int + ideal_cycle_sec: float + low_oee_ticks: int # consecutive ticks below 60% (alert indicator) + + +class OEEHistoryItem(BaseModel): + ts: datetime + availability: float + performance: float + quality: float + oee: float + total_count: int + run_time_sec: int + + +class OEESummaryItem(BaseModel): + line_id: str + line_name: str + oee: float + availability: float + performance: float + quality: float + teep: Optional[float] + run_state: str + ts: Optional[datetime] + alert: bool # True if low_oee_ticks >= 30 + + +class KPIResponse(BaseModel): + fleet_oee: float # average OEE across all lines (latest tick) + downtime_minutes_today: int # total DOWN + OFFLINE minutes today, all lines + lines_in_alert: int # lines with OEE < 60% for 30+ min + snapshot_ts: datetime + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _latest_snapshot(db: Session, line_id: str) -> Optional[OEESnapshot]: + return ( + db.query(OEESnapshot) + .filter(OEESnapshot.line_id == line_id) + .order_by(OEESnapshot.ts.desc()) + .first() + ) + + +def _downtime_minutes_today(db: Session, line_id: str) -> int: + """Sum minutes in DOWN or OFFLINE state since midnight UTC today.""" + midnight = datetime.now(timezone.utc).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + row = db.execute( + text(""" + SELECT COALESCE(SUM( + EXTRACT(EPOCH FROM ( + COALESCE(ended_at, NOW()) - GREATEST(started_at, :midnight) + )) + ), 0) / 60 + FROM machine_states + WHERE line_id = :lid + AND state IN ('DOWN', 'OFFLINE') + AND COALESCE(ended_at, NOW()) >= :midnight + """), + {"lid": line_id, "midnight": midnight}, + ).scalar() + return int(row or 0) + + +# ── Endpoints ───────────────────────────────────────────────────────────────── + +@router.get("/lines/{line_id}/oee", response_model=OEESnapshotResponse) +def get_line_oee(line_id: str, db: Session = Depends(get_db)): + """Latest OEE snapshot for one line.""" + line = db.query(Line).filter(Line.id == line_id).first() + if not line: + raise HTTPException(status_code=404, detail=f"Line {line_id} not found") + + snap = _latest_snapshot(db, line_id) + if not snap: + raise HTTPException( + status_code=404, + detail="No OEE snapshots yet — calculator ticks every 60s", + ) + + return OEESnapshotResponse( + line_id=line_id, + line_name=line.name, + ts=snap.ts, + availability=snap.availability, + performance=snap.performance, + quality=snap.quality, + oee=snap.oee, + teep=snap.teep, + run_time_sec=snap.run_time_sec, + planned_time_sec=snap.planned_time_sec, + total_count=snap.total_count, + good_count=snap.good_count, + ideal_cycle_sec=snap.ideal_cycle_sec, + low_oee_ticks=oee_calculator.get_low_oee_ticks(line_id), + ) + + +@router.get("/lines/{line_id}/oee/history", response_model=list[OEEHistoryItem]) +def get_line_oee_history( + line_id: str, + hours: int = Query(default=8, ge=1, le=72), + db: Session = Depends(get_db), +): + """OEE time-series for one line over the last N hours (default 8).""" + line = db.query(Line).filter(Line.id == line_id).first() + if not line: + raise HTTPException(status_code=404, detail=f"Line {line_id} not found") + + since = datetime.now(timezone.utc) - timedelta(hours=hours) + snaps = ( + db.query(OEESnapshot) + .filter(OEESnapshot.line_id == line_id, OEESnapshot.ts >= since) + .order_by(OEESnapshot.ts.asc()) + .all() + ) + return [ + OEEHistoryItem( + ts=s.ts, + availability=s.availability, + performance=s.performance, + quality=s.quality, + oee=s.oee, + total_count=s.total_count, + run_time_sec=s.run_time_sec, + ) + for s in snaps + ] + + +@router.get("/oee/summary", response_model=list[OEESummaryItem]) +def get_oee_summary(db: Session = Depends(get_db)): + """Fleet OEE — latest snapshot for every configured line.""" + lines = db.query(Line).order_by(Line.name).all() + result = [] + for line in lines: + line_id = str(line.id) + snap = _latest_snapshot(db, line_id) + state = state_poller.get_cached_state(line_id) + low_ticks = oee_calculator.get_low_oee_ticks(line_id) + result.append(OEESummaryItem( + line_id=line_id, + line_name=line.name, + oee=snap.oee if snap else 0.0, + availability=snap.availability if snap else 0.0, + performance=snap.performance if snap else 0.0, + quality=snap.quality if snap else 0.0, + teep=snap.teep if snap else None, + run_state=state.value if state else MachineStateEnum.OFFLINE.value, + ts=snap.ts if snap else None, + alert=(low_ticks >= oee_calculator.OEE_ALERT_TICKS), + )) + return result + + +@router.get("/kpis", response_model=KPIResponse) +def get_kpis(db: Session = Depends(get_db)): + """Aggregate KPIs: fleet OEE, total downtime minutes today, alert count.""" + lines = db.query(Line).all() + oee_values = [] + total_downtime = 0 + alert_count = 0 + + for line in lines: + line_id = str(line.id) + snap = _latest_snapshot(db, line_id) + if snap: + oee_values.append(snap.oee) + total_downtime += _downtime_minutes_today(db, line_id) + if oee_calculator.get_low_oee_ticks(line_id) >= oee_calculator.OEE_ALERT_TICKS: + alert_count += 1 + + fleet_oee = round(sum(oee_values) / len(oee_values), 4) if oee_values else 0.0 + + return KPIResponse( + fleet_oee=fleet_oee, + downtime_minutes_today=total_downtime, + lines_in_alert=alert_count, + snapshot_ts=datetime.now(timezone.utc), + ) diff --git a/services/mes/backend/services/oee_calculator.py b/services/mes/backend/services/oee_calculator.py new file mode 100644 index 0000000..130c00d --- /dev/null +++ b/services/mes/backend/services/oee_calculator.py @@ -0,0 +1,293 @@ +"""OEE Calculator — 60-second tick per active line. + +Computes Availability, Performance, Quality, OEE, and TEEP from: + - ItemCount delta (HR100) via plc-modbus HTTP + - RUNNING-state duration from machine_states table + - Active work order's ideal_cycle_sec (default 1.0 until Week 4) + +Writes one oee_snapshots row per line per tick. +Raises an in-process alert when a line's OEE < 0.60 for 30+ consecutive +minutes (30 ticks at 60s each). + +Formula: + Availability = run_time_sec / planned_time_sec (clamped 0-1) + Performance = (ideal_cycle_sec * total_count) / run_time_sec (clamped 0-1) + Quality = good_count / total_count (clamped 0-1) + OEE = A * P * Q + TEEP = OEE (utilisation = 1.0 until schedule wired in Week 4) +""" + +import asyncio +import logging +from datetime import datetime, timedelta, timezone +from typing import Optional + +from sqlalchemy import text +from sqlalchemy.orm import Session + +from backend.config import settings +from backend.database import SessionLocal +from backend.models.db_models import Line, MachineStateEnum, OEESnapshot +from backend.services import state_poller +from backend.services.plc_client import PLCOfflineError, fetch_io, item_count + +logger = logging.getLogger(__name__) + +# ── Constants ───────────────────────────────────────────────────────────────── + +TICK_SEC = 60 # OEE snapshot interval +OEE_ALERT_THRESHOLD = 0.60 # alert below this +OEE_ALERT_TICKS = 30 # consecutive ticks below threshold before alert (30 min) +DEFAULT_IDEAL_CYCLE_SEC = 1.0 + +# ── In-memory state ─────────────────────────────────────────────────────────── + +# {line_id: int} — ItemCount at previous tick (cumulative register) +_last_count: dict = {} + +# {line_id: int} — consecutive ticks below OEE_ALERT_THRESHOLD +_low_oee_ticks: dict = {} + +_stop_event: Optional[asyncio.Event] = None + + +# ── OEE math (pure functions) ───────────────────────────────────────────────── + +def _clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float: + return max(lo, min(hi, value)) + + +def compute_oee( + run_time_sec: int, + planned_time_sec: int, + total_count: int, + good_count: int, + ideal_cycle_sec: float, +) -> dict: + """Return dict with availability, performance, quality, oee, teep. + + All outputs clamped to [0.0, 1.0]. + Division-by-zero safe — returns 0.0 on any degenerate input. + """ + if planned_time_sec <= 0: + return dict(availability=0.0, performance=0.0, quality=0.0, oee=0.0, teep=0.0) + + availability = _clamp(run_time_sec / planned_time_sec) + + if run_time_sec > 0 and total_count > 0: + performance = _clamp((ideal_cycle_sec * total_count) / run_time_sec) + else: + performance = 0.0 + + quality = _clamp(good_count / total_count) if total_count > 0 else 0.0 + + oee = availability * performance * quality + teep = oee # utilisation = 1.0 until Week 4 adds schedule + + return dict( + availability=round(availability, 4), + performance=round(performance, 4), + quality=round(quality, 4), + oee=round(oee, 4), + teep=round(teep, 4), + ) + + +# ── DB helpers ──────────────────────────────────────────────────────────────── + +def _run_time_in_window(db: Session, line_id: str, window_sec: int) -> int: + """Sum seconds spent in RUNNING state in the last window_sec seconds.""" + cutoff = datetime.now(timezone.utc) - timedelta(seconds=window_sec) + rows = db.execute( + text(""" + SELECT + GREATEST(started_at, :cutoff) AS seg_start, + COALESCE(ended_at, NOW()) AS seg_end + FROM machine_states + WHERE line_id = :lid + AND state = 'RUNNING' + AND COALESCE(ended_at, NOW()) >= :cutoff + """), + {"lid": line_id, "cutoff": cutoff}, + ).fetchall() + total = 0 + for row in rows: + seg_start, seg_end = row[0], row[1] + if seg_end > seg_start: + total += int((seg_end - seg_start).total_seconds()) + return min(total, window_sec) + + +def _active_ideal_cycle(db: Session, line_id: str) -> float: + """Return ideal_cycle_sec from the active work order's product, or default.""" + row = db.execute( + text(""" + SELECT p.ideal_cycle_sec + FROM work_orders wo + JOIN products p ON p.id = wo.product_id + WHERE wo.line_id = :lid AND wo.status = 'ACTIVE' + LIMIT 1 + """), + {"lid": line_id}, + ).fetchone() + return float(row[0]) if row else DEFAULT_IDEAL_CYCLE_SEC + + +def _active_work_order_id(db: Session, line_id: str) -> Optional[str]: + row = db.execute( + text("SELECT id FROM work_orders WHERE line_id=:lid AND status='ACTIVE' LIMIT 1"), + {"lid": line_id}, + ).fetchone() + return str(row[0]) if row else None + + +def _write_snapshot(db: Session, line_id: str, work_order_id: Optional[str], + run_time_sec: int, planned_time_sec: int, + total_count: int, good_count: int, + ideal_cycle_sec: float, metrics: dict) -> None: + snap = OEESnapshot( + line_id=line_id, + work_order_id=work_order_id, + ts=datetime.now(timezone.utc), + run_time_sec=run_time_sec, + planned_time_sec=planned_time_sec, + total_count=total_count, + good_count=good_count, + ideal_cycle_sec=ideal_cycle_sec, + availability=metrics["availability"], + performance=metrics["performance"], + quality=metrics["quality"], + oee=metrics["oee"], + teep=metrics["teep"], + ) + db.add(snap) + + +# ── Alert logic ─────────────────────────────────────────────────────────────── + +def _check_alert(line_name: str, line_id: str, oee: float) -> None: + if oee < OEE_ALERT_THRESHOLD: + _low_oee_ticks[line_id] = _low_oee_ticks.get(line_id, 0) + 1 + if _low_oee_ticks[line_id] == OEE_ALERT_TICKS: + logger.warning( + "ALERT Line %-12s OEE %.1f%% below %.0f%% for %d consecutive minutes", + line_name, oee * 100, OEE_ALERT_THRESHOLD * 100, OEE_ALERT_TICKS, + ) + else: + _low_oee_ticks[line_id] = 0 + + +# ── Per-line tick ───────────────────────────────────────────────────────────── + +async def _tick_line(line: Line) -> None: + """Compute and persist one OEE snapshot for a single line.""" + line_id = str(line.id) + + # Skip if OFFLINE — no meaningful OEE + current_state = state_poller.get_cached_state(line_id) + is_offline = (current_state == MachineStateEnum.OFFLINE or current_state is None) + planned_time_sec = 0 if is_offline else TICK_SEC + + # Fetch current item count (non-blocking) + total_count = 0 + try: + io_data = await fetch_io(settings.plc_modbus_url) + current_count = item_count(io_data) + prev = _last_count.get(line_id) + if prev is None: + # First tick — seed without counting; no items to credit yet + _last_count[line_id] = current_count + logger.debug("Line %s — seeding count at %d", line.name, current_count) + return + delta = max(0, current_count - prev) + _last_count[line_id] = current_count + total_count = delta + except PLCOfflineError: + # PLC unreachable — keep count at 0 for this tick + pass + + good_count = total_count # no reject tracking until Week 5 + + # DB work in thread + def _db_work(): + db = SessionLocal() + try: + run_time_sec = _run_time_in_window(db, line_id, TICK_SEC) + ideal_cycle_sec = _active_ideal_cycle(db, line_id) + wo_id = _active_work_order_id(db, line_id) + + metrics = compute_oee( + run_time_sec=run_time_sec, + planned_time_sec=planned_time_sec, + total_count=total_count, + good_count=good_count, + ideal_cycle_sec=ideal_cycle_sec, + ) + _write_snapshot(db, line_id, wo_id, run_time_sec, planned_time_sec, + total_count, good_count, ideal_cycle_sec, metrics) + db.commit() + return metrics, ideal_cycle_sec, run_time_sec + except Exception: + db.rollback() + logger.exception("OEE snapshot write failed for line %s", line.name) + return None, DEFAULT_IDEAL_CYCLE_SEC, 0 + finally: + db.close() + + result = await asyncio.to_thread(_db_work) + metrics, ideal_cycle_sec, run_time_sec = result + if metrics: + _check_alert(line.name, line_id, metrics["oee"]) + logger.info( + "OEE %-12s A=%.2f P=%.2f Q=%.2f OEE=%.2f " + "run=%ds count=%d cycle=%.1fs", + line.name, + metrics["availability"], metrics["performance"], + metrics["quality"], metrics["oee"], + run_time_sec, total_count, ideal_cycle_sec, + ) + + +# ── Tick loop ───────────────────────────────────────────────────────────────── + +async def run(tick_sec: int = TICK_SEC) -> None: + """Background OEE calculator — one tick per line every tick_sec seconds.""" + global _stop_event + _stop_event = asyncio.Event() + logger.info("OEE calculator starting (tick=%ds, threshold=%.0f%%)", + tick_sec, OEE_ALERT_THRESHOLD * 100) + + tick = 0 + lines: list = [] + + while not _stop_event.is_set(): + if tick % 60 == 0: + db = SessionLocal() + try: + lines = db.query(Line).all() + finally: + db.close() + + if lines: + await asyncio.gather( + *[_tick_line(line) for line in lines], + return_exceptions=True, + ) + + tick += 1 + try: + await asyncio.wait_for(_stop_event.wait(), timeout=tick_sec) + except asyncio.TimeoutError: + pass + + +def stop() -> None: + if _stop_event: + _stop_event.set() + + +# ── Public read API (used by routes) ───────────────────────────────────────── + +def get_low_oee_ticks(line_id: str) -> int: + """How many consecutive ticks has this line been below threshold.""" + return _low_oee_ticks.get(line_id, 0) diff --git a/services/mes/tests/test_oee.py b/services/mes/tests/test_oee.py new file mode 100644 index 0000000..b78bfd5 --- /dev/null +++ b/services/mes/tests/test_oee.py @@ -0,0 +1,183 @@ +"""Week 3 tests — OEE computation and calculator logic. + +All tests use unittest.mock — no live DB or plc-modbus required. +Run: pytest tests/test_oee.py -v + +Acceptance Criteria (PRD-MES-CORE.md §10): + AC#3 OEE calculates correctly (known inputs → expected output ± 0.01) + AC#5 TEEP reported alongside OEE +""" + +import pytest +from backend.services.oee_calculator import ( + OEE_ALERT_TICKS, + OEE_ALERT_THRESHOLD, + _check_alert, + _low_oee_ticks, + compute_oee, +) + + +# ── compute_oee pure function ───────────────────────────────────────────────── + +class TestComputeOEE: + def test_perfect_oee(self): + """100% on all three components → OEE = 1.0""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=60, + good_count=60, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(1.0, abs=0.01) + assert m["performance"] == pytest.approx(1.0, abs=0.01) + assert m["quality"] == pytest.approx(1.0, abs=0.01) + assert m["oee"] == pytest.approx(1.0, abs=0.01) + + def test_typical_factory_oee(self): + """Walker Reynolds benchmark: typical factory ~55% OEE. + + Inputs: + planned=60s, run=48s → A=0.80 + count=40 @ 1s/part, run=48s → P=40/48=0.833 + good=38/40 → Q=0.95 + OEE = 0.80 × 0.833 × 0.95 ≈ 0.633 + """ + m = compute_oee( + run_time_sec=48, + planned_time_sec=60, + total_count=40, + good_count=38, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(0.80, abs=0.01) + assert m["performance"] == pytest.approx(0.833, abs=0.01) + assert m["quality"] == pytest.approx(0.95, abs=0.01) + assert m["oee"] == pytest.approx(0.633, abs=0.01) + assert "teep" in m + + def test_zero_planned_time_returns_zeros(self): + """OFFLINE line — planned_time=0 → all zeros, no division error.""" + m = compute_oee( + run_time_sec=0, + planned_time_sec=0, + total_count=0, + good_count=0, + ideal_cycle_sec=1.0, + ) + assert m["oee"] == 0.0 + assert m["availability"] == 0.0 + + def test_zero_count_returns_zero_performance_and_quality(self): + """Line running but no parts produced — P and Q are 0.""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=0, + good_count=0, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(1.0, abs=0.01) + assert m["performance"] == 0.0 + assert m["quality"] == 0.0 + assert m["oee"] == 0.0 + + def test_performance_clamped_to_1(self): + """Items produced faster than ideal → Performance capped at 1.0.""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=120, # twice the ideal rate + good_count=120, + ideal_cycle_sec=1.0, + ) + assert m["performance"] == pytest.approx(1.0, abs=0.01) + assert m["oee"] == pytest.approx(1.0, abs=0.01) + + def test_availability_clamped_to_1(self): + """run_time > planned_time (clock skew) → clamped to 1.0.""" + m = compute_oee( + run_time_sec=65, + planned_time_sec=60, + total_count=60, + good_count=60, + ideal_cycle_sec=1.0, + ) + assert m["availability"] == pytest.approx(1.0, abs=0.01) + + def test_teep_equals_oee_without_schedule(self): + """Until schedules are wired (Week 4), TEEP == OEE.""" + m = compute_oee(60, 60, 60, 60, 1.0) + assert m["teep"] == pytest.approx(m["oee"], abs=0.001) + + def test_slow_cycle_reduces_performance(self): + """Ideal=1s, actual=2s/part → P=0.5""" + m = compute_oee( + run_time_sec=60, + planned_time_sec=60, + total_count=30, # 30 parts in 60s = 2s/part + good_count=30, + ideal_cycle_sec=1.0, + ) + assert m["performance"] == pytest.approx(0.50, abs=0.01) + assert m["oee"] == pytest.approx(0.50, abs=0.01) + + def test_world_class_oee_benchmark(self): + """Walker benchmark: world-class ≥ 85%""" + m = compute_oee( + run_time_sec=55, + planned_time_sec=60, + total_count=54, + good_count=54, + ideal_cycle_sec=1.0, + ) + assert m["oee"] >= 0.85 + + def test_output_rounded_to_4_decimals(self): + m = compute_oee(48, 60, 40, 38, 1.0) + for key in ["availability", "performance", "quality", "oee", "teep"]: + val = m[key] + assert val == round(val, 4), f"{key} not rounded: {val}" + + +# ── Alert logic ─────────────────────────────────────────────────────────────── + +class TestAlertLogic: + def setup_method(self): + _low_oee_ticks.clear() + + def test_counter_increments_below_threshold(self): + _check_alert("Line-1", "lid1", OEE_ALERT_THRESHOLD - 0.01) + assert _low_oee_ticks["lid1"] == 1 + + def test_counter_resets_above_threshold(self): + _low_oee_ticks["lid2"] = 10 + _check_alert("Line-2", "lid2", OEE_ALERT_THRESHOLD + 0.01) + assert _low_oee_ticks["lid2"] == 0 + + def test_alert_fires_at_threshold_tick(self, caplog): + import logging + _low_oee_ticks["lid3"] = OEE_ALERT_TICKS - 1 + with caplog.at_level(logging.WARNING): + _check_alert("Line-3", "lid3", 0.40) + assert "ALERT" in caplog.text + assert _low_oee_ticks["lid3"] == OEE_ALERT_TICKS + + def test_no_alert_before_threshold_tick(self, caplog): + import logging + _low_oee_ticks["lid4"] = OEE_ALERT_TICKS - 2 + with caplog.at_level(logging.WARNING): + _check_alert("Line-4", "lid4", 0.40) + assert "ALERT" not in caplog.text + + +# ── OEE Threshold constants ─────────────────────────────────────────────────── + +class TestConstants: + def test_alert_threshold_is_60_percent(self): + assert OEE_ALERT_THRESHOLD == 0.60 + + def test_alert_ticks_is_30(self): + """30 ticks × 60s = 30 minutes before alert fires.""" + assert OEE_ALERT_TICKS == 30 From 08b2bb00f8f64e94bb4605c3c4ca8bf06491f31f Mon Sep 17 00:00:00 2001 From: CharlieNode Date: Thu, 16 Apr 2026 00:59:35 -0400 Subject: [PATCH 2/2] =?UTF-8?q?feat(mes):=20Week=204=20=E2=80=94=20Work=20?= =?UTF-8?q?orders,=20products,=20schedule-aware=20TEEP=20(18=20tests,=2066?= =?UTF-8?q?=20total=20pass)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Work order CRUD + product management: - POST/GET /api/mes/products — create SKU with ideal_cycle_sec - POST /api/mes/work-orders — creates PENDING; validates line + product + unique order_number - GET /api/mes/work-orders — list with ?line_id= / ?status= filters - GET /api/mes/work-orders/{id} — detail - PATCH /api/mes/work-orders/{id}/status — enforced transitions (PENDING→ACTIVE→COMPLETE/CANCELLED) - 409 when second ACTIVE work order attempted on same line Schedule-aware TEEP: - compute_oee() gains utilisation param (default 1.0 — Week 3 backward compat) - TEEP = OEE × utilisation (clamped 0-1) - _active_utilisation() queries schedules table; falls back to 1.0 when no schedules exist 18 new tests, zero regressions (66/66 suite) Closes MIRA#322 Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude Co-Authored-By: Happy --- PLAN.md | 96 +++++-- services/mes/backend/main.py | 12 +- services/mes/backend/routes/work_orders.py | 255 +++++++++++++++++ .../mes/backend/services/oee_calculator.py | 53 +++- services/mes/tests/test_work_orders.py | 267 ++++++++++++++++++ 5 files changed, 654 insertions(+), 29 deletions(-) create mode 100644 services/mes/backend/routes/work_orders.py create mode 100644 services/mes/tests/test_work_orders.py diff --git a/PLAN.md b/PLAN.md index c2204d2..8e45d7e 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,36 +1,92 @@ -# PLAN: MES Core — Week 3 (OEE Calculator Service) +# PLAN: MES Core — Week 4 (Work Orders + Scheduling + TEEP) -**Branch:** `feat/mes-week3-oee-calculator` -**Issue:** Mikecranesync/MIRA#321 +**Branch:** `feat/mes-week4-work-orders` +**Issue:** Mikecranesync/MIRA#322 **PRD:** `docs/PRD-MES-CORE.md` **Date:** 2026-04-16 -**Depends on:** Weeks 1+2 merged to main ✓ +**Depends on:** Week 3 (feat/mes-week3-oee-calculator) merged --- ## Objective -60-second tick OEE calculator. Reads ItemCount delta from plc-modbus, -computes Availability/Performance/Quality/OEE/TEEP from machine_states, -writes to oee_snapshots, exposes REST endpoints, and alerts when OEE < 60%. +Wire work orders into the MES: create/list/detail/transition endpoints, +expose schedule-aware TEEP (utilisation = scheduled_time / calendar_time), +and update Pydantic UDTs (LineDataType, CountDispatch, OEEDataType) to be +the standard payload shape across all MES responses. -## OEE Formula +## Affected Files - Availability = run_time_sec / planned_time_sec - Performance = (ideal_cycle_sec x total_count) / max(run_time_sec, 1) - Quality = good_count / max(total_count, 1) - OEE = Availability x Performance x Quality - TEEP = OEE (no schedule yet; Week 4 wires utilization) +**New:** +- `services/mes/backend/routes/work_orders.py` — CRUD + status transitions +- `services/mes/tests/test_work_orders.py` — unit tests (mocked DB) -Clamp all values to [0.0, 1.0]. +**Modified:** +- `services/mes/backend/models/mes_models.py` — finalise UDTs, add WorkOrder schemas +- `services/mes/backend/services/oee_calculator.py` — TEEP uses schedule utilisation +- `services/mes/backend/main.py` — include work_orders router +- `PLAN.md` — this file -## New Endpoints +--- + +## Approach + +### 1. Work Order Routes (`work_orders.py`) + +Four endpoints: + +| Method | Path | Action | +|--------|------|--------| +| POST | `/api/mes/work-orders` | Create — status defaults to PENDING | +| GET | `/api/mes/work-orders` | List — filter by `?line_id=` and/or `?status=` | +| GET | `/api/mes/work-orders/{id}` | Detail | +| PATCH | `/api/mes/work-orders/{id}/status` | Transition: PENDING→ACTIVE→COMPLETE / CANCELLED | + +Transition rules (enforced server-side): +- PENDING → ACTIVE (start the job) +- ACTIVE → COMPLETE (job done) +- ACTIVE → CANCELLED +- PENDING → CANCELLED + +One line can only have **one ACTIVE work order at a time** — enforced with a +409 Conflict response. + +### 2. Schedule-Aware TEEP + +TEEP = OEE × Utilisation +Utilisation = scheduled_minutes_in_period / calendar_minutes_in_period - GET /api/mes/lines/{id}/oee - GET /api/mes/lines/{id}/oee/history?hours=8 - GET /api/mes/oee/summary - GET /api/mes/kpis +`compute_oee()` gains an optional `utilisation` param (default 1.0, preserving +Week 3 behaviour). The tick loop queries the `schedules` table for the active +shift and passes the utilisation factor. + +Until schedules are seeded, utilisation stays 1.0 — no regression. + +### 3. Pydantic UDTs (mes_models.py) + +Finalise: +- `LineDataType` — full live status payload +- `CountDispatch` — part count event +- `OEEDataType` — OEE snapshot shape (matches DB + API) +- `WorkOrderCreate`, `WorkOrderResponse`, `WorkOrderStatusUpdate` + +--- + +## Risks + +- One-ACTIVE-per-line constraint must be checked atomically — use DB query + inside the same transaction, not an in-memory cache. +- `compute_oee()` signature change adds `utilisation` param — must be + keyword-only with a default so Week 3 callers need zero changes. ## Rollback - git checkout main +Revert this branch. No DB migrations needed — all tables were created in +Week 1. Work order data is additive. + +## Verification Steps + +1. `pytest tests/test_work_orders.py -v` — all new tests pass +2. `pytest tests/ -v` — full suite (48 + new) passes, zero regressions +3. Manually: POST work order → PATCH to ACTIVE → OEE tick uses correct ideal_cycle_sec +4. Manually: attempt second ACTIVE work order on same line → 409 diff --git a/services/mes/backend/main.py b/services/mes/backend/main.py index 6240a6c..89ac2c2 100644 --- a/services/mes/backend/main.py +++ b/services/mes/backend/main.py @@ -9,6 +9,10 @@ Week 2: /api/mes/lines, /api/mes/lines/{id}/state Week 3: /api/mes/lines/{id}/oee, /api/mes/lines/{id}/oee/history /api/mes/oee/summary, /api/mes/kpis + Week 4: /api/mes/products, /api/mes/products (POST/GET) + /api/mes/work-orders (POST/GET), /api/mes/work-orders/{id} (GET) + /api/mes/work-orders/{id}/status (PATCH) + Schedule-aware TEEP via schedules table """ import asyncio @@ -22,6 +26,7 @@ from backend.routes.health import router as health_router from backend.routes.lines import router as lines_router from backend.routes.oee import router as oee_router +from backend.routes.work_orders import router as work_orders_router from backend.services import oee_calculator, state_poller logging.basicConfig(level=logging.INFO) @@ -77,9 +82,10 @@ async def lifespan(app: FastAPI): allow_headers=["*"], ) -app.include_router(health_router, prefix=settings.api_prefix) -app.include_router(lines_router, prefix=settings.api_prefix) -app.include_router(oee_router, prefix=settings.api_prefix) +app.include_router(health_router, prefix=settings.api_prefix) +app.include_router(lines_router, prefix=settings.api_prefix) +app.include_router(oee_router, prefix=settings.api_prefix) +app.include_router(work_orders_router, prefix=settings.api_prefix) if __name__ == "__main__": diff --git a/services/mes/backend/routes/work_orders.py b/services/mes/backend/routes/work_orders.py new file mode 100644 index 0000000..fc35097 --- /dev/null +++ b/services/mes/backend/routes/work_orders.py @@ -0,0 +1,255 @@ +"""Work Order routes — CRUD + status transitions. + +Week 4 endpoints: + POST /api/mes/products — create a product (SKU + ideal cycle) + GET /api/mes/products — list all products + POST /api/mes/work-orders — create a work order + GET /api/mes/work-orders — list (filter ?line_id= ?status=) + GET /api/mes/work-orders/{id} — detail + PATCH /api/mes/work-orders/{id}/status — PENDING→ACTIVE→COMPLETE / CANCELLED + +Transition rules: + PENDING → ACTIVE (start the job) + ACTIVE → COMPLETE (job done) + ACTIVE → CANCELLED + PENDING → CANCELLED + +Constraint: one line can only have ONE ACTIVE work order at a time (409 on violation). +""" + +import logging +from datetime import datetime, timezone +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel, Field +from sqlalchemy.orm import Session + +from backend.database import get_db +from backend.models.db_models import Line, Product, WorkOrder, WorkOrderStatus +from backend.models.mes_models import WorkOrderCreate, WorkOrderResponse + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/mes", tags=["work-orders"]) + + +# ── Valid status transitions ─────────────────────────────────────────────────── + +_ALLOWED_TRANSITIONS = { + WorkOrderStatus.PENDING: {WorkOrderStatus.ACTIVE, WorkOrderStatus.CANCELLED}, + WorkOrderStatus.ACTIVE: {WorkOrderStatus.COMPLETE, WorkOrderStatus.CANCELLED}, + WorkOrderStatus.PAUSED: {WorkOrderStatus.ACTIVE, WorkOrderStatus.CANCELLED}, + WorkOrderStatus.COMPLETE: set(), + WorkOrderStatus.CANCELLED: set(), +} + + +# ── Request / response models ────────────────────────────────────────────────── + +class ProductCreate(BaseModel): + sku: str + name: str + ideal_cycle_sec: float = Field(gt=0, description="Seconds per part at 100% performance") + description: Optional[str] = None + + +class ProductResponse(BaseModel): + id: str + sku: str + name: str + ideal_cycle_sec: float + description: Optional[str] = None + + model_config = {"from_attributes": True} + + +class WorkOrderStatusUpdate(BaseModel): + status: str # validated against allowed transitions at runtime + + +# ── Product endpoints ────────────────────────────────────────────────────────── + +@router.post("/products", response_model=ProductResponse, status_code=201) +def create_product(body: ProductCreate, db: Session = Depends(get_db)): + """Create a new product SKU with an ideal cycle time.""" + existing = db.query(Product).filter(Product.sku == body.sku).first() + if existing: + raise HTTPException(status_code=409, detail=f"Product SKU '{body.sku}' already exists") + + product = Product( + sku=body.sku, + name=body.name, + ideal_cycle_sec=body.ideal_cycle_sec, + description=body.description, + ) + db.add(product) + db.commit() + db.refresh(product) + logger.info("Product created: %s (%.2fs/part)", product.sku, product.ideal_cycle_sec) + return ProductResponse( + id=str(product.id), + sku=product.sku, + name=product.name, + ideal_cycle_sec=product.ideal_cycle_sec, + description=product.description, + ) + + +@router.get("/products", response_model=list[ProductResponse]) +def list_products(db: Session = Depends(get_db)): + """List all products.""" + products = db.query(Product).order_by(Product.sku).all() + return [ + ProductResponse( + id=str(p.id), + sku=p.sku, + name=p.name, + ideal_cycle_sec=p.ideal_cycle_sec, + description=p.description, + ) + for p in products + ] + + +# ── Work order endpoints ─────────────────────────────────────────────────────── + +@router.post("/work-orders", response_model=WorkOrderResponse, status_code=201) +def create_work_order(body: WorkOrderCreate, db: Session = Depends(get_db)): + """Create a new work order (status = PENDING).""" + # Validate line exists + line = db.query(Line).filter(Line.id == body.line_id).first() + if not line: + raise HTTPException(status_code=404, detail=f"Line '{body.line_id}' not found") + + # Validate product exists + product = db.query(Product).filter(Product.id == body.product_id).first() + if not product: + raise HTTPException(status_code=404, detail=f"Product '{body.product_id}' not found") + + # Unique order number + if db.query(WorkOrder).filter(WorkOrder.order_number == body.order_number).first(): + raise HTTPException(status_code=409, detail=f"Order number '{body.order_number}' already exists") + + wo = WorkOrder( + order_number=body.order_number, + product_id=body.product_id, + line_id=body.line_id, + target_qty=body.target_qty, + scheduled_start=body.scheduled_start, + notes=body.notes, + status=WorkOrderStatus.PENDING, + ) + db.add(wo) + db.commit() + db.refresh(wo) + logger.info("Work order created: %s → line=%s qty=%d", wo.order_number, line.name, wo.target_qty) + return _to_response(wo) + + +@router.get("/work-orders", response_model=list[WorkOrderResponse]) +def list_work_orders( + line_id: Optional[str] = Query(default=None), + status: Optional[str] = Query(default=None), + db: Session = Depends(get_db), +): + """List work orders, optionally filtered by line_id and/or status.""" + q = db.query(WorkOrder) + if line_id: + q = q.filter(WorkOrder.line_id == line_id) + if status: + try: + s = WorkOrderStatus(status.upper()) + except ValueError: + raise HTTPException(status_code=422, detail=f"Unknown status '{status}'") + q = q.filter(WorkOrder.status == s) + return [_to_response(wo) for wo in q.order_by(WorkOrder.created_at.desc()).all()] + + +@router.get("/work-orders/{work_order_id}", response_model=WorkOrderResponse) +def get_work_order(work_order_id: str, db: Session = Depends(get_db)): + """Get a single work order by ID.""" + wo = db.query(WorkOrder).filter(WorkOrder.id == work_order_id).first() + if not wo: + raise HTTPException(status_code=404, detail=f"Work order '{work_order_id}' not found") + return _to_response(wo) + + +@router.patch("/work-orders/{work_order_id}/status", response_model=WorkOrderResponse) +def update_work_order_status( + work_order_id: str, + body: WorkOrderStatusUpdate, + db: Session = Depends(get_db), +): + """Transition a work order's status. + + PENDING → ACTIVE → COMPLETE + PENDING → CANCELLED + ACTIVE → CANCELLED + """ + wo = db.query(WorkOrder).filter(WorkOrder.id == work_order_id).first() + if not wo: + raise HTTPException(status_code=404, detail=f"Work order '{work_order_id}' not found") + + # Parse and validate new status + try: + new_status = WorkOrderStatus(body.status.upper()) + except ValueError: + raise HTTPException(status_code=422, detail=f"Unknown status '{body.status}'") + + allowed = _ALLOWED_TRANSITIONS.get(wo.status, set()) + if new_status not in allowed: + raise HTTPException( + status_code=422, + detail=f"Cannot transition from {wo.status.value} to {new_status.value}. " + f"Allowed: {[s.value for s in allowed] or 'none'}", + ) + + # Enforce one-ACTIVE-per-line + if new_status == WorkOrderStatus.ACTIVE: + conflict = ( + db.query(WorkOrder) + .filter( + WorkOrder.line_id == wo.line_id, + WorkOrder.status == WorkOrderStatus.ACTIVE, + WorkOrder.id != wo.id, + ) + .first() + ) + if conflict: + raise HTTPException( + status_code=409, + detail=f"Line already has an active work order: {conflict.order_number}", + ) + + # Apply transition + now = datetime.now(timezone.utc) + if new_status == WorkOrderStatus.ACTIVE and wo.actual_start is None: + wo.actual_start = now + elif new_status in (WorkOrderStatus.COMPLETE, WorkOrderStatus.CANCELLED): + wo.actual_end = now + + wo.status = new_status + db.commit() + db.refresh(wo) + logger.info("Work order %s → %s", wo.order_number, new_status.value) + return _to_response(wo) + + +# ── Helper ──────────────────────────────────────────────────────────────────── + +def _to_response(wo: WorkOrder) -> WorkOrderResponse: + return WorkOrderResponse( + id=str(wo.id), + order_number=wo.order_number, + product_id=str(wo.product_id), + line_id=str(wo.line_id), + target_qty=wo.target_qty, + good_qty=wo.good_qty, + status=wo.status.value, + scheduled_start=wo.scheduled_start, + actual_start=wo.actual_start, + actual_end=wo.actual_end, + notes=wo.notes, + created_at=wo.created_at, + ) diff --git a/services/mes/backend/services/oee_calculator.py b/services/mes/backend/services/oee_calculator.py index 130c00d..4b21358 100644 --- a/services/mes/backend/services/oee_calculator.py +++ b/services/mes/backend/services/oee_calculator.py @@ -3,7 +3,8 @@ Computes Availability, Performance, Quality, OEE, and TEEP from: - ItemCount delta (HR100) via plc-modbus HTTP - RUNNING-state duration from machine_states table - - Active work order's ideal_cycle_sec (default 1.0 until Week 4) + - Active work order's ideal_cycle_sec (from products table, default 1.0) + - Schedule utilisation (from schedules table — Week 4) Writes one oee_snapshots row per line per tick. Raises an in-process alert when a line's OEE < 0.60 for 30+ consecutive @@ -14,7 +15,9 @@ Performance = (ideal_cycle_sec * total_count) / run_time_sec (clamped 0-1) Quality = good_count / total_count (clamped 0-1) OEE = A * P * Q - TEEP = OEE (utilisation = 1.0 until schedule wired in Week 4) + TEEP = OEE * utilisation + Utilisation = 1.0 if line is within a scheduled shift, else 0.0 + Falls back to 1.0 when no schedules exist (Week 3 compat) """ import asyncio @@ -63,11 +66,18 @@ def compute_oee( total_count: int, good_count: int, ideal_cycle_sec: float, + utilisation: float = 1.0, ) -> dict: """Return dict with availability, performance, quality, oee, teep. All outputs clamped to [0.0, 1.0]. Division-by-zero safe — returns 0.0 on any degenerate input. + + Args: + utilisation: Fraction of this period covered by a shift schedule. + 1.0 = fully scheduled (or no schedule defined). + 0.0 = outside any schedule. + TEEP = OEE * utilisation. """ if planned_time_sec <= 0: return dict(availability=0.0, performance=0.0, quality=0.0, oee=0.0, teep=0.0) @@ -81,8 +91,8 @@ def compute_oee( quality = _clamp(good_count / total_count) if total_count > 0 else 0.0 - oee = availability * performance * quality - teep = oee # utilisation = 1.0 until Week 4 adds schedule + oee = availability * performance * quality + teep = _clamp(oee * _clamp(utilisation)) return dict( availability=round(availability, 4), @@ -133,6 +143,35 @@ def _active_ideal_cycle(db: Session, line_id: str) -> float: return float(row[0]) if row else DEFAULT_IDEAL_CYCLE_SEC +def _active_utilisation(db: Session, line_id: str) -> float: + """Return schedule utilisation for the current tick window. + + - If no schedules exist for this line at all → 1.0 (Week 3 compat). + - If schedules exist but none cover NOW → 0.0 (outside scheduled time). + - If a schedule covers NOW → 1.0 (line is within its shift). + """ + # Check whether any schedules exist for this line + has_schedules = db.execute( + text("SELECT 1 FROM schedules WHERE line_id = :lid LIMIT 1"), + {"lid": line_id}, + ).fetchone() + if not has_schedules: + return 1.0 # no schedule defined — preserve Week 3 behaviour + + # Check if NOW falls inside an active schedule entry + in_schedule = db.execute( + text(""" + SELECT 1 FROM schedules + WHERE line_id = :lid + AND planned_start <= NOW() + AND planned_end >= NOW() + LIMIT 1 + """), + {"lid": line_id}, + ).fetchone() + return 1.0 if in_schedule else 0.0 + + def _active_work_order_id(db: Session, line_id: str) -> Optional[str]: row = db.execute( text("SELECT id FROM work_orders WHERE line_id=:lid AND status='ACTIVE' LIMIT 1"), @@ -212,9 +251,10 @@ async def _tick_line(line: Line) -> None: def _db_work(): db = SessionLocal() try: - run_time_sec = _run_time_in_window(db, line_id, TICK_SEC) + run_time_sec = _run_time_in_window(db, line_id, TICK_SEC) ideal_cycle_sec = _active_ideal_cycle(db, line_id) - wo_id = _active_work_order_id(db, line_id) + wo_id = _active_work_order_id(db, line_id) + utilisation = _active_utilisation(db, line_id) metrics = compute_oee( run_time_sec=run_time_sec, @@ -222,6 +262,7 @@ def _db_work(): total_count=total_count, good_count=good_count, ideal_cycle_sec=ideal_cycle_sec, + utilisation=utilisation, ) _write_snapshot(db, line_id, wo_id, run_time_sec, planned_time_sec, total_count, good_count, ideal_cycle_sec, metrics) diff --git a/services/mes/tests/test_work_orders.py b/services/mes/tests/test_work_orders.py new file mode 100644 index 0000000..b4ce219 --- /dev/null +++ b/services/mes/tests/test_work_orders.py @@ -0,0 +1,267 @@ +"""Week 4 tests — Work order CRUD, status transitions, and schedule-aware TEEP. + +All tests use unittest.mock — no live DB or plc-modbus required. +Run: pytest tests/test_work_orders.py -v + +Acceptance Criteria (PRD-MES-CORE.md §10): + AC#6 Work orders create / transition without errors + AC#7 One ACTIVE work order per line enforced +""" + +import uuid +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + +from backend.main import app +from backend.models.db_models import Line, Product, WorkOrder, WorkOrderStatus +from backend.services.oee_calculator import compute_oee + +# ── Fixtures ────────────────────────────────────────────────────────────────── + +LINE_ID = str(uuid.uuid4()) +PRODUCT_ID = str(uuid.uuid4()) +WO_ID = str(uuid.uuid4()) + +NOW = datetime.now(timezone.utc) + + +def _make_line(): + line = MagicMock(spec=Line) + line.id = uuid.UUID(LINE_ID) + line.name = "Conveyor-1" + return line + + +def _make_product(ideal_cycle_sec: float = 2.0): + p = MagicMock(spec=Product) + p.id = uuid.UUID(PRODUCT_ID) + p.sku = "SKU-001" + p.name = "Widget A" + p.ideal_cycle_sec = ideal_cycle_sec + p.description = None + return p + + +def _make_wo(status: WorkOrderStatus = WorkOrderStatus.PENDING): + wo = MagicMock(spec=WorkOrder) + wo.id = uuid.UUID(WO_ID) + wo.order_number = "WO-001" + wo.product_id = uuid.UUID(PRODUCT_ID) + wo.line_id = uuid.UUID(LINE_ID) + wo.target_qty = 100 + wo.good_qty = 0 + wo.status = status + wo.scheduled_start = None + wo.actual_start = None + wo.actual_end = None + wo.notes = None + wo.created_at = NOW + return wo + + +# ── compute_oee: utilisation param (TEEP) ───────────────────────────────────── + +class TestComputeOEEUtilisation: + def test_teep_equals_oee_when_utilisation_1(self): + """Default utilisation=1.0 → TEEP == OEE (Week 3 behaviour preserved).""" + m = compute_oee( + run_time_sec=48, planned_time_sec=60, + total_count=40, good_count=38, ideal_cycle_sec=1.0, + ) + assert m["teep"] == pytest.approx(m["oee"], abs=0.001) + + def test_teep_scaled_by_utilisation(self): + """utilisation=0.5 → TEEP = OEE × 0.5""" + m = compute_oee( + run_time_sec=60, planned_time_sec=60, + total_count=60, good_count=60, ideal_cycle_sec=1.0, + utilisation=0.5, + ) + assert m["oee"] == pytest.approx(1.0, abs=0.001) + assert m["teep"] == pytest.approx(0.5, abs=0.001) + + def test_teep_zero_when_outside_schedule(self): + """utilisation=0.0 → TEEP = 0 (line running outside shift).""" + m = compute_oee( + run_time_sec=60, planned_time_sec=60, + total_count=60, good_count=60, ideal_cycle_sec=1.0, + utilisation=0.0, + ) + assert m["oee"] == pytest.approx(1.0, abs=0.001) + assert m["teep"] == 0.0 + + def test_teep_clamped_to_1(self): + """utilisation > 1.0 is clamped — TEEP never exceeds 1.0.""" + m = compute_oee( + run_time_sec=60, planned_time_sec=60, + total_count=60, good_count=60, ideal_cycle_sec=1.0, + utilisation=2.0, + ) + assert m["teep"] == pytest.approx(1.0, abs=0.001) + + def test_teep_rounded_to_4_decimals(self): + m = compute_oee(48, 60, 40, 38, 1.0, utilisation=0.75) + assert m["teep"] == round(m["teep"], 4) + + +# ── Work order API (via TestClient + mocked DB) ──────────────────────────────── + +@pytest.fixture() +def client(): + """FastAPI test client with DB dependency overridden.""" + from backend.database import get_db + + mock_db = MagicMock() + app.dependency_overrides[get_db] = lambda: mock_db + with TestClient(app, raise_server_exceptions=True) as c: + yield c, mock_db + app.dependency_overrides.clear() + + +class TestCreateWorkOrder: + def test_create_returns_201(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.side_effect = [ + _make_line(), # line exists + _make_product(), # product exists + None, # order_number unique check + ] + wo = _make_wo() + db.refresh.side_effect = lambda obj: None + + with patch("backend.routes.work_orders.WorkOrder", return_value=wo): + resp = tc.post("/api/mes/work-orders", json={ + "order_number": "WO-001", + "product_id": PRODUCT_ID, + "line_id": LINE_ID, + "target_qty": 100, + }) + + assert resp.status_code == 201 + assert resp.json()["order_number"] == "WO-001" + + def test_create_missing_line_returns_404(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.return_value = None # line not found + resp = tc.post("/api/mes/work-orders", json={ + "order_number": "WO-002", + "product_id": PRODUCT_ID, + "line_id": LINE_ID, + "target_qty": 50, + }) + assert resp.status_code == 404 + + def test_duplicate_order_number_returns_409(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.side_effect = [ + _make_line(), # line exists + _make_product(), # product exists + _make_wo(), # duplicate order_number + ] + resp = tc.post("/api/mes/work-orders", json={ + "order_number": "WO-001", + "product_id": PRODUCT_ID, + "line_id": LINE_ID, + "target_qty": 100, + }) + assert resp.status_code == 409 + + +class TestListWorkOrders: + def test_list_returns_200(self, client): + tc, db = client + db.query.return_value.order_by.return_value.all.return_value = [_make_wo()] + # No filters applied — query chain has no .filter() + db.query.return_value.filter.return_value.order_by.return_value.all.return_value = [_make_wo()] + resp = tc.get("/api/mes/work-orders") + assert resp.status_code == 200 + + def test_invalid_status_filter_returns_422(self, client): + tc, db = client + resp = tc.get("/api/mes/work-orders?status=BOGUS") + assert resp.status_code == 422 + + +class TestGetWorkOrder: + def test_found_returns_200(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.return_value = _make_wo() + resp = tc.get(f"/api/mes/work-orders/{WO_ID}") + assert resp.status_code == 200 + assert resp.json()["id"] == WO_ID + + def test_not_found_returns_404(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.return_value = None + resp = tc.get(f"/api/mes/work-orders/{WO_ID}") + assert resp.status_code == 404 + + +class TestStatusTransitions: + def test_pending_to_active(self, client): + tc, db = client + wo = _make_wo(WorkOrderStatus.PENDING) + db.query.return_value.filter.return_value.first.side_effect = [ + wo, # fetch the WO + None, # no conflicting ACTIVE on this line + ] + db.refresh.side_effect = lambda obj: None + resp = tc.patch(f"/api/mes/work-orders/{WO_ID}/status", json={"status": "ACTIVE"}) + assert resp.status_code == 200 + + def test_complete_to_any_is_invalid(self, client): + tc, db = client + wo = _make_wo(WorkOrderStatus.COMPLETE) + db.query.return_value.filter.return_value.first.return_value = wo + resp = tc.patch(f"/api/mes/work-orders/{WO_ID}/status", json={"status": "ACTIVE"}) + assert resp.status_code == 422 + + def test_second_active_on_same_line_returns_409(self, client): + tc, db = client + wo = _make_wo(WorkOrderStatus.PENDING) + conflict = _make_wo(WorkOrderStatus.ACTIVE) + conflict.id = uuid.UUID(str(uuid.uuid4())) + db.query.return_value.filter.return_value.first.side_effect = [ + wo, # fetch the WO + conflict, # conflicting ACTIVE WO + ] + resp = tc.patch(f"/api/mes/work-orders/{WO_ID}/status", json={"status": "ACTIVE"}) + assert resp.status_code == 409 + + def test_unknown_status_value_returns_422(self, client): + tc, db = client + wo = _make_wo(WorkOrderStatus.PENDING) + db.query.return_value.filter.return_value.first.return_value = wo + resp = tc.patch(f"/api/mes/work-orders/{WO_ID}/status", json={"status": "LAUNCHED"}) + assert resp.status_code == 422 + + +# ── Product endpoints ────────────────────────────────────────────────────────── + +class TestProducts: + def test_create_product_returns_201(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.return_value = None # no duplicate + p = _make_product() + db.refresh.side_effect = lambda obj: None + + with patch("backend.routes.work_orders.Product", return_value=p): + resp = tc.post("/api/mes/products", json={ + "sku": "SKU-001", + "name": "Widget A", + "ideal_cycle_sec": 2.0, + }) + assert resp.status_code == 201 + + def test_duplicate_sku_returns_409(self, client): + tc, db = client + db.query.return_value.filter.return_value.first.return_value = _make_product() + resp = tc.post("/api/mes/products", json={ + "sku": "SKU-001", + "name": "Widget A", + "ideal_cycle_sec": 2.0, + }) + assert resp.status_code == 409