Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,3 @@ significant_drop_tightening = { level = "allow", priority = 2 }
module_name_repetitions = { level = "allow", priority = 2 }
option_if_let_else = { level = "allow", priority = 2 }
needless_for_each = { level = "allow", priority = 2 } # #[derive(OpenApi)]

[workspace.features]
state-store-query-test = ["executor/state-store-query-test"]
1 change: 1 addition & 0 deletions crates/api-snowflake-rest-sessions/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_assignments)]
use axum::{Json, http, response::IntoResponse};
use error_stack_trace;
use http::header::InvalidHeaderValue;
Expand Down
2 changes: 1 addition & 1 deletion crates/api-snowflake-rest/src/server/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_assignments)]
use crate::SqlState;
use crate::models::JsonResponse;
use crate::models::ResponseData;
Expand All @@ -16,7 +17,6 @@ use snafu::prelude::*;

pub type Result<T> = std::result::Result<T, Error>;

// TBD: Why context at error/source mostly not used in error?
#[derive(Snafu)]
#[snafu(visibility(pub(crate)))]
#[error_stack_trace::debug]
Expand Down
1 change: 1 addition & 0 deletions crates/catalog-metastore/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_assignments)]
use error_stack_trace;
use iceberg_rust::error::Error as IcebergError;
use iceberg_rust_spec::table_metadata::TableMetadataBuilderError;
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/src/df_error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_assignments)]
use crate::error::Error;
use error_stack_trace;
use iceberg_rust::error::Error as IcebergError;
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_assignments)]
use datafusion_common::DataFusionError;
use error_stack_trace;
use iceberg_rust::error::Error as IcebergError;
Expand Down
1 change: 1 addition & 0 deletions crates/embucket-lambda/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl EnvConfig {
pub fn execution_config(&self) -> ExecutionConfig {
ExecutionConfig {
embucket_version: self.embucket_version.clone(),
build_version: BuildInfo::GIT_DESCRIBE.to_string(),
warehouse_type: "LAMBDA_SERVERLESS".to_string(),
sql_parser_dialect: self.sql_parser_dialect.clone(),
query_timeout_secs: self.query_timeout_secs,
Expand Down
1 change: 1 addition & 0 deletions crates/embucketd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async fn async_main(

let execution_cfg = ExecutionConfig {
embucket_version: BuildInfo::VERSION.to_string(),
build_version: BuildInfo::GIT_DESCRIBE.to_string(),
warehouse_type: "EMBUCKET".to_string(),
sql_parser_dialect: opts.sql_parser_dialect.clone(),
query_timeout_secs: opts.query_timeout_secs,
Expand Down
1 change: 1 addition & 0 deletions crates/executor/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused_assignments)]
use super::snowflake_error::SnowflakeError;
use crate::query_types::{ExecutionStatus, QueryId};
use catalog::error::Error as CatalogError;
Expand Down
78 changes: 78 additions & 0 deletions crates/executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::datafusion::physical_plan::merge::{
use crate::datafusion::rewriters::session_context::SessionContextExprRewriter;
use crate::error::{OperationOn, OperationType};
use crate::models::{QueryContext, QueryMetric, QueryResult, metrics_set_to_json};
use crate::query_types::{DdlStType, DmlStType, MiscStType, QueryStats, QueryType};
use catalog::table::{CachingTable, IcebergTableBuilder};
use catalog_metastore::{
AwsAccessKeyCredentials, AwsCredentials, FileVolume, Metastore, S3TablesVolume, S3Volume,
Expand Down Expand Up @@ -245,6 +246,81 @@ impl UserQuery {
Ok(())
}

#[allow(clippy::too_many_lines)]
#[instrument(
name = "UserQuery::update_query_type",
level = "debug",
skip(self),
fields(
query_type,
query_id = self.query_context.query_id.to_string(),
),
)]
fn update_stats_query_type(&self, statement: &DFStatement) {
let save = |query_type: QueryType| {
self.running_queries.update_stats(
self.query_context.query_id,
&QueryStats::default().with_query_type(query_type),
);
};

if let DFStatement::Statement(s) = statement {
match **s {
// match DML statements:
Statement::Delete { .. } => save(QueryType::Dml(DmlStType::Delete)),
Statement::Update { .. } => save(QueryType::Dml(DmlStType::Update)),
Statement::Insert { .. } => save(QueryType::Dml(DmlStType::Insert)),
Statement::Truncate { .. } => save(QueryType::Dml(DmlStType::Truncate)),
Statement::Query(..) => save(QueryType::Dml(DmlStType::Select)),
Statement::Merge { .. } => save(QueryType::Dml(DmlStType::Merge)),
// match DDL statements:
Statement::AlterSession { .. } => save(QueryType::Ddl(DdlStType::AlterSession)),
Statement::AlterTable { .. } => save(QueryType::Ddl(DdlStType::AlterTable)),
Statement::CreateTable { .. } => save(QueryType::Ddl(DdlStType::CreateTable)),
Statement::CreateView { .. } => save(QueryType::Ddl(DdlStType::CreateView)),
Statement::CreateDatabase { .. } => save(QueryType::Ddl(DdlStType::CreateDatabase)),
Statement::CreateExternalVolume { .. } => {
save(QueryType::Ddl(DdlStType::CreateVolume));
}
Statement::CreateSchema { .. } => save(QueryType::Ddl(DdlStType::CreateSchema)),
Statement::CreateStage { .. } => save(QueryType::Ddl(DdlStType::CreateStage)),
Statement::CopyIntoSnowflake { .. } => {
save(QueryType::Ddl(DdlStType::CopyIntoSnowflake));
}
Statement::Drop { object_type, .. } => match object_type {
ObjectType::Table => save(QueryType::Ddl(DdlStType::DropTable)),
ObjectType::View => save(QueryType::Ddl(DdlStType::DropView)),
ObjectType::MaterializedView => {
save(QueryType::Ddl(DdlStType::DropMaterializedView));
}
ObjectType::Schema => save(QueryType::Ddl(DdlStType::DropSchema)),
ObjectType::Database => save(QueryType::Ddl(DdlStType::DropDatabase)),
ObjectType::Stage => save(QueryType::Ddl(DdlStType::DropStage)),
_ => {}
},
// match other statements:
Statement::Use(..) => save(QueryType::Misc(MiscStType::Use)),
Statement::Set(..) => save(QueryType::Misc(MiscStType::Set)),
Statement::StartTransaction { .. } => save(QueryType::Misc(MiscStType::Begin)),
Statement::Commit { .. } => save(QueryType::Misc(MiscStType::Commit)),
Statement::Rollback { .. } => save(QueryType::Misc(MiscStType::Rollback)),
Statement::ShowColumns { .. } => save(QueryType::Misc(MiscStType::ShowColumns)),
Statement::ShowFunctions { .. } => save(QueryType::Misc(MiscStType::ShowFunctions)),
Statement::ShowObjects { .. } => save(QueryType::Misc(MiscStType::ShowObjects)),
Statement::ShowVariables { .. } => save(QueryType::Misc(MiscStType::ShowVariables)),
Statement::ShowVariable { .. } => save(QueryType::Misc(MiscStType::ShowVariable)),
Statement::ShowDatabases { .. } => save(QueryType::Misc(MiscStType::ShowDatabases)),
Statement::ShowSchemas { .. } => save(QueryType::Misc(MiscStType::ShowSchemas)),
Statement::ShowTables { .. } => save(QueryType::Misc(MiscStType::ShowTables)),
Statement::ShowViews { .. } => save(QueryType::Misc(MiscStType::ShowViews)),
Statement::ExplainTable { .. } => save(QueryType::Misc(MiscStType::ExplainTable)),
_ => {}
}
} else if let DFStatement::CreateExternalTable(..) = statement {
save(QueryType::Ddl(DdlStType::CreateExternalTable));
}
}

#[allow(clippy::too_many_lines)]
#[instrument(
name = "UserQuery::execute",
Expand All @@ -264,6 +340,8 @@ impl UserQuery {
// Record the result as part of the current span.
tracing::Span::current().record("statement", format!("{statement:#?}"));

self.update_stats_query_type(&statement);

// TODO: Code should be organized in a better way
// 1. Single place to parse SQL strings into AST
// 2. Single place to update AST
Expand Down
61 changes: 59 additions & 2 deletions crates/executor/src/query_task_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ use super::error::Result;
use super::error_code::ErrorCode;
use super::models::QueryResult;
use super::query_types::ExecutionStatus;
cfg_if::cfg_if! {
if #[cfg(feature = "state-store-query")] {
use super::query_types::{DmlStType, QueryType};
use datafusion::arrow::array::{Int64Array, UInt64Array};
use state_store::QueryMetric;
}
}
use super::snowflake_error::SnowflakeError;
use snafu::ResultExt;
#[cfg(feature = "state-store-query")]
use state_store::QueryMetric;
use tokio::task::JoinError;
use uuid::Uuid;

Expand Down Expand Up @@ -80,6 +85,37 @@ impl ExecutionTaskResult {
}
}

/// Copies row-count information from a finished DML query into `query`.
///
/// Does nothing unless the task result is `Ok` and `query_type` is a DML type.
/// * `Select`: the produced-rows counter is the sum of `num_rows()` over all
///   record batches of the result.
/// * Other DML kinds: the count is read from row 0 / column 0 of the first
///   record batch via `value_by_row_column`; if no value can be read, nothing
///   is set.
#[cfg(feature = "state-store-query")]
pub fn assign_rows_counts_attributes(
    &self,
    query: &mut state_store::Query,
    query_type: QueryType,
) {
    let (Ok(result), QueryType::Dml(dml_type)) = (&self.result, query_type) else {
        return;
    };

    if matches!(dml_type, DmlStType::Select) {
        let produced: u64 = result
            .records
            .iter()
            .map(|batch| batch.num_rows() as u64)
            .sum();
        query.set_rows_produced(produced);
        return;
    }

    let Some(first_count) = value_by_row_column(result, 0, 0) else {
        return;
    };

    match dml_type {
        DmlStType::Insert => query.set_rows_inserted(first_count),
        DmlStType::Update => query.set_rows_updated(first_count),
        DmlStType::Delete | DmlStType::Truncate => query.set_rows_deleted(first_count),
        DmlStType::Merge => {
            // A merge result carries two columns; for now the first is mapped to
            // the inserted-rows counter and the second to the produced-rows counter.
            query.set_rows_inserted(first_count);
            if let Some(second_count) = value_by_row_column(result, 0, 1) {
                query.set_rows_produced(second_count);
            }
        }
        // Handled by the early return above.
        DmlStType::Select => {}
    }
}

#[cfg(feature = "state-store-query")]
pub fn assign_query_attributes(&self, query: &mut state_store::Query) {
query.set_execution_status(self.execution_status);
Expand Down Expand Up @@ -107,3 +143,24 @@ impl ExecutionTaskResult {
query.set_end_time();
}
}

/// Reads a single non-negative integer cell from the first record batch of `result`.
///
/// Only `result.records[0]` is inspected; `row_idx` is relative to that batch.
///
/// Returns `None` when:
/// * the result has no record batches (previously this indexed `records[0]`
///   unconditionally and panicked),
/// * `col_idx` or `row_idx` is out of range,
/// * the column is neither an `Int64Array` nor a `UInt64Array`,
/// * the column is `Int64` and the cell is negative (a negative value cannot be a
///   row count and would otherwise wrap to a huge `u64`).
#[cfg(feature = "state-store-query")]
fn value_by_row_column(result: &QueryResult, row_idx: usize, col_idx: usize) -> Option<u64> {
    let column = result.records.first()?.columns().get(col_idx)?;
    let any = column.as_any();

    if let Some(values) = any.downcast_ref::<Int64Array>() {
        if row_idx < values.len() {
            u64::try_from(values.value(row_idx)).ok()
        } else {
            None
        }
    } else if let Some(values) = any.downcast_ref::<UInt64Array>() {
        if row_idx < values.len() {
            Some(values.value(row_idx))
        } else {
            None
        }
    } else {
        None
    }
}
86 changes: 86 additions & 0 deletions crates/executor/src/query_types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub type QueryId = uuid::Uuid;
use std::fmt::Display;

cfg_if::cfg_if! {
if #[cfg(feature = "state-store-query")] {
Expand All @@ -8,9 +9,94 @@ cfg_if::cfg_if! {
use std::fmt::Debug;
/// Status value attached to a query execution.
///
/// The type is `Copy` and (de)serializable through serde. The meaning of the
/// individual variants is not defined in this file; the notes below are
/// inferred from the names only — TODO confirm against the code that sets them.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExecutionStatus {
// presumably: the query is still executing
Running,
// presumably: the query finished without error
Success,
// presumably: the query finished with an error
Fail,
// NOTE(review): semantics of `Incident` vs `Fail` are not visible here — confirm
Incident,
}
}
}

/// Kinds of DML statements distinguished in query statistics.
///
/// `Display` is derived by `strum` with `serialize_all = "UPPERCASE"`, so a
/// variant is rendered as its upper-cased name (e.g. `Select` -> `SELECT`).
#[derive(Debug, Clone, strum::Display)]
#[strum(serialize_all = "UPPERCASE")]
pub enum DmlStType {
/// A query statement (`SELECT`).
Select,
/// `INSERT` statement.
Insert,
/// `UPDATE` statement.
Update,
/// `DELETE` statement.
Delete,
/// `TRUNCATE` statement.
Truncate,
/// `MERGE` statement.
Merge,
}

/// Kinds of DDL statements distinguished in query statistics.
///
/// `Display` is derived by `strum` with `serialize_all = "UPPERCASE"`.
/// NOTE(review): with `UPPERCASE`, multi-word variants appear to be rendered
/// without a separator (e.g. `CreateTable` -> `CREATETABLE`); confirm this is the
/// intended text rather than `SCREAMING_SNAKE_CASE` (`CREATE_TABLE`).
#[derive(Debug, Clone, strum::Display)]
#[strum(serialize_all = "UPPERCASE")]
pub enum DdlStType {
/// `CREATE EXTERNAL TABLE`.
CreateExternalTable,
/// `CREATE TABLE`.
CreateTable,
/// `CREATE VIEW`.
CreateView,
/// `CREATE DATABASE`.
CreateDatabase,
/// `CREATE EXTERNAL VOLUME`.
CreateVolume,
/// `CREATE SCHEMA`.
CreateSchema,
/// `CREATE STAGE`.
CreateStage,
/// `COPY INTO` (Snowflake flavour).
CopyIntoSnowflake,
/// `DROP TABLE`.
DropTable,
/// `DROP VIEW`.
DropView,
/// `DROP MATERIALIZED VIEW`.
DropMaterializedView,
/// `DROP SCHEMA`.
DropSchema,
/// `DROP DATABASE`.
DropDatabase,
/// `DROP STAGE`.
DropStage,
/// `ALTER TABLE`.
AlterTable,
/// `ALTER SESSION`.
AlterSession,
/// Generic `DROP`; presumably for object kinds without a dedicated variant
/// above — confirm usage.
Drop,
}

/// Kinds of statements that are neither DML nor DDL (session, transaction,
/// `SHOW ...` and similar) distinguished in query statistics.
///
/// `Display` is derived by `strum` with `serialize_all = "UPPERCASE"`.
/// NOTE(review): multi-word variants appear to be rendered without a separator
/// (e.g. `ShowTables` -> `SHOWTABLES`); confirm this is the intended text.
#[derive(Debug, Clone, strum::Display)]
#[strum(serialize_all = "UPPERCASE")]
pub enum MiscStType {
/// `USE ...`.
Use,
/// `SET ...`.
Set,
/// Start of a transaction (`BEGIN` / `START TRANSACTION`).
Begin,
/// `COMMIT`.
Commit,
/// `ROLLBACK`.
Rollback,
/// `SHOW COLUMNS`.
ShowColumns,
/// `SHOW FUNCTIONS`.
ShowFunctions,
/// `SHOW VARIABLES`.
ShowVariables,
/// `SHOW OBJECTS`.
ShowObjects,
/// `SHOW <variable>`.
ShowVariable,
/// `SHOW DATABASES`.
ShowDatabases,
/// `SHOW SCHEMAS`.
ShowSchemas,
/// `SHOW TABLES`.
ShowTables,
/// `SHOW VIEWS`.
ShowViews,
/// `EXPLAIN TABLE` / `DESCRIBE`-style statement.
ExplainTable,
}

/// Classification of an executed statement, grouped by statement family.
///
/// The `Display` implementation prints only the inner statement kind; the
/// family (`Dml` / `Ddl` / `Misc`) is not part of the rendered text.
#[derive(Debug, Clone)]
pub enum QueryType {
/// Data-manipulation statement.
Dml(DmlStType),
/// Data-definition statement.
Ddl(DdlStType),
/// Any other classified statement.
Misc(MiscStType),
}

/// Renders the wrapped statement kind using that kind's own `Display`
/// implementation; the enclosing family name is not printed.
impl Display for QueryType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the inner value once, then format it through a single `write!`.
        let inner: &dyn Display = match self {
            Self::Dml(kind) => kind,
            Self::Ddl(kind) => kind,
            Self::Misc(kind) => kind,
        };
        write!(f, "{inner}")
    }
}

/// Statistics collected for a query.
///
/// `Default` yields a value with every field unset.
#[derive(Debug, Clone, Default)]
pub struct QueryStats {
/// Classified type of the query, or `None` if it has not been set.
pub query_type: Option<QueryType>,
}

impl QueryStats {
    /// Builder-style setter: consumes `self` and returns it with `query_type`
    /// set to `Some(query_type)`.
    #[must_use]
    pub const fn with_query_type(mut self, query_type: QueryType) -> Self {
        self.query_type = Some(query_type);
        self
    }
}
Loading