Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ pub use crate::serde_utils::HumanDuration;
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, ChecksumAlgorithm, FileStorageConfig, GoogleCloudStorageConfig,
RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig,
StorageConfigs,
NamedS3StorageConfig, RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor,
StorageConfig, StorageConfigs,
};

/// Returns true if the ingest API v2 is enabled.
Expand Down
50 changes: 50 additions & 0 deletions quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,23 @@ pub enum StorageBackendFlavor {
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct StorageConfigs(#[serde_as(as = "EnumMap")] Vec<StorageConfig>);

/// Validates a named S3 backend name. The name is embedded in the URI scheme
/// `s3+<name>://`, which downstream consumers (metrics/DataFusion) parse with
/// `url::Url`. URL schemes are case-insensitive and limited to `[a-z0-9.+-]`,
/// so names are restricted to lowercase ASCII letters, digits, and `-` to keep
/// routing consistent across the storage and query paths.
fn validate_named_s3_backend_name(name: &str) -> anyhow::Result<()> {
ensure!(!name.is_empty(), "named S3 backend name must not be empty");
for character in name.chars() {
ensure!(
character.is_ascii_lowercase() || character.is_ascii_digit() || character == '-',
"invalid named S3 backend name `{name}`: only lowercase ASCII letters, digits, and \
`-` are allowed (the name is used in the `s3+{name}://` URI scheme)"
);
}
Ok(())
}

impl StorageConfigs {
pub fn new(storage_configs: Vec<StorageConfig>) -> Self {
Self(storage_configs)
Expand Down Expand Up @@ -124,6 +141,13 @@ impl StorageConfigs {
"{left:?} storage config is defined multiple times",
);
}
for storage_config in self.0.iter() {
if let StorageConfig::S3(s3_storage_config) = storage_config {
for name in s3_storage_config.named.keys() {
validate_named_s3_backend_name(name)?;
}
}
}
Ok(())
}

Expand Down Expand Up @@ -967,6 +991,32 @@ mod tests {
assert!(projected.force_path_style_access);
}

#[test]
fn test_validate_named_s3_backend_name() {
// Valid: lowercase ASCII letters, digits, and `-`.
for valid in ["alt", "seaweedfs", "ovh-morocco", "s3alt", "minio-backend"] {
validate_named_s3_backend_name(valid).unwrap();
}
// Invalid: empty, underscore, uppercase, dot, other punctuation.
for invalid in ["", "prod_logs", "Prod", "a.b", "a/b", "a b"] {
validate_named_s3_backend_name(invalid).unwrap_err();
}
}

#[test]
fn test_storage_configs_reject_url_incompatible_named_backend() {
let s3_storage_config_yaml = r#"
named:
prod_logs:
endpoint: https://logs.example.com
"#;
let s3_storage_config: S3StorageConfig =
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]);
let error = storage_configs.validate().unwrap_err().to_string();
assert!(error.contains("prod_logs"), "unexpected error: {error}");
}

#[test]
fn test_storage_s3_named_backend_uses_own_endpoint() {
// A named backend is self-contained: `endpoint()` returns its configured
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};
use tokio::sync::{Mutex, OnceCell};
use tokio::sync::OnceCell;

use super::s3_compatible_storage::create_s3_client;
use crate::{
Expand All @@ -43,8 +43,11 @@ pub struct S3CompatibleObjectStorageFactory {
// end up being used, or if something like azure, gcs, or even local files, will be used
// instead.
s3_client: OnceCell<S3Client>,
// One cached S3Client per named backend. Built lazily on first use.
named_s3_clients: Mutex<HashMap<String, S3Client>>,
// One cached S3Client per named backend, each behind its own `OnceCell` so
// backends initialize independently. The `Mutex` is only ever held
// synchronously to look up / insert the per-name cell — never across the
// client-building await.
named_s3_clients: Mutex<HashMap<String, Arc<OnceCell<S3Client>>>>,
}

impl S3CompatibleObjectStorageFactory {
Expand Down Expand Up @@ -76,15 +79,17 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
))
})?
.as_s3_config();
let mut clients = self.named_s3_clients.lock().await;
let client = if let Some(client) = clients.get(name) {
client.clone()
} else {
let client = create_s3_client(&named_config).await;
clients.insert(name.to_string(), client.clone());
client
let client_cell = {
let mut clients = self
.named_s3_clients
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
Arc::clone(clients.entry(name.to_string()).or_default())
};
drop(clients);
let client = client_cell
.get_or_init(|| create_s3_client(&named_config))
.await
.clone();
let storage =
S3CompatibleObjectStorage::from_uri_and_client(&named_config, uri, client).await?;
return Ok(Arc::new(DebouncedStorage::new(storage)));
Expand Down Expand Up @@ -119,4 +124,53 @@ mod tests {
Some("with-dash")
);
}

#[tokio::test]
#[cfg_attr(not(feature = "ci-test"), ignore)]
async fn test_named_backends_cache_independently() {
use std::collections::BTreeMap;

use quickwit_config::NamedS3StorageConfig;

let mut named = BTreeMap::new();
for backend in ["alt", "other"] {
named.insert(
backend.to_string(),
NamedS3StorageConfig {
endpoint: Some("http://localhost:4566".to_string()),
region: Some("us-east-1".to_string()),
force_path_style_access: true,
..Default::default()
},
);
}
let storage_config = S3StorageConfig {
named,
..Default::default()
};
let factory = S3CompatibleObjectStorageFactory::new(storage_config);

// Distinct named backends each resolve into their own cached cell.
factory
.resolve(&Uri::for_test("s3+alt://bucket/a"))
.await
.unwrap();
factory
.resolve(&Uri::for_test("s3+other://bucket/b"))
.await
.unwrap();
// Re-resolving a backend reuses the cached, initialized cell.
factory
.resolve(&Uri::for_test("s3+alt://bucket/c"))
.await
.unwrap();

let clients = factory
.named_s3_clients
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert_eq!(clients.len(), 2);
assert!(clients.get("alt").unwrap().initialized());
assert!(clients.get("other").unwrap().initialized());
}
}