diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 1ede6e3e3ff1c..45b0b49a9c7d9 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -59,7 +59,6 @@ use crate::coord::timestamp_selection::TimestampDetermination; use crate::optimize::OptimizerError; use crate::statement_logging::WatchSetCreation; use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy}; -use crate::util::ResultExt; use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse}; /// A peek is a request to read data from a maintained arrangement. @@ -851,7 +850,7 @@ impl crate::coord::Coordinator { target_replica, rows_tx, ) - .unwrap_or_terminate("cannot fail to peek"); + .map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?; let duration_histogram = self.metrics.row_set_finishing_seconds(); diff --git a/src/adapter/src/error.rs b/src/adapter/src/error.rs index c57f147c69965..372973b46c5fe 100644 --- a/src/adapter/src/error.rs +++ b/src/adapter/src/error.rs @@ -17,7 +17,6 @@ use itertools::Itertools; use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER; use mz_compute_client::controller::error as compute_error; use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing}; -use mz_compute_client::controller::instance::PeekError; use mz_compute_types::ComputeInstanceId; use mz_expr::EvalError; use mz_ore::error::ErrorExt; @@ -719,21 +718,43 @@ impl AdapterError { } } - pub fn concurrent_dependency_drop_from_peek_error( - e: PeekError, + pub fn concurrent_dependency_drop_from_instance_peek_error( + e: mz_compute_client::controller::instance::PeekError, compute_instance: ComputeInstanceId, ) -> AdapterError { + use mz_compute_client::controller::instance::PeekError::*; match e { - PeekError::ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop { + ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop { dependency_kind: "replica", dependency_id: id.to_string(), }, - PeekError::InstanceShutDown => AdapterError::ConcurrentDependencyDrop { + InstanceShutDown => AdapterError::ConcurrentDependencyDrop { dependency_kind: "cluster", dependency_id: compute_instance.to_string(), }, - e @ PeekError::ReadHoldIdMismatch(_) => AdapterError::internal("peek error", e), - e @ PeekError::ReadHoldInsufficient(_) => AdapterError::internal("peek error", e), + e @ ReadHoldIdMismatch(_) => AdapterError::internal("instance peek error", e), + e @ ReadHoldInsufficient(_) => AdapterError::internal("instance peek error", e), + } + } + + pub fn concurrent_dependency_drop_from_peek_error( + e: mz_compute_client::controller::error::PeekError, + ) -> AdapterError { + use mz_compute_client::controller::error::PeekError::*; + match e { + InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "cluster", + dependency_id: id.to_string(), + }, + CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "collection", + dependency_id: id.to_string(), + }, + ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "replica", + dependency_id: id.to_string(), + }, + e @ SinceViolation(_) => AdapterError::internal("peek error", e), } } diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index e8525123c76ff..5a999db237f32 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -390,10 +390,12 @@ impl PeekClient { // The frontend will handle statement logging for the error. self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx }) .await; - return Err(AdapterError::concurrent_dependency_drop_from_peek_error( - err, - compute_instance, - )); + return Err( + AdapterError::concurrent_dependency_drop_from_instance_peek_error( + err, + compute_instance, + ), + ); } let peek_response_stream = Coordinator::create_peek_response_stream(