diff --git a/src/webhooks/actix_web.rs b/src/webhooks/actix_web.rs index 9393140..96ffbb7 100644 --- a/src/webhooks/actix_web.rs +++ b/src/webhooks/actix_web.rs @@ -1,62 +1,61 @@ -use crate::Incoming; -use actix_web::{ - dev::Payload, - error::{Error, ErrorBadRequest, ErrorUnauthorized}, - web::Json, - FromRequest, HttpRequest, -}; -use serde::de::DeserializeOwned; +use super::IncomingPayload; use std::{ future::Future, pin::Pin, - task::{ready, Context, Poll}, + task::{Context, Poll, ready}, +}; + +use actix_web::{ + FromRequest, HttpRequest, + dev::Payload, + error::{Error, ErrorBadRequest, ErrorUnauthorized}, }; +use futures_core::stream::Stream; #[doc(hidden)] -pub struct IncomingFut { +pub struct IncomingPayloadFut { req: HttpRequest, - json_fut: as FromRequest>::Future, + payload: Payload, + body: Vec, } -impl Future for IncomingFut -where - T: DeserializeOwned, -{ - type Output = Result, Error>; +impl Future for IncomingPayloadFut { + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Ok(json) = ready!(Pin::new(&mut self.json_fut).poll(cx)) { - let headers = self.req.headers(); - - if let Some(authorization) = headers.get("Authorization") { - if let Ok(authorization) = authorization.to_str() { - return Poll::Ready(Ok(Incoming { - authorization: authorization.to_owned(), - data: json.into_inner(), - })); - } + while let Some(body) = ready!(Pin::new(&mut self.payload).poll_next(cx)) { + match body { + Ok(body) => self.body.extend_from_slice(&body), + + Err(_) => return Poll::Ready(Err(ErrorBadRequest("400"))), } + } + + let headers = self.req.headers(); - return Poll::Ready(Err(ErrorUnauthorized("401"))); + if let (Some(signature), Some(trace)) = ( + headers.get("x-topgg-signature"), + headers.get("x-topgg-trace"), + ) && let (Ok(signature), Ok(trace)) = (signature.to_str(), trace.to_str()) + && let Some(incoming) = IncomingPayload::new(signature, self.body.clone(), trace) + { + return Poll::Ready(Ok(incoming)); } - Poll::Ready(Err(ErrorBadRequest("400"))) + Poll::Ready(Err(ErrorUnauthorized("401"))) } } #[cfg_attr(docsrs, doc(cfg(feature = "actix-web")))] -impl FromRequest for Incoming -where - T: DeserializeOwned, -{ +impl FromRequest for IncomingPayload { type Error = Error; - type Future = IncomingFut; + type Future = IncomingPayloadFut; - #[inline(always)] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - IncomingFut { + IncomingPayloadFut { req: req.clone(), - json_fut: Json::from_request(req, payload), + payload: payload.take(), + body: vec![], } } } diff --git a/src/webhooks/axum.rs b/src/webhooks/axum.rs index 4175b1b..7a163e5 100644 --- a/src/webhooks/axum.rs +++ b/src/webhooks/axum.rs @@ -1,45 +1,69 @@ -use super::Webhook; +use super::Payload; +use std::sync::Arc; + use axum::{ + Router, extract::State, http::{HeaderMap, StatusCode}, - response::IntoResponse, + response::{IntoResponse, Response}, routing::post, - Router, }; -use serde::de::DeserializeOwned; -use std::sync::Arc; + +/// An axum webhook listener for listening to payloads. +/// +/// # Example +/// +/// ```rust,no_run +/// struct MyTopggListener {} +/// +/// #[async_trait::async_trait] +/// impl topgg::axum::Listener for MyTopggListener { +/// async fn callback(self: Arc, payload: Payload, _trace: &str) -> Response { +/// println!("{payload:?}"); +/// +/// (StatusCode::NO_CONTENT, ()).into_response() +/// } +/// } +/// ``` +#[async_trait::async_trait] +#[cfg_attr(docsrs, doc(cfg(feature = "axum")))] +pub trait Listener: Send + Sync + 'static { + async fn callback(self: Arc, payload: Payload, trace: &str) -> Response; +} struct WebhookState { state: Arc, - password: Arc, + secret: Arc, } impl Clone for WebhookState { - #[inline(always)] fn clone(&self) -> Self { Self { - state: Arc::clone(&self.state), - password: Arc::clone(&self.password), + state: self.state.clone(), + secret: self.secret.clone(), } } } -/// Creates a new axum [`Router`] for receiving vote events. +/// Creates a new axum [`Router`] for receiving webhook payloads. /// /// # Example /// /// ```rust,no_run -/// use axum::{routing::get, Router}; -/// use topgg::{VoteEvent, Webhook}; -/// use tokio::net::TcpListener; +/// use topgg::Payload; /// use std::sync::Arc; /// -/// struct MyVoteListener {} +/// use axum::{http::status::StatusCode, response::{IntoResponse, Response}, routing::get, Router}; +/// use tokio::net::TcpListener; +/// +/// struct MyTopggListener {} /// /// #[async_trait::async_trait] -/// impl Webhook for MyVoteListener { -/// async fn callback(&self, vote: VoteEvent) { -/// println!("A user with the ID of {} has voted us on Top.gg!", vote.voter_id); +/// impl topgg::axum::Listener for MyTopggListener { +/// async fn callback(self: Arc, payload: Payload, _trace: &str) -> Response { +/// println!("{payload:?}"); +/// +/// (StatusCode::NO_CONTENT, ()).into_response() /// } /// } /// @@ -49,11 +73,11 @@ impl Clone for WebhookState { /// /// #[tokio::main] /// async fn main() { -/// let state = Arc::new(MyVoteListener {}); +/// let state = Arc::new(MyTopggListener {}); /// /// let router = Router::new().route("/", get(index)).nest( -/// "/votes", -/// topgg::axum::webhook(env!("MY_TOPGG_WEBHOOK_SECRET").to_string(), Arc::clone(&state)), +/// "/webhook", +/// topgg::axum::webhook(Arc::clone(&state), env!("TOPGG_WEBHOOK_SECRET").to_string()), /// ); /// /// let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); @@ -61,36 +85,31 @@ impl Clone for WebhookState { /// axum::serve(listener, router).await.unwrap(); /// } /// ``` -#[inline(always)] #[cfg_attr(docsrs, doc(cfg(feature = "axum")))] -pub fn webhook(password: String, state: Arc) -> Router +pub fn webhook(state: Arc, secret: String) -> Router where - D: DeserializeOwned + Send, - T: Webhook, + S: Listener, { Router::new() .route( "/", post( - async |headers: HeaderMap, State(webhook): State>, body: String| { - if let Some(authorization) = headers.get("Authorization") { - if let Ok(authorization) = authorization.to_str() { - if authorization == *(webhook.password) { - if let Ok(data) = serde_json::from_str(&body) { - webhook.state.callback(data).await; - - return (StatusCode::NO_CONTENT, ()).into_response(); - } - } - } + async |headers: HeaderMap, State(wrapped_state): State>, body: String| { + if let Some(signature) = headers.get("x-topgg-signature") + && let Ok(signature) = signature.to_str() + && let Some(trace) = headers.get("x-topgg-trace") + && let Ok(trace) = trace.to_str() + && let Some(payload) = Payload::new(signature, &body, &wrapped_state.secret) + { + wrapped_state.state.callback(payload, trace).await + } else { + (StatusCode::UNAUTHORIZED, ()).into_response() } - - (StatusCode::UNAUTHORIZED, ()).into_response() }, ), ) .with_state(WebhookState { state, - password: Arc::new(password), + secret: Arc::new(secret), }) } diff --git a/src/webhooks/mod.rs b/src/webhooks/mod.rs index a5ec591..124daf9 100644 --- a/src/webhooks/mod.rs +++ b/src/webhooks/mod.rs @@ -1,6 +1,6 @@ -mod vote; -#[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] -pub use vote::*; +mod payload; + +pub use payload::Payload; #[cfg(feature = "actix-web")] mod actix_web; @@ -26,49 +26,127 @@ cfg_if::cfg_if! { cfg_if::cfg_if! { if #[cfg(any(feature = "actix-web", feature = "rocket"))] { - /// An unauthenticated incoming Top.gg webhook request. - #[must_use] + use std::collections::HashMap; + + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + /// An incoming [`Payload`] that is yet to be [authenticated with a secret][IncomingPayload::authenticate]. + /// + /// # Examples + /// + /// With actix-web: + /// + /// ```rust,no_run + /// use topgg::IncomingPayload; + /// use std::io; + /// + /// use actix_web::{ + /// error::{Error, ErrorUnauthorized}, + /// get, post, App, HttpServer, + /// }; + /// + /// #[get("/")] + /// async fn index() -> &'static str { + /// "Hello, World!" + /// } + /// + /// #[post("/webhook")] + /// async fn webhook(payload: IncomingPayload) -> Result<&'static str, Error> { + /// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { + /// Some(payload) => { + /// println!("{payload:?}"); + /// + /// Ok("ok") + /// } + /// + /// _ => Err(ErrorUnauthorized("401")), + /// } + /// } + /// + /// #[actix_web::main] + /// async fn main() -> io::Result<()> { + /// HttpServer::new(|| App::new().service(index).service(webhook)) + /// .bind("127.0.0.1:8080")? + /// .run() + /// .await + /// } + /// ``` + /// + /// With rocket: + /// + /// ```rust,no_run + /// use topgg::IncomingPayload; + /// + /// use rocket::{get, http::Status, launch, post, routes, Build, Rocket}; + /// + /// #[get("/")] + /// fn index() -> &'static str { + /// "Hello, World!" + /// } + /// + /// #[post("/webhook", data = "")] + /// fn webhook(payload: IncomingPayload) -> Status { + /// match payload.authenticate(env!("TOPGG_WEBHOOK_SECRET")) { + /// Some(payload) => { + /// println!("{payload:?}"); + /// + /// Status::Ok + /// }, + /// _ => { + /// println!("found an unauthorized attacker."); + /// + /// Status::Unauthorized + /// } + /// } + /// } + /// + /// #[launch] + /// fn rocket() -> Rocket { + /// rocket::build().mount("/", routes![index, webhook]) + /// } + /// ``` #[cfg_attr(docsrs, doc(cfg(any(feature = "actix-web", feature = "rocket"))))] - pub struct Incoming { - pub(crate) authorization: String, - pub(crate) data: T, + pub struct IncomingPayload { + t: String, + signature: String, + body: String, + trace: String, } - impl Incoming { - /// Authenticates a valid password with this request. + impl IncomingPayload { + pub(super) fn new(signature: &str, body: Vec, trace: &str) -> Option { + let signature = signature.split(',').filter_map(|p| p.split_once('=')).collect::>(); + + Some(Self { + t: signature.get("t")?.to_string(), + signature: signature.get("v1")?.to_string(), + body: String::from_utf8(body).ok()?, + trace: trace.into(), + }) + } + + /// Tries to authenticate a valid secret with this request. #[must_use] - #[inline(always)] - pub fn authenticate(self, password: &str) -> Option { - if self.authorization == password { - Some(self.data) + pub fn authenticate(&self, secret: &str) -> Option { + let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; + + hmac.update(format!("{}.{}", self.t, self.body).as_bytes()); + + let digest = hex::encode(hmac.finalize().into_bytes()); + + if digest == self.signature && let Ok(payload) = serde_json::from_str(&self.body) { + Some(payload) } else { None } } - } - impl Clone for Incoming - where - T: Clone, - { - #[inline(always)] - fn clone(&self) -> Self { - Self { - authorization: self.authorization.clone(), - data: self.data.clone(), - } + /// Retrieves the payload's `x-topgg-trace` header for debugging and correlating requests with Top.gg support. + #[must_use] + pub fn get_trace(&self) -> &str { + &self.trace } } } } - -cfg_if::cfg_if! { - if #[cfg(any(feature = "axum", feature = "warp"))] { - /// Webhook event handler. - #[cfg_attr(docsrs, doc(cfg(any(feature = "axum", feature = "warp"))))] - #[async_trait::async_trait] - pub trait Webhook: Send + Sync + 'static { - async fn callback(&self, data: T); - } - } -} diff --git a/src/webhooks/payload.rs b/src/webhooks/payload.rs new file mode 100644 index 0000000..12b16c1 --- /dev/null +++ b/src/webhooks/payload.rs @@ -0,0 +1,104 @@ +use super::super::{PartialProject, User, snowflake}; + +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +/// A webhook payload. +#[non_exhaustive] +#[derive(Clone, Debug, Deserialize)] +#[serde(tag = "type", content = "data")] +#[cfg_attr(docsrs, doc(cfg(feature = "webhooks")))] +pub enum Payload { + /// An `integration.create` webhook payload. Fires when a user has connected to your webhook integration. + #[serde(rename = "integration.create")] + IntegrationCreate { + /// The unique identifier for this connection. + #[serde(deserialize_with = "snowflake::deserialize")] + connection_id: u64, + + /// The secret used to verify future webhook deliveries. + #[serde(rename = "webhook_secret")] + secret: String, + + /// The project that the integration refers to. + project: PartialProject, + + /// The user who triggered this event. + user: User, + }, + + /// An `integration.delete` webhook payload. Fires when a user has disconnected from your webhook integration. + #[serde(rename = "integration.delete")] + IntegrationDelete { + /// The unique identifier for this connection. + #[serde(deserialize_with = "snowflake::deserialize")] + connection_id: u64, + }, + + /// A `webhook.test` webhook payload. Fires upon sent test from the project dashboard. + #[serde(rename = "webhook.test")] + Test { + /// The project that the test refers to. + project: PartialProject, + + /// The user who triggered this test. + user: User, + }, + + /// A `vote.create` webhook payload. Fires when a user votes for your project. + #[serde(rename = "vote.create")] + VoteCreate { + /// The vote's ID. + #[serde(deserialize_with = "snowflake::deserialize")] + id: u64, + + /// The number of votes this vote counted for. This is a rounded integer value which determines how many points this individual vote was worth. + weight: u64, + + /// When the vote was cast. + #[serde(rename = "created_at")] + voted_at: DateTime, + + /// When the vote expires and the user is required to vote again. + expires_at: DateTime, + + /// The project that received this vote. + project: PartialProject, + + /// The user who voted for this project. + user: User, + }, +} + +impl Payload { + #[cfg(any(feature = "axum", feature = "warp"))] + pub(super) fn new(signature: &str, body: &str, secret: &str) -> Option { + use std::collections::HashMap; + + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + let signature = signature + .split(',') + .filter_map(|p| p.split_once('=')) + .collect::>(); + + let (Some(t), Some(signature)) = (signature.get("t"), signature.get("v1")) else { + return None; + }; + + let mut hmac = Hmac::::new_from_slice(secret.as_bytes()).ok()?; + + hmac.update(format!("{t}.{body}").as_bytes()); + + let digest = hex::encode(hmac.finalize().into_bytes()); + + if &digest == signature + && let Ok(payload) = serde_json::from_str(body) + { + Some(payload) + } else { + None + } + } +} diff --git a/src/webhooks/rocket.rs b/src/webhooks/rocket.rs index 0bfb988..31d0034 100644 --- a/src/webhooks/rocket.rs +++ b/src/webhooks/rocket.rs @@ -1,31 +1,30 @@ -use crate::Incoming; +use super::IncomingPayload; + use rocket::{ - data::{Data, FromData, Outcome}, + data::{Data, FromData, Outcome, ToByteUnit}, http::Status, request::Request, - serde::json::Json, }; -use serde::de::DeserializeOwned; #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] #[rocket::async_trait] -impl<'r, T> FromData<'r> for Incoming -where - T: DeserializeOwned, -{ +impl<'r> FromData<'r> for IncomingPayload { type Error = (); async fn from_data(request: &'r Request<'_>, data: Data<'r>) -> Outcome<'r, Self> { let headers = request.headers(); - if let Some(authorization) = headers.get_one("Authorization") { - return match as FromData>::from_data(request, data).await { - Outcome::Success(data) => Outcome::Success(Self { - authorization: authorization.to_owned(), - data: data.into_inner(), - }), - _ => Outcome::Error((Status::BadRequest, ())), - }; + if let (Some(signature), Some(trace)) = ( + headers.get_one("x-topgg-signature"), + headers.get_one("x-topgg-trace"), + ) { + if let Ok(body) = data.open(2.mebibytes()).into_bytes().await + && let Some(output) = Self::new(signature, body.into_inner(), trace) + { + return Outcome::Success(output); + } + + return Outcome::Error((Status::BadRequest, ())); } Outcome::Error((Status::Unauthorized, ())) diff --git a/src/webhooks/vote.rs b/src/webhooks/vote.rs deleted file mode 100644 index 0bbbb54..0000000 --- a/src/webhooks/vote.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::snowflake; -use serde::{Deserialize, Deserializer}; -use std::collections::HashMap; - -#[inline(always)] -fn deserialize_is_test<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - String::deserialize(deserializer).map(|s| s == "test") -} - -fn deserialize_query_string<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - Ok( - String::deserialize(deserializer) - .map(|s| { - let mut output = HashMap::new(); - - for mut it in s - .trim_start_matches('?') - .split('&') - .map(|pair| pair.split('=')) - { - if let (Some(k), Some(v)) = (it.next(), it.next()) { - if let Ok(v) = urlencoding::decode(v) { - output.insert(k.to_owned(), v.into_owned()); - } - } - } - - output - }) - .unwrap_or_default(), - ) -} - -/// A dispatched Top.gg vote event. -#[must_use] -#[derive(Clone, Debug, Deserialize)] -pub struct VoteEvent { - /// The ID of the project that received a vote. - #[serde( - deserialize_with = "snowflake::deserialize", - alias = "bot", - alias = "guild" - )] - pub receiver_id: u64, - - /// The ID of the Top.gg user who voted. - #[serde(deserialize_with = "snowflake::deserialize", rename = "user")] - pub voter_id: u64, - - /// Whether this vote is just a test done from the page settings. - #[serde(deserialize_with = "deserialize_is_test", rename = "type")] - pub is_test: bool, - - /// Whether the weekend multiplier is active, where a single vote counts as two. - #[serde(default, rename = "isWeekend")] - pub is_weekend: bool, - - /// Query strings found on the vote page. - #[serde(default, deserialize_with = "deserialize_query_string")] - pub query: HashMap, -} diff --git a/src/webhooks/warp.rs b/src/webhooks/warp.rs index 51c108b..30d3dba 100644 --- a/src/webhooks/warp.rs +++ b/src/webhooks/warp.rs @@ -1,36 +1,34 @@ -use super::Webhook; -use serde::de::DeserializeOwned; -use std::sync::Arc; -use warp::{body, header, http::StatusCode, path, Filter, Rejection, Reply}; +use super::Payload; + +use bytes::Bytes; +use warp::{Filter, Rejection, body, header, path}; /// Creates a new warp [`Filter`] for receiving webhook events. /// /// # Example /// /// ```rust,no_run -/// use std::{net::SocketAddr, sync::Arc}; -/// use topgg::{VoteEvent, Webhook}; -/// use warp::Filter; -/// -/// struct MyVoteListener {} +/// use std::net::SocketAddr; /// -/// #[async_trait::async_trait] -/// impl Webhook for MyVoteListener { -/// async fn callback(&self, vote: VoteEvent) { -/// println!("A user with the ID of {} has voted us on Top.gg!", vote.voter_id); -/// } -/// } +/// use warp::{http::StatusCode, reply, Filter}; /// /// #[tokio::main] /// async fn main() { -/// let state = Arc::new(MyVoteListener {}); -/// -/// // POST /votes +/// // POST /webhook /// let webhook = topgg::warp::webhook( -/// "votes", -/// env!("MY_TOPGG_WEBHOOK_SECRET").to_string(), -/// Arc::clone(&state), -/// ); +/// "webhook", +/// env!("TOPGG_WEBHOOK_SECRET").to_string() +/// ).then(|payload, _trace| async move { +/// match payload { +/// Some(payload) => { +/// println!("{payload:?}"); +/// +/// reply::with_status("", StatusCode::NO_CONTENT) +/// }, +/// +/// None => reply::with_status("Unauthorized", StatusCode::UNAUTHORIZED) +/// } +/// }); /// /// let routes = warp::get().map(|| "Hello, World!").or(webhook); /// @@ -39,34 +37,20 @@ use warp::{body, header, http::StatusCode, path, Filter, Rejection, Reply}; /// warp::serve(routes).run(addr).await /// } /// ``` +#[must_use] #[cfg_attr(docsrs, doc(cfg(feature = "warp")))] -pub fn webhook( +pub fn webhook( endpoint: &'static str, - password: String, - state: Arc, -) -> impl Filter + Clone -where - D: DeserializeOwned + Send, - T: Webhook, -{ - let password = Arc::new(password); - + secret: String, +) -> impl Filter, String), Error = Rejection> + Clone { warp::post() .and(path(endpoint)) - .and(header("Authorization")) - .and(body::json()) - .then(move |auth: String, data: D| { - let current_state = Arc::clone(&state); - let current_password = Arc::clone(&password); - - async move { - if auth == *current_password { - current_state.callback(data).await; - - StatusCode::NO_CONTENT - } else { - StatusCode::UNAUTHORIZED - } - } + .and(header("x-topgg-signature")) + .and(body::bytes()) + .map(move |signature: String, body: Bytes| { + str::from_utf8(&body) + .ok() + .and_then(|body| Payload::new(&signature, body, &secret)) }) + .and(header("x-topgg-trace")) }