diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 8f2bfc46a4a..88e33a85da5 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -82,8 +82,8 @@ pub use crate::serde_utils::HumanDuration; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, ChecksumAlgorithm, FileStorageConfig, GoogleCloudStorageConfig, - RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, - StorageConfigs, + NamedS3StorageConfig, RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, + StorageConfig, StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index c6e3e0ab16a..727f0a3ae2f 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -91,6 +91,23 @@ pub enum StorageBackendFlavor { #[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)] pub struct StorageConfigs(#[serde_as(as = "EnumMap")] Vec); +/// Validates a named S3 backend name. The name is embedded in the URI scheme +/// `s3+://`, which downstream consumers (metrics/DataFusion) parse with +/// `url::Url`. URL schemes are case-insensitive and limited to `[a-z0-9.+-]`, +/// so names are restricted to lowercase ASCII letters, digits, and `-` to keep +/// routing consistent across the storage and query paths. +fn validate_named_s3_backend_name(name: &str) -> anyhow::Result<()> { + ensure!(!name.is_empty(), "named S3 backend name must not be empty"); + for character in name.chars() { + ensure!( + character.is_ascii_lowercase() || character.is_ascii_digit() || character == '-', + "invalid named S3 backend name `{name}`: only lowercase ASCII letters, digits, and \ + `-` are allowed (the name is used in the `s3+{name}://` URI scheme)" + ); + } + Ok(()) +} + impl StorageConfigs { pub fn new(storage_configs: Vec) -> Self { Self(storage_configs) @@ -124,6 +141,13 @@ impl StorageConfigs { "{left:?} storage config is defined multiple times", ); } + for storage_config in self.0.iter() { + if let StorageConfig::S3(s3_storage_config) = storage_config { + for name in s3_storage_config.named.keys() { + validate_named_s3_backend_name(name)?; + } + } + } Ok(()) } @@ -967,6 +991,32 @@ mod tests { assert!(projected.force_path_style_access); } + #[test] + fn test_validate_named_s3_backend_name() { + // Valid: lowercase ASCII letters, digits, and `-`. + for valid in ["alt", "seaweedfs", "ovh-morocco", "s3alt", "minio-backend"] { + validate_named_s3_backend_name(valid).unwrap(); + } + // Invalid: empty, underscore, uppercase, dot, other punctuation. + for invalid in ["", "prod_logs", "Prod", "a.b", "a/b", "a b"] { + validate_named_s3_backend_name(invalid).unwrap_err(); + } + } + + #[test] + fn test_storage_configs_reject_url_incompatible_named_backend() { + let s3_storage_config_yaml = r#" + named: + prod_logs: + endpoint: https://logs.example.com + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]); + let error = storage_configs.validate().unwrap_err().to_string(); + assert!(error.contains("prod_logs"), "unexpected error: {error}"); + } + #[test] fn test_storage_s3_named_backend_uses_own_endpoint() { // A named backend is self-contained: `endpoint()` returns its configured diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 118cd01edb9..a9109c1ad11 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -455,7 +455,7 @@ pub struct RootResourceStats { /// the first phase (running aggregation, and identifying the doc address of the top-k hits we should return) /// and the second phase (fetch documents). /// - /// If there are no top-k hits, the second phase . + /// If there are no top-k hits, the second phase is skipped. #[prost(uint64, tag = "8")] pub root_wall_time_microsecs: u64, } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index 70e4d5159fc..8a5f2b48537 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -13,13 +13,13 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use aws_sdk_s3::Client as S3Client; use quickwit_common::uri::Uri; use quickwit_config::{S3StorageConfig, StorageBackend}; -use tokio::sync::{Mutex, OnceCell}; +use tokio::sync::OnceCell; use super::s3_compatible_storage::create_s3_client; use crate::{ @@ -43,8 +43,11 @@ pub struct S3CompatibleObjectStorageFactory { // end up being used, or if something like azure, gcs, or even local files, will be used // instead. s3_client: OnceCell, - // One cached S3Client per named backend. Built lazily on first use. - named_s3_clients: Mutex>, + // One cached S3Client per named backend, each behind its own `OnceCell` so + // backends initialize independently. The `Mutex` is only ever held + // synchronously to look up / insert the per-name cell — never across the + // client-building await. + named_s3_clients: Mutex>>>, } impl S3CompatibleObjectStorageFactory { @@ -76,15 +79,17 @@ impl StorageFactory for S3CompatibleObjectStorageFactory { )) })? .as_s3_config(); - let mut clients = self.named_s3_clients.lock().await; - let client = if let Some(client) = clients.get(name) { - client.clone() - } else { - let client = create_s3_client(&named_config).await; - clients.insert(name.to_string(), client.clone()); - client + let client_cell = { + let mut clients = self + .named_s3_clients + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + Arc::clone(clients.entry(name.to_string()).or_default()) }; - drop(clients); + let client = client_cell + .get_or_init(|| create_s3_client(&named_config)) + .await + .clone(); let storage = S3CompatibleObjectStorage::from_uri_and_client(&named_config, uri, client).await?; return Ok(Arc::new(DebouncedStorage::new(storage))); @@ -119,4 +124,53 @@ mod tests { Some("with-dash") ); } + + #[tokio::test] + #[cfg_attr(not(feature = "ci-test"), ignore)] + async fn test_named_backends_cache_independently() { + use std::collections::BTreeMap; + + use quickwit_config::NamedS3StorageConfig; + + let mut named = BTreeMap::new(); + for backend in ["alt", "other"] { + named.insert( + backend.to_string(), + NamedS3StorageConfig { + endpoint: Some("http://localhost:4566".to_string()), + region: Some("us-east-1".to_string()), + force_path_style_access: true, + ..Default::default() + }, + ); + } + let storage_config = S3StorageConfig { + named, + ..Default::default() + }; + let factory = S3CompatibleObjectStorageFactory::new(storage_config); + + // Distinct named backends each resolve into their own cached cell. + factory + .resolve(&Uri::for_test("s3+alt://bucket/a")) + .await + .unwrap(); + factory + .resolve(&Uri::for_test("s3+other://bucket/b")) + .await + .unwrap(); + // Re-resolving a backend reuses the cached, initialized cell. + factory + .resolve(&Uri::for_test("s3+alt://bucket/c")) + .await + .unwrap(); + + let clients = factory + .named_s3_clients + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + assert_eq!(clients.len(), 2); + assert!(clients.get("alt").unwrap().initialized()); + assert!(clients.get("other").unwrap().initialized()); + } }