From 6f1cfa9fed2454dcf31daf07dc6ac72762140857 Mon Sep 17 00:00:00 2001 From: Ahmad Baalbaky Date: Mon, 9 Mar 2026 00:12:15 +0100 Subject: [PATCH 1/6] draft: records notifier impl Refs: #124. --- Cargo.toml | 13 +-- crates/game_api/Cargo.toml | 1 + crates/game_api/src/configure.rs | 11 ++- crates/game_api/src/graphql.rs | 35 ++++++-- crates/game_api/src/http/event.rs | 49 ++++++++--- crates/game_api/src/http/player.rs | 44 +++++++--- crates/game_api/src/http/player_finished.rs | 32 ++++++- crates/game_api/src/http/staggered.rs | 37 +++++--- crates/game_api/src/utils.rs | 25 +----- crates/graphql-api/Cargo.toml | 1 + crates/graphql-api/src/lib.rs | 1 + crates/graphql-api/src/schema.rs | 32 +++---- crates/graphql-api/src/subscriptions/mod.rs | 1 + crates/graphql-api/src/subscriptions/root.rs | 48 ++++++++++ crates/records-notifier/Cargo.toml | 12 +++ crates/records-notifier/src/lib.rs | 92 ++++++++++++++++++++ 16 files changed, 344 insertions(+), 90 deletions(-) create mode 100644 crates/graphql-api/src/subscriptions/mod.rs create mode 100644 crates/graphql-api/src/subscriptions/root.rs create mode 100644 crates/records-notifier/Cargo.toml create mode 100644 crates/records-notifier/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index dd5a421..377beea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,19 @@ [workspace] resolver = "2" members = [ - "crates/game_api", - "crates/records_lib", - "crates/socc", "crates/admin", - "crates/request_filter", + "crates/compute-player-map-ranking", "crates/dsc_webhook", - "crates/migration", "crates/entity", + "crates/game_api", "crates/graphql-api", "crates/graphql-schema-generator", + "crates/migration", "crates/player-map-ranking", - "crates/compute-player-map-ranking", + "crates/records_lib", + "crates/records-notifier", + "crates/request_filter", + "crates/socc", "crates/test-env", ] diff --git a/crates/game_api/Cargo.toml b/crates/game_api/Cargo.toml index 5c401c0..bce2450 100644 --- a/crates/game_api/Cargo.toml +++ b/crates/game_api/Cargo.toml @@ -42,6 +42,7 @@ migration = { path = "../migration" } entity = { path = "../entity" } graphql-api = { path = "../graphql-api" } serde_json = { workspace = true } +records-notifier = { path = "../records-notifier" } [dev-dependencies] test-env = { path = "../test-env" } diff --git a/crates/game_api/src/configure.rs b/crates/game_api/src/configure.rs index e7cc554..8e6eb8d 100644 --- a/crates/game_api/src/configure.rs +++ b/crates/game_api/src/configure.rs @@ -13,6 +13,7 @@ use actix_web::{ use dsc_webhook::{FormattedRequestHead, WebhookBody, WebhookBodyEmbed, WebhookBodyEmbedField}; use mkenv::prelude::*; use records_lib::{Database, pool::clone_dbconn}; +use records_notifier::RecordsNotifier; use tracing_actix_web::{DefaultRootSpanBuilder, RequestId}; use crate::{ApiErrorKind, RecordsErrorKindResponse, RecordsResult, Res, TracedError}; @@ -240,12 +241,20 @@ pub fn configure(cfg: &mut web::ServiceConfig, db: Database) { .build() .unwrap(); + let records_notifier = RecordsNotifier::default(); + let records_subscription = records_notifier.get_subscription(); + cfg.app_data(web::Data::new(crate::AuthState::default())) .app_data(client.clone()) .app_data(clone_dbconn(&db.sql_conn)) .app_data(db.redis_pool.clone()) .app_data(db.clone()) - .service(crate::graphql_route(db.clone(), client)) + .app_data(records_notifier) + .service(crate::graphql_route( + db.clone(), + client, + records_subscription, + )) .service(crate::api_route()) .default_service(web::to(not_found)); } diff --git a/crates/game_api/src/graphql.rs b/crates/game_api/src/graphql.rs index 4a7f778..0cd2fb1 100644 --- a/crates/game_api/src/graphql.rs +++ b/crates/game_api/src/graphql.rs @@ -1,13 +1,14 @@ use actix_session::Session; -use actix_web::{HttpRequest, web}; -use actix_web::{HttpResponse, Resource, Responder}; +use actix_web::{HttpRequest, Scope, guard, web}; +use actix_web::{HttpResponse, Responder}; use async_graphql::ErrorExtensionValues; use async_graphql::http::{GraphQLPlaygroundConfig, playground_source}; -use async_graphql_actix_web::GraphQLRequest; +use async_graphql_actix_web::{GraphQLRequest, GraphQLSubscription}; use graphql_api::error::{ApiGqlError, ApiGqlErrorKind}; use graphql_api::schema::{Schema, create_schema}; use mkenv::prelude::*; use records_lib::Database; +use records_notifier::LatestRecordsSubscription; use reqwest::Client; use tracing_actix_web::RequestId; @@ -84,9 +85,27 @@ async fn index_playground() -> impl Responder { ))) } -pub fn graphql_route(db: Database, client: Client) -> Resource { - web::resource("/graphql") - .app_data(create_schema(db, client)) - .route(web::get().to(index_playground)) - .route(web::post().to(index_graphql)) +async fn index_subscriptions( + schema: Res, + req: HttpRequest, + payload: web::Payload, +) -> Result { + GraphQLSubscription::new(Schema::clone(&*schema)).start(&req, payload) +} + +pub fn graphql_route( + db: Database, + client: Client, + records_sub: LatestRecordsSubscription, +) -> Scope { + web::scope("/graphql") + .app_data(create_schema(db, client, records_sub)) + .route("", web::get().to(index_playground)) + .route("", web::post().to(index_graphql)) + .route( + "/subscriptions", + web::get() + .guard(guard::Header("upgrade", "websocket")) + .to(index_subscriptions), + ) } diff --git a/crates/game_api/src/http/event.rs b/crates/game_api/src/http/event.rs index f319f80..41ef48a 100644 --- a/crates/game_api/src/http/event.rs +++ b/crates/game_api/src/http/event.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; use actix_web::{ - Responder, Scope, + HttpResponse, Responder, Scope, web::{self, Path}, }; +use chrono::{DateTime, Utc}; use deadpool_redis::redis::AsyncCommands as _; use entity::{ event_edition, event_edition_maps, event_edition_records, global_event_records, global_records, @@ -22,6 +23,7 @@ use records_lib::{ redis_key::alone_map_key, sync, }; +use records_notifier::RecordsNotifier; use sea_orm::{ ActiveValue::Set, ColumnTrait as _, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, @@ -805,14 +807,16 @@ async fn edition_finished( path: Path<(String, u32)>, body: pf::PlayerFinishedBody, mode_version: Option, + Res(records_notifier): Res, ) -> RecordsResult { edition_finished_at( login, db, path, body.0, - chrono::Utc::now().naive_utc(), + chrono::Utc::now(), mode_version.map(|m| m.0), + &records_notifier, ) .await } @@ -833,12 +837,21 @@ async fn edition_finished_impl( original_map_id, inner_params: params, }: EditionFinishedParams<'_>, + records_notifier: &RecordsNotifier, ) -> RecordsResult where C: ConnectionTrait + TransactionTrait, { // We insert the record for the global records - let res = pf::finished(conn, redis_pool, params, player_login, map).await?; + let res = pf::finished( + conn, + redis_pool, + params, + player_login, + map, + records_notifier, + ) + .await?; if let Some(original_map_id) = original_map_id { // Get the previous time of the player on the original map to check if it's a PB @@ -889,9 +902,10 @@ pub async fn edition_finished_at( db: Res, path: Path<(String, u32)>, body: pf::HasFinishedBody, - at: chrono::NaiveDateTime, + at: DateTime, mode_version: Option, -) -> RecordsResult { + records_notifier: &RecordsNotifier, +) -> RecordsResult { let (event_handle, edition_id) = path.into_inner(); // We first check that the event and its edition exist @@ -913,14 +927,25 @@ pub async fn edition_finished_at( // The edition is transparent, so we save the record for the map directly. if edition.is_transparent != 0 { - let res = - super::player::finished_at(&db.sql_conn, &db.redis_pool, mode_version, login, body, at) - .await?; - return Ok(utils::Either::Left(res)); + let res = super::player::finished_at( + &db.sql_conn, + &db.redis_pool, + mode_version, + login, + body, + at, + records_notifier, + ) + .await?; + return Ok(res); } if edition.has_expired() - && !(edition.start_date <= at && edition.expire_date().filter(|date| at > *date).is_none()) + && !(edition.start_date <= at.naive_utc() + && edition + .expire_date() + .filter(|date| at.naive_utc() > *date) + .is_none()) { return Err(ApiErrorKind::EventHasExpired(event.handle, edition.id)); } @@ -937,9 +962,9 @@ pub async fn edition_finished_at( }, }; - let res = edition_finished_impl(&db.sql_conn, &db.redis_pool, params).await?; + let res = edition_finished_impl(&db.sql_conn, &db.redis_pool, params, records_notifier).await?; - json(res.res).map(utils::Either::Right) + json(res.res) } pub async fn insert_event_record( diff --git a/crates/game_api/src/http/player.rs b/crates/game_api/src/http/player.rs index 58238af..0836ba5 100644 --- a/crates/game_api/src/http/player.rs +++ b/crates/game_api/src/http/player.rs @@ -2,14 +2,15 @@ mod auth; use actix_web::{ HttpResponse, Responder, Scope, - body::BoxBody, web::{self, Json}, }; +use chrono::{DateTime, Utc}; use deadpool_redis::redis::AsyncCommands as _; use entity::{banishments, current_bans, maps, players, records, role, types}; use futures::TryStreamExt; use mkenv::prelude::*; use records_lib::{Database, RedisPool, must, player, redis_key::alone_map_key, sync}; +use records_notifier::RecordsNotifier; use reqwest::Client; use sea_orm::{ ActiveValue::{Set, Unchanged}, @@ -165,7 +166,7 @@ pub async fn update_player( pub async fn get_ban_during( conn: &C, player_id: u32, - at: chrono::NaiveDateTime, + at: DateTime, ) -> RecordsResult> { banishments::Entity::find() .filter( @@ -205,11 +206,20 @@ async fn finished_impl( params: ExpandedInsertRecordParams<'_>, player_login: &str, map: &maps::Model, + records_notifier: &RecordsNotifier, ) -> RecordsResult where C: ConnectionTrait + TransactionTrait + StreamTrait, { - let res = pf::finished(conn, redis_pool, params, player_login, map).await?; + let res = pf::finished( + conn, + redis_pool, + params, + player_login, + map, + records_notifier, + ) + .await?; // If the record isn't in an event context, save the record to the events that have the map // and allow records saving without an event context. @@ -272,9 +282,19 @@ pub async fn finished_at_with_pool( mode_version: Option, login: String, body: pf::HasFinishedBody, - at: chrono::NaiveDateTime, -) -> RecordsResult { - let res = finished_at(&db.sql_conn, &db.redis_pool, mode_version, login, body, at).await?; + at: DateTime, + records_notifier: &RecordsNotifier, +) -> RecordsResult { + let res = finished_at( + &db.sql_conn, + &db.redis_pool, + mode_version, + login, + body, + at, + records_notifier, + ) + .await?; Ok(res) } @@ -284,8 +304,9 @@ pub async fn finished_at( mode_version: Option, login: String, body: pf::HasFinishedBody, - at: chrono::NaiveDateTime, -) -> RecordsResult + use> + at: DateTime, + records_notifier: &RecordsNotifier, +) -> RecordsResult where C: TransactionTrait + ConnectionTrait + StreamTrait, { @@ -298,7 +319,8 @@ where mode_version, }; - let res: pf::FinishedOutput = finished_impl(conn, redis_pool, params, &login, &map).await?; + let res: pf::FinishedOutput = + finished_impl(conn, redis_pool, params, &login, &map, records_notifier).await?; json(res.res) } @@ -310,13 +332,15 @@ async fn finished( MPAuthGuard { login }: MPAuthGuard, db: Res, body: pf::PlayerFinishedBody, + Res(records_notifier): Res, ) -> RecordsResult { finished_at_with_pool( db.0, mode_version.map(|x| x.0), login, body.0, - chrono::Utc::now().naive_utc(), + chrono::Utc::now(), + &records_notifier, ) .await } diff --git a/crates/game_api/src/http/player_finished.rs b/crates/game_api/src/http/player_finished.rs index 4ff8fc7..14b540b 100644 --- a/crates/game_api/src/http/player_finished.rs +++ b/crates/game_api/src/http/player_finished.rs @@ -1,10 +1,12 @@ use crate::{ApiErrorKind, RecordsResult, RecordsResultExt}; use actix_web::web::Json; +use chrono::{DateTime, Utc}; use deadpool_redis::redis; use entity::{checkpoint_times, event_edition_records, maps, records, types}; use records_lib::{ NullableInteger, RedisPool, opt_event::OptEvent, ranks, redis_key::map_key, sync, }; +use records_notifier::{NewRecordEvent, NewRecordMap, NewRecordPlayer, RecordsNotifier}; use sea_orm::{ ActiveValue::Set, ColumnTrait as _, ConnectionTrait, EntityTrait, QueryFilter as _, QueryOrder, QuerySelect, QueryTrait, TransactionTrait, @@ -104,7 +106,7 @@ async fn send_query( #[derive(Clone, Copy)] pub struct ExpandedInsertRecordParams<'a> { pub body: &'a InsertRecordParams, - pub at: chrono::NaiveDateTime, + pub at: DateTime, pub event: OptEvent<'a>, pub mode_version: Option, } @@ -126,7 +128,7 @@ where SendQueryParam { body: params.body, event_record_id, - at: params.at, + at: params.at.naive_utc(), mode_version: params.mode_version, event: params.event, }, @@ -196,6 +198,7 @@ pub async fn finished( params: ExpandedInsertRecordParams<'_>, player_login: &str, map: &maps::Model, + records_notifier: &RecordsNotifier, ) -> RecordsResult where C: ConnectionTrait + TransactionTrait, @@ -250,6 +253,10 @@ where // N.B.: we must update the time in Redis **after** the SQL transaction is committed, so // that any other operation acting on the same leaderboard doesn't update the ZSET with an // outdated version. + // + // If the Redis time were updated before the SQL transaction, a race condition could occur + // where other operations update the same leaderboard after the Redis update, but before + // the transaction finishes. let mut pipe = redis::pipe(); pipe.atomic(); @@ -261,7 +268,26 @@ where let mut redis_conn = redis_pool.get().await.with_api_err()?; let (count,): (i32,) = pipe.query_async(&mut redis_conn).await.with_api_err()?; - count + 1 + let new_rank = count + 1; + + records_notifier + .notify_new_record(NewRecordEvent { + record_id: result.record_id, + map: NewRecordMap { + map_uid: map.game_id.clone(), + name: map.name.clone(), + }, + player: NewRecordPlayer { + login: player.login, + name: player.name, + }, + rank: new_rank, + record_date: params.at, + time: new, + }) + .await; + + new_rank } else { ranks::get_rank(&mut redis_conn, map.id, old, params.event) .await diff --git a/crates/game_api/src/http/staggered.rs b/crates/game_api/src/http/staggered.rs index 7bad5e0..e2c6cc5 100644 --- a/crates/game_api/src/http/staggered.rs +++ b/crates/game_api/src/http/staggered.rs @@ -1,6 +1,7 @@ use actix_web::{Responder, Scope, web}; -use chrono::TimeZone; +use chrono::{DateTime, TimeZone, Utc}; use records_lib::Database; +use records_notifier::RecordsNotifier; use crate::{ RecordsResult, Res, @@ -25,11 +26,8 @@ struct Staggered { } impl Staggered { - fn get_time(&self) -> chrono::NaiveDateTime { - chrono::Utc - .timestamp_opt(self.req_tstp, 0) - .unwrap() - .naive_utc() + fn get_datetime(&self) -> DateTime { + chrono::Utc.timestamp_opt(self.req_tstp, 0).unwrap() } } @@ -42,9 +40,18 @@ async fn staggered_finished( MPAuthGuard { login }: MPAuthGuard, db: Res, body: StaggeredBody, + Res(records_notifier): Res, ) -> RecordsResult { - let time = body.get_time(); - player::finished_at_with_pool(db.0, mode_version.map(|x| x.0), login, body.0.body, time).await + let time = body.get_datetime(); + player::finished_at_with_pool( + db.0, + mode_version.map(|x| x.0), + login, + body.0.body, + time, + &records_notifier, + ) + .await } #[inline(always)] @@ -54,7 +61,17 @@ async fn staggered_edition_finished( path: web::Path<(String, u32)>, body: StaggeredBody, mode_version: crate::ModeVersion, + Res(records_notifier): Res, ) -> RecordsResult { - let time = body.get_time(); - event::edition_finished_at(login, db, path, body.0.body, time, Some(mode_version.0)).await + let time = body.get_datetime(); + event::edition_finished_at( + login, + db, + path, + body.0.body, + time, + Some(mode_version.0), + &records_notifier, + ) + .await } diff --git a/crates/game_api/src/utils.rs b/crates/game_api/src/utils.rs index 6996898..a9be4cb 100644 --- a/crates/game_api/src/utils.rs +++ b/crates/game_api/src/utils.rs @@ -3,9 +3,7 @@ use std::{ ops::{Deref, DerefMut}, }; -use actix_web::{ - FromRequest, HttpRequest, HttpResponse, Responder, body::MessageBody, dev::Payload, -}; +use actix_web::{FromRequest, HttpRequest, HttpResponse, dev::Payload}; use entity::{api_status, api_status_history, types}; use records_lib::Database; use sea_orm::{ @@ -157,24 +155,3 @@ impl FromRequest for ExtractDbConn { ) } } - -pub enum Either { - Left(L), - Right(R), -} - -impl Responder for Either -where - B: MessageBody + 'static, - L: Responder, - R: Responder, -{ - type Body = B; - - fn respond_to(self, req: &actix_web::HttpRequest) -> actix_web::HttpResponse { - match self { - Either::Left(l) => ::respond_to(l, req), - Either::Right(r) => ::respond_to(r, req), - } - } -} diff --git a/crates/graphql-api/Cargo.toml b/crates/graphql-api/Cargo.toml index 5dc69ec..69404b6 100644 --- a/crates/graphql-api/Cargo.toml +++ b/crates/graphql-api/Cargo.toml @@ -20,6 +20,7 @@ sha2 = "0.10.9" tokio = { workspace = true, features = ["macros"] } itertools.workspace = true serde_json.workspace = true +records-notifier = { path = "../records-notifier" } [dev-dependencies] test-env = { path = "../test-env" } diff --git a/crates/graphql-api/src/lib.rs b/crates/graphql-api/src/lib.rs index e1229a5..5aa40c2 100644 --- a/crates/graphql-api/src/lib.rs +++ b/crates/graphql-api/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod loaders; pub mod objects; pub mod schema; +pub mod subscriptions; pub mod cursors; diff --git a/crates/graphql-api/src/schema.rs b/crates/graphql-api/src/schema.rs index c6bf4d2..a825bd6 100644 --- a/crates/graphql-api/src/schema.rs +++ b/crates/graphql-api/src/schema.rs @@ -1,8 +1,8 @@ use async_graphql::{ - EmptyMutation, EmptySubscription, SchemaBuilder, dataloader::DataLoader, - extensions::ApolloTracing, + EmptyMutation, SchemaBuilder, dataloader::DataLoader, extensions::ApolloTracing, }; use records_lib::Database; +use records_notifier::{LatestRecordsSubscription, RecordsNotifier}; use crate::{ loaders::{ @@ -10,30 +10,30 @@ use crate::{ player::PlayerLoader, }, objects::root::QueryRoot, + subscriptions::root::SubscriptionRoot, }; -pub type Schema = async_graphql::Schema< - QueryRoot, - async_graphql::EmptyMutation, - async_graphql::EmptySubscription, ->; +pub type Schema = async_graphql::Schema; -fn create_schema_impl() -> SchemaBuilder { - async_graphql::Schema::build( - QueryRoot, - async_graphql::EmptyMutation, - async_graphql::EmptySubscription, - ) +fn create_schema_impl( + records_sub: LatestRecordsSubscription, +) -> SchemaBuilder { + async_graphql::Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot::new(records_sub)) } pub fn create_schema_standalone() -> Schema { - create_schema_impl().finish() + let dummy = RecordsNotifier::default(); + create_schema_impl(dummy.get_subscription()).finish() } -pub fn create_schema(db: Database, client: reqwest::Client) -> Schema { +pub fn create_schema( + db: Database, + client: reqwest::Client, + records_sub: LatestRecordsSubscription, +) -> Schema { let db_clone = db.clone(); - create_schema_impl() + create_schema_impl(records_sub) .extension(ApolloTracing) .data(DataLoader::new( PlayerLoader(db.clone().sql_conn), diff --git a/crates/graphql-api/src/subscriptions/mod.rs b/crates/graphql-api/src/subscriptions/mod.rs new file mode 100644 index 0000000..dec16f3 --- /dev/null +++ b/crates/graphql-api/src/subscriptions/mod.rs @@ -0,0 +1 @@ +pub mod root; diff --git a/crates/graphql-api/src/subscriptions/root.rs b/crates/graphql-api/src/subscriptions/root.rs new file mode 100644 index 0000000..59adb14 --- /dev/null +++ b/crates/graphql-api/src/subscriptions/root.rs @@ -0,0 +1,48 @@ +use async_graphql::Subscription; +use entity::records; +use futures::{Stream, StreamExt as _}; +use records_lib::internal; +use records_notifier::LatestRecordsSubscription; +use sea_orm::{DbConn, EntityTrait}; + +use crate::{error::GqlResult, objects::ranked_record::RankedRecord}; + +pub struct SubscriptionRoot { + records_sub: LatestRecordsSubscription, +} + +impl SubscriptionRoot { + pub fn new(records_sub: LatestRecordsSubscription) -> Self { + Self { records_sub } + } +} + +#[Subscription] +impl SubscriptionRoot { + async fn latest_records( + &self, + ctx: &async_graphql::Context<'_>, + ) -> impl Stream> { + let db = ctx.data_unchecked::(); + self.records_sub + .subscribe_new_client() + .then(move |new_record| async move { + let record = records::Entity::find_by_id(new_record.record_id) + .one(db) + .await? + .ok_or_else(|| { + internal!( + "new record yielded by stream returned an invalid record id: {}", + new_record.record_id + ) + })?; + + Ok(RankedRecord { + inner: records::RankedRecord { + rank: new_record.rank, + record, + }, + }) + }) + } +} diff --git a/crates/records-notifier/Cargo.toml b/crates/records-notifier/Cargo.toml new file mode 100644 index 0000000..52b1066 --- /dev/null +++ b/crates/records-notifier/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "records-notifier" +version = "0.1.0" +edition = "2024" + +[dependencies] +chrono.workspace = true +futures.workspace = true +parking_lot = "0.12.5" +serde = { workspace = true, features = ["derive"] } +tokio = { workspace = true, features = ["sync"] } +tokio-stream = { version = "0.1.18", features = ["sync"] } diff --git a/crates/records-notifier/src/lib.rs b/crates/records-notifier/src/lib.rs new file mode 100644 index 0000000..9887db2 --- /dev/null +++ b/crates/records-notifier/src/lib.rs @@ -0,0 +1,92 @@ +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll, ready}, +}; + +use chrono::{DateTime, Utc}; +use futures::Stream; +use tokio::sync::broadcast; +use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; + +#[derive(Clone, Debug, serde::Serialize)] +pub struct NewRecordPlayer { + pub login: String, + pub name: String, +} + +#[derive(Clone, Debug, serde::Serialize)] +pub struct NewRecordMap { + pub map_uid: String, + pub name: String, +} + +#[derive(Clone, Debug, serde::Serialize)] +pub struct NewRecordEvent { + pub record_id: u32, + pub rank: i32, + pub player: NewRecordPlayer, + pub map: NewRecordMap, + pub time: i32, + pub record_date: DateTime, +} + +struct LatestRecordsStream { + inner: BroadcastStream, +} + +impl Stream for LatestRecordsStream { + type Item = NewRecordEvent; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // SAFETY: we're retrieving a field of `Self` + let rx = unsafe { self.map_unchecked_mut(|f| &mut f.inner) }; + match ready!(rx.poll_next(cx)) { + Some(Ok(record)) => Poll::Ready(Some(record)), + Some(Err(BroadcastStreamRecvError::Lagged(_))) => Poll::Pending, + None => Poll::Ready(None), + } + } +} + +#[derive(Clone)] +struct Inner { + tx: Arc>, +} + +#[derive(Clone, Default)] +pub struct RecordsNotifier { + inner: Inner, +} + +impl Default for Inner { + fn default() -> Self { + let (tx, _rx) = broadcast::channel(10); + Self { tx: Arc::new(tx) } + } +} + +impl RecordsNotifier { + pub fn get_subscription(&self) -> LatestRecordsSubscription { + LatestRecordsSubscription { + inner: self.inner.clone(), + } + } + + pub async fn notify_new_record(&self, event: NewRecordEvent) { + // We ignore if any receiver received the event or not. + let _ = self.inner.tx.send(event); + } +} + +#[derive(Clone)] +pub struct LatestRecordsSubscription { + inner: Inner, +} + +impl LatestRecordsSubscription { + pub fn subscribe_new_client(&self) -> impl Stream { + let rx = self.inner.tx.subscribe(); + LatestRecordsStream { inner: rx.into() } + } +} From 4a92762255b1df1899a00411ab79606b5589618f Mon Sep 17 00:00:00 2001 From: Ahmad Baalbaky Date: Mon, 16 Mar 2026 00:05:17 +0100 Subject: [PATCH 2/6] refactor: records notifier * Move the records notifier from its crate into a module of the `records-lib` crate. * Add documentations to the various items of this new module, with a unit test. * Fix bug of the creation of many records notifier when constructing the web server, instead of a single one. Refs: #124. --- Cargo.toml | 1 - crates/game_api/Cargo.toml | 1 - crates/game_api/src/configure.rs | 6 +- crates/game_api/src/env.rs | 9 - crates/game_api/src/graphql.rs | 10 +- crates/game_api/src/http/event.rs | 3 +- crates/game_api/src/http/player.rs | 6 +- crates/game_api/src/http/player_finished.rs | 38 +-- crates/game_api/src/http/staggered.rs | 3 +- crates/game_api/src/main.rs | 6 +- crates/game_api/tests/base.rs | 4 +- crates/graphql-api/Cargo.toml | 1 - crates/graphql-api/src/schema.rs | 6 +- crates/graphql-api/src/subscriptions/root.rs | 3 +- crates/records-notifier/Cargo.toml | 12 - crates/records-notifier/src/lib.rs | 92 ------- crates/records_lib/Cargo.toml | 4 + crates/records_lib/src/lib.rs | 3 +- crates/records_lib/src/records_notifier.rs | 249 +++++++++++++++++++ 19 files changed, 300 insertions(+), 157 deletions(-) delete mode 100644 crates/records-notifier/Cargo.toml delete mode 100644 crates/records-notifier/src/lib.rs create mode 100644 crates/records_lib/src/records_notifier.rs diff --git a/Cargo.toml b/Cargo.toml index 377beea..b108414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,6 @@ members = [ "crates/migration", "crates/player-map-ranking", "crates/records_lib", - "crates/records-notifier", "crates/request_filter", "crates/socc", "crates/test-env", diff --git a/crates/game_api/Cargo.toml b/crates/game_api/Cargo.toml index bce2450..5c401c0 100644 --- a/crates/game_api/Cargo.toml +++ b/crates/game_api/Cargo.toml @@ -42,7 +42,6 @@ migration = { path = "../migration" } entity = { path = "../entity" } graphql-api = { path = "../graphql-api" } serde_json = { workspace = true } -records-notifier = { path = "../records-notifier" } [dev-dependencies] test-env = { path = "../test-env" } diff --git a/crates/game_api/src/configure.rs b/crates/game_api/src/configure.rs index 8e6eb8d..50a4fed 100644 --- a/crates/game_api/src/configure.rs +++ b/crates/game_api/src/configure.rs @@ -12,8 +12,7 @@ use actix_web::{ }; use dsc_webhook::{FormattedRequestHead, WebhookBody, WebhookBodyEmbed, WebhookBodyEmbedField}; use mkenv::prelude::*; -use records_lib::{Database, pool::clone_dbconn}; -use records_notifier::RecordsNotifier; +use records_lib::{Database, pool::clone_dbconn, records_notifier::RecordsNotifier}; use tracing_actix_web::{DefaultRootSpanBuilder, RequestId}; use crate::{ApiErrorKind, RecordsErrorKindResponse, RecordsResult, Res, TracedError}; @@ -235,13 +234,12 @@ impl tracing_actix_web::RootSpanBuilder for RootSpanBuilder { } } -pub fn configure(cfg: &mut web::ServiceConfig, db: Database) { +pub fn configure(cfg: &mut web::ServiceConfig, db: Database, records_notifier: RecordsNotifier) { let client = reqwest::Client::builder() .timeout(Duration::from_secs(5)) .build() .unwrap(); - let records_notifier = RecordsNotifier::default(); let records_subscription = records_notifier.get_subscription(); cfg.app_data(web::Data::new(crate::AuthState::default())) diff --git a/crates/game_api/src/env.rs b/crates/game_api/src/env.rs index 05b58d3..9fc9355 100644 --- a/crates/game_api/src/env.rs +++ b/crates/game_api/src/env.rs @@ -126,15 +126,6 @@ mkenv::make_config! { default_val_fmt: "empty", }, - pub gql_endpoint: { - var_name: "GQL_ENDPOINT", - layers: [ - or_default_val(|| "/graphql".to_owned()), - ], - description: "The route to the GraphQL endpoint (e.g. /graphql)", - default_val_fmt: "/graphql", - }, - pub wh_rank_compute_err: { var_name: "WEBHOOK_RANK_COMPUTE_ERROR", layers: [or_default()], diff --git a/crates/game_api/src/graphql.rs b/crates/game_api/src/graphql.rs index 0cd2fb1..dcb4c29 100644 --- a/crates/game_api/src/graphql.rs +++ b/crates/game_api/src/graphql.rs @@ -6,9 +6,8 @@ use async_graphql::http::{GraphQLPlaygroundConfig, playground_source}; use async_graphql_actix_web::{GraphQLRequest, GraphQLSubscription}; use graphql_api::error::{ApiGqlError, ApiGqlErrorKind}; use graphql_api::schema::{Schema, create_schema}; -use mkenv::prelude::*; use records_lib::Database; -use records_notifier::LatestRecordsSubscription; +use records_lib::records_notifier::LatestRecordsSubscription; use reqwest::Client; use tracing_actix_web::RequestId; @@ -80,9 +79,10 @@ async fn index_graphql( async fn index_playground() -> impl Responder { HttpResponse::Ok() .content_type("text/html; charset=utf-8") - .body(playground_source(GraphQLPlaygroundConfig::new( - &crate::env().gql_endpoint.get(), - ))) + .body(playground_source( + GraphQLPlaygroundConfig::new("/graphql") + .subscription_endpoint("/graphql/subscriptions"), + )) } async fn index_subscriptions( diff --git a/crates/game_api/src/http/event.rs b/crates/game_api/src/http/event.rs index 41ef48a..2c18ea7 100644 --- a/crates/game_api/src/http/event.rs +++ b/crates/game_api/src/http/event.rs @@ -20,10 +20,11 @@ use records_lib::{ event::{self, EventMap}, opt_event::OptEvent, player, + records_notifier::RecordsNotifier, redis_key::alone_map_key, sync, }; -use records_notifier::RecordsNotifier; + use sea_orm::{ ActiveValue::Set, ColumnTrait as _, ConnectionTrait, EntityTrait, FromQueryResult, QueryFilter, QueryOrder, diff --git a/crates/game_api/src/http/player.rs b/crates/game_api/src/http/player.rs index 0836ba5..afbf7c9 100644 --- a/crates/game_api/src/http/player.rs +++ b/crates/game_api/src/http/player.rs @@ -9,8 +9,10 @@ use deadpool_redis::redis::AsyncCommands as _; use entity::{banishments, current_bans, maps, players, records, role, types}; use futures::TryStreamExt; use mkenv::prelude::*; -use records_lib::{Database, RedisPool, must, player, redis_key::alone_map_key, sync}; -use records_notifier::RecordsNotifier; +use records_lib::{ + Database, RedisPool, must, player, records_notifier::RecordsNotifier, redis_key::alone_map_key, + sync, +}; use reqwest::Client; use sea_orm::{ ActiveValue::{Set, Unchanged}, diff --git a/crates/game_api/src/http/player_finished.rs b/crates/game_api/src/http/player_finished.rs index 14b540b..fe58581 100644 --- a/crates/game_api/src/http/player_finished.rs +++ b/crates/game_api/src/http/player_finished.rs @@ -4,9 +4,13 @@ use chrono::{DateTime, Utc}; use deadpool_redis::redis; use entity::{checkpoint_times, event_edition_records, maps, records, types}; use records_lib::{ - NullableInteger, RedisPool, opt_event::OptEvent, ranks, redis_key::map_key, sync, + NullableInteger, RedisPool, + opt_event::OptEvent, + ranks, + records_notifier::{NewRecordEvent, NewRecordMap, NewRecordPlayer, RecordsNotifier}, + redis_key::map_key, + sync, }; -use records_notifier::{NewRecordEvent, NewRecordMap, NewRecordPlayer, RecordsNotifier}; use sea_orm::{ ActiveValue::Set, ColumnTrait as _, ConnectionTrait, EntityTrait, QueryFilter as _, QueryOrder, QuerySelect, QueryTrait, TransactionTrait, @@ -270,22 +274,20 @@ where let (count,): (i32,) = pipe.query_async(&mut redis_conn).await.with_api_err()?; let new_rank = count + 1; - records_notifier - .notify_new_record(NewRecordEvent { - record_id: result.record_id, - map: NewRecordMap { - map_uid: map.game_id.clone(), - name: map.name.clone(), - }, - player: NewRecordPlayer { - login: player.login, - name: player.name, - }, - rank: new_rank, - record_date: params.at, - time: new, - }) - .await; + records_notifier.notify_new_record(NewRecordEvent { + record_id: result.record_id, + map: NewRecordMap { + map_uid: map.game_id.clone(), + name: map.name.clone(), + }, + player: NewRecordPlayer { + login: player.login, + name: player.name, + }, + rank: new_rank, + record_date: params.at, + time: new, + }); new_rank } else { diff --git a/crates/game_api/src/http/staggered.rs b/crates/game_api/src/http/staggered.rs index e2c6cc5..17f73d5 100644 --- a/crates/game_api/src/http/staggered.rs +++ b/crates/game_api/src/http/staggered.rs @@ -1,7 +1,6 @@ use actix_web::{Responder, Scope, web}; use chrono::{DateTime, TimeZone, Utc}; -use records_lib::Database; -use records_notifier::RecordsNotifier; +use records_lib::{Database, records_notifier::RecordsNotifier}; use crate::{ RecordsResult, Res, diff --git a/crates/game_api/src/main.rs b/crates/game_api/src/main.rs index 993b0c4..a2e14a7 100644 --- a/crates/game_api/src/main.rs +++ b/crates/game_api/src/main.rs @@ -19,7 +19,7 @@ use game_api_lib::configure::{ }; use migration::MigratorTrait; use mkenv::prelude::*; -use records_lib::Database; +use records_lib::{Database, records_notifier::RecordsNotifier}; use tracing::level_filters::LevelFilter; use tracing_actix_web::TracingLogger; use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan}; @@ -82,6 +82,8 @@ async fn main() -> anyhow::Result<()> { .build() .unwrap(); + let records_notifier = RecordsNotifier::default(); + HttpServer::new(move || { let cors = Cors::default() .supports_credentials() @@ -116,7 +118,7 @@ async fn main() -> anyhow::Result<()> { )) .build(), ) - .configure(|cfg| configure::configure(cfg, db.clone())) + .configure(|cfg| configure::configure(cfg, db.clone(), records_notifier.clone())) }) .bind(("0.0.0.0", game_api_lib::env().port.get())) .context("Cannot bind address")? diff --git a/crates/game_api/tests/base.rs b/crates/game_api/tests/base.rs index 811b287..e2b095b 100644 --- a/crates/game_api/tests/base.rs +++ b/crates/game_api/tests/base.rs @@ -9,7 +9,7 @@ use actix_web::{ dev::{Service, ServiceResponse}, middleware, test, }; -use records_lib::Database; +use records_lib::{Database, records_notifier::RecordsNotifier}; use test_env::IntoResult; use tracing_actix_web::TracingLogger; @@ -41,7 +41,7 @@ pub async fn get_app( App::new() .wrap(middleware::from_fn(configure::fit_request_id)) .wrap(TracingLogger::::new()) - .configure(|cfg| configure::configure(cfg, db.clone())), + .configure(|cfg| configure::configure(cfg, db.clone(), RecordsNotifier::default())), ) .await } diff --git a/crates/graphql-api/Cargo.toml b/crates/graphql-api/Cargo.toml index 69404b6..5dc69ec 100644 --- a/crates/graphql-api/Cargo.toml +++ b/crates/graphql-api/Cargo.toml @@ -20,7 +20,6 @@ sha2 = "0.10.9" tokio = { workspace = true, features = ["macros"] } itertools.workspace = true serde_json.workspace = true -records-notifier = { path = "../records-notifier" } [dev-dependencies] test-env = { path = "../test-env" } diff --git a/crates/graphql-api/src/schema.rs b/crates/graphql-api/src/schema.rs index a825bd6..338aa17 100644 --- a/crates/graphql-api/src/schema.rs +++ b/crates/graphql-api/src/schema.rs @@ -1,8 +1,10 @@ use async_graphql::{ EmptyMutation, SchemaBuilder, dataloader::DataLoader, extensions::ApolloTracing, }; -use records_lib::Database; -use records_notifier::{LatestRecordsSubscription, RecordsNotifier}; +use records_lib::{ + Database, + records_notifier::{LatestRecordsSubscription, RecordsNotifier}, +}; use crate::{ loaders::{ diff --git a/crates/graphql-api/src/subscriptions/root.rs b/crates/graphql-api/src/subscriptions/root.rs index 59adb14..152f3f9 100644 --- a/crates/graphql-api/src/subscriptions/root.rs +++ b/crates/graphql-api/src/subscriptions/root.rs @@ -1,8 +1,7 @@ use async_graphql::Subscription; use entity::records; use futures::{Stream, StreamExt as _}; -use records_lib::internal; -use records_notifier::LatestRecordsSubscription; +use records_lib::{internal, records_notifier::LatestRecordsSubscription}; use sea_orm::{DbConn, EntityTrait}; use crate::{error::GqlResult, objects::ranked_record::RankedRecord}; diff --git a/crates/records-notifier/Cargo.toml b/crates/records-notifier/Cargo.toml deleted file mode 100644 index 52b1066..0000000 --- a/crates/records-notifier/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "records-notifier" -version = "0.1.0" -edition = "2024" - -[dependencies] -chrono.workspace = true -futures.workspace = true -parking_lot = "0.12.5" -serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true, features = ["sync"] } -tokio-stream = { version = "0.1.18", features = ["sync"] } diff --git a/crates/records-notifier/src/lib.rs b/crates/records-notifier/src/lib.rs deleted file mode 100644 index 9887db2..0000000 --- a/crates/records-notifier/src/lib.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll, ready}, -}; - -use chrono::{DateTime, Utc}; -use futures::Stream; -use tokio::sync::broadcast; -use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; - -#[derive(Clone, Debug, serde::Serialize)] -pub struct NewRecordPlayer { - pub login: String, - pub name: String, -} - -#[derive(Clone, Debug, serde::Serialize)] -pub struct NewRecordMap { - pub map_uid: String, - pub name: String, -} - -#[derive(Clone, Debug, serde::Serialize)] -pub struct NewRecordEvent { - pub record_id: u32, - pub rank: i32, - pub player: NewRecordPlayer, - pub map: NewRecordMap, - pub time: i32, - pub record_date: DateTime, -} - -struct LatestRecordsStream { - inner: BroadcastStream, -} - -impl Stream for LatestRecordsStream { - type Item = NewRecordEvent; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // SAFETY: we're retrieving a field of `Self` - let rx = unsafe { self.map_unchecked_mut(|f| &mut f.inner) }; - match ready!(rx.poll_next(cx)) { - Some(Ok(record)) => Poll::Ready(Some(record)), - Some(Err(BroadcastStreamRecvError::Lagged(_))) => Poll::Pending, - None => Poll::Ready(None), - } - } -} - -#[derive(Clone)] -struct Inner { - tx: Arc>, -} - -#[derive(Clone, Default)] -pub struct RecordsNotifier { - inner: Inner, -} - -impl Default for Inner { - fn default() -> Self { - let (tx, _rx) = broadcast::channel(10); - Self { tx: Arc::new(tx) } - } -} - -impl RecordsNotifier { - pub fn get_subscription(&self) -> LatestRecordsSubscription { - LatestRecordsSubscription { - inner: self.inner.clone(), - } - } - - pub async fn notify_new_record(&self, event: NewRecordEvent) { - // We ignore if any receiver received the event or not. - let _ = self.inner.tx.send(event); - } -} - -#[derive(Clone)] -pub struct LatestRecordsSubscription { - inner: Inner, -} - -impl LatestRecordsSubscription { - pub fn subscribe_new_client(&self) -> impl Stream { - let rx = self.inner.tx.subscribe(); - LatestRecordsStream { inner: rx.into() } - } -} diff --git a/crates/records_lib/Cargo.toml b/crates/records_lib/Cargo.toml index f2e1863..5f49976 100644 --- a/crates/records_lib/Cargo.toml +++ b/crates/records_lib/Cargo.toml @@ -20,6 +20,7 @@ nom = { workspace = true } rand = { workspace = true } sea-orm = { workspace = true } entity = { path = "../entity" } +tokio-stream = { version = "0.1.18", features = ["sync"] } [features] default = [] @@ -31,3 +32,6 @@ mock = ["sea-orm/mock"] [build-dependencies] rustc_version = "0.4.1" + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt", "time"] } diff --git a/crates/records_lib/src/lib.rs b/crates/records_lib/src/lib.rs index 09bc1b6..169e688 100644 --- a/crates/records_lib/src/lib.rs +++ b/crates/records_lib/src/lib.rs @@ -23,9 +23,10 @@ pub mod opt_event; pub mod player; pub mod pool; pub mod ranks; +pub mod records_notifier; pub mod redis_key; -pub mod time; pub mod sync; +pub mod time; /// The MySQL/MariaDB pool type. pub type MySqlPool = sqlx::MySqlPool; diff --git a/crates/records_lib/src/records_notifier.rs b/crates/records_lib/src/records_notifier.rs new file mode 100644 index 0000000..f911fd5 --- /dev/null +++ b/crates/records_lib/src/records_notifier.rs @@ -0,0 +1,249 @@ +//! Module related to new records notification. + +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll, ready}, +}; + +use chrono::{DateTime, Utc}; +use futures::Stream; +use tokio::sync::broadcast; +use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; + +/// The type representing the player in a [`NewRecordEvent`]. +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct NewRecordPlayer { + /// The login of the player. + pub login: String, + /// The name of the player. + pub name: String, +} + +/// The type representing the map in a [`NewRecordEvent`]. +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct NewRecordMap { + /// The UID of the map. + pub map_uid: String, + /// The name of the map. + pub name: String, +} + +/// The type yielded when a new record is notified. +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct NewRecordEvent { + /// The record ID. + pub record_id: u32, + /// The rank of the record in the map. + pub rank: i32, + /// The player who made the record. + pub player: NewRecordPlayer, + /// The map on which the record is set. + pub map: NewRecordMap, + /// The time of the run. + pub time: i32, + /// The date of the record. + pub record_date: DateTime, +} + +struct LatestRecordsStream { + inner: BroadcastStream, +} + +impl Stream for LatestRecordsStream { + type Item = NewRecordEvent; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // SAFETY: we're retrieving a field of `Self` + let rx = unsafe { self.map_unchecked_mut(|f| &mut f.inner) }; + match ready!(rx.poll_next(cx)) { + Some(Ok(record)) => Poll::Ready(Some(record)), + Some(Err(BroadcastStreamRecvError::Lagged(_))) => { + // According to the broadcast receiver documentation, attempting to receive again will + // return the oldest message still retained in the channel. So the next poll is ready. + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +} + +/// Shared state between the notifier and its subscriptions. +#[derive(Clone)] +struct Shared { + tx: Arc>, +} + +/// Represents the source that will send the notifications of the new records. +#[derive(Clone, Default)] +pub struct RecordsNotifier { + inner: Shared, +} + +impl Default for Shared { + fn default() -> Self { + let (tx, _rx) = broadcast::channel(10); + Self { tx: Arc::new(tx) } + } +} + +impl RecordsNotifier { + /// Returns a subscription from this source, to be able to listen for record notifications. + pub fn get_subscription(&self) -> LatestRecordsSubscription { + LatestRecordsSubscription { + inner: self.inner.clone(), + } + } + + /// Notifies to every subscriptions that a new record happened. + pub fn notify_new_record(&self, event: NewRecordEvent) { + // We ignore if any receiver received the event or not. + let _ = self.inner.tx.send(event); + } +} + +/// Represents a listener for a new record notification. +#[derive(Clone)] +pub struct LatestRecordsSubscription { + inner: Shared, +} + +impl LatestRecordsSubscription { + /// Returns a stream from this subscription, yielding each new record. + /// + /// The stream returns `None` if this subscription and the notifier from which it was created + /// are both dropped. + pub fn subscribe_new_client(&self) -> impl Stream + 'static { + let rx = self.inner.tx.subscribe(); + LatestRecordsStream { inner: rx.into() } + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{Arc, Mutex}, + time::Duration, + }; + + use chrono::{TimeZone, Utc}; + use tokio::{sync::mpsc, task, time}; + use tokio_stream::StreamExt; + + use crate::records_notifier::{ + LatestRecordsSubscription, NewRecordEvent, NewRecordMap, NewRecordPlayer, RecordsNotifier, + }; + + async fn timeout(f: F) -> Option + where + F: Future, + { + time::timeout(Duration::from_secs(1), f).await.ok() + } + + struct Listener { + rx: mpsc::Receiver<()>, + task: task::JoinHandle<()>, + results: Arc>>, + } + + impl Listener { + fn from_subscription(subscription: &LatestRecordsSubscription) -> Self { + let (tx, rx) = mpsc::channel(1); + let stream = subscription.subscribe_new_client(); + let results = Arc::new(Mutex::new(Vec::new())); + let results_in_task = Arc::clone(&results); + let task = task::spawn(async move { + let mut stream = stream; + while let Some(record) = stream.next().await { + results_in_task.lock().unwrap().push(record); + tx.send(()).await.unwrap(); + } + }); + + Self { rx, task, results } + } + } + + #[tokio::test] + async fn test_notifier() { + let notifier = RecordsNotifier::default(); + let subscription = notifier.get_subscription(); + + let mut listener1 = Listener::from_subscription(&subscription); + + let record1 = NewRecordEvent { + player: NewRecordPlayer { + login: "foo_player_login".to_owned(), + name: "foo_player_name".to_owned(), + }, + map: NewRecordMap { + map_uid: "foo_map_uid".to_owned(), + name: "foo_map_name".to_owned(), + }, + rank: 1, + record_date: Utc.with_ymd_and_hms(2026, 3, 15, 15, 0, 0).unwrap(), + record_id: 1, + time: 15000, + }; + + notifier.notify_new_record(record1.clone()); + assert!(timeout(listener1.rx.recv()).await.is_some()); + itertools::assert_equal(listener1.results.lock().unwrap().iter(), [&record1]); + + let mut listener2 = Listener::from_subscription(&subscription); + + let record2 = NewRecordEvent { + player: NewRecordPlayer { + login: "bar_player_login".to_owned(), + name: "bar_player_name".to_owned(), + }, + map: NewRecordMap { + map_uid: "bar_map_uid".to_owned(), + name: "bar_map_name".to_owned(), + }, + rank: 2, + record_date: Utc.with_ymd_and_hms(2025, 3, 15, 15, 0, 0).unwrap(), + record_id: 2, + time: 14000, + }; + let record3 = NewRecordEvent { + player: NewRecordPlayer { + login: "foobar_player_login".to_owned(), + name: "foobar_player_name".to_owned(), + }, + map: NewRecordMap { + map_uid: "foobar_map_uid".to_owned(), + name: "foobar_map_name".to_owned(), + }, + rank: 3, + record_date: Utc.with_ymd_and_hms(2024, 3, 15, 15, 0, 0).unwrap(), + record_id: 3, + time: 13000, + }; + + notifier.notify_new_record(record2.clone()); + notifier.notify_new_record(record3.clone()); + + assert!(timeout(listener1.rx.recv()).await.is_some()); + assert!(timeout(listener2.rx.recv()).await.is_some()); + assert!(timeout(listener1.rx.recv()).await.is_some()); + assert!(timeout(listener2.rx.recv()).await.is_some()); + + itertools::assert_equal( + listener1.results.lock().unwrap().iter(), + [&record1, &record2, &record3], + ); + itertools::assert_equal( + listener2.results.lock().unwrap().iter(), + [&record2, &record3], + ); + + drop(subscription); + drop(notifier); + + assert!(timeout(listener1.task).await.is_some()); + assert!(timeout(listener2.task).await.is_some()); + } +} From 98b61a8f6fab7137065c20b5f8a461739bd86bcc Mon Sep 17 00:00:00 2001 From: Ahmad <39441506+ahmadbky@users.noreply.github.com> Date: Mon, 16 Mar 2026 19:39:13 +0100 Subject: [PATCH 3/6] doc: fix grammatical issue Refs: #126. Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- crates/records_lib/src/records_notifier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/records_lib/src/records_notifier.rs b/crates/records_lib/src/records_notifier.rs index f911fd5..d77fd6b 100644 --- a/crates/records_lib/src/records_notifier.rs +++ b/crates/records_lib/src/records_notifier.rs @@ -96,7 +96,7 @@ impl RecordsNotifier { } } - /// Notifies to every subscriptions that a new record happened. + /// Notifies all subscriptions that a new record happened. pub fn notify_new_record(&self, event: NewRecordEvent) { // We ignore if any receiver received the event or not. let _ = self.inner.tx.send(event); From 34cfc5e1321edcb297df735ef4cdd4fb35706ca1 Mon Sep 17 00:00:00 2001 From: Ahmad Baalbaky Date: Mon, 16 Mar 2026 19:47:11 +0100 Subject: [PATCH 4/6] refactor: use pin-project-lite instead of unsafe Refs: #124. --- crates/records_lib/Cargo.toml | 1 + crates/records_lib/src/records_notifier.rs | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/records_lib/Cargo.toml b/crates/records_lib/Cargo.toml index 5f49976..22a8ae4 100644 --- a/crates/records_lib/Cargo.toml +++ b/crates/records_lib/Cargo.toml @@ -21,6 +21,7 @@ rand = { workspace = true } sea-orm = { workspace = true } entity = { path = "../entity" } tokio-stream = { version = "0.1.18", features = ["sync"] } +pin-project-lite.workspace = true [features] default = [] diff --git a/crates/records_lib/src/records_notifier.rs b/crates/records_lib/src/records_notifier.rs index d77fd6b..cc528d3 100644 --- a/crates/records_lib/src/records_notifier.rs +++ b/crates/records_lib/src/records_notifier.rs @@ -8,6 +8,7 @@ use std::{ use chrono::{DateTime, Utc}; use futures::Stream; +use pin_project_lite::pin_project; use tokio::sync::broadcast; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; @@ -46,17 +47,19 @@ pub struct NewRecordEvent { pub record_date: DateTime, } -struct LatestRecordsStream { - inner: BroadcastStream, +pin_project! { + struct LatestRecordsStream { + #[pin] + inner: BroadcastStream, + } } impl Stream for LatestRecordsStream { type Item = NewRecordEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // SAFETY: we're retrieving a field of `Self` - let rx = unsafe { self.map_unchecked_mut(|f| &mut f.inner) }; - match ready!(rx.poll_next(cx)) { + let this = self.project(); + match ready!(this.inner.poll_next(cx)) { Some(Ok(record)) => Poll::Ready(Some(record)), Some(Err(BroadcastStreamRecvError::Lagged(_))) => { // According to the broadcast receiver documentation, attempting to receive again will From ca8ab597df619b738693158c7b46a1124edafce7 Mon Sep 17 00:00:00 2001 From: Ahmad Baalbaky Date: Mon, 16 Mar 2026 20:51:37 +0100 Subject: [PATCH 5/6] refactor: mask internal errors for subscriptions Refs: #124. --- crates/game_api/src/auth.rs | 2 - crates/game_api/src/graphql.rs | 157 +++++++++++++++++++++------------ 2 files changed, 103 insertions(+), 56 deletions(-) diff --git a/crates/game_api/src/auth.rs b/crates/game_api/src/auth.rs index fc816a6..7a3bf59 100644 --- a/crates/game_api/src/auth.rs +++ b/crates/game_api/src/auth.rs @@ -85,8 +85,6 @@ pub mod privilege { pub const ADMIN: Flags = 0b1111; } -pub const WEB_TOKEN_SESS_KEY: &str = "__obs_web_token"; - /// The state string expires in 5 minutes. /// /// This is typically used to set a timeout for the POST /player/get_token request sent by diff --git a/crates/game_api/src/graphql.rs b/crates/game_api/src/graphql.rs index dcb4c29..f7a4df5 100644 --- a/crates/game_api/src/graphql.rs +++ b/crates/game_api/src/graphql.rs @@ -1,9 +1,13 @@ -use actix_session::Session; +use std::sync::Arc; + +use actix_http::RequestHead; use actix_web::{HttpRequest, Scope, guard, web}; use actix_web::{HttpResponse, Responder}; -use async_graphql::ErrorExtensionValues; use async_graphql::http::{GraphQLPlaygroundConfig, playground_source}; +use async_graphql::{ErrorExtensionValues, Executor}; use async_graphql_actix_web::{GraphQLRequest, GraphQLSubscription}; +use futures::StreamExt; +use futures::stream::BoxStream; use graphql_api::error::{ApiGqlError, ApiGqlErrorKind}; use graphql_api::schema::{Schema, create_schema}; use records_lib::Database; @@ -11,67 +15,102 @@ use records_lib::records_notifier::LatestRecordsSubscription; use reqwest::Client; use tracing_actix_web::RequestId; -use crate::auth::{WEB_TOKEN_SESS_KEY, WebToken}; -use crate::{ApiErrorKind, RecordsResult, Res, configure, internal}; +use crate::{ApiErrorKind, RecordsResult, Res, configure}; -async fn index_graphql( +#[derive(Clone)] +struct ExecutorInventory { + req_head: RequestHead, request_id: RequestId, - client: Res, - req: HttpRequest, - session: Session, - schema: Res, - GraphQLRequest(request): GraphQLRequest, -) -> RecordsResult { - let web_token = session - .get::(WEB_TOKEN_SESS_KEY) - .map_err(|e| internal!("unable to retrieve web token: {e}"))?; - - let request = { - if let Some(web_token) = web_token { - request.data(web_token) - } else { - request - } - }; - - let mut result = schema.execute(request).await; - - for error in &mut result.errors { - tracing::error!("Error encountered when processing GraphQL request: {error:?}"); + client: reqwest::Client, +} - let api_error = error.source::().cloned(); +impl ExecutorInventory { + fn mask_internal_errors( + &self, + mut response: async_graphql::Response, + ) -> async_graphql::Response { + for error in &mut response.errors { + tracing::error!("Error encountered when processing GraphQL request: {error:?}"); - let extensions = error - .extensions - .get_or_insert_with(ErrorExtensionValues::default); + let api_error = error.source::().cloned(); - // Don't expose internal server errors - if let Some(err) = api_error - && let ApiGqlErrorKind::Lib(records_err) = err.kind() - { - let err = ApiErrorKind::Lib(records_err); - let (err_type, status_code) = err.get_err_type_and_status_code(); + let extensions = error + .extensions + .get_or_insert_with(ErrorExtensionValues::default); - let mapped_err_type = if (100..200).contains(&err_type) || status_code.is_server_error() + // Don't expose internal server errors + if let Some(err) = api_error + && let ApiGqlErrorKind::Lib(records_err) = err.kind() { - error.message = "Internal server error".to_owned(); - configure::send_internal_err_msg_detached( - client.0.clone(), - req.head().clone(), - request_id, - err, - ); - - 105 // Unknown type - } else { - err_type - }; - - extensions.set("error_code", mapped_err_type); + let err = ApiErrorKind::Lib(records_err); + let (err_type, status_code) = err.get_err_type_and_status_code(); + + let mapped_err_type = + if (100..200).contains(&err_type) || status_code.is_server_error() { + error.message = "Internal server error".to_owned(); + configure::send_internal_err_msg_detached( + self.client.clone(), + self.req_head.clone(), + self.request_id, + err, + ); + + 105 // Unknown type + } else { + err_type + }; + + extensions.set("error_code", mapped_err_type); + } + + extensions.set("request_id", self.request_id.to_string()); } - extensions.set("request_id", request_id.to_string()); + response + } +} + +#[derive(Clone)] +struct GraphqlApiExecutor { + schema: Schema, + inventory: ExecutorInventory, +} + +impl Executor for GraphqlApiExecutor { + async fn execute(&self, request: async_graphql::Request) -> async_graphql::Response { + let result = Executor::execute(&self.schema, request).await; + self.inventory.mask_internal_errors(result) + } + + fn execute_stream( + &self, + request: async_graphql::Request, + session_data: Option>, + ) -> BoxStream<'static, async_graphql::Response> { + let inventory = self.inventory.clone(); + Executor::execute_stream(&self.schema, request, session_data) + .map(move |response| inventory.mask_internal_errors(response)) + .boxed() } +} + +async fn index_graphql( + request_id: RequestId, + client: Res, + req: HttpRequest, + schema: Res, + GraphQLRequest(request): GraphQLRequest, +) -> RecordsResult { + let executor = GraphqlApiExecutor { + schema: schema.0, + inventory: ExecutorInventory { + req_head: req.head().clone(), + request_id, + client: client.0, + }, + }; + + let result = executor.execute(request).await; Ok(web::Json(result)) } @@ -86,11 +125,21 @@ async fn index_playground() -> impl Responder { } async fn index_subscriptions( + request_id: RequestId, + client: Res, schema: Res, req: HttpRequest, payload: web::Payload, ) -> Result { - GraphQLSubscription::new(Schema::clone(&*schema)).start(&req, payload) + GraphQLSubscription::new(GraphqlApiExecutor { + schema: schema.0, + inventory: ExecutorInventory { + req_head: req.head().clone(), + request_id, + client: client.0, + }, + }) + .start(&req, payload) } pub fn graphql_route( From c753721a06838b42a83bfc8c461c27415946d416 Mon Sep 17 00:00:00 2001 From: Ahmad Baalbaky Date: Mon, 16 Mar 2026 21:42:53 +0100 Subject: [PATCH 6/6] fix: conditional compilation issue Refs: #124. --- crates/game_api/src/http/player/auth/with_auth.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/game_api/src/http/player/auth/with_auth.rs b/crates/game_api/src/http/player/auth/with_auth.rs index 11a708b..ad7923a 100644 --- a/crates/game_api/src/http/player/auth/with_auth.rs +++ b/crates/game_api/src/http/player/auth/with_auth.rs @@ -8,11 +8,13 @@ use tracing::Level; use crate::{ AccessTokenErr, ApiErrorKind, RecordsResult, RecordsResultExt as _, Res, - auth::{self, ApiAvailable, Message, TIMEOUT, WEB_TOKEN_SESS_KEY, WebToken}, + auth::{self, ApiAvailable, Message, TIMEOUT, WebToken}, internal, utils::json, }; +const WEB_TOKEN_SESS_KEY: &str = "__obs_web_token"; + #[derive(serde::Serialize)] struct MPAccessTokenBody<'a> { grant_type: &'a str,