From bd9bddc7b22374cae508c4b0300f136f32583d00 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 7 Jan 2026 01:08:20 +0100 Subject: [PATCH 1/5] fix deadlock in queries registry --- crates/executor/src/running_queries.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/executor/src/running_queries.rs b/crates/executor/src/running_queries.rs index cf2ff674..1edfa2b7 100644 --- a/crates/executor/src/running_queries.rs +++ b/crates/executor/src/running_queries.rs @@ -84,15 +84,13 @@ impl RunningQuery { #[tracing::instrument( name = "RunningQuery::wait_query_finished", level = "trace", - skip(self), err )] pub async fn wait_query_finished( - &self, + mut rx: watch::Receiver>, ) -> std::result::Result { // use loop here to bypass default query status we posted at init // it should not go to the actual loop and should resolve as soon as results are ready - let mut rx = self.rx.clone(); loop { rx.changed().await?; let status = *rx.borrow(); @@ -132,12 +130,17 @@ impl RunningQueriesRegistry { err )] pub async fn wait_query_finished(&self, query_id: QueryId) -> Result { - let running_query = self + // Should not keep reference to RunningQuery during `wait_query_finished` + // as it causes locking issues when accessing `queries` map during the run + // outside of this call. + let rx = { + let running_query = self .queries .get(&query_id) .context(ex_error::QueryIsntRunningSnafu { query_id })?; - running_query - .wait_query_finished() + running_query.rx.clone() + }; + RunningQuery::wait_query_finished(rx) .await .context(ex_error::ExecutionStatusRecvSnafu { query_id }) } From 877c59d48a8e874b66470bd6b361e43987f2634c Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 7 Jan 2026 03:06:09 +0100 Subject: [PATCH 2/5] query_type, release_version --- Cargo.lock | 10 +-- Cargo.toml | 3 - crates/embucket-lambda/src/config.rs | 1 + crates/embucketd/src/main.rs | 1 + crates/executor/src/query.rs | 60 +++++++++++++ crates/executor/src/query_types.rs | 89 +++++++++++++++++++ crates/executor/src/running_queries.rs | 42 ++++++++- crates/executor/src/service.rs | 9 ++ .../src/tests/statestore_queries_unittest.rs | 46 +++++++--- crates/executor/src/utils.rs | 2 + crates/state-store/src/models.rs | 14 ++- 11 files changed, 255 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb068833..e31fcd41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3007,7 +3007,7 @@ dependencies = [ [[package]] name = "datafusion_iceberg" version = "0.9.0" -source = "git+https://github.com/Embucket/iceberg-rust.git?rev=52aaae5d773ca4b0c619a2083c354ca23548151c#52aaae5d773ca4b0c619a2083c354ca23548151c" +source = "git+https://github.com/Embucket/iceberg-rust.git?rev=af0532380770f55f23c783e555b4674cec743174#af0532380770f55f23c783e555b4674cec743174" dependencies = [ "async-trait", "chrono", @@ -4466,7 +4466,7 @@ dependencies = [ [[package]] name = "iceberg-rest-catalog" version = "0.9.0" -source = "git+https://github.com/Embucket/iceberg-rust.git?rev=52aaae5d773ca4b0c619a2083c354ca23548151c#52aaae5d773ca4b0c619a2083c354ca23548151c" +source = "git+https://github.com/Embucket/iceberg-rust.git?rev=af0532380770f55f23c783e555b4674cec743174#af0532380770f55f23c783e555b4674cec743174" dependencies = [ "async-trait", "aws-sigv4 0.3.1", @@ -4491,7 +4491,7 @@ dependencies = [ [[package]] name = "iceberg-rust" version = "0.9.0" -source = "git+https://github.com/Embucket/iceberg-rust.git?rev=52aaae5d773ca4b0c619a2083c354ca23548151c#52aaae5d773ca4b0c619a2083c354ca23548151c" +source = 
"git+https://github.com/Embucket/iceberg-rust.git?rev=af0532380770f55f23c783e555b4674cec743174#af0532380770f55f23c783e555b4674cec743174" dependencies = [ "apache-avro", "arrow 56.2.0", @@ -4527,7 +4527,7 @@ dependencies = [ [[package]] name = "iceberg-rust-spec" version = "0.9.0" -source = "git+https://github.com/Embucket/iceberg-rust.git?rev=52aaae5d773ca4b0c619a2083c354ca23548151c#52aaae5d773ca4b0c619a2083c354ca23548151c" +source = "git+https://github.com/Embucket/iceberg-rust.git?rev=af0532380770f55f23c783e555b4674cec743174#af0532380770f55f23c783e555b4674cec743174" dependencies = [ "apache-avro", "arrow-schema 56.2.0", @@ -4552,7 +4552,7 @@ dependencies = [ [[package]] name = "iceberg-s3tables-catalog" version = "0.9.0" -source = "git+https://github.com/Embucket/iceberg-rust.git?rev=52aaae5d773ca4b0c619a2083c354ca23548151c#52aaae5d773ca4b0c619a2083c354ca23548151c" +source = "git+https://github.com/Embucket/iceberg-rust.git?rev=af0532380770f55f23c783e555b4674cec743174#af0532380770f55f23c783e555b4674cec743174" dependencies = [ "async-trait", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index 8bb6b676..7116fa7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,3 @@ significant_drop_tightening = { level = "allow", priority = 2 } module_name_repetitions = { level = "allow", priority = 2 } option_if_let_else = { level = "allow", priority = 2 } needless_for_each = { level = "allow", priority = 2 } # #[derive(OpenApi)] - -[workspace.features] -state-store-query-test = ["executor/state-store-query-test"] diff --git a/crates/embucket-lambda/src/config.rs b/crates/embucket-lambda/src/config.rs index 0420a638..7cd6f197 100644 --- a/crates/embucket-lambda/src/config.rs +++ b/crates/embucket-lambda/src/config.rs @@ -70,6 +70,7 @@ impl EnvConfig { pub fn execution_config(&self) -> ExecutionConfig { ExecutionConfig { embucket_version: self.embucket_version.clone(), + build_version: BuildInfo::GIT_DESCRIBE.to_string(), warehouse_type: "LAMBDA_SERVERLESS".to_string(), sql_parser_dialect: self.sql_parser_dialect.clone(), query_timeout_secs: self.query_timeout_secs, diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index b7c5b6b1..0efb990d 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -123,6 +123,7 @@ async fn async_main( let execution_cfg = ExecutionConfig { embucket_version: BuildInfo::VERSION.to_string(), + build_version: BuildInfo::GIT_DESCRIBE.to_string(), warehouse_type: "EMBUCKET".to_string(), sql_parser_dialect: opts.sql_parser_dialect.clone(), query_timeout_secs: opts.query_timeout_secs, diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index 347ffae2..a848cb1a 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -18,6 +18,7 @@ use crate::datafusion::physical_plan::merge::{ use crate::datafusion::rewriters::session_context::SessionContextExprRewriter; use crate::error::{OperationOn, OperationType}; use crate::models::{QueryContext, QueryResult}; +use crate::query_types::{QueryStats, QueryType, DmlStType, DdlStType, MiscStType}; use catalog::table::{CachingTable, IcebergTableBuilder}; use catalog_metastore::{ AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume, @@ -244,6 +245,63 @@ impl UserQuery { Ok(()) } + #[allow(clippy::too_many_lines)] + #[instrument( + name = "UserQuery::update_query_type", + level = "debug", + skip(self), + fields( + query_type, + query_id = self.query_context.query_id.to_string(), + ), + )] + fn 
update_stats_query_type(&self, statement: &DFStatement) { + let save = |query_type: QueryType| { + self.running_queries.update_stats( + self.query_context.query_id, + &QueryStats::default().with_query_type(query_type), + ); + }; + + if let DFStatement::Statement(s) = statement { + match **s { + Statement::Update { .. } => save(QueryType::Dml(DmlStType::Update)), + Statement::Insert { .. } => save(QueryType::Dml(DmlStType::Insert)), + Statement::Truncate { .. } => save(QueryType::Dml(DmlStType::Truncate)), + Statement::Query( .. ) => save(QueryType::Dml(DmlStType::Select)), + Statement::Merge { .. } => save(QueryType::Dml(DmlStType::Merge)), + Statement::AlterSession { .. } => save(QueryType::Ddl(DdlStType::AlterSession)), + Statement::AlterTable { .. } => save(QueryType::Ddl(DdlStType::AlterTable)), + Statement::Use ( .. ) => save(QueryType::Misc(MiscStType::Use)), + Statement::Set ( .. ) => save(QueryType::Misc(MiscStType::Set)), + Statement::CreateTable { .. } => save(QueryType::Ddl(DdlStType::CreateTable)), + Statement::CreateView { .. } => save(QueryType::Ddl(DdlStType::CreateView)), + Statement::CreateDatabase { .. } => save(QueryType::Ddl(DdlStType::CreateDatabase)), + Statement::CreateExternalVolume { .. } => save(QueryType::Ddl(DdlStType::CreateVolume)), + Statement::CreateSchema { .. } => save(QueryType::Ddl(DdlStType::CreateSchema)), + Statement::CreateStage { .. } => save(QueryType::Ddl(DdlStType::CreateStage)), + Statement::CopyIntoSnowflake { .. } => save(QueryType::Ddl(DdlStType::CopyIntoSnowflake)), + Statement::StartTransaction { .. } => save(QueryType::Misc(MiscStType::Begin)), + Statement::Commit { .. } => save(QueryType::Misc(MiscStType::Commit)), + Statement::Rollback { .. } => save(QueryType::Misc(MiscStType::Rollback)), + Statement::ShowColumns { .. } => save(QueryType::Misc(MiscStType::ShowColumns)), + Statement::ShowFunctions { .. } => save(QueryType::Misc(MiscStType::ShowFunctions)), + Statement::ShowObjects { .. } => save(QueryType::Misc(MiscStType::ShowObjects)), + Statement::ShowVariables { .. } => save(QueryType::Misc(MiscStType::ShowVariables)), + Statement::ShowVariable { .. } => save(QueryType::Misc(MiscStType::ShowVariable)), + Statement::ShowDatabases { .. } => save(QueryType::Misc(MiscStType::ShowDatabases)), + Statement::ShowSchemas { .. } => save(QueryType::Misc(MiscStType::ShowSchemas)), + Statement::ShowTables { .. } => save(QueryType::Misc(MiscStType::ShowTables)), + Statement::ShowViews { .. } => save(QueryType::Misc(MiscStType::ShowViews)), + Statement::Drop { .. } => save(QueryType::Ddl(DdlStType::Drop)), + Statement::ExplainTable { .. } => save(QueryType::Misc(MiscStType::ExplainTable)), + _ => {} + } + } else if let DFStatement::CreateExternalTable( .. ) = statement { + save(QueryType::Ddl(DdlStType::CreateExternalTable)); + } + } + #[allow(clippy::too_many_lines)] #[instrument( name = "UserQuery::execute", @@ -263,6 +321,8 @@ impl UserQuery { // Record the result as part of the current span. tracing::Span::current().record("statement", format!("{statement:#?}")); + self.update_stats_query_type(&statement); + // TODO: Code should be organized in a better way // 1. Single place to parse SQL strings into AST // 2. Single place to update AST diff --git a/crates/executor/src/query_types.rs b/crates/executor/src/query_types.rs index 2c6d85d8..f4eb0802 100644 --- a/crates/executor/src/query_types.rs +++ b/crates/executor/src/query_types.rs @@ -1,4 +1,5 @@ pub type QueryId = uuid::Uuid; +use std::fmt::Display; cfg_if::cfg_if! 
{ if #[cfg(feature = "state-store-query")] { @@ -8,9 +9,97 @@ cfg_if::cfg_if! { use std::fmt::Debug; #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum ExecutionStatus { + Running, Success, Fail, Incident, } } } + +#[derive(Debug, Clone, strum::Display)] +#[strum(serialize_all = "UPPERCASE")] +pub enum DmlStType { + Select, + Insert, + Update, + Delete, + Truncate, + Merge, +} + +#[derive(Debug, Clone, strum::Display)] +#[strum(serialize_all = "UPPERCASE")] +pub enum DdlStType { + CreateExternalTable, + CreateTable, + CreateView, + CreateDatabase, + CreateVolume, + CreateSchema, + CreateStage, + CopyIntoSnowflake, + DropTable, + AlterTable, + AlterSession, + Drop, +} + +#[derive(Debug, Clone, strum::Display)] +#[strum(serialize_all = "UPPERCASE")] +pub enum MiscStType { + Use, + Set, + Begin, + Commit, + Rollback, + ShowColumns, + ShowFunctions, + ShowVariables, + ShowObjects, + ShowVariable, + ShowDatabases, + ShowSchemas, + ShowTables, + ShowViews, + ExplainTable, +} + +#[derive(Debug, Clone)] +pub enum QueryType { + Dml(DmlStType), + Ddl(DdlStType), + Misc(MiscStType), +} + +impl Display for QueryType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Dml(dml) => write!(f, "{dml}"), + Self::Ddl(ddl) => write!(f, "{ddl}"), + Self::Misc(misc) => write!(f, "{misc}"), + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct QueryStats { + pub query_type: Option, + pub rows_count: Option, +} + +impl QueryStats { + pub fn with_query_type(self, query_type: QueryType) -> Self { + Self { + query_type: Some(query_type), + ..self + } + } + + pub fn with_rows_count(self, rows_count: u64) -> Self { + Self { + rows_count: Some(rows_count), + ..self + } + } +} \ No newline at end of file diff --git a/crates/executor/src/running_queries.rs b/crates/executor/src/running_queries.rs index 1edfa2b7..f6754caa 100644 --- a/crates/executor/src/running_queries.rs +++ b/crates/executor/src/running_queries.rs @@ -1,6 +1,6 @@ use super::error::{self as ex_error, Result}; use super::models::QueryResult; -use crate::query_types::{ExecutionStatus, QueryId}; +use crate::query_types::{ExecutionStatus, QueryId, QueryStats}; use dashmap::DashMap; use snafu::{OptionExt, ResultExt}; use std::sync::Arc; @@ -22,6 +22,7 @@ pub struct RunningQuery { // user can be notified when query is finished tx: watch::Sender>, rx: watch::Receiver>, + pub query_stats: QueryStats, } #[derive(Debug, Clone)] @@ -41,6 +42,7 @@ impl RunningQuery { result_handle: None, tx, rx, + query_stats: QueryStats::default(), } } #[must_use] @@ -99,6 +101,20 @@ impl RunningQuery { } } } + + #[tracing::instrument( + name = "RunningQuery::update_query_stats", + level = "trace", + skip(self), + )] + pub fn update_query_stats(&mut self, stats: &QueryStats) { + if self.query_stats.query_type.is_none() { + self.query_stats.query_type = stats.query_type.clone(); + } + if self.query_stats.rows_count.is_none() { + self.query_stats.rows_count = stats.rows_count; + } + } } pub struct RunningQueriesRegistry { @@ -155,6 +171,8 @@ pub trait RunningQueries: Send + Sync { fn notify_query_finished(&self, query_id: QueryId, status: ExecutionStatus) -> Result<()>; fn locate_query_id(&self, running_query_id: RunningQueryId) -> Result; fn count(&self) -> usize; + fn cloned_stats(&self, query_id: QueryId) -> Option; + fn update_stats(&self, query_id: QueryId, stats: &QueryStats); } impl RunningQueries for RunningQueriesRegistry { @@ -228,4 +246,26 @@ impl RunningQueries for RunningQueriesRegistry { fn 
count(&self) -> usize { self.queries.len() } + + + fn cloned_stats(&self, query_id: QueryId) -> Option { + if let Some(running_query) = self.queries.get(&query_id) { + Some(running_query.query_stats.clone()) + } else { + None + } + } + + #[tracing::instrument( + name = "RunningQueriesRegistry::update_stats", + level = "trace", + skip(self), + fields(query_id), + ret + )] + fn update_stats(&self, query_id: QueryId, stats: &QueryStats) { + if let Some(mut running_query) = self.queries.get_mut(&query_id) { + running_query.update_query_stats(stats); + } + } } diff --git a/crates/executor/src/service.rs b/crates/executor/src/service.rs index 4d6cdfa8..42abbd2d 100644 --- a/crates/executor/src/service.rs +++ b/crates/executor/src/service.rs @@ -528,6 +528,7 @@ impl ExecutionService for CoreExecutionService { ); query.set_execution_status(ExecutionStatus::Running); query.set_warehouse_type(self.config.warehouse_type.clone()); + query.set_release_version(self.config.build_version.clone()); } } @@ -629,6 +630,14 @@ impl ExecutionService for CoreExecutionService { cfg_if::cfg_if! { if #[cfg(feature = "state-store-query")] { execution_result.assign_query_attributes(&mut query); + if let Some(stats) = queries_registry.cloned_stats(query_id) { + if let Some(query_type) = stats.query_type { + query.set_query_type(query_type.to_string()); + } + if let Some(rows_count) = stats.rows_count { + query.set_rows_produced(rows_count); + } + } // just log error and do not raise it from task if let Err(err) = state_store.update_query(&query).await { tracing::error!("Failed to update query {query_id}: {err:?}"); diff --git a/crates/executor/src/tests/statestore_queries_unittest.rs b/crates/executor/src/tests/statestore_queries_unittest.rs index ed9ce4e6..9b88ae36 100644 --- a/crates/executor/src/tests/statestore_queries_unittest.rs +++ b/crates/executor/src/tests/statestore_queries_unittest.rs @@ -59,6 +59,7 @@ async fn test_query_lifecycle_ok_query() { .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); state_store_mock .expect_put_query() + .times(1) .returning(|_| Ok(())) // check created query attributes only here (it is expected to be the same for any invocation) .withf(move |query: &Query| { @@ -72,6 +73,7 @@ async fn test_query_lifecycle_ok_query() { "warehouse_type": "DEFAULT", "execution_status": "Running", "start_time": "2026-01-01T01:01:01.000000001Z", + "release_version": "test-version", "query_hash": "12320374230549905548", "query_hash_version": 1 } @@ -93,11 +95,13 @@ async fn test_query_lifecycle_ok_query() { "session_id": "test_session_id", "database_name": "embucket", "schema_name": "public", + "query_type": "SELECT", "warehouse_type": "DEFAULT", "execution_status": "Success", "start_time": "2026-01-01T01:01:01.000000001Z", "end_time": "2026-01-01T01:01:01.000000001Z", "execution_time": "1", + "release_version": "test-version", "query_hash": "12320374230549905548", "query_hash_version": 1 } @@ -117,10 +121,13 @@ async fn test_query_lifecycle_ok_query() { .await .expect("Failed to create execution service"); - execution_svc - .create_session(TEST_SESSION_ID) - .await - .expect("Failed to create session"); + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID) + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); // See note about timeout above let _ = timeout( @@ -151,6 +158,7 @@ async fn test_query_lifecycle_query_status_incident_limit_exceeded() { .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); 
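
For reference, the pattern patch 1 applies (and that `cloned_stats` above follows by copying the stats out instead of returning a reference): never hold a DashMap guard across an `.await`. A minimal standalone sketch of that pattern, assuming only tokio and dashmap; `Registry`, `Status`, and `wait_finished` are illustrative names, not this crate's API:

    use dashmap::DashMap;
    use tokio::sync::watch;

    #[derive(Clone, Copy, Debug)]
    enum Status { Done }

    struct Registry {
        queries: DashMap<u64, watch::Receiver<Option<Status>>>,
    }

    impl Registry {
        async fn wait_finished(&self, id: u64) -> Option<Status> {
            // Clone the receiver inside a block so the DashMap read guard
            // drops before the first await; parking a task while it still
            // holds a shard guard is what allowed the original deadlock.
            let mut rx = {
                let guard = self.queries.get(&id)?;
                guard.value().clone()
            };
            // Skip the None the channel was initialized with and resolve
            // on the first real status, mirroring wait_query_finished.
            while rx.changed().await.is_ok() {
                if let Some(status) = *rx.borrow() {
                    return Some(status);
                }
            }
            None
        }
    }
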
state_store_mock.expect_put_query() .returning(|_| Ok(()) ) + .times(1) // check created query attributes only here (it is expected to be the same for any invocation) .withf(move |query: &Query| { insta_settings("incident_query_put").bind(|| { @@ -167,6 +175,7 @@ async fn test_query_lifecycle_query_status_incident_limit_exceeded() { "start_time": "2026-01-01T01:01:01.000000001Z", "end_time": "2026-01-01T01:01:01.000000001Z", "execution_time": "1", + "release_version": "test-version", "query_hash": "8436521302113462945", "query_hash_version": 1 } @@ -213,6 +222,7 @@ async fn test_query_lifecycle_query_status_fail() { .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); state_store_mock .expect_put_query() + .times(1) .returning(|_| Ok(())) .withf(|query: &Query| { insta_settings("fail_query_put").bind(|| { @@ -225,6 +235,7 @@ async fn test_query_lifecycle_query_status_fail() { "warehouse_type": "DEFAULT", "execution_status": "Running", "start_time": "2026-01-01T01:01:01.000000001Z", + "release_version": "test-version", "query_hash": "17999132521915915058", "query_hash_version": 1 } @@ -245,6 +256,7 @@ async fn test_query_lifecycle_query_status_fail() { "session_id": "test_session_id", "database_name": "embucket", "schema_name": "public", + "query_type": "SELECT", "warehouse_type": "DEFAULT", "execution_status": "Fail", "error_code": "002003", @@ -252,6 +264,7 @@ async fn test_query_lifecycle_query_status_fail() { "start_time": "2026-01-01T01:01:01.000000001Z", "end_time": "2026-01-01T01:01:01.000000001Z", "execution_time": "1", + "release_version": "test-version", "query_hash": "17999132521915915058", "query_hash_version": 1 } @@ -271,10 +284,13 @@ async fn test_query_lifecycle_query_status_fail() { .await .expect("Failed to create execution service"); - execution_svc - .create_session(TEST_SESSION_ID) - .await - .expect("Failed to create session"); + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID) + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); // See note about timeout above let _ = timeout( @@ -302,6 +318,7 @@ async fn test_query_lifecycle_query_status_cancelled() { .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); state_store_mock .expect_put_query() + .times(1) .returning(|_| Ok(())) .withf(|query: &Query| { insta_settings("cancelled_query_put").bind(|| { @@ -314,6 +331,7 @@ async fn test_query_lifecycle_query_status_cancelled() { "warehouse_type": "DEFAULT", "execution_status": "Running", "start_time": "2026-01-01T01:01:01.000000001Z", + "release_version": "test-version", "query_hash": "8436521302113462945", "query_hash_version": 1 } @@ -341,6 +359,7 @@ async fn test_query_lifecycle_query_status_cancelled() { "start_time": "2026-01-01T01:01:01.000000001Z", "end_time": "2026-01-01T01:01:01.000000001Z", "execution_time": "1", + "release_version": "test-version", "query_hash": "8436521302113462945", "query_hash_version": 1 } @@ -360,10 +379,13 @@ async fn test_query_lifecycle_query_status_cancelled() { .await .expect("Failed to create execution service"); - execution_svc - .create_session(TEST_SESSION_ID) - .await - .expect("Failed to create session"); + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID) + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); // See note about timeout above let query_handle = timeout( diff --git a/crates/executor/src/utils.rs b/crates/executor/src/utils.rs index 9f6a94d4..2b445e09 100644 
--- a/crates/executor/src/utils.rs +++ b/crates/executor/src/utils.rs @@ -34,6 +34,7 @@ use strum::{Display, EnumString}; #[derive(Clone, Debug)] pub struct Config { pub embucket_version: String, + pub build_version: String, pub warehouse_type: String, pub sql_parser_dialect: Option, pub query_timeout_secs: u64, @@ -78,6 +79,7 @@ impl Default for Config { fn default() -> Self { Self { embucket_version: "0.1.0".to_string(), + build_version: "test-version".to_string(), warehouse_type: "DEFAULT".to_string(), sql_parser_dialect: None, query_timeout_secs: 1200, // 20 minutes diff --git a/crates/state-store/src/models.rs b/crates/state-store/src/models.rs index fc339140..9e293a4c 100644 --- a/crates/state-store/src/models.rs +++ b/crates/state-store/src/models.rs @@ -294,7 +294,7 @@ impl Query { ) -> Self { Self { query_id, - query_text: query_str.to_string(), + query_text: query_str.chars().take(100_000).collect(), session_id: session_id.to_string(), request_id, start_time: Utc::now(), @@ -335,6 +335,18 @@ impl Query { self.warehouse_type = Some(warehouse_type); } + pub fn set_release_version(&mut self, release_version: String) { + self.release_version = Some(release_version); + } + + pub fn set_rows_produced(&mut self, rows_count: u64) { + self.rows_produced = Some(rows_count); + } + + pub fn set_query_type(&mut self, query_type: String) { + self.query_type = Some(query_type); + } + #[allow(clippy::as_conversions, clippy::cast_sign_loss)] pub fn set_end_time(&mut self) { let end_time = Utc::now(); From 6aa04b0161f0dd85418aac44cf27fa607cf671ef Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 7 Jan 2026 19:19:42 +0100 Subject: [PATCH 3/5] populate query_type, rows_count --- crates/executor/src/query.rs | 35 +- crates/executor/src/query_task_result.rs | 58 ++ crates/executor/src/query_types.rs | 15 +- crates/executor/src/running_queries.rs | 22 +- crates/executor/src/service.rs | 4 +- .../src/tests/statestore_queries_unittest.rs | 603 +++++++++++++++++- crates/state-store/src/models.rs | 12 + 7 files changed, 701 insertions(+), 48 deletions(-) diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index a848cb1a..0e898826 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -18,7 +18,7 @@ use crate::datafusion::physical_plan::merge::{ use crate::datafusion::rewriters::session_context::SessionContextExprRewriter; use crate::error::{OperationOn, OperationType}; use crate::models::{QueryContext, QueryResult}; -use crate::query_types::{QueryStats, QueryType, DmlStType, DdlStType, MiscStType}; +use crate::query_types::{DdlStType, DmlStType, MiscStType, QueryStats, QueryType}; use catalog::table::{CachingTable, IcebergTableBuilder}; use catalog_metastore::{ AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume, @@ -265,22 +265,41 @@ impl UserQuery { if let DFStatement::Statement(s) = statement { match **s { + // match DML statements: + Statement::Delete { .. } => save(QueryType::Dml(DmlStType::Delete)), Statement::Update { .. } => save(QueryType::Dml(DmlStType::Update)), Statement::Insert { .. } => save(QueryType::Dml(DmlStType::Insert)), Statement::Truncate { .. } => save(QueryType::Dml(DmlStType::Truncate)), - Statement::Query( .. ) => save(QueryType::Dml(DmlStType::Select)), + Statement::Query(..) => save(QueryType::Dml(DmlStType::Select)), Statement::Merge { .. } => save(QueryType::Dml(DmlStType::Merge)), + // match DDL statements: Statement::AlterSession { .. 
} => save(QueryType::Ddl(DdlStType::AlterSession)), Statement::AlterTable { .. } => save(QueryType::Ddl(DdlStType::AlterTable)), - Statement::Use ( .. ) => save(QueryType::Misc(MiscStType::Use)), - Statement::Set ( .. ) => save(QueryType::Misc(MiscStType::Set)), Statement::CreateTable { .. } => save(QueryType::Ddl(DdlStType::CreateTable)), Statement::CreateView { .. } => save(QueryType::Ddl(DdlStType::CreateView)), Statement::CreateDatabase { .. } => save(QueryType::Ddl(DdlStType::CreateDatabase)), - Statement::CreateExternalVolume { .. } => save(QueryType::Ddl(DdlStType::CreateVolume)), + Statement::CreateExternalVolume { .. } => { + save(QueryType::Ddl(DdlStType::CreateVolume)) + } Statement::CreateSchema { .. } => save(QueryType::Ddl(DdlStType::CreateSchema)), Statement::CreateStage { .. } => save(QueryType::Ddl(DdlStType::CreateStage)), - Statement::CopyIntoSnowflake { .. } => save(QueryType::Ddl(DdlStType::CopyIntoSnowflake)), + Statement::CopyIntoSnowflake { .. } => { + save(QueryType::Ddl(DdlStType::CopyIntoSnowflake)) + } + Statement::Drop { object_type, .. } => match object_type { + ObjectType::Table => save(QueryType::Ddl(DdlStType::DropTable)), + ObjectType::View => save(QueryType::Ddl(DdlStType::DropView)), + ObjectType::MaterializedView => { + save(QueryType::Ddl(DdlStType::DropMaterializedView)) + } + ObjectType::Schema => save(QueryType::Ddl(DdlStType::DropSchema)), + ObjectType::Database => save(QueryType::Ddl(DdlStType::DropDatabase)), + ObjectType::Stage => save(QueryType::Ddl(DdlStType::DropStage)), + _ => {} + }, + // match other statements: + Statement::Use(..) => save(QueryType::Misc(MiscStType::Use)), + Statement::Set(..) => save(QueryType::Misc(MiscStType::Set)), Statement::StartTransaction { .. } => save(QueryType::Misc(MiscStType::Begin)), Statement::Commit { .. } => save(QueryType::Misc(MiscStType::Commit)), Statement::Rollback { .. } => save(QueryType::Misc(MiscStType::Rollback)), @@ -293,11 +312,11 @@ impl UserQuery { Statement::ShowSchemas { .. } => save(QueryType::Misc(MiscStType::ShowSchemas)), Statement::ShowTables { .. } => save(QueryType::Misc(MiscStType::ShowTables)), Statement::ShowViews { .. } => save(QueryType::Misc(MiscStType::ShowViews)), - Statement::Drop { .. } => save(QueryType::Ddl(DdlStType::Drop)), + Statement::ExplainTable { .. } => save(QueryType::Misc(MiscStType::ExplainTable)), _ => {} } - } else if let DFStatement::CreateExternalTable( .. ) = statement { + } else if let DFStatement::CreateExternalTable(..) = statement { save(QueryType::Ddl(DdlStType::CreateExternalTable)); } } diff --git a/crates/executor/src/query_task_result.rs b/crates/executor/src/query_task_result.rs index a4d5de15..0d9bd640 100644 --- a/crates/executor/src/query_task_result.rs +++ b/crates/executor/src/query_task_result.rs @@ -3,6 +3,12 @@ use super::error::Result; use super::error_code::ErrorCode; use super::models::QueryResult; use super::query_types::ExecutionStatus; +cfg_if::cfg_if! 
{ + if #[cfg(feature = "state-store-query")] { + use super::query_types::{DmlStType, QueryType}; + use datafusion::arrow::array::{Int64Array, UInt64Array}; + } +} use super::snowflake_error::SnowflakeError; use snafu::ResultExt; use tokio::task::JoinError; @@ -77,6 +83,37 @@ impl ExecutionTaskResult { } } + #[cfg(feature = "state-store-query")] + pub fn assign_rows_counts_attributes( + &self, + query: &mut state_store::Query, + query_type: QueryType, + ) { + if let Ok(result) = &self.result + && let QueryType::Dml(query_type) = query_type + { + if let DmlStType::Select = query_type { + let rows_count: u64 = result.records.iter().map(|r| r.num_rows() as u64).sum(); + query.set_rows_produced(rows_count); + } else if let Some(rows_count) = value_by_row_column(&result, 0, 0) { + match query_type { + DmlStType::Insert => query.set_rows_inserted(rows_count as u64), + DmlStType::Update => query.set_rows_updated(rows_count as u64), + DmlStType::Delete => query.set_rows_deleted(rows_count as u64), + DmlStType::Truncate => query.set_rows_deleted(rows_count as u64), + DmlStType::Merge => { + // merge has 2 columns, currently map values to insert/select rows counts + query.set_rows_inserted(rows_count as u64); + if let Some(rows_count) = value_by_row_column(&result, 0, 1) { + query.set_rows_produced(rows_count as u64); + } + } + _ => {} + } + } + } + } + #[cfg(feature = "state-store-query")] pub fn assign_query_attributes(&self, query: &mut state_store::Query) { query.set_execution_status(self.execution_status); @@ -89,3 +126,24 @@ impl ExecutionTaskResult { query.set_end_time(); } } + +#[cfg(feature = "state-store-query")] +fn value_by_row_column(result: &QueryResult, row_idx: usize, col_idx: usize) -> Option { + result.records[0].columns().get(col_idx).and_then(|col| { + if let Some(cols) = col.as_any().downcast_ref::() { + if row_idx < cols.len() { + Some(cols.value(row_idx) as u64) + } else { + None + } + } else if let Some(cols) = col.as_any().downcast_ref::() { + if row_idx < cols.len() { + Some(cols.value(row_idx)) + } else { + None + } + } else { + None + } + }) +} diff --git a/crates/executor/src/query_types.rs b/crates/executor/src/query_types.rs index f4eb0802..0285cd63 100644 --- a/crates/executor/src/query_types.rs +++ b/crates/executor/src/query_types.rs @@ -40,6 +40,11 @@ pub enum DdlStType { CreateStage, CopyIntoSnowflake, DropTable, + DropView, + DropMaterializedView, + DropSchema, + DropDatabase, + DropStage, AlterTable, AlterSession, Drop, @@ -85,7 +90,6 @@ impl Display for QueryType { #[derive(Debug, Clone, Default)] pub struct QueryStats { pub query_type: Option, - pub rows_count: Option, } impl QueryStats { @@ -95,11 +99,4 @@ impl QueryStats { ..self } } - - pub fn with_rows_count(self, rows_count: u64) -> Self { - Self { - rows_count: Some(rows_count), - ..self - } - } -} \ No newline at end of file +} diff --git a/crates/executor/src/running_queries.rs b/crates/executor/src/running_queries.rs index f6754caa..aed90b55 100644 --- a/crates/executor/src/running_queries.rs +++ b/crates/executor/src/running_queries.rs @@ -83,11 +83,7 @@ impl RunningQuery { self.tx.send(Some(status)) } - #[tracing::instrument( - name = "RunningQuery::wait_query_finished", - level = "trace", - err - )] + #[tracing::instrument(name = "RunningQuery::wait_query_finished", level = "trace", err)] pub async fn wait_query_finished( mut rx: watch::Receiver>, ) -> std::result::Result { @@ -102,18 +98,11 @@ impl RunningQuery { } } - #[tracing::instrument( - name = "RunningQuery::update_query_stats", - level = 
"trace", - skip(self), - )] + #[tracing::instrument(name = "RunningQuery::update_query_stats", level = "trace", skip(self))] pub fn update_query_stats(&mut self, stats: &QueryStats) { if self.query_stats.query_type.is_none() { self.query_stats.query_type = stats.query_type.clone(); } - if self.query_stats.rows_count.is_none() { - self.query_stats.rows_count = stats.rows_count; - } } } @@ -151,9 +140,9 @@ impl RunningQueriesRegistry { // outside of this call. let rx = { let running_query = self - .queries - .get(&query_id) - .context(ex_error::QueryIsntRunningSnafu { query_id })?; + .queries + .get(&query_id) + .context(ex_error::QueryIsntRunningSnafu { query_id })?; running_query.rx.clone() }; RunningQuery::wait_query_finished(rx) @@ -247,7 +236,6 @@ impl RunningQueries for RunningQueriesRegistry { self.queries.len() } - fn cloned_stats(&self, query_id: QueryId) -> Option { if let Some(running_query) = self.queries.get(&query_id) { Some(running_query.query_stats.clone()) diff --git a/crates/executor/src/service.rs b/crates/executor/src/service.rs index 42abbd2d..b6ad5e23 100644 --- a/crates/executor/src/service.rs +++ b/crates/executor/src/service.rs @@ -633,9 +633,7 @@ impl ExecutionService for CoreExecutionService { if let Some(stats) = queries_registry.cloned_stats(query_id) { if let Some(query_type) = stats.query_type { query.set_query_type(query_type.to_string()); - } - if let Some(rows_count) = stats.rows_count { - query.set_rows_produced(rows_count); + execution_result.assign_rows_counts_attributes(&mut query, query_type); } } // just log error and do not raise it from task diff --git a/crates/executor/src/tests/statestore_queries_unittest.rs b/crates/executor/src/tests/statestore_queries_unittest.rs index 9b88ae36..8d864d88 100644 --- a/crates/executor/src/tests/statestore_queries_unittest.rs +++ b/crates/executor/src/tests/statestore_queries_unittest.rs @@ -2,6 +2,7 @@ use crate::models::QueryContext; use crate::service::{CoreExecutionService, ExecutionService}; use crate::utils::Config; use catalog_metastore::InMemoryMetastore; +use catalog_metastore::metastore_bootstrap_config::MetastoreBootstrapConfig; use insta::assert_json_snapshot; use state_store::{MockStateStore, Query, SessionRecord, StateStore}; use std::sync::Arc; @@ -14,7 +15,6 @@ use uuid::Uuid; const TEST_SESSION_ID: &str = "test_session_id"; const TEST_DATABASE: &str = "test_database"; const TEST_SCHEMA: &str = "test_schema"; -const OK_QUERY_TEXT: &str = "SELECT 1 AS a, 2.0 AS b, '3' AS c WHERE False"; const MOCK_RELATED_TIMEOUT_DURATION: Duration = Duration::from_millis(100); @@ -28,7 +28,6 @@ fn insta_settings(name: &str) -> insta::Settings { let mut settings = insta::Settings::new(); settings.set_sort_maps(true); settings.set_description(name); - settings.set_info(&format!("{name}")); settings.add_redaction(".execution_time", "1"); settings.add_filter( r"[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}", @@ -68,13 +67,13 @@ async fn test_query_lifecycle_ok_query() { { "query_id": "00000000-0000-0000-0000-000000000000", "request_id": "00000000-0000-0000-0000-000000000000", - "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS c WHERE False", + "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'", "session_id": "test_session_id", "warehouse_type": "DEFAULT", "execution_status": "Running", "start_time": "2026-01-01T01:01:01.000000001Z", "release_version": "test-version", - "query_hash": "12320374230549905548", + "query_hash": "1717924485430328356", "query_hash_version": 1 } "#); @@ -91,7 +90,7 @@ async fn 
test_query_lifecycle_ok_query() { { "query_id": "00000000-0000-0000-0000-000000000000", "request_id": "00000000-0000-0000-0000-000000000000", - "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS c WHERE False", + "query_text": "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'", "session_id": "test_session_id", "database_name": "embucket", "schema_name": "public", @@ -100,9 +99,10 @@ async fn test_query_lifecycle_ok_query() { "execution_status": "Success", "start_time": "2026-01-01T01:01:01.000000001Z", "end_time": "2026-01-01T01:01:01.000000001Z", + "rows_produced": 1, "execution_time": "1", "release_version": "test-version", - "query_hash": "12320374230549905548", + "query_hash": "1717924485430328356", "query_hash_version": 1 } "#); @@ -123,7 +123,7 @@ async fn test_query_lifecycle_ok_query() { timeout( MOCK_RELATED_TIMEOUT_DURATION, - execution_svc.create_session(TEST_SESSION_ID) + execution_svc.create_session(TEST_SESSION_ID), ) .await .expect("Create session timed out") @@ -132,11 +132,592 @@ async fn test_query_lifecycle_ok_query() { // See note about timeout above let _ = timeout( MOCK_RELATED_TIMEOUT_DURATION, - execution_svc.query(TEST_SESSION_ID, OK_QUERY_TEXT, query_context), + execution_svc.query( + TEST_SESSION_ID, + "SELECT 1 AS a, 2.0 AS b, '3' AS 'c'", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_insert() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_insert_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "INSERT INTO embucket.public.table VALUES (1)", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "INSERT", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "rows_inserted": 1, + "execution_time": "1", + "release_version": "test-version", + "query_hash": "17856184221539895914", + "query_hash_version": 1 + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "create 
table if not exists embucket.public.table (id int)", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + // insert + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "INSERT INTO embucket.public.table VALUES (1)", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_update() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_update_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "UPDATE embucket.public.table SET name = 'John'", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "UPDATE", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "execution_time": "1", + "release_version": "test-version", + "query_hash": "16763742305627145642", + "query_hash_version": 1 + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.table AS SELECT + id, + name, + RANDOM() AS random_value, + CURRENT_TIMESTAMP AS current_time + FROM (VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie'), + (4, 'David') + ) AS t(id, name);", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + // update + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "UPDATE embucket.public.table SET name = 'John'", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_delete_failed() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| 
Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_truncate_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "DELETE FROM embucket.public.table", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "DELETE", + "warehouse_type": "DEFAULT", + "execution_status": "Fail", + "error_code": "010001", + "error_message": "00000000-0000-0000-0000-000000000000: Query execution error: DataFusion error: This feature is not implemented: Unsupported logical plan: Dml(Delete)", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "execution_time": "1", + "release_version": "test-version", + "query_hash": "13652442282618196356", + "query_hash_version": 1 + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.table AS SELECT + id, + name, + RANDOM() AS random_value, + CURRENT_TIMESTAMP AS current_time + FROM (VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie'), + (4, 'David') + ) AS t(id, name);", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + // update + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "DELETE FROM embucket.public.table", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect_err("Query expected to fail"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_truncate() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(2) + .returning(|_| Ok(())); + + // bypass 1st update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())); + // verify 2nd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_truncate_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "TRUNCATE TABLE embucket.public.table", + 
"session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "TRUNCATE", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "rows_deleted": 0, + "execution_time": "1", + "release_version": "test-version", + "query_hash": "16187825059241168947", + "query_hash_version": 1 + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + .expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare table + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.table AS SELECT + id, + name, + RANDOM() AS random_value, + CURRENT_TIMESTAMP AS current_time + FROM (VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie'), + (4, 'David') + ) AS t(id, name);", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + // update + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + "TRUNCATE TABLE embucket.public.table", + query_context, + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); +} + +#[allow(clippy::expect_used)] +#[tokio::test] +async fn test_query_lifecycle_ok_merge() { + let query_context = QueryContext::default().with_request_id(Uuid::default()); + + let mut state_store_mock = MockStateStore::new(); + state_store_mock + .expect_put_new_session() + .returning(|_| Ok(())); + state_store_mock + .expect_get_session() + .returning(|_| Ok(SessionRecord::new(TEST_SESSION_ID))); + state_store_mock + .expect_put_query() + .times(3) + .returning(|_| Ok(())); + + // bypass first two updates + state_store_mock + .expect_update_query() + .times(2) + .returning(|_| Ok(())); + // verify 3rd update + state_store_mock + .expect_update_query() + .times(1) + .returning(|_| Ok(())) + .withf(move |query: &Query| { + insta_settings("ok_merge_update").bind(|| { + assert_json_snapshot!(query, @r#" + { + "query_id": "00000000-0000-0000-0000-000000000000", + "request_id": "00000000-0000-0000-0000-000000000000", + "query_text": "MERGE INTO t1 USING (SELECT * FROM t2) AS t2 ON t1.a = t2.a WHEN MATCHED THEN UPDATE SET t1.c = t2.c WHEN NOT MATCHED THEN INSERT (a,c) VALUES(t2.a,t2.c)", + "session_id": "test_session_id", + "database_name": "embucket", + "schema_name": "public", + "query_type": "MERGE", + "warehouse_type": "DEFAULT", + "execution_status": "Success", + "start_time": "2026-01-01T01:01:01.000000001Z", + "end_time": "2026-01-01T01:01:01.000000001Z", + "rows_produced": 4, + "rows_inserted": 1, + "execution_time": "1", + "release_version": "test-version", + "query_hash": "16532873076018472935", + "query_hash_version": 1 + } + "#); + }); + true + }); + + let state_store: Arc = Arc::new(state_store_mock); + + let metastore = Arc::new(InMemoryMetastore::new()); + MetastoreBootstrapConfig::bootstrap() + .apply(metastore.clone()) + .await + 
.expect("Failed to bootstrap metastore"); + + let execution_svc = CoreExecutionService::new_test_executor( + metastore, + state_store, + Arc::new(Config::default()), + ) + .await + .expect("Failed to create execution service"); + + timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.create_session(TEST_SESSION_ID), + ) + .await + .expect("Create session timed out") + .expect("Failed to create session"); + + // prepare tables + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.t1 AS SELECT + a,b,c + FROM (VALUES + (1,'b1','c1'), + (2,'b2','c2'), + (2,'b3','c3'), + (3,'b4','c4') + ) AS t(a, b, c);", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query( + TEST_SESSION_ID, + " + CREATE TABLE embucket.public.t2 AS SELECT + a,b,c + FROM (VALUES + (1,'b_5','c_5'), + (3,'b_6','c_6'), + (2,'b_7','c_7'), + (4,'b_8','c_8') + ) AS t(a, b, c);", + query_context.clone(), + ), + ) + .await + .expect("Query timed out") + .expect("Query execution failed"); + + let _ = timeout( + MOCK_RELATED_TIMEOUT_DURATION, + execution_svc.query(TEST_SESSION_ID, "MERGE INTO t1 USING (SELECT * FROM t2) AS t2 ON t1.a = t2.a WHEN MATCHED THEN UPDATE SET t1.c = t2.c WHEN NOT MATCHED THEN INSERT (a,c) VALUES(t2.a,t2.c)", query_context), ) .await .expect("Query timed out") - .expect("Query execution stopped by timeout"); + .expect("Query execution failed"); } #[allow(clippy::expect_used)] @@ -286,7 +867,7 @@ async fn test_query_lifecycle_query_status_fail() { timeout( MOCK_RELATED_TIMEOUT_DURATION, - execution_svc.create_session(TEST_SESSION_ID) + execution_svc.create_session(TEST_SESSION_ID), ) .await .expect("Create session timed out") @@ -381,7 +962,7 @@ async fn test_query_lifecycle_query_status_cancelled() { timeout( MOCK_RELATED_TIMEOUT_DURATION, - execution_svc.create_session(TEST_SESSION_ID) + execution_svc.create_session(TEST_SESSION_ID), ) .await .expect("Create session timed out") diff --git a/crates/state-store/src/models.rs b/crates/state-store/src/models.rs index 9e293a4c..4058ca4b 100644 --- a/crates/state-store/src/models.rs +++ b/crates/state-store/src/models.rs @@ -343,6 +343,18 @@ impl Query { self.rows_produced = Some(rows_count); } + pub fn set_rows_inserted(&mut self, rows_count: u64) { + self.rows_inserted = Some(rows_count); + } + + pub fn set_rows_deleted(&mut self, rows_count: u64) { + self.rows_deleted = Some(rows_count); + } + + pub fn set_rows_updated(&mut self, rows_count: u64) { + self.rows_updated = Some(rows_count); + } + pub fn set_query_type(&mut self, query_type: String) { self.query_type = Some(query_type); } From fcbfbb5babedc607a3a01dd3ed79aadbc2264239 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 7 Jan 2026 19:49:32 +0100 Subject: [PATCH 4/5] clippy --- crates/state-store/src/models.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/state-store/src/models.rs b/crates/state-store/src/models.rs index 4058ca4b..ecb66392 100644 --- a/crates/state-store/src/models.rs +++ b/crates/state-store/src/models.rs @@ -317,9 +317,7 @@ impl Query { self.schema_name = Some(schema); } - // Why? 
warning: this could be a `const fn` - #[allow(clippy::missing_const_for_fn)] - pub fn set_execution_status(&mut self, status: ExecutionStatus) { + pub const fn set_execution_status(&mut self, status: ExecutionStatus) { self.execution_status = Some(status); } @@ -339,19 +337,19 @@ impl Query { self.release_version = Some(release_version); } - pub fn set_rows_produced(&mut self, rows_count: u64) { + pub const fn set_rows_produced(&mut self, rows_count: u64) { self.rows_produced = Some(rows_count); } - pub fn set_rows_inserted(&mut self, rows_count: u64) { + pub const fn set_rows_inserted(&mut self, rows_count: u64) { self.rows_inserted = Some(rows_count); } - pub fn set_rows_deleted(&mut self, rows_count: u64) { + pub const fn set_rows_deleted(&mut self, rows_count: u64) { self.rows_deleted = Some(rows_count); } - pub fn set_rows_updated(&mut self, rows_count: u64) { + pub const fn set_rows_updated(&mut self, rows_count: u64) { self.rows_updated = Some(rows_count); } From 0bdf97fda41f81f13b3249f676462697f9b8990b Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 7 Jan 2026 21:12:25 +0100 Subject: [PATCH 5/5] fix clippy false positives - allow(unused_assignments) --- crates/api-snowflake-rest-sessions/src/error.rs | 1 + crates/api-snowflake-rest/src/server/error.rs | 2 +- crates/catalog-metastore/src/error.rs | 1 + crates/catalog/src/df_error.rs | 1 + crates/catalog/src/error.rs | 1 + crates/executor/src/error.rs | 1 + crates/executor/src/query.rs | 7 +++---- crates/executor/src/query_task_result.rs | 2 +- crates/executor/src/query_types.rs | 4 ++-- crates/executor/src/running_queries.rs | 2 +- crates/functions/src/aggregate/errors.rs | 1 + crates/functions/src/arrow_error.rs | 1 + crates/functions/src/conversion/errors.rs | 1 + crates/functions/src/datetime/errors.rs | 1 + crates/functions/src/df_error.rs | 1 + crates/functions/src/encryption/errors.rs | 1 + crates/functions/src/errors.rs | 1 + crates/functions/src/numeric/errors.rs | 1 + crates/functions/src/regexp/errors.rs | 1 + crates/functions/src/semi-structured/errors.rs | 1 + crates/functions/src/string-binary/errors.rs | 1 + crates/functions/src/system/errors.rs | 1 + crates/functions/src/table/errors.rs | 1 + crates/queries/src/error.rs | 1 + crates/state-store/src/error.rs | 1 + 25 files changed, 28 insertions(+), 9 deletions(-) diff --git a/crates/api-snowflake-rest-sessions/src/error.rs b/crates/api-snowflake-rest-sessions/src/error.rs index 22243bbf..ffcc1c97 100644 --- a/crates/api-snowflake-rest-sessions/src/error.rs +++ b/crates/api-snowflake-rest-sessions/src/error.rs @@ -1,3 +1,4 @@ +#![allow(unused_assignments)] use axum::{Json, http, response::IntoResponse}; use error_stack_trace; use http::header::InvalidHeaderValue; diff --git a/crates/api-snowflake-rest/src/server/error.rs b/crates/api-snowflake-rest/src/server/error.rs index c6b34174..02ed9f30 100644 --- a/crates/api-snowflake-rest/src/server/error.rs +++ b/crates/api-snowflake-rest/src/server/error.rs @@ -1,3 +1,4 @@ +#![allow(unused_assignments)] use crate::SqlState; use crate::models::JsonResponse; use crate::models::ResponseData; @@ -16,7 +17,6 @@ use snafu::prelude::*; pub type Result = std::result::Result; -// TBD: Why context at error/source mostly not used in error? 
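
For reference, the Arrow downcast that `value_by_row_column` (patch 3) builds on, as a minimal sketch. It assumes the same datafusion arrow re-exports the patch already imports; `cell_as_u64` is an illustrative helper, not this crate's API:

    use datafusion::arrow::array::{Array, ArrayRef, Int64Array, UInt64Array};

    // DML plans report affected-row counts as a single integer cell, so
    // try both integer widths before giving up on the column.
    fn cell_as_u64(col: &ArrayRef, row: usize) -> Option<u64> {
        if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
            // Row counts are non-negative, so the sign-losing cast is benign.
            (row < a.len()).then(|| a.value(row) as u64)
        } else if let Some(a) = col.as_any().downcast_ref::<UInt64Array>() {
            (row < a.len()).then(|| a.value(row))
        } else {
            None
        }
    }
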
#[derive(Snafu)] #[snafu(visibility(pub(crate)))] #[error_stack_trace::debug] diff --git a/crates/catalog-metastore/src/error.rs b/crates/catalog-metastore/src/error.rs index e412361e..420b8eaa 100644 --- a/crates/catalog-metastore/src/error.rs +++ b/crates/catalog-metastore/src/error.rs @@ -1,3 +1,4 @@ +#![allow(unused_assignments)] use error_stack_trace; use iceberg_rust::error::Error as IcebergError; use iceberg_rust_spec::table_metadata::TableMetadataBuilderError; diff --git a/crates/catalog/src/df_error.rs b/crates/catalog/src/df_error.rs index 67b521ef..5d0d4931 100644 --- a/crates/catalog/src/df_error.rs +++ b/crates/catalog/src/df_error.rs @@ -1,3 +1,4 @@ +#![allow(unused_assignments)] use crate::error::Error; use error_stack_trace; use iceberg_rust::error::Error as IcebergError; diff --git a/crates/catalog/src/error.rs b/crates/catalog/src/error.rs index 5e0a8ba9..7aa7ce58 100644 --- a/crates/catalog/src/error.rs +++ b/crates/catalog/src/error.rs @@ -1,3 +1,4 @@ +#![allow(unused_assignments)] use datafusion_common::DataFusionError; use error_stack_trace; use iceberg_rust::error::Error as IcebergError; diff --git a/crates/executor/src/error.rs b/crates/executor/src/error.rs index edbab26c..3d08461e 100644 --- a/crates/executor/src/error.rs +++ b/crates/executor/src/error.rs @@ -1,3 +1,4 @@ +#![allow(unused_assignments)] use super::snowflake_error::SnowflakeError; use crate::query_types::{ExecutionStatus, QueryId}; use catalog::error::Error as CatalogError; diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index 24f00e7d..312c15d5 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -280,18 +280,18 @@ impl UserQuery { Statement::CreateView { .. } => save(QueryType::Ddl(DdlStType::CreateView)), Statement::CreateDatabase { .. } => save(QueryType::Ddl(DdlStType::CreateDatabase)), Statement::CreateExternalVolume { .. } => { - save(QueryType::Ddl(DdlStType::CreateVolume)) + save(QueryType::Ddl(DdlStType::CreateVolume)); } Statement::CreateSchema { .. } => save(QueryType::Ddl(DdlStType::CreateSchema)), Statement::CreateStage { .. } => save(QueryType::Ddl(DdlStType::CreateStage)), Statement::CopyIntoSnowflake { .. } => { - save(QueryType::Ddl(DdlStType::CopyIntoSnowflake)) + save(QueryType::Ddl(DdlStType::CopyIntoSnowflake)); } Statement::Drop { object_type, .. } => match object_type { ObjectType::Table => save(QueryType::Ddl(DdlStType::DropTable)), ObjectType::View => save(QueryType::Ddl(DdlStType::DropView)), ObjectType::MaterializedView => { - save(QueryType::Ddl(DdlStType::DropMaterializedView)) + save(QueryType::Ddl(DdlStType::DropMaterializedView)); } ObjectType::Schema => save(QueryType::Ddl(DdlStType::DropSchema)), ObjectType::Database => save(QueryType::Ddl(DdlStType::DropDatabase)), @@ -313,7 +313,6 @@ impl UserQuery { Statement::ShowSchemas { .. } => save(QueryType::Misc(MiscStType::ShowSchemas)), Statement::ShowTables { .. } => save(QueryType::Misc(MiscStType::ShowTables)), Statement::ShowViews { .. } => save(QueryType::Misc(MiscStType::ShowViews)), - Statement::ExplainTable { .. } => save(QueryType::Misc(MiscStType::ExplainTable)), _ => {} } diff --git a/crates/executor/src/query_task_result.rs b/crates/executor/src/query_task_result.rs index a8928dad..c6008f84 100644 --- a/crates/executor/src/query_task_result.rs +++ b/crates/executor/src/query_task_result.rs @@ -7,11 +7,11 @@ cfg_if::cfg_if! 
diff --git a/crates/executor/src/query_types.rs b/crates/executor/src/query_types.rs
index 0285cd63..658abccb 100644
--- a/crates/executor/src/query_types.rs
+++ b/crates/executor/src/query_types.rs
@@ -93,10 +93,10 @@ pub struct QueryStats {
 }
 
 impl QueryStats {
-    pub fn with_query_type(self, query_type: QueryType) -> Self {
+    #[must_use]
+    pub const fn with_query_type(self, query_type: QueryType) -> Self {
         Self {
             query_type: Some(query_type),
-            ..self
         }
     }
 }
diff --git a/crates/executor/src/running_queries.rs b/crates/executor/src/running_queries.rs
index aed90b55..fae7b166 100644
--- a/crates/executor/src/running_queries.rs
+++ b/crates/executor/src/running_queries.rs
@@ -101,7 +101,7 @@ impl RunningQuery {
     #[tracing::instrument(name = "RunningQuery::update_query_stats", level = "trace", skip(self))]
     pub fn update_query_stats(&mut self, stats: &QueryStats) {
         if self.query_stats.query_type.is_none() {
-            self.query_stats.query_type = stats.query_type.clone();
+            self.query_stats.query_type.clone_from(&stats.query_type);
         }
     }
 }
diff --git a/crates/functions/src/aggregate/errors.rs b/crates/functions/src/aggregate/errors.rs
index 98e6ed6c..1f54f34c 100644
--- a/crates/functions/src/aggregate/errors.rs
+++ b/crates/functions/src/aggregate/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::{Location, Snafu};
 
 #[derive(Snafu)]
diff --git a/crates/functions/src/arrow_error.rs b/crates/functions/src/arrow_error.rs
index f419f941..3661dbb2 100644
--- a/crates/functions/src/arrow_error.rs
+++ b/crates/functions/src/arrow_error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/conversion/errors.rs b/crates/functions/src/conversion/errors.rs
index bfe86b0b..ba49e217 100644
--- a/crates/functions/src/conversion/errors.rs
+++ b/crates/functions/src/conversion/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion::arrow::datatypes::DataType;
 use datafusion_common::ScalarValue;
 use snafu::{Location, Snafu};
diff --git a/crates/functions/src/datetime/errors.rs b/crates/functions/src/datetime/errors.rs
index c6308f24..f5b3c14f 100644
--- a/crates/functions/src/datetime/errors.rs
+++ b/crates/functions/src/datetime/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use chrono_tz::ParseError;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/df_error.rs b/crates/functions/src/df_error.rs
index 6b78daec..1c24b514 100644
--- a/crates/functions/src/df_error.rs
+++ b/crates/functions/src/df_error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/encryption/errors.rs b/crates/functions/src/encryption/errors.rs
index ce430313..687f03f9 100644
--- a/crates/functions/src/encryption/errors.rs
+++ b/crates/functions/src/encryption/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion::error::DataFusionError;
 use snafu::Location;
 use snafu::Snafu;
diff --git a/crates/functions/src/errors.rs b/crates/functions/src/errors.rs
index f67d648e..d00d9a3a 100644
--- a/crates/functions/src/errors.rs
+++ b/crates/functions/src/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::{Location, Snafu};
 
 // In this file we create 2 types of errors: DataFusionInternalError and DataFusionExecutionError
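
The running_queries.rs hunk above applies clippy's `assigning_clones` suggestion:
rewriting `a = b.clone()` as `a.clone_from(&b)` lets a `Clone` implementation reuse
`a`'s existing allocation instead of dropping it and building a fresh value. A
standalone sketch of the same rewrite, with `Option<String>` standing in for the
real `Option<QueryType>` field:

    fn update_query_type(dst: &mut Option<String>, src: &Option<String>) {
        if dst.is_none() {
            dst.clone_from(src); // was: *dst = src.clone();
        }
    }

    fn main() {
        let mut current = None;
        update_query_type(&mut current, &Some("Dml".to_string()));
        assert_eq!(current.as_deref(), Some("Dml"));
    }
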
diff --git a/crates/functions/src/numeric/errors.rs b/crates/functions/src/numeric/errors.rs
index 62d9122a..aaa26bb7 100644
--- a/crates/functions/src/numeric/errors.rs
+++ b/crates/functions/src/numeric/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::{Location, Snafu};
 
 #[derive(Snafu)]
diff --git a/crates/functions/src/regexp/errors.rs b/crates/functions/src/regexp/errors.rs
index 0ffc7837..0b76d7a2 100644
--- a/crates/functions/src/regexp/errors.rs
+++ b/crates/functions/src/regexp/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use datafusion::arrow::datatypes::DataType;
 use snafu::{Location, Snafu};
 use std::num::TryFromIntError;
diff --git a/crates/functions/src/semi-structured/errors.rs b/crates/functions/src/semi-structured/errors.rs
index 73f3be4c..8790dcf2 100644
--- a/crates/functions/src/semi-structured/errors.rs
+++ b/crates/functions/src/semi-structured/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::location;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/string-binary/errors.rs b/crates/functions/src/string-binary/errors.rs
index e68cb4d1..69cef0ec 100644
--- a/crates/functions/src/string-binary/errors.rs
+++ b/crates/functions/src/string-binary/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::location;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/system/errors.rs b/crates/functions/src/system/errors.rs
index 4422fe88..f13bd2a1 100644
--- a/crates/functions/src/system/errors.rs
+++ b/crates/functions/src/system/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use snafu::location;
 use snafu::{Location, Snafu};
 
diff --git a/crates/functions/src/table/errors.rs b/crates/functions/src/table/errors.rs
index 5b3f7d56..8c3ff448 100644
--- a/crates/functions/src/table/errors.rs
+++ b/crates/functions/src/table/errors.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::{Location, Snafu};
 
diff --git a/crates/queries/src/error.rs b/crates/queries/src/error.rs
index bea8c8e4..3df9d18f 100644
--- a/crates/queries/src/error.rs
+++ b/crates/queries/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use error_stack_trace;
 use snafu::Location;
 use snafu::prelude::*;
diff --git a/crates/state-store/src/error.rs b/crates/state-store/src/error.rs
index 43b4baae..042b0664 100644
--- a/crates/state-store/src/error.rs
+++ b/crates/state-store/src/error.rs
@@ -1,3 +1,4 @@
+#![allow(unused_assignments)]
 use aws_sdk_dynamodb::config::http::HttpResponse;
 use aws_sdk_dynamodb::error::SdkError;
 use aws_sdk_dynamodb::operation::delete_item::DeleteItemError;
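
One note on the query_types.rs hunk earlier in this patch: `#[must_use]` makes a
discarded builder result warn, and dropping the `..self` spread presumably works
because `query_type` is the struct's only field (with additional fields the body
would no longer compile). A self-contained sketch with a stand-in field type:

    #[derive(Debug, Default, PartialEq)]
    struct QueryStats {
        query_type: Option<u8>, // stand-in for the real Option<QueryType>
    }

    impl QueryStats {
        #[must_use] // warn callers that ignore the returned value
        const fn with_query_type(self, query_type: u8) -> Self {
            Self { query_type: Some(query_type) }
        }
    }

    fn main() {
        let stats = QueryStats::default().with_query_type(7);
        assert_eq!(stats, QueryStats { query_type: Some(7) });
    }
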