diff --git a/backend/crates/kalamdb-core/src/sql/impersonation.rs b/backend/crates/kalamdb-core/src/sql/impersonation.rs index 5b4a0298..9ea5b005 100644 --- a/backend/crates/kalamdb-core/src/sql/impersonation.rs +++ b/backend/crates/kalamdb-core/src/sql/impersonation.rs @@ -87,15 +87,6 @@ impl SqlImpersonationService { .role_for_impersonation_target(&target_user_id); if can_impersonate_target_user(actor_user_id, actor_role, &target_user_id, target_role) { - self.audit_impersonation_event( - actor_user_id, - actor_role, - target_user_id.as_str(), - Some(&target_user_id), - true, - None, - ) - .await; return Ok(target_user_id); } diff --git a/backend/tests/misc/auth/test_as_user_impersonation.rs b/backend/tests/misc/auth/test_as_user_impersonation.rs index 90b929de..496ca7e3 100644 --- a/backend/tests/misc/auth/test_as_user_impersonation.rs +++ b/backend/tests/misc/auth/test_as_user_impersonation.rs @@ -363,6 +363,41 @@ async fn test_allowed_execute_as_can_select_update_and_delete_target_rows() { ); } +#[actix_web::test] +#[ntest::timeout(45000)] +async fn test_allowed_execute_as_dml_is_not_audited() { + let server = TestServer::new_shared().await; + let ns = unique_name("as_user_dml_no_audit"); + create_user_table(&server, &ns, "profiles").await; + + let actor = insert_user(&server, &unique_name("no_audit_actor"), Role::Dba).await; + let target = insert_user(&server, &unique_name("no_audit_target"), Role::User).await; + + let insert_sql = format!( + "EXECUTE AS USER '{}' (INSERT INTO {}.profiles (id, value) VALUES ('p1', 'delegated'))", + target.as_str(), + ns + ); + let insert_resp = server.execute_sql_as_user(&insert_sql, actor.as_str()).await; + assert_success(&insert_resp, "allowed cross-user insert without audit"); + + let logs = server + .app_context + .system_tables() + .audit_logs() + .scan_all() + .expect("Failed to read audit log"); + assert!( + !logs.iter().any(|entry| { + entry.action == "EXECUTE_AS_USER" + && entry.target == format!("user:{}", target.as_str()) + && entry.actor_user_id == actor + && entry.subject_user_id.as_ref() == Some(&target) + }), + "successful execute as DML should not create an audit entry" + ); +} + #[actix_web::test] #[ntest::timeout(45000)] async fn test_execute_as_stream_table_uses_target_user_scope() { diff --git a/cli/src/session.rs b/cli/src/session.rs index d2edf6df..f1e796e5 100644 --- a/cli/src/session.rs +++ b/cli/src/session.rs @@ -24,8 +24,8 @@ use indicatif::{ProgressBar, ProgressStyle}; use kalam_client::{ credentials::{CredentialStore, Credentials}, AuthProvider, AuthRefreshCallback, ClusterHealthResponse, ClusterNodeHealth, ConnectionOptions, - KalamLinkClient, KalamLinkError, KalamLinkTimeouts, SubscriptionConfig, SubscriptionOptions, - TimestampFormatter, UploadProgress, UploadProgressCallback, + KalamLinkClient, KalamLinkError, KalamLinkTimeouts, SeqId, SubscriptionConfig, + SubscriptionOptions, TimestampFormatter, UploadProgress, UploadProgressCallback, }; use rustyline::{ completion::Completer, error::ReadlineError, highlight::Highlighter, hint::Hinter, @@ -1703,8 +1703,8 @@ impl CLISession { /// Extract OPTIONS clause from SUBSCRIBE SQL query. /// - /// Parses `OPTIONS (last_rows=N)` from the SQL and returns cleaned SQL + options. - /// If no OPTIONS found, returns original SQL with default options (last_rows=100). + /// Parses CLI-managed options from `OPTIONS (...)` and returns cleaned SQL + options. + /// If no OPTIONS found, returns original SQL with default subscription options. /// /// # Examples /// ``` @@ -1728,48 +1728,63 @@ impl CLISession { let clean_sql = sql[..idx].trim().to_string(); let options_str = sql[idx + " OPTIONS".len()..].trim(); // " OPTIONS".len() == 8 - // Parse OPTIONS (last_rows=N) + // Parse CLI-managed options from OPTIONS (...) let options = Self::parse_subscribe_options(options_str); (clean_sql, options) } - /// Parse OPTIONS clause value: (last_rows=N) + /// Parse OPTIONS clause value: (last_rows=N, batch_size=N, from=N) fn parse_subscribe_options(options_str: &str) -> Option { let options_str = options_str.trim(); - // Expected format: (last_rows=N) or ( last_rows = N ) + // Expected format: (last_rows=N, batch_size=N, from=N) if !options_str.starts_with('(') || !options_str.ends_with(')') { eprintln!("Warning: Invalid OPTIONS format, using defaults"); return Some(SubscriptionOptions::default()); } let inner = options_str[1..options_str.len() - 1].trim(); + let mut options = SubscriptionOptions::new(); - // Parse last_rows=N - if let Some(equals_idx) = inner.find('=') { - let key = inner[..equals_idx].trim(); - let value = inner[equals_idx + 1..].trim(); + for pair in inner.split(',') { + let pair = pair.trim(); + if pair.is_empty() { + continue; + } - if key.to_lowercase() == "last_rows" { + let Some((key, value)) = pair.split_once('=') else { + eprintln!("Warning: Invalid option '{}', expected key=value", pair); + continue; + }; + + let key = key.trim().to_ascii_lowercase(); + let value = value.trim().trim_matches(['\'', '"']); + + if key == "last_rows" { if let Ok(last_rows) = value.parse::() { - return Some(SubscriptionOptions::new().with_last_rows(last_rows)); + options = options.with_last_rows(last_rows); } else { eprintln!("Warning: Invalid last_rows value '{}', using default", value); } - } else if key.to_lowercase() == "batch_size" { + } else if key == "batch_size" { if let Ok(batch_size) = value.parse::() { - return Some(SubscriptionOptions::new().with_batch_size(batch_size)); + options = options.with_batch_size(batch_size); } else { eprintln!("Warning: Invalid batch_size value '{}', using default", value); } + } else if key == "from" || key == "from_seq_id" { + if let Ok(seq_id) = value.parse::() { + options = options.with_from(SeqId::from(seq_id)); + } else { + eprintln!("Warning: Invalid from value '{}', using default", value); + } } else { eprintln!("Warning: Unknown option '{}', ignoring", key); } } - // Default if parsing failed - Some(SubscriptionOptions::default()) + Some(options) } /// Run a WebSocket subscription @@ -2286,9 +2301,9 @@ impl CLISession { ); } - // Display rows in the same format as snapshots + // Keep startup rows compact so each payload stays on a single CLI line. for row in rows { - let formatted = serde_json::to_string_pretty(&row).unwrap_or_default(); + let formatted = Self::format_row(row); if self.color { println!(" \x1b[90m{}\x1b[0m", formatted); } else { @@ -3516,7 +3531,55 @@ mod tests { let (sql, options) = CLISession::extract_subscribe_options("SELECT * FROM table OPTIONS (last_rows=50);"); assert_eq!(sql, "SELECT * FROM table"); - assert!(options.is_some()); + let options = options.expect("options should parse"); + assert_eq!(options.last_rows, Some(50)); + } + + #[test] + fn test_extract_subscribe_options_with_combined_options() { + let (sql, options) = CLISession::extract_subscribe_options( + "SELECT * FROM table OPTIONS (last_rows=20, batch_size=5, from=42);", + ); + + assert_eq!(sql, "SELECT * FROM table"); + let options = options.expect("options should parse"); + assert_eq!(options.last_rows, Some(20)); + assert_eq!(options.batch_size, Some(5)); + assert_eq!(options.from, Some(SeqId::from(42))); + } + + #[test] + fn test_extract_subscribe_options_accepts_from_seq_id_alias() { + let (sql, options) = CLISession::extract_subscribe_options( + "SELECT * FROM table OPTIONS (batch_size=10, from_seq_id=99);", + ); + + assert_eq!(sql, "SELECT * FROM table"); + let options = options.expect("options should parse"); + assert_eq!(options.batch_size, Some(10)); + assert_eq!(options.from, Some(SeqId::from(99))); + } + + #[test] + fn test_format_row_returns_compact_json_without_newlines() { + let mut row = kalam_client::RowData::new(); + row.insert( + "message".to_string(), + kalam_client::KalamCellValue::text("hello"), + ); + row.insert( + "count".to_string(), + kalam_client::KalamCellValue::int(5), + ); + + let formatted = CLISession::format_row(&row); + + assert!(!formatted.contains('\n')); + assert!(!formatted.contains(" ")); + assert!(formatted.starts_with('{')); + assert!(formatted.ends_with('}')); + assert!(formatted.contains("\"message\":\"hello\"")); + assert!(formatted.contains("\"count\":5")); } #[test] diff --git a/examples/chat-with-ai/chat-app.sql b/examples/chat-with-ai/chat-app.sql index ad6a41ad..17074f40 100644 --- a/examples/chat-with-ai/chat-app.sql +++ b/examples/chat-with-ai/chat-app.sql @@ -23,3 +23,9 @@ CREATE TABLE IF NOT EXISTS chat_demo.agent_events ( message TEXT NOT NULL DEFAULT '', created_at TIMESTAMP NOT NULL DEFAULT NOW() ) WITH (TYPE = 'STREAM', TTL_SECONDS = 10); + +-- Insert some initial messages into the chat +INSERT INTO chat_demo.messages (role, author, sender_username, content) +VALUES + ('user', 'user_1', 'john_doe', 'Hello everyone!'), + ('assistant', 'ai_bot', 'assistant', 'Hi John 👋 How can I help?'); diff --git a/link/kalam-link-dart/src/models.rs b/link/kalam-link-dart/src/models.rs index 47729d27..ee51c516 100644 --- a/link/kalam-link-dart/src/models.rs +++ b/link/kalam-link-dart/src/models.rs @@ -481,6 +481,7 @@ impl DartSubscriptionConfig { batch_size: self.batch_size.map(|v| v as usize), last_rows: self.last_rows.map(|v| v as u32), from: self.from.map(kalam_client::SeqId::new), + auto_fetch_batches: None, }), ws_url: None, } diff --git a/link/link-common/src/connection/shared/routing.rs b/link/link-common/src/connection/shared/routing.rs index 4e3d6508..37dced28 100644 --- a/link/link-common/src/connection/shared/routing.rs +++ b/link/link-common/src/connection/shared/routing.rs @@ -65,7 +65,12 @@ pub(super) async fn route_event( }; let event_time_ms = now_ms(); - let auto_request_next_batch = matches!(event, ChangeEvent::InitialDataBatch { .. }); + let auto_request_next_batch = matches!(event, ChangeEvent::InitialDataBatch { .. }) + && matched_key + .as_ref() + .and_then(|key| subs.get(key.as_str(&incoming_sub_id))) + .and_then(|entry| entry.options.auto_fetch_batches) + .unwrap_or(true); let mut next_batch_last_seq = None; let mut should_request_next_batch = false; diff --git a/link/link-common/src/subscription/models/subscription_options.rs b/link/link-common/src/subscription/models/subscription_options.rs index 9a141742..86721b49 100644 --- a/link/link-common/src/subscription/models/subscription_options.rs +++ b/link/link-common/src/subscription/models/subscription_options.rs @@ -39,6 +39,11 @@ pub struct SubscriptionOptions { /// Typically set automatically during reconnection to resume from last received event. #[serde(skip_serializing_if = "Option::is_none", alias = "from_seq_id")] pub from: Option, + + /// Client-side control for automatically requesting subsequent initial data batches. + /// Skipped on the wire because the backend only needs batch_size/last_rows/from. + #[serde(default, skip_serializing, alias = "autoFetchBatches")] + pub auto_fetch_batches: Option, } impl SubscriptionOptions { @@ -72,6 +77,12 @@ impl SubscriptionOptions { self.with_from(seq_id) } + /// Control whether the client should automatically request subsequent initial data batches. + pub fn with_auto_fetch_batches(mut self, enabled: bool) -> Self { + self.auto_fetch_batches = Some(enabled); + self + } + /// Check if this has a resume seq_id set. pub fn has_resume_seq_id(&self) -> bool { self.from.is_some() diff --git a/link/link-common/src/wasm/client.rs b/link/link-common/src/wasm/client.rs index 9921fa97..8e54e7d1 100644 --- a/link/link-common/src/wasm/client.rs +++ b/link/link-common/src/wasm/client.rs @@ -25,9 +25,12 @@ use super::{ }, wasm_debug_log, }; -use crate::models::{ - ClientMessage, ConnectionOptions, SerializationType, ServerMessage, SubscriptionOptions, - SubscriptionRequest, +use crate::{ + models::{ + ChangeEvent, ClientMessage, ConnectionOptions, SerializationType, ServerMessage, + SubscriptionOptions, SubscriptionRequest, + }, + seq_id::SeqId, }; #[derive(Serialize)] @@ -165,7 +168,12 @@ impl KalamClient { "KalamClient: Sending subscribe request - id: {}, sql: {}", subscription_id, sql )); - if let Err(error) = send_ws_message(ws, &subscribe_msg, self.negotiated_ser.get()) { + if let Err(error) = send_ws_message_traced( + ws, + &subscribe_msg, + self.negotiated_ser.get(), + &self.on_send_cb, + ) { self.subscription_state.borrow_mut().remove(&subscription_id); return Err(error); } @@ -202,6 +210,44 @@ impl KalamClient { } } +fn emit_ws_send(on_send_cb: &Rc>>, msg: &ClientMessage) { + let Some(cb) = on_send_cb.borrow().as_ref().cloned() else { + return; + }; + if let Ok(json) = serde_json::to_string(msg) { + let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&json)); + } +} + +fn send_ws_message_traced( + ws: &WebSocket, + msg: &ClientMessage, + serialization: SerializationType, + on_send_cb: &Rc>>, +) -> Result<(), JsValue> { + send_ws_message(ws, msg, serialization)?; + emit_ws_send(on_send_cb, msg); + Ok(()) +} + +fn next_batch_message(subscription_id: &str, last_seq_id: Option) -> ClientMessage { + ClientMessage::NextBatch { + subscription_id: subscription_id.to_string(), + last_seq_id, + } +} + +fn send_next_batch_traced( + ws: &WebSocket, + subscription_id: &str, + last_seq_id: Option, + serialization: SerializationType, + on_send_cb: &Rc>>, +) -> Result<(), JsValue> { + let msg = next_batch_message(subscription_id, last_seq_id); + send_ws_message_traced(ws, &msg, serialization, on_send_cb) +} + fn reject_pending_subscriptions( subscriptions: &Rc>>, message: &str, @@ -242,10 +288,11 @@ struct SubscriptionDispatch { payload: Option, resolve_subscribe: Option, reject_subscribe: Option<(js_sys::Function, String)>, + next_batch: Option<(String, Option)>, } impl SubscriptionDispatch { - fn invoke(self) { + fn invoke(self) -> Option<(String, Option)> { if let Some(cb) = self.callback { if let Some(payload) = self.payload { let _ = cb.call1(&JsValue::NULL, &JsValue::from_str(&payload)); @@ -257,6 +304,7 @@ impl SubscriptionDispatch { if let Some((reject, reason)) = self.reject_subscribe { let _ = reject.call1(&JsValue::NULL, &JsValue::from_str(&reason)); } + self.next_batch } } @@ -357,6 +405,7 @@ fn dispatch_subscription_server_message( let mut payload = None; let mut resolve_subscribe = None; let mut reject_subscribe = None; + let mut next_batch = None; let mut remove_state = false; { @@ -366,6 +415,11 @@ fn dispatch_subscription_server_message( if let Some(filtered_event) = filter_subscription_event(&state.options, event) { track_subscription_checkpoint(&mut state.last_seq_id, &filtered_event); payload = callback_payload(&mut state.callback_mode, &filtered_event); + if let ChangeEvent::InitialDataBatch { batch_control, .. } = filtered_event { + if batch_control.has_more && state.options.auto_fetch_batches.unwrap_or(false) { + next_batch = Some((client_id.to_string(), state.last_seq_id)); + } + } } match event { @@ -403,6 +457,7 @@ fn dispatch_subscription_server_message( payload, resolve_subscribe, reject_subscribe, + next_batch, }) } @@ -461,6 +516,7 @@ fn schedule_auto_reconnect( on_disconnect_cb: Rc>>, on_error_cb: Rc>>, on_receive_cb: Rc>>, + on_send_cb: Rc>>, negotiated_ser: Rc>, ) { let (delay, disable_compression) = { @@ -524,6 +580,7 @@ fn schedule_auto_reconnect( let reconnect_on_disconnect = Rc::clone(&on_disconnect_cb); let reconnect_on_error = Rc::clone(&on_error_cb); let reconnect_on_receive = Rc::clone(&on_receive_cb); + let reconnect_on_send = Rc::clone(&on_send_cb); let reconnect_negotiated_ser = Rc::clone(&negotiated_ser); let next_url = url.clone(); let next_auth = auth.clone(); @@ -538,6 +595,7 @@ fn schedule_auto_reconnect( let next_on_disconnect = Rc::clone(&on_disconnect_cb); let next_on_error = Rc::clone(&on_error_cb); let next_on_receive = Rc::clone(&on_receive_cb); + let next_on_send = Rc::clone(&on_send_cb); let next_negotiated_ser = Rc::clone(&negotiated_ser); wasm_bindgen_futures::spawn_local(async move { @@ -563,6 +621,7 @@ fn schedule_auto_reconnect( &ws, Rc::clone(&reconnect_subscription_state), Rc::clone(&reconnect_on_receive), + Rc::clone(&reconnect_on_send), Rc::clone(&reconnect_negotiated_ser), ); if let Some(cb) = reconnect_on_connect.borrow().as_ref() { @@ -585,12 +644,14 @@ fn schedule_auto_reconnect( Rc::clone(&next_on_disconnect), Rc::clone(&next_on_error), Rc::clone(&next_on_receive), + Rc::clone(&next_on_send), Rc::clone(&next_negotiated_ser), ); resubscribe_all( Rc::clone(&reconnect_ws_ref), Rc::clone(&reconnect_subscription_state), reconnect_negotiated_ser.get(), + Some(Rc::clone(&reconnect_on_send)), ) .await; reconnect::restart_ping_timer( @@ -618,6 +679,7 @@ fn schedule_auto_reconnect( next_on_disconnect, next_on_error, next_on_receive, + next_on_send, next_negotiated_ser, ); }, @@ -695,8 +757,10 @@ fn install_runtime_message_handler( ws: &WebSocket, subscriptions: Rc>>, on_receive_cb: Rc>>, + on_send_cb: Rc>>, negotiated_ser: Rc>, ) { + let ws_for_next_batch = ws.clone(); let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { let event: Option = (|| { let data = e.data(); @@ -720,7 +784,15 @@ fn install_runtime_message_handler( if let Some(event) = event { if let Some(dispatch) = dispatch_subscription_server_message(&subscriptions, &event) { - dispatch.invoke(); + if let Some((subscription_id, last_seq_id)) = dispatch.invoke() { + let _ = send_next_batch_traced( + &ws_for_next_batch, + &subscription_id, + last_seq_id, + negotiated_ser.get(), + &on_send_cb, + ); + } } } }) as Box); @@ -1242,9 +1314,11 @@ impl KalamClient { let auth_handled = Rc::new(RefCell::new(!requires_auth)); // Already handled if anonymous let auth_handled_clone = Rc::clone(&auth_handled); let on_receive_for_msg = Rc::clone(&self.on_receive_cb); + let on_send_for_msg = Rc::clone(&self.on_send_cb); let on_connect_for_msg = Rc::clone(&self.on_connect_cb); let on_error_for_msg = Rc::clone(&self.on_error_cb); let negotiated_ser_for_msg = Rc::clone(&self.negotiated_ser); + let ws_for_next_batch = ws.clone(); let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| { // Try to parse the message as a ServerMessage. @@ -1331,7 +1405,15 @@ impl KalamClient { } if let Some(dispatch) = dispatch_subscription_server_message(&subscriptions, &event) { - dispatch.invoke(); + if let Some((subscription_id, last_seq_id)) = dispatch.invoke() { + let _ = send_next_batch_traced( + &ws_for_next_batch, + &subscription_id, + last_seq_id, + negotiated_ser_for_msg.get(), + &on_send_for_msg, + ); + } } }) as Box); ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); @@ -1357,6 +1439,7 @@ impl KalamClient { Rc::clone(&self.ws), Rc::clone(&self.subscription_state), self.negotiated_ser.get(), + Some(Rc::clone(&self.on_send_cb)), ) .await; } @@ -1421,12 +1504,44 @@ impl KalamClient { pub fn send_ping(&self) -> Result<(), JsValue> { if let Some(ws) = self.ws.borrow().as_ref() { if ws.ready_state() == WebSocket::OPEN { - send_ws_message(ws, &ClientMessage::Ping, self.negotiated_ser.get())?; + send_ws_message_traced( + ws, + &ClientMessage::Ping, + self.negotiated_ser.get(), + &self.on_send_cb, + )?; } } Ok(()) } + /// Request the next initial-data batch for a subscription. + #[wasm_bindgen(js_name = requestNextBatch)] + pub fn request_next_batch(&self, subscription_id: String) -> Result<(), JsValue> { + let last_seq_id = self + .subscription_state + .borrow() + .get(&subscription_id) + .map(|state| state.last_seq_id) + .ok_or_else(|| JsValue::from_str("Unknown subscription id"))?; + + let Some(ws) = self.ws.borrow().as_ref().cloned() else { + return Err(JsValue::from_str("WebSocket connection is unavailable")); + }; + + if ws.ready_state() != WebSocket::OPEN { + return Err(JsValue::from_str("WebSocket connection is not open")); + } + + send_next_batch_traced( + &ws, + &subscription_id, + last_seq_id, + self.negotiated_ser.get(), + &self.on_send_cb, + ) + } + /// Start the internal keepalive ping interval (idempotent). /// /// Called automatically by `connect()` and after a successful reconnect. @@ -1714,7 +1829,12 @@ impl KalamClient { let unsubscribe_msg = ClientMessage::Unsubscribe { subscription_id: subscription_id.clone(), }; - send_ws_message(ws, &unsubscribe_msg, self.negotiated_ser.get())?; + send_ws_message_traced( + ws, + &unsubscribe_msg, + self.negotiated_ser.get(), + &self.on_send_cb, + )?; } wasm_debug_log!(&format!("KalamClient: Unsubscribed from: {}", subscription_id)); @@ -2045,6 +2165,7 @@ impl KalamClient { Rc::clone(&self.on_disconnect_cb), Rc::clone(&self.on_error_cb), Rc::clone(&self.on_receive_cb), + Rc::clone(&self.on_send_cb), Rc::clone(&self.negotiated_ser), ); } @@ -2066,6 +2187,7 @@ fn install_auto_reconnect_listener( on_disconnect_cb: Rc>>, on_error_cb: Rc>>, on_receive_cb: Rc>>, + on_send_cb: Rc>>, negotiated_ser: Rc>, ) { let source_ws = ws.clone(); @@ -2091,6 +2213,7 @@ fn install_auto_reconnect_listener( Rc::clone(&on_disconnect_cb), Rc::clone(&on_error_cb), Rc::clone(&on_receive_cb), + Rc::clone(&on_send_cb), Rc::clone(&negotiated_ser), ); }) as Box); diff --git a/link/link-common/src/wasm/reconnect.rs b/link/link-common/src/wasm/reconnect.rs index c5ff6721..0635cbb5 100644 --- a/link/link-common/src/wasm/reconnect.rs +++ b/link/link-common/src/wasm/reconnect.rs @@ -199,6 +199,7 @@ pub(crate) async fn resubscribe_all( ws_ref: Rc>>, subscription_state: Rc>>, negotiated_ser: SerializationType, + on_send_cb: Option>>>, ) { let states: Vec<(String, SubscriptionState)> = { let mut subs = subscription_state.borrow_mut(); @@ -239,6 +240,16 @@ pub(crate) async fn resubscribe_all( "KalamClient: Failed to re-subscribe to {}: {:?}", subscription_id, _e )); + } else if let Some(on_send) = on_send_cb.as_ref() { + if let (Some(cb), Ok(json)) = ( + on_send.borrow().as_ref().cloned(), + serde_json::to_string(&subscribe_msg), + ) { + let _ = cb.call1( + &wasm_bindgen::JsValue::NULL, + &wasm_bindgen::JsValue::from_str(&json), + ); + } } } } diff --git a/link/sdks/typescript/client/src/client.ts b/link/sdks/typescript/client/src/client.ts index bf077550..eb0317d8 100644 --- a/link/sdks/typescript/client/src/client.ts +++ b/link/sdks/typescript/client/src/client.ts @@ -39,6 +39,7 @@ import type { QueryResponse, SubscriptionCallback, SubscriptionErrorEvent, + SubscriptionHandle, SubscriptionOptions, Unsubscribe, UploadProgress, @@ -1184,6 +1185,18 @@ export class KalamDBClient { callback: SubscriptionCallback, options?: SubscriptionOptions, ): Promise { + const handle = await this.subscribeWithSqlHandle(sql, callback, options); + return handle.unsubscribe; + } + + /** + * Subscribe to a SQL query and receive a handle for explicit batch control. + */ + async subscribeWithSqlHandle( + sql: string, + callback: SubscriptionCallback, + options?: SubscriptionOptions, + ): Promise { this.log(LogLevel.Debug, 'subscription', `Subscribing to: ${sql.substring(0, 120)}`); const optionsJson = this.stringifyOptions(normalizeSubscriptionOptions(options)); const subscriptionId = await this.startTrackedSubscription( @@ -1195,11 +1208,30 @@ export class KalamDBClient { ), ); - return async () => { - await this.unsubscribe(subscriptionId); + return { + id: subscriptionId, + unsubscribe: async () => { + await this.unsubscribe(subscriptionId); + }, + requestNextBatch: async () => { + await this.requestNextBatch(subscriptionId); + }, }; } + /** Request the next initial-data batch for an active subscription. */ + async requestNextBatch(subscriptionId: string): Promise { + await this.ensureConnected(); + const wasmClient = this.wasmClient; + if (!wasmClient) { + throw new Error('WASM client not initialized'); + } + + await this.serializeWasmCall(() => Promise.resolve( + (wasmClient as unknown as { requestNextBatch: (id: string) => void }).requestNextBatch(subscriptionId), + )); + } + /** * Subscribe to a SQL query and receive the SDK-maintained current row set. * diff --git a/link/sdks/typescript/client/src/helpers/subscription_helpers.ts b/link/sdks/typescript/client/src/helpers/subscription_helpers.ts index 709bd6a8..ea9363a1 100644 --- a/link/sdks/typescript/client/src/helpers/subscription_helpers.ts +++ b/link/sdks/typescript/client/src/helpers/subscription_helpers.ts @@ -10,12 +10,12 @@ import type { export function normalizeSubscriptionOptions( options?: SubscriptionOptions, -): { batch_size?: number; last_rows?: number; from?: string } | undefined { +): { batch_size?: number; last_rows?: number; from?: string; auto_fetch_batches?: boolean } | undefined { if (!options) { return undefined; } - const normalized: { batch_size?: number; last_rows?: number; from?: string } = {}; + const normalized: { batch_size?: number; last_rows?: number; from?: string; auto_fetch_batches?: boolean } = {}; if (options.batch_size !== undefined) { normalized.batch_size = options.batch_size; @@ -29,6 +29,11 @@ export function normalizeSubscriptionOptions( normalized.from = options.from instanceof SeqId ? options.from.toString() : SeqId.from(options.from).toString(); } + const autoFetchBatches = options.auto_fetch_batches ?? options.autoFetchBatches; + if (autoFetchBatches !== undefined) { + normalized.auto_fetch_batches = autoFetchBatches; + } + return Object.keys(normalized).length > 0 ? normalized : undefined; } diff --git a/link/sdks/typescript/client/src/index.ts b/link/sdks/typescript/client/src/index.ts index 1589842e..1b8921dc 100644 --- a/link/sdks/typescript/client/src/index.ts +++ b/link/sdks/typescript/client/src/index.ts @@ -87,6 +87,7 @@ export type { ServerMessage, SubscriptionCallback, SubscriptionErrorEvent, + SubscriptionHandle, SubscriptionInfo, SubscriptionOptions, TimestampFormat, diff --git a/link/sdks/typescript/client/src/types.ts b/link/sdks/typescript/client/src/types.ts index 0aef7074..a3bb922d 100644 --- a/link/sdks/typescript/client/src/types.ts +++ b/link/sdks/typescript/client/src/types.ts @@ -203,6 +203,10 @@ export interface SubscriptionOptions { last_rows?: number; /** Resume from a specific sequence ID. */ from?: WireSeqId; + /** Request every initial-data batch automatically when the server has more rows. */ + auto_fetch_batches?: boolean; + /** Camel-case alias accepted by the WASM layer. Prefer auto_fetch_batches. */ + autoFetchBatches?: boolean; } /* ================================================================== */ @@ -371,6 +375,18 @@ export interface LiveRowsOptions { */ export type Unsubscribe = () => Promise; +/** + * Handle for a live subscription with explicit batch control. + */ +export interface SubscriptionHandle { + /** Server/client subscription id used in WebSocket messages. */ + id: string; + /** Unsubscribe from this subscription. */ + unsubscribe: Unsubscribe; + /** Fetch the next initial-data batch for this subscription. */ + requestNextBatch: () => Promise; +} + /* ================================================================== */ /* Connection Lifecycle Event Handlers */ /* ================================================================== */ diff --git a/link/sdks/typescript/client/tests/normalize.test.mjs b/link/sdks/typescript/client/tests/normalize.test.mjs index 62a4485b..f43e6240 100644 --- a/link/sdks/typescript/client/tests/normalize.test.mjs +++ b/link/sdks/typescript/client/tests/normalize.test.mjs @@ -1,5 +1,6 @@ import assert from 'node:assert/strict'; import { normalizeQueryResponse, sortColumns, SYSTEM_TABLES_ORDER } from '../dist/src/helpers/query_helpers.js'; +import { normalizeSubscriptionOptions } from '../dist/src/helpers/subscription_helpers.js'; // Helper: build a minimal QueryResponse using the current schema format function makeResp(columnNames, rows) { @@ -76,4 +77,16 @@ function getColumns(resp) { assert.deepEqual(tail, ['x', 'y']); } +// Subscription options include SDK-only batch control and accept camel-case alias. +{ + assert.deepEqual( + normalizeSubscriptionOptions({ batch_size: 5, auto_fetch_batches: false }), + { batch_size: 5, auto_fetch_batches: false }, + ); + assert.deepEqual( + normalizeSubscriptionOptions({ autoFetchBatches: true }), + { auto_fetch_batches: true }, + ); +} + console.log('normalize.test.mjs passed'); diff --git a/ui/index.html b/ui/index.html index 4feace35..74036318 100644 --- a/ui/index.html +++ b/ui/index.html @@ -8,6 +8,7 @@
+ diff --git a/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx b/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx index 5e61f1d7..2ea70ace 100644 --- a/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx +++ b/ui/src/components/sql-studio-v2/input-form/QueryTabStrip.tsx @@ -69,22 +69,46 @@ export function QueryTabStrip({ {tabs.map((tab) => { const isActive = tab.id === activeTabId; const isFlashing = flashingTabIds[tab.id] === true; + const isLive = tab.isLive; + const isLiveConnecting = isLive && tab.liveStatus === "connecting"; + const isLiveConnected = isLive && tab.liveStatus === "connected"; + const isLiveErrored = isLive && tab.liveStatus === "error"; return ( + )}