From fe899f8205db1b48fa45d6cce28d93e38ed879a4 Mon Sep 17 00:00:00 2001 From: rorychatt Date: Thu, 16 Apr 2026 16:49:39 +0200 Subject: [PATCH] [00226] Store Arc> in AppSessionStore - Change sessions map from HashMap to HashMap>> - create_session now returns Arc> shared between store and handler - Add get_session and connection_ids query methods for admin/monitoring - Add broadcast shutdown channel for graceful shutdown support - Update handle_socket to use Arc> with tokio::select! for shutdown - Update existing tests and add 4 new tests (get_session, connection_ids, broadcast_shutdown, arc_lifecycle) --- rusty/src/server/session.rs | 129 +++++++++++++++++++++++++++++++----- rusty/src/server/ws.rs | 88 ++++++++++++++---------- 2 files changed, 165 insertions(+), 52 deletions(-) diff --git a/rusty/src/server/session.rs b/rusty/src/server/session.rs index dff69b0..d272446 100644 --- a/rusty/src/server/session.rs +++ b/rusty/src/server/session.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{broadcast, RwLock}; use crate::core::reconciler::Reconciler; use crate::core::runtime::Runtime; @@ -15,34 +15,39 @@ pub struct AppSession { } /// Manages per-connection AppSessions, keyed by connection ID. -/// Tracks active connections and creates isolated sessions on demand. +/// Stores `Arc>` references so both the store and handlers +/// share ownership, enabling admin/monitoring, graceful shutdown, and timeout enforcement. pub struct AppSessionStore { - sessions: RwLock>, + sessions: RwLock>>>, root_factory: Arc Box + Send + Sync>, + shutdown_tx: broadcast::Sender<()>, } impl AppSessionStore { pub fn new(root_factory: Arc Box + Send + Sync>) -> Self { + let (shutdown_tx, _) = broadcast::channel(16); AppSessionStore { sessions: RwLock::new(HashMap::new()), root_factory, + shutdown_tx, } } /// Create a new session with an isolated Runtime and Reconciler. - /// Registers the connection and returns the session for the handler to own. - pub async fn create_session(&self, connection_id: String) -> AppSession { + /// Registers the connection and returns an Arc reference to the session. + pub async fn create_session(&self, connection_id: String) -> Arc> { let view = (self.root_factory)(); let runtime = Runtime::new(FuncView(view)); let reconciler = Reconciler::new(); + let session = Arc::new(RwLock::new(AppSession { + runtime, + reconciler, + })); let mut sessions = self.sessions.write().await; - sessions.insert(connection_id, ()); + sessions.insert(connection_id, session.clone()); - AppSession { - runtime, - reconciler, - } + session } /// Remove a session on disconnect. @@ -56,6 +61,28 @@ impl AppSessionStore { let sessions = self.sessions.read().await; sessions.len() } + + /// Get a session by connection ID (for admin/monitoring). + pub async fn get_session(&self, connection_id: &str) -> Option>> { + let sessions = self.sessions.read().await; + sessions.get(connection_id).cloned() + } + + /// Get all active connection IDs (for monitoring/debug). + pub async fn connection_ids(&self) -> Vec { + let sessions = self.sessions.read().await; + sessions.keys().cloned().collect() + } + + /// Subscribe to the shutdown broadcast channel. + pub fn subscribe_shutdown(&self) -> broadcast::Receiver<()> { + self.shutdown_tx.subscribe() + } + + /// Broadcast a shutdown signal to all subscribers. + pub fn broadcast_shutdown(&self) { + let _ = self.shutdown_tx.send(()); + } } #[cfg(test)] @@ -110,12 +137,12 @@ mod tests { })); // Create two sessions — each gets its own Runtime with a different view - let mut session_a = store.create_session("conn-a".to_string()).await; - let mut session_b = store.create_session("conn-b".to_string()).await; + let session_a = store.create_session("conn-a".to_string()).await; + let session_b = store.create_session("conn-b".to_string()).await; // Build each session's tree independently - let tree_a = session_a.runtime.build().await; - let tree_b = session_b.runtime.build().await; + let tree_a = session_a.write().await.runtime.build().await; + let tree_b = session_b.write().await.runtime.build().await; let json_a = serde_json::to_value(&tree_a).unwrap().to_string(); let json_b = serde_json::to_value(&tree_b).unwrap().to_string(); @@ -144,9 +171,9 @@ mod tests { let store = store.clone(); let handle = tokio::spawn(async move { let id = format!("conn-{}", i); - let mut session = store.create_session(id.clone()).await; + let session = store.create_session(id.clone()).await; // Verify we got a valid session by building its tree - let tree = session.runtime.build().await; + let tree = session.write().await.runtime.build().await; let json = serde_json::to_value(&tree).unwrap().to_string(); assert!(json.contains("concurrent")); store.remove_session(&id).await; @@ -160,4 +187,74 @@ mod tests { assert_eq!(store.session_count().await, 0); } + + #[tokio::test] + async fn test_get_session() { + let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("get-test")))); + + store.create_session("conn-1".to_string()).await; + + // Should return Some for an active session + assert!(store.get_session("conn-1").await.is_some()); + + // Should return None for a non-existent session + assert!(store.get_session("conn-999").await.is_none()); + + // Should return None after removal + store.remove_session("conn-1").await; + assert!(store.get_session("conn-1").await.is_none()); + } + + #[tokio::test] + async fn test_connection_ids() { + let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("ids-test")))); + + store.create_session("conn-a".to_string()).await; + store.create_session("conn-b".to_string()).await; + store.create_session("conn-c".to_string()).await; + + let mut ids = store.connection_ids().await; + ids.sort(); + assert_eq!(ids, vec!["conn-a", "conn-b", "conn-c"]); + + store.remove_session("conn-b").await; + let mut ids = store.connection_ids().await; + ids.sort(); + assert_eq!(ids, vec!["conn-a", "conn-c"]); + } + + #[tokio::test] + async fn test_broadcast_shutdown() { + let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("shutdown-test")))); + + let mut rx1 = store.subscribe_shutdown(); + let mut rx2 = store.subscribe_shutdown(); + let mut rx3 = store.subscribe_shutdown(); + + store.broadcast_shutdown(); + + // All receivers should get the signal + assert!(rx1.recv().await.is_ok()); + assert!(rx2.recv().await.is_ok()); + assert!(rx3.recv().await.is_ok()); + } + + #[tokio::test] + async fn test_session_arc_lifecycle() { + let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("lifecycle-test")))); + + let session_arc = store.create_session("conn-1".to_string()).await; + + // Handler holds a clone — simulates what handle_socket does + let handler_clone = session_arc.clone(); + + // Remove from store — store's reference is dropped + store.remove_session("conn-1").await; + assert!(store.get_session("conn-1").await.is_none()); + + // Handler's clone is still valid and usable + let tree = handler_clone.write().await.runtime.build().await; + let json = serde_json::to_value(&tree).unwrap().to_string(); + assert!(json.contains("lifecycle-test")); + } } diff --git a/rusty/src/server/ws.rs b/rusty/src/server/ws.rs index 206a2b9..41f476a 100644 --- a/rusty/src/server/ws.rs +++ b/rusty/src/server/ws.rs @@ -150,58 +150,74 @@ async fn handle_socket(socket: WebSocket, state: Arc) { // Generate a unique connection ID and create an isolated session let connection_id = Uuid::new_v4().to_string(); - let mut session = state + let session_arc = state .session_store .create_session(connection_id.clone()) .await; + let mut shutdown_rx = state.session_store.subscribe_shutdown(); // Send initial render from this session's own runtime - if let Some(tree) = session.runtime.current_tree().await { - let msg = ServerMessage::Refresh { - widgets: tree.clone(), - }; - session.reconciler.reconcile(&tree); - if let Ok(json) = serde_json::to_string(&msg) { - let _ = sender.send(Message::Text(json.into())).await; + { + let mut session = session_arc.write().await; + if let Some(tree) = session.runtime.current_tree().await { + let msg = ServerMessage::Refresh { + widgets: tree.clone(), + }; + session.reconciler.reconcile(&tree); + if let Ok(json) = serde_json::to_string(&msg) { + let _ = sender.send(Message::Text(json.into())).await; + } } } - let event_tx = session.runtime.event_sender(); + let event_tx = session_arc.read().await.runtime.event_sender(); // Process incoming messages using this session's isolated runtime - while let Some(Ok(msg)) = receiver.next().await { - if let Message::Text(text) = msg { - if let Ok(client_msg) = serde_json::from_str::(&text) { - match client_msg { - ClientMessage::Event { - widget_id, - event_name, - args, - } => { - let _ = event_tx - .send(RuntimeMessage::Event { - widget_id, - event_name, - args, - }) - .await; - - // After event, get updated tree from this session's runtime - if let Some(tree) = session.runtime.current_tree().await { - if let Some(patches) = session.reconciler.reconcile(&tree) { - if !patches.is_empty() { - let msg = ServerMessage::Update { patches }; - if let Ok(json) = serde_json::to_string(&msg) { - let _ = sender.send(Message::Text(json.into())).await; + loop { + tokio::select! { + msg = receiver.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + if let Ok(client_msg) = serde_json::from_str::(&text) { + match client_msg { + ClientMessage::Event { + widget_id, + event_name, + args, + } => { + let _ = event_tx + .send(RuntimeMessage::Event { + widget_id, + event_name, + args, + }) + .await; + + // After event, get updated tree from this session's runtime + let mut session = session_arc.write().await; + if let Some(tree) = session.runtime.current_tree().await { + if let Some(patches) = session.reconciler.reconcile(&tree) { + if !patches.is_empty() { + let msg = ServerMessage::Update { patches }; + if let Ok(json) = serde_json::to_string(&msg) { + let _ = sender.send(Message::Text(json.into())).await; + } + } + } } } + ClientMessage::Navigate { .. } => { + // Navigation handling (future) + } } } } - ClientMessage::Navigate { .. } => { - // Navigation handling (future) - } + Some(Ok(_)) => {} // Ignore non-text messages + _ => break, // Connection closed or error } } + _ = shutdown_rx.recv() => { + break; + } } }