diff --git a/PLAN.md b/PLAN.md
index eaa5d67..bf51cca 100644
--- a/PLAN.md
+++ b/PLAN.md
@@ -1,81 +1,86 @@
-# PLAN: MES Core — Week 1 (DB Schema + Alembic Migrations)
+# PLAN: MES Core — Week 2 (Modbus Machine State Reader)
 
-**Branch:** `feat/mes-week1-db-schema`
-**Issue:** Mikecranesync/MIRA#319
+**Branch:** `feat/mes-week2-state-reader`
+**Issue:** Mikecranesync/MIRA#320
 **PRD:** `docs/PRD-MES-CORE.md`
 **Date:** 2026-04-15
+**Depends on:** Week 1 (feat/mes-week1-db-schema) merged
 
 ---
 
 ## Objective
 
-Stand up the `mes_core` PostgreSQL schema — the foundational data layer for Work Orders, OEE, Machine States, and Downtime Tracking. All subsequent MES weeks depend on this being clean and stable.
+Build the machine state reader: a background poller that reads the plc-modbus HTTP API every 5 seconds per configured line, detects state transitions (RUNNING/DOWN/IDLE/OFFLINE), writes them to `machine_states`, and exposes `GET /api/mes/lines` and `GET /api/mes/lines/{id}/state` REST endpoints.
 
 ## Affected Files
 
 **New:**
-- `services/mes/requirements.txt`
-- `services/mes/backend/__init__.py`
-- `services/mes/backend/config.py`
-- `services/mes/backend/database.py`
-- `services/mes/backend/main.py`
-- `services/mes/backend/models/__init__.py`
-- `services/mes/backend/models/db_models.py`
-- `services/mes/backend/models/mes_models.py`
-- `services/mes/backend/routes/__init__.py`
-- `services/mes/backend/routes/health.py`
-- `services/mes/alembic.ini`
-- `services/mes/alembic/env.py`
-- `services/mes/alembic/script.py.mako`
-- `services/mes/alembic/versions/0001_initial_mes_schema.py`
-- `services/mes/tests/__init__.py`
-- `services/mes/tests/conftest.py`
-- `services/mes/tests/test_schema.py`
-- `infra/scada/mes.Dockerfile`
+- `services/mes/backend/services/__init__.py`
+- `services/mes/backend/services/plc_client.py` — async HTTP client wrapping plc-modbus
+- `services/mes/backend/services/state_machine.py` — pure state detection from IO snapshot
+- `services/mes/backend/services/state_poller.py` — asyncio background poll loop
+- `services/mes/backend/routes/lines.py` — GET /api/mes/lines, GET /lines/{id}/state
+- `services/mes/tests/test_machine_states.py` — 10 unit tests, all mocked
 
 **Modified:**
-- `docker-compose.yml` — add `factorylm-mes-db` (Postgres) and `factorylm-mes` containers
+- `services/mes/requirements.txt` — add httpx
+- `services/mes/backend/config.py` — add plc_modbus_url setting
+- `services/mes/backend/main.py` — wire poller into lifespan, add lines router
+- `docker-compose.yml` — add PLC_MODBUS_URL env to mes container
 
 ## Approach
 
-1. Introduce SQLAlchemy 2.x + Alembic into `services/mes/` — first time in repo
-2. Follow existing FastAPI service pattern from `services/plc-modbus/`
-3. DB schema: 7 tables (`lines`, `products`, `work_orders`, `schedules`, `downtime_reasons`, `machine_states`, `oee_snapshots`)
-4. Seed `downtime_reasons` (14 codes) and `lines` (2 lines) in initial migration
-5. Postgres 16 via Docker — `factorylm-mes-db` container on port 5433 (avoids conflict)
-6. FastAPI skeleton with `/api/health` only — full routes come in Week 2+
+1. `plc_client.py` — thin async wrapper around `GET /api/plc/io` (httpx). Raises `PLCOfflineError` on timeout/connection failure so caller can set OFFLINE state.
+2. `state_machine.py` — pure function `detect_state(io_data)` → `(MachineStateEnum, reason_code | None)`. Derived from `VFDStatus` and `ErrorCode` registers. No DB or network calls — fully testable without mocks.
+3. `state_poller.py` — asyncio task, one iteration per line every 5s. Maintains in-memory cache to avoid DB reads on every tick. Writes to `machine_states` only on transition.
+4. `lines.py` routes — two endpoints: list all lines (from DB), get current state (from in-memory cache + last DB row).
+5. `main.py` lifespan — start poller task on startup, cancel on shutdown.
+
+State transition write: close open row (`ended_at = NOW()`), insert new row.
+
+## State Machine
+
+```
+IO: VFDStatus=1, ErrorCode=0 → RUNNING
+IO: VFDStatus=2 OR ErrorCode>0 → DOWN (reason_code from ErrorCode map)
+IO: VFDStatus=0, ErrorCode=0 → IDLE
+HTTP failure / timeout → OFFLINE
+```
+
+## ErrorCode → reason_code map
+
+```python
+{1: "OVERLOAD", 2: "OVERHEAT", 3: "SENSOR_FAIL", 4: "JAM", 7: "E_STOP"}
+```
 
 ## Risks
 
-- No Alembic precedent in repo — introducing fresh, so migration is the only baseline
-- Python 3.9 system — using `Optional[X]` not `X | None`
-- Register map divergence (plc-modbus CLAUDE.md vs main CLAUDE.md) — MES uses main CLAUDE.md register map (authoritative)
+- plc-modbus in mock mode returns VFDStatus=0 at rest — poller sees IDLE immediately (expected)
+- Multiple lines share one plc-modbus service currently — same io_data, different `line_id` rows
 
 ## Rollback
 
 ```bash
-git checkout main
-docker compose down factorylm-mes-db factorylm-mes
+git checkout feat/mes-week1-db-schema
 ```
 
 ## Verification Steps
 
 ```bash
-# Start DB
-docker compose up factorylm-mes-db -d
-
-# Run migration
-cd services/mes
-DATABASE_URL="postgresql://mes:meslocal@localhost:5433/mes_core" alembic upgrade head
-
-# Run schema tests
-pytest services/mes/tests/test_schema.py -v
-
-# Health check
-docker compose up factorylm-mes -d
-curl localhost:8300/api/health
+# Unit tests (no docker needed)
+cd services/mes && pytest tests/test_machine_states.py -v
+
+# Integration: start stack, check state endpoint
+docker compose up mes-db mes plc-modbus -d
+curl localhost:8300/api/mes/lines
+curl localhost:8300/api/mes/lines//state
+
+# Inject a fault and verify DB transition
+curl -X POST localhost:8001/api/plc/mock/fault -H "Content-Type: application/json" -d '{"fault_type":"jam"}'
+sleep 8
+curl localhost:8300/api/mes/lines//state # should show DOWN / JAM
 ```
 
 ## Note on Active Focus Window
 
-The main `CLAUDE.md` declares a Revenue Priority focus on V1 Telegram bot. This MES work has been explicitly requested by Mike (2026-04-15 session) as a parallel track. Proceeding with explicit authorization.
+Explicitly authorized by Mike (2026-04-15 session).
diff --git a/docker-compose.yml b/docker-compose.yml
index 0aba6a7..44a15c0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -106,10 +106,13 @@ services:
       - "8300:8300"
     environment:
       FACTORYLM_DATABASE_URL: "postgresql://mes:meslocal@mes-db:5432/mes_core"
-      FACTORYLM_PLC_USE_MOCK: "true"
+      FACTORYLM_PLC_MODBUS_URL: "http://plc-modbus:8001"
+      FACTORYLM_PLC_USE_MOCK: "false"
     depends_on:
       mes-db:
         condition: service_healthy
+      plc-modbus:
+        condition: service_healthy
     healthcheck:
       test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8300/api/health')"]
       interval: 10s
diff --git a/services/mes/backend/config.py b/services/mes/backend/config.py
index ed57e54..f76f6b6 100644
--- a/services/mes/backend/config.py
+++ b/services/mes/backend/config.py
@@ -18,8 +18,13 @@ class Settings(BaseSettings):
     # Format: postgresql://user:password@host:port/dbname
     database_url: str = "postgresql://mes:meslocal@localhost:5434/mes_core"
 
-    # PLC defaults (overridden per-line from DB)
+    # plc-modbus service URL — MES calls this over HTTP (never raw Modbus TCP)
+    plc_modbus_url: str = "http://plc-modbus:8001"
+
+    # Polling interval in seconds (default 5, set lower in tests)
     plc_poll_interval_sec: int = 5
+
+    # Set True to skip poller startup (useful in unit tests)
     plc_use_mock: bool = False
 
 
diff --git a/services/mes/backend/main.py b/services/mes/backend/main.py
index 722f1d4..477dbfc 100644
--- a/services/mes/backend/main.py
+++ b/services/mes/backend/main.py
@@ -1,9 +1,15 @@
 """FactoryLM MES API — FastAPI entry point.
 
-Week 1: /health only.
-Week 2+: lines, work_orders, downtime, oee routes added here.
+Lifespan:
+  startup → seed state cache, launch background state poller
+  shutdown → signal poller to stop cleanly
+
+Routes (cumulative by week):
+  Week 1: /api/health
+  Week 2: /api/mes/lines, /api/mes/lines/{id}/state
 """
 
+import asyncio
 import logging
 from contextlib import asynccontextmanager
 
@@ -12,6 +18,8 @@
 
 from backend.config import settings
 from backend.routes.health import router as health_router
+from backend.routes.lines import router as lines_router
+from backend.services import state_poller
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -19,9 +27,28 @@
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    logger.info("MES service starting — DB: %s", settings.database_url.split("@")[-1])
+    db_host = settings.database_url.split("@")[-1]
+    logger.info("MES service starting — DB: %s PLC: %s", db_host, settings.plc_modbus_url)
+
+    poller_task = None
+    if not settings.plc_use_mock:
+        poller_task = asyncio.create_task(
+            state_poller.run(poll_interval_sec=settings.plc_poll_interval_sec),
+            name="state_poller",
+        )
+        logger.info("State poller started (interval=%ds)", settings.plc_poll_interval_sec)
+    else:
+        logger.info("PLC mock mode — state poller disabled")
+
     yield
-    logger.info("MES service stopping")
+
+    logger.info("MES service shutting down")
+    if poller_task:
+        state_poller.stop()
+        try:
+            await asyncio.wait_for(poller_task, timeout=8.0)
+        except asyncio.TimeoutError:
+            poller_task.cancel()
 
 
 app = FastAPI(
@@ -39,6 +66,7 @@ async def lifespan(app: FastAPI):
 )
 
 app.include_router(health_router, prefix=settings.api_prefix)
+app.include_router(lines_router, prefix=settings.api_prefix)
 
 
 if __name__ == "__main__":
diff --git a/services/mes/backend/routes/lines.py b/services/mes/backend/routes/lines.py
new file mode 100644
index 0000000..f7b36e6
--- /dev/null
+++ b/services/mes/backend/routes/lines.py
@@ -0,0 +1,107 @@
+"""Lines routes — GET /api/mes/lines and GET /api/mes/lines/{id}/state.
+
+Week 2 endpoints only. Work order and OEE endpoints added in later weeks.
+"""
+
+import logging
+from typing import Optional
+from datetime import datetime
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel
+from sqlalchemy.orm import Session
+
+from backend.database import get_db
+from backend.models.db_models import Line, MachineState, MachineStateEnum
+from backend.models.mes_models import LineResponse
+from backend.services import state_poller
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/mes", tags=["mes"])
+
+
+# ── Response models ───────────────────────────────────────────────────────────
+
+class LineStateResponse(BaseModel):
+    line_id: str
+    line_name: str
+    state: str
+    reason_code: Optional[str] = None
+    since: Optional[datetime] = None  # when this state started
+    source: str  # "cache" | "db" | "unknown"
+
+
+# ── Endpoints ─────────────────────────────────────────────────────────────────
+
+@router.get("/lines", response_model=list[LineResponse])
+def list_lines(db: Session = Depends(get_db)):
+    """Return all configured production lines."""
+    lines = db.query(Line).order_by(Line.name).all()
+    return [
+        LineResponse(
+            id=str(line.id),
+            name=line.name,
+            isa95_path=line.isa95_path,
+            plc_host=line.plc_host,
+            plc_port=line.plc_port,
+            description=line.description,
+        )
+        for line in lines
+    ]
+
+
+@router.get("/lines/{line_id}/state", response_model=LineStateResponse)
+def get_line_state(line_id: str, db: Session = Depends(get_db)):
+    """Return the current machine state for a line.
+
+    Reads the state from the in-memory cache first; `since` and `reason_code`
+    still come from the open DB row. Falls back to that row alone if the cache
+    is cold (e.g. service just restarted but poller hasn't ticked yet).
+    """
+    line = db.query(Line).filter(Line.id == line_id).first()
+    if not line:
+        raise HTTPException(status_code=404, detail=f"Line {line_id} not found")
+
+    # Try in-memory cache first
+    cached = state_poller.get_cached_state(line_id)
+    if cached is not None:
+        # Get `since` and `reason_code` from the latest open DB row (one extra query)
+        open_row = (
+            db.query(MachineState)
+            .filter(MachineState.line_id == line_id, MachineState.ended_at.is_(None))
+            .order_by(MachineState.started_at.desc())
+            .first()
+        )
+        return LineStateResponse(
+            line_id=line_id,
+            line_name=line.name,
+            state=cached.value,
+            reason_code=open_row.reason_code if open_row else None,
+            since=open_row.started_at if open_row else None,
+            source="cache",
+        )
+
+    # Cache cold — fall back to DB
+    open_row = (
+        db.query(MachineState)
+        .filter(MachineState.line_id == line_id, MachineState.ended_at.is_(None))
+        .order_by(MachineState.started_at.desc())
+        .first()
+    )
+    if open_row:
+        return LineStateResponse(
+            line_id=line_id,
+            line_name=line.name,
+            state=open_row.state.value,
+            reason_code=open_row.reason_code,
+            since=open_row.started_at,
+            source="db",
+        )
+
+    return LineStateResponse(
+        line_id=line_id,
+        line_name=line.name,
+        state=MachineStateEnum.OFFLINE.value,
+        source="unknown",
+    )
diff --git a/services/mes/backend/services/__init__.py b/services/mes/backend/services/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/mes/backend/services/plc_client.py b/services/mes/backend/services/plc_client.py
new file mode 100644
index 0000000..0e447c5
--- /dev/null
+++ b/services/mes/backend/services/plc_client.py
@@ -0,0 +1,87 @@
+"""Async HTTP client for the plc-modbus service.
+
+The MES never talks Modbus TCP directly — it calls the plc-modbus
+FastAPI service over HTTP. This keeps the MES decoupled from hardware
+and lets the plc-modbus service handle reconnection logic.
+
+Endpoint used:
+  GET {plc_modbus_url}/api/plc/io
+
+Response shape (from plc-modbus IOResponse):
+  {
+    "coils": {"Conveyor": bool, "RunCommand": bool, ...},
+    "inputs": {"DI_00": bool, ...},
+    "outputs": {"DO_00": bool, "DO_01": bool, ...},
+    "registers": {
+      "ItemCount": int,        # cumulative part count at exit sensor
+      "ConveyorHz": int,       # VFD frequency (Hz)
+      "MotorCurrentX10": int,  # motor current × 10 (e.g. 45 = 4.5 A)
+      "MotorTempX10": int,     # motor temp × 10 (e.g. 312 = 31.2 °C)
+      "VFDStatus": int,        # 0=idle 1=running 2=fault
+      "ErrorCode": int,        # 0=none 1=overload 2=overheat
+                               # 3=sensor 4=jam 7=e-stop
+    },
+    "timestamp": str,
+  }
+"""
+
+import logging
+from typing import Optional
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+# Timeout for each HTTP call to plc-modbus
+_HTTP_TIMEOUT = 4.0  # seconds
+
+
+class PLCOfflineError(Exception):
+    """Raised when plc-modbus is unreachable or returns an error status."""
+
+
+async def fetch_io(plc_modbus_url: str) -> dict:
+    """Fetch a single IO snapshot from the plc-modbus service.
+
+    Args:
+        plc_modbus_url: Base URL of the plc-modbus service,
+            e.g. "http://plc-modbus:8001"
+
+    Returns:
+        Parsed JSON dict with keys: coils, inputs, outputs, registers, timestamp.
+
+    Raises:
+        PLCOfflineError: on connection error, timeout, or HTTP >= 400.
+    """
+    url = f"{plc_modbus_url.rstrip('/')}/api/plc/io"
+    try:
+        async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
+            resp = await client.get(url)
+            if resp.status_code >= 400:
+                raise PLCOfflineError(
+                    f"plc-modbus returned HTTP {resp.status_code} for {url}"
+                )
+            return resp.json()
+    except httpx.TimeoutException as exc:
+        raise PLCOfflineError(f"Timeout reaching plc-modbus at {url}") from exc
+    except httpx.ConnectError as exc:
+        raise PLCOfflineError(f"Cannot connect to plc-modbus at {url}") from exc
+    except PLCOfflineError:
+        raise
+    except Exception as exc:
+        raise PLCOfflineError(f"Unexpected error fetching {url}: {exc}") from exc
+
+
+def extract_registers(io_data: dict) -> dict:
+    """Return the registers sub-dict, defaulting to empty."""
+    return io_data.get("registers", {})
+
+
+def extract_coils(io_data: dict) -> dict:
+    """Return the coils sub-dict, defaulting to empty."""
+    return io_data.get("coils", {})
+
+
+def item_count(io_data: dict) -> int:
+    """Cumulative part count from HR100 (ItemCount)."""
+    return extract_registers(io_data).get("ItemCount", 0)
diff --git a/services/mes/backend/services/state_machine.py b/services/mes/backend/services/state_machine.py
new file mode 100644
index 0000000..89df12d
--- /dev/null
+++ b/services/mes/backend/services/state_machine.py
@@ -0,0 +1,77 @@
+"""Pure machine-state detection from a plc-modbus IO snapshot.
+
+This module has NO I/O — no DB, no HTTP. It is a set of pure
+functions that map IO data to a MachineStateEnum + optional downtime
+reason_code. This makes it trivially unit-testable.
+
+State transition logic (from PRD-MES-CORE.md §4.4):
+
+  VFDStatus=1, ErrorCode=0 → RUNNING
+  VFDStatus=2 OR ErrorCode > 0 → DOWN (+ reason_code)
+  VFDStatus=0, ErrorCode=0 → IDLE
+  PLCOfflineError (HTTP failure) → OFFLINE (handled by poller)
+
+ErrorCode → reason_code map:
+  0 → None (no fault)
+  1 → OVERLOAD
+  2 → OVERHEAT
+  3 → SENSOR_FAIL
+  4 → JAM
+  7 → E_STOP
+  other → UNKNOWN
+"""
+
+from typing import Optional, Tuple
+
+from backend.models.db_models import MachineStateEnum
+
+# Maps plc-modbus ErrorCode register values to downtime_reasons.code keys.
+# Seeded in migration 0001.
+ERROR_CODE_REASON_MAP: dict = {
+    1: "OVERLOAD",
+    2: "OVERHEAT",
+    3: "SENSOR_FAIL",
+    4: "JAM",
+    7: "E_STOP",
+}
+
+
+def detect_state(
+    io_data: dict,
+) -> Tuple[MachineStateEnum, Optional[str]]:
+    """Derive machine state from a plc-modbus IO snapshot.
+
+    Args:
+        io_data: Parsed JSON from GET /api/plc/io.
+
+    Returns:
+        Tuple of (MachineStateEnum, reason_code_or_None).
+        reason_code is only set when state is DOWN.
+    """
+    registers = io_data.get("registers", {})
+    vfd_status = registers.get("VFDStatus", 0)
+    error_code = registers.get("ErrorCode", 0)
+
+    # Fault takes priority — VFD may still claim running during transition
+    if vfd_status == 2 or error_code > 0:
+        reason = ERROR_CODE_REASON_MAP.get(error_code, "UNKNOWN")
+        return MachineStateEnum.DOWN, reason
+
+    if vfd_status == 1:
+        return MachineStateEnum.RUNNING, None
+
+    # vfd_status == 0, no error → idle
+    return MachineStateEnum.IDLE, None
+
+
+def is_transition(
+    current: Optional[MachineStateEnum],
+    new: MachineStateEnum,
+) -> bool:
+    """Return True if state has changed (or current is unknown)."""
+    return current is None or current != new
+
+
+def state_label(state: MachineStateEnum) -> str:
+    """Human-readable state label for logs and API responses."""
+    return state.value
diff --git a/services/mes/backend/services/state_poller.py b/services/mes/backend/services/state_poller.py
new file mode 100644
index 0000000..97990b7
--- /dev/null
+++ b/services/mes/backend/services/state_poller.py
@@ -0,0 +1,198 @@
+"""Background asyncio task that polls plc-modbus and writes state transitions.
+
+One poller instance manages all configured lines. It polls every
+`poll_interval_sec` seconds (default 5) and writes to `machine_states`
+only when the state changes for a line.
+
+State cache:
+  In-memory dict {line_id (str) -> MachineStateEnum}.
+  Avoids a DB read on every tick — only transitions hit the DB.
+
+Transition write:
+  1. Find the open machine_state row for this line (ended_at IS NULL).
+  2. Set ended_at = NOW() on that row.
+  3. INSERT new row with new state + reason_code.
+
+On startup:
+  Seed the cache from the most recent open machine_state row per line.
+  If none exists, the first poll writes the initial row for that line.
+"""
+
+import asyncio
+import logging
+from datetime import datetime, timezone
+from typing import Optional
+
+from sqlalchemy import text
+from sqlalchemy.orm import Session
+
+from backend.config import settings
+from backend.database import SessionLocal
+from backend.models.db_models import EnteredBy, Line, MachineState, MachineStateEnum
+from backend.services.plc_client import PLCOfflineError, fetch_io
+from backend.services.state_machine import detect_state, is_transition
+
+logger = logging.getLogger(__name__)
+
+# In-memory cache: line_id (str) -> current MachineStateEnum
+_state_cache: dict = {}
+
+# Event set by stop() to end the polling loop cleanly
+_stop_event: Optional[asyncio.Event] = None
+
+
+# ── DB helpers ────────────────────────────────────────────────────────────────
+
+def _load_lines(db: Session) -> list:
+    """Return all Line rows from DB."""
+    return db.query(Line).all()
+
+
+def _close_open_state(db: Session, line_id: str) -> None:
+    """Set ended_at on the currently open machine_state row for this line."""
+    db.execute(
+        text(
+            "UPDATE machine_states SET ended_at = :now "
+            "WHERE line_id = :lid AND ended_at IS NULL"
+        ),
+        {"now": datetime.now(timezone.utc), "lid": line_id},
+    )
+
+
+def _insert_state(
+    db: Session,
+    line_id: str,
+    state: MachineStateEnum,
+    reason_code: Optional[str],
+) -> None:
+    """Insert a new open machine_state row."""
+    row = MachineState(
+        line_id=line_id,
+        state=state,
+        started_at=datetime.now(timezone.utc),
+        reason_code=reason_code,
+        entered_by=EnteredBy.PLC,
+    )
+    db.add(row)
+
+
+def _apply_transition(
+    line_id: str,
+    new_state: MachineStateEnum,
+    reason_code: Optional[str],
+) -> None:
+    """Write a state transition to the DB atomically."""
+    db = SessionLocal()
+    try:
+        _close_open_state(db, line_id)
+        _insert_state(db, line_id, new_state, reason_code)
+        db.commit()
+    except Exception:
+        db.rollback()
+        logger.exception("Failed to write state transition for line %s", line_id)
+    finally:
+        db.close()
+
+
+def _seed_cache_from_db() -> None:
+    """On startup, load the latest open state per line into the cache."""
+    db = SessionLocal()
+    try:
+        rows = db.execute(
+            text(
+                "SELECT DISTINCT ON (line_id) line_id, state "
+                "FROM machine_states "
+                "WHERE ended_at IS NULL "
+                "ORDER BY line_id, started_at DESC"
+            )
+        ).fetchall()
+        for row in rows:
+            try:
+                _state_cache[str(row[0])] = MachineStateEnum(row[1])
+            except ValueError:
+                pass
+    finally:
+        db.close()
+
+
+# ── Core poll logic ───────────────────────────────────────────────────────────
+
+async def _poll_line(line: Line) -> None:
+    """Poll plc-modbus for one line and write a transition if state changed."""
+    line_id = str(line.id)
+    try:
+        io_data = await fetch_io(settings.plc_modbus_url)
+        new_state, reason_code = detect_state(io_data)
+    except PLCOfflineError as exc:
+        logger.warning("Line %s — PLC offline: %s", line.name, exc)
+        new_state = MachineStateEnum.OFFLINE
+        reason_code = "COMMS_FAIL"
+
+    current = _state_cache.get(line_id)
+
+    if is_transition(current, new_state):
+        logger.info(
+            "Line %-12s %s → %s (reason: %s)",
+            line.name,
+            current.value if current else "BOOT",
+            new_state.value,
+            reason_code or "—",
+        )
+        _state_cache[line_id] = new_state
+        # DB write runs in a thread to avoid blocking the event loop
+        await asyncio.to_thread(_apply_transition, line_id, new_state, reason_code)
+
+
+async def _poll_all_lines(lines: list) -> None:
+    """Poll all lines concurrently."""
+    await asyncio.gather(*[_poll_line(line) for line in lines], return_exceptions=True)
+
+
+# ── Poller lifecycle ──────────────────────────────────────────────────────────
+
+async def run(poll_interval_sec: int = 5) -> None:
+    """Main polling loop — runs until stop() is called.
+
+    Loads lines from DB on first tick (and refreshes every 60 ticks
+    to pick up lines added without a restart).
+    """
+    global _stop_event
+    _stop_event = asyncio.Event()
+
+    logger.info("State poller starting (interval=%ds)", poll_interval_sec)
+    _seed_cache_from_db()
+
+    tick = 0
+    lines: list = []
+
+    while not _stop_event.is_set():
+        if tick % 60 == 0:  # reload line config every 5 min
+            db = SessionLocal()
+            try:
+                lines = _load_lines(db)
+                logger.info("Poller watching %d line(s): %s",
+                            len(lines), [l.name for l in lines])
+            finally:
+                db.close()
+
+        if lines:
+            await _poll_all_lines(lines)
+
+        tick += 1
+        try:
+            await asyncio.wait_for(_stop_event.wait(), timeout=poll_interval_sec)
+        except asyncio.TimeoutError:
+            pass  # normal — keep polling
+
+
+def stop() -> None:
+    """Signal the polling loop to exit cleanly."""
+    if _stop_event:
+        _stop_event.set()
+
+
+# ── Public cache accessor ─────────────────────────────────────────────────────
+
+def get_cached_state(line_id: str) -> Optional[MachineStateEnum]:
+    """Return the last known state for a line (no DB hit)."""
+    return _state_cache.get(line_id)
diff --git a/services/mes/requirements.txt b/services/mes/requirements.txt
index 98f9a01..b3124a1 100644
--- a/services/mes/requirements.txt
+++ b/services/mes/requirements.txt
@@ -5,3 +5,4 @@ pydantic-settings>=2.0.0
 sqlalchemy>=2.0.0
 alembic>=1.13.0
 psycopg2-binary>=2.9.0
+httpx>=0.27.0
diff --git a/services/mes/tests/test_machine_states.py b/services/mes/tests/test_machine_states.py
new file mode 100644
index 0000000..b1aaef8
--- /dev/null
+++ b/services/mes/tests/test_machine_states.py
@@ -0,0 +1,243 @@
+"""Week 2 tests — machine state detection and transition logic.
+
+All tests use unittest.mock — no live DB or plc-modbus required.
+Run: pytest tests/test_machine_states.py -v
+
+Acceptance Criteria (PRD-MES-CORE.md §10):
+  AC#2 Machine state reads from live/mock PLC → tested via mocked fetch_io
+  AC#4 Downtime event captured on fault within 10s → tested via transition logic
+"""
+
+import asyncio
+from datetime import datetime, timezone
+from typing import Optional
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from backend.models.db_models import MachineStateEnum
+from backend.services.plc_client import PLCOfflineError
+from backend.services.state_machine import (
+    ERROR_CODE_REASON_MAP,
+    detect_state,
+    is_transition,
+    state_label,
+)
+
+
+# ── Fixtures: plc-modbus IO response shapes ───────────────────────────────────
+
+def _io(vfd_status: int = 0, error_code: int = 0, item_count: int = 0) -> dict:
+    """Build a minimal plc-modbus /api/plc/io response."""
+    return {
+        "coils": {"Conveyor": vfd_status == 1, "RunCommand": False},
+        "inputs": {},
+        "outputs": {"DO_01": error_code == 7},
+        "registers": {
+            "ItemCount": item_count,
+            "ConveyorHz": 30 if vfd_status == 1 else 0,
+            "MotorCurrentX10": 45 if vfd_status == 1 else 0,
+            "MotorTempX10": 312,
+            "VFDStatus": vfd_status,
+            "ErrorCode": error_code,
+        },
+        "timestamp": datetime.now(timezone.utc).isoformat(),
+    }
+
+
+# ── state_machine.detect_state ────────────────────────────────────────────────
+
+class TestDetectState:
+    def test_running_when_vfd_status_1(self):
+        state, reason = detect_state(_io(vfd_status=1))
+        assert state == MachineStateEnum.RUNNING
+        assert reason is None
+
+    def test_idle_when_vfd_status_0_no_error(self):
+        state, reason = detect_state(_io(vfd_status=0, error_code=0))
+        assert state == MachineStateEnum.IDLE
+        assert reason is None
+
+    def test_down_when_vfd_status_2(self):
+        state, reason = detect_state(_io(vfd_status=2, error_code=0))
+        assert state == MachineStateEnum.DOWN
+        # No error code → UNKNOWN
+        assert reason == "UNKNOWN"
+
+    def test_down_with_error_code_takes_priority_over_running(self):
+        # VFD still says running but fault flag set — fault wins
+        state, reason = detect_state(_io(vfd_status=1, error_code=4))
+        assert state == MachineStateEnum.DOWN
+        assert reason == "JAM"
+
+    def test_empty_io_defaults_to_idle(self):
+        state, reason = detect_state({})
+        assert state == MachineStateEnum.IDLE
+        assert reason is None
+
+    @pytest.mark.parametrize("error_code,expected_reason", [
+        (1, "OVERLOAD"),
+        (2, "OVERHEAT"),
+        (3, "SENSOR_FAIL"),
+        (4, "JAM"),
+        (7, "E_STOP"),
+        (99, "UNKNOWN"),  # unmapped → UNKNOWN
+    ])
+    def test_error_code_reason_mapping(self, error_code, expected_reason):
+        state, reason = detect_state(_io(vfd_status=0, error_code=error_code))
+        assert state == MachineStateEnum.DOWN
+        assert reason == expected_reason
+
+    def test_error_code_reason_map_completeness(self):
+        """The 5 standard error codes must all be in the map."""
+        assert set(ERROR_CODE_REASON_MAP.values()) == {
+            "OVERLOAD", "OVERHEAT", "SENSOR_FAIL", "JAM", "E_STOP"
+        }
+
+
+# ── state_machine.is_transition ───────────────────────────────────────────────
+
+class TestIsTransition:
+    def test_none_current_always_transition(self):
+        assert is_transition(None, MachineStateEnum.RUNNING) is True
+
+    def test_same_state_no_transition(self):
+        assert is_transition(MachineStateEnum.RUNNING, MachineStateEnum.RUNNING) is False
+
+    def test_different_state_is_transition(self):
+        assert is_transition(MachineStateEnum.RUNNING, MachineStateEnum.DOWN) is True
+        assert is_transition(MachineStateEnum.IDLE, MachineStateEnum.OFFLINE) is True
+
+
+# ── plc_client.fetch_io ───────────────────────────────────────────────────────
+
+class TestPlcClient:
+    @pytest.mark.asyncio
+    async def test_fetch_io_returns_parsed_json(self):
+        from backend.services.plc_client import fetch_io
+
+        mock_response = MagicMock()
+        mock_response.status_code = 200
+        mock_response.json.return_value = _io(vfd_status=1)
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client.__aexit__ = AsyncMock(return_value=False)
+            mock_client.get = AsyncMock(return_value=mock_response)
+            mock_client_cls.return_value = mock_client
+
+            result = await fetch_io("http://plc-modbus:8001")
+
+        assert result["registers"]["VFDStatus"] == 1
+
+    @pytest.mark.asyncio
+    async def test_fetch_io_raises_offline_on_connect_error(self):
+        import httpx
+        from backend.services.plc_client import PLCOfflineError, fetch_io
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client.__aexit__ = AsyncMock(return_value=False)
+            mock_client.get = AsyncMock(
+                side_effect=httpx.ConnectError("refused")
+            )
+            mock_client_cls.return_value = mock_client
+
+            with pytest.raises(PLCOfflineError):
+                await fetch_io("http://plc-modbus:8001")
+
+    @pytest.mark.asyncio
+    async def test_fetch_io_raises_offline_on_http_error(self):
+        from backend.services.plc_client import PLCOfflineError, fetch_io
+
+        mock_response = MagicMock()
+        mock_response.status_code = 503
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client.__aexit__ = AsyncMock(return_value=False)
+            mock_client.get = AsyncMock(return_value=mock_response)
+            mock_client_cls.return_value = mock_client
+
+            with pytest.raises(PLCOfflineError):
+                await fetch_io("http://plc-modbus:8001")
+
+
+# ── state_poller (transition write logic) ────────────────────────────────────
+
+class TestStatePollTransition:
+    @pytest.mark.asyncio
+    async def test_poll_line_detects_running_and_writes_transition(self):
+        """When VFDStatus=1, poller should write RUNNING transition."""
+        from backend.services import state_poller
+
+        # Reset cache to simulate fresh start
+        state_poller._state_cache.clear()
+
+        mock_line = MagicMock()
+        mock_line.id = "test-line-uuid"
+        mock_line.name = "Conveyor-1"
+
+        io_snapshot = _io(vfd_status=1)
+
+        with patch(
+            "backend.services.state_poller.fetch_io",
+            new=AsyncMock(return_value=io_snapshot),
+        ), patch(
+            "backend.services.state_poller._apply_transition"
+        ) as mock_apply:
+            await state_poller._poll_line(mock_line)
+
+        mock_apply.assert_called_once_with(
+            "test-line-uuid", MachineStateEnum.RUNNING, None
+        )
+        assert state_poller._state_cache["test-line-uuid"] == MachineStateEnum.RUNNING
+
+    @pytest.mark.asyncio
+    async def test_poll_line_no_write_when_state_unchanged(self):
+        """No DB write if state hasn't changed."""
+        from backend.services import state_poller
+
+        state_poller._state_cache["test-line-uuid2"] = MachineStateEnum.RUNNING
+
+        mock_line = MagicMock()
+        mock_line.id = "test-line-uuid2"
+        mock_line.name = "Conveyor-1"
+
+        io_snapshot = _io(vfd_status=1)  # still RUNNING
+
+        with patch(
+            "backend.services.state_poller.fetch_io",
+            new=AsyncMock(return_value=io_snapshot),
+        ), patch(
+            "backend.services.state_poller._apply_transition"
+        ) as mock_apply:
+            await state_poller._poll_line(mock_line)
+
+        mock_apply.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_poll_line_writes_offline_on_plc_error(self):
+        """PLCOfflineError → OFFLINE state written."""
+        from backend.services import state_poller
+
+        state_poller._state_cache.clear()
+
+        mock_line = MagicMock()
+        mock_line.id = "test-line-uuid3"
+        mock_line.name = "Conveyor-1"
+
+        with patch(
+            "backend.services.state_poller.fetch_io",
+            new=AsyncMock(side_effect=PLCOfflineError("timeout")),
+        ), patch(
+            "backend.services.state_poller._apply_transition"
+        ) as mock_apply:
+            await state_poller._poll_line(mock_line)
+
+        mock_apply.assert_called_once_with(
+            "test-line-uuid3", MachineStateEnum.OFFLINE, "COMMS_FAIL"
+        )