Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 113 additions & 16 deletions rusty/src/server/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{broadcast, RwLock};

use crate::core::reconciler::Reconciler;
use crate::core::runtime::Runtime;
Expand All @@ -15,34 +15,39 @@ pub struct AppSession {
}

/// Manages per-connection AppSessions, keyed by connection ID.
/// Tracks active connections and creates isolated sessions on demand.
/// Stores `Arc<RwLock<AppSession>>` references so both the store and handlers
/// share ownership, enabling admin/monitoring, graceful shutdown, and timeout enforcement.
pub struct AppSessionStore {
sessions: RwLock<HashMap<String, ()>>,
sessions: RwLock<HashMap<String, Arc<RwLock<AppSession>>>>,
root_factory: Arc<dyn Fn() -> Box<dyn View> + Send + Sync>,
shutdown_tx: broadcast::Sender<()>,
}

impl AppSessionStore {
pub fn new(root_factory: Arc<dyn Fn() -> Box<dyn View> + Send + Sync>) -> Self {
let (shutdown_tx, _) = broadcast::channel(16);
AppSessionStore {
sessions: RwLock::new(HashMap::new()),
root_factory,
shutdown_tx,
}
}

/// Create a new session with an isolated Runtime and Reconciler.
/// Registers the connection and returns the session for the handler to own.
pub async fn create_session(&self, connection_id: String) -> AppSession {
/// Registers the connection and returns an Arc reference to the session.
pub async fn create_session(&self, connection_id: String) -> Arc<RwLock<AppSession>> {
let view = (self.root_factory)();
let runtime = Runtime::new(FuncView(view));
let reconciler = Reconciler::new();
let session = Arc::new(RwLock::new(AppSession {
runtime,
reconciler,
}));

let mut sessions = self.sessions.write().await;
sessions.insert(connection_id, ());
sessions.insert(connection_id, session.clone());

AppSession {
runtime,
reconciler,
}
session
}

/// Remove a session on disconnect.
Expand All @@ -56,6 +61,28 @@ impl AppSessionStore {
let sessions = self.sessions.read().await;
sessions.len()
}

/// Get a session by connection ID (for admin/monitoring).
pub async fn get_session(&self, connection_id: &str) -> Option<Arc<RwLock<AppSession>>> {
let sessions = self.sessions.read().await;
sessions.get(connection_id).cloned()
}

/// Get all active connection IDs (for monitoring/debug).
pub async fn connection_ids(&self) -> Vec<String> {
let sessions = self.sessions.read().await;
sessions.keys().cloned().collect()
}

/// Subscribe to the shutdown broadcast channel.
pub fn subscribe_shutdown(&self) -> broadcast::Receiver<()> {
self.shutdown_tx.subscribe()
}

/// Broadcast a shutdown signal to all subscribers.
pub fn broadcast_shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
}

#[cfg(test)]
Expand Down Expand Up @@ -110,12 +137,12 @@ mod tests {
}));

// Create two sessions — each gets its own Runtime with a different view
let mut session_a = store.create_session("conn-a".to_string()).await;
let mut session_b = store.create_session("conn-b".to_string()).await;
let session_a = store.create_session("conn-a".to_string()).await;
let session_b = store.create_session("conn-b".to_string()).await;

// Build each session's tree independently
let tree_a = session_a.runtime.build().await;
let tree_b = session_b.runtime.build().await;
let tree_a = session_a.write().await.runtime.build().await;
let tree_b = session_b.write().await.runtime.build().await;

let json_a = serde_json::to_value(&tree_a).unwrap().to_string();
let json_b = serde_json::to_value(&tree_b).unwrap().to_string();
Expand Down Expand Up @@ -144,9 +171,9 @@ mod tests {
let store = store.clone();
let handle = tokio::spawn(async move {
let id = format!("conn-{}", i);
let mut session = store.create_session(id.clone()).await;
let session = store.create_session(id.clone()).await;
// Verify we got a valid session by building its tree
let tree = session.runtime.build().await;
let tree = session.write().await.runtime.build().await;
let json = serde_json::to_value(&tree).unwrap().to_string();
assert!(json.contains("concurrent"));
store.remove_session(&id).await;
Expand All @@ -160,4 +187,74 @@ mod tests {

assert_eq!(store.session_count().await, 0);
}

#[tokio::test]
async fn test_get_session() {
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("get-test"))));

store.create_session("conn-1".to_string()).await;

// Should return Some for an active session
assert!(store.get_session("conn-1").await.is_some());

// Should return None for a non-existent session
assert!(store.get_session("conn-999").await.is_none());

// Should return None after removal
store.remove_session("conn-1").await;
assert!(store.get_session("conn-1").await.is_none());
}

#[tokio::test]
async fn test_connection_ids() {
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("ids-test"))));

store.create_session("conn-a".to_string()).await;
store.create_session("conn-b".to_string()).await;
store.create_session("conn-c".to_string()).await;

let mut ids = store.connection_ids().await;
ids.sort();
assert_eq!(ids, vec!["conn-a", "conn-b", "conn-c"]);

store.remove_session("conn-b").await;
let mut ids = store.connection_ids().await;
ids.sort();
assert_eq!(ids, vec!["conn-a", "conn-c"]);
}

#[tokio::test]
async fn test_broadcast_shutdown() {
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("shutdown-test"))));

let mut rx1 = store.subscribe_shutdown();
let mut rx2 = store.subscribe_shutdown();
let mut rx3 = store.subscribe_shutdown();

store.broadcast_shutdown();

// All receivers should get the signal
assert!(rx1.recv().await.is_ok());
assert!(rx2.recv().await.is_ok());
assert!(rx3.recv().await.is_ok());
}

#[tokio::test]
async fn test_session_arc_lifecycle() {
let store = AppSessionStore::new(Arc::new(|| Box::new(TestView::new("lifecycle-test"))));

let session_arc = store.create_session("conn-1".to_string()).await;

// Handler holds a clone — simulates what handle_socket does
let handler_clone = session_arc.clone();

// Remove from store — store's reference is dropped
store.remove_session("conn-1").await;
assert!(store.get_session("conn-1").await.is_none());

// Handler's clone is still valid and usable
let tree = handler_clone.write().await.runtime.build().await;
let json = serde_json::to_value(&tree).unwrap().to_string();
assert!(json.contains("lifecycle-test"));
}
}
88 changes: 52 additions & 36 deletions rusty/src/server/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,58 +150,74 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {

// Generate a unique connection ID and create an isolated session
let connection_id = Uuid::new_v4().to_string();
let mut session = state
let session_arc = state
.session_store
.create_session(connection_id.clone())
.await;
let mut shutdown_rx = state.session_store.subscribe_shutdown();

// Send initial render from this session's own runtime
if let Some(tree) = session.runtime.current_tree().await {
let msg = ServerMessage::Refresh {
widgets: tree.clone(),
};
session.reconciler.reconcile(&tree);
if let Ok(json) = serde_json::to_string(&msg) {
let _ = sender.send(Message::Text(json.into())).await;
{
let mut session = session_arc.write().await;
if let Some(tree) = session.runtime.current_tree().await {
let msg = ServerMessage::Refresh {
widgets: tree.clone(),
};
session.reconciler.reconcile(&tree);
if let Ok(json) = serde_json::to_string(&msg) {
let _ = sender.send(Message::Text(json.into())).await;
}
}
}
let event_tx = session.runtime.event_sender();
let event_tx = session_arc.read().await.runtime.event_sender();

// Process incoming messages using this session's isolated runtime
while let Some(Ok(msg)) = receiver.next().await {
if let Message::Text(text) = msg {
if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) {
match client_msg {
ClientMessage::Event {
widget_id,
event_name,
args,
} => {
let _ = event_tx
.send(RuntimeMessage::Event {
widget_id,
event_name,
args,
})
.await;

// After event, get updated tree from this session's runtime
if let Some(tree) = session.runtime.current_tree().await {
if let Some(patches) = session.reconciler.reconcile(&tree) {
if !patches.is_empty() {
let msg = ServerMessage::Update { patches };
if let Ok(json) = serde_json::to_string(&msg) {
let _ = sender.send(Message::Text(json.into())).await;
loop {
tokio::select! {
msg = receiver.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) {
match client_msg {
ClientMessage::Event {
widget_id,
event_name,
args,
} => {
let _ = event_tx
.send(RuntimeMessage::Event {
widget_id,
event_name,
args,
})
.await;

// After event, get updated tree from this session's runtime
let mut session = session_arc.write().await;
if let Some(tree) = session.runtime.current_tree().await {
if let Some(patches) = session.reconciler.reconcile(&tree) {
if !patches.is_empty() {
let msg = ServerMessage::Update { patches };
if let Ok(json) = serde_json::to_string(&msg) {
let _ = sender.send(Message::Text(json.into())).await;
}
}
}
}
}
ClientMessage::Navigate { .. } => {
// Navigation handling (future)
}
}
}
}
ClientMessage::Navigate { .. } => {
// Navigation handling (future)
}
Some(Ok(_)) => {} // Ignore non-text messages
_ => break, // Connection closed or error
}
}
_ = shutdown_rx.recv() => {
break;
}
}
}

Expand Down
Loading