From 7d6f99833846579b275de42a4b8d231122d3e7d2 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 24 Jun 2026 14:25:51 +0200 Subject: [PATCH 1/5] Add metastore read replica routing --- config/quickwit.yaml | 5 + docs/configuration/node-config.md | 1 + quickwit/quickwit-cluster/src/grpc_service.rs | 2 +- .../example/src/codegen/hello.rs | 16 +- quickwit/quickwit-codegen/example/src/lib.rs | 12 +- quickwit/quickwit-codegen/src/codegen.rs | 45 +++- .../quickwit-common/src/tower/interceptor.rs | 116 +++++++++ quickwit/quickwit-common/src/tower/mod.rs | 2 + .../resources/tests/node_config/quickwit.json | 1 + .../resources/tests/node_config/quickwit.toml | 1 + .../resources/tests/node_config/quickwit.yaml | 1 + .../quickwit-config/src/node_config/mod.rs | 25 ++ .../src/node_config/serialize.rs | 26 +++ quickwit/quickwit-config/src/qw_env_vars.rs | 1 + .../src/sources/metrics/index_resolver.rs | 8 +- .../src/sources/metrics/metastore_provider.rs | 17 +- .../src/sources/metrics/mod.rs | 4 +- .../quickwit-datafusion/tests/distributed.rs | 2 +- quickwit/quickwit-datafusion/tests/metrics.rs | 2 +- .../quickwit-datafusion/tests/null_columns.rs | 2 +- .../quickwit-datafusion/tests/sketches.rs | 2 +- .../src/parquet_garbage_collection.rs | 2 +- .../parquet_merge_pipeline.rs | 2 +- .../src/codegen/ingest_service.rs | 12 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 1 + .../src/tests/metrics_distributed_tests.rs | 4 +- .../src/retention_policy_execution.rs | 6 +- quickwit/quickwit-metastore/src/lib.rs | 2 +- .../file_backed_metastore_factory.rs | 5 +- .../quickwit-metastore/src/metastore/mod.rs | 15 +- .../src/metastore/postgres/factory.rs | 36 ++- .../src/metastore/postgres/metastore.rs | 64 ++++- .../src/metastore_factory.rs | 12 +- .../src/metastore_resolver.rs | 44 +++- .../src/codegen/quickwit/quickwit.cluster.rs | 12 +- .../quickwit/quickwit.control_plane.rs | 12 +- .../codegen/quickwit/quickwit.developer.rs | 12 +- .../src/codegen/quickwit/quickwit.indexing.rs | 12 +- .../quickwit/quickwit.ingest.ingester.rs | 12 +- .../quickwit/quickwit.ingest.router.rs | 12 +- .../codegen/quickwit/quickwit.metastore.rs | 26 ++- .../quickwit-proto/src/grpc_read_replica.rs | 87 +++++++ quickwit/quickwit-proto/src/lib.rs | 1 + quickwit/quickwit-proto/src/metastore/mod.rs | 221 ++++++++++++++++++ quickwit/quickwit-search/src/lib.rs | 14 +- .../quickwit-search/src/list_fields/root.rs | 8 +- quickwit/quickwit-search/src/list_terms.rs | 7 +- quickwit/quickwit-search/src/root.rs | 75 +++--- quickwit/quickwit-search/src/service.rs | 14 +- quickwit/quickwit-search/src/tests.rs | 6 +- .../src/datafusion_api/setup.rs | 4 +- .../quickwit-serve/src/developer_api/debug.rs | 1 + .../src/elasticsearch_api/rest_handler.rs | 20 +- quickwit/quickwit-serve/src/grpc.rs | 5 +- quickwit/quickwit-serve/src/lib.rs | 128 ++++++---- quickwit/quickwit-serve/src/rest.rs | 1 + 56 files changed, 983 insertions(+), 203 deletions(-) create mode 100644 quickwit/quickwit-common/src/tower/interceptor.rs create mode 100644 quickwit/quickwit-proto/src/grpc_read_replica.rs diff --git a/config/quickwit.yaml b/config/quickwit.yaml index 035bd2fb241..1c1f035bc9b 100644 --- a/config/quickwit.yaml +++ b/config/quickwit.yaml @@ -103,6 +103,11 @@ version: 0.8 # metastore_uri: s3://your-bucket/indexes # metastore_uri: postgres://username:password@host:port/db # +# Optional PostgreSQL read replica URI used by metastore nodes for gRPC requests +# carrying the `qw-use-read-replica: true` metadata. Defaults to unset. +# +# metastore_read_replica_uri: postgres://username:password@read-replica-host:port/db +# # When using a file-backed metastore, the state of the metastore will be cached forever. # If you are indexing and searching from different processes, it is possible to periodically # refresh the state of the metastore on the searcher using the `polling_interval` hashtag. diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index 849e8cb44c3..e608da224ff 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -30,6 +30,7 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw | `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. If the list of peer seeds contains a host name, Quickwit will resolve it by querying the DNS every minute. On kubernetes for instance, it is a good practise to set it to a [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). | `QW_PEER_SEEDS` | | | `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` | | `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` | +| `metastore_read_replica_uri` | Optional PostgreSQL read replica URI used by metastore nodes when a gRPC request carries the `qw-use-read-replica: true` metadata. If unset, those requests use `metastore_uri`. | `QW_METASTORE_READ_REPLICA_URI` | | | `default_index_root_uri` | Default index root URI that defines the location where index data (splits) is stored. The index URI is built following the scheme: `{default_index_root_uri}/{index-id}` | `QW_DEFAULT_INDEX_ROOT_URI` | `{data_dir}/indexes` | | environment variable only | Log level of Quickwit. Can be a direct log level, or a comma separated list of `module_name=level` | `RUST_LOG` | `info` | diff --git a/quickwit/quickwit-cluster/src/grpc_service.rs b/quickwit/quickwit-cluster/src/grpc_service.rs index 2cc0253a7c0..cfecf56697f 100644 --- a/quickwit/quickwit-cluster/src/grpc_service.rs +++ b/quickwit/quickwit-cluster/src/grpc_service.rs @@ -44,7 +44,7 @@ pub(crate) async fn cluster_grpc_client( ClusterServiceClient::tower() .stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone()) - .build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None) + .build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None, []) } pub fn cluster_grpc_server( diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index 234b53ae071..c1c635d94c7 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -112,11 +112,15 @@ impl HelloClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = hello_grpc_client::HelloGrpcClient::new(channel) + let mut client = hello_grpc_client::HelloGrpcClient::with_interceptor( + channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), + ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); if let Some(compression_encoding) = compression_encoding_opt { @@ -131,9 +135,13 @@ impl HelloClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> HelloClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = hello_grpc_client::HelloGrpcClient::new(balance_channel) + let mut client = hello_grpc_client::HelloGrpcClient::with_interceptor( + balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), + ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); if let Some(compression_encoding) = compression_encoding_opt { @@ -517,12 +525,14 @@ impl HelloTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> HelloClient { let client = HelloClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -532,11 +542,13 @@ impl HelloTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> HelloClient { let client = HelloClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-codegen/example/src/lib.rs b/quickwit/quickwit-codegen/example/src/lib.rs index 75a1ada2f0d..a95faf20c9b 100644 --- a/quickwit/quickwit-codegen/example/src/lib.rs +++ b/quickwit/quickwit-codegen/example/src/lib.rs @@ -282,7 +282,8 @@ mod tests { "127.0.0.1:6666".parse().unwrap(), Endpoint::from_static("http://127.0.0.1:6666").connect_lazy(), ); - let grpc_client = HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None); + let grpc_client = + HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None, []); assert_eq!( grpc_client @@ -342,7 +343,7 @@ mod tests { // The connectivity check fails if there is no client behind the channel. let (balance_channel, _): (BalanceChannel, _) = BalanceChannel::new(); let grpc_client = - HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None); + HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None, []); assert_eq!( grpc_client .check_connectivity() @@ -441,6 +442,7 @@ mod tests { channel, MAX_GRPC_MESSAGE_SIZE, Some(CompressionEncoding::Zstd), + [], ); assert_eq!( @@ -788,7 +790,7 @@ mod tests { "127.0.0.1:7777".parse().unwrap(), Endpoint::from_static("http://127.0.0.1:7777").connect_lazy(), ); - HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None); + HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None, []); } #[tokio::test] @@ -876,7 +878,7 @@ mod tests { .timeout(Duration::from_millis(100)) .connect_lazy(); let max_message_size = ByteSize::mib(1); - let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None); + let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None, []); let error = grpc_client .hello(HelloRequest { @@ -955,7 +957,7 @@ mod tests { // this test hangs forever if we comment out the TimeoutLayer, which // shows that a request without explicit timeout might hang forever .stack_layer(TimeoutLayer::new(Duration::from_secs(3))) - .build_from_balance_channel(balance_channel, ByteSize::mib(1), None); + .build_from_balance_channel(balance_channel, ByteSize::mib(1), None, []); let response_fut = async move { grpc_client diff --git a/quickwit/quickwit-codegen/src/codegen.rs b/quickwit/quickwit-codegen/src/codegen.rs index b1a59cfb9b7..7d92e1b3501 100644 --- a/quickwit/quickwit-codegen/src/codegen.rs +++ b/quickwit/quickwit-codegen/src/codegen.rs @@ -316,6 +316,10 @@ impl CodegenContext { } } +fn is_metastore_service(context: &CodegenContext) -> bool { + context.package_name == "quickwit.metastore" && context.service_name == "MetastoreService" +} + fn generate_all( service: &Service, result_type_path: &str, @@ -637,6 +641,21 @@ fn generate_client(context: &CodegenContext) -> TokenStream { } else { TokenStream::new() }; + let metastore_read_replica_client_methods = if is_metastore_service(context) { + quote! { + pub fn as_grpc_service_with_read_replica( + &self, + read_replica: Option, + max_message_size: bytesize::ByteSize, + ) -> crate::grpc_read_replica::ReadReplicaGrpcService<#grpc_server_package_name::#grpc_server_name<#grpc_server_adapter_name>> { + let primary = self.as_grpc_service(max_message_size); + let read_replica = read_replica.map(|client| client.as_grpc_service(max_message_size)); + crate::grpc_read_replica::ReadReplicaGrpcService::new(primary, read_replica) + } + } + } else { + TokenStream::new() + }; quote! { #[derive(Debug, Clone)] @@ -671,15 +690,21 @@ fn generate_client(context: &CodegenContext) -> TokenStream { .max_encoding_message_size(max_message_size.0 as usize) } + #metastore_read_replica_client_methods + pub fn from_channel( addr: std::net::SocketAddr, channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel(std::collections::HashSet::from_iter([addr])); - let mut client = #grpc_client_package_name::#grpc_client_name::new(channel) + let mut client = #grpc_client_package_name::#grpc_client_name::with_interceptor( + channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), + ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); if let Some(compression_encoding) = compression_encoding_opt { @@ -695,10 +720,14 @@ fn generate_client(context: &CodegenContext) -> TokenStream { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> #client_name { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = #grpc_client_package_name::#grpc_client_name::new(balance_channel) + let mut client = #grpc_client_package_name::#grpc_client_name::with_interceptor( + balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), + ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); if let Some(compression_encoding) = compression_encoding_opt { @@ -1024,7 +1053,6 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream { svc_attribute_idents.push(svc_attribute_name); } - quote! { impl #tower_layer_stack_name { pub fn stack_layer(mut self, layer: L) -> Self @@ -1051,9 +1079,10 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> #client_name { - let client = #client_name::from_channel(addr, channel, max_message_size, compression_encoding_opt); + let client = #client_name::from_channel(addr, channel, max_message_size, compression_encoding_opt, interceptors); let inner_client = client.inner; self.build_from_inner_client(inner_client) } @@ -1063,9 +1092,10 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> #client_name { - let client = #client_name::from_balance_channel(balance_channel, max_message_size, compression_encoding_opt); + let client = #client_name::from_balance_channel(balance_channel, max_message_size, compression_encoding_opt, interceptors); let inner_client = client.inner; self.build_from_inner_client(inner_client) } @@ -1279,7 +1309,7 @@ fn generate_grpc_client_adapter(context: &CodegenContext) -> TokenStream { pub fn new(instance: T, connection_addrs_rx: tokio::sync::watch::Receiver>) -> Self { Self { inner: instance, - connection_addrs_rx + connection_addrs_rx, } } } @@ -1435,8 +1465,7 @@ fn generate_grpc_server_adapter_methods(context: &CodegenContext) -> TokenStream let span = #span_macro; let _ = ::set_parent(&span, parent_context); let fut = async move { - self.inner - .0 + self.inner.0 .#method_name(request) .await .map(#into_response_type) diff --git a/quickwit/quickwit-common/src/tower/interceptor.rs b/quickwit/quickwit-common/src/tower/interceptor.rs new file mode 100644 index 00000000000..3515d10f357 --- /dev/null +++ b/quickwit/quickwit-common/src/tower/interceptor.rs @@ -0,0 +1,116 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use tonic::metadata::KeyAndValueRef; +use tonic::service::Interceptor; + +pub type GrpcInterceptor = + Arc) -> Result, tonic::Status> + Send + Sync>; + +#[derive(Clone, Default)] +pub struct GrpcInterceptors { + interceptors: Vec, +} + +impl GrpcInterceptors { + pub fn new(interceptors: impl IntoIterator) -> Self { + Self { + interceptors: interceptors.into_iter().collect(), + } + } +} + +impl Interceptor for GrpcInterceptors { + fn call(&mut self, request: tonic::Request<()>) -> Result, tonic::Status> { + self.interceptors + .iter() + .try_fold(request, |request, interceptor| interceptor(request)) + } +} + +pub fn fixed_headers_interceptor(headers: tonic::metadata::MetadataMap) -> GrpcInterceptor { + Arc::new(move |mut request: tonic::Request<()>| { + for key_and_value in headers.iter() { + match key_and_value { + KeyAndValueRef::Ascii(key, value) => { + request.metadata_mut().insert(key, value.to_owned()); + } + KeyAndValueRef::Binary(key, value) => { + request.metadata_mut().insert_bin(key, value.to_owned()); + } + } + } + Ok(request) + }) +} + +#[cfg(test)] +mod tests { + use tonic::metadata::{BinaryMetadataValue, MetadataMap, MetadataValue}; + + use super::*; + + #[test] + fn fixed_headers_interceptor_copies_ascii_and_binary_metadata() { + let mut headers = MetadataMap::new(); + headers.insert("x-ascii", MetadataValue::from_static("ascii-value")); + headers.insert_bin("x-bin", BinaryMetadataValue::from_bytes(b"binary-value")); + + let mut interceptors = GrpcInterceptors::new([fixed_headers_interceptor(headers)]); + let request = interceptors.call(tonic::Request::new(())).unwrap(); + + assert_eq!( + request.metadata().get("x-ascii").unwrap(), + MetadataValue::from_static("ascii-value") + ); + assert_eq!( + request + .metadata() + .get_bin("x-bin") + .unwrap() + .to_bytes() + .unwrap() + .as_ref(), + b"binary-value" + ); + } + + #[test] + fn grpc_interceptors_apply_multiple_interceptors_in_order() { + let first: GrpcInterceptor = Arc::new(|mut request| { + request + .metadata_mut() + .insert("x-order", MetadataValue::from_static("first")); + Ok(request) + }); + let second: GrpcInterceptor = Arc::new(|mut request| { + let value = request.metadata().get("x-order").unwrap().to_str().unwrap(); + assert_eq!(value, "first"); + request + .metadata_mut() + .insert("x-order", MetadataValue::from_static("second")); + Ok(request) + }); + + let mut interceptors = GrpcInterceptors::new([first, second]); + let request = interceptors.call(tonic::Request::new(())).unwrap(); + + assert_eq!( + request.metadata().get("x-order").unwrap(), + MetadataValue::from_static("second") + ); + } +} diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs index 8be8065e12d..02b18265f05 100644 --- a/quickwit/quickwit-common/src/tower/mod.rs +++ b/quickwit/quickwit-common/src/tower/mod.rs @@ -20,6 +20,7 @@ mod circuit_breaker; mod delay; mod estimate_rate; mod event_listener; +mod interceptor; mod load_shed; mod metrics; mod one_task_per_call_layer; @@ -43,6 +44,7 @@ pub use delay::{Delay, DelayLayer}; pub use estimate_rate::{EstimateRate, EstimateRateLayer}; pub use event_listener::{EventListener, EventListenerLayer}; use futures::Future; +pub use interceptor::{GrpcInterceptor, GrpcInterceptors, fixed_headers_interceptor}; pub use load_shed::{LoadShed, LoadShedLayer, MakeLoadShedError}; pub use metrics::{GrpcMetrics, GrpcMetricsLayer, RpcName}; pub use one_task_per_call_layer::{OneTaskPerCallLayer, TaskCancelled}; diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 1548273cb89..80e1ed67493 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -18,6 +18,7 @@ ], "data_dir": "/opt/quickwit/data", "metastore_uri": "postgres://username:password@host:port/db", + "metastore_read_replica_uri": "postgres://username:replica-password@replica-host:port/db", "default_index_root_uri": "s3://quickwit-indexes", "rest": { "listen_port": 1111, diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index 3c97620f185..96d5d32707d 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -11,6 +11,7 @@ grpc_listen_port = 3333 peer_seeds = [ "quickwit-searcher-0.local", "quickwit-searcher-1.local" ] data_dir = "/opt/quickwit/data" metastore_uri = "postgres://username:password@host:port/db" +metastore_read_replica_uri = "postgres://username:replica-password@replica-host:port/db" default_index_root_uri = "s3://quickwit-indexes" [rest] diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index 7d4551b714a..49708813331 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -15,6 +15,7 @@ peer_seeds: - quickwit-searcher-1.local data_dir: /opt/quickwit/data metastore_uri: postgres://username:password@host:port/db +metastore_read_replica_uri: postgres://username:replica-password@replica-host:port/db default_index_root_uri: s3://quickwit-indexes rest: diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 1c60891cf58..8ec3eb5d0d4 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -812,6 +812,8 @@ pub struct NodeConfig { pub peer_seeds: Vec, pub data_dir_path: PathBuf, pub metastore_uri: Uri, + #[serde(skip_serializing_if = "Option::is_none")] + pub metastore_read_replica_uri: Option, pub default_index_root_uri: Uri, pub rest_config: RestConfig, #[serde(skip_serializing_if = "Option::is_none")] @@ -872,6 +874,9 @@ impl NodeConfig { pub fn redact(&mut self) { self.metastore_configs.redact(); self.metastore_uri.redact(); + if let Some(metastore_read_replica_uri) = &mut self.metastore_read_replica_uri { + metastore_read_replica_uri.redact(); + } self.storage_configs.redact(); } @@ -896,6 +901,26 @@ mod tests { use super::*; use crate::IndexerConfig; + #[test] + fn test_node_config_redact_metastore_uris() { + let mut config = NodeConfig::for_test(); + config.metastore_uri = Uri::for_test("postgresql://username:password@host:port/db"); + config.metastore_read_replica_uri = Some(Uri::for_test( + "postgresql://replica-user:replica-password@replica-host:port/db", + )); + + config.redact(); + + assert_eq!( + config.metastore_uri, + "postgresql://username:***redacted***@host:port/db" + ); + assert_eq!( + config.metastore_read_replica_uri.as_ref().unwrap().as_str(), + "postgresql://replica-user:***redacted***@replica-host:port/db" + ); + } + #[test] fn test_indexer_config_serialization() { { diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 8b53cb6cf74..b42b043c666 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -130,6 +130,10 @@ fn default_metastore_uri(data_dir_uri: &Uri) -> Uri { data_dir_uri.join("indexes#polling_interval=30s").expect("Failed to create default metastore URI. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.") } +fn default_metastore_read_replica_uri() -> ConfigValue { + ConfigValue::none() +} + // See comment above. fn default_index_root_uri(data_dir_uri: &Uri) -> Uri { data_dir_uri.join("indexes").expect("Failed to create default index root URI. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.") @@ -191,6 +195,8 @@ struct NodeConfigBuilder { #[serde(default = "default_data_dir_uri")] data_dir_uri: ConfigValue, metastore_uri: ConfigValue, + #[serde(default = "default_metastore_read_replica_uri")] + metastore_read_replica_uri: ConfigValue, default_index_root_uri: ConfigValue, #[serde(rename = "rest")] #[serde(default)] @@ -300,6 +306,8 @@ impl NodeConfigBuilder { .metastore_uri .resolve_optional(env_vars)? .unwrap_or_else(|| default_metastore_uri(&data_dir_uri)); + let metastore_read_replica_uri = + self.metastore_read_replica_uri.resolve_optional(env_vars)?; let default_index_root_uri = self .default_index_root_uri @@ -330,6 +338,7 @@ impl NodeConfigBuilder { peer_seeds: self.peer_seeds.resolve(env_vars)?.0, data_dir_path, metastore_uri, + metastore_read_replica_uri, default_index_root_uri, rest_config, health_config, @@ -429,6 +438,7 @@ impl Default for NodeConfigBuilder { peer_seeds: ConfigValue::with_default(List::default()), data_dir_uri: default_data_dir_uri(), metastore_uri: ConfigValue::none(), + metastore_read_replica_uri: ConfigValue::none(), default_index_root_uri: ConfigValue::none(), rest_config_builder: RestConfigBuilder::default(), health_config_builder: HealthConfigBuilder::default(), @@ -564,6 +574,7 @@ pub fn node_config_for_tests_from_ports( peer_seeds: Vec::new(), data_dir_path, metastore_uri, + metastore_read_replica_uri: None, default_index_root_uri, rest_config, health_config: None, @@ -663,6 +674,10 @@ mod tests { config.metastore_uri, "postgresql://username:password@host:port/db" ); + assert_eq!( + config.metastore_read_replica_uri.as_ref().unwrap().as_str(), + "postgresql://username:replica-password@replica-host:port/db" + ); assert_eq!(config.default_index_root_uri, "s3://quickwit-indexes"); let azure_storage_config = config.storage_configs.find_azure().unwrap(); @@ -848,6 +863,7 @@ mod tests { env::current_dir().unwrap().display() ) ); + assert!(config.metastore_read_replica_uri.is_none()); assert_eq!( config.default_index_root_uri, format!( @@ -883,6 +899,11 @@ mod tests { "QW_METASTORE_URI".to_string(), "postgresql://test-user:test-password@test-host:4321/test-db".to_string(), ); + env_vars.insert( + "QW_METASTORE_READ_REPLICA_URI".to_string(), + "postgresql://test-user:test-replica-password@test-replica-host:4321/test-db" + .to_string(), + ); env_vars.insert( "QW_DEFAULT_INDEX_ROOT_URI".to_string(), "s3://quickwit-indexes/prod".to_string(), @@ -945,6 +966,10 @@ mod tests { config.metastore_uri, "postgresql://test-user:test-password@test-host:4321/test-db" ); + assert_eq!( + config.metastore_read_replica_uri.as_ref().unwrap().as_str(), + "postgresql://test-user:test-replica-password@test-replica-host:4321/test-db" + ); assert_eq!(config.default_index_root_uri, "s3://quickwit-indexes/prod"); } @@ -968,6 +993,7 @@ mod tests { config.metastore_uri, "postgresql://username:password@host:port/db" ); + assert!(config.metastore_read_replica_uri.is_none()); } #[tokio::test] diff --git a/quickwit/quickwit-config/src/qw_env_vars.rs b/quickwit/quickwit-config/src/qw_env_vars.rs index 0a551ccaab9..c65a2fb6406 100644 --- a/quickwit/quickwit-config/src/qw_env_vars.rs +++ b/quickwit/quickwit-config/src/qw_env_vars.rs @@ -56,6 +56,7 @@ qw_env_vars!( QW_PEER_SEEDS, QW_DATA_DIR, QW_METASTORE_URI, + QW_METASTORE_READ_REPLICA_URI, QW_DEFAULT_INDEX_ROOT_URI ); diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs b/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs index a8f96fec60b..b3adc651e62 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs @@ -28,7 +28,7 @@ use quickwit_common::uri::Uri; use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt}; use quickwit_parquet_engine::split::ParquetSplitKind; use quickwit_proto::metastore::{ - IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, + IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreReadServiceClient, }; use tracing::debug; @@ -59,11 +59,11 @@ pub trait MetricsIndexResolver: Send + Sync + std::fmt::Debug { /// storage lazily on first read. #[derive(Clone)] pub struct MetastoreIndexResolver { - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, } impl MetastoreIndexResolver { - pub fn new(metastore: MetastoreServiceClient) -> Self { + pub fn new(metastore: MetastoreReadServiceClient) -> Self { Self { metastore } } } @@ -85,7 +85,6 @@ impl MetricsIndexResolver for MetastoreIndexResolver { let response = self .metastore - .clone() .index_metadata(IndexMetadataRequest::for_index_id(index_name.to_string())) .await .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; @@ -111,7 +110,6 @@ impl MetricsIndexResolver for MetastoreIndexResolver { async fn list_index_names(&self) -> DFResult> { let response = self .metastore - .clone() .list_indexes_metadata(ListIndexesMetadataRequest::all()) .await .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs index 4e0927554e4..9c08f8c60a6 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use datafusion::error::Result as DFResult; use quickwit_metastore::{ListParquetSplitsQuery, list_parquet_splits_paginated}; use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata}; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::MetastoreReadServiceClient; use quickwit_proto::types::IndexUid; use tracing::{debug, instrument}; @@ -30,14 +30,14 @@ use super::table_provider::MetricsSplitProvider; /// `MetricsSplitProvider` backed by the Quickwit metastore RPC. #[derive(Debug, Clone)] pub struct MetastoreSplitProvider { - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, index_uid: IndexUid, split_kind: ParquetSplitKind, } impl MetastoreSplitProvider { pub fn new( - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, index_uid: IndexUid, split_kind: ParquetSplitKind, ) -> Self { @@ -64,10 +64,13 @@ impl MetricsSplitProvider for MetastoreSplitProvider { )] async fn list_splits(&self, query: &MetricsSplitQuery) -> DFResult> { let metastore_query = to_metastore_query(&self.index_uid, query); - let records = - list_parquet_splits_paginated(self.metastore.clone(), self.split_kind, metastore_query) - .await - .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; + let records = list_parquet_splits_paginated( + self.metastore.as_ref(), + self.split_kind, + metastore_query, + ) + .await + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; let splits: Vec = records.into_iter().map(|record| record.metadata).collect(); diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs index 0e50e203fb3..fe545f95296 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs @@ -45,7 +45,7 @@ use quickwit_df_core::{ QuickwitRuntimePlugin, QuickwitRuntimeRegistration, QuickwitSubstraitConsumerExt, }; use quickwit_parquet_engine::split::ParquetSplitKind; -use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient}; +use quickwit_proto::metastore::{MetastoreError, MetastoreReadServiceClient}; use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory, SKETCHES_FILE_TYPE}; use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver}; @@ -84,7 +84,7 @@ pub struct MetricsDataSource { impl MetricsDataSource { /// Create a production `MetricsDataSource` backed by the metastore. - pub fn new(metastore: MetastoreServiceClient) -> Self { + pub fn new(metastore: MetastoreReadServiceClient) -> Self { Self { index_resolver: Arc::new(MetastoreIndexResolver::new(metastore)), } diff --git a/quickwit/quickwit-datafusion/tests/distributed.rs b/quickwit/quickwit-datafusion/tests/distributed.rs index 7c8c498e35f..e5a84c352ba 100644 --- a/quickwit/quickwit-datafusion/tests/distributed.rs +++ b/quickwit/quickwit-datafusion/tests/distributed.rs @@ -69,7 +69,7 @@ async fn test_distributed_tasks_not_shuffles() { .await; } - let source = Arc::new(MetricsDataSource::new(metastore)); + let source = Arc::new(MetricsDataSource::new(Arc::new(metastore))); // Pool starts empty; we populate it once the workers are bound. The resolver // reads pool keys lazily at query time, so this ordering is safe. diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index c36412305fc..0a0f5d743ea 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -45,7 +45,7 @@ async fn start_sandbox() -> TestSandbox { /// Build a `DataFusionSessionBuilder` wired to the sandbox's metastore + storage. fn session_builder(sandbox: &TestSandbox) -> DataFusionSessionBuilder { - let source = Arc::new(MetricsDataSource::new(sandbox.metastore.clone())); + let source = Arc::new(MetricsDataSource::new(Arc::new(sandbox.metastore.clone()))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new( sandbox.storage_resolver.clone(), diff --git a/quickwit/quickwit-datafusion/tests/null_columns.rs b/quickwit/quickwit-datafusion/tests/null_columns.rs index ed8bfa41685..2cc5a59f5e4 100644 --- a/quickwit/quickwit-datafusion/tests/null_columns.rs +++ b/quickwit/quickwit-datafusion/tests/null_columns.rs @@ -106,7 +106,7 @@ async fn test_null_columns_for_missing_parquet_fields() { ); publish_split(&metastore, &index_uid, data_dir.path(), "wide", &batch_b).await; - let source = Arc::new(MetricsDataSource::new(metastore)); + let source = Arc::new(MetricsDataSource::new(Arc::new(metastore))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new( sandbox.storage_resolver.clone(), diff --git a/quickwit/quickwit-datafusion/tests/sketches.rs b/quickwit/quickwit-datafusion/tests/sketches.rs index ea14fbb986b..43411e6e20c 100644 --- a/quickwit/quickwit-datafusion/tests/sketches.rs +++ b/quickwit/quickwit-datafusion/tests/sketches.rs @@ -36,7 +36,7 @@ async fn start_sandbox() -> TestSandbox { } fn session_builder(sandbox: &TestSandbox) -> DataFusionSessionBuilder { - let source = Arc::new(MetricsDataSource::new(sandbox.metastore.clone())); + let source = Arc::new(MetricsDataSource::new(Arc::new(sandbox.metastore.clone()))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new( sandbox.storage_resolver.clone(), diff --git a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs index 6517123c43c..5700b286f04 100644 --- a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs @@ -177,7 +177,7 @@ async fn list_parquet_splits( let kind = parquet_split_kind_for_index(index_uid); protect_future( progress_opt, - list_parquet_splits_paginated(metastore.clone(), kind, query), + list_parquet_splits_paginated(metastore, kind, query), ) .await .context("failed to list parquet splits") diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs index 6de3f5f5983..8ee01056e01 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs @@ -78,7 +78,7 @@ async fn fetch_published_parquet_splits_paginated( }; let query = quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone()) .retain_immature(OffsetDateTime::now_utc()); - let records = list_parquet_splits_paginated(metastore, kind, query).await?; + let records = list_parquet_splits_paginated(&metastore, kind, query).await?; debug!( num_splits = records.len(), "fetched published parquet splits for merge planning" diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index d032c70d8da..e8dcc2c46de 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -238,12 +238,14 @@ impl IngestServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = ingest_service_grpc_client::IngestServiceGrpcClient::new( + let mut client = ingest_service_grpc_client::IngestServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -262,10 +264,12 @@ impl IngestServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngestServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = ingest_service_grpc_client::IngestServiceGrpcClient::new( + let mut client = ingest_service_grpc_client::IngestServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -620,12 +624,14 @@ impl IngestServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngestServiceClient { let client = IngestServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -635,11 +641,13 @@ impl IngestServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngestServiceClient { let client = IngestServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index b1de41d3163..540c98260ca 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -2519,6 +2519,7 @@ mod tests { follower_channel, MAX_GRPC_MESSAGE_SIZE, None, + [], ); let ingester_pool_entry = IngesterPoolEntry { diff --git a/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs b/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs index 533c5960997..8fabfed43dd 100644 --- a/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs @@ -56,7 +56,7 @@ fn metastore_client(sandbox: &ClusterSandbox) -> MetastoreServiceClient { let channel = tonic::transport::Channel::from_shared(format!("http://{addr}")) .unwrap() .connect_lazy(); - MetastoreServiceClient::from_channel(addr, channel, bytesize::ByteSize::mib(20), None) + MetastoreServiceClient::from_channel(addr, channel, bytesize::ByteSize::mib(20), None, []) } fn datafusion_client( @@ -323,7 +323,7 @@ async fn test_null_columns_for_missing_parquet_fields() { publish_split(&metastore, &index_uid, data_dir.path(), "wide", &batch_b).await; let storage_resolver = quickwit_storage::StorageResolver::unconfigured(); - let source = Arc::new(MetricsDataSource::new(metastore)); + let source = Arc::new(MetricsDataSource::new(Arc::new(metastore))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new(storage_resolver)); let builder = DataFusionSessionBuilder::new() diff --git a/quickwit/quickwit-janitor/src/retention_policy_execution.rs b/quickwit/quickwit-janitor/src/retention_policy_execution.rs index bf06dd1b032..3eed6bb6a69 100644 --- a/quickwit/quickwit-janitor/src/retention_policy_execution.rs +++ b/quickwit/quickwit-janitor/src/retention_policy_execution.rs @@ -117,11 +117,7 @@ pub async fn run_execute_parquet_retention_policy( ParquetSplitKind::Metrics }; let expired_splits: Vec = ctx - .protect_future(list_parquet_splits_paginated( - metastore.clone(), - kind, - query, - )) + .protect_future(list_parquet_splits_paginated(&metastore, kind, query)) .await?; if expired_splits.is_empty() { diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 9a54edc57af..7690fad4fba 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -52,7 +52,7 @@ pub use metastore::{ StageParquetSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, UpdateSourceRequestExt, file_backed, list_parquet_splits_page, list_parquet_splits_paginated, }; -pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; +pub use metastore_factory::{MetastoreFactory, MetastoreFactoryOptions, UnsupportedMetastore}; pub use metastore_resolver::MetastoreResolver; use quickwit_common::is_disjoint; use quickwit_doc_mapper::tag_pruning::TagFilterAst; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs index 2c489eb4dd5..96823851221 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs @@ -26,7 +26,9 @@ use regex::Regex; use tokio::sync::Mutex; use tracing::debug; -use crate::{FileBackedMetastore, MetastoreFactory, MetastoreResolverError}; +use crate::{ + FileBackedMetastore, MetastoreFactory, MetastoreFactoryOptions, MetastoreResolverError, +}; /// A file-backed metastore factory. /// @@ -101,6 +103,7 @@ impl MetastoreFactory for FileBackedMetastoreFactory { &self, _metastore_config: &MetastoreConfig, uri: &Uri, + _options: MetastoreFactoryOptions, ) -> Result { let (uri_stripped, polling_interval_opt) = extract_polling_interval_from_uri(uri.as_str()); let uri = Uri::from_str(&uri_stripped).map_err(|_| { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index f94c254228f..b4045c6a4e5 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -38,10 +38,11 @@ use quickwit_proto::metastore::{ IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataResponse, ListIndexesMetadataResponse, ListMetricsSplitsRequest, ListMetricsSplitsResponse, ListSketchSplitsRequest, ListSketchSplitsResponse, ListSplitsRequest, ListSplitsResponse, - MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - MetastoreServiceStream, PublishMetricsSplitsRequest, PublishSketchSplitsRequest, - PublishSplitsRequest, StageMetricsSplitsRequest, StageSketchSplitsRequest, StageSplitsRequest, - UpdateIndexRequest, UpdateSourceRequest, serde_utils, + MetastoreError, MetastoreReadService, MetastoreResult, MetastoreService, + MetastoreServiceClient, MetastoreServiceStream, PublishMetricsSplitsRequest, + PublishSketchSplitsRequest, PublishSplitsRequest, StageMetricsSplitsRequest, + StageSketchSplitsRequest, StageSplitsRequest, UpdateIndexRequest, UpdateSourceRequest, + serde_utils, }; use quickwit_proto::types::{IndexUid, NodeId, SplitId}; use serde::{Deserialize, Serialize}; @@ -972,7 +973,7 @@ pub struct ParquetSplitsPage { /// Lists one parquet splits page and advances `query.after_split_id`. pub async fn list_parquet_splits_page( - metastore: &MetastoreServiceClient, + metastore: &dyn MetastoreReadService, kind: ParquetSplitKind, query: &mut ListParquetSplitsQuery, ) -> MetastoreResult { @@ -1011,7 +1012,7 @@ pub async fn list_parquet_splits_page( /// `page_size`; `after_split_id` is used as the starting cursor when already /// set and is advanced internally after each full page. pub async fn list_parquet_splits_paginated( - metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, kind: ParquetSplitKind, mut query: ListParquetSplitsQuery, ) -> MetastoreResult> { @@ -1019,7 +1020,7 @@ pub async fn list_parquet_splits_paginated( let mut splits = Vec::new(); loop { - let mut page = list_parquet_splits_page(&metastore, kind, &mut query).await?; + let mut page = list_parquet_splits_page(metastore, kind, &mut query).await?; splits.append(&mut page.splits); if !page.has_next_page { break; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs index 97aded689b1..47a42a1aafa 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs @@ -22,7 +22,9 @@ use quickwit_proto::metastore::MetastoreServiceClient; use tokio::sync::Mutex; use tracing::debug; -use crate::{MetastoreFactory, MetastoreResolverError, PostgresqlMetastore}; +use crate::{ + MetastoreFactory, MetastoreFactoryOptions, MetastoreResolverError, PostgresqlMetastore, +}; #[derive(Clone, Default)] pub struct PostgresqlMetastoreFactory { @@ -31,13 +33,17 @@ pub struct PostgresqlMetastoreFactory { // In contrast to the file-backed metastore, we use a strong pointer here, so that the // `Metastore` doesn't get dropped. This is done in order to keep the underlying connection // pool to Postgres alive. - cache: Arc>>, + cache: Arc>>, } impl PostgresqlMetastoreFactory { - async fn get_from_cache(&self, uri: &Uri) -> Option { + async fn get_from_cache( + &self, + uri: &Uri, + options: MetastoreFactoryOptions, + ) -> Option { let cache_lock = self.cache.lock().await; - cache_lock.get(uri).cloned() + cache_lock.get(&(uri.clone(), options)).cloned() } /// If there is a valid entry in the cache to begin with, we trash the new @@ -48,13 +54,15 @@ impl PostgresqlMetastoreFactory { async fn cache_metastore( &self, uri: Uri, + options: MetastoreFactoryOptions, metastore: MetastoreServiceClient, ) -> MetastoreServiceClient { let mut cache_lock = self.cache.lock().await; - if let Some(metastore) = cache_lock.get(&uri) { + let cache_key = (uri, options); + if let Some(metastore) = cache_lock.get(&cache_key) { return metastore.clone(); } - cache_lock.insert(uri, metastore.clone()); + cache_lock.insert(cache_key, metastore.clone()); metastore } } @@ -69,8 +77,9 @@ impl MetastoreFactory for PostgresqlMetastoreFactory { &self, metastore_config: &MetastoreConfig, uri: &Uri, + options: MetastoreFactoryOptions, ) -> Result { - if let Some(metastore) = self.get_from_cache(uri).await { + if let Some(metastore) = self.get_from_cache(uri, options).await { debug!("using metastore from cache"); return Ok(metastore); } @@ -82,12 +91,15 @@ impl MetastoreFactory for PostgresqlMetastoreFactory { ); MetastoreResolverError::InvalidConfig(message) })?; - let postgresql_metastore = PostgresqlMetastore::new(postgresql_metastore_config, uri) - .await - .map(MetastoreServiceClient::new) - .map_err(MetastoreResolverError::Initialization)?; + let postgresql_metastore = if options.read_only { + PostgresqlMetastore::new_read_only(postgresql_metastore_config, uri).await + } else { + PostgresqlMetastore::new(postgresql_metastore_config, uri).await + } + .map(MetastoreServiceClient::new) + .map_err(MetastoreResolverError::Initialization)?; let unique_metastore_for_uri = self - .cache_metastore(uri.clone(), postgresql_metastore) + .cache_metastore(uri.clone(), options, postgresql_metastore) .await; Ok(unique_metastore_for_uri) } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 43bd995e1ba..8887e608c8d 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -104,11 +104,62 @@ impl fmt::Debug for PostgresqlMetastore { } } +#[derive(Clone, Copy, Debug)] +struct PostgresqlMetastoreOptions { + read_only: bool, + skip_migrations: bool, + skip_locking: bool, +} + +impl PostgresqlMetastoreOptions { + fn from_env() -> Self { + Self { + read_only: get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false), + skip_migrations: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false), + skip_locking: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false), + } + } + + fn read_only() -> Self { + Self { + read_only: true, + skip_migrations: true, + skip_locking: false, + } + } +} + impl PostgresqlMetastore { /// Creates a metastore given a database URI. pub async fn new( postgres_metastore_config: &PostgresMetastoreConfig, connection_uri: &Uri, + ) -> MetastoreResult { + Self::new_with_options( + postgres_metastore_config, + connection_uri, + PostgresqlMetastoreOptions::from_env(), + ) + .await + } + + /// Creates a read-only metastore given a database URI. + pub async fn new_read_only( + postgres_metastore_config: &PostgresMetastoreConfig, + connection_uri: &Uri, + ) -> MetastoreResult { + Self::new_with_options( + postgres_metastore_config, + connection_uri, + PostgresqlMetastoreOptions::read_only(), + ) + .await + } + + async fn new_with_options( + postgres_metastore_config: &PostgresMetastoreConfig, + connection_uri: &Uri, + options: PostgresqlMetastoreOptions, ) -> MetastoreResult { let min_connections = postgres_metastore_config.min_connections; let max_connections = postgres_metastore_config.max_connections.get(); @@ -122,10 +173,6 @@ impl PostgresqlMetastore { .max_connection_lifetime_opt() .expect("PostgreSQL metastore config should have been validated"); - let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); - let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); - let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); - let connection_pool = establish_connection( connection_uri, min_connections, @@ -133,11 +180,16 @@ impl PostgresqlMetastore { acquire_timeout, idle_timeout_opt, max_lifetime_opt, - read_only, + options.read_only, ) .await?; - run_migrations(&connection_pool, skip_migrations, skip_locking).await?; + run_migrations( + &connection_pool, + options.skip_migrations, + options.skip_locking, + ) + .await?; let metastore = PostgresqlMetastore { uri: connection_uri.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore_factory.rs b/quickwit/quickwit-metastore/src/metastore_factory.rs index 872b16d3071..edea06e996e 100644 --- a/quickwit/quickwit-metastore/src/metastore_factory.rs +++ b/quickwit/quickwit-metastore/src/metastore_factory.rs @@ -19,6 +19,13 @@ use quickwit_proto::metastore::MetastoreServiceClient; use crate::MetastoreResolverError; +/// Options controlling how a metastore client is resolved. +#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)] +pub struct MetastoreFactoryOptions { + /// Whether the resolved metastore client should use a read-only connection. + pub read_only: bool, +} + /// A metastore factory builds a [`MetastoreServiceClient`] object for a target [`MetastoreBackend`] /// from a [`MetastoreConfig`] and a [`Uri`]. #[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] @@ -27,11 +34,13 @@ pub trait MetastoreFactory: Send + Sync + 'static { /// Returns the metastore backend targeted by the factory. fn backend(&self) -> MetastoreBackend; - /// Returns the appropriate [`MetastoreServiceClient`] object for the `uri`. + /// Returns the appropriate [`MetastoreServiceClient`] object for the `uri` and resolution + /// options. async fn resolve( &self, metastore_config: &MetastoreConfig, uri: &Uri, + options: MetastoreFactoryOptions, ) -> Result; } @@ -59,6 +68,7 @@ impl MetastoreFactory for UnsupportedMetastore { &self, _metastore_config: &MetastoreConfig, _uri: &Uri, + _options: MetastoreFactoryOptions, ) -> Result { Err(MetastoreResolverError::UnsupportedBackend( self.message.to_string(), diff --git a/quickwit/quickwit-metastore/src/metastore_resolver.rs b/quickwit/quickwit-metastore/src/metastore_resolver.rs index bc4d2cac6e7..e388ccd26ed 100644 --- a/quickwit/quickwit-metastore/src/metastore_resolver.rs +++ b/quickwit/quickwit-metastore/src/metastore_resolver.rs @@ -25,7 +25,7 @@ use quickwit_storage::StorageResolver; use crate::metastore::file_backed::FileBackedMetastoreFactory; #[cfg(feature = "postgres")] use crate::metastore::postgres::PostgresqlMetastoreFactory; -use crate::{MetastoreFactory, MetastoreResolverError}; +use crate::{MetastoreFactory, MetastoreFactoryOptions, MetastoreResolverError}; type FactoryAndConfig = (Box, MetastoreConfig); @@ -53,6 +53,24 @@ impl MetastoreResolver { pub async fn resolve( &self, uri: &Uri, + ) -> Result { + self.resolve_inner(uri, MetastoreFactoryOptions::default()) + .await + } + + /// Resolves the given `uri` as a read-only metastore client. + pub async fn resolve_read_only( + &self, + uri: &Uri, + ) -> Result { + self.resolve_inner(uri, MetastoreFactoryOptions { read_only: true }) + .await + } + + async fn resolve_inner( + &self, + uri: &Uri, + options: MetastoreFactoryOptions, ) -> Result { let backend = match uri.protocol() { Protocol::Azure => MetastoreBackend::File, @@ -67,13 +85,20 @@ impl MetastoreResolver { )); } }; + if options.read_only && backend != MetastoreBackend::PostgreSQL { + return Err(MetastoreResolverError::UnsupportedBackend( + "read-only metastore connections are only supported for PostgreSQL".to_string(), + )); + } let (metastore_factory, metastore_config) = self .per_backend_factories .get(&backend) .ok_or(MetastoreResolverError::UnsupportedBackend( "no metastore factory is registered for this backend".to_string(), ))?; - let metastore = metastore_factory.resolve(metastore_config, uri).await?; + let metastore = metastore_factory + .resolve(metastore_config, uri, options) + .await?; Ok(metastore) } @@ -184,6 +209,21 @@ mod tests { metastore_resolver.resolve(&metastore_uri).await.unwrap(); } + #[tokio::test] + async fn test_metastore_resolver_should_reject_read_only_file() { + let metastore_resolver = MetastoreResolver::unconfigured(); + let metastore_uri = Uri::from_str("ram:///metastore").unwrap(); + let error = metastore_resolver + .resolve_read_only(&metastore_uri) + .await + .unwrap_err(); + assert!(matches!( + error, + MetastoreResolverError::UnsupportedBackend(message) + if message == "read-only metastore connections are only supported for PostgreSQL" + )); + } + #[cfg(feature = "postgres")] #[tokio::test] async fn test_postgres_and_postgresql_protocol_accepted() { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs index f1db422cfa7..e506a58edac 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs @@ -137,12 +137,14 @@ impl ClusterServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = cluster_service_grpc_client::ClusterServiceGrpcClient::new( + let mut client = cluster_service_grpc_client::ClusterServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -161,10 +163,12 @@ impl ClusterServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> ClusterServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = cluster_service_grpc_client::ClusterServiceGrpcClient::new( + let mut client = cluster_service_grpc_client::ClusterServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -347,12 +351,14 @@ impl ClusterServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> ClusterServiceClient { let client = ClusterServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -362,11 +368,13 @@ impl ClusterServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> ClusterServiceClient { let client = ClusterServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index a2bcc2af275..a88f7088b0a 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -222,12 +222,14 @@ impl ControlPlaneServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = control_plane_service_grpc_client::ControlPlaneServiceGrpcClient::new( + let mut client = control_plane_service_grpc_client::ControlPlaneServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -246,10 +248,12 @@ impl ControlPlaneServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> ControlPlaneServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = control_plane_service_grpc_client::ControlPlaneServiceGrpcClient::new( + let mut client = control_plane_service_grpc_client::ControlPlaneServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -1376,12 +1380,14 @@ impl ControlPlaneServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> ControlPlaneServiceClient { let client = ControlPlaneServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -1391,11 +1397,13 @@ impl ControlPlaneServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> ControlPlaneServiceClient { let client = ControlPlaneServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.developer.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.developer.rs index db4b4cc3fe9..8ed4a7f9884 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.developer.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.developer.rs @@ -71,12 +71,14 @@ impl DeveloperServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = developer_service_grpc_client::DeveloperServiceGrpcClient::new( + let mut client = developer_service_grpc_client::DeveloperServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -95,10 +97,12 @@ impl DeveloperServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> DeveloperServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = developer_service_grpc_client::DeveloperServiceGrpcClient::new( + let mut client = developer_service_grpc_client::DeveloperServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -280,12 +284,14 @@ impl DeveloperServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> DeveloperServiceClient { let client = DeveloperServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -295,11 +301,13 @@ impl DeveloperServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> DeveloperServiceClient { let client = DeveloperServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index dc89720854a..2f22a6af154 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -82,12 +82,14 @@ impl IndexingServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = indexing_service_grpc_client::IndexingServiceGrpcClient::new( + let mut client = indexing_service_grpc_client::IndexingServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -106,10 +108,12 @@ impl IndexingServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IndexingServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = indexing_service_grpc_client::IndexingServiceGrpcClient::new( + let mut client = indexing_service_grpc_client::IndexingServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -292,12 +296,14 @@ impl IndexingServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IndexingServiceClient { let client = IndexingServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -307,11 +313,13 @@ impl IndexingServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IndexingServiceClient { let client = IndexingServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index a40419ebec0..f2f3efe0a8d 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -677,12 +677,14 @@ impl IngesterServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = ingester_service_grpc_client::IngesterServiceGrpcClient::new( + let mut client = ingester_service_grpc_client::IngesterServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -701,10 +703,12 @@ impl IngesterServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngesterServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = ingester_service_grpc_client::IngesterServiceGrpcClient::new( + let mut client = ingester_service_grpc_client::IngesterServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -1683,12 +1687,14 @@ impl IngesterServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngesterServiceClient { let client = IngesterServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -1698,11 +1704,13 @@ impl IngesterServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngesterServiceClient { let client = IngesterServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 83d797af783..a7505991f11 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -180,12 +180,14 @@ impl IngestRouterServiceClient { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new( + let mut client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -204,10 +206,12 @@ impl IngestRouterServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngestRouterServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new( + let mut client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -392,12 +396,14 @@ impl IngestRouterServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngestRouterServiceClient { let client = IngestRouterServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -407,11 +413,13 @@ impl IngestRouterServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> IngestRouterServiceClient { let client = IngestRouterServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 3a8b176f4aa..9f7712f0673 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -1241,17 +1241,33 @@ impl MetastoreServiceClient { .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize) } + pub fn as_grpc_service_with_read_replica( + &self, + read_replica: Option, + max_message_size: bytesize::ByteSize, + ) -> crate::grpc_read_replica::ReadReplicaGrpcService< + metastore_service_grpc_server::MetastoreServiceGrpcServer< + MetastoreServiceGrpcServerAdapter, + >, + > { + let primary = self.as_grpc_service(max_message_size); + let read_replica = read_replica + .map(|client| client.as_grpc_service(max_message_size)); + crate::grpc_read_replica::ReadReplicaGrpcService::new(primary, read_replica) + } pub fn from_channel( addr: std::net::SocketAddr, channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> Self { let (_, connection_keys_watcher) = tokio::sync::watch::channel( std::collections::HashSet::from_iter([addr]), ); - let mut client = metastore_service_grpc_client::MetastoreServiceGrpcClient::new( + let mut client = metastore_service_grpc_client::MetastoreServiceGrpcClient::with_interceptor( channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -1270,10 +1286,12 @@ impl MetastoreServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> MetastoreServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let mut client = metastore_service_grpc_client::MetastoreServiceGrpcClient::new( + let mut client = metastore_service_grpc_client::MetastoreServiceGrpcClient::with_interceptor( balance_channel, + quickwit_common::tower::GrpcInterceptors::new(interceptors), ) .max_decoding_message_size(max_message_size.0 as usize) .max_encoding_message_size(max_message_size.0 as usize); @@ -5648,12 +5666,14 @@ impl MetastoreServiceTowerLayerStack { channel: tonic::transport::Channel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> MetastoreServiceClient { let client = MetastoreServiceClient::from_channel( addr, channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) @@ -5663,11 +5683,13 @@ impl MetastoreServiceTowerLayerStack { balance_channel: quickwit_common::tower::BalanceChannel, max_message_size: bytesize::ByteSize, compression_encoding_opt: Option, + interceptors: impl IntoIterator, ) -> MetastoreServiceClient { let client = MetastoreServiceClient::from_balance_channel( balance_channel, max_message_size, compression_encoding_opt, + interceptors, ); let inner_client = client.inner; self.build_from_inner_client(inner_client) diff --git a/quickwit/quickwit-proto/src/grpc_read_replica.rs b/quickwit/quickwit-proto/src/grpc_read_replica.rs new file mode 100644 index 00000000000..cf9ca957af0 --- /dev/null +++ b/quickwit/quickwit-proto/src/grpc_read_replica.rs @@ -0,0 +1,87 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::Infallible; +use std::task::{Context, Poll}; + +pub const READ_REPLICA_HEADER_NAME: &str = "qw-use-read-replica"; +pub const READ_REPLICA_HEADER_VALUE: &str = "true"; + +pub fn read_replica_header_interceptor() -> quickwit_common::tower::GrpcInterceptor { + let mut headers = tonic::metadata::MetadataMap::new(); + headers.insert( + READ_REPLICA_HEADER_NAME, + tonic::metadata::MetadataValue::from_static(READ_REPLICA_HEADER_VALUE), + ); + quickwit_common::tower::fixed_headers_interceptor(headers) +} + +#[derive(Debug, Clone)] +pub struct ReadReplicaGrpcService { + primary: S, + read_replica: Option, +} + +impl ReadReplicaGrpcService { + pub fn new(primary: S, read_replica: Option) -> Self { + Self { + primary, + read_replica, + } + } +} + +impl tonic::server::NamedService for ReadReplicaGrpcService +where S: tonic::server::NamedService +{ + const NAME: &'static str = S::NAME; +} + +impl tower::Service> for ReadReplicaGrpcService +where + S: tower::Service< + http::Request, + Response = http::Response, + Error = Infallible, + > + Clone + + Send + + 'static, + S::Future: Send + 'static, + B: tonic::codegen::Body + Send + 'static, + B::Error: Into + Send + 'static, +{ + type Response = http::Response; + type Error = Infallible; + type Future = tonic::codegen::BoxFuture; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: http::Request) -> Self::Future { + let use_read_replica = request + .headers() + .get(READ_REPLICA_HEADER_NAME) + .and_then(|value| value.to_str().ok()) + == Some(READ_REPLICA_HEADER_VALUE); + let mut service = if use_read_replica { + self.read_replica + .clone() + .unwrap_or_else(|| self.primary.clone()) + } else { + self.primary.clone() + }; + Box::pin(async move { tower::Service::call(&mut service, request).await }) + } +} diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index 428387c6353..0d760bbcdfd 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -27,6 +27,7 @@ pub use tonic; pub mod developer; pub mod error; mod getters; +pub mod grpc_read_replica; pub mod indexing; pub mod ingest; pub mod metastore; diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index 4f53b9abb5c..85e5a191334 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::fmt; +use std::sync::Arc; use quickwit_common::rate_limited_error; use quickwit_common::retry::Retryable; @@ -31,6 +32,87 @@ pub const METASTORE_FILE_DESCRIPTOR_SET: &[u8] = pub type MetastoreResult = Result; +/// Read-only subset of the metastore RPC surface that is safe to route to a read replica. +/// +/// Add methods here only for RPCs that are safe to route to a read replica. +#[async_trait::async_trait] +pub trait MetastoreReadService: fmt::Debug + Send + Sync + 'static { + /// Fetches index metadata. + async fn index_metadata( + &self, + request: IndexMetadataRequest, + ) -> MetastoreResult; + + /// Lists indexes metadata. + async fn list_indexes_metadata( + &self, + request: ListIndexesMetadataRequest, + ) -> MetastoreResult; + + /// Streams splits from indexes. + async fn list_splits( + &self, + request: ListSplitsRequest, + ) -> MetastoreResult>; + + /// Lists metrics parquet splits. + async fn list_metrics_splits( + &self, + request: ListMetricsSplitsRequest, + ) -> MetastoreResult; + + /// Lists sketch parquet splits. + async fn list_sketch_splits( + &self, + request: ListSketchSplitsRequest, + ) -> MetastoreResult; +} + +#[async_trait::async_trait] +impl MetastoreReadService for MetastoreServiceClient { + async fn index_metadata( + &self, + request: IndexMetadataRequest, + ) -> MetastoreResult { + MetastoreService::index_metadata(self, request).await + } + + async fn list_indexes_metadata( + &self, + request: ListIndexesMetadataRequest, + ) -> MetastoreResult { + MetastoreService::list_indexes_metadata(self, request).await + } + + async fn list_splits( + &self, + request: ListSplitsRequest, + ) -> MetastoreResult> { + MetastoreService::list_splits(self, request).await + } + + async fn list_metrics_splits( + &self, + request: ListMetricsSplitsRequest, + ) -> MetastoreResult { + MetastoreService::list_metrics_splits(self, request).await + } + + async fn list_sketch_splits( + &self, + request: ListSketchSplitsRequest, + ) -> MetastoreResult { + MetastoreService::list_sketch_splits(self, request).await + } +} + +/// Cloneable read-only metastore service handle. +/// +/// `Arc` derefs to `dyn MetastoreReadService`, so a +/// `&MetastoreReadServiceClient` coerces to the `&dyn MetastoreReadService` +/// taken by the read-only helpers — no blanket `impl ... for Arc` is needed. +pub type MetastoreReadServiceClient = Arc; + /// Lists the object types stored and managed by the metastore. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -490,3 +572,142 @@ impl ListIndexesMetadataRequest { } } } + +#[cfg(test)] +mod tests { + use bytesize::ByteSize; + use tonic::transport::{Endpoint, Server}; + + use super::*; + + fn metastore_client_returning(marker: &'static str) -> MetastoreServiceClient { + let mut mock = MockMetastoreService::new(); + mock.expect_list_indexes_metadata() + .times(1) + .returning(move |_| { + Ok(ListIndexesMetadataResponse { + indexes_metadata_json_opt: Some(marker.to_string()), + indexes_metadata_json_zstd: Default::default(), + }) + }); + MetastoreServiceClient::from_mock(mock) + } + + async fn list_indexes_metadata_via_grpc( + primary: MetastoreServiceClient, + read_replica: Option, + use_read_replica: bool, + ) -> ListIndexesMetadataResponse { + let server = primary.as_grpc_service_with_read_replica(read_replica, ByteSize::mib(1)); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let incoming = futures::stream::unfold(listener, |listener| async { + Some((listener.accept().await.map(|(stream, _)| stream), listener)) + }); + let server_handle = tokio::spawn(async move { + Server::builder() + .add_service(server) + .serve_with_incoming(incoming) + .await + .unwrap(); + }); + + let channel = Endpoint::from_shared(format!("http://{addr}")) + .unwrap() + .connect_lazy(); + let mut client = metastore_service_grpc_client::MetastoreServiceGrpcClient::new(channel); + let mut request = tonic::Request::new(ListIndexesMetadataRequest::all()); + if use_read_replica { + request.metadata_mut().insert( + crate::grpc_read_replica::READ_REPLICA_HEADER_NAME, + tonic::metadata::MetadataValue::from_static( + crate::grpc_read_replica::READ_REPLICA_HEADER_VALUE, + ), + ); + } + let response = client + .list_indexes_metadata(request) + .await + .unwrap() + .into_inner(); + server_handle.abort(); + response + } + + #[tokio::test] + async fn test_metastore_grpc_server_routes_without_header_to_primary() { + let primary = metastore_client_returning("primary"); + + let response = + list_indexes_metadata_via_grpc(primary, Some(MetastoreServiceClient::mocked()), false) + .await; + assert_eq!( + response.indexes_metadata_json_opt.as_deref(), + Some("primary") + ); + } + + #[tokio::test] + async fn test_metastore_grpc_server_routes_read_replica_header_to_replica() { + let replica = metastore_client_returning("replica"); + + let response = + list_indexes_metadata_via_grpc(MetastoreServiceClient::mocked(), Some(replica), true) + .await; + assert_eq!( + response.indexes_metadata_json_opt.as_deref(), + Some("replica") + ); + } + + #[tokio::test] + async fn test_metastore_grpc_server_routes_read_replica_header_to_primary_when_unset() { + let primary = metastore_client_returning("primary"); + + let response = list_indexes_metadata_via_grpc(primary, None, true).await; + assert_eq!( + response.indexes_metadata_json_opt.as_deref(), + Some("primary") + ); + } + + #[tokio::test] + async fn test_metastore_grpc_client_sends_read_replica_header() { + let replica = metastore_client_returning("replica"); + let server = MetastoreServiceClient::mocked() + .as_grpc_service_with_read_replica(Some(replica), ByteSize::mib(1)); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let incoming = futures::stream::unfold(listener, |listener| async { + Some((listener.accept().await.map(|(stream, _)| stream), listener)) + }); + let server_handle = tokio::spawn(async move { + Server::builder() + .add_service(server) + .serve_with_incoming(incoming) + .await + .unwrap(); + }); + + let channel = Endpoint::from_shared(format!("http://{addr}")) + .unwrap() + .connect_lazy(); + let client = MetastoreServiceClient::from_channel( + addr, + channel, + ByteSize::mib(1), + None, + [crate::grpc_read_replica::read_replica_header_interceptor()], + ); + + let response = + MetastoreService::list_indexes_metadata(&client, ListIndexesMetadataRequest::all()) + .await + .unwrap(); + assert_eq!( + response.indexes_metadata_json_opt.as_deref(), + Some("replica") + ); + server_handle.abort(); + } +} diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 8d2ad0f924d..c6867ec66b0 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -50,7 +50,8 @@ use quickwit_common::thread_pool::with_priority::ThreadPoolWithPriority; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::{ - ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, + ListIndexesMetadataRequest, ListSplitsRequest, MetastoreReadService, + MetastoreReadServiceClient, MetastoreServiceClient, }; use tantivy::schema::NamedFieldDocument; @@ -182,7 +183,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn /// Get all splits of given index ids pub async fn list_all_splits( index_uids: Vec, - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result> { list_relevant_splits(index_uids, None, None, None, metastore).await } @@ -193,7 +194,7 @@ pub async fn list_relevant_splits( start_timestamp: Option, end_timestamp: Option, tags_filter_opt: Option, - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result> { let Some(mut query) = ListSplitsQuery::try_from_index_uids(index_uids) else { return Ok(Vec::new()); @@ -222,7 +223,7 @@ pub async fn list_relevant_splits( /// Patterns follow the elastic search patterns. pub async fn resolve_index_patterns( index_id_patterns: &[String], - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result> { let list_indexes_metadata_request = if index_id_patterns.is_empty() { ListIndexesMetadataRequest::all() @@ -260,7 +261,7 @@ fn convert_document_to_json_string( /// Starts a search node, aka a `searcher`. pub async fn start_searcher_service( - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, search_job_placer: SearchJobPlacer, searcher_context: Arc, @@ -282,6 +283,7 @@ pub async fn single_node_search( metastore: MetastoreServiceClient, storage_resolver: StorageResolver, ) -> crate::Result { + let metastore: MetastoreReadServiceClient = Arc::new(metastore); let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280u16); let searcher_pool = SearcherPool::default(); let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); @@ -300,7 +302,7 @@ pub async fn single_node_search( root_search( &searcher_context, search_request, - metastore, + metastore.as_ref(), &cluster_client, ) .await diff --git a/quickwit/quickwit-search/src/list_fields/root.rs b/quickwit/quickwit-search/src/list_fields/root.rs index 66d5947d123..c95af4d8ffb 100644 --- a/quickwit/quickwit-search/src/list_fields/root.rs +++ b/quickwit/quickwit-search/src/list_fields/root.rs @@ -22,7 +22,7 @@ use quickwit_common::uri::Uri; use quickwit_config::build_doc_mapper; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_metastore::SplitMetadata; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::MetastoreReadService; use quickwit_proto::search::{ LeafListFieldsRequest, ListFieldsEntry, ListFieldsRequest, ListFieldsResponse, }; @@ -54,10 +54,10 @@ struct IndexMetasForLeafSearch { pub async fn root_list_fields( list_fields_req: ListFieldsRequest, cluster_client: &ClusterClient, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result { let indexes_metadata = - resolve_index_patterns(&list_fields_req.index_id_patterns[..], &mut metastore).await?; + resolve_index_patterns(&list_fields_req.index_id_patterns[..], metastore).await?; // The request contains a wildcard, but couldn't find any index. if indexes_metadata.is_empty() { @@ -115,7 +115,7 @@ pub async fn root_list_fields( start_timestamp, end_timestamp, tags_filter_opt, - &mut metastore, + metastore, ) .await?; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 0c575793f41..30622b25af7 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -23,7 +23,7 @@ use quickwit_common::pretty::PrettySample; use quickwit_config::build_doc_mapper; use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata}; use quickwit_metrics::HistogramTimer; -use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; +use quickwit_proto::metastore::{ListSplitsRequest, MetastoreReadService}; use quickwit_proto::search::{ LeafListTermsRequest, LeafListTermsResponse, ListTermsRequest, ListTermsResponse, SplitIdAndFooterOffsets, SplitSearchError, @@ -48,12 +48,12 @@ use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_inde #[instrument(skip(list_terms_request, cluster_client, metastore))] pub async fn root_list_terms( list_terms_request: &ListTermsRequest, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, cluster_client: &ClusterClient, ) -> crate::Result { let start_instant = tokio::time::Instant::now(); let indexes_metadata = - resolve_index_patterns(&list_terms_request.index_id_patterns, &mut metastore).await?; + resolve_index_patterns(&list_terms_request.index_id_patterns, metastore).await?; // The request contains a wildcard, but couldn't find any index. if indexes_metadata.is_empty() { return Ok(ListTermsResponse { @@ -113,7 +113,6 @@ pub async fn root_list_terms( .collect(); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)?; let split_metadatas: Vec = metastore - .clone() .list_splits(list_splits_request) .await? .collect_splits_metadata() diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 7624c7c8e30..69416133f96 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -28,9 +28,7 @@ use quickwit_config::build_doc_mapper; use quickwit_doc_mapper::DYNAMIC_FIELD_NAME; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt, SplitMetadata}; -use quickwit_proto::metastore::{ - ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, -}; +use quickwit_proto::metastore::{ListIndexesMetadataRequest, MetastoreReadService}; use quickwit_proto::search::{ FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafRequestRef, LeafResourceStats, LeafSearchRequest, LeafSearchResponse, PartialHit, RootResourceStats, SearchPlanResponse, @@ -1208,7 +1206,7 @@ pub fn ensure_all_indexes_found( } async fn refine_and_list_matches( - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, search_request: &mut SearchRequest, indexes_metadata: Vec, query_ast_resolved: QueryAst, @@ -1251,7 +1249,7 @@ async fn refine_and_list_matches( /// Fetches the list of splits and their metadata from the metastore async fn plan_splits_for_root_search( search_request: &mut SearchRequest, - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result<(Vec, IndexesMetasForLeafSearch)> { let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1295,14 +1293,14 @@ async fn plan_splits_for_root_search( pub async fn root_search( searcher_context: &SearcherContext, mut search_request: SearchRequest, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, cluster_client: &ClusterClient, ) -> crate::Result { let start_instant = Instant::now(); let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture { start: start_instant, - tracked: plan_splits_for_root_search(&mut search_request, &mut metastore), + tracked: plan_splits_for_root_search(&mut search_request, metastore), is_success: None, step: RootSearchMetricsStep::Plan, } @@ -1364,7 +1362,7 @@ pub async fn root_search( /// Returns details on how a query would be executed pub async fn search_plan( mut search_request: SearchRequest, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result { let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1396,7 +1394,7 @@ pub async fn search_plan( let request_metadata = validate_request_and_build_metadata(&indexes_metadata, &search_request)?; let split_metadatas = refine_and_list_matches( - &mut metastore, + metastore, &mut search_request, indexes_metadata, request_metadata.query_ast_resolved.clone(), @@ -1902,7 +1900,8 @@ mod tests { use quickwit_indexing::MockSplitBuilder; use quickwit_metastore::{IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt}; use quickwit_proto::metastore::{ - ListIndexesMetadataResponse, ListSplitsResponse, MockMetastoreService, + ListIndexesMetadataResponse, ListSplitsResponse, MetastoreServiceClient, + MockMetastoreService, }; use quickwit_proto::search::{ ScrollRequest, SortByValue, SortOrder, SortValue, SplitSearchError, @@ -2758,7 +2757,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -2828,7 +2827,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -2920,7 +2919,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -3215,7 +3214,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await?; @@ -3333,7 +3332,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -3465,7 +3464,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request.clone(), - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await?; @@ -3647,7 +3646,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request.clone(), - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await?; @@ -3771,7 +3770,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - mock_metastore_client.clone(), + &mock_metastore_client, &cluster_client, ) .await @@ -3790,7 +3789,7 @@ mod tests { let search_error = root_search( &searcher_context, search_request, - mock_metastore_client, + &mock_metastore_client, &cluster_client, ) .await @@ -3915,7 +3914,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4055,7 +4054,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4136,7 +4135,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4203,7 +4202,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4293,7 +4292,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4375,7 +4374,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4424,7 +4423,7 @@ mod tests { max_hits: 10, ..Default::default() }, - metastore.clone(), + &metastore, &cluster_client, ) .await @@ -4440,7 +4439,7 @@ mod tests { max_hits: 10, ..Default::default() }, - metastore, + &metastore, &cluster_client, ) .await @@ -4505,7 +4504,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await; @@ -4555,7 +4554,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - metastore.clone(), + &metastore, &cluster_client, ) .await; @@ -4575,7 +4574,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - metastore, + &metastore, &cluster_client, ) .await; @@ -4625,7 +4624,7 @@ mod tests { }); let search_response = search_plan( search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), ) .await .unwrap(); @@ -4712,7 +4711,7 @@ mod tests { ignore_missing_indexes: true, ..Default::default() }, - mock_metastore_service.clone(), + &mock_metastore_service, ) .await .unwrap(); @@ -4726,7 +4725,7 @@ mod tests { ignore_missing_indexes: false, ..Default::default() }, - mock_metastore_service.clone(), + &mock_metastore_service, ) .await .unwrap_err(); @@ -5093,7 +5092,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5359,7 +5358,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5541,7 +5540,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5662,7 +5661,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5715,7 +5714,7 @@ mod tests { let search_error = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index db2b066ae83..f26c1513160 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use quickwit_common::uri::Uri; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::MetastoreReadServiceClient; use quickwit_proto::search::{ FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListFieldsRequest, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, @@ -47,7 +47,7 @@ use crate::{ClusterClient, SearchError, fetch_docs, root_search, search_plan}; #[derive(Clone)] /// The search service implementation. pub struct SearchServiceImpl { - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, @@ -141,7 +141,7 @@ pub trait SearchService: 'static + Send + Sync { impl SearchServiceImpl { /// Creates a new search service. pub fn new( - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, @@ -170,7 +170,7 @@ impl SearchService for SearchServiceImpl { let search_result = root_search( &self.searcher_context, search_request, - self.metastore.clone(), + self.metastore.as_ref(), &self.cluster_client, ) .await?; @@ -233,7 +233,7 @@ impl SearchService for SearchServiceImpl { ) -> crate::Result { let search_result = root_list_terms( &list_terms_request, - self.metastore.clone(), + self.metastore.as_ref(), &self.cluster_client, ) .await?; @@ -293,7 +293,7 @@ impl SearchService for SearchServiceImpl { root_list_fields( list_fields_req, &self.cluster_client, - self.metastore.clone(), + self.metastore.as_ref(), ) .await } @@ -320,7 +320,7 @@ impl SearchService for SearchServiceImpl { &self, search_request: SearchRequest, ) -> crate::Result { - let search_plan = search_plan(search_request, self.metastore.clone()).await?; + let search_plan = search_plan(search_request, self.metastore.as_ref()).await?; Ok(search_plan) } diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index c8d851d06cb..2b65560e1a3 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -966,7 +966,7 @@ async fn test_single_node_split_pruning_by_tags() -> anyhow::Result<()> { None, None, extract_tags_from_query(query_ast), - &mut test_sandbox.metastore(), + &test_sandbox.metastore(), ) .await?; assert!(selected_splits.is_empty()); @@ -978,7 +978,7 @@ async fn test_single_node_split_pruning_by_tags() -> anyhow::Result<()> { None, None, extract_tags_from_query(query_ast), - &mut test_sandbox.metastore(), + &test_sandbox.metastore(), ) .await?; assert_eq!(selected_splits.len(), 2); @@ -990,7 +990,7 @@ async fn test_single_node_split_pruning_by_tags() -> anyhow::Result<()> { None, None, extract_tags_from_query(query_ast), - &mut test_sandbox.metastore(), + &test_sandbox.metastore(), ) .await?; assert_eq!(selected_splits.len(), 2); diff --git a/quickwit/quickwit-serve/src/datafusion_api/setup.rs b/quickwit/quickwit-serve/src/datafusion_api/setup.rs index aad5e45a57b..11210184b94 100644 --- a/quickwit/quickwit-serve/src/datafusion_api/setup.rs +++ b/quickwit/quickwit-serve/src/datafusion_api/setup.rs @@ -39,7 +39,7 @@ use quickwit_datafusion::{ DataFusionService, DataFusionSessionBuilder, QuickwitObjectStoreRegistry, QuickwitWorkerResolver, build_worker, }; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::MetastoreReadServiceClient; use quickwit_search::{SearchServiceClient, SearcherPool, create_search_client_from_grpc_addr}; use quickwit_storage::StorageResolver; use tokio::time::timeout; @@ -66,7 +66,7 @@ use crate::QuickwitServices; pub(crate) fn build_datafusion_session_builder( node_config: &NodeConfig, cluster_change_stream: ClusterChangeStream, - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, ) -> anyhow::Result>> { if !node_config.is_service_enabled(QuickwitService::Searcher) { diff --git a/quickwit/quickwit-serve/src/developer_api/debug.rs b/quickwit/quickwit-serve/src/developer_api/debug.rs index 23d8285cf59..4a3ddcaf263 100644 --- a/quickwit/quickwit-serve/src/developer_api/debug.rs +++ b/quickwit/quickwit-serve/src/developer_api/debug.rs @@ -111,6 +111,7 @@ async fn get_node_debug_infos( ready_node.channel(), DeveloperApiServer::MAX_GRPC_MESSAGE_SIZE, Some(CompressionEncoding::Zstd), + [], ); let roles = target_roles.iter().map(|role| role.to_string()).collect(); let request = GetDebugInfoRequest { roles }; diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 0e733081f6b..ee92560f608 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -205,12 +205,12 @@ async fn get_index_metadata( pub(crate) async fn es_compat_index_mapping( index_id: String, params: IndexMappingQueryParams, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, search_service: Arc, ) -> Result { let indexes_metadata = if index_id.contains('*') || index_id.contains(',') { let patterns: Vec = index_id.split(',').map(|s| s.trim().to_string()).collect(); - resolve_index_patterns(&patterns, &mut metastore).await? + resolve_index_patterns(&patterns, &metastore).await? } else { vec![get_index_metadata(index_id.clone(), metastore).await?] }; @@ -736,9 +736,9 @@ async fn es_compat_stats( pub(crate) async fn es_compat_index_stats( index_id_patterns: Vec, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, ) -> Result { - let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &metastore).await?; // Index uid to index id mapping let index_uid_to_index_id: HashMap = indexes_metadata @@ -751,7 +751,7 @@ pub(crate) async fn es_compat_index_stats( .map(|index_metadata| index_metadata.index_uid) .collect_vec(); // calling into the search module is not necessary, but reuses established patterns - let splits_metadata = list_all_splits(index_uids, &mut metastore).await?; + let splits_metadata = list_all_splits(index_uids, &metastore).await?; let search_response_rest: ElasticsearchStatsResponse = convert_to_es_stats_response(index_uid_to_index_id, splits_metadata); @@ -769,10 +769,10 @@ pub(crate) async fn es_compat_cat_indices( pub(crate) async fn es_compat_index_cat_indices( index_id_patterns: Vec, query_params: CatIndexQueryParams, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, ) -> Result, ElasticsearchError> { query_params.validate()?; - let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &metastore).await?; let mut index_id_to_resp: HashMap = indexes_metadata .iter() .map(|metadata| (metadata.index_uid.to_owned(), metadata.clone().into())) @@ -785,7 +785,7 @@ pub(crate) async fn es_compat_index_cat_indices( .collect_vec(); // calling into the search module is not necessary, but reuses established patterns - list_all_splits(index_uids, &mut metastore).await? + list_all_splits(index_uids, &metastore).await? }; let search_response_rest: Vec = @@ -815,9 +815,9 @@ pub(crate) async fn es_compat_index_cat_indices( pub(crate) async fn es_compat_resolve_index( index_id_patterns: Vec, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, ) -> Result { - let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &metastore).await?; let mut indices: Vec = indexes_metadata .into_iter() .map(|metadata| metadata.into()) diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index c0cac95a146..7dc08b6387e 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -67,7 +67,10 @@ pub(crate) async fn start_grpc_server( enabled_grpc_services.insert("metastore"); file_descriptor_sets.push(quickwit_proto::metastore::METASTORE_FILE_DESCRIPTOR_SET); - Some(metastore_server.as_grpc_service(grpc_config.max_message_size)) + Some(metastore_server.as_grpc_service_with_read_replica( + services.metastore_read_replica_server_opt.clone(), + grpc_config.max_message_size, + )) } else { None }; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 31840382d2d..7dc09e29b2a 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -104,8 +104,8 @@ use quickwit_proto::ingest::ingester::{ use quickwit_proto::ingest::router::IngestRouterServiceClient; use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause}; use quickwit_proto::metastore::{ - EntityKind, ListIndexesMetadataRequest, MetastoreError, MetastoreService, - MetastoreServiceClient, + EntityKind, ListIndexesMetadataRequest, MetastoreError, MetastoreReadServiceClient, + MetastoreService, MetastoreServiceClient, }; use quickwit_proto::search::ReportSplitsRequest; use quickwit_proto::types::NodeId; @@ -181,6 +181,7 @@ struct QuickwitServices { pub node_config: Arc, pub cluster: Cluster, pub metastore_server_opt: Option, + pub metastore_read_replica_server_opt: Option, pub metastore_client: MetastoreServiceClient, pub control_plane_server_opt: Option>, pub control_plane_client: ControlPlaneServiceClient, @@ -303,6 +304,7 @@ async fn start_ingest_client_if_needed( balance_channel, node_config.grpc_config.max_message_size, node_config.ingest_api_config.grpc_compression_encoding(), + [], ); Ok(ingest_service) } @@ -373,6 +375,7 @@ async fn start_control_plane_if_needed( balance_channel, node_config.grpc_config.max_message_size, None, + [], ); Ok((control_plane_server_opt, control_plane_client)) } @@ -459,45 +462,68 @@ pub async fn serve_quickwit( let grpc_config = node_config.grpc_config.clone(); // Instantiate a metastore "server" if the `metastore` role is enabled on the node. - let metastore_server_opt: Option = - if node_config.is_service_enabled(QuickwitService::Metastore) { - let metastore: MetastoreServiceClient = metastore_resolver - .resolve(&node_config.metastore_uri) - .await - .with_context(|| { - format!( - "failed to resolve metastore uri `{}`", - node_config.metastore_uri - ) - })?; - let max_in_flight_requests = if node_config.metastore_uri.protocol().is_database() { - node_config - .metastore_configs - .find_postgres() - .map(|config| config.max_connections.get() * 2) - .unwrap_or_default() - .max(100) + let (metastore_server_opt, metastore_read_replica_server_opt): ( + Option, + Option, + ) = if node_config.is_service_enabled(QuickwitService::Metastore) { + let metastore: MetastoreServiceClient = metastore_resolver + .resolve(&node_config.metastore_uri) + .await + .with_context(|| { + format!( + "failed to resolve metastore uri `{}`", + node_config.metastore_uri + ) + })?; + let metastore_read_replica = + if let Some(metastore_read_replica_uri) = &node_config.metastore_read_replica_uri { + Some( + metastore_resolver + .resolve_read_only(metastore_read_replica_uri) + .await + .with_context(|| { + format!( + "failed to resolve metastore read replica uri \ + `{metastore_read_replica_uri}`" + ) + })?, + ) } else { - 100 + None }; - // These layers apply to all the RPCs of the metastore. - let shared_layer = ServiceBuilder::new() - .layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) - .layer(LoadShedLayer::new(max_in_flight_requests)) - .into_inner(); + let max_in_flight_requests = if node_config.metastore_uri.protocol().is_database() { + node_config + .metastore_configs + .find_postgres() + .map(|config| config.max_connections.get() * 2) + .unwrap_or_default() + .max(100) + } else { + 100 + }; + // These layers apply to all the RPCs of the metastore. + let shared_layer = ServiceBuilder::new() + .layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) + .layer(LoadShedLayer::new(max_in_flight_requests)) + .into_inner(); + let build_metastore_server = |metastore| { let broker_layer = EventListenerLayer::new(event_broker.clone()); - let metastore = MetastoreServiceClient::tower() - .stack_layer(shared_layer) + MetastoreServiceClient::tower() + .stack_layer(shared_layer.clone()) .stack_create_index_layer(broker_layer.clone()) .stack_delete_index_layer(broker_layer.clone()) .stack_add_source_layer(broker_layer.clone()) .stack_delete_source_layer(broker_layer.clone()) .stack_toggle_source_layer(broker_layer) - .build(metastore); - Some(metastore) - } else { - None + .build(metastore) }; + ( + Some(build_metastore_server(metastore)), + metastore_read_replica.map(build_metastore_server), + ) + } else { + (None, None) + }; // Instantiate a metastore client, either local if available or remote otherwise. let metastore_client: MetastoreServiceClient = if let Some(metastore_server) = &metastore_server_opt { @@ -523,7 +549,7 @@ pub async fn serve_quickwit( .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( get_metastore_client_max_concurrency(), )) - .build_from_balance_channel(balance_channel, grpc_config.max_message_size, None) + .build_from_balance_channel(balance_channel, grpc_config.max_message_size, None, []) }; // Instantiate a control plane server if the `control-plane` role is enabled on the node. // Otherwise, instantiate a control plane client. @@ -676,12 +702,33 @@ pub async fn serve_quickwit( )) }; + let balance_channel = balance_channel_for_service(&cluster, QuickwitService::Metastore).await; + // Search remains available without a control plane because not all metastore RPCs + // are proxied. Metastore nodes route read-replica headers to the primary when no + // read replica is configured. + let metastore_read_only_client = MetastoreServiceClient::tower() + .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) + .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) + .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( + get_metastore_client_max_concurrency(), + )) + .build_from_balance_channel( + balance_channel, + grpc_config.max_message_size, + None, + [quickwit_proto::grpc_read_replica::read_replica_header_interceptor()], + ); + let searcher_metastore_client: MetastoreReadServiceClient = + Arc::new(MetastoreServiceClient::new(ControlPlaneMetastore::new( + control_plane_client.clone(), + metastore_read_only_client, + ))); + let (search_job_placer, search_service, searcher_pool) = setup_searcher( &node_config, cluster.change_stream(), - // search remains available without a control plane because not all - // metastore RPCs are proxied - metastore_through_control_plane.clone(), + searcher_metastore_client.clone(), storage_resolver.clone(), searcher_context, ) @@ -698,7 +745,7 @@ pub async fn serve_quickwit( let datafusion_session_builder = datafusion_api::setup::build_datafusion_session_builder( &node_config, cluster.change_stream(), - metastore_through_control_plane.clone(), + searcher_metastore_client.clone(), storage_resolver.clone(), )?; // The search job placer owns a clone of this pool; the local binding is not @@ -779,6 +826,7 @@ pub async fn serve_quickwit( node_config: Arc::new(node_config), cluster: cluster.clone(), metastore_server_opt, + metastore_read_replica_server_opt, metastore_client: metastore_through_control_plane.clone(), control_plane_server_opt, control_plane_client, @@ -1179,13 +1227,14 @@ fn build_ingester_service( node.channel(), max_message_size, grpc_compression_encoding_opt, + [], ) } async fn setup_searcher( node_config: &NodeConfig, cluster_change_stream: ClusterChangeStream, - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, searcher_context: Arc, ) -> anyhow::Result<(SearchJobPlacer, Arc, SearcherPool)> { @@ -1406,6 +1455,7 @@ fn build_indexing_service( node.channel(), max_message_size, None, + [], ) } @@ -1806,7 +1856,7 @@ mod tests { let (search_job_placer, _searcher_service, _searcher_pool) = setup_searcher( &node_config, change_stream, - metastore, + Arc::new(metastore), storage_resolver, searcher_context, ) diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 39bfb0e2580..7249b59f858 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -897,6 +897,7 @@ mod tests { otlp_traces_service_opt: None, metastore_client, metastore_server_opt: None, + metastore_read_replica_server_opt: None, node_config: Arc::new(node_config.clone()), search_service: Arc::new(MockSearchService::new()), jaeger_service_opt: None, From 3a7022bfc31e39f360aac71ad368985d5a8a4882 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 24 Jun 2026 18:03:11 +0200 Subject: [PATCH 2/5] Route metastore reads to replicas conditionally --- quickwit/quickwit-serve/src/lib.rs | 78 ++++++++++++++++++------------ 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 7dc09e29b2a..343cf096afe 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -69,8 +69,8 @@ use quickwit_common::retry::RetryParams; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::tower::{ BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, CircuitBreakerEvaluator, - ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcMetricsLayer, LoadShedLayer, - RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, TimeoutLayer, + ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcInterceptor, GrpcMetricsLayer, + LoadShedLayer, RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, TimeoutLayer, }; use quickwit_common::uri::Uri; use quickwit_common::{get_bool_from_env, spawn_named_task}; @@ -153,6 +153,27 @@ fn get_metastore_client_max_concurrency() -> usize { ) } +fn build_metastore_client_from_balance_channel( + balance_channel: BalanceChannel, + max_message_size: ByteSize, + grpc_compression_encoding_opt: Option, + interceptors: impl IntoIterator, +) -> MetastoreServiceClient { + MetastoreServiceClient::tower() + .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) + .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) + .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( + get_metastore_client_max_concurrency(), + )) + .build_from_balance_channel( + balance_channel, + max_message_size, + grpc_compression_encoding_opt, + interceptors, + ) +} + static CP_GRPC_CLIENT_METRICS_LAYER: LazyLock = LazyLock::new(|| GrpcMetricsLayer::new("control_plane", "client")); static CP_GRPC_SERVER_METRICS_LAYER: LazyLock = @@ -542,14 +563,12 @@ pub async fn serve_quickwit( { bail!("could not find any metastore node in the cluster"); } - MetastoreServiceClient::tower() - .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) - .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) - .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) - .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( - get_metastore_client_max_concurrency(), - )) - .build_from_balance_channel(balance_channel, grpc_config.max_message_size, None, []) + build_metastore_client_from_balance_channel( + balance_channel, + grpc_config.max_message_size, + None, + [], + ) }; // Instantiate a control plane server if the `control-plane` role is enabled on the node. // Otherwise, instantiate a control plane client. @@ -702,28 +721,25 @@ pub async fn serve_quickwit( )) }; - let balance_channel = balance_channel_for_service(&cluster, QuickwitService::Metastore).await; - // Search remains available without a control plane because not all metastore RPCs - // are proxied. Metastore nodes route read-replica headers to the primary when no - // read replica is configured. - let metastore_read_only_client = MetastoreServiceClient::tower() - .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) - .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) - .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) - .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( - get_metastore_client_max_concurrency(), - )) - .build_from_balance_channel( - balance_channel, - grpc_config.max_message_size, - None, - [quickwit_proto::grpc_read_replica::read_replica_header_interceptor()], - ); let searcher_metastore_client: MetastoreReadServiceClient = - Arc::new(MetastoreServiceClient::new(ControlPlaneMetastore::new( - control_plane_client.clone(), - metastore_read_only_client, - ))); + if node_config.metastore_read_replica_uri.is_some() { + let balance_channel = + balance_channel_for_service(&cluster, QuickwitService::Metastore).await; + // The read-replica header is honored at the metastore gRPC server boundary, + // so read-only callers must use a remote metastore client when a replica exists. + let metastore_read_only_client = build_metastore_client_from_balance_channel( + balance_channel, + grpc_config.max_message_size, + None, + [quickwit_proto::grpc_read_replica::read_replica_header_interceptor()], + ); + Arc::new(MetastoreServiceClient::new(ControlPlaneMetastore::new( + control_plane_client.clone(), + metastore_read_only_client, + ))) + } else { + Arc::new(metastore_through_control_plane.clone()) + }; let (search_job_placer, search_service, searcher_pool) = setup_searcher( &node_config, From 8bdc4faf588239e65f8033d4feef7999c07a1d9f Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 24 Jun 2026 18:21:59 +0200 Subject: [PATCH 3/5] Fix metastore read replica startup routing --- quickwit/quickwit-serve/src/lib.rs | 41 +++++++++++++++++------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 343cf096afe..97386f749bf 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -174,6 +174,23 @@ fn build_metastore_client_from_balance_channel( ) } +async fn metastore_balance_channel( + cluster: &Cluster, +) -> anyhow::Result> { + info!("connecting to metastore"); + + let balance_channel = balance_channel_for_service(cluster, QuickwitService::Metastore).await; + if !balance_channel + .wait_for(Duration::from_secs(300), |connections| { + !connections.is_empty() + }) + .await + { + bail!("could not find any metastore node in the cluster"); + } + Ok(balance_channel) +} + static CP_GRPC_CLIENT_METRICS_LAYER: LazyLock = LazyLock::new(|| GrpcMetricsLayer::new("control_plane", "client")); static CP_GRPC_SERVER_METRICS_LAYER: LazyLock = @@ -550,19 +567,7 @@ pub async fn serve_quickwit( if let Some(metastore_server) = &metastore_server_opt { metastore_server.clone() } else { - info!("connecting to metastore"); - - let balance_channel = - balance_channel_for_service(&cluster, QuickwitService::Metastore).await; - - if !balance_channel - .wait_for(Duration::from_secs(300), |connections| { - !connections.is_empty() - }) - .await - { - bail!("could not find any metastore node in the cluster"); - } + let balance_channel = metastore_balance_channel(&cluster).await?; build_metastore_client_from_balance_channel( balance_channel, grpc_config.max_message_size, @@ -722,11 +727,13 @@ pub async fn serve_quickwit( }; let searcher_metastore_client: MetastoreReadServiceClient = - if node_config.metastore_read_replica_uri.is_some() { - let balance_channel = - balance_channel_for_service(&cluster, QuickwitService::Metastore).await; + if let Some(metastore_read_replica_server) = &metastore_read_replica_server_opt { + Arc::new(metastore_read_replica_server.clone()) + } else if node_config.metastore_read_replica_uri.is_some() { + let balance_channel = metastore_balance_channel(&cluster).await?; // The read-replica header is honored at the metastore gRPC server boundary, - // so read-only callers must use a remote metastore client when a replica exists. + // so remote read-only callers must use a remote metastore client when a + // replica exists. let metastore_read_only_client = build_metastore_client_from_balance_channel( balance_channel, grpc_config.max_message_size, From 60b1cf855be3c147600178bdb7b5c98fbaaf2c55 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 24 Jun 2026 19:12:07 +0200 Subject: [PATCH 4/5] Respect readiness in read replica gRPC service --- .../quickwit-proto/src/grpc_read_replica.rs | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-proto/src/grpc_read_replica.rs b/quickwit/quickwit-proto/src/grpc_read_replica.rs index cf9ca957af0..4e7d9767913 100644 --- a/quickwit/quickwit-proto/src/grpc_read_replica.rs +++ b/quickwit/quickwit-proto/src/grpc_read_replica.rs @@ -27,7 +27,7 @@ pub fn read_replica_header_interceptor() -> quickwit_common::tower::GrpcIntercep quickwit_common::tower::fixed_headers_interceptor(headers) } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ReadReplicaGrpcService { primary: S, read_replica: Option, @@ -42,6 +42,12 @@ impl ReadReplicaGrpcService { } } +impl Clone for ReadReplicaGrpcService { + fn clone(&self) -> Self { + Self::new(self.primary.clone(), self.read_replica.clone()) + } +} + impl tonic::server::NamedService for ReadReplicaGrpcService where S: tonic::server::NamedService { @@ -63,9 +69,13 @@ where { type Response = http::Response; type Error = Infallible; - type Future = tonic::codegen::BoxFuture; + type Future = S::Future; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + std::task::ready!(tower::Service::poll_ready(&mut self.primary, cx))?; + if let Some(read_replica) = self.read_replica.as_mut() { + std::task::ready!(tower::Service::poll_ready(read_replica, cx))?; + } Poll::Ready(Ok(())) } @@ -75,13 +85,11 @@ where .get(READ_REPLICA_HEADER_NAME) .and_then(|value| value.to_str().ok()) == Some(READ_REPLICA_HEADER_VALUE); - let mut service = if use_read_replica { - self.read_replica - .clone() - .unwrap_or_else(|| self.primary.clone()) + + if use_read_replica && let Some(read_replica) = self.read_replica.as_mut() { + tower::Service::call(read_replica, request) } else { - self.primary.clone() - }; - Box::pin(async move { tower::Service::call(&mut service, request).await }) + tower::Service::call(&mut self.primary, request) + } } } From 8eb71762e32b9c9d33e491506fab006f5b5ab9d6 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 24 Jun 2026 19:48:54 +0200 Subject: [PATCH 5/5] Wait for Quickwit readiness in CLI tests --- quickwit/quickwit-cli/tests/helpers.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index b9b8207aa1a..2730ff65398 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -16,13 +16,14 @@ use std::fs; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use anyhow::Context; use predicates::str; use quickwit_cli::ClientArgs; use quickwit_cli::service::RunCliCommand; use quickwit_common::net::find_available_tcp_port; -use quickwit_common::test_utils::wait_for_server_ready; +use quickwit_common::test_utils::wait_until_predicate; use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, MetastoreResolver}; @@ -174,7 +175,7 @@ impl TestEnv { _ = server_handle => {} } }); - wait_for_server_ready(([127, 0, 0, 1], self.rest_listen_port).into()).await?; + wait_for_quickwit_ready(self.rest_listen_port).await?; Ok(()) } @@ -186,6 +187,26 @@ impl TestEnv { } } +async fn wait_for_quickwit_ready(rest_listen_port: u16) -> anyhow::Result<()> { + let ready_url = format!("http://127.0.0.1:{rest_listen_port}/health/readyz"); + wait_until_predicate( + || async { + let Ok(response) = reqwest::get(&ready_url).await else { + return false; + }; + if !response.status().is_success() { + return false; + } + response.json::().await.unwrap_or(false) + }, + Duration::from_secs(10), + Duration::from_millis(50), + ) + .await + .with_context(|| format!("quickwit server did not become ready at `{ready_url}`"))?; + Ok(()) +} + pub enum TestStorageType { S3, LocalFileSystem,