Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions api-server/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use axum::{extract::Path, http::StatusCode, Json};
use tracing::instrument;
use crate::schemas::*;
use crate::webhook;

/// Timestamp a new IP commitment. Returns the assigned IP ID.
#[utoipa::path(
Expand Down Expand Up @@ -146,6 +147,8 @@ pub async fn initiate_swap(Json(body): Json<InitiateSwapRequest>) -> Result<Json
#[instrument(skip(body))]
pub async fn accept_swap(Path(swap_id): Path<u64>, Json(body): Json<AcceptSwapRequest>) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
// TODO: Call Soroban RPC to invoke atomic_swap.accept_swap
// Trigger webhook on status change (Pending -> Accepted)
webhook::trigger_swap_status_changed(swap_id, Some("Pending".to_string()), "Accepted".to_string());
Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
Expand All @@ -170,6 +173,8 @@ pub async fn accept_swap(Path(swap_id): Path<u64>, Json(body): Json<AcceptSwapRe
#[instrument(skip(body))]
pub async fn reveal_key(Path(swap_id): Path<u64>, Json(body): Json<RevealKeyRequest>) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
// TODO: Call Soroban RPC to invoke atomic_swap.reveal_key
// Trigger webhook on status change (Accepted -> Completed)
webhook::trigger_swap_status_changed(swap_id, Some("Accepted".to_string()), "Completed".to_string());
Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
Expand All @@ -194,6 +199,8 @@ pub async fn reveal_key(Path(swap_id): Path<u64>, Json(body): Json<RevealKeyRequ
#[instrument(skip(body))]
pub async fn cancel_swap(Path(swap_id): Path<u64>, Json(body): Json<CancelSwapRequest>) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
// TODO: Call Soroban RPC to invoke atomic_swap.cancel_swap
// Trigger webhook on status change (Pending -> Cancelled)
webhook::trigger_swap_status_changed(swap_id, Some("Pending".to_string()), "Cancelled".to_string());
Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
Expand All @@ -218,6 +225,8 @@ pub async fn cancel_swap(Path(swap_id): Path<u64>, Json(body): Json<CancelSwapRe
#[instrument(skip(body))]
pub async fn cancel_expired_swap(Path(swap_id): Path<u64>, Json(body): Json<CancelExpiredSwapRequest>) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
// TODO: Call Soroban RPC to invoke atomic_swap.cancel_expired_swap
// Trigger webhook on status change (Accepted -> Cancelled)
webhook::trigger_swap_status_changed(swap_id, Some("Accepted".to_string()), "Cancelled".to_string());
Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
Expand Down Expand Up @@ -247,3 +256,65 @@ pub async fn get_swap(Path(swap_id): Path<u64>) -> Result<Json<SwapRecord>, (Sta
}),
))
}

/// Register a webhook URL to receive swap event notifications.
#[utoipa::path(
post,
path = "/webhooks",
tag = "Webhooks",
request_body = RegisterWebhookRequest,
responses(
(status = 200, description = "Webhook registered", body = WebhookResponse),
(status = 400, description = "Invalid request", body = ErrorResponse),
)
)]
pub async fn register_webhook(Json(body): Json<RegisterWebhookRequest>) -> Result<Json<WebhookResponse>, (StatusCode, Json<ErrorResponse>)> {
if body.url.is_empty() || body.events.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "URL and events are required".to_string(),
}),
));
}

let config = webhook::register(body.url, body.events);

Ok(Json(WebhookResponse {
id: config.id.to_string(),
url: config.url,
events: config.events,
created_at: config.created_at,
}))
}

/// Unregister a webhook by ID.
#[utoipa::path(
delete,
path = "/webhooks/{id}",
tag = "Webhooks",
params(("id" = String, Path, description = "Webhook UUID")),
responses(
(status = 200, description = "Webhook unregistered"),
(status = 404, description = "Webhook not found", body = ErrorResponse),
)
)]
pub async fn unregister_webhook(Path(id): Path<String>) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
let uuid = uuid::Uuid::parse_str(&id).map_err(|_| (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: "Invalid webhook ID format".to_string(),
}),
))?;

if webhook::unregister(uuid) {
Ok(StatusCode::OK)
} else {
Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("Webhook {} not found", id),
}),
))
}
}
6 changes: 6 additions & 0 deletions api-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use utoipa_swagger_ui::SwaggerUi;
mod handlers;
mod metrics;
mod schemas;
mod webhook;

#[derive(OpenApi)]
#[openapi(
Expand All @@ -26,6 +27,8 @@ mod schemas;
handlers::cancel_swap,
handlers::cancel_expired_swap,
handlers::get_swap,
handlers::register_webhook,
handlers::unregister_webhook,
),
components(schemas(
schemas::CommitIpRequest,
Expand All @@ -42,10 +45,13 @@ mod schemas;
schemas::SwapRecord,
schemas::SwapStatus,
schemas::ErrorResponse,
schemas::RegisterWebhookRequest,
schemas::WebhookResponse,
)),
tags(
(name = "IP Registry", description = "Commit and query intellectual property records"),
(name = "Atomic Swap", description = "Trustless patent sale via atomic swap"),
(name = "Webhooks", description = "Real-time event notifications"),
)
)]
pub struct ApiDoc;
Expand Down
14 changes: 14 additions & 0 deletions api-server/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,17 @@ pub struct CancelExpiredSwapRequest {
pub struct ErrorResponse {
pub error: String,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct RegisterWebhookRequest {
pub url: String,
pub events: Vec<String>,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct WebhookResponse {
pub id: String,
pub url: String,
pub events: Vec<String>,
pub created_at: u64,
}
155 changes: 155 additions & 0 deletions api-server/src/webhook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use std::time::Duration;
use axum::http::StatusCode;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, info, warn};
use uuid::Uuid;

/// Configuration for a registered webhook.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub id: Uuid,
pub url: String,
pub events: Vec<String>, // e.g., ["swap.status_changed"]
pub created_at: u64,
}

/// Webhook delivery payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookPayload {
pub event: String,
pub swap_id: u64,
pub old_status: Option<String>,
pub new_status: String,
pub timestamp: u64,
}

/// In-memory webhook registry.
static REGISTRY: Lazy<DashMap<Uuid, WebhookConfig>> = Lazy::new(DashMap::new);

/// HTTP client for webhook delivery.
static CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to build reqwest client")
});

/// Register a new webhook.
pub fn register(url: String, events: Vec<String>) -> WebhookConfig {
let config = WebhookConfig {
id: Uuid::new_v4(),
url,
events,
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
REGISTRY.insert(config.id, config.clone());
info!(webhook_id = %config.id, url = %config.url, "Webhook registered");
config
}

/// Unregister a webhook by ID.
pub fn unregister(id: Uuid) -> bool {
if REGISTRY.remove(&id).is_some() {
info!(webhook_id = %id, "Webhook unregistered");
true
} else {
false
}
}

/// List all registered webhooks.
pub fn list_all() -> Vec<WebhookConfig> {
REGISTRY.iter().map(|entry| entry.clone()).collect()
}

/// Trigger webhook delivery for a swap status change.
pub fn trigger_swap_status_changed(swap_id: u64, old_status: Option<String>, new_status: String) {
let payload = WebhookPayload {
event: "swap.status_changed".to_string(),
swap_id,
old_status,
new_status,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

for entry in REGISTRY.iter() {
let config = entry.value();
if config.events.contains(&"swap.status_changed".to_string()) || config.events.contains(&"*".to_string()) {
let config = config.clone();
let payload = payload.clone();
tokio::spawn(async move {
deliver_with_retry(&config, &payload).await;
});
}
}
}

/// Deliver a webhook payload with exponential backoff retry.
async fn deliver_with_retry(config: &WebhookConfig, payload: &WebhookPayload) {
let mut delay = Duration::from_secs(1);
let max_retries = 3;

for attempt in 1..=max_retries {
match deliver(&config.url, payload).await {
Ok(status) if status.is_success() => {
info!(
webhook_id = %config.id,
url = %config.url,
attempt,
"Webhook delivered successfully"
);
return;
}
Ok(status) => {
warn!(
webhook_id = %config.id,
url = %config.url,
attempt,
status = status.as_u16(),
"Webhook delivery returned non-success status"
);
}
Err(e) => {
warn!(
webhook_id = %config.id,
url = %config.url,
attempt,
error = %e,
"Webhook delivery failed"
);
}
}

if attempt < max_retries {
tokio::time::sleep(delay).await;
delay *= 2; // exponential backoff: 1s, 2s, 4s
}
}

error!(
webhook_id = %config.id,
url = %config.url,
"Webhook delivery exhausted all retries"
);
}

/// Single delivery attempt.
async fn deliver(url: &str, payload: &WebhookPayload) -> Result<StatusCode, reqwest::Error> {
let response = CLIENT
.post(url)
.json(&json!(payload))
.send()
.await?;
Ok(response.status())
}

Loading