From d2dab45e0dc828311505fd07b68a6d1e01b7d930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Sat, 3 Jan 2026 19:35:47 +0100 Subject: [PATCH] fix: store connection id so we remove the correct builder --- crates/libs/ej-web/src/ctx/ctx_client.rs | 1 + crates/libs/ej-web/src/ejconnected_builder.rs | 3 ++ crates/services/ejd/src/api.rs | 34 +++++++++++-------- crates/services/ejd/src/dispatcher.rs | 1 + 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/crates/libs/ej-web/src/ctx/ctx_client.rs b/crates/libs/ej-web/src/ctx/ctx_client.rs index 3a4eb31..c661e13 100644 --- a/crates/libs/ej-web/src/ctx/ctx_client.rs +++ b/crates/libs/ej-web/src/ctx/ctx_client.rs @@ -99,6 +99,7 @@ impl CtxClient { builder: self, tx, addr, + connection_id: Uuid::new_v4(), } } } diff --git a/crates/libs/ej-web/src/ejconnected_builder.rs b/crates/libs/ej-web/src/ejconnected_builder.rs index d79dbe0..1ecc548 100644 --- a/crates/libs/ej-web/src/ejconnected_builder.rs +++ b/crates/libs/ej-web/src/ejconnected_builder.rs @@ -4,6 +4,7 @@ use std::net::SocketAddr; use ej_dispatcher_sdk::ejws_message::EjWsServerMessage; use tokio::sync::mpsc::Sender; +use uuid::Uuid; use crate::ctx::ctx_client::CtxClient; @@ -16,4 +17,6 @@ pub struct EjConnectedBuilder { pub tx: Sender, /// The builder's network address. pub addr: SocketAddr, + /// Connection ID + pub connection_id: Uuid, } diff --git a/crates/services/ejd/src/api.rs b/crates/services/ejd/src/api.rs index f56c4cf..883233e 100644 --- a/crates/services/ejd/src/api.rs +++ b/crates/services/ejd/src/api.rs @@ -36,6 +36,7 @@ use ej_web::{ use tokio::{sync::mpsc::channel, task::JoinHandle}; use tower_cookies::{CookieManagerLayer, Cookies}; use tracing::{error, info}; +use uuid::Uuid; use std::net::SocketAddr; use tower_http::{ @@ -256,16 +257,19 @@ async fn builder_handler( /// RAII guard to automatically remove builders from the dispatcher when connections close. struct BuilderGuard { dispatcher: Dispatcher, - index: usize, + connection_id: Uuid, } impl Drop for BuilderGuard { /// Automatically removes the builder from the dispatcher's builder list when dropped. fn drop(&mut self) { let builders = self.dispatcher.builders.clone(); - let index = self.index; + let connection_id = self.connection_id; tokio::spawn(async move { - builders.lock().await.remove(index); + builders + .lock() + .await + .retain(|b| b.connection_id != connection_id); }); } } @@ -273,17 +277,6 @@ impl Drop for BuilderGuard { async fn handle_socket(ctx: Ctx, dispatcher: Dispatcher, mut socket: WebSocket, addr: SocketAddr) { let (tx, mut rx) = channel(2); - let builder_index = { - let mut builders = dispatcher.builders.lock().await; - builders.push(ctx.client.connect(tx.clone(), addr)); - builders.len() - 1 - }; - - let _guard = BuilderGuard { - dispatcher: dispatcher.clone(), - index: builder_index, - }; - if socket .send(Message::Ping(Bytes::from_static(&[1, 2, 3]))) .await @@ -305,6 +298,19 @@ async fn handle_socket(ctx: Ctx, dispatcher: Dispatcher, mut socket: WebSocket, } } + let connection_id = { + let mut builders = dispatcher.builders.lock().await; + let connected_client = ctx.client.connect(tx.clone(), addr); + let connection_id = connected_client.connection_id.clone(); + builders.push(connected_client); + connection_id + }; + + let _guard = BuilderGuard { + dispatcher: dispatcher.clone(), + connection_id, + }; + let (mut sender, mut receiver) = socket.split(); let mut send_task: JoinHandle> = tokio::spawn(async move { diff --git a/crates/services/ejd/src/dispatcher.rs b/crates/services/ejd/src/dispatcher.rs index 2ffece9..848907b 100644 --- a/crates/services/ejd/src/dispatcher.rs +++ b/crates/services/ejd/src/dispatcher.rs @@ -889,6 +889,7 @@ mod test { builder: CtxClient { id: builder_id }, tx, addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 11111)), + connection_id: Uuid::new_v4(), } }