diff --git a/packages/backend/app/services/webhooks.py b/packages/backend/app/services/webhooks.py new file mode 100644 index 000000000..618fdefc7 --- /dev/null +++ b/packages/backend/app/services/webhooks.py @@ -0,0 +1,378 @@ +"""Webhook Event System for FinMind. + +Provides a WebhookManager that allows registering webhook URLs for specific +event types and triggering those webhooks when events occur. Includes +exponential-backoff retry logic and configurable timeout handling. + +Supported event types: + - trade_signal + - portfolio_update + - risk_alert + - market_anomaly +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import threading +import time +import uuid +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + +import requests + +logger = logging.getLogger("finmind.webhooks") + + +class WebhookEventType(str, Enum): + """Supported webhook event types.""" + + TRADE_SIGNAL = "trade_signal" + PORTFOLIO_UPDATE = "portfolio_update" + RISK_ALERT = "risk_alert" + MARKET_ANOMALY = "market_anomaly" + + +@dataclass +class WebhookRegistration: + """Represents a registered webhook endpoint.""" + + id: str + url: str + events: list[str] + secret: str + active: bool = True + created_at: float = field(default_factory=time.time) + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "url": self.url, + "events": self.events, + "secret": self.secret, + "active": self.active, + "created_at": self.created_at, + } + + +@dataclass +class WebhookDelivery: + """Tracks a single webhook delivery attempt.""" + + id: str + webhook_id: str + event_type: str + payload: dict[str, Any] + status_code: int | None = None + success: bool = False + attempts: int = 0 + last_attempt_at: float | None = None + error: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "webhook_id": self.webhook_id, + "event_type": self.event_type, + "payload": self.payload, + "status_code": self.status_code, + "success": self.success, + "attempts": self.attempts, + "last_attempt_at": self.last_attempt_at, + "error": self.error, + } + + +class WebhookManager: + """Manages webhook registrations and event dispatching. + + Usage:: + + manager = WebhookManager() + manager.register_webhook( + "https://example.com/hook", + events=["trade_signal", "risk_alert"], + ) + manager.trigger_event("trade_signal", {"symbol": "AAPL", "action": "BUY"}) + + Args: + max_retries: Maximum number of retry attempts per delivery (default 3). + base_delay: Base delay in seconds for exponential backoff (default 1.0). + max_delay: Maximum delay cap in seconds (default 60.0). + timeout: HTTP request timeout in seconds (default 10). + secret_key: Optional global secret for signing payloads. If not provided, + a per-webhook secret is generated on registration. + """ + + def __init__( + self, + max_retries: int = 3, + base_delay: float = 1.0, + max_delay: float = 60.0, + timeout: float = 10.0, + secret_key: str | None = None, + ) -> None: + self._webhooks: dict[str, WebhookRegistration] = {} + self._deliveries: list[WebhookDelivery] = [] + self._max_retries = max_retries + self._base_delay = base_delay + self._max_delay = max_delay + self._timeout = timeout + self._secret_key = secret_key or "" + self._lock = threading.Lock() + + # ------------------------------------------------------------------ + # Registration + # ------------------------------------------------------------------ + + def register_webhook( + self, + url: str, + events: list[str] | None = None, + secret: str | None = None, + ) -> WebhookRegistration: + """Register a new webhook endpoint. + + Args: + url: The HTTPS (or HTTP for testing) URL to deliver events to. + events: List of event types to subscribe to. Defaults to all events. + secret: Optional HMAC secret. Generated automatically if omitted. + + Returns: + The created WebhookRegistration. + + Raises: + ValueError: If *url* is empty or *events* contains unknown types. + """ + if not url: + raise ValueError("Webhook URL must not be empty") + + if events is None: + events = [e.value for e in WebhookEventType] + else: + valid = {e.value for e in WebhookEventType} + unknown = set(events) - valid + if unknown: + raise ValueError(f"Unknown event types: {unknown}") + + registration = WebhookRegistration( + id=str(uuid.uuid4()), + url=url, + events=list(events), + secret=secret or uuid.uuid4().hex, + ) + + with self._lock: + self._webhooks[registration.id] = registration + + logger.info( + "Registered webhook %s -> %s (events: %s)", + registration.id, + url, + ", ".join(events), + ) + return registration + + def unregister_webhook(self, webhook_id: str) -> bool: + """Remove a registered webhook. Returns True if found and removed.""" + with self._lock: + removed = self._webhooks.pop(webhook_id, None) + if removed: + logger.info("Unregistered webhook %s", webhook_id) + return True + return False + + def list_webhooks(self) -> list[WebhookRegistration]: + """Return all registered webhooks.""" + with self._lock: + return list(self._webhooks.values()) + + def get_webhook(self, webhook_id: str) -> WebhookRegistration | None: + """Return a single webhook by ID, or None.""" + return self._webhooks.get(webhook_id) + + # ------------------------------------------------------------------ + # Event triggering + # ------------------------------------------------------------------ + + def trigger_event( + self, + event_type: str, + data: dict[str, Any] | None = None, + ) -> list[WebhookDelivery]: + """Trigger an event, delivering to all matching webhooks. + + Args: + event_type: One of the supported WebhookEventType values. + data: Arbitrary JSON-serializable payload. + + Returns: + List of WebhookDelivery objects (one per matching webhook). + + Raises: + ValueError: If *event_type* is not a known event type. + """ + valid_types = {e.value for e in WebhookEventType} + if event_type not in valid_types: + raise ValueError( + f"Unknown event type '{event_type}'. Valid: {sorted(valid_types)}" + ) + + payload = { + "event_type": event_type, + "data": data or {}, + "timestamp": time.time(), + "delivery_id": str(uuid.uuid4()), + } + + deliveries: list[WebhookDelivery] = [] + + with self._lock: + matching = [ + w for w in self._webhooks.values() if w.active and event_type in w.events + ] + + for webhook in matching: + delivery = self._deliver(webhook, payload) + deliveries.append(delivery) + + logger.info( + "Event '%s' triggered → %d delivery(s)", event_type, len(deliveries) + ) + return deliveries + + # ------------------------------------------------------------------ + # Delivery internals + # ------------------------------------------------------------------ + + def _deliver( + self, + webhook: WebhookRegistration, + payload: dict[str, Any], + ) -> WebhookDelivery: + """Deliver a payload to a webhook with exponential-backoff retry.""" + delivery = WebhookDelivery( + id=payload.get("delivery_id", str(uuid.uuid4())), + webhook_id=webhook.id, + event_type=payload["event_type"], + payload=payload, + ) + + body = json.dumps(payload, default=str) + signature = self._sign(webhook.secret, body) + + headers = { + "Content-Type": "application/json", + "X-FinMind-Signature": signature, + "X-FinMind-Event": payload["event_type"], + "X-FinMind-Delivery": delivery.id, + } + + last_error: str | None = None + + for attempt in range(1, self._max_retries + 1): + delivery.attempts = attempt + delivery.last_attempt_at = time.time() + + try: + resp = requests.post( + webhook.url, + data=body, + headers=headers, + timeout=self._timeout, + ) + delivery.status_code = resp.status_code + + if 200 <= resp.status_code < 300: + delivery.success = True + logger.info( + "Webhook %s delivered (status=%d, attempt=%d)", + webhook.id, + resp.status_code, + attempt, + ) + break + else: + last_error = f"HTTP {resp.status_code}" + logger.warning( + "Webhook %s returned %d (attempt %d/%d)", + webhook.id, + resp.status_code, + attempt, + self._max_retries, + ) + + except requests.RequestException as exc: + last_error = str(exc) + logger.warning( + "Webhook %s request failed: %s (attempt %d/%d)", + webhook.id, + exc, + attempt, + self._max_retries, + ) + + if attempt < self._max_retries: + delay = min(self._base_delay * (2 ** (attempt - 1)), self._max_delay) + logger.debug("Retrying in %.1fs …", delay) + time.sleep(delay) + + if not delivery.success: + delivery.error = last_error + logger.error( + "Webhook %s failed after %d attempts: %s", + webhook.id, + self._max_retries, + last_error, + ) + + with self._lock: + self._deliveries.append(delivery) + + return delivery + + # ------------------------------------------------------------------ + # HMAC signing + # ------------------------------------------------------------------ + + @staticmethod + def _sign(secret: str, body: str) -> str: + """Compute an HMAC-SHA256 signature for the payload body.""" + return hmac.new( + secret.encode("utf-8"), + body.encode("utf-8"), + hashlib.sha256, + ).hexdigest() + + def verify_signature(self, secret: str, body: str, signature: str) -> bool: + """Verify a payload signature. Returns True if valid.""" + expected = self._sign(secret, body) + return hmac.compare_digest(expected, signature) + + # ------------------------------------------------------------------ + # Delivery history + # ------------------------------------------------------------------ + + def get_deliveries( + self, + webhook_id: str | None = None, + event_type: str | None = None, + ) -> list[WebhookDelivery]: + """Retrieve delivery history, optionally filtered.""" + with self._lock: + results = list(self._deliveries) + if webhook_id: + results = [d for d in results if d.webhook_id == webhook_id] + if event_type: + results = [d for d in results if d.event_type == event_type] + return results + + def clear_deliveries(self) -> None: + """Clear all stored delivery records.""" + with self._lock: + self._deliveries.clear() diff --git a/packages/backend/tests/test_webhooks.py b/packages/backend/tests/test_webhooks.py new file mode 100644 index 000000000..5cd2f5fb4 --- /dev/null +++ b/packages/backend/tests/test_webhooks.py @@ -0,0 +1,300 @@ +"""Tests for the Webhook Event System.""" + +from __future__ import annotations + +import json +import time +from unittest.mock import MagicMock, patch + +import pytest +import requests + +from app.services.webhooks import ( + WebhookDelivery, + WebhookEventType, + WebhookManager, + WebhookRegistration, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _mock_response(status_code: int = 200) -> MagicMock: + resp = MagicMock(spec=requests.Response) + resp.status_code = status_code + resp.text = "ok" + return resp + + +# --------------------------------------------------------------------------- +# Registration tests +# --------------------------------------------------------------------------- + +class TestRegistration: + def test_register_with_defaults(self): + mgr = WebhookManager() + reg = mgr.register_webhook("https://example.com/hook") + assert isinstance(reg, WebhookRegistration) + assert reg.url == "https://example.com/hook" + assert set(reg.events) == {e.value for e in WebhookEventType} + assert reg.active is True + assert len(reg.secret) > 0 + + def test_register_specific_events(self): + mgr = WebhookManager() + reg = mgr.register_webhook( + "https://example.com/hook", + events=["trade_signal", "risk_alert"], + ) + assert reg.events == ["trade_signal", "risk_alert"] + + def test_register_rejects_empty_url(self): + mgr = WebhookManager() + with pytest.raises(ValueError, match="must not be empty"): + mgr.register_webhook("") + + def test_register_rejects_unknown_event(self): + mgr = WebhookManager() + with pytest.raises(ValueError, match="Unknown event types"): + mgr.register_webhook("https://example.com/hook", events=["nonexistent"]) + + def test_unregister(self): + mgr = WebhookManager() + reg = mgr.register_webhook("https://example.com/hook") + assert mgr.unregister_webhook(reg.id) is True + assert mgr.unregister_webhook(reg.id) is False + + def test_list_webhooks(self): + mgr = WebhookManager() + mgr.register_webhook("https://a.com") + mgr.register_webhook("https://b.com") + assert len(mgr.list_webhooks()) == 2 + + def test_get_webhook(self): + mgr = WebhookManager() + reg = mgr.register_webhook("https://example.com/hook") + assert mgr.get_webhook(reg.id) is reg + assert mgr.get_webhook("nonexistent") is None + + +# --------------------------------------------------------------------------- +# Trigger / delivery tests +# --------------------------------------------------------------------------- + +class TestTriggerEvent: + @patch("app.services.webhooks.requests.post") + def test_trigger_delivers_to_matching_webhook(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook", events=["trade_signal"]) + deliveries = mgr.trigger_event("trade_signal", {"symbol": "AAPL"}) + assert len(deliveries) == 1 + assert deliveries[0].success is True + assert deliveries[0].status_code == 200 + mock_post.assert_called_once() + + @patch("app.services.webhooks.requests.post") + def test_trigger_skips_non_matching_webhook(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook", events=["risk_alert"]) + deliveries = mgr.trigger_event("trade_signal") + assert len(deliveries) == 0 + mock_post.assert_not_called() + + def test_trigger_rejects_unknown_event_type(self): + mgr = WebhookManager() + with pytest.raises(ValueError, match="Unknown event type"): + mgr.trigger_event("unknown_event") + + @patch("app.services.webhooks.requests.post") + def test_trigger_all_event_types(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook") # all events + for event_type in WebhookEventType: + deliveries = mgr.trigger_event(event_type.value, {"test": True}) + assert len(deliveries) == 1 + assert deliveries[0].success is True + + @patch("app.services.webhooks.requests.post") + def test_trigger_skips_inactive_webhook(self, mock_post): + mgr = WebhookManager() + reg = mgr.register_webhook("https://example.com/hook", events=["trade_signal"]) + reg.active = False + deliveries = mgr.trigger_event("trade_signal") + assert len(deliveries) == 0 + mock_post.assert_not_called() + + +# --------------------------------------------------------------------------- +# Retry tests +# --------------------------------------------------------------------------- + +class TestRetry: + @patch("app.services.webhooks.requests.post") + @patch("app.services.webhooks.time.sleep") + def test_retry_on_server_error(self, mock_sleep, mock_post): + mock_post.side_effect = [ + _mock_response(500), + _mock_response(500), + _mock_response(200), + ] + mgr = WebhookManager(max_retries=3, base_delay=1.0) + mgr.register_webhook("https://example.com/hook", events=["trade_signal"]) + deliveries = mgr.trigger_event("trade_signal") + assert len(deliveries) == 1 + assert deliveries[0].success is True + assert deliveries[0].attempts == 3 + assert mock_post.call_count == 3 + + @patch("app.services.webhooks.requests.post") + @patch("app.services.webhooks.time.sleep") + def test_retry_exhausted(self, mock_sleep, mock_post): + mock_post.return_value = _mock_response(500) + mgr = WebhookManager(max_retries=3, base_delay=0.1) + mgr.register_webhook("https://example.com/hook", events=["risk_alert"]) + deliveries = mgr.trigger_event("risk_alert") + assert len(deliveries) == 1 + assert deliveries[0].success is False + assert deliveries[0].attempts == 3 + assert deliveries[0].error == "HTTP 500" + assert mock_post.call_count == 3 + + @patch("app.services.webhooks.requests.post") + @patch("app.services.webhooks.time.sleep") + def test_retry_on_connection_error(self, mock_sleep, mock_post): + mock_post.side_effect = requests.ConnectionError("refused") + mgr = WebhookManager(max_retries=2, base_delay=0.1) + mgr.register_webhook("https://example.com/hook", events=["market_anomaly"]) + deliveries = mgr.trigger_event("market_anomaly") + assert len(deliveries) == 1 + assert deliveries[0].success is False + assert deliveries[0].attempts == 2 + + @patch("app.services.webhooks.requests.post") + @patch("app.services.webhooks.time.sleep") + def test_exponential_backoff_delays(self, mock_sleep, mock_post): + mock_post.side_effect = [ + _mock_response(500), + _mock_response(500), + _mock_response(200), + ] + mgr = WebhookManager(max_retries=3, base_delay=2.0, max_delay=30.0) + mgr.register_webhook("https://example.com/hook", events=["trade_signal"]) + mgr.trigger_event("trade_signal") + # delays: 2^0*2=2, 2^1*2=4 + calls = mock_sleep.call_args_list + assert len(calls) == 2 + assert calls[0][0][0] == pytest.approx(2.0) + assert calls[1][0][0] == pytest.approx(4.0) + + +# --------------------------------------------------------------------------- +# Signature tests +# --------------------------------------------------------------------------- + +class TestSignature: + def test_sign_and_verify(self): + mgr = WebhookManager() + body = '{"test": true}' + secret = "mysecret" + sig = mgr._sign(secret, body) + assert mgr.verify_signature(secret, body, sig) is True + assert mgr.verify_signature("wrong", body, sig) is False + + @patch("app.services.webhooks.requests.post") + def test_signature_sent_in_header(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook( + "https://example.com/hook", + events=["trade_signal"], + secret="testsecret", + ) + mgr.trigger_event("trade_signal", {"symbol": "AAPL"}) + + call_kwargs = mock_post.call_args + headers = call_kwargs[1]["headers"] if "headers" in call_kwargs[1] else call_kwargs.kwargs["headers"] + assert "X-FinMind-Signature" in headers + body = call_kwargs[1]["data"] if "data" in call_kwargs[1] else call_kwargs.kwargs["data"] + assert mgr.verify_signature("testsecret", body, headers["X-FinMind-Signature"]) + + +# --------------------------------------------------------------------------- +# Delivery history tests +# --------------------------------------------------------------------------- + +class TestDeliveryHistory: + @patch("app.services.webhooks.requests.post") + def test_get_deliveries(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook", events=["trade_signal"]) + mgr.trigger_event("trade_signal") + assert len(mgr.get_deliveries()) == 1 + + @patch("app.services.webhooks.requests.post") + def test_filter_deliveries_by_webhook(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + reg1 = mgr.register_webhook("https://a.com", events=["trade_signal"]) + mgr.register_webhook("https://b.com", events=["trade_signal"]) + mgr.trigger_event("trade_signal") + assert len(mgr.get_deliveries(webhook_id=reg1.id)) == 1 + + @patch("app.services.webhooks.requests.post") + def test_filter_deliveries_by_event(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook") + mgr.trigger_event("trade_signal") + mgr.trigger_event("risk_alert") + assert len(mgr.get_deliveries(event_type="trade_signal")) == 1 + assert len(mgr.get_deliveries(event_type="risk_alert")) == 1 + + def test_clear_deliveries(self): + mgr = WebhookManager() + mgr._deliveries.append( + WebhookDelivery( + id="test", webhook_id="w1", event_type="test", payload={} + ) + ) + mgr.clear_deliveries() + assert len(mgr.get_deliveries()) == 0 + + +# --------------------------------------------------------------------------- +# Payload structure tests +# --------------------------------------------------------------------------- + +class TestPayloadStructure: + @patch("app.services.webhooks.requests.post") + def test_payload_contains_required_fields(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook", events=["portfolio_update"]) + mgr.trigger_event("portfolio_update", {"total_value": 10000}) + + call_kwargs = mock_post.call_args + body_str = call_kwargs[1]["data"] if "data" in call_kwargs[1] else call_kwargs.kwargs["data"] + body = json.loads(body_str) + assert body["event_type"] == "portfolio_update" + assert body["data"]["total_value"] == 10000 + assert "timestamp" in body + assert "delivery_id" in body + + @patch("app.services.webhooks.requests.post") + def test_custom_headers_sent(self, mock_post): + mock_post.return_value = _mock_response(200) + mgr = WebhookManager() + mgr.register_webhook("https://example.com/hook", events=["risk_alert"]) + mgr.trigger_event("risk_alert") + + headers = mock_post.call_args.kwargs["headers"] + assert headers["Content-Type"] == "application/json" + assert headers["X-FinMind-Event"] == "risk_alert" + assert "X-FinMind-Delivery" in headers + assert "X-FinMind-Signature" in headers