Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/libs/ej-web/src/ctx/ctx_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl CtxClient {
builder: self,
tx,
addr,
connection_id: Uuid::new_v4(),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/libs/ej-web/src/ejconnected_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::SocketAddr;

use ej_dispatcher_sdk::ejws_message::EjWsServerMessage;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;

use crate::ctx::ctx_client::CtxClient;

Expand All @@ -16,4 +17,6 @@ pub struct EjConnectedBuilder {
pub tx: Sender<EjWsServerMessage>,
/// The builder's network address.
pub addr: SocketAddr,
/// Connection ID
pub connection_id: Uuid,
}
34 changes: 20 additions & 14 deletions crates/services/ejd/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use ej_web::{
use tokio::{sync::mpsc::channel, task::JoinHandle};
use tower_cookies::{CookieManagerLayer, Cookies};
use tracing::{error, info};
use uuid::Uuid;

use std::net::SocketAddr;
use tower_http::{
Expand Down Expand Up @@ -256,34 +257,26 @@ async fn builder_handler(
/// RAII guard to automatically remove builders from the dispatcher when connections close.
struct BuilderGuard {
dispatcher: Dispatcher,
index: usize,
connection_id: Uuid,
}

impl Drop for BuilderGuard {
/// Automatically removes the builder from the dispatcher's builder list when dropped.
fn drop(&mut self) {
let builders = self.dispatcher.builders.clone();
let index = self.index;
let connection_id = self.connection_id;
tokio::spawn(async move {
builders.lock().await.remove(index);
builders
.lock()
.await
.retain(|b| b.connection_id != connection_id);
});
}
}
/// Actual websocket statemachine (one will be spawned per connection)
async fn handle_socket(ctx: Ctx, dispatcher: Dispatcher, mut socket: WebSocket, addr: SocketAddr) {
let (tx, mut rx) = channel(2);

let builder_index = {
let mut builders = dispatcher.builders.lock().await;
builders.push(ctx.client.connect(tx.clone(), addr));
builders.len() - 1
};

let _guard = BuilderGuard {
dispatcher: dispatcher.clone(),
index: builder_index,
};

if socket
.send(Message::Ping(Bytes::from_static(&[1, 2, 3])))
.await
Expand All @@ -305,6 +298,19 @@ async fn handle_socket(ctx: Ctx, dispatcher: Dispatcher, mut socket: WebSocket,
}
}

let connection_id = {
let mut builders = dispatcher.builders.lock().await;
let connected_client = ctx.client.connect(tx.clone(), addr);
let connection_id = connected_client.connection_id.clone();
builders.push(connected_client);
connection_id
};

let _guard = BuilderGuard {
dispatcher: dispatcher.clone(),
connection_id,
};

let (mut sender, mut receiver) = socket.split();

let mut send_task: JoinHandle<Result<()>> = tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions crates/services/ejd/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ mod test {
builder: CtxClient { id: builder_id },
tx,
addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 11111)),
connection_id: Uuid::new_v4(),
}
}

Expand Down