From 4b9defb65e1b191326975d3cc475f8e88db39da9 Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:06:33 +0100 Subject: [PATCH 1/9] feat: implement Pydantic models and in-memory store for notification requests --- app/main.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index fe37819..a7bf9ab 100644 --- a/app/main.py +++ b/app/main.py @@ -1,3 +1,34 @@ -from fastapi import FastAPI +from fastapi import FastAPI +from pydantic import BaseModel, Field +from typing import Literal +import uuid app = FastAPI(title="Notification Service (Technical Test)") + + +# --- Pydantic models --- + +class RequestCreate(BaseModel): + user_input: str = Field( + ..., example="Manda un mail a feda@test.com diciendo hola" + ) + + +class ResponseCreate(BaseModel): + id: str = Field(..., example="550e8400-e29b-41d4-a716-446655440000") # noqa: E501 + + +RequestStatus = Literal["queued", "processing", "sent", "failed"] + + +class ResponseStatus(BaseModel): + id: str = Field(..., example="550e8400-e29b-41d4-a716-446655440000") + status: RequestStatus = Field(..., example="queued") + + +# --- In-memory store --- +_store: dict[str, dict] = {} # id -> {user_input, status, created_at} + + +def _generate_id() -> str: + return str(uuid.uuid4()) From b7511aaf236a252ab5798e69dfd0fe2c3b0d23aa Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:07:36 +0100 Subject: [PATCH 2/9] feat: add endpoints for creating requests and checking request status --- app/main.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index a7bf9ab..8e6a3e7 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from typing import Literal import uuid @@ -32,3 +32,23 @@ class ResponseStatus(BaseModel): def _generate_id() -> str: return str(uuid.uuid4()) + + +# --- Endpoints --- +@app.post("/v1/requests", status_code=201, response_model=ResponseCreate) +def create_request(body: RequestCreate) -> ResponseCreate: + """Ingesta de intenciones: crea una nueva solicitud en cola.""" + req_id = _generate_id() + _store[req_id] = { + "user_input": body.user_input, + "status": "queued", + } + return ResponseCreate(id=req_id) + + +@app.get("/v1/requests/{req_id}", response_model=ResponseStatus) +def get_request_status(req_id: str) -> ResponseStatus: + """Consulta de estado de una solicitud.""" + if req_id not in _store: + raise HTTPException(status_code=404, detail="Request not found") + return ResponseStatus(id=req_id, status=_store[req_id]["status"]) From d1ad7d256eeec2bc6452a724c0dd4d9f15d0a210 Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:08:56 +0100 Subject: [PATCH 3/9] feat: implement HTTP client for AI extract and notification provider --- app/client.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 app/client.py diff --git a/app/client.py b/app/client.py new file mode 100644 index 0000000..d8ef0d0 --- /dev/null +++ b/app/client.py @@ -0,0 +1,48 @@ +"""HTTP client for the AI extract and notification provider.""" + +import httpx + +PROVIDER_BASE = "http://localhost:3001" +API_KEY = "test-dev-2026" + +_HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"} + + +async def ai_extract(system_prompt: str, user_input: str) -> str: + """ + Call the AI extract endpoint and return the assistant content. + Raises httpx.HTTPStatusError on 4xx/5xx. + """ + payload = { + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_input}, + ] + } + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{PROVIDER_BASE}/v1/ai/extract", + json=payload, + headers=_HEADERS, + ) + resp.raise_for_status() + data = resp.json() + choices = data.get("choices", []) + if not choices: + return "" + return choices[0].get("message", {}).get("content", "") + + +async def notify(to: str, message: str, type_: str) -> None: + """ + Send a notification via the provider. + Raises httpx.HTTPStatusError on 4xx/5xx. + """ + payload = {"to": to, "message": message, "type": type_} # type_ -> "type" in JSON + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{PROVIDER_BASE}/v1/notify", + json=payload, + headers=_HEADERS, + ) + resp.raise_for_status() From e95360f283696512b0b4ccd3ce3851d190129836 Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:17:53 +0100 Subject: [PATCH 4/9] feat: add guardrails for parsing and normalizing AI extraction output --- app/guardrails.py | 139 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 app/guardrails.py diff --git a/app/guardrails.py b/app/guardrails.py new file mode 100644 index 0000000..040431f --- /dev/null +++ b/app/guardrails.py @@ -0,0 +1,139 @@ +"""Guardrails: parse and normalize AI extraction output to {to, message, type}.""" + +import json +import re +# Alternate keys the mock may return -> canonical keys +_TO_KEYS = ("to", "recipient", "destination") # "To" -> to via lowercase +_MESSAGE_KEYS = ("message", "body", "text") +_TYPE_KEYS = ("type", "channel", "method") + + +def _extract_json_candidate(raw: str) -> str | None: + """Extract a JSON object string from raw text (Markdown blocks, embedded, etc).""" + if not raw or not raw.strip(): + return None + + # 1. Try ```json ... ``` or ``` ... ``` + code_block = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw) + if code_block: + return code_block.group(1).strip() + + # 2. Find {...} in text + start = raw.find("{") + if start == -1: + return None + + depth = 0 + in_string = False + escape = False + quote_char = None + i = start + + while i < len(raw): + c = raw[i] + if escape: + escape = False + i += 1 + continue + if c == "\\" and in_string: + escape = True + i += 1 + continue + if not in_string: + if c == "{": + depth += 1 + elif c == "}": + depth -= 1 + if depth == 0: + return raw[start : i + 1] + elif c in ('"', "'"): + in_string = True + quote_char = c + else: + if c == quote_char: + in_string = False + i += 1 + + return None # Unbalanced braces (truncated) + + +def _try_parse_json(s: str) -> dict | None: + """Try to parse JSON, including single-quoted variant.""" + # 1. Standard JSON + try: + return json.loads(s) + except json.JSONDecodeError: + pass + + # 2. Single-quoted keys/values (invalid JSON but common in Python) + try: + normalized = re.sub( + r"'([^']*)'(\s*[:,}\]])", + lambda m: json.dumps(m.group(1)) + m.group(2), + s, + ) + return json.loads(normalized) + except (json.JSONDecodeError, ValueError): + pass + + # 3. Unquoted keys: {to: "x", message: "y", type: "email"} + try: + fixed = re.sub(r"(\w+)\s*:", r'"\1":', s) + return json.loads(fixed) + except json.JSONDecodeError: + pass + + return None + + +def _normalize_keys(data: dict) -> dict | None: + """Map alternate keys to canonical {to, message, type}. Validate type.""" + result = {} + by_lower = {k.lower(): (k, v) for k, v in data.items() if isinstance(k, str)} + + # to + for k in _TO_KEYS: + if k in by_lower: + _, val = by_lower[k] + if val and isinstance(val, str): + result["to"] = val.strip() + break + if "to" not in result: + return None + + # message + for k in _MESSAGE_KEYS: + if k in by_lower: + _, val = by_lower[k] + result["message"] = str(val).strip() if val else "" + break + result.setdefault("message", "") + + # type: must be email or sms + for k in _TYPE_KEYS: + if k in by_lower: + _, val = by_lower[k] + if val and isinstance(val, str): + t = val.strip().lower() + if t in ("email", "sms"): + result["type"] = t + return result + break + return None + + +def parse_extraction(raw: str) -> dict | None: + """ + Parse AI extraction output into {to, message, type} or None if invalid. + Handles: Markdown blocks, alternate keys, single quotes, unquoted keys, + extra fields, missing message. Fails on: truncation, refusals, missing to/type. + """ + candidate = _extract_json_candidate(raw) + if not candidate: + return None + + parsed = _try_parse_json(candidate) + if not isinstance(parsed, dict): + return None + + return _normalize_keys(parsed) From a1c316126c0795fa2ae2b7fa81a56f4d7dd48448 Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:20:42 +0100 Subject: [PATCH 5/9] feat: add background processing for notification requests with AI extraction and notification handling --- app/main.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index 8e6a3e7..3f52192 100644 --- a/app/main.py +++ b/app/main.py @@ -1,10 +1,22 @@ -from fastapi import FastAPI, HTTPException +from fastapi import BackgroundTasks, FastAPI, HTTPException +from fastapi.responses import Response from pydantic import BaseModel, Field from typing import Literal import uuid +import httpx + +from client import ai_extract, notify +from guardrails import parse_extraction + app = FastAPI(title="Notification Service (Technical Test)") +SYSTEM_PROMPT = ( + "Extract notification intent from the user message. Return ONLY a valid JSON " + 'object with exactly these keys: "to" (email or phone), "message" (content), ' + '"type" (either "email" or "sms"). No explanation, no markdown.' +) + # --- Pydantic models --- @@ -34,6 +46,39 @@ def _generate_id() -> str: return str(uuid.uuid4()) +async def _run_pipeline(req_id: str) -> None: + """Extract -> guardrails -> notify. Updates _store status.""" + entry = _store.get(req_id) + if not entry: + return + + entry["status"] = "processing" + user_input = entry.get("user_input", "") + + try: + content = await ai_extract(SYSTEM_PROMPT, user_input) + except httpx.HTTPStatusError: + entry["status"] = "failed" + return + + parsed = parse_extraction(content) + if not parsed: + entry["status"] = "failed" + return + + try: + await notify( + to=parsed["to"], + message=parsed["message"], + type_=parsed["type"], + ) + except httpx.HTTPStatusError: + entry["status"] = "failed" + return + + entry["status"] = "sent" + + # --- Endpoints --- @app.post("/v1/requests", status_code=201, response_model=ResponseCreate) def create_request(body: RequestCreate) -> ResponseCreate: @@ -46,6 +91,21 @@ def create_request(body: RequestCreate) -> ResponseCreate: return ResponseCreate(id=req_id) +@app.post("/v1/requests/{req_id}/process") +async def process_request(req_id: str, background_tasks: BackgroundTasks) -> Response: + """Procesamiento de envío: extrae, valida y notifica en background.""" + if req_id not in _store: + raise HTTPException(status_code=404, detail="Request not found") + + status = _store[req_id]["status"] + if status in ("processing", "sent", "failed"): + return Response(status_code=200) + + _store[req_id]["status"] = "processing" + background_tasks.add_task(_run_pipeline, req_id) + return Response(status_code=202) + + @app.get("/v1/requests/{req_id}", response_model=ResponseStatus) def get_request_status(req_id: str) -> ResponseStatus: """Consulta de estado de una solicitud.""" From 9761fb41f523a32ef154698ec665826564f74c36 Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:34:45 +0100 Subject: [PATCH 6/9] feat: enhance HTTP client with connection pooling and retry logic for rate limits and server errors --- app/client.py | 72 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/app/client.py b/app/client.py index d8ef0d0..8bde98a 100644 --- a/app/client.py +++ b/app/client.py @@ -1,12 +1,45 @@ """HTTP client for the AI extract and notification provider.""" import httpx +from tenacity import ( + retry, + retry_if_exception, + stop_after_attempt, + wait_exponential, +) PROVIDER_BASE = "http://localhost:3001" API_KEY = "test-dev-2026" _HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"} +# Shared client with connection pooling to avoid EOF/reset under load +_client: httpx.AsyncClient | None = None + + +def _get_client() -> httpx.AsyncClient: + global _client + if _client is None: + _client = httpx.AsyncClient( + timeout=30.0, + limits=httpx.Limits(max_connections=200, max_keepalive_connections=50), + ) + return _client + + +async def close_client() -> None: + global _client + if _client: + await _client.aclose() + _client = None + + +def _retry_on_rate_limit_or_server_error(exc: BaseException) -> bool: + """Retry on 429 (rate limit) or 5xx (server error).""" + if isinstance(exc, httpx.HTTPStatusError): + return exc.response.status_code in (429, 500, 502, 503, 504) + return False + async def ai_extract(system_prompt: str, user_input: str) -> str: """ @@ -19,13 +52,13 @@ async def ai_extract(system_prompt: str, user_input: str) -> str: {"role": "user", "content": user_input}, ] } - async with httpx.AsyncClient(timeout=30.0) as client: - resp = await client.post( - f"{PROVIDER_BASE}/v1/ai/extract", - json=payload, - headers=_HEADERS, - ) - resp.raise_for_status() + client = _get_client() + resp = await client.post( + f"{PROVIDER_BASE}/v1/ai/extract", + json=payload, + headers=_HEADERS, + ) + resp.raise_for_status() data = resp.json() choices = data.get("choices", []) if not choices: @@ -33,16 +66,21 @@ async def ai_extract(system_prompt: str, user_input: str) -> str: return choices[0].get("message", {}).get("content", "") +@retry( + retry=retry_if_exception(_retry_on_rate_limit_or_server_error), + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), +) async def notify(to: str, message: str, type_: str) -> None: """ - Send a notification via the provider. - Raises httpx.HTTPStatusError on 4xx/5xx. + Send a notification via the provider. Retries on 429/5xx (up to 3 attempts). + Raises httpx.HTTPStatusError on 4xx/5xx after retries exhausted. """ - payload = {"to": to, "message": message, "type": type_} # type_ -> "type" in JSON - async with httpx.AsyncClient(timeout=30.0) as client: - resp = await client.post( - f"{PROVIDER_BASE}/v1/notify", - json=payload, - headers=_HEADERS, - ) - resp.raise_for_status() + payload = {"to": to, "message": message, "type": type_} + client = _get_client() + resp = await client.post( + f"{PROVIDER_BASE}/v1/notify", + json=payload, + headers=_HEADERS, + ) + resp.raise_for_status() From 49c4eb7dea8b649cf6b9ffaaa448c1f3231dcbea Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:39:10 +0100 Subject: [PATCH 7/9] feat: implement async context manager for HTTP client lifecycle and throttle concurrent processing with semaphore --- app/main.py | 79 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/app/main.py b/app/main.py index 3f52192..c4cd45e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,3 +1,6 @@ +import asyncio +from contextlib import asynccontextmanager + from fastapi import BackgroundTasks, FastAPI, HTTPException from fastapi.responses import Response from pydantic import BaseModel, Field @@ -6,10 +9,21 @@ import httpx -from client import ai_extract, notify +from client import ai_extract, close_client, notify from guardrails import parse_extraction -app = FastAPI(title="Notification Service (Technical Test)") + +@asynccontextmanager +async def lifespan(app: FastAPI): + yield + await close_client() + + +# Throttle concurrent pipelines to avoid overwhelming the server +_pipeline_semaphore = asyncio.Semaphore(30) + + +app = FastAPI(title="Notification Service (Technical Test)", lifespan=lifespan) SYSTEM_PROMPT = ( "Extract notification intent from the user message. Return ONLY a valid JSON " @@ -47,36 +61,37 @@ def _generate_id() -> str: async def _run_pipeline(req_id: str) -> None: - """Extract -> guardrails -> notify. Updates _store status.""" - entry = _store.get(req_id) - if not entry: - return - - entry["status"] = "processing" - user_input = entry.get("user_input", "") - - try: - content = await ai_extract(SYSTEM_PROMPT, user_input) - except httpx.HTTPStatusError: - entry["status"] = "failed" - return - - parsed = parse_extraction(content) - if not parsed: - entry["status"] = "failed" - return - - try: - await notify( - to=parsed["to"], - message=parsed["message"], - type_=parsed["type"], - ) - except httpx.HTTPStatusError: - entry["status"] = "failed" - return - - entry["status"] = "sent" + """Extract -> guardrails -> notify. Limited by semaphore.""" + async with _pipeline_semaphore: + entry = _store.get(req_id) + if not entry: + return + + entry["status"] = "processing" + user_input = entry.get("user_input", "") + + try: + content = await ai_extract(SYSTEM_PROMPT, user_input) + except httpx.HTTPStatusError: + entry["status"] = "failed" + return + + parsed = parse_extraction(content) + if not parsed: + entry["status"] = "failed" + return + + try: + await notify( + to=parsed["to"], + message=parsed["message"], + type_=parsed["type"], + ) + except httpx.HTTPStatusError: + entry["status"] = "failed" + return + + entry["status"] = "sent" # --- Endpoints --- From f3729f67170dc6785b4ffb3cdb5e41f6628e9b96 Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:44:39 +0100 Subject: [PATCH 8/9] feat: refactor notification service with new pipeline and schemas, enhancing request processing and state management --- app/guardrails.py | 15 ++++--- app/main.py | 106 +++++++--------------------------------------- app/pipeline.py | 52 +++++++++++++++++++++++ app/schemas.py | 25 +++++++++++ app/store.py | 29 +++++++++++++ 5 files changed, 131 insertions(+), 96 deletions(-) create mode 100644 app/pipeline.py create mode 100644 app/schemas.py create mode 100644 app/store.py diff --git a/app/guardrails.py b/app/guardrails.py index 040431f..ba95d1a 100644 --- a/app/guardrails.py +++ b/app/guardrails.py @@ -1,4 +1,5 @@ -"""Guardrails: parse and normalize AI extraction output to {to, message, type}.""" +"""Guardrails: parse and normalize AI extraction output to +{to, message, type}.""" import json import re @@ -9,7 +10,8 @@ def _extract_json_candidate(raw: str) -> str | None: - """Extract a JSON object string from raw text (Markdown blocks, embedded, etc).""" + """Extract a JSON object string from raw text (Markdown blocks, + embedded, etc).""" if not raw or not raw.strip(): return None @@ -45,7 +47,7 @@ def _extract_json_candidate(raw: str) -> str | None: elif c == "}": depth -= 1 if depth == 0: - return raw[start : i + 1] + return raw[start: i + 1] elif c in ('"', "'"): in_string = True quote_char = c @@ -89,7 +91,9 @@ def _try_parse_json(s: str) -> dict | None: def _normalize_keys(data: dict) -> dict | None: """Map alternate keys to canonical {to, message, type}. Validate type.""" result = {} - by_lower = {k.lower(): (k, v) for k, v in data.items() if isinstance(k, str)} + by_lower = { + k.lower(): (k, v) for k, v in data.items() if isinstance(k, str) + } # to for k in _TO_KEYS: @@ -126,7 +130,8 @@ def parse_extraction(raw: str) -> dict | None: """ Parse AI extraction output into {to, message, type} or None if invalid. Handles: Markdown blocks, alternate keys, single quotes, unquoted keys, - extra fields, missing message. Fails on: truncation, refusals, missing to/type. + extra fields, missing message. Fails on: truncation, refusals, missing + to/type. """ candidate = _extract_json_candidate(raw) if not candidate: diff --git a/app/main.py b/app/main.py index c4cd45e..4c0d955 100644 --- a/app/main.py +++ b/app/main.py @@ -1,16 +1,14 @@ -import asyncio +"""FastAPI application for the notification service.""" + from contextlib import asynccontextmanager from fastapi import BackgroundTasks, FastAPI, HTTPException from fastapi.responses import Response -from pydantic import BaseModel, Field -from typing import Literal -import uuid - -import httpx -from client import ai_extract, close_client, notify -from guardrails import parse_extraction +from client import close_client +from pipeline import run as run_pipeline +from schemas import RequestCreate, ResponseCreate, ResponseStatus +from store import create, get, update_status @asynccontextmanager @@ -19,111 +17,37 @@ async def lifespan(app: FastAPI): await close_client() -# Throttle concurrent pipelines to avoid overwhelming the server -_pipeline_semaphore = asyncio.Semaphore(30) - - app = FastAPI(title="Notification Service (Technical Test)", lifespan=lifespan) -SYSTEM_PROMPT = ( - "Extract notification intent from the user message. Return ONLY a valid JSON " - 'object with exactly these keys: "to" (email or phone), "message" (content), ' - '"type" (either "email" or "sms"). No explanation, no markdown.' -) - - -# --- Pydantic models --- - -class RequestCreate(BaseModel): - user_input: str = Field( - ..., example="Manda un mail a feda@test.com diciendo hola" - ) - - -class ResponseCreate(BaseModel): - id: str = Field(..., example="550e8400-e29b-41d4-a716-446655440000") # noqa: E501 - - -RequestStatus = Literal["queued", "processing", "sent", "failed"] - - -class ResponseStatus(BaseModel): - id: str = Field(..., example="550e8400-e29b-41d4-a716-446655440000") - status: RequestStatus = Field(..., example="queued") - - -# --- In-memory store --- -_store: dict[str, dict] = {} # id -> {user_input, status, created_at} - - -def _generate_id() -> str: - return str(uuid.uuid4()) - - -async def _run_pipeline(req_id: str) -> None: - """Extract -> guardrails -> notify. Limited by semaphore.""" - async with _pipeline_semaphore: - entry = _store.get(req_id) - if not entry: - return - - entry["status"] = "processing" - user_input = entry.get("user_input", "") - - try: - content = await ai_extract(SYSTEM_PROMPT, user_input) - except httpx.HTTPStatusError: - entry["status"] = "failed" - return - - parsed = parse_extraction(content) - if not parsed: - entry["status"] = "failed" - return - - try: - await notify( - to=parsed["to"], - message=parsed["message"], - type_=parsed["type"], - ) - except httpx.HTTPStatusError: - entry["status"] = "failed" - return - - entry["status"] = "sent" - # --- Endpoints --- @app.post("/v1/requests", status_code=201, response_model=ResponseCreate) def create_request(body: RequestCreate) -> ResponseCreate: """Ingesta de intenciones: crea una nueva solicitud en cola.""" - req_id = _generate_id() - _store[req_id] = { - "user_input": body.user_input, - "status": "queued", - } + req_id = create(body.user_input) return ResponseCreate(id=req_id) @app.post("/v1/requests/{req_id}/process") async def process_request(req_id: str, background_tasks: BackgroundTasks) -> Response: """Procesamiento de envío: extrae, valida y notifica en background.""" - if req_id not in _store: + entry = get(req_id) + if not entry: raise HTTPException(status_code=404, detail="Request not found") - status = _store[req_id]["status"] + status = entry["status"] if status in ("processing", "sent", "failed"): return Response(status_code=200) - _store[req_id]["status"] = "processing" - background_tasks.add_task(_run_pipeline, req_id) + update_status(req_id, "processing") + background_tasks.add_task(run_pipeline, req_id) return Response(status_code=202) @app.get("/v1/requests/{req_id}", response_model=ResponseStatus) def get_request_status(req_id: str) -> ResponseStatus: """Consulta de estado de una solicitud.""" - if req_id not in _store: + entry = get(req_id) + if not entry: raise HTTPException(status_code=404, detail="Request not found") - return ResponseStatus(id=req_id, status=_store[req_id]["status"]) + return ResponseStatus(id=req_id, status=entry["status"]) diff --git a/app/pipeline.py b/app/pipeline.py new file mode 100644 index 0000000..d2ee8ea --- /dev/null +++ b/app/pipeline.py @@ -0,0 +1,52 @@ +"""Pipeline: extract -> guardrails -> notify.""" + +import asyncio + +import httpx + +from client import ai_extract, notify +from guardrails import parse_extraction +from store import get, update_status + +SYSTEM_PROMPT = ( + "Extract notification intent from the user message. " + "Return ONLY a valid JSON object with keys: " + '"to" (email or phone), "message" (content), "type" ("email" or "sms"). ' + "No explanation, no markdown." +) + +_pipeline_semaphore = asyncio.Semaphore(30) + + +async def run(req_id: str) -> None: + """Extract -> guardrails -> notify. Limited by semaphore.""" + async with _pipeline_semaphore: + entry = get(req_id) + if not entry: + return + + update_status(req_id, "processing") + user_input = entry.get("user_input", "") + + try: + content = await ai_extract(SYSTEM_PROMPT, user_input) + except httpx.HTTPStatusError: + update_status(req_id, "failed") + return + + parsed = parse_extraction(content) + if not parsed: + update_status(req_id, "failed") + return + + try: + await notify( + to=parsed["to"], + message=parsed["message"], + type_=parsed["type"], + ) + except httpx.HTTPStatusError: + update_status(req_id, "failed") + return + + update_status(req_id, "sent") diff --git a/app/schemas.py b/app/schemas.py new file mode 100644 index 0000000..7f4a4ef --- /dev/null +++ b/app/schemas.py @@ -0,0 +1,25 @@ +"""Pydantic schemas for the notification service API.""" + +from typing import Literal + +from pydantic import BaseModel, Field + + +class RequestCreate(BaseModel): + user_input: str = Field( + ..., example="Manda un mail a feda@test.com diciendo hola" + ) + + +class ResponseCreate(BaseModel): + id: str = Field( # noqa: E501 + ..., example="550e8400-e29b-41d4-a716-446655440000" + ) + + +RequestStatus = Literal["queued", "processing", "sent", "failed"] + + +class ResponseStatus(BaseModel): + id: str = Field(..., example="550e8400-e29b-41d4-a716-446655440000") + status: RequestStatus = Field(..., example="queued") diff --git a/app/store.py b/app/store.py new file mode 100644 index 0000000..9ba0b42 --- /dev/null +++ b/app/store.py @@ -0,0 +1,29 @@ +"""In-memory store for notification request state.""" + +import uuid +from typing import Any + +_store: dict[str, dict[str, Any]] = {} + + +def create(user_input: str) -> str: + """Create a new request and return its id.""" + req_id = str(uuid.uuid4()) + _store[req_id] = {"user_input": user_input, "status": "queued"} + return req_id + + +def get(req_id: str) -> dict[str, Any] | None: + """Get a request by id.""" + return _store.get(req_id) + + +def update_status(req_id: str, status: str) -> None: + """Update the status of a request.""" + if req_id in _store: + _store[req_id]["status"] = status + + +def exists(req_id: str) -> bool: + """Check if a request exists.""" + return req_id in _store From 0d63b9ee388c61909e19c637e1316f92ba6dfe5c Mon Sep 17 00:00:00 2001 From: pablorilo Date: Sun, 15 Feb 2026 17:50:42 +0100 Subject: [PATCH 9/9] refactor: update SYSTEM_PROMPT for notification intent extraction with detailed output and validation rules --- app/pipeline.py | 67 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 6 deletions(-) diff --git a/app/pipeline.py b/app/pipeline.py index d2ee8ea..3964f76 100644 --- a/app/pipeline.py +++ b/app/pipeline.py @@ -8,12 +8,67 @@ from guardrails import parse_extraction from store import get, update_status -SYSTEM_PROMPT = ( - "Extract notification intent from the user message. " - "Return ONLY a valid JSON object with keys: " - '"to" (email or phone), "message" (content), "type" ("email" or "sms"). ' - "No explanation, no markdown." -) +SYSTEM_PROMPT = """You are a notification intent extractor. Your task is to +parse a natural language request and output structured JSON. + +## Output Schema (STRICT) +Return exactly one raw JSON object with these three keys — no additional keys, +no nesting, no explanations: +- "to": string — destination (email address or phone number) +- "message": string — the text/body to send +- "type": string — MUST be exactly "email" or "sms" (lowercase) + +The response must contain ONLY the JSON object. +Do not include markdown code blocks. +Do not include commentary. +Use double quotes for keys and string values. + +## Extraction Rules + +1. Destination ("to"): + - Email: extract any valid email matching user@domain.tld + - Phone: extract digits in formats like: + - 6XXXXXXXXX + - 6XX-XXX-XXX + - Keep the phone number exactly as written. + - If multiple valid destinations are present, use the first one. + - Never invent a destination. + +2. Type ("type"): + - "email" if: + - The text contains: email, correo, mail, e-mail + - OR the destination is an email address + - "sms" if: + - The text contains: sms, teléfono, tel, móvil, celular, phone + - OR the destination is a phone number + - If both phone and email appear, choose the type that matches the + extracted destination. + - The value must be exactly "email" or "sms". + +3. Message ("message"): + - Extract text after indicators such as: + - "diciendo" + - ":" + - "con el mensaje" + - "que" (only if clearly introducing the message after the destination) + - Preserve the user's wording. + - Do not paraphrase. + - Trim leading/trailing whitespace. + - If no explicit message is found, use a short placeholder like + "Recordatorio". + +## Validation Rules +- Do not hallucinate or fabricate missing data. +- If a valid destination cannot be confidently extracted, return: + {"to":"","message":"","type":""} +- Ensure the output is valid JSON. +- Output only the JSON object and nothing else. + +## Example +Input: "Manda un correo a feda@test.com diciendo hola" +Output: {"to":"feda@test.com","message":"hola","type":"email"} +""" + _pipeline_semaphore = asyncio.Semaphore(30)