diff --git a/api-server/src/handlers.rs b/api-server/src/handlers.rs index b1f1c36..6769491 100644 --- a/api-server/src/handlers.rs +++ b/api-server/src/handlers.rs @@ -1,6 +1,7 @@ use axum::{extract::Path, http::StatusCode, Json}; use tracing::instrument; use crate::schemas::*; +use crate::webhook; /// Timestamp a new IP commitment. Returns the assigned IP ID. #[utoipa::path( @@ -146,6 +147,8 @@ pub async fn initiate_swap(Json(body): Json) -> Result, Json(body): Json) -> Result)> { // TODO: Call Soroban RPC to invoke atomic_swap.accept_swap + // Trigger webhook on status change (Pending -> Accepted) + webhook::trigger_swap_status_changed(swap_id, Some("Pending".to_string()), "Accepted".to_string()); Err(( StatusCode::NOT_FOUND, Json(ErrorResponse { @@ -170,6 +173,8 @@ pub async fn accept_swap(Path(swap_id): Path, Json(body): Json, Json(body): Json) -> Result)> { // TODO: Call Soroban RPC to invoke atomic_swap.reveal_key + // Trigger webhook on status change (Accepted -> Completed) + webhook::trigger_swap_status_changed(swap_id, Some("Accepted".to_string()), "Completed".to_string()); Err(( StatusCode::NOT_FOUND, Json(ErrorResponse { @@ -194,6 +199,8 @@ pub async fn reveal_key(Path(swap_id): Path, Json(body): Json, Json(body): Json) -> Result)> { // TODO: Call Soroban RPC to invoke atomic_swap.cancel_swap + // Trigger webhook on status change (Pending -> Cancelled) + webhook::trigger_swap_status_changed(swap_id, Some("Pending".to_string()), "Cancelled".to_string()); Err(( StatusCode::NOT_FOUND, Json(ErrorResponse { @@ -218,6 +225,8 @@ pub async fn cancel_swap(Path(swap_id): Path, Json(body): Json, Json(body): Json) -> Result)> { // TODO: Call Soroban RPC to invoke atomic_swap.cancel_expired_swap + // Trigger webhook on status change (Accepted -> Cancelled) + webhook::trigger_swap_status_changed(swap_id, Some("Accepted".to_string()), "Cancelled".to_string()); Err(( StatusCode::NOT_FOUND, Json(ErrorResponse { @@ -247,3 +256,65 @@ pub async fn get_swap(Path(swap_id): Path) -> Result, (Sta }), )) } + +/// Register a webhook URL to receive swap event notifications. +#[utoipa::path( + post, + path = "/webhooks", + tag = "Webhooks", + request_body = RegisterWebhookRequest, + responses( + (status = 200, description = "Webhook registered", body = WebhookResponse), + (status = 400, description = "Invalid request", body = ErrorResponse), + ) +)] +pub async fn register_webhook(Json(body): Json) -> Result, (StatusCode, Json)> { + if body.url.is_empty() || body.events.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + error: "URL and events are required".to_string(), + }), + )); + } + + let config = webhook::register(body.url, body.events); + + Ok(Json(WebhookResponse { + id: config.id.to_string(), + url: config.url, + events: config.events, + created_at: config.created_at, + })) +} + +/// Unregister a webhook by ID. +#[utoipa::path( + delete, + path = "/webhooks/{id}", + tag = "Webhooks", + params(("id" = String, Path, description = "Webhook UUID")), + responses( + (status = 200, description = "Webhook unregistered"), + (status = 404, description = "Webhook not found", body = ErrorResponse), + ) +)] +pub async fn unregister_webhook(Path(id): Path) -> Result)> { + let uuid = uuid::Uuid::parse_str(&id).map_err(|_| ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + error: "Invalid webhook ID format".to_string(), + }), + ))?; + + if webhook::unregister(uuid) { + Ok(StatusCode::OK) + } else { + Err(( + StatusCode::NOT_FOUND, + Json(ErrorResponse { + error: format!("Webhook {} not found", id), + }), + )) + } +} diff --git a/api-server/src/main.rs b/api-server/src/main.rs index 5d5437a..7a06ee9 100644 --- a/api-server/src/main.rs +++ b/api-server/src/main.rs @@ -6,6 +6,7 @@ use utoipa_swagger_ui::SwaggerUi; mod handlers; mod metrics; mod schemas; +mod webhook; #[derive(OpenApi)] #[openapi( @@ -26,6 +27,8 @@ mod schemas; handlers::cancel_swap, handlers::cancel_expired_swap, handlers::get_swap, + handlers::register_webhook, + handlers::unregister_webhook, ), components(schemas( schemas::CommitIpRequest, @@ -42,10 +45,13 @@ mod schemas; schemas::SwapRecord, schemas::SwapStatus, schemas::ErrorResponse, + schemas::RegisterWebhookRequest, + schemas::WebhookResponse, )), tags( (name = "IP Registry", description = "Commit and query intellectual property records"), (name = "Atomic Swap", description = "Trustless patent sale via atomic swap"), + (name = "Webhooks", description = "Real-time event notifications"), ) )] pub struct ApiDoc; diff --git a/api-server/src/schemas.rs b/api-server/src/schemas.rs index b348e62..03a7557 100644 --- a/api-server/src/schemas.rs +++ b/api-server/src/schemas.rs @@ -107,3 +107,17 @@ pub struct CancelExpiredSwapRequest { pub struct ErrorResponse { pub error: String, } + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct RegisterWebhookRequest { + pub url: String, + pub events: Vec, +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct WebhookResponse { + pub id: String, + pub url: String, + pub events: Vec, + pub created_at: u64, +} diff --git a/api-server/src/webhook.rs b/api-server/src/webhook.rs new file mode 100644 index 0000000..a9999b7 --- /dev/null +++ b/api-server/src/webhook.rs @@ -0,0 +1,155 @@ +use std::time::Duration; +use axum::http::StatusCode; +use dashmap::DashMap; +use once_cell::sync::Lazy; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tracing::{error, info, warn}; +use uuid::Uuid; + +/// Configuration for a registered webhook. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookConfig { + pub id: Uuid, + pub url: String, + pub events: Vec, // e.g., ["swap.status_changed"] + pub created_at: u64, +} + +/// Webhook delivery payload. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookPayload { + pub event: String, + pub swap_id: u64, + pub old_status: Option, + pub new_status: String, + pub timestamp: u64, +} + +/// In-memory webhook registry. +static REGISTRY: Lazy> = Lazy::new(DashMap::new); + +/// HTTP client for webhook delivery. +static CLIENT: Lazy = Lazy::new(|| { + Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .expect("Failed to build reqwest client") +}); + +/// Register a new webhook. +pub fn register(url: String, events: Vec) -> WebhookConfig { + let config = WebhookConfig { + id: Uuid::new_v4(), + url, + events, + created_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + REGISTRY.insert(config.id, config.clone()); + info!(webhook_id = %config.id, url = %config.url, "Webhook registered"); + config +} + +/// Unregister a webhook by ID. +pub fn unregister(id: Uuid) -> bool { + if REGISTRY.remove(&id).is_some() { + info!(webhook_id = %id, "Webhook unregistered"); + true + } else { + false + } +} + +/// List all registered webhooks. +pub fn list_all() -> Vec { + REGISTRY.iter().map(|entry| entry.clone()).collect() +} + +/// Trigger webhook delivery for a swap status change. +pub fn trigger_swap_status_changed(swap_id: u64, old_status: Option, new_status: String) { + let payload = WebhookPayload { + event: "swap.status_changed".to_string(), + swap_id, + old_status, + new_status, + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + + for entry in REGISTRY.iter() { + let config = entry.value(); + if config.events.contains(&"swap.status_changed".to_string()) || config.events.contains(&"*".to_string()) { + let config = config.clone(); + let payload = payload.clone(); + tokio::spawn(async move { + deliver_with_retry(&config, &payload).await; + }); + } + } +} + +/// Deliver a webhook payload with exponential backoff retry. +async fn deliver_with_retry(config: &WebhookConfig, payload: &WebhookPayload) { + let mut delay = Duration::from_secs(1); + let max_retries = 3; + + for attempt in 1..=max_retries { + match deliver(&config.url, payload).await { + Ok(status) if status.is_success() => { + info!( + webhook_id = %config.id, + url = %config.url, + attempt, + "Webhook delivered successfully" + ); + return; + } + Ok(status) => { + warn!( + webhook_id = %config.id, + url = %config.url, + attempt, + status = status.as_u16(), + "Webhook delivery returned non-success status" + ); + } + Err(e) => { + warn!( + webhook_id = %config.id, + url = %config.url, + attempt, + error = %e, + "Webhook delivery failed" + ); + } + } + + if attempt < max_retries { + tokio::time::sleep(delay).await; + delay *= 2; // exponential backoff: 1s, 2s, 4s + } + } + + error!( + webhook_id = %config.id, + url = %config.url, + "Webhook delivery exhausted all retries" + ); +} + +/// Single delivery attempt. +async fn deliver(url: &str, payload: &WebhookPayload) -> Result { + let response = CLIENT + .post(url) + .json(&json!(payload)) + .send() + .await?; + Ok(response.status()) +} +