diff --git a/app/client.py b/app/client.py new file mode 100644 index 0000000..8bde98a --- /dev/null +++ b/app/client.py @@ -0,0 +1,86 @@ +"""HTTP client for the AI extract and notification provider.""" + +import httpx +from tenacity import ( + retry, + retry_if_exception, + stop_after_attempt, + wait_exponential, +) + +PROVIDER_BASE = "http://localhost:3001" +API_KEY = "test-dev-2026" + +_HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"} + +# Shared client with connection pooling to avoid EOF/reset under load +_client: httpx.AsyncClient | None = None + + +def _get_client() -> httpx.AsyncClient: + global _client + if _client is None: + _client = httpx.AsyncClient( + timeout=30.0, + limits=httpx.Limits(max_connections=200, max_keepalive_connections=50), + ) + return _client + + +async def close_client() -> None: + global _client + if _client: + await _client.aclose() + _client = None + + +def _retry_on_rate_limit_or_server_error(exc: BaseException) -> bool: + """Retry on 429 (rate limit) or 5xx (server error).""" + if isinstance(exc, httpx.HTTPStatusError): + return exc.response.status_code in (429, 500, 502, 503, 504) + return False + + +async def ai_extract(system_prompt: str, user_input: str) -> str: + """ + Call the AI extract endpoint and return the assistant content. + Raises httpx.HTTPStatusError on 4xx/5xx. + """ + payload = { + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_input}, + ] + } + client = _get_client() + resp = await client.post( + f"{PROVIDER_BASE}/v1/ai/extract", + json=payload, + headers=_HEADERS, + ) + resp.raise_for_status() + data = resp.json() + choices = data.get("choices", []) + if not choices: + return "" + return choices[0].get("message", {}).get("content", "") + + +@retry( + retry=retry_if_exception(_retry_on_rate_limit_or_server_error), + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), +) +async def notify(to: str, message: str, type_: str) -> None: + """ + Send a notification via the provider. Retries on 429/5xx (up to 3 attempts). + Raises httpx.HTTPStatusError on 4xx/5xx after retries exhausted. + """ + payload = {"to": to, "message": message, "type": type_} + client = _get_client() + resp = await client.post( + f"{PROVIDER_BASE}/v1/notify", + json=payload, + headers=_HEADERS, + ) + resp.raise_for_status() diff --git a/app/guardrails.py b/app/guardrails.py new file mode 100644 index 0000000..ba95d1a --- /dev/null +++ b/app/guardrails.py @@ -0,0 +1,144 @@ +"""Guardrails: parse and normalize AI extraction output to +{to, message, type}.""" + +import json +import re +# Alternate keys the mock may return -> canonical keys +_TO_KEYS = ("to", "recipient", "destination") # "To" -> to via lowercase +_MESSAGE_KEYS = ("message", "body", "text") +_TYPE_KEYS = ("type", "channel", "method") + + +def _extract_json_candidate(raw: str) -> str | None: + """Extract a JSON object string from raw text (Markdown blocks, + embedded, etc).""" + if not raw or not raw.strip(): + return None + + # 1. Try ```json ... ``` or ``` ... ``` + code_block = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw) + if code_block: + return code_block.group(1).strip() + + # 2. Find {...} in text + start = raw.find("{") + if start == -1: + return None + + depth = 0 + in_string = False + escape = False + quote_char = None + i = start + + while i < len(raw): + c = raw[i] + if escape: + escape = False + i += 1 + continue + if c == "\\" and in_string: + escape = True + i += 1 + continue + if not in_string: + if c == "{": + depth += 1 + elif c == "}": + depth -= 1 + if depth == 0: + return raw[start: i + 1] + elif c in ('"', "'"): + in_string = True + quote_char = c + else: + if c == quote_char: + in_string = False + i += 1 + + return None # Unbalanced braces (truncated) + + +def _try_parse_json(s: str) -> dict | None: + """Try to parse JSON, including single-quoted variant.""" + # 1. Standard JSON + try: + return json.loads(s) + except json.JSONDecodeError: + pass + + # 2. Single-quoted keys/values (invalid JSON but common in Python) + try: + normalized = re.sub( + r"'([^']*)'(\s*[:,}\]])", + lambda m: json.dumps(m.group(1)) + m.group(2), + s, + ) + return json.loads(normalized) + except (json.JSONDecodeError, ValueError): + pass + + # 3. Unquoted keys: {to: "x", message: "y", type: "email"} + try: + fixed = re.sub(r"(\w+)\s*:", r'"\1":', s) + return json.loads(fixed) + except json.JSONDecodeError: + pass + + return None + + +def _normalize_keys(data: dict) -> dict | None: + """Map alternate keys to canonical {to, message, type}. Validate type.""" + result = {} + by_lower = { + k.lower(): (k, v) for k, v in data.items() if isinstance(k, str) + } + + # to + for k in _TO_KEYS: + if k in by_lower: + _, val = by_lower[k] + if val and isinstance(val, str): + result["to"] = val.strip() + break + if "to" not in result: + return None + + # message + for k in _MESSAGE_KEYS: + if k in by_lower: + _, val = by_lower[k] + result["message"] = str(val).strip() if val else "" + break + result.setdefault("message", "") + + # type: must be email or sms + for k in _TYPE_KEYS: + if k in by_lower: + _, val = by_lower[k] + if val and isinstance(val, str): + t = val.strip().lower() + if t in ("email", "sms"): + result["type"] = t + return result + break + return None + + +def parse_extraction(raw: str) -> dict | None: + """ + Parse AI extraction output into {to, message, type} or None if invalid. + Handles: Markdown blocks, alternate keys, single quotes, unquoted keys, + extra fields, missing message. Fails on: truncation, refusals, missing + to/type. + """ + candidate = _extract_json_candidate(raw) + if not candidate: + return None + + parsed = _try_parse_json(candidate) + if not isinstance(parsed, dict): + return None + + return _normalize_keys(parsed) diff --git a/app/main.py b/app/main.py index fe37819..4c0d955 100644 --- a/app/main.py +++ b/app/main.py @@ -1,3 +1,53 @@ -from fastapi import FastAPI +"""FastAPI application for the notification service.""" -app = FastAPI(title="Notification Service (Technical Test)") +from contextlib import asynccontextmanager + +from fastapi import BackgroundTasks, FastAPI, HTTPException +from fastapi.responses import Response + +from client import close_client +from pipeline import run as run_pipeline +from schemas import RequestCreate, ResponseCreate, ResponseStatus +from store import create, get, update_status + + +@asynccontextmanager +async def lifespan(app: FastAPI): + yield + await close_client() + + +app = FastAPI(title="Notification Service (Technical Test)", lifespan=lifespan) + + +# --- Endpoints --- +@app.post("/v1/requests", status_code=201, response_model=ResponseCreate) +def create_request(body: RequestCreate) -> ResponseCreate: + """Ingesta de intenciones: crea una nueva solicitud en cola.""" + req_id = create(body.user_input) + return ResponseCreate(id=req_id) + + +@app.post("/v1/requests/{req_id}/process") +async def process_request(req_id: str, background_tasks: BackgroundTasks) -> Response: + """Procesamiento de envío: extrae, valida y notifica en background.""" + entry = get(req_id) + if not entry: + raise HTTPException(status_code=404, detail="Request not found") + + status = entry["status"] + if status in ("processing", "sent", "failed"): + return Response(status_code=200) + + update_status(req_id, "processing") + background_tasks.add_task(run_pipeline, req_id) + return Response(status_code=202) + + +@app.get("/v1/requests/{req_id}", response_model=ResponseStatus) +def get_request_status(req_id: str) -> ResponseStatus: + """Consulta de estado de una solicitud.""" + entry = get(req_id) + if not entry: + raise HTTPException(status_code=404, detail="Request not found") + return ResponseStatus(id=req_id, status=entry["status"]) diff --git a/app/pipeline.py b/app/pipeline.py new file mode 100644 index 0000000..3964f76 --- /dev/null +++ b/app/pipeline.py @@ -0,0 +1,107 @@ +"""Pipeline: extract -> guardrails -> notify.""" + +import asyncio + +import httpx + +from client import ai_extract, notify +from guardrails import parse_extraction +from store import get, update_status + +SYSTEM_PROMPT = """You are a notification intent extractor. Your task is to +parse a natural language request and output structured JSON. + +## Output Schema (STRICT) +Return exactly one raw JSON object with these three keys — no additional keys, +no nesting, no explanations: +- "to": string — destination (email address or phone number) +- "message": string — the text/body to send +- "type": string — MUST be exactly "email" or "sms" (lowercase) + +The response must contain ONLY the JSON object. +Do not include markdown code blocks. +Do not include commentary. +Use double quotes for keys and string values. + +## Extraction Rules + +1. Destination ("to"): + - Email: extract any valid email matching user@domain.tld + - Phone: extract digits in formats like: + - 6XXXXXXXXX + - 6XX-XXX-XXX + - Keep the phone number exactly as written. + - If multiple valid destinations are present, use the first one. + - Never invent a destination. + +2. Type ("type"): + - "email" if: + - The text contains: email, correo, mail, e-mail + - OR the destination is an email address + - "sms" if: + - The text contains: sms, teléfono, tel, móvil, celular, phone + - OR the destination is a phone number + - If both phone and email appear, choose the type that matches the + extracted destination. + - The value must be exactly "email" or "sms". + +3. Message ("message"): + - Extract text after indicators such as: + - "diciendo" + - ":" + - "con el mensaje" + - "que" (only if clearly introducing the message after the destination) + - Preserve the user's wording. + - Do not paraphrase. + - Trim leading/trailing whitespace. + - If no explicit message is found, use a short placeholder like + "Recordatorio". + +## Validation Rules +- Do not hallucinate or fabricate missing data. +- If a valid destination cannot be confidently extracted, return: + {"to":"","message":"","type":""} +- Ensure the output is valid JSON. +- Output only the JSON object and nothing else. + +## Example +Input: "Manda un correo a feda@test.com diciendo hola" +Output: {"to":"feda@test.com","message":"hola","type":"email"} +""" + + +_pipeline_semaphore = asyncio.Semaphore(30) + + +async def run(req_id: str) -> None: + """Extract -> guardrails -> notify. Limited by semaphore.""" + async with _pipeline_semaphore: + entry = get(req_id) + if not entry: + return + + update_status(req_id, "processing") + user_input = entry.get("user_input", "") + + try: + content = await ai_extract(SYSTEM_PROMPT, user_input) + except httpx.HTTPStatusError: + update_status(req_id, "failed") + return + + parsed = parse_extraction(content) + if not parsed: + update_status(req_id, "failed") + return + + try: + await notify( + to=parsed["to"], + message=parsed["message"], + type_=parsed["type"], + ) + except httpx.HTTPStatusError: + update_status(req_id, "failed") + return + + update_status(req_id, "sent") diff --git a/app/schemas.py b/app/schemas.py new file mode 100644 index 0000000..7f4a4ef --- /dev/null +++ b/app/schemas.py @@ -0,0 +1,25 @@ +"""Pydantic schemas for the notification service API.""" + +from typing import Literal + +from pydantic import BaseModel, Field + + +class RequestCreate(BaseModel): + user_input: str = Field( + ..., example="Manda un mail a feda@test.com diciendo hola" + ) + + +class ResponseCreate(BaseModel): + id: str = Field( # noqa: E501 + ..., example="550e8400-e29b-41d4-a716-446655440000" + ) + + +RequestStatus = Literal["queued", "processing", "sent", "failed"] + + +class ResponseStatus(BaseModel): + id: str = Field(..., example="550e8400-e29b-41d4-a716-446655440000") + status: RequestStatus = Field(..., example="queued") diff --git a/app/store.py b/app/store.py new file mode 100644 index 0000000..9ba0b42 --- /dev/null +++ b/app/store.py @@ -0,0 +1,29 @@ +"""In-memory store for notification request state.""" + +import uuid +from typing import Any + +_store: dict[str, dict[str, Any]] = {} + + +def create(user_input: str) -> str: + """Create a new request and return its id.""" + req_id = str(uuid.uuid4()) + _store[req_id] = {"user_input": user_input, "status": "queued"} + return req_id + + +def get(req_id: str) -> dict[str, Any] | None: + """Get a request by id.""" + return _store.get(req_id) + + +def update_status(req_id: str, status: str) -> None: + """Update the status of a request.""" + if req_id in _store: + _store[req_id]["status"] = status + + +def exists(req_id: str) -> bool: + """Check if a request exists.""" + return req_id in _store