Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions app/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""HTTP client for the AI extract and notification provider."""

import httpx
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)

PROVIDER_BASE = "http://localhost:3001"
API_KEY = "test-dev-2026"

_HEADERS = {"X-API-Key": API_KEY, "Content-Type": "application/json"}

# Shared client with connection pooling to avoid EOF/reset under load
_client: httpx.AsyncClient | None = None


def _get_client() -> httpx.AsyncClient:
global _client
if _client is None:
_client = httpx.AsyncClient(
timeout=30.0,
limits=httpx.Limits(max_connections=200, max_keepalive_connections=50),
)
return _client


async def close_client() -> None:
global _client
if _client:
await _client.aclose()
_client = None


def _retry_on_rate_limit_or_server_error(exc: BaseException) -> bool:
"""Retry on 429 (rate limit) or 5xx (server error)."""
if isinstance(exc, httpx.HTTPStatusError):
return exc.response.status_code in (429, 500, 502, 503, 504)
return False


async def ai_extract(system_prompt: str, user_input: str) -> str:
"""
Call the AI extract endpoint and return the assistant content.
Raises httpx.HTTPStatusError on 4xx/5xx.
"""
payload = {
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_input},
]
}
client = _get_client()
resp = await client.post(
f"{PROVIDER_BASE}/v1/ai/extract",
json=payload,
headers=_HEADERS,
)
resp.raise_for_status()
data = resp.json()
choices = data.get("choices", [])
if not choices:
return ""
return choices[0].get("message", {}).get("content", "")


@retry(
retry=retry_if_exception(_retry_on_rate_limit_or_server_error),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def notify(to: str, message: str, type_: str) -> None:
"""
Send a notification via the provider. Retries on 429/5xx (up to 3 attempts).
Raises httpx.HTTPStatusError on 4xx/5xx after retries exhausted.
"""
payload = {"to": to, "message": message, "type": type_}
client = _get_client()
resp = await client.post(
f"{PROVIDER_BASE}/v1/notify",
json=payload,
headers=_HEADERS,
)
resp.raise_for_status()
144 changes: 144 additions & 0 deletions app/guardrails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Guardrails: parse and normalize AI extraction output to
{to, message, type}."""

import json
import re
# Alternate keys the mock may return -> canonical keys
_TO_KEYS = ("to", "recipient", "destination") # "To" -> to via lowercase
_MESSAGE_KEYS = ("message", "body", "text")
_TYPE_KEYS = ("type", "channel", "method")


def _extract_json_candidate(raw: str) -> str | None:
"""Extract a JSON object string from raw text (Markdown blocks,
embedded, etc)."""
if not raw or not raw.strip():
return None

# 1. Try ```json ... ``` or ``` ... ```
code_block = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw)
if code_block:
return code_block.group(1).strip()

# 2. Find {...} in text
start = raw.find("{")
if start == -1:
return None

depth = 0
in_string = False
escape = False
quote_char = None
i = start

while i < len(raw):
c = raw[i]
if escape:
escape = False
i += 1
continue
if c == "\\" and in_string:
escape = True
i += 1
continue
if not in_string:
if c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
return raw[start: i + 1]
elif c in ('"', "'"):
in_string = True
quote_char = c
else:
if c == quote_char:
in_string = False
i += 1

return None # Unbalanced braces (truncated)


def _try_parse_json(s: str) -> dict | None:
"""Try to parse JSON, including single-quoted variant."""
# 1. Standard JSON
try:
return json.loads(s)
except json.JSONDecodeError:
pass

# 2. Single-quoted keys/values (invalid JSON but common in Python)
try:
normalized = re.sub(
r"'([^']*)'(\s*[:,}\]])",
lambda m: json.dumps(m.group(1)) + m.group(2),
s,
)
return json.loads(normalized)
except (json.JSONDecodeError, ValueError):
pass

# 3. Unquoted keys: {to: "x", message: "y", type: "email"}
try:
fixed = re.sub(r"(\w+)\s*:", r'"\1":', s)
return json.loads(fixed)
except json.JSONDecodeError:
pass

return None


def _normalize_keys(data: dict) -> dict | None:
"""Map alternate keys to canonical {to, message, type}. Validate type."""
result = {}
by_lower = {
k.lower(): (k, v) for k, v in data.items() if isinstance(k, str)
}

# to
for k in _TO_KEYS:
if k in by_lower:
_, val = by_lower[k]
if val and isinstance(val, str):
result["to"] = val.strip()
break
if "to" not in result:
return None

# message
for k in _MESSAGE_KEYS:
if k in by_lower:
_, val = by_lower[k]
result["message"] = str(val).strip() if val else ""
break
result.setdefault("message", "")

# type: must be email or sms
for k in _TYPE_KEYS:
if k in by_lower:
_, val = by_lower[k]
if val and isinstance(val, str):
t = val.strip().lower()
if t in ("email", "sms"):
result["type"] = t
return result
break
return None


def parse_extraction(raw: str) -> dict | None:
"""
Parse AI extraction output into {to, message, type} or None if invalid.
Handles: Markdown blocks, alternate keys, single quotes, unquoted keys,
extra fields, missing message. Fails on: truncation, refusals, missing
to/type.
"""
candidate = _extract_json_candidate(raw)
if not candidate:
return None

parsed = _try_parse_json(candidate)
if not isinstance(parsed, dict):
return None

return _normalize_keys(parsed)
54 changes: 52 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,53 @@
from fastapi import FastAPI
"""FastAPI application for the notification service."""

app = FastAPI(title="Notification Service (Technical Test)")
from contextlib import asynccontextmanager

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import Response

from client import close_client
from pipeline import run as run_pipeline
from schemas import RequestCreate, ResponseCreate, ResponseStatus
from store import create, get, update_status


@asynccontextmanager
async def lifespan(app: FastAPI):
yield
await close_client()


app = FastAPI(title="Notification Service (Technical Test)", lifespan=lifespan)


# --- Endpoints ---
@app.post("/v1/requests", status_code=201, response_model=ResponseCreate)
def create_request(body: RequestCreate) -> ResponseCreate:
"""Ingesta de intenciones: crea una nueva solicitud en cola."""
req_id = create(body.user_input)
return ResponseCreate(id=req_id)


@app.post("/v1/requests/{req_id}/process")
async def process_request(req_id: str, background_tasks: BackgroundTasks) -> Response:
"""Procesamiento de envío: extrae, valida y notifica en background."""
entry = get(req_id)
if not entry:
raise HTTPException(status_code=404, detail="Request not found")

status = entry["status"]
if status in ("processing", "sent", "failed"):
return Response(status_code=200)

update_status(req_id, "processing")
background_tasks.add_task(run_pipeline, req_id)
return Response(status_code=202)


@app.get("/v1/requests/{req_id}", response_model=ResponseStatus)
def get_request_status(req_id: str) -> ResponseStatus:
"""Consulta de estado de una solicitud."""
entry = get(req_id)
if not entry:
raise HTTPException(status_code=404, detail="Request not found")
return ResponseStatus(id=req_id, status=entry["status"])
107 changes: 107 additions & 0 deletions app/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Pipeline: extract -> guardrails -> notify."""

import asyncio

import httpx

from client import ai_extract, notify
from guardrails import parse_extraction
from store import get, update_status

SYSTEM_PROMPT = """You are a notification intent extractor. Your task is to
parse a natural language request and output structured JSON.

## Output Schema (STRICT)
Return exactly one raw JSON object with these three keys — no additional keys,
no nesting, no explanations:
- "to": string — destination (email address or phone number)
- "message": string — the text/body to send
- "type": string — MUST be exactly "email" or "sms" (lowercase)

The response must contain ONLY the JSON object.
Do not include markdown code blocks.
Do not include commentary.
Use double quotes for keys and string values.

## Extraction Rules

1. Destination ("to"):
- Email: extract any valid email matching user@domain.tld
- Phone: extract digits in formats like:
- 6XXXXXXXXX
- 6XX-XXX-XXX
- Keep the phone number exactly as written.
- If multiple valid destinations are present, use the first one.
- Never invent a destination.

2. Type ("type"):
- "email" if:
- The text contains: email, correo, mail, e-mail
- OR the destination is an email address
- "sms" if:
- The text contains: sms, teléfono, tel, móvil, celular, phone
- OR the destination is a phone number
- If both phone and email appear, choose the type that matches the
extracted destination.
- The value must be exactly "email" or "sms".

3. Message ("message"):
- Extract text after indicators such as:
- "diciendo"
- ":"
- "con el mensaje"
- "que" (only if clearly introducing the message after the destination)
- Preserve the user's wording.
- Do not paraphrase.
- Trim leading/trailing whitespace.
- If no explicit message is found, use a short placeholder like
"Recordatorio".

## Validation Rules
- Do not hallucinate or fabricate missing data.
- If a valid destination cannot be confidently extracted, return:
{"to":"","message":"","type":""}
- Ensure the output is valid JSON.
- Output only the JSON object and nothing else.

## Example
Input: "Manda un correo a feda@test.com diciendo hola"
Output: {"to":"feda@test.com","message":"hola","type":"email"}
"""


_pipeline_semaphore = asyncio.Semaphore(30)


async def run(req_id: str) -> None:
"""Extract -> guardrails -> notify. Limited by semaphore."""
async with _pipeline_semaphore:
entry = get(req_id)
if not entry:
return

update_status(req_id, "processing")
user_input = entry.get("user_input", "")

try:
content = await ai_extract(SYSTEM_PROMPT, user_input)
except httpx.HTTPStatusError:
update_status(req_id, "failed")
return

parsed = parse_extraction(content)
if not parsed:
update_status(req_id, "failed")
return

try:
await notify(
to=parsed["to"],
message=parsed["message"],
type_=parsed["type"],
)
except httpx.HTTPStatusError:
update_status(req_id, "failed")
return

update_status(req_id, "sent")
Loading