diff --git a/Cargo.toml b/Cargo.toml
index b096a7f40..78da00d1e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,6 +125,3 @@ significant_drop_tightening = { level = "allow", priority = 2 }
 module_name_repetitions = { level = "allow", priority = 2 }
 option_if_let_else = { level = "allow", priority = 2 }
 needless_for_each = { level = "allow", priority = 2 } # #[derive(OpenApi)]
-
-[workspace.features]
-state-store-query-test = ["executor/state-store-query-test"]
diff --git a/crates/api-snowflake-rest-sessions/src/error.rs b/crates/api-snowflake-rest-sessions/src/error.rs
index 22243bbfb..ffcc1c97c 100644
--- a/crates/api-snowflake-rest-sessions/src/error.rs
+++ b/crates/api-snowflake-rest-sessions/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use axum::{Json, http, response::IntoResponse};
 use error_stack_trace;
 use http::header::InvalidHeaderValue;
diff --git a/crates/api-snowflake-rest/src/server/error.rs b/crates/api-snowflake-rest/src/server/error.rs
index c6b341746..02ed9f30a 100644
--- a/crates/api-snowflake-rest/src/server/error.rs
+++ b/crates/api-snowflake-rest/src/server/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use crate::SqlState;
 use crate::models::JsonResponse;
 use crate::models::ResponseData;
@@ -16,7 +17,6 @@ use snafu::prelude::*;
 
 pub type Result<T> = std::result::Result<T, Error>;
 
-// TBD: Why context at error/source mostly not used in error?
 #[derive(Snafu)]
 #[snafu(visibility(pub(crate)))]
 #[error_stack_trace::debug]
diff --git a/crates/catalog-metastore/src/error.rs b/crates/catalog-metastore/src/error.rs
index e412361e6..420b8eaa1 100644
--- a/crates/catalog-metastore/src/error.rs
+++ b/crates/catalog-metastore/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use iceberg_rust::error::Error as IcebergError;
 use iceberg_rust_spec::table_metadata::TableMetadataBuilderError;
diff --git a/crates/catalog/src/df_error.rs b/crates/catalog/src/df_error.rs
index 67b521efd..5d0d4931e 100644
--- a/crates/catalog/src/df_error.rs
+++ b/crates/catalog/src/df_error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use crate::error::Error;
 use error_stack_trace;
 use iceberg_rust::error::Error as IcebergError;
diff --git a/crates/catalog/src/error.rs b/crates/catalog/src/error.rs
index 5e0a8ba98..7aa7ce583 100644
--- a/crates/catalog/src/error.rs
+++ b/crates/catalog/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion_common::DataFusionError;
 use error_stack_trace;
 use iceberg_rust::error::Error as IcebergError;
diff --git a/crates/embucket-lambda/src/config.rs b/crates/embucket-lambda/src/config.rs
index 0420a638a..7cd6f197f 100644
--- a/crates/embucket-lambda/src/config.rs
+++ b/crates/embucket-lambda/src/config.rs
@@ -70,6 +70,7 @@ impl EnvConfig {
     pub fn execution_config(&self) -> ExecutionConfig {
         ExecutionConfig {
             embucket_version: self.embucket_version.clone(),
+            build_version: BuildInfo::GIT_DESCRIBE.to_string(),
             warehouse_type: "LAMBDA_SERVERLESS".to_string(),
             sql_parser_dialect: self.sql_parser_dialect.clone(),
             query_timeout_secs: self.query_timeout_secs,
diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs
index b7c5b6b1c..0efb990d9 100644
--- a/crates/embucketd/src/main.rs
+++ b/crates/embucketd/src/main.rs
@@ -123,6 +123,7 @@ async fn async_main(
     let execution_cfg = ExecutionConfig {
         embucket_version: BuildInfo::VERSION.to_string(),
+        build_version: BuildInfo::GIT_DESCRIBE.to_string(),
         warehouse_type: "EMBUCKET".to_string(),
         sql_parser_dialect: opts.sql_parser_dialect.clone(),
         query_timeout_secs: opts.query_timeout_secs,
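The hunks above thread a new `build_version` value, sourced from `BuildInfo::GIT_DESCRIBE`, into `ExecutionConfig` alongside the existing `embucket_version`. Below is a minimal sketch of how such compile-time build metadata can be exposed; the `VERGEN_GIT_DESCRIBE` variable and the fallback logic are assumptions for illustration (e.g. a `vergen`-style build script), not the repository's actual wiring.

```rust
// Hypothetical BuildInfo-style holder for compile-time version metadata.
// VERGEN_GIT_DESCRIBE is assumed to be set by a build script; when it is
// absent, fall back to the crate version so the constant always resolves.
pub struct BuildInfo;

impl BuildInfo {
    pub const VERSION: &'static str = env!("CARGO_PKG_VERSION");
    pub const GIT_DESCRIBE: &'static str = match option_env!("VERGEN_GIT_DESCRIBE") {
        Some(describe) => describe,
        None => Self::VERSION,
    };
}

fn main() {
    // embucket_version and build_version can then diverge: one tracks the
    // release, the other the exact commit the binary was built from.
    println!("version={} build={}", BuildInfo::VERSION, BuildInfo::GIT_DESCRIBE);
}
```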
diff --git a/crates/executor/src/error.rs b/crates/executor/src/error.rs
index edbab26cb..3d08461ef 100644
--- a/crates/executor/src/error.rs
+++ b/crates/executor/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use super::snowflake_error::SnowflakeError;
 use crate::query_types::{ExecutionStatus, QueryId};
 use catalog::error::Error as CatalogError;
diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs
index f6b122dd7..312c15d56 100644
--- a/crates/executor/src/query.rs
+++ b/crates/executor/src/query.rs
@@ -18,6 +18,7 @@ use crate::datafusion::physical_plan::merge::{
 use crate::datafusion::rewriters::session_context::SessionContextExprRewriter;
 use crate::error::{OperationOn, OperationType};
 use crate::models::{QueryContext, QueryMetric, QueryResult, metrics_set_to_json};
+use crate::query_types::{DdlStType, DmlStType, MiscStType, QueryStats, QueryType};
 use catalog::table::{CachingTable, IcebergTableBuilder};
 use catalog_metastore::{
     AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
@@ -245,6 +246,81 @@ impl UserQuery {
         Ok(())
     }
 
+    #[allow(clippy::too_many_lines)]
+    #[instrument(
+        name = "UserQuery::update_stats_query_type",
+        level = "debug",
+        skip(self),
+        fields(
+            query_type,
+            query_id = self.query_context.query_id.to_string(),
+        ),
+    )]
+    fn update_stats_query_type(&self, statement: &DFStatement) {
+        let save = |query_type: QueryType| {
+            self.running_queries.update_stats(
+                self.query_context.query_id,
+                &QueryStats::default().with_query_type(query_type),
+            );
+        };
+
+        if let DFStatement::Statement(s) = statement {
+            match **s {
+                // match DML statements:
+                Statement::Delete { .. } => save(QueryType::Dml(DmlStType::Delete)),
+                Statement::Update { .. } => save(QueryType::Dml(DmlStType::Update)),
+                Statement::Insert { .. } => save(QueryType::Dml(DmlStType::Insert)),
+                Statement::Truncate { .. } => save(QueryType::Dml(DmlStType::Truncate)),
+                Statement::Query(..) => save(QueryType::Dml(DmlStType::Select)),
+                Statement::Merge { .. } => save(QueryType::Dml(DmlStType::Merge)),
+                // match DDL statements:
+                Statement::AlterSession { .. } => save(QueryType::Ddl(DdlStType::AlterSession)),
+                Statement::AlterTable { .. } => save(QueryType::Ddl(DdlStType::AlterTable)),
+                Statement::CreateTable { .. } => save(QueryType::Ddl(DdlStType::CreateTable)),
+                Statement::CreateView { .. } => save(QueryType::Ddl(DdlStType::CreateView)),
+                Statement::CreateDatabase { .. } => save(QueryType::Ddl(DdlStType::CreateDatabase)),
+                Statement::CreateExternalVolume { .. } => {
+                    save(QueryType::Ddl(DdlStType::CreateVolume));
+                }
+                Statement::CreateSchema { .. } => save(QueryType::Ddl(DdlStType::CreateSchema)),
+                Statement::CreateStage { .. } => save(QueryType::Ddl(DdlStType::CreateStage)),
+                Statement::CopyIntoSnowflake { .. } => {
+                    save(QueryType::Ddl(DdlStType::CopyIntoSnowflake));
+                }
+                Statement::Drop { object_type, .. } => match object_type {
+                    ObjectType::Table => save(QueryType::Ddl(DdlStType::DropTable)),
+                    ObjectType::View => save(QueryType::Ddl(DdlStType::DropView)),
+                    ObjectType::MaterializedView => {
+                        save(QueryType::Ddl(DdlStType::DropMaterializedView));
+                    }
+                    ObjectType::Schema => save(QueryType::Ddl(DdlStType::DropSchema)),
+                    ObjectType::Database => save(QueryType::Ddl(DdlStType::DropDatabase)),
+                    ObjectType::Stage => save(QueryType::Ddl(DdlStType::DropStage)),
+                    _ => {}
+                },
+                // match other statements:
+                Statement::Use(..) => save(QueryType::Misc(MiscStType::Use)),
+                Statement::Set(..) => save(QueryType::Misc(MiscStType::Set)),
+                Statement::StartTransaction { .. } => save(QueryType::Misc(MiscStType::Begin)),
+                Statement::Commit { .. } => save(QueryType::Misc(MiscStType::Commit)),
+                Statement::Rollback { .. } => save(QueryType::Misc(MiscStType::Rollback)),
+                Statement::ShowColumns { .. } => save(QueryType::Misc(MiscStType::ShowColumns)),
+                Statement::ShowFunctions { .. } => save(QueryType::Misc(MiscStType::ShowFunctions)),
+                Statement::ShowObjects { .. } => save(QueryType::Misc(MiscStType::ShowObjects)),
+                Statement::ShowVariables { .. } => save(QueryType::Misc(MiscStType::ShowVariables)),
+                Statement::ShowVariable { .. } => save(QueryType::Misc(MiscStType::ShowVariable)),
+                Statement::ShowDatabases { .. } => save(QueryType::Misc(MiscStType::ShowDatabases)),
+                Statement::ShowSchemas { .. } => save(QueryType::Misc(MiscStType::ShowSchemas)),
+                Statement::ShowTables { .. } => save(QueryType::Misc(MiscStType::ShowTables)),
+                Statement::ShowViews { .. } => save(QueryType::Misc(MiscStType::ShowViews)),
+                Statement::ExplainTable { .. } => save(QueryType::Misc(MiscStType::ExplainTable)),
+                _ => {}
+            }
+        } else if let DFStatement::CreateExternalTable(..) = statement {
+            save(QueryType::Ddl(DdlStType::CreateExternalTable));
+        }
+    }
+
     #[allow(clippy::too_many_lines)]
     #[instrument(
         name = "UserQuery::execute",
@@ -264,6 +340,8 @@ impl UserQuery {
         // Record the result as part of the current span.
         tracing::Span::current().record("statement", format!("{statement:#?}"));
 
+        self.update_stats_query_type(&statement);
+
         // TODO: Code should be organized in a better way
         // 1. Single place to parse SQL strings into AST
         // 2. Single place to update AST
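`update_stats_query_type` classifies the parsed statement into a coarse `QueryType` before execution and records it in the running-queries registry. The sketch below mirrors that classify-then-record shape with a hand-rolled statement enum instead of sqlparser's `Statement`, so it compiles stand-alone; all names here are illustrative, not the crate's API.

```rust
// Minimal classify-then-record sketch (assumption: the real code matches on
// the full sqlparser AST; this only mirrors the shape of the dispatch).
#[derive(Debug)]
enum Stmt {
    Select,
    Insert,
    Drop { is_table: bool },
}

#[derive(Debug, Clone, Copy)]
enum QueryType {
    Dml(&'static str),
    Ddl(&'static str),
}

fn classify(stmt: &Stmt) -> Option<QueryType> {
    match stmt {
        Stmt::Select => Some(QueryType::Dml("SELECT")),
        Stmt::Insert => Some(QueryType::Dml("INSERT")),
        Stmt::Drop { is_table: true } => Some(QueryType::Ddl("DROPTABLE")),
        // Unrecognized object types fall through unclassified, like the `_ => {}` arms above.
        Stmt::Drop { is_table: false } => None,
    }
}

fn main() {
    // The executor hands the label to the registry; here we just print it.
    if let Some(qt) = classify(&Stmt::Insert) {
        println!("{qt:?}");
    }
}
```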
diff --git a/crates/executor/src/query_task_result.rs b/crates/executor/src/query_task_result.rs
index 6dc724b8d..c6008f842 100644
--- a/crates/executor/src/query_task_result.rs
+++ b/crates/executor/src/query_task_result.rs
@@ -3,10 +3,15 @@ use super::error::Result;
 use super::error_code::ErrorCode;
 use super::models::QueryResult;
 use super::query_types::ExecutionStatus;
+cfg_if::cfg_if! {
+    if #[cfg(feature = "state-store-query")] {
+        use super::query_types::{DmlStType, QueryType};
+        use datafusion::arrow::array::{Int64Array, UInt64Array};
+        use state_store::QueryMetric;
+    }
+}
 use super::snowflake_error::SnowflakeError;
 use snafu::ResultExt;
-#[cfg(feature = "state-store-query")]
-use state_store::QueryMetric;
 use tokio::task::JoinError;
 use uuid::Uuid;
 
@@ -80,6 +85,37 @@ impl ExecutionTaskResult {
         }
     }
 
+    #[cfg(feature = "state-store-query")]
+    pub fn assign_rows_counts_attributes(
+        &self,
+        query: &mut state_store::Query,
+        query_type: QueryType,
+    ) {
+        if let Ok(result) = &self.result
+            && let QueryType::Dml(query_type) = query_type
+        {
+            if let DmlStType::Select = query_type {
+                let rows_count: u64 = result.records.iter().map(|r| r.num_rows() as u64).sum();
+                query.set_rows_produced(rows_count);
+            } else if let Some(rows_count) = value_by_row_column(result, 0, 0) {
+                match query_type {
+                    DmlStType::Insert => query.set_rows_inserted(rows_count as u64),
+                    DmlStType::Update => query.set_rows_updated(rows_count as u64),
+                    DmlStType::Delete => query.set_rows_deleted(rows_count as u64),
+                    DmlStType::Truncate => query.set_rows_deleted(rows_count as u64),
+                    DmlStType::Merge => {
+                        // MERGE results carry two count columns; map them to rows_inserted/rows_produced for now
+                        query.set_rows_inserted(rows_count as u64);
+                        if let Some(rows_count) = value_by_row_column(result, 0, 1) {
+                            query.set_rows_produced(rows_count as u64);
+                        }
+                    }
+                    _ => {}
+                }
+            }
+        }
+    }
+
     #[cfg(feature = "state-store-query")]
     pub fn assign_query_attributes(&self, query: &mut state_store::Query) {
         query.set_execution_status(self.execution_status);
@@ -107,3 +143,24 @@ impl ExecutionTaskResult {
         query.set_end_time();
     }
 }
+
+#[cfg(feature = "state-store-query")]
+fn value_by_row_column(result: &QueryResult, row_idx: usize, col_idx: usize) -> Option<u64> {
+    // Guard against an empty result set instead of indexing records[0].
+    result.records.first()?.columns().get(col_idx).and_then(|col| {
+        if let Some(cols) = col.as_any().downcast_ref::<Int64Array>() {
+            if row_idx < cols.len() {
+                Some(cols.value(row_idx) as u64)
+            } else {
+                None
+            }
+        } else if let Some(cols) = col.as_any().downcast_ref::<UInt64Array>() {
+            if row_idx < cols.len() {
+                Some(cols.value(row_idx))
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    })
+}
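`value_by_row_column` pulls an affected-row count out of the first record batch by downcasting the column to a concrete Arrow array type. The stand-alone sketch below shows the same downcast pattern; it assumes the `arrow` crate and a single-row `Int64` column, which is how DataFusion-style DML results commonly report counts.

```rust
// Downcast pattern in isolation: requires the `arrow` crate.
use arrow::array::{Array, ArrayRef, Int64Array};
use std::sync::Arc;

fn first_row_as_u64(col: &ArrayRef) -> Option<u64> {
    // dyn Array -> concrete Int64Array, then bounds-checked read of row 0.
    if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
        (!a.is_empty()).then(|| a.value(0) as u64)
    } else {
        None
    }
}

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![42_i64]));
    assert_eq!(first_row_as_u64(&col), Some(42));
}
```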
diff --git a/crates/executor/src/query_types.rs b/crates/executor/src/query_types.rs
index 2c6d85d8c..658abccb3 100644
--- a/crates/executor/src/query_types.rs
+++ b/crates/executor/src/query_types.rs
@@ -1,4 +1,5 @@
 pub type QueryId = uuid::Uuid;
+use std::fmt::Display;
 
 cfg_if::cfg_if! {
     if #[cfg(feature = "state-store-query")] {
@@ -8,9 +9,94 @@ cfg_if::cfg_if! {
         use std::fmt::Debug;
 
         #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
         pub enum ExecutionStatus {
+            Running,
             Success,
             Fail,
             Incident,
         }
     }
 }
+
+#[derive(Debug, Clone, strum::Display)]
+#[strum(serialize_all = "UPPERCASE")]
+pub enum DmlStType {
+    Select,
+    Insert,
+    Update,
+    Delete,
+    Truncate,
+    Merge,
+}
+
+#[derive(Debug, Clone, strum::Display)]
+#[strum(serialize_all = "UPPERCASE")]
+pub enum DdlStType {
+    CreateExternalTable,
+    CreateTable,
+    CreateView,
+    CreateDatabase,
+    CreateVolume,
+    CreateSchema,
+    CreateStage,
+    CopyIntoSnowflake,
+    DropTable,
+    DropView,
+    DropMaterializedView,
+    DropSchema,
+    DropDatabase,
+    DropStage,
+    AlterTable,
+    AlterSession,
+    Drop,
+}
+
+#[derive(Debug, Clone, strum::Display)]
+#[strum(serialize_all = "UPPERCASE")]
+pub enum MiscStType {
+    Use,
+    Set,
+    Begin,
+    Commit,
+    Rollback,
+    ShowColumns,
+    ShowFunctions,
+    ShowVariables,
+    ShowObjects,
+    ShowVariable,
+    ShowDatabases,
+    ShowSchemas,
+    ShowTables,
+    ShowViews,
+    ExplainTable,
+}
+
+#[derive(Debug, Clone)]
+pub enum QueryType {
+    Dml(DmlStType),
+    Ddl(DdlStType),
+    Misc(MiscStType),
+}
+
+impl Display for QueryType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Dml(dml) => write!(f, "{dml}"),
+            Self::Ddl(ddl) => write!(f, "{ddl}"),
+            Self::Misc(misc) => write!(f, "{misc}"),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct QueryStats {
+    pub query_type: Option<QueryType>,
+}
+
+impl QueryStats {
+    #[must_use]
+    pub const fn with_query_type(self, query_type: QueryType) -> Self {
+        Self {
+            query_type: Some(query_type),
+        }
+    }
+}
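The enums above rely on `strum::Display` with `serialize_all = "UPPERCASE"`, so variant names render as the uppercase labels stored in the state store (`SELECT`, `DROPTABLE`, and so on). A quick self-contained check of that behavior, assuming the `strum` crate with its `derive` feature:

```rust
// Verifies the derive used in query_types.rs: UPPERCASE concatenates the
// variant ident with no separators, e.g. DropTable -> "DROPTABLE".
use std::fmt::Display;

#[derive(Debug, Clone, strum::Display)]
#[strum(serialize_all = "UPPERCASE")]
enum DmlStType {
    Select,
    Insert,
}

fn label(t: &impl Display) -> String {
    t.to_string()
}

fn main() {
    assert_eq!(label(&DmlStType::Select), "SELECT");
    assert_eq!(label(&DmlStType::Insert), "INSERT");
}
```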
diff --git a/crates/executor/src/running_queries.rs b/crates/executor/src/running_queries.rs
index cf2ff674c..fae7b1665 100644
--- a/crates/executor/src/running_queries.rs
+++ b/crates/executor/src/running_queries.rs
@@ -1,6 +1,6 @@
 use super::error::{self as ex_error, Result};
 use super::models::QueryResult;
-use crate::query_types::{ExecutionStatus, QueryId};
+use crate::query_types::{ExecutionStatus, QueryId, QueryStats};
 use dashmap::DashMap;
 use snafu::{OptionExt, ResultExt};
 use std::sync::Arc;
@@ -22,6 +22,7 @@ pub struct RunningQuery {
     // user can be notified when query is finished
     tx: watch::Sender<Option<ExecutionStatus>>,
     rx: watch::Receiver<Option<ExecutionStatus>>,
+    pub query_stats: QueryStats,
 }
 
 #[derive(Debug, Clone)]
@@ -41,6 +42,7 @@ impl RunningQuery {
             result_handle: None,
             tx,
             rx,
+            query_stats: QueryStats::default(),
         }
     }
 
     #[must_use]
@@ -81,18 +83,12 @@ impl RunningQuery {
         self.tx.send(Some(status))
     }
 
-    #[tracing::instrument(
-        name = "RunningQuery::wait_query_finished",
-        level = "trace",
-        skip(self),
-        err
-    )]
+    #[tracing::instrument(name = "RunningQuery::wait_query_finished", level = "trace", err)]
     pub async fn wait_query_finished(
-        &self,
+        mut rx: watch::Receiver<Option<ExecutionStatus>>,
     ) -> std::result::Result<ExecutionStatus, watch::error::RecvError> {
         // use loop here to bypass default query status we posted at init
         // it should not go to the actual loop and should resolve as soon as results are ready
-        let mut rx = self.rx.clone();
         loop {
             rx.changed().await?;
             let status = *rx.borrow();
@@ -101,6 +97,13 @@ impl RunningQuery {
             }
         }
     }
+
+    #[tracing::instrument(name = "RunningQuery::update_query_stats", level = "trace", skip(self))]
+    pub fn update_query_stats(&mut self, stats: &QueryStats) {
+        if self.query_stats.query_type.is_none() {
+            self.query_stats.query_type.clone_from(&stats.query_type);
+        }
+    }
 }
 
 pub struct RunningQueriesRegistry {
@@ -132,12 +135,17 @@ impl RunningQueriesRegistry {
         err
     )]
     pub async fn wait_query_finished(&self, query_id: QueryId) -> Result<ExecutionStatus> {
-        let
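The reworked `wait_query_finished` takes the `watch::Receiver` by value precisely so callers can clone it out of the registry and drop the map guard before awaiting. A stand-alone model of that wait loop, assuming only `tokio` with its sync/runtime features:

```rust
// Model of the fix: clone the receiver out of the shared map, release the
// guard, then await. Holding a DashMap guard across an await point can block
// or deadlock concurrent writers.
use tokio::sync::watch;

async fn wait_finished(
    mut rx: watch::Receiver<Option<&'static str>>,
) -> Result<&'static str, watch::error::RecvError> {
    loop {
        rx.changed().await?;
        // None models the initial "still running" value posted at init.
        if let Some(status) = *rx.borrow() {
            return Ok(status);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(None);
    tokio::spawn(async move {
        let _ = tx.send(Some("Success"));
    });
    assert_eq!(wait_finished(rx).await.unwrap(), "Success");
}
```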
-        let running_query = self
-            .queries
-            .get(&query_id)
-            .context(ex_error::QueryIsntRunningSnafu { query_id })?;
-        running_query
-            .wait_query_finished()
+        // Do not hold a reference into the `queries` map while awaiting
+        // `wait_query_finished`: the map guard would otherwise stay alive
+        // across the await and block concurrent access to `queries`.
+        let rx = {
+            let running_query = self
+                .queries
+                .get(&query_id)
+                .context(ex_error::QueryIsntRunningSnafu { query_id })?;
+            running_query.rx.clone()
+        };
+        RunningQuery::wait_query_finished(rx)
             .await
             .context(ex_error::ExecutionStatusRecvSnafu { query_id })
     }
@@ -152,6 +160,8 @@ pub trait RunningQueries: Send + Sync {
     fn notify_query_finished(&self, query_id: QueryId, status: ExecutionStatus) -> Result<()>;
     fn locate_query_id(&self, running_query_id: RunningQueryId) -> Result<QueryId>;
     fn count(&self) -> usize;
+    fn cloned_stats(&self, query_id: QueryId) -> Option<QueryStats>;
+    fn update_stats(&self, query_id: QueryId, stats: &QueryStats);
 }
 
 impl RunningQueries for RunningQueriesRegistry {
@@ -225,4 +235,25 @@ impl RunningQueries for RunningQueriesRegistry {
     fn count(&self) -> usize {
         self.queries.len()
     }
+
+    fn cloned_stats(&self, query_id: QueryId) -> Option<QueryStats> {
+        if let Some(running_query) = self.queries.get(&query_id) {
+            Some(running_query.query_stats.clone())
+        } else {
+            None
+        }
+    }
+
+    #[tracing::instrument(
+        name = "RunningQueriesRegistry::update_stats",
+        level = "trace",
+        skip(self),
+        fields(query_id),
+        ret
+    )]
+    fn update_stats(&self, query_id: QueryId, stats: &QueryStats) {
+        if let Some(mut running_query) = self.queries.get_mut(&query_id) {
+            running_query.update_query_stats(stats);
+        }
+    }
 }
diff --git a/crates/executor/src/service.rs b/crates/executor/src/service.rs
index 4d6cdfa8f..b6ad5e234 100644
--- a/crates/executor/src/service.rs
+++ b/crates/executor/src/service.rs
@@ -528,6 +528,7 @@ impl ExecutionService for CoreExecutionService {
         );
         query.set_execution_status(ExecutionStatus::Running);
         query.set_warehouse_type(self.config.warehouse_type.clone());
+        query.set_release_version(self.config.build_version.clone());
     }
 }
 
@@ -629,6 +630,12 @@ impl ExecutionService for CoreExecutionService {
             cfg_if::cfg_if! {
                 if #[cfg(feature = "state-store-query")] {
                     execution_result.assign_query_attributes(&mut query);
+                    if let Some(stats) = queries_registry.cloned_stats(query_id) {
+                        if let Some(query_type) = stats.query_type {
+                            query.set_query_type(query_type.to_string());
+                            execution_result.assign_rows_counts_attributes(&mut query, query_type);
+                        }
+                    }
                     // just log error and do not raise it from task
                     if let Err(err) = state_store.update_query(&query).await {
                         tracing::error!("Failed to update query {query_id}: {err:?}");
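The state-store write-back above compiles only with the `state-store-query` feature; `cfg_if` keeps the gated imports and call sites under one switch. A minimal shape of that gating (the feature name is taken from the diff; the function bodies are placeholders):

```rust
// Requires the cfg-if crate. With the feature disabled (the default in a
// standalone build), the no-op branch is compiled instead.
cfg_if::cfg_if! {
    if #[cfg(feature = "state-store-query")] {
        fn persist(query_type: &str) {
            println!("persisting query_type={query_type}");
        }
    } else {
        fn persist(_query_type: &str) {}
    }
}

fn main() {
    persist("SELECT");
}
```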
diff --git a/crates/executor/src/tests/statestore_queries_unittest.rs b/crates/executor/src/tests/statestore_queries_unittest.rs
index 0a30b9721..2e8f72e30 100644
--- a/crates/executor/src/tests/statestore_queries_unittest.rs
+++ b/crates/executor/src/tests/statestore_queries_unittest.rs
@@ -2,6 +2,7 @@ use crate::models::QueryContext;
 use crate::service::{CoreExecutionService, ExecutionService};
 use crate::utils::Config;
 use catalog_metastore::InMemoryMetastore;
+use catalog_metastore::metastore_bootstrap_config::MetastoreBootstrapConfig;
 use insta::assert_json_snapshot;
 use state_store::{MockStateStore, Query, SessionRecord, StateStore};
 use std::sync::Arc;
@@ -14,7 +15,6 @@ use uuid::Uuid;
 const TEST_SESSION_ID: &str = "test_session_id";
 const TEST_DATABASE: &str = "test_database";
 const TEST_SCHEMA: &str = "test_schema";
-const OK_QUERY_TEXT: &str = "SELECT 1 AS a, 2.0 AS b, '3' AS c WHERE False";
 
 const MOCK_RELATED_TIMEOUT_DURATION: Duration = Duration::from_millis(100);
@@ -28,8 +28,8 @@ fn insta_settings(name: &str) -> insta::Settings {
     let mut settings = insta::Settings::new();
     settings.set_sort_maps(true);
     settings.set_description(name);
-    settings.set_info(&format!("{name}"));
     settings.add_redaction(".execution_time", "1");
+    settings.add_redaction(".query_metrics", "[query_metrics]");
     settings.add_filter(
         r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}",
         "00000000-0000-0000-0000-000000000000",
@@ -59,6 +59,7 @@ async fn test_query_lifecycle_ok_query() {
         .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID)));
     state_store_mock
         .expect_put_query()
+        .times(1)
         .returning(|_| Ok(()))
         // check created query attributes only here (it is expected to be the same for any invocation)
         .withf(move |query: &Query| {
             insta_settings("ok_query_put").bind(|| {
                 assert_json_snapshot!(query, @r#"
                 {
                   "query_id": "00000000-0000-0000-0000-000000000000",
                   "request_id": "00000000-0000-0000-0000-000000000000",
-                  "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS c WHERE False",
+                  "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'",
                   "session_id": "test_session_id",
                   "warehouse_type": "DEFAULT",
                   "execution_status": "Running",
                   "start_time": "2026-01-01T01:01:01.000000001Z",
-                  "query_hash": "12320374230549905548",
+                  "release_version": "test-version",
+                  "query_hash": "1717924485430328356",
                   "query_hash_version": 1
                 }
                 "#);
             });
             true
         });
     state_store_mock
         .expect_update_query()
         .returning(|_| Ok(()))
         .withf(move |query: &Query| {
             insta_settings("ok_query_update").bind(|| {
                 assert_json_snapshot!(query, @r#"
                 {
                   "query_id": "00000000-0000-0000-0000-000000000000",
                   "request_id": "00000000-0000-0000-0000-000000000000",
-                  "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS c WHERE False",
+                  "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'",
                   "session_id": "test_session_id",
                   "database_name": "embucket",
                   "schema_name": "public",
+                  "query_type": "SELECT",
                   "warehouse_type": "DEFAULT",
                   "execution_status": "Success",
                   "start_time": "2026-01-01T01:01:01.000000001Z",
                   "end_time": "2026-01-01T01:01:01.000000001Z",
+                  "rows_produced": 1,
                   "execution_time": "1",
-                  "query_hash": "12320374230549905548",
+                  "release_version": "test-version",
+                  "query_hash": "1717924485430328356",
                   "query_hash_version": 1,
-                  "query_metrics": [
-                    {
-                      "node_id": 0,
-                      "parent_node_id": null,
-                      "operator": "EmptyExec",
-                      "metrics": []
-                    }
-                  ]
+                  "query_metrics": "[query_metrics]"
                 }
                 "#);
             });
@@ -125,19 +123,607 @@ async fn test_query_lifecycle_ok_query() {
     .await
     .expect("Failed to create execution service");
 
-    execution_svc
-        .create_session(TEST_SESSION_ID)
-        .await
-        .expect("Failed to create session");
+    timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.create_session(TEST_SESSION_ID),
+    )
+    .await
+    .expect("Create session timed out")
+    .expect("Failed to create session");
 
     // See note about timeout above
     let _ = timeout(
         MOCK_RELATED_TIMEOUT_DURATION,
-        execution_svc.query(TEST_SESSION_ID, OK_QUERY_TEXT, query_context),
+        execution_svc.query(
+            TEST_SESSION_ID,
+            "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'",
+            query_context,
+        ),
     )
     .await
     .expect("Query timed out")
-    .expect("Query execution stopped by timeout");
+    .expect("Query execution failed");
+}
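These tests drive a `MockStateStore` in which `times(..)` pins the expected call count and `withf` inspects the `Query` argument (here via an insta snapshot). The reduced sketch below shows the same mockall expectation style on a hypothetical `Store` trait-less mock:

```rust
// Requires the mockall crate. `withf` must return true for the expectation
// to match; the tests above piggyback snapshot assertions inside it.
use mockall::mock;

mock! {
    Store {
        fn put(&self, value: i64) -> bool;
    }
}

fn main() {
    let mut store = MockStore::new();
    store
        .expect_put()
        .times(1)
        .withf(|v| *v == 42)
        .returning(|_| true);
    assert!(store.put(42));
}
```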
"1717924485430328356", "query_hash_version": 1, - "query_metrics": [ - { - "node_id": 0, - "parent_node_id": null, - "operator": "EmptyExec", - "metrics": [] - } - ] + "query_metrics": "[query_metrics]" } "#); }); @@ -125,19 +123,607 @@ async fn test_query_lifecycle_ok_query() { .await .expect("Failed to create execution service"); - execution_svc - .create_session(TEST_SESSION_ID) - .await - .expect("Failed to create session"); + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); // See note about timeout above let _ = timeout( MOCK_RELATED_TIMEOUT_DURATION, - execution_svc.query(TEST_SESSION_ID, OK_QUERY_TEXT, query_context), + execution_svc.query( + TEST_SESSION_ID, + "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_insert() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_insert_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "INSERT INTO embucket.public.table VALUES (1)", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "INSERT", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "rows_inserted": 1, + "execution_time": "1", + "release_version": "test-version", + "query_hash": "17856184221539895914", + "query_hash_version": 1, + "query_metrics": "[query_metrics]" + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "create table if not exists embucket.public.table (id int)", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + // insert + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "INSERT INTO embucket.public.table VALUES (1)", + query_context, + ), + ) + .await + .expect("Query timed out") + 
.expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_update() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_update_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "UPDATE embucket.public.table SET name = 'John'", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "UPDATE", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "execution_time": "1", + "release_version": "test-version", + "query_hash": "16763742305627145642", + "query_hash_version": 1, + "query_metrics": "[query_metrics]" + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.table AS SELECT + id, + name, + RANDOM() AS random_value, + CURRENT_TIMESTAMP AS current_time + FROM (VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie'), + (4, 'David') + ) AS t(id, name);", + query_context.clone(), + ), ) .await .expect("Query timed out") - .expect("Query execution stopped by timeout"); + .expect("Query execution failed"); + + // update + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "UPDATE embucket.public.table SET name = 'John'", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_delete_failed() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) 
+
+#[allow(clippy::expect_used)]
+#[tokio::test]
+async fn test_query_lifecycle_delete_failed() {
+    let query_context = QueryContext::default().with_request_id(Uuid::default());
+
+    let mut state_store_mock = MockStateStore::new();
+    state_store_mock
+        .expect_put_new_session()
+        .returning(|_| Ok(()));
+    state_store_mock
+        .expect_get_session()
+        .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID)));
+    state_store_mock
+        .expect_put_query()
+        .times(2)
+        .returning(|_| Ok(()));
+
+    // bypass 1st update
+    state_store_mock
+        .expect_update_query()
+        .times(1)
+        .returning(|_| Ok(()));
+    // verify 2nd update
+    state_store_mock
+        .expect_update_query()
+        .times(1)
+        .returning(|_| Ok(()))
+        .withf(move |query: &Query| {
+            insta_settings("failed_delete_update").bind(|| {
+                assert_json_snapshot!(query, @r#"
+                {
+                  "query_id": "00000000-0000-0000-0000-000000000000",
+                  "request_id": "00000000-0000-0000-0000-000000000000",
+                  "query_text": "DELETE FROM embucket.public.table",
+                  "session_id": "test_session_id",
+                  "database_name": "embucket",
+                  "schema_name": "public",
+                  "query_type": "DELETE",
+                  "warehouse_type": "DEFAULT",
+                  "execution_status": "Fail",
+                  "error_code": "010001",
+                  "error_message": "00000000-0000-0000-0000-000000000000: Query execution error: DataFusion error: This feature is not implemented: Unsupported logical plan: Dml(Delete)",
+                  "start_time": "2026-01-01T01:01:01.000000001Z",
+                  "end_time": "2026-01-01T01:01:01.000000001Z",
+                  "execution_time": "1",
+                  "release_version": "test-version",
+                  "query_hash": "13652442282618196356",
+                  "query_hash_version": 1
+                }
+                "#);
+            });
+            true
+        });
+
+    let state_store: Arc<dyn StateStore> = Arc::new(state_store_mock);
+
+    let metastore = Arc::new(InMemoryMetastore::new());
+    MetastoreBootstrapConfig::bootstrap()
+        .apply(metastore.clone())
+        .await
+        .expect("Failed to bootstrap metastore");
+
+    let execution_svc = CoreExecutionService::new_test_executor(
+        metastore,
+        state_store,
+        Arc::new(Config::default()),
+    )
+    .await
+    .expect("Failed to create execution service");
+
+    timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.create_session(TEST_SESSION_ID),
+    )
+    .await
+    .expect("Create session timed out")
+    .expect("Failed to create session");
+
+    // prepare table
+    let _ = timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.query(
+            TEST_SESSION_ID,
+            "
+            CREATE TABLE embucket.public.table AS SELECT
+                id,
+                name,
+                RANDOM() AS random_value,
+                CURRENT_TIMESTAMP AS current_time
+            FROM (VALUES
+                (1, 'Alice'),
+                (2, 'Bob'),
+                (3, 'Charlie'),
+                (4, 'David')
+            ) AS t(id, name);",
+            query_context.clone(),
+        ),
+    )
+    .await
+    .expect("Query timed out")
+    .expect("Query execution failed");
+
+    // delete
+    let _ = timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.query(
+            TEST_SESSION_ID,
+            "DELETE FROM embucket.public.table",
+            query_context,
+        ),
+    )
+    .await
+    .expect("Query timed out")
+    .expect_err("Query expected to fail");
+}
"release_version": "test-version", + "query_hash": "16187825059241168947", + "query_hash_version": 1, + "query_metrics": "[query_metrics]" + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.table AS SELECT + id, + name, + RANDOM() AS random_value, + CURRENT_TIMESTAMP AS current_time + FROM (VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie'), + (4, 'David') + ) AS t(id, name);", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + // update + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "TRUNCATE TABLE embucket.public.table", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_merge() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(3) + .returning(|_| Ok(())); + + // bypass first two updates + state_store_mock + .expect_update_query() + .times(2) + .returning(|_| Ok(())); + // verify 3rd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_merge_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "MERGE INTO t1 USING (SELECT * FROM t2) AS t2 ON t1.a = t2.a WHEN MATCHED THEN UPDATE SET t1.c = t2.c WHEN NOT MATCHED THEN INSERT (a,c) VALUES(t2.a,t2.c)", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "MERGE", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "rows_produced": 4, + "rows_inserted": 1, + "execution_time": "1", + "release_version": "test-version", + "query_hash": "16532873076018472935", + "query_hash_version": 1, + "query_metrics": "[query_metrics]" + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + 
+
+#[allow(clippy::expect_used)]
+#[tokio::test]
+async fn test_query_lifecycle_ok_merge() {
+    let query_context = QueryContext::default().with_request_id(Uuid::default());
+
+    let mut state_store_mock = MockStateStore::new();
+    state_store_mock
+        .expect_put_new_session()
+        .returning(|_| Ok(()));
+    state_store_mock
+        .expect_get_session()
+        .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID)));
+    state_store_mock
+        .expect_put_query()
+        .times(3)
+        .returning(|_| Ok(()));
+
+    // bypass first two updates
+    state_store_mock
+        .expect_update_query()
+        .times(2)
+        .returning(|_| Ok(()));
+    // verify 3rd update
+    state_store_mock
+        .expect_update_query()
+        .times(1)
+        .returning(|_| Ok(()))
+        .withf(move |query: &Query| {
+            insta_settings("ok_merge_update").bind(|| {
+                assert_json_snapshot!(query, @r#"
+                {
+                  "query_id": "00000000-0000-0000-0000-000000000000",
+                  "request_id": "00000000-0000-0000-0000-000000000000",
+                  "query_text": "MERGE INTO t1 USING (SELECT * FROM t2) AS t2 ON t1.a = t2.a WHEN MATCHED THEN UPDATE SET t1.c = t2.c WHEN NOT MATCHED THEN INSERT (a,c) VALUES(t2.a,t2.c)",
+                  "session_id": "test_session_id",
+                  "database_name": "embucket",
+                  "schema_name": "public",
+                  "query_type": "MERGE",
+                  "warehouse_type": "DEFAULT",
+                  "execution_status": "Success",
+                  "start_time": "2026-01-01T01:01:01.000000001Z",
+                  "end_time": "2026-01-01T01:01:01.000000001Z",
+                  "rows_produced": 4,
+                  "rows_inserted": 1,
+                  "execution_time": "1",
+                  "release_version": "test-version",
+                  "query_hash": "16532873076018472935",
+                  "query_hash_version": 1,
+                  "query_metrics": "[query_metrics]"
+                }
+                "#);
+            });
+            true
+        });
+
+    let state_store: Arc<dyn StateStore> = Arc::new(state_store_mock);
+
+    let metastore = Arc::new(InMemoryMetastore::new());
+    MetastoreBootstrapConfig::bootstrap()
+        .apply(metastore.clone())
+        .await
+        .expect("Failed to bootstrap metastore");
+
+    let execution_svc = CoreExecutionService::new_test_executor(
+        metastore,
+        state_store,
+        Arc::new(Config::default()),
+    )
+    .await
+    .expect("Failed to create execution service");
+
+    timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.create_session(TEST_SESSION_ID),
+    )
+    .await
+    .expect("Create session timed out")
+    .expect("Failed to create session");
+
+    // prepare tables
+    let _ = timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.query(
+            TEST_SESSION_ID,
+            "
+            CREATE TABLE embucket.public.t1 AS SELECT
+                a,b,c
+            FROM (VALUES
+                (1,'b1','c1'),
+                (2,'b2','c2'),
+                (2,'b3','c3'),
+                (3,'b4','c4')
+            ) AS t(a, b, c);",
+            query_context.clone(),
+        ),
+    )
+    .await
+    .expect("Query timed out")
+    .expect("Query execution failed");
+
+    let _ = timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.query(
+            TEST_SESSION_ID,
+            "
+            CREATE TABLE embucket.public.t2 AS SELECT
+                a,b,c
+            FROM (VALUES
+                (1,'b_5','c_5'),
+                (3,'b_6','c_6'),
+                (2,'b_7','c_7'),
+                (4,'b_8','c_8')
+            ) AS t(a, b, c);",
+            query_context.clone(),
+        ),
+    )
+    .await
+    .expect("Query timed out")
+    .expect("Query execution failed");
+
+    let _ = timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.query(TEST_SESSION_ID, "MERGE INTO t1 USING (SELECT * FROM t2) AS t2 ON t1.a = t2.a WHEN MATCHED THEN UPDATE SET t1.c = t2.c WHEN NOT MATCHED THEN INSERT (a,c) VALUES(t2.a,t2.c)", query_context),
+    )
+    .await
+    .expect("Query timed out")
+    .expect("Query execution failed");
+}
 
 #[allow(clippy::expect_used)]
@@ -159,6 +745,7 @@ async fn test_query_lifecycle_query_status_incident_limit_exceeded() {
         .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID)));
     state_store_mock.expect_put_query()
         .returning(|_| Ok(()) )
+        .times(1)
         // check created query attributes only here (it is expected to be the same for any invocation)
         .withf(move |query: &Query| {
             insta_settings("incident_query_put").bind(|| {
@@ -175,6 +762,7 @@ async fn test_query_lifecycle_query_status_incident_limit_exceeded() {
                   "start_time": "2026-01-01T01:01:01.000000001Z",
                   "end_time": "2026-01-01T01:01:01.000000001Z",
                   "execution_time": "1",
+                  "release_version": "test-version",
                   "query_hash": "8436521302113462945",
                   "query_hash_version": 1
                 }
@@ -221,6 +809,7 @@ async fn test_query_lifecycle_query_status_fail() {
         .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID)));
     state_store_mock
         .expect_put_query()
+        .times(1)
         .returning(|_| Ok(()))
         .withf(|query: &Query| {
             insta_settings("fail_query_put").bind(|| {
@@ -233,6 +822,7 @@ async fn test_query_lifecycle_query_status_fail() {
                   "warehouse_type": "DEFAULT",
                   "execution_status": "Running",
                   "start_time": "2026-01-01T01:01:01.000000001Z",
+                  "release_version": "test-version",
                   "query_hash": "17999132521915915058",
                   "query_hash_version": 1
                 }
@@ -253,6 +843,7 @@ async fn test_query_lifecycle_query_status_fail() {
                   "session_id": "test_session_id",
                   "database_name": "embucket",
                   "schema_name": "public",
+                  "query_type": "SELECT",
                   "warehouse_type": "DEFAULT",
                   "execution_status": "Fail",
                   "error_code": "002003",
@@ -260,6 +851,7 @@ async fn test_query_lifecycle_query_status_fail() {
                   "start_time": "2026-01-01T01:01:01.000000001Z",
                   "end_time": "2026-01-01T01:01:01.000000001Z",
                   "execution_time": "1",
+                  "release_version": "test-version",
                   "query_hash": "17999132521915915058",
                   "query_hash_version": 1
                 }
@@ -279,10 +871,13 @@ async fn test_query_lifecycle_query_status_fail() {
     .await
     .expect("Failed to create execution service");
 
-    execution_svc
-        .create_session(TEST_SESSION_ID)
-        .await
-        .expect("Failed to create session");
+    timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.create_session(TEST_SESSION_ID),
+    )
+    .await
+    .expect("Create session timed out")
+    .expect("Failed to create session");
 
     // See note about timeout above
     let _ = timeout(
@@ -310,6 +905,7 @@ async fn test_query_lifecycle_query_status_cancelled() {
         .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID)));
     state_store_mock
        .expect_put_query()
+        .times(1)
         .returning(|_| Ok(()))
         .withf(|query: &Query| {
             insta_settings("cancelled_query_put").bind(|| {
@@ -322,6 +918,7 @@ async fn test_query_lifecycle_query_status_cancelled() {
                   "warehouse_type": "DEFAULT",
                   "execution_status": "Running",
                   "start_time": "2026-01-01T01:01:01.000000001Z",
+                  "release_version": "test-version",
                   "query_hash": "8436521302113462945",
                   "query_hash_version": 1
                 }
@@ -349,6 +946,7 @@ async fn test_query_lifecycle_query_status_cancelled() {
                   "start_time": "2026-01-01T01:01:01.000000001Z",
                   "end_time": "2026-01-01T01:01:01.000000001Z",
                   "execution_time": "1",
+                  "release_version": "test-version",
                   "query_hash": "8436521302113462945",
                   "query_hash_version": 1
                 }
@@ -368,10 +966,13 @@ async fn test_query_lifecycle_query_status_cancelled() {
     .await
     .expect("Failed to create execution service");
 
-    execution_svc
-        .create_session(TEST_SESSION_ID)
-        .await
-        .expect("Failed to create session");
+    timeout(
+        MOCK_RELATED_TIMEOUT_DURATION,
+        execution_svc.create_session(TEST_SESSION_ID),
+    )
+    .await
+    .expect("Create session timed out")
+    .expect("Failed to create session");
 
     // See note about timeout above
     let query_handle = timeout(
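Every await in these tests is wrapped in `tokio::time::timeout` so that a missed mock expectation surfaces as a fast failure instead of a hung test. The pattern in isolation:

```rust
// timeout returns Err(Elapsed) if the future does not resolve in time; the
// outer expect converts a hang into an immediate, diagnosable test failure.
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let result = timeout(Duration::from_millis(100), async { 2 + 2 }).await;
    assert_eq!(result.expect("timed out"), 4);
}
```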
diff --git a/crates/executor/src/utils.rs b/crates/executor/src/utils.rs
index 9f6a94d40..2b445e095 100644
--- a/crates/executor/src/utils.rs
+++ b/crates/executor/src/utils.rs
@@ -34,6 +34,7 @@ use strum::{Display, EnumString};
 #[derive(Clone, Debug)]
 pub struct Config {
     pub embucket_version: String,
+    pub build_version: String,
     pub warehouse_type: String,
     pub sql_parser_dialect: Option<String>,
     pub query_timeout_secs: u64,
@@ -78,6 +79,7 @@ impl Default for Config {
     fn default() -> Self {
         Self {
             embucket_version: "0.1.0".to_string(),
+            build_version: "test-version".to_string(),
             warehouse_type: "DEFAULT".to_string(),
             sql_parser_dialect: None,
             query_timeout_secs: 1200, // 20 minutes
diff --git a/crates/functions/src/aggregate/errors.rs b/crates/functions/src/aggregate/errors.rs
index 98e6ed6ca..1f54f34cf 100644
--- a/crates/functions/src/aggregate/errors.rs
+++ b/crates/functions/src/aggregate/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::{Location, Snafu};
 
 #[derive(Snafu)]
diff --git a/crates/functions/src/arrow_error.rs b/crates/functions/src/arrow_error.rs
index f419f941b..3661dbb2f 100644
--- a/crates/functions/src/arrow_error.rs
+++ b/crates/functions/src/arrow_error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/conversion/errors.rs b/crates/functions/src/conversion/errors.rs
index bfe86b0bc..ba49e2179 100644
--- a/crates/functions/src/conversion/errors.rs
+++ b/crates/functions/src/conversion/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion::arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use snafu::{Location, Snafu};
diff --git a/crates/functions/src/datetime/errors.rs b/crates/functions/src/datetime/errors.rs
index c6308f24b..f5b3c14f7 100644
--- a/crates/functions/src/datetime/errors.rs
+++ b/crates/functions/src/datetime/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use chrono_tz::ParseError;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/df_error.rs b/crates/functions/src/df_error.rs
index 6b78daec8..1c24b514a 100644
--- a/crates/functions/src/df_error.rs
+++ b/crates/functions/src/df_error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/encryption/errors.rs b/crates/functions/src/encryption/errors.rs
index ce430313a..687f03f9d 100644
--- a/crates/functions/src/encryption/errors.rs
+++ b/crates/functions/src/encryption/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion::error::DataFusionError;
 use snafu::Location;
 use snafu::Snafu;
diff --git a/crates/functions/src/errors.rs b/crates/functions/src/errors.rs
index f67d648eb..d00d9a3a9 100644
--- a/crates/functions/src/errors.rs
+++ b/crates/functions/src/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::{Location, Snafu};
 
 // In this file we create 2 types of errors: DataFusionInternalError and DataFusionExecutionError
diff --git a/crates/functions/src/numeric/errors.rs b/crates/functions/src/numeric/errors.rs
index 62d9122a7..aaa26bb71 100644
--- a/crates/functions/src/numeric/errors.rs
+++ b/crates/functions/src/numeric/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::{Location, Snafu};
 
 #[derive(Snafu)]
diff --git a/crates/functions/src/regexp/errors.rs b/crates/functions/src/regexp/errors.rs
index 0ffc7837a..0b76d7a2c 100644
--- a/crates/functions/src/regexp/errors.rs
+++ b/crates/functions/src/regexp/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion::arrow::datatypes::DataType;
 use snafu::{Location, Snafu};
 use std::num::TryFromIntError;
diff --git a/crates/functions/src/semi-structured/errors.rs b/crates/functions/src/semi-structured/errors.rs
index 73f3be4cb..8790dcf2a 100644
--- a/crates/functions/src/semi-structured/errors.rs
+++ b/crates/functions/src/semi-structured/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::location;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/string-binary/errors.rs b/crates/functions/src/string-binary/errors.rs
index e68cb4d12..69cef0ec2 100644
--- a/crates/functions/src/string-binary/errors.rs
+++ b/crates/functions/src/string-binary/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::location;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/system/errors.rs b/crates/functions/src/system/errors.rs
index 4422fe881..f13bd2a15 100644
--- a/crates/functions/src/system/errors.rs
+++ b/crates/functions/src/system/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::location;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/table/errors.rs b/crates/functions/src/table/errors.rs
index 5b3f7d560..8c3ff4486 100644
--- a/crates/functions/src/table/errors.rs
+++ b/crates/functions/src/table/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::{Location, Snafu};
 
diff --git a/crates/queries/src/error.rs b/crates/queries/src/error.rs
index bea8c8e41..3df9d18fb 100644
--- a/crates/queries/src/error.rs
+++ b/crates/queries/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::Location;
 use snafu::prelude::*;
diff --git a/crates/state-store/src/error.rs b/crates/state-store/src/error.rs
index 8abb7ac46..e51633878 100644
--- a/crates/state-store/src/error.rs
+++ b/crates/state-store/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use aws_sdk_dynamodb::config::http::HttpResponse;
 use aws_sdk_dynamodb::error::SdkError;
 use aws_sdk_dynamodb::operation::delete_item::DeleteItemError;
diff --git a/crates/state-store/src/models.rs b/crates/state-store/src/models.rs
index 26dcd1f90..11032ec3c 100644
--- a/crates/state-store/src/models.rs
+++ b/crates/state-store/src/models.rs
@@ -296,7 +296,7 @@
     ) -> Self {
         Self {
             query_id,
-            query_text: query_str.to_string(),
+            query_text: query_str.chars().take(100_000).collect(),
             session_id: session_id.to_string(),
             request_id,
             start_time: Utc::now(),
@@ -319,9 +319,7 @@ impl Query {
         self.schema_name = Some(schema);
     }
 
-    // Why? warning: this could be a `const fn`
-    #[allow(clippy::missing_const_for_fn)]
-    pub fn set_execution_status(&mut self, status: ExecutionStatus) {
+    pub const fn set_execution_status(&mut self, status: ExecutionStatus) {
         self.execution_status = Some(status);
     }
 
@@ -341,6 +339,30 @@ impl Query {
         self.warehouse_type = Some(warehouse_type);
     }
 
+    pub fn set_release_version(&mut self, release_version: String) {
+        self.release_version = Some(release_version);
+    }
+
+    pub const fn set_rows_produced(&mut self, rows_count: u64) {
+        self.rows_produced = Some(rows_count);
+    }
+
+    pub const fn set_rows_inserted(&mut self, rows_count: u64) {
+        self.rows_inserted = Some(rows_count);
+    }
+
+    pub const fn set_rows_deleted(&mut self, rows_count: u64) {
+        self.rows_deleted = Some(rows_count);
+    }
+
+    pub const fn set_rows_updated(&mut self, rows_count: u64) {
+        self.rows_updated = Some(rows_count);
+    }
+
+    pub fn set_query_type(&mut self, query_type: String) {
+        self.query_type = Some(query_type);
+    }
+
     #[allow(clippy::as_conversions, clippy::cast_sign_loss)]
     pub fn set_end_time(&mut self) {
         let end_time = Utc::now();
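`Query::new` now truncates `query_text` with `chars().take(100_000)` rather than slicing bytes: taking chars can never split a UTF-8 code point, while a fixed byte index could panic. A small demonstration of the difference:

```rust
// Char-based truncation is panic-free on any UTF-8 input; the 100_000 cap
// mirrors the limit used in Query::new above.
fn truncate_chars(s: &str, max_chars: usize) -> String {
    s.chars().take(max_chars).collect()
}

fn main() {
    let s = "héllo".repeat(3);
    assert_eq!(truncate_chars(&s, 4), "héll");
    // A naive byte slice like &s[..2] would panic here, because byte 2 falls
    // in the middle of the two-byte 'é'.
}
```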