Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions backend/crates/kalamdb-core/src/sql/impersonation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,6 @@ impl SqlImpersonationService {
.role_for_impersonation_target(&target_user_id);

if can_impersonate_target_user(actor_user_id, actor_role, &target_user_id, target_role) {
self.audit_impersonation_event(
actor_user_id,
actor_role,
target_user_id.as_str(),
Some(&target_user_id),
true,
None,
)
.await;
return Ok(target_user_id);
}

Expand Down
35 changes: 35 additions & 0 deletions backend/tests/misc/auth/test_as_user_impersonation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,41 @@ async fn test_allowed_execute_as_can_select_update_and_delete_target_rows() {
);
}

#[actix_web::test]
#[ntest::timeout(45000)]
async fn test_allowed_execute_as_dml_is_not_audited() {
let server = TestServer::new_shared().await;
let ns = unique_name("as_user_dml_no_audit");
create_user_table(&server, &ns, "profiles").await;

let actor = insert_user(&server, &unique_name("no_audit_actor"), Role::Dba).await;
let target = insert_user(&server, &unique_name("no_audit_target"), Role::User).await;

let insert_sql = format!(
"EXECUTE AS USER '{}' (INSERT INTO {}.profiles (id, value) VALUES ('p1', 'delegated'))",
target.as_str(),
ns
);
let insert_resp = server.execute_sql_as_user(&insert_sql, actor.as_str()).await;
assert_success(&insert_resp, "allowed cross-user insert without audit");

let logs = server
.app_context
.system_tables()
.audit_logs()
.scan_all()
.expect("Failed to read audit log");
assert!(
!logs.iter().any(|entry| {
entry.action == "EXECUTE_AS_USER"
&& entry.target == format!("user:{}", target.as_str())
&& entry.actor_user_id == actor
&& entry.subject_user_id.as_ref() == Some(&target)
}),
"successful execute as DML should not create an audit entry"
);
}

#[actix_web::test]
#[ntest::timeout(45000)]
async fn test_execute_as_stream_table_uses_target_user_scope() {
Expand Down
103 changes: 83 additions & 20 deletions cli/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use indicatif::{ProgressBar, ProgressStyle};
use kalam_client::{
credentials::{CredentialStore, Credentials},
AuthProvider, AuthRefreshCallback, ClusterHealthResponse, ClusterNodeHealth, ConnectionOptions,
KalamLinkClient, KalamLinkError, KalamLinkTimeouts, SubscriptionConfig, SubscriptionOptions,
TimestampFormatter, UploadProgress, UploadProgressCallback,
KalamLinkClient, KalamLinkError, KalamLinkTimeouts, SeqId, SubscriptionConfig,
SubscriptionOptions, TimestampFormatter, UploadProgress, UploadProgressCallback,
};
use rustyline::{
completion::Completer, error::ReadlineError, highlight::Highlighter, hint::Hinter,
Expand Down Expand Up @@ -1703,8 +1703,8 @@ impl CLISession {

/// Extract OPTIONS clause from SUBSCRIBE SQL query.
///
/// Parses `OPTIONS (last_rows=N)` from the SQL and returns cleaned SQL + options.
/// If no OPTIONS found, returns original SQL with default options (last_rows=100).
/// Parses CLI-managed options from `OPTIONS (...)` and returns cleaned SQL + options.
/// If no OPTIONS found, returns original SQL with default subscription options.
///
/// # Examples
/// ```
Expand All @@ -1728,48 +1728,63 @@ impl CLISession {
let clean_sql = sql[..idx].trim().to_string();
let options_str = sql[idx + " OPTIONS".len()..].trim(); // " OPTIONS".len() == 8

// Parse OPTIONS (last_rows=N)
// Parse CLI-managed options from OPTIONS (...)
let options = Self::parse_subscribe_options(options_str);

(clean_sql, options)
}

/// Parse OPTIONS clause value: (last_rows=N)
/// Parse OPTIONS clause value: (last_rows=N, batch_size=N, from=N)
fn parse_subscribe_options(options_str: &str) -> Option<SubscriptionOptions> {
let options_str = options_str.trim();

// Expected format: (last_rows=N) or ( last_rows = N )
// Expected format: (last_rows=N, batch_size=N, from=N)
if !options_str.starts_with('(') || !options_str.ends_with(')') {
eprintln!("Warning: Invalid OPTIONS format, using defaults");
return Some(SubscriptionOptions::default());
}

let inner = options_str[1..options_str.len() - 1].trim();
let mut options = SubscriptionOptions::new();

// Parse last_rows=N
if let Some(equals_idx) = inner.find('=') {
let key = inner[..equals_idx].trim();
let value = inner[equals_idx + 1..].trim();
for pair in inner.split(',') {
let pair = pair.trim();
if pair.is_empty() {
continue;
}

if key.to_lowercase() == "last_rows" {
let Some((key, value)) = pair.split_once('=') else {
eprintln!("Warning: Invalid option '{}', expected key=value", pair);
continue;
};

let key = key.trim().to_ascii_lowercase();
let value = value.trim().trim_matches(['\'', '"']);

if key == "last_rows" {
if let Ok(last_rows) = value.parse::<u32>() {
return Some(SubscriptionOptions::new().with_last_rows(last_rows));
options = options.with_last_rows(last_rows);
} else {
eprintln!("Warning: Invalid last_rows value '{}', using default", value);
}
} else if key.to_lowercase() == "batch_size" {
} else if key == "batch_size" {
if let Ok(batch_size) = value.parse::<usize>() {
return Some(SubscriptionOptions::new().with_batch_size(batch_size));
options = options.with_batch_size(batch_size);
} else {
eprintln!("Warning: Invalid batch_size value '{}', using default", value);
}
} else if key == "from" || key == "from_seq_id" {
if let Ok(seq_id) = value.parse::<i64>() {
options = options.with_from(SeqId::from(seq_id));
} else {
eprintln!("Warning: Invalid from value '{}', using default", value);
}
} else {
eprintln!("Warning: Unknown option '{}', ignoring", key);
}
}

// Default if parsing failed
Some(SubscriptionOptions::default())
Some(options)
}

/// Run a WebSocket subscription
Expand Down Expand Up @@ -2286,9 +2301,9 @@ impl CLISession {
);
}

// Display rows in the same format as snapshots
// Keep startup rows compact so each payload stays on a single CLI line.
for row in rows {
let formatted = serde_json::to_string_pretty(&row).unwrap_or_default();
let formatted = Self::format_row(row);
if self.color {
println!(" \x1b[90m{}\x1b[0m", formatted);
} else {
Expand Down Expand Up @@ -3516,7 +3531,55 @@ mod tests {
let (sql, options) =
CLISession::extract_subscribe_options("SELECT * FROM table OPTIONS (last_rows=50);");
assert_eq!(sql, "SELECT * FROM table");
assert!(options.is_some());
let options = options.expect("options should parse");
assert_eq!(options.last_rows, Some(50));
}

#[test]
fn test_extract_subscribe_options_with_combined_options() {
let (sql, options) = CLISession::extract_subscribe_options(
"SELECT * FROM table OPTIONS (last_rows=20, batch_size=5, from=42);",
);

assert_eq!(sql, "SELECT * FROM table");
let options = options.expect("options should parse");
assert_eq!(options.last_rows, Some(20));
assert_eq!(options.batch_size, Some(5));
assert_eq!(options.from, Some(SeqId::from(42)));
}

#[test]
fn test_extract_subscribe_options_accepts_from_seq_id_alias() {
let (sql, options) = CLISession::extract_subscribe_options(
"SELECT * FROM table OPTIONS (batch_size=10, from_seq_id=99);",
);

assert_eq!(sql, "SELECT * FROM table");
let options = options.expect("options should parse");
assert_eq!(options.batch_size, Some(10));
assert_eq!(options.from, Some(SeqId::from(99)));
}

#[test]
fn test_format_row_returns_compact_json_without_newlines() {
let mut row = kalam_client::RowData::new();
row.insert(
"message".to_string(),
kalam_client::KalamCellValue::text("hello"),
);
row.insert(
"count".to_string(),
kalam_client::KalamCellValue::int(5),
);

let formatted = CLISession::format_row(&row);

assert!(!formatted.contains('\n'));
assert!(!formatted.contains(" "));
assert!(formatted.starts_with('{'));
assert!(formatted.ends_with('}'));
assert!(formatted.contains("\"message\":\"hello\""));
assert!(formatted.contains("\"count\":5"));
}

#[test]
Expand Down
6 changes: 6 additions & 0 deletions examples/chat-with-ai/chat-app.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ CREATE TABLE IF NOT EXISTS chat_demo.agent_events (
message TEXT NOT NULL DEFAULT '',
created_at TIMESTAMP NOT NULL DEFAULT NOW()
) WITH (TYPE = 'STREAM', TTL_SECONDS = 10);

-- Insert some initial messages into the chat
INSERT INTO chat_demo.messages (role, author, sender_username, content)
VALUES
('user', 'user_1', 'john_doe', 'Hello everyone!'),
('assistant', 'ai_bot', 'assistant', 'Hi John 👋 How can I help?');
1 change: 1 addition & 0 deletions link/kalam-link-dart/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ impl DartSubscriptionConfig {
batch_size: self.batch_size.map(|v| v as usize),
last_rows: self.last_rows.map(|v| v as u32),
from: self.from.map(kalam_client::SeqId::new),
auto_fetch_batches: None,
}),
ws_url: None,
}
Expand Down
7 changes: 6 additions & 1 deletion link/link-common/src/connection/shared/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ pub(super) async fn route_event(
};
let event_time_ms = now_ms();

let auto_request_next_batch = matches!(event, ChangeEvent::InitialDataBatch { .. });
let auto_request_next_batch = matches!(event, ChangeEvent::InitialDataBatch { .. })
&& matched_key
.as_ref()
.and_then(|key| subs.get(key.as_str(&incoming_sub_id)))
.and_then(|entry| entry.options.auto_fetch_batches)
.unwrap_or(true);
let mut next_batch_last_seq = None;
let mut should_request_next_batch = false;

Expand Down
11 changes: 11 additions & 0 deletions link/link-common/src/subscription/models/subscription_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub struct SubscriptionOptions {
/// Typically set automatically during reconnection to resume from last received event.
#[serde(skip_serializing_if = "Option::is_none", alias = "from_seq_id")]
pub from: Option<SeqId>,

/// Client-side control for automatically requesting subsequent initial data batches.
/// Skipped on the wire because the backend only needs batch_size/last_rows/from.
#[serde(default, skip_serializing, alias = "autoFetchBatches")]
pub auto_fetch_batches: Option<bool>,
}

impl SubscriptionOptions {
Expand Down Expand Up @@ -72,6 +77,12 @@ impl SubscriptionOptions {
self.with_from(seq_id)
}

/// Control whether the client should automatically request subsequent initial data batches.
pub fn with_auto_fetch_batches(mut self, enabled: bool) -> Self {
self.auto_fetch_batches = Some(enabled);
self
}

/// Check if this has a resume seq_id set.
pub fn has_resume_seq_id(&self) -> bool {
self.from.is_some()
Expand Down
Loading
Loading