From 9874c76dc3f067414097aba4fd0bc035479a1c3d Mon Sep 17 00:00:00 2001 From: jamals86 Date: Wed, 6 May 2026 14:03:03 +0300 Subject: [PATCH] Add auto-fetch batch control for live subscriptions Introduce client-side control for auto-fetching initial-data batches across the stack. Adds an auto_fetch_batches option to subscription types (Rust, TS, Dart), parses it in the CLI, and wires it through the WASM client so the client can automatically request subsequent batches or let the UI/SDK request them manually. Exposes requestNextBatch on the WASM & TypeScript clients and a SubscriptionHandle with requestNextBatch/unsubscribe, plus tracing hooks to emit outbound WS messages for debugging. UI changes add live query indicators, a toggle for auto fetch, a "Fetch next batch" button, and live batch metadata handling. Also includes related tests, a small backend test about auditing for delegated DML, and minor example/index updates. --- .../kalamdb-core/src/sql/impersonation.rs | 9 -- .../misc/auth/test_as_user_impersonation.rs | 35 +++++ cli/src/session.rs | 103 ++++++++++--- examples/chat-with-ai/chat-app.sql | 6 + link/kalam-link-dart/src/models.rs | 1 + .../src/connection/shared/routing.rs | 7 +- .../models/subscription_options.rs | 11 ++ link/link-common/src/wasm/client.rs | 141 ++++++++++++++++-- link/link-common/src/wasm/reconnect.rs | 11 ++ link/sdks/typescript/client/src/client.ts | 36 ++++- .../src/helpers/subscription_helpers.ts | 9 +- link/sdks/typescript/client/src/index.ts | 1 + link/sdks/typescript/client/src/types.ts | 16 ++ .../client/tests/normalize.test.mjs | 13 ++ ui/index.html | 1 + .../input-form/QueryTabStrip.tsx | 28 +++- .../input-form/StudioEditorPanel.tsx | 15 ++ .../preview/StudioResultsGrid.tsx | 110 +++++++++----- .../components/sql-studio-v2/shared/types.ts | 1 + .../sql-studio-v2/shared/workspaceState.ts | 4 + .../state/sqlStudioWorkspaceSlice.ts | 17 ++- ui/src/lib/kalam-client.ts | 28 +++- ui/src/pages/SqlStudio.test.tsx | 51 ++++++- ui/src/pages/SqlStudio.tsx | 109 +++++++++++++- .../sqlStudioWorkspaceSyncService.test.ts | 16 +- .../services/sqlStudioWorkspaceSyncService.ts | 41 +++-- ui/vite.config.ts | 18 +-- 27 files changed, 697 insertions(+), 141 deletions(-) diff --git a/backend/crates/kalamdb-core/src/sql/impersonation.rs b/backend/crates/kalamdb-core/src/sql/impersonation.rs index 5b4a02988..9ea5b0052 100644 --- a/backend/crates/kalamdb-core/src/sql/impersonation.rs +++ b/backend/crates/kalamdb-core/src/sql/impersonation.rs @@ -87,15 +87,6 @@ impl SqlImpersonationService { .role_for_impersonation_target(&target_user_id); if can_impersonate_target_user(actor_user_id, actor_role, &target_user_id, target_role) { - self.audit_impersonation_event( - actor_user_id, - actor_role, - target_user_id.as_str(), - Some(&target_user_id), - true, - None, - ) - .await; return Ok(target_user_id); } diff --git a/backend/tests/misc/auth/test_as_user_impersonation.rs b/backend/tests/misc/auth/test_as_user_impersonation.rs index 90b929de1..496ca7e3b 100644 --- a/backend/tests/misc/auth/test_as_user_impersonation.rs +++ b/backend/tests/misc/auth/test_as_user_impersonation.rs @@ -363,6 +363,41 @@ async fn test_allowed_execute_as_can_select_update_and_delete_target_rows() { ); } +#[actix_web::test] +#[ntest::timeout(45000)] +async fn test_allowed_execute_as_dml_is_not_audited() { + let server = TestServer::new_shared().await; + let ns = unique_name("as_user_dml_no_audit"); + create_user_table(&server, &ns, "profiles").await; + + let actor = insert_user(&server, &unique_name("no_audit_actor"), Role::Dba).await; + let target = insert_user(&server, &unique_name("no_audit_target"), Role::User).await; + + let insert_sql = format!( + "EXECUTE AS USER '{}' (INSERT INTO {}.profiles (id, value) VALUES ('p1', 'delegated'))", + target.as_str(), + ns + ); + let insert_resp = server.execute_sql_as_user(&insert_sql, actor.as_str()).await; + assert_success(&insert_resp, "allowed cross-user insert without audit"); + + let logs = server + .app_context + .system_tables() + .audit_logs() + .scan_all() + .expect("Failed to read audit log"); + assert!( + !logs.iter().any(|entry| { + entry.action == "EXECUTE_AS_USER" + && entry.target == format!("user:{}", target.as_str()) + && entry.actor_user_id == actor + && entry.subject_user_id.as_ref() == Some(&target) + }), + "successful execute as DML should not create an audit entry" + ); +} + #[actix_web::test] #[ntest::timeout(45000)] async fn test_execute_as_stream_table_uses_target_user_scope() { diff --git a/cli/src/session.rs b/cli/src/session.rs index d2edf6df3..f1e796e5f 100644 --- a/cli/src/session.rs +++ b/cli/src/session.rs @@ -24,8 +24,8 @@ use indicatif::{ProgressBar, ProgressStyle}; use kalam_client::{ credentials::{CredentialStore, Credentials}, AuthProvider, AuthRefreshCallback, ClusterHealthResponse, ClusterNodeHealth, ConnectionOptions, - KalamLinkClient, KalamLinkError, KalamLinkTimeouts, SubscriptionConfig, SubscriptionOptions, - TimestampFormatter, UploadProgress, UploadProgressCallback, + KalamLinkClient, KalamLinkError, KalamLinkTimeouts, SeqId, SubscriptionConfig, + SubscriptionOptions, TimestampFormatter, UploadProgress, UploadProgressCallback, }; use rustyline::{ completion::Completer, error::ReadlineError, highlight::Highlighter, hint::Hinter, @@ -1703,8 +1703,8 @@ impl CLISession { /// Extract OPTIONS clause from SUBSCRIBE SQL query. /// - /// Parses `OPTIONS (last_rows=N)` from the SQL and returns cleaned SQL + options. - /// If no OPTIONS found, returns original SQL with default options (last_rows=100). + /// Parses CLI-managed options from `OPTIONS (...)` and returns cleaned SQL + options. + /// If no OPTIONS found, returns original SQL with default subscription options. /// /// # Examples /// ``` @@ -1728,48 +1728,63 @@ impl CLISession { let clean_sql = sql[..idx].trim().to_string(); let options_str = sql[idx + " OPTIONS".len()..].trim(); // " OPTIONS".len() == 8 - // Parse OPTIONS (last_rows=N) + // Parse CLI-managed options from OPTIONS (...) let options = Self::parse_subscribe_options(options_str); (clean_sql, options) } - /// Parse OPTIONS clause value: (last_rows=N) + /// Parse OPTIONS clause value: (last_rows=N, batch_size=N, from=N) fn parse_subscribe_options(options_str: &str) -> Option { let options_str = options_str.trim(); - // Expected format: (last_rows=N) or ( last_rows = N ) + // Expected format: (last_rows=N, batch_size=N, from=N) if !options_str.starts_with('(') || !options_str.ends_with(')') { eprintln!("Warning: Invalid OPTIONS format, using defaults"); return Some(SubscriptionOptions::default()); } let inner = options_str[1..options_str.len() - 1].trim(); + let mut options = SubscriptionOptions::new(); - // Parse last_rows=N - if let Some(equals_idx) = inner.find('=') { - let key = inner[..equals_idx].trim(); - let value = inner[equals_idx + 1..].trim(); + for pair in inner.split(',') { + let pair = pair.trim(); + if pair.is_empty() { + continue; + } - if key.to_lowercase() == "last_rows" { + let Some((key, value)) = pair.split_once('=') else { + eprintln!("Warning: Invalid option '{}', expected key=value", pair); + continue; + }; + + let key = key.trim().to_ascii_lowercase(); + let value = value.trim().trim_matches(['\'', '"']); + + if key == "last_rows" { if let Ok(last_rows) = value.parse::() { - return Some(SubscriptionOptions::new().with_last_rows(last_rows)); + options = options.with_last_rows(last_rows); } else { eprintln!("Warning: Invalid last_rows value '{}', using default", value); } - } else if key.to_lowercase() == "batch_size" { + } else if key == "batch_size" { if let Ok(batch_size) = value.parse::() { - return Some(SubscriptionOptions::new().with_batch_size(batch_size)); + options = options.with_batch_size(batch_size); } else { eprintln!("Warning: Invalid batch_size value '{}', using default", value); } + } else if key == "from" || key == "from_seq_id" { + if let Ok(seq_id) = value.parse::() { + options = options.with_from(SeqId::from(seq_id)); + } else { + eprintln!("Warning: Invalid from value '{}', using default", value); + } } else { eprintln!("Warning: Unknown option '{}', ignoring", key); } } - // Default if parsing failed - Some(SubscriptionOptions::default()) + Some(options) } /// Run a WebSocket subscription @@ -2286,9 +2301,9 @@ impl CLISession { ); } - // Display rows in the same format as snapshots + // Keep startup rows compact so each payload stays on a single CLI line. for row in rows { - let formatted = serde_json::to_string_pretty(&row).unwrap_or_default(); + let formatted = Self::format_row(row); if self.color { println!(" \x1b[90m{}\x1b[0m", formatted); } else { @@ -3516,7 +3531,55 @@ mod tests { let (sql, options) = CLISession::extract_subscribe_options("SELECT * FROM table OPTIONS (last_rows=50);"); assert_eq!(sql, "SELECT * FROM table"); - assert!(options.is_some()); + let options = options.expect("options should parse"); + assert_eq!(options.last_rows, Some(50)); + } + + #[test] + fn test_extract_subscribe_options_with_combined_options() { + let (sql, options) = CLISession::extract_subscribe_options( + "SELECT * FROM table OPTIONS (last_rows=20, batch_size=5, from=42);", + ); + + assert_eq!(sql, "SELECT * FROM table"); + let options = options.expect("options should parse"); + assert_eq!(options.last_rows, Some(20)); + assert_eq!(options.batch_size, Some(5)); + assert_eq!(options.from, Some(SeqId::from(42))); + } + + #[test] + fn test_extract_subscribe_options_accepts_from_seq_id_alias() { + let (sql, options) = CLISession::extract_subscribe_options( + "SELECT * FROM table OPTIONS (batch_size=10, from_seq_id=99);", + ); + + assert_eq!(sql, "SELECT * FROM table"); + let options = options.expect("options should parse"); + assert_eq!(options.batch_size, Some(10)); + assert_eq!(options.from, Some(SeqId::from(99))); + } + + #[test] + fn test_format_row_returns_compact_json_without_newlines() { + let mut row = kalam_client::RowData::new(); + row.insert( + "message".to_string(), + kalam_client::KalamCellValue::text("hello"), + ); + row.insert( + "count".to_string(), + kalam_client::KalamCellValue::int(5), + ); + + let formatted = CLISession::format_row(&row); + + assert!(!formatted.contains('\n')); + assert!(!formatted.contains(" ")); + assert!(formatted.starts_with('{')); + assert!(formatted.ends_with('}')); + assert!(formatted.contains("\"message\":\"hello\"")); + assert!(formatted.contains("\"count\":5")); } #[test] diff --git a/examples/chat-with-ai/chat-app.sql b/examples/chat-with-ai/chat-app.sql index ad6a41ade..17074f407 100644 --- a/examples/chat-with-ai/chat-app.sql +++ b/examples/chat-with-ai/chat-app.sql @@ -23,3 +23,9 @@ CREATE TABLE IF NOT EXISTS chat_demo.agent_events ( message TEXT NOT NULL DEFAULT '', created_at TIMESTAMP NOT NULL DEFAULT NOW() ) WITH (TYPE = 'STREAM', TTL_SECONDS = 10); + +-- Insert some initial messages into the chat +INSERT INTO chat_demo.messages (role, author, sender_username, content) +VALUES + ('user', 'user_1', 'john_doe', 'Hello everyone!'), + ('assistant', 'ai_bot', 'assistant', 'Hi John 👋 How can I help?'); diff --git a/link/kalam-link-dart/src/models.rs b/link/kalam-link-dart/src/models.rs index 47729d270..ee51c5161 100644 --- a/link/kalam-link-dart/src/models.rs +++ b/link/kalam-link-dart/src/models.rs @@ -481,6 +481,7 @@ impl DartSubscriptionConfig { batch_size: self.batch_size.map(|v| v as usize), last_rows: self.last_rows.map(|v| v as u32), from: self.from.map(kalam_client::SeqId::new), + auto_fetch_batches: None, }), ws_url: None, } diff --git a/link/link-common/src/connection/shared/routing.rs b/link/link-common/src/connection/shared/routing.rs index 4e3d65087..37dced286 100644 --- a/link/link-common/src/connection/shared/routing.rs +++ b/link/link-common/src/connection/shared/routing.rs @@ -65,7 +65,12 @@ pub(super) async fn route_event( }; let event_time_ms = now_ms(); - let auto_request_next_batch = matches!(event, ChangeEvent::InitialDataBatch { .. }); + let auto_request_next_batch = matches!(event, ChangeEvent::InitialDataBatch { .. }) + && matched_key + .as_ref() + .and_then(|key| subs.get(key.as_str(&incoming_sub_id))) + .and_then(|entry| entry.options.auto_fetch_batches) + .unwrap_or(true); let mut next_batch_last_seq = None; let mut should_request_next_batch = false; diff --git a/link/link-common/src/subscription/models/subscription_options.rs b/link/link-common/src/subscription/models/subscription_options.rs index 9a1417424..86721b497 100644 --- a/link/link-common/src/subscription/models/subscription_options.rs +++ b/link/link-common/src/subscription/models/subscription_options.rs @@ -39,6 +39,11 @@ pub struct SubscriptionOptions { /// Typically set automatically during reconnection to resume from last received event. #[serde(skip_serializing_if = "Option::is_none", alias = "from_seq_id")] pub from: Option, + + /// Client-side control for automatically requesting subsequent initial data batches. + /// Skipped on the wire because the backend only needs batch_size/last_rows/from. + #[serde(default, skip_serializing, alias = "autoFetchBatches")] + pub auto_fetch_batches: Option, } impl SubscriptionOptions { @@ -72,6 +77,12 @@ impl SubscriptionOptions { self.with_from(seq_id) } + /// Control whether the client should automatically request subsequent initial data batches. + pub fn with_auto_fetch_batches(mut self, enabled: bool) -> Self { + self.auto_fetch_batches = Some(enabled); + self + } + /// Check if this has a resume seq_id set. pub fn has_resume_seq_id(&self) -> bool { self.from.is_some() diff --git a/link/link-common/src/wasm/client.rs b/link/link-common/src/wasm/client.rs index 9921fa970..8e54e7d11 100644 --- a/link/link-common/src/wasm/client.rs +++ b/link/link-common/src/wasm/client.rs @@ -25,9 +25,12 @@ use super::{ }, wasm_debug_log, }; -use crate::models::{ - ClientMessage, ConnectionOptions, SerializationType, ServerMessage, SubscriptionOptions, - SubscriptionRequest, +use crate::{ + models::{ + ChangeEvent, ClientMessage, ConnectionOptions, SerializationType, ServerMessage, + SubscriptionOptions, SubscriptionRequest, + }, + seq_id::SeqId, }; #[derive(Serialize)] @@ -165,7 +168,12 @@ impl KalamClient { "KalamClient: Sending subscribe request - id: {}, sql: {}", subscription_id, sql )); - if let Err(error) = send_ws_message(ws, &subscribe_msg, self.negotiated_ser.get()) { + if let Err(error) = send_ws_message_traced( + ws, + &subscribe_msg, + self.negotiated_ser.get(), + &self.on_send_cb, + ) { self.subscription_state.borrow_mut().remove(&subscription_id); return Err(error); } @@ -202,6 +210,44 @@ impl KalamClient { } } +fn emit_ws_send(on_send_cb: &Rc>>, msg: &ClientMessage) { + let Some(cb) = on_send_cb.borrow().as_ref().cloned() else { + return; + }; + if let Ok(json) = serde_json::to_string(msg) { + let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&json)); + } +} + +fn send_ws_message_traced( + ws: &WebSocket, + msg: &ClientMessage, + serialization: SerializationType, + on_send_cb: &Rc>>, +) -> Result<(), JsValue> { + send_ws_message(ws, msg, serialization)?; + emit_ws_send(on_send_cb, msg); + Ok(()) +} + +fn next_batch_message(subscription_id: &str, last_seq_id: Option) -> ClientMessage { + ClientMessage::NextBatch { + subscription_id: subscription_id.to_string(), + last_seq_id, + } +} + +fn send_next_batch_traced( + ws: &WebSocket, + subscription_id: &str, + last_seq_id: Option, + serialization: SerializationType, + on_send_cb: &Rc>>, +) -> Result<(), JsValue> { + let msg = next_batch_message(subscription_id, last_seq_id); + send_ws_message_traced(ws, &msg, serialization, on_send_cb) +} + fn reject_pending_subscriptions( subscriptions: &Rc>>, message: &str, @@ -242,10 +288,11 @@ struct SubscriptionDispatch { payload: Option, resolve_subscribe: Option, reject_subscribe: Option<(js_sys::Function, String)>, + next_batch: Option<(String, Option)>, } impl SubscriptionDispatch { - fn invoke(self) { + fn invoke(self) -> Option<(String, Option)> { if let Some(cb) = self.callback { if let Some(payload) = self.payload { let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&payload)); @@ -257,6 +304,7 @@ impl SubscriptionDispatch { if let Some((reject, reason)) = self.reject_subscribe { let _ = reject.call1(&JsValue::NULL, &JsValue::from_str(&reason)); } + self.next_batch } } @@ -357,6 +405,7 @@ fn dispatch_subscription_server_message( let mut payload = None; let mut resolve_subscribe = None; let mut reject_subscribe = None; + let mut next_batch = None; let mut remove_state = false; { @@ -366,6 +415,11 @@ fn dispatch_subscription_server_message( if let Some(filtered_event) = filter_subscription_event(&state.options, event) { track_subscription_checkpoint(&mut state.last_seq_id, &filtered_event); payload = callback_payload(&mut state.callback_mode, &filtered_event); + if let ChangeEvent::InitialDataBatch { batch_control, .. } = filtered_event { + if batch_control.has_more && state.options.auto_fetch_batches.unwrap_or(false) { + next_batch = Some((client_id.to_string(), state.last_seq_id)); + } + } } match event { @@ -403,6 +457,7 @@ fn dispatch_subscription_server_message( payload, resolve_subscribe, reject_subscribe, + next_batch, }) } @@ -461,6 +516,7 @@ fn schedule_auto_reconnect( on_disconnect_cb: Rc>>, on_error_cb: Rc>>, on_receive_cb: Rc>>, + on_send_cb: Rc>>, negotiated_ser: Rc>, ) { let (delay, disable_compression) = { @@ -524,6 +580,7 @@ fn schedule_auto_reconnect( let reconnect_on_disconnect = Rc::clone(&on_disconnect_cb); let reconnect_on_error = Rc::clone(&on_error_cb); let reconnect_on_receive = Rc::clone(&on_receive_cb); + let reconnect_on_send = Rc::clone(&on_send_cb); let reconnect_negotiated_ser = Rc::clone(&negotiated_ser); let next_url = url.clone(); let next_auth = auth.clone(); @@ -538,6 +595,7 @@ fn schedule_auto_reconnect( let next_on_disconnect = Rc::clone(&on_disconnect_cb); let next_on_error = Rc::clone(&on_error_cb); let next_on_receive = Rc::clone(&on_receive_cb); + let next_on_send = Rc::clone(&on_send_cb); let next_negotiated_ser = Rc::clone(&negotiated_ser); wasm_bindgen_futures::spawn_local(async move { @@ -563,6 +621,7 @@ fn schedule_auto_reconnect( &ws, Rc::clone(&reconnect_subscription_state), Rc::clone(&reconnect_on_receive), + Rc::clone(&reconnect_on_send), Rc::clone(&reconnect_negotiated_ser), ); if let Some(cb) = reconnect_on_connect.borrow().as_ref() { @@ -585,12 +644,14 @@ fn schedule_auto_reconnect( Rc::clone(&next_on_disconnect), Rc::clone(&next_on_error), Rc::clone(&next_on_receive), + Rc::clone(&next_on_send), Rc::clone(&next_negotiated_ser), ); resubscribe_all( Rc::clone(&reconnect_ws_ref), Rc::clone(&reconnect_subscription_state), reconnect_negotiated_ser.get(), + Some(Rc::clone(&reconnect_on_send)), ) .await; reconnect::restart_ping_timer( @@ -618,6 +679,7 @@ fn schedule_auto_reconnect( next_on_disconnect, next_on_error, next_on_receive, + next_on_send, next_negotiated_ser, ); }, @@ -695,8 +757,10 @@ fn install_runtime_message_handler( ws: &WebSocket, subscriptions: Rc>>, on_receive_cb: Rc>>, + on_send_cb: Rc>>, negotiated_ser: Rc>, ) { + let ws_for_next_batch = ws.clone(); let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { let event: Option = (|| { let data = e.data(); @@ -720,7 +784,15 @@ fn install_runtime_message_handler( if let Some(event) = event { if let Some(dispatch) = dispatch_subscription_server_message(&subscriptions, &event) { - dispatch.invoke(); + if let Some((subscription_id, last_seq_id)) = dispatch.invoke() { + let _ = send_next_batch_traced( + &ws_for_next_batch, + &subscription_id, + last_seq_id, + negotiated_ser.get(), + &on_send_cb, + ); + } } } }) as Box); @@ -1242,9 +1314,11 @@ impl KalamClient { let auth_handled = Rc::new(RefCell::new(!requires_auth)); // Already handled if anonymous let auth_handled_clone = Rc::clone(&auth_handled); let on_receive_for_msg = Rc::clone(&self.on_receive_cb); + let on_send_for_msg = Rc::clone(&self.on_send_cb); let on_connect_for_msg = Rc::clone(&self.on_connect_cb); let on_error_for_msg = Rc::clone(&self.on_error_cb); let negotiated_ser_for_msg = Rc::clone(&self.negotiated_ser); + let ws_for_next_batch = ws.clone(); let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { // Try to parse the message as a ServerMessage. @@ -1331,7 +1405,15 @@ impl KalamClient { } if let Some(dispatch) = dispatch_subscription_server_message(&subscriptions, &event) { - dispatch.invoke(); + if let Some((subscription_id, last_seq_id)) = dispatch.invoke() { + let _ = send_next_batch_traced( + &ws_for_next_batch, + &subscription_id, + last_seq_id, + negotiated_ser_for_msg.get(), + &on_send_for_msg, + ); + } } }) as Box); ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); @@ -1357,6 +1439,7 @@ impl KalamClient { Rc::clone(&self.ws), Rc::clone(&self.subscription_state), self.negotiated_ser.get(), + Some(Rc::clone(&self.on_send_cb)), ) .await; } @@ -1421,12 +1504,44 @@ impl KalamClient { pub fn send_ping(&self) -> Result<(), JsValue> { if let Some(ws) = self.ws.borrow().as_ref() { if ws.ready_state() == WebSocket::OPEN { - send_ws_message(ws, &ClientMessage::Ping, self.negotiated_ser.get())?; + send_ws_message_traced( + ws, + &ClientMessage::Ping, + self.negotiated_ser.get(), + &self.on_send_cb, + )?; } } Ok(()) } + /// Request the next initial-data batch for a subscription. + #[wasm_bindgen(js_name = requestNextBatch)] + pub fn request_next_batch(&self, subscription_id: String) -> Result<(), JsValue> { + let last_seq_id = self + .subscription_state + .borrow() + .get(&subscription_id) + .map(|state| state.last_seq_id) + .ok_or_else(|| JsValue::from_str("Unknown subscription id"))?; + + let Some(ws) = self.ws.borrow().as_ref().cloned() else { + return Err(JsValue::from_str("WebSocket connection is unavailable")); + }; + + if ws.ready_state() != WebSocket::OPEN { + return Err(JsValue::from_str("WebSocket connection is not open")); + } + + send_next_batch_traced( + &ws, + &subscription_id, + last_seq_id, + self.negotiated_ser.get(), + &self.on_send_cb, + ) + } + /// Start the internal keepalive ping interval (idempotent). /// /// Called automatically by `connect()` and after a successful reconnect. @@ -1714,7 +1829,12 @@ impl KalamClient { let unsubscribe_msg = ClientMessage::Unsubscribe { subscription_id: subscription_id.clone(), }; - send_ws_message(ws, &unsubscribe_msg, self.negotiated_ser.get())?; + send_ws_message_traced( + ws, + &unsubscribe_msg, + self.negotiated_ser.get(), + &self.on_send_cb, + )?; } wasm_debug_log!(&format!("KalamClient: Unsubscribed from: {}", subscription_id)); @@ -2045,6 +2165,7 @@ impl KalamClient { Rc::clone(&self.on_disconnect_cb), Rc::clone(&self.on_error_cb), Rc::clone(&self.on_receive_cb), + Rc::clone(&self.on_send_cb), Rc::clone(&self.negotiated_ser), ); } @@ -2066,6 +2187,7 @@ fn install_auto_reconnect_listener( on_disconnect_cb: Rc>>, on_error_cb: Rc>>, on_receive_cb: Rc>>, + on_send_cb: Rc>>, negotiated_ser: Rc>, ) { let source_ws = ws.clone(); @@ -2091,6 +2213,7 @@ fn install_auto_reconnect_listener( Rc::clone(&on_disconnect_cb), Rc::clone(&on_error_cb), Rc::clone(&on_receive_cb), + Rc::clone(&on_send_cb), Rc::clone(&negotiated_ser), ); }) as Box); diff --git a/link/link-common/src/wasm/reconnect.rs b/link/link-common/src/wasm/reconnect.rs index c5ff6721f..0635cbb5e 100644 --- a/link/link-common/src/wasm/reconnect.rs +++ b/link/link-common/src/wasm/reconnect.rs @@ -199,6 +199,7 @@ pub(crate) async fn resubscribe_all( ws_ref: Rc>>, subscription_state: Rc>>, negotiated_ser: SerializationType, + on_send_cb: Option>>>, ) { let states: Vec<(String, SubscriptionState)> = { let mut subs = subscription_state.borrow_mut(); @@ -239,6 +240,16 @@ pub(crate) async fn resubscribe_all( "KalamClient: Failed to re-subscribe to {}: {:?}", subscription_id, _e )); + } else if let Some(on_send) = on_send_cb.as_ref() { + if let (Some(cb), Ok(json)) = ( + on_send.borrow().as_ref().cloned(), + serde_json::to_string(&subscribe_msg), + ) { + let _ = cb.call1( + &wasm_bindgen::JsValue::NULL, + &wasm_bindgen::JsValue::from_str(&json), + ); + } } } } diff --git a/link/sdks/typescript/client/src/client.ts b/link/sdks/typescript/client/src/client.ts index bf077550c..eb0317d8e 100644 --- a/link/sdks/typescript/client/src/client.ts +++ b/link/sdks/typescript/client/src/client.ts @@ -39,6 +39,7 @@ import type { QueryResponse, SubscriptionCallback, SubscriptionErrorEvent, + SubscriptionHandle, SubscriptionOptions, Unsubscribe, UploadProgress, @@ -1184,6 +1185,18 @@ export class KalamDBClient { callback: SubscriptionCallback, options?: SubscriptionOptions, ): Promise { + const handle = await this.subscribeWithSqlHandle(sql, callback, options); + return handle.unsubscribe; + } + + /** + * Subscribe to a SQL query and receive a handle for explicit batch control. + */ + async subscribeWithSqlHandle( + sql: string, + callback: SubscriptionCallback, + options?: SubscriptionOptions, + ): Promise { this.log(LogLevel.Debug, 'subscription', `Subscribing to: ${sql.substring(0, 120)}`); const optionsJson = this.stringifyOptions(normalizeSubscriptionOptions(options)); const subscriptionId = await this.startTrackedSubscription( @@ -1195,11 +1208,30 @@ export class KalamDBClient { ), ); - return async () => { - await this.unsubscribe(subscriptionId); + return { + id: subscriptionId, + unsubscribe: async () => { + await this.unsubscribe(subscriptionId); + }, + requestNextBatch: async () => { + await this.requestNextBatch(subscriptionId); + }, }; } + /** Request the next initial-data batch for an active subscription. */ + async requestNextBatch(subscriptionId: string): Promise { + await this.ensureConnected(); + const wasmClient = this.wasmClient; + if (!wasmClient) { + throw new Error('WASM client not initialized'); + } + + await this.serializeWasmCall(() => Promise.resolve( + (wasmClient as unknown as { requestNextBatch: (id: string) => void }).requestNextBatch(subscriptionId), + )); + } + /** * Subscribe to a SQL query and receive the SDK-maintained current row set. * diff --git a/link/sdks/typescript/client/src/helpers/subscription_helpers.ts b/link/sdks/typescript/client/src/helpers/subscription_helpers.ts index 709bd6a84..ea9363a14 100644 --- a/link/sdks/typescript/client/src/helpers/subscription_helpers.ts +++ b/link/sdks/typescript/client/src/helpers/subscription_helpers.ts @@ -10,12 +10,12 @@ import type { export function normalizeSubscriptionOptions( options?: SubscriptionOptions, -): { batch_size?: number; last_rows?: number; from?: string } | undefined { +): { batch_size?: number; last_rows?: number; from?: string; auto_fetch_batches?: boolean } | undefined { if (!options) { return undefined; } - const normalized: { batch_size?: number; last_rows?: number; from?: string } = {}; + const normalized: { batch_size?: number; last_rows?: number; from?: string; auto_fetch_batches?: boolean } = {}; if (options.batch_size !== undefined) { normalized.batch_size = options.batch_size; @@ -29,6 +29,11 @@ export function normalizeSubscriptionOptions( normalized.from = options.from instanceof SeqId ? options.from.toString() : SeqId.from(options.from).toString(); } + const autoFetchBatches = options.auto_fetch_batches ?? options.autoFetchBatches; + if (autoFetchBatches !== undefined) { + normalized.auto_fetch_batches = autoFetchBatches; + } + return Object.keys(normalized).length > 0 ? normalized : undefined; } diff --git a/link/sdks/typescript/client/src/index.ts b/link/sdks/typescript/client/src/index.ts index 1589842ec..1b8921dca 100644 --- a/link/sdks/typescript/client/src/index.ts +++ b/link/sdks/typescript/client/src/index.ts @@ -87,6 +87,7 @@ export type { ServerMessage, SubscriptionCallback, SubscriptionErrorEvent, + SubscriptionHandle, SubscriptionInfo, SubscriptionOptions, TimestampFormat, diff --git a/link/sdks/typescript/client/src/types.ts b/link/sdks/typescript/client/src/types.ts index 0aef70744..a3bb922df 100644 --- a/link/sdks/typescript/client/src/types.ts +++ b/link/sdks/typescript/client/src/types.ts @@ -203,6 +203,10 @@ export interface SubscriptionOptions { last_rows?: number; /** Resume from a specific sequence ID. */ from?: WireSeqId; + /** Request every initial-data batch automatically when the server has more rows. */ + auto_fetch_batches?: boolean; + /** Camel-case alias accepted by the WASM layer. Prefer auto_fetch_batches. */ + autoFetchBatches?: boolean; } /* ================================================================== */ @@ -371,6 +375,18 @@ export interface LiveRowsOptions { */ export type Unsubscribe = () => Promise; +/** + * Handle for a live subscription with explicit batch control. + */ +export interface SubscriptionHandle { + /** Server/client subscription id used in WebSocket messages. */ + id: string; + /** Unsubscribe from this subscription. */ + unsubscribe: Unsubscribe; + /** Fetch the next initial-data batch for this subscription. */ + requestNextBatch: () => Promise; +} + /* ================================================================== */ /* Connection Lifecycle Event Handlers */ /* ================================================================== */ diff --git a/link/sdks/typescript/client/tests/normalize.test.mjs b/link/sdks/typescript/client/tests/normalize.test.mjs index 62a4485b3..f43e62402 100644 --- a/link/sdks/typescript/client/tests/normalize.test.mjs +++ b/link/sdks/typescript/client/tests/normalize.test.mjs @@ -1,5 +1,6 @@ import assert from 'node:assert/strict'; import { normalizeQueryResponse, sortColumns, SYSTEM_TABLES_ORDER } from '../dist/src/helpers/query_helpers.js'; +import { normalizeSubscriptionOptions } from '../dist/src/helpers/subscription_helpers.js'; // Helper: build a minimal QueryResponse using the current schema format function makeResp(columnNames, rows) { @@ -76,4 +77,16 @@ function getColumns(resp) { assert.deepEqual(tail, ['x', 'y']); } +// Subscription options include SDK-only batch control and accept camel-case alias. +{ + assert.deepEqual( + normalizeSubscriptionOptions({ batch_size: 5, auto_fetch_batches: false }), + { batch_size: 5, auto_fetch_batches: false }, + ); + assert.deepEqual( + normalizeSubscriptionOptions({ autoFetchBatches: true }), + { auto_fetch_batches: true }, + ); +} + console.log('normalize.test.mjs passed'); diff --git a/ui/index.html b/ui/index.html index 4feace35b..74036318d 100644 --- a/ui/index.html +++ b/ui/index.html @@ -8,6 +8,7 @@
+ diff --git a/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx b/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx index 5e61f1d74..2ea70ace0 100644 --- a/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx +++ b/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx @@ -69,22 +69,46 @@ export function QueryTabStrip({ {tabs.map((tab) => { const isActive = tab.id === activeTabId; const isFlashing = flashingTabIds[tab.id] === true; + const isLive = tab.isLive; + const isLiveConnecting = isLive && tab.liveStatus === "connecting"; + const isLiveConnected = isLive && tab.liveStatus === "connected"; + const isLiveErrored = isLive && tab.liveStatus === "error"; return ( + )}