diff --git a/Cargo.lock b/Cargo.lock index 94ca27eca1..40580b5ec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -644,7 +644,7 @@ dependencies = [ "hyper", "hyper-util", "pin-project-lite", - "rustls 0.23.10", + "rustls 0.23.12", "rustls-pemfile", "tokio", "tokio-rustls 0.26.0", @@ -888,17 +888,6 @@ dependencies = [ "syn_derive", ] -[[package]] -name = "bstr" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" -dependencies = [ - "lazy_static", - "memchr", - "regex-automata 0.1.10", -] - [[package]] name = "bstr" version = "1.9.1" @@ -906,6 +895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" dependencies = [ "memchr", + "regex-automata 0.4.7", "serde", ] @@ -1228,9 +1218,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" +checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc" dependencies = [ "clap_builder", "clap_derive", @@ -1238,9 +1228,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" +checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99" dependencies = [ "anstream", "anstyle", @@ -1260,9 +1250,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.11" +version = "4.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -1282,7 +1272,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab6d70c534d5e54680aae99712800fca8d048cdfad3fbd154daf823d444f08b" dependencies = [ - "bstr 1.9.1", + "bstr", "bytes", "clickhouse-derive", "clickhouse-rs-cityhash-sys", @@ -3405,7 +3395,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57da3b9b5b85bd66f31093f8c408b90a74431672542466497dcbdfdc02034be1" dependencies = [ "aho-corasick", - "bstr 1.9.1", + "bstr", "log", "regex-automata 0.4.7", "regex-syntax 0.8.4", @@ -3513,7 +3503,7 @@ dependencies = [ "rsa", "rstest", "rstest_reuse", - "rustls 0.23.10", + "rustls 0.23.12", "serde", "serde_derive", "serde_json", @@ -3579,7 +3569,7 @@ dependencies = [ "opentelemetry-aws", "rand 0.8.5", "reqwest", - "rustls 0.23.10", + "rustls 0.23.12", "serde", "serde_json", "serde_with", @@ -4309,7 +4299,7 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls 0.23.10", + "rustls 0.23.12", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -4626,7 +4616,7 @@ dependencies = [ "runtime", "runtime-local", "runtime-noop", - "rustls 0.23.10", + "rustls 0.23.12", "secrecy", "serde", "serde_json", @@ -4852,7 +4842,7 @@ dependencies = [ "jwt-compact", "reqwest", "runtime", - "rustls 0.23.10", + "rustls 0.23.12", "secrecy", "serde", "serde_json", @@ -5020,9 +5010,9 @@ checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "libmimalloc-sys" -version = "0.1.38" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7bb23d733dfcc8af652a78b7bf232f0e967710d044732185e561e47c0336b6" +checksum = "23aa6811d3bd4deb8a84dde645f943476d13b248d818edcf8ce0b2f37f036b44" dependencies = [ "cc", "libc", @@ -5345,9 +5335,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9186d86b79b52f4a77af65604b51225e8db1d6ee7e3f41aec1e40829c71a176" +checksum = "68914350ae34959d83f732418d51e2427a794055d0b9529f48259ac07af65633" dependencies = [ "libmimalloc-sys", ] @@ -6140,7 +6130,7 @@ dependencies = [ "insta", "registry-v2", "reqwest", - "rustls 0.23.10", + "rustls 0.23.12", "serde_json", "thiserror", "tokio", @@ -6323,9 +6313,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.10" +version = "2.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26293c9193fbca7b1a3bf9b79dc1e388e927e6cacaa78b4a3ab705a1d3d41459" +checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" dependencies = [ "pest", "pest_generator", @@ -6333,9 +6323,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.10" +version = "2.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ec22af7d3fb470a85dd2ca96b7c577a1eb4ef6f1683a9fe9a8c16e136c04687" +checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" dependencies = [ "pest", "pest_meta", @@ -6346,9 +6336,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.10" +version = "2.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a240022f37c361ec1878d646fc5b7d7c4d28d5946e1a80ad5a7a4f4ca0bdcd" +checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" dependencies = [ "once_cell", "pest", @@ -6547,7 +6537,7 @@ dependencies = [ "indexmap 2.2.6", "itertools 0.13.0", "reqwest", - "rustls 0.23.10", + "rustls 0.23.12", "rustls-native-certs", "serde", "serde_json", @@ -6828,7 +6818,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.10", + "rustls 0.23.12", "thiserror", "tokio", "tracing", @@ -6844,7 +6834,7 @@ dependencies = [ "rand 0.8.5", "ring", "rustc-hash", - "rustls 0.23.10", + "rustls 0.23.12", "slab", "thiserror", "tinyvec", @@ -7296,7 +7286,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.10", + "rustls 0.23.12", "rustls-pemfile", "rustls-pki-types", "serde", @@ -7611,9 +7601,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.10" +version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ "log", "once_cell", @@ -7626,9 +7616,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -7655,9 +7645,9 @@ checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" -version = "0.102.4" +version = "0.102.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" dependencies = [ "ring", "rustls-pki-types", @@ -8135,11 +8125,11 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "similar" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" +checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e" dependencies = [ - "bstr 0.2.17", + "bstr", "unicode-segmentation", ] @@ -8905,7 +8895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab" dependencies = [ "ring", - "rustls 0.23.10", + "rustls 0.23.12", "tokio", "tokio-postgres", "tokio-rustls 0.26.0", @@ -8940,7 +8930,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.10", + "rustls 0.23.12", "rustls-pki-types", "tokio", ] @@ -8985,14 +8975,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.14" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" +checksum = "ac2caab0bf757388c6c0ae23b3293fdb463fee59434529014f85e3263b995c28" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.14", + "toml_edit 0.22.16", ] [[package]] @@ -9028,9 +9018,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.14" +version = "0.22.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" +checksum = "278f3d518e152219c994ce877758516bca5e118eaed6996192a774fb9fbf0788" dependencies = [ "indexmap 2.2.6", "serde", @@ -9267,7 +9257,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "rustls 0.23.10", + "rustls 0.23.12", "rustls-pki-types", "sha1", "thiserror", @@ -9321,9 +9311,9 @@ checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" [[package]] name = "ulid" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" +checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" dependencies = [ "getrandom 0.2.15", "rand 0.8.5", @@ -9456,9 +9446,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom 0.2.15", ] diff --git a/engine/crates/engine-config-builder/src/from_sdl_config.rs b/engine/crates/engine-config-builder/src/from_sdl_config.rs index 7102956d3d..2540032e32 100644 --- a/engine/crates/engine-config-builder/src/from_sdl_config.rs +++ b/engine/crates/engine-config-builder/src/from_sdl_config.rs @@ -44,7 +44,7 @@ pub fn build_with_sdl_config(config: &FederatedGraphConfig, graph: FederatedGrap rate_limit: context.rate_limit, timeout: config.timeout, entity_caching: match config.entity_caching { - EntityCachingConfig::Enabled { ttl } => EntityCaching::Enabled { ttl }, + EntityCachingConfig::Enabled { ttl, .. } => EntityCaching::Enabled { ttl }, _ => EntityCaching::Disabled, }, }) @@ -211,7 +211,7 @@ impl<'a> BuildContext<'a> { retry, entity_caching: entity_caching.as_ref().map(|config| match config { EntityCachingConfig::Disabled => EntityCaching::Disabled, - EntityCachingConfig::Enabled { ttl } => EntityCaching::Enabled { ttl: *ttl }, + EntityCachingConfig::Enabled { ttl, .. } => EntityCaching::Enabled { ttl: *ttl }, }), }, ); diff --git a/engine/crates/engine-v2/src/operation/metrics.rs b/engine/crates/engine-v2/src/operation/metrics.rs index 3c2cb23360..ffd7de3c45 100644 --- a/engine/crates/engine-v2/src/operation/metrics.rs +++ b/engine/crates/engine-v2/src/operation/metrics.rs @@ -17,6 +17,7 @@ pub(super) fn prepare_metrics_attributes( }), sanitized_query_hash: blake3::hash(sanitized_query.as_bytes()).into(), sanitized_query, + // Added after the binding step used_fields: String::new(), }) } diff --git a/engine/crates/engine/response/src/lib.rs b/engine/crates/engine/response/src/lib.rs index 49cd3414ea..a5479beae9 100644 --- a/engine/crates/engine/response/src/lib.rs +++ b/engine/crates/engine/response/src/lib.rs @@ -24,6 +24,8 @@ mod streaming; pub struct GraphqlOperationAnalyticsAttributes { pub name: Option, pub r#type: common_types::OperationType, + #[serde(default)] + pub used_fields: String, } /// Query response @@ -60,18 +62,11 @@ impl Response { /// Create a new successful response with the data. #[must_use] - pub fn new( - mut data: QueryResponse, - operation_name: Option<&str>, - operation_type: common_types::OperationType, - ) -> Self { + pub fn new(mut data: QueryResponse, graphql_operation: GraphqlOperationAnalyticsAttributes) -> Self { data.shrink_to_fit(); Self { data, - graphql_operation: Some(GraphqlOperationAnalyticsAttributes { - name: operation_name.map(str::to_owned), - r#type: operation_type, - }), + graphql_operation: Some(graphql_operation), ..Default::default() } } @@ -114,6 +109,7 @@ impl Response { OperationType::Mutation => common_types::OperationType::Mutation, OperationType::Subscription => common_types::OperationType::Subscription, }, + used_fields: String::new(), }), ..Default::default() } diff --git a/engine/crates/engine/src/schema.rs b/engine/crates/engine/src/schema.rs index 690dbaaf85..402c68f055 100644 --- a/engine/crates/engine/src/schema.rs +++ b/engine/crates/engine/src/schema.rs @@ -273,7 +273,7 @@ impl Schema { _ => None, }); - let operation_analytics_attributes = if let Some(operation_name) = request.operation_name() { + let mut operation_analytics_attributes = if let Some(operation_name) = request.operation_name() { match &document.operations { DocumentOperations::Multiple(operations) => { operations @@ -281,6 +281,7 @@ impl Schema { .map(|operation| GraphqlOperationAnalyticsAttributes { name: Some(operation_name.to_string()), r#type: response_operation_for_definition(&operation.node), + used_fields: String::new(), }) } _ => None, @@ -290,12 +291,14 @@ impl Schema { DocumentOperations::Single(operation) => Some(GraphqlOperationAnalyticsAttributes { name: engine_parser::find_first_field_name(&document.fragments, &operation.node.selection_set), r#type: response_operation_for_definition(&operation.node), + used_fields: String::new(), }), DocumentOperations::Multiple(operations) if operations.len() == 1 => { let (operation_name, operation) = operations.iter().next().unwrap(); Some(GraphqlOperationAnalyticsAttributes { name: Some(operation_name.to_string()), r#type: response_operation_for_definition(&operation.node), + used_fields: String::new(), }) } _ => None, @@ -303,7 +306,7 @@ impl Schema { }; // check rules - let validation_result = { + let mut validation_result = { let validation_fut = async { check_strict_rules(&self.env.registry, &document, Some(&request.variables)) .map_err(|errors| errors.into_iter().map(ServerError::from).collect()) @@ -314,6 +317,9 @@ impl Schema { Err(errors) => return Err((operation_analytics_attributes, errors)), } }; + if let Some(ref mut attrs) = operation_analytics_attributes { + attrs.used_fields = std::mem::take(&mut validation_result.used_fields).unwrap_or_default(); + } let mut operation = if let Some(operation_name) = request.operation_name() { match document.operations { @@ -621,12 +627,11 @@ impl Schema { schema.env.operation_metrics.record( grafbase_telemetry::metrics::GraphqlRequestMetricsAttributes { operation: OperationMetricsAttributes { - ty: env.operation.ty.into(), - name: env.operation_analytics_attributes.name.clone(), - sanitized_query_hash: blake3::hash(sanitized_query.as_bytes()).into(), - sanitized_query, - used_fields: String::new() - + ty: env.operation.ty.into(), + name: env.operation_analytics_attributes.name.clone(), + sanitized_query_hash: blake3::hash(sanitized_query.as_bytes()).into(), + sanitized_query, + used_fields: env.operation_analytics_attributes.used_fields.clone(), }, status, cache_status: None, diff --git a/engine/crates/gateway-core/src/cache/partial.rs b/engine/crates/gateway-core/src/cache/partial.rs index e48ee215bb..eab02beb3c 100644 --- a/engine/crates/gateway-core/src/cache/partial.rs +++ b/engine/crates/gateway-core/src/cache/partial.rs @@ -68,7 +68,15 @@ where } Ok(Arc::new( - engine::Response::new(body, request.operation_name(), operation_type).http_headers(headers), + engine::Response::new( + body, + engine::GraphqlOperationAnalyticsAttributes { + name: request.operation_name().map(str::to_string), + r#type: operation_type, + used_fields: String::new(), + }, + ) + .http_headers(headers), )) } } @@ -118,7 +126,15 @@ where .await; } - let response = engine::Response::new(body, request.operation_name(), operation_type).http_headers(headers); + let response = engine::Response::new( + body, + engine::GraphqlOperationAnalyticsAttributes { + name: request.operation_name().map(str::to_string), + r#type: operation_type, + used_fields: String::new(), + }, + ) + .http_headers(headers); Ok(stream::once(async move { engine::StreamingPayload::InitialResponse(InitialResponse { diff --git a/engine/crates/gateway-core/src/lib.rs b/engine/crates/gateway-core/src/lib.rs index 1c90f0338d..cf0adf7945 100644 --- a/engine/crates/gateway-core/src/lib.rs +++ b/engine/crates/gateway-core/src/lib.rs @@ -220,7 +220,7 @@ where name: operation.name.clone(), sanitized_query_hash: blake3::hash(normalized_query.as_bytes()).into(), sanitized_query: normalized_query, - used_fields: String::new(), + used_fields: operation.used_fields.clone(), }, status, cache_status: headers diff --git a/engine/crates/parser-sdl/src/federation.rs b/engine/crates/parser-sdl/src/federation.rs index 0fa53b28b4..2831a96748 100644 --- a/engine/crates/parser-sdl/src/federation.rs +++ b/engine/crates/parser-sdl/src/federation.rs @@ -61,20 +61,62 @@ pub enum EntityCachingConfig { Disabled, Enabled { ttl: Option, + storage: EntityCacheStorage, }, } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Default)] +pub enum EntityCacheStorage { + #[default] + Memory, + Redis(RedisConfig), +} + impl From for EntityCachingConfig { fn from(config: gateway_config::EntityCachingConfig) -> Self { match (config.enabled, config.ttl) { (Some(false), _) => EntityCachingConfig::Disabled, - (Some(true), ttl) => EntityCachingConfig::Enabled { ttl }, - (_, Some(ttl)) => EntityCachingConfig::Enabled { ttl: Some(ttl) }, + (Some(true), ttl) => EntityCachingConfig::Enabled { + ttl, + storage: entity_cache_storage(config.storage, config.redis), + }, + (_, Some(ttl)) => EntityCachingConfig::Enabled { + ttl: Some(ttl), + storage: entity_cache_storage(config.storage, config.redis), + }, _ => EntityCachingConfig::Disabled, } } } +fn entity_cache_storage( + storage: gateway_config::EntityCachingStorage, + redis: Option, +) -> EntityCacheStorage { + match storage { + gateway_config::EntityCachingStorage::Memory => EntityCacheStorage::Memory, + gateway_config::EntityCachingStorage::Redis => EntityCacheStorage::Redis(redis.unwrap_or_default().into()), + } +} + +impl From for RedisConfig { + fn from(value: gateway_config::EntityCachingRedisConfig) -> Self { + let gateway_config::EntityCachingRedisConfig { url, key_prefix, tls } = value; + RedisConfig { + url, + key_prefix, + tls: tls.map(Into::into), + } + } +} + +impl From for RedisTlsConfig { + fn from(value: gateway_config::EntityCachingRedisTlsConfig) -> Self { + let gateway_config::EntityCachingRedisTlsConfig { cert, key, ca } = value; + RedisTlsConfig { cert, key, ca } + } +} + impl From<(String, ConnectorHeaderValue)> for SubgraphHeaderRule { fn from((name, value): (String, ConnectorHeaderValue)) -> Self { match value { @@ -100,7 +142,7 @@ pub struct GraphRateLimit { pub struct RateLimitConfig { pub global: Option, pub storage: RateLimitStorage, - pub redis: RateLimitRedisConfig, + pub redis: RedisConfig, } impl From for RateLimitConfig { @@ -131,7 +173,7 @@ impl From for RateLimitStorage { } } -impl From for RateLimitRedisConfig { +impl From for RedisConfig { fn from(value: gateway_config::RateLimitRedisConfig) -> Self { Self { url: value.url, @@ -141,7 +183,7 @@ impl From for RateLimitRedisConfig { } } -impl From for RateLimitRedisTlsConfig { +impl From for RedisTlsConfig { fn from(value: gateway_config::RateLimitRedisTlsConfig) -> Self { Self { cert: value.cert, @@ -158,14 +200,14 @@ pub enum RateLimitStorage { } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct RateLimitRedisConfig { +pub struct RedisConfig { pub url: url::Url, pub key_prefix: String, - pub tls: Option, + pub tls: Option, } #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] -pub struct RateLimitRedisTlsConfig { +pub struct RedisTlsConfig { pub cert: Option, pub key: Option, pub ca: Option, @@ -201,7 +243,8 @@ mod tests { assert_eq!( EntityCachingConfig::from(config.entity_caching), EntityCachingConfig::Enabled { - ttl: Some(Duration::from_secs(60)) + ttl: Some(Duration::from_secs(60)), + storage: Default::default(), } ) } @@ -218,7 +261,8 @@ mod tests { assert_eq!( EntityCachingConfig::from(config.subgraphs.remove("products").unwrap().entity_caching.unwrap()), EntityCachingConfig::Enabled { - ttl: Some(Duration::from_secs(60)) + ttl: Some(Duration::from_secs(60)), + storage: Default::default() } ) } @@ -234,7 +278,10 @@ mod tests { assert_eq!( EntityCachingConfig::from(config.subgraphs.remove("products").unwrap().entity_caching.unwrap()), - EntityCachingConfig::Enabled { ttl: None } + EntityCachingConfig::Enabled { + ttl: None, + storage: Default::default() + } ) } diff --git a/engine/crates/parser-sdl/src/rules/subgraph_directive.rs b/engine/crates/parser-sdl/src/rules/subgraph_directive.rs index fe80024a51..7011705b59 100644 --- a/engine/crates/parser-sdl/src/rules/subgraph_directive.rs +++ b/engine/crates/parser-sdl/src/rules/subgraph_directive.rs @@ -190,18 +190,18 @@ impl Visitor<'_> for SubgraphDirectiveVisitor { subgraph.development_url = Some(url.to_string()) } - if let Some(enabled) = directive.entity_caching_enabled { - if enabled { - subgraph.entity_caching = Some(EntityCachingConfig::Enabled { ttl: None }); - } else { - subgraph.entity_caching = Some(EntityCachingConfig::Disabled); - } - } - - if let Some(ttl) = directive.entity_caching_ttl { - // If there's a ttl we always enable - subgraph.entity_caching = Some(EntityCachingConfig::Enabled { ttl: Some(ttl) }); - } + subgraph.entity_caching = match (directive.entity_caching_enabled, directive.entity_caching_ttl) { + (Some(false), _) => Some(EntityCachingConfig::Disabled), + (Some(true), ttl) => Some(EntityCachingConfig::Enabled { + ttl, + storage: Default::default(), + }), + (_, Some(ttl)) => Some(EntityCachingConfig::Enabled { + ttl: Some(ttl), + storage: Default::default(), + }), + _ => None, + }; subgraph.header_rules.extend( directive diff --git a/engine/crates/runtime-local/src/lib.rs b/engine/crates/runtime-local/src/lib.rs index 13bc1b21f5..f6043c3f22 100644 --- a/engine/crates/runtime-local/src/lib.rs +++ b/engine/crates/runtime-local/src/lib.rs @@ -8,6 +8,8 @@ mod kv; mod log; mod pg; pub mod rate_limiting; +#[cfg(feature = "redis")] +pub mod redis; mod ufd_invoker; pub use bridge::Bridge; diff --git a/engine/crates/runtime-local/src/rate_limiting/redis.rs b/engine/crates/runtime-local/src/rate_limiting/redis.rs index 9dd8fce828..dc7644a8e6 100644 --- a/engine/crates/runtime-local/src/rate_limiting/redis.rs +++ b/engine/crates/runtime-local/src/rate_limiting/redis.rs @@ -1,20 +1,18 @@ -mod pool; +use std::time::{Duration, SystemTime}; -use std::{ - fs::File, - io::{BufReader, Read}, - time::{Duration, SystemTime}, -}; - -use anyhow::Context; -use deadpool::managed::Pool; use futures_util::future::BoxFuture; -use gateway_config::{Config, RateLimitRedisConfig}; +use gateway_config::Config; use grafbase_telemetry::span::GRAFBASE_TARGET; -use redis::ClientTlsConfig; use runtime::rate_limiting::{Error, RateLimitKey, RateLimiter, RateLimiterContext}; use tokio::sync::watch; +use crate::redis::Pool; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct RateLimitRedisConfig<'a> { + pub key_prefix: &'a str, +} + /// Rate limiter by utilizing Redis as a backend. It uses a averaging fixed window algorithm /// to define is the limit reached or not. /// @@ -30,86 +28,26 @@ use tokio::sync::watch; /// A request must have a unique access to a connection, which means utilizing a connection /// pool. pub struct RedisRateLimiter { - pool: Pool, + pool: Pool, key_prefix: String, config_watcher: watch::Receiver, } impl RedisRateLimiter { pub async fn runtime( - config: &RateLimitRedisConfig, + config: RateLimitRedisConfig<'_>, + pool: Pool, watcher: watch::Receiver, ) -> anyhow::Result { - Ok(RateLimiter::new(Self::new(config, watcher).await?)) + let inner = Self::new(config, pool, watcher).await?; + Ok(RateLimiter::new(inner)) } pub async fn new( - config: &RateLimitRedisConfig, + config: RateLimitRedisConfig<'_>, + pool: Pool, watcher: watch::Receiver, ) -> anyhow::Result { - let tls_config = match &config.tls { - Some(tls) => { - let client_tls = match tls.cert.as_ref().zip(tls.key.as_ref()) { - Some((cert, key)) => { - let mut client_cert = Vec::new(); - - File::open(cert) - .and_then(|file| BufReader::new(file).read_to_end(&mut client_cert)) - .context("loading the Redis client certificate")?; - - let mut client_key = Vec::new(); - - File::open(key) - .and_then(|file| BufReader::new(file).read_to_end(&mut client_key)) - .context("loading the Redis client key")?; - - Some(ClientTlsConfig { - client_cert, - client_key, - }) - } - None => None, - }; - - let root_cert = match &tls.ca { - Some(path) => { - let mut ca = Vec::new(); - - File::open(path) - .and_then(|file| BufReader::new(file).read_to_end(&mut ca)) - .context("loading the Redis CA certificate")?; - - Some(ca) - } - None => None, - }; - - Some(pool::TlsConfig { client_tls, root_cert }) - } - None => None, - }; - - let manager = match pool::Manager::new(config.url.as_str(), tls_config) { - Ok(manager) => manager, - Err(e) => { - tracing::error!(target: GRAFBASE_TARGET, "error creating a Redis pool: {e}"); - return Err(e.into()); - } - }; - - let pool = match Pool::builder(manager) - .wait_timeout(Some(Duration::from_secs(5))) - .create_timeout(Some(Duration::from_secs(10))) - .runtime(deadpool::Runtime::Tokio1) - .build() - { - Ok(pool) => pool, - Err(e) => { - tracing::error!(target: GRAFBASE_TARGET, "error creating a Redis pool: {e}"); - return Err(e.into()); - } - }; - Ok(Self { pool, key_prefix: config.key_prefix.to_string(), @@ -215,7 +153,7 @@ impl RedisRateLimiter { } } -async fn incr_counter(pool: Pool, current_bucket: String, expire: Duration) -> Result<(), Error> { +async fn incr_counter(pool: Pool, current_bucket: String, expire: Duration) -> Result<(), Error> { let mut conn = match pool.get().await { Ok(conn) => conn, Err(error) => { diff --git a/engine/crates/runtime-local/src/redis.rs b/engine/crates/runtime-local/src/redis.rs new file mode 100644 index 0000000000..27376bf5ca --- /dev/null +++ b/engine/crates/runtime-local/src/redis.rs @@ -0,0 +1,132 @@ +mod pool; + +use std::{ + collections::{hash_map::Entry, HashMap}, + fs::File, + io::{BufReader, Read}, + path::{Path, PathBuf}, + time::Duration, +}; + +use anyhow::Context; +use grafbase_telemetry::span::GRAFBASE_TARGET; +use redis::ClientTlsConfig; + +pub type Pool = deadpool::managed::Pool; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct RedisTlsConfig<'a> { + pub cert: Option<&'a Path>, + pub key: Option<&'a Path>, + pub ca: Option<&'a Path>, +} + +#[derive(PartialEq, Eq, Hash, Default)] +struct RedisConfigKey { + url: String, + cert: Option, + key: Option, + ca: Option, +} + +/// A deduplicating factory for redis connection pools. +/// +/// If you ask it to create a pool with the same details twice it will return the same pool +#[derive(Default)] +pub struct RedisPoolFactory { + pools: HashMap, +} + +impl RedisPoolFactory { + pub fn pool(&mut self, url: &str, tls_config: Option>) -> anyhow::Result { + let key = { + let mut config_key = RedisConfigKey { + url: url.to_string(), + ..Default::default() + }; + if let Some(RedisTlsConfig { cert, key, ca }) = tls_config { + config_key = RedisConfigKey { + cert: cert.map(ToOwned::to_owned), + key: key.map(ToOwned::to_owned), + ca: ca.map(ToOwned::to_owned), + ..config_key + }; + } + config_key + }; + + match self.pools.entry(key) { + Entry::Occupied(entry) => Ok(entry.get().clone()), + Entry::Vacant(entry) => { + let pool = new_pool(url, tls_config)?; + entry.insert(pool.clone()); + Ok(pool) + } + } + } +} + +fn new_pool(url: &str, tls_config: Option>) -> anyhow::Result { + let tls_config = match tls_config { + Some(tls) => { + let client_tls = match tls.cert.zip(tls.key) { + Some((cert, key)) => { + let mut client_cert = Vec::new(); + + File::open(cert) + .and_then(|file| BufReader::new(file).read_to_end(&mut client_cert)) + .context("loading the Redis client certificate")?; + + let mut client_key = Vec::new(); + + File::open(key) + .and_then(|file| BufReader::new(file).read_to_end(&mut client_key)) + .context("loading the Redis client key")?; + + Some(ClientTlsConfig { + client_cert, + client_key, + }) + } + None => None, + }; + + let root_cert = match tls.ca { + Some(path) => { + let mut ca = Vec::new(); + + File::open(path) + .and_then(|file| BufReader::new(file).read_to_end(&mut ca)) + .context("loading the Redis CA certificate")?; + + Some(ca) + } + None => None, + }; + + Some(pool::TlsConfig { client_tls, root_cert }) + } + None => None, + }; + + let manager = match pool::Manager::new(url, tls_config) { + Ok(manager) => manager, + Err(e) => { + tracing::error!(target: GRAFBASE_TARGET, "error creating a Redis pool: {e}"); + return Err(e.into()); + } + }; + + match Pool::builder(manager) + .wait_timeout(Some(Duration::from_secs(5))) + .create_timeout(Some(Duration::from_secs(10))) + .runtime(deadpool::Runtime::Tokio1) + .build() + { + Ok(pool) => Ok(pool), + Err(e) => { + tracing::error!(target: GRAFBASE_TARGET, "error creating a Redis pool: {e}"); + Err(e.into()) + } + } +} diff --git a/engine/crates/runtime-local/src/rate_limiting/redis/pool.rs b/engine/crates/runtime-local/src/redis/pool.rs similarity index 95% rename from engine/crates/runtime-local/src/rate_limiting/redis/pool.rs rename to engine/crates/runtime-local/src/redis/pool.rs index 9104101a65..36e411867d 100644 --- a/engine/crates/runtime-local/src/rate_limiting/redis/pool.rs +++ b/engine/crates/runtime-local/src/redis/pool.rs @@ -14,7 +14,7 @@ pub(super) struct TlsConfig { } impl Manager { - pub fn new(url: &str, tls: Option) -> RedisResult { + pub(super) fn new(url: &str, tls: Option) -> RedisResult { let client = match tls { Some(config) => Client::build_with_tls( url, diff --git a/gateway/crates/config/src/entity_caching.rs b/gateway/crates/config/src/entity_caching.rs index d5be7e11f5..ccb199a80f 100644 --- a/gateway/crates/config/src/entity_caching.rs +++ b/gateway/crates/config/src/entity_caching.rs @@ -1,10 +1,62 @@ -use std::time::Duration; +use std::{path::PathBuf, time::Duration}; #[derive(Debug, Default, serde::Deserialize, Clone, PartialEq)] pub struct EntityCachingConfig { pub enabled: Option, + #[serde(default)] + pub storage: EntityCachingStorage, + + #[serde(default)] + pub redis: Option, + /// The ttl to store cache entries with. Defaults to 60s #[serde(deserialize_with = "duration_str::deserialize_option_duration", default)] pub ttl: Option, } + +#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum EntityCachingStorage { + #[default] + Memory, + Redis, +} + +#[derive(Debug, Clone, PartialEq, serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct EntityCachingRedisConfig { + #[serde(default = "EntityCachingRedisConfig::default_url")] + pub url: url::Url, + #[serde(default = "EntityCachingRedisConfig::default_key_prefix")] + pub key_prefix: String, + pub tls: Option, +} + +impl Default for EntityCachingRedisConfig { + fn default() -> Self { + Self { + url: Self::default_url(), + key_prefix: Self::default_key_prefix(), + tls: None, + } + } +} + +impl EntityCachingRedisConfig { + fn default_url() -> url::Url { + url::Url::parse("redis://localhost:6379").expect("must be correct") + } + + fn default_key_prefix() -> String { + String::from("grafbase-cache") + } +} + +#[derive(Debug, Clone, PartialEq, Default, serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct EntityCachingRedisTlsConfig { + pub cert: Option, + pub key: Option, + pub ca: Option, +} diff --git a/gateway/crates/federated-server/src/server/gateway.rs b/gateway/crates/federated-server/src/server/gateway.rs index 7e86f9c900..e72659f87e 100644 --- a/gateway/crates/federated-server/src/server/gateway.rs +++ b/gateway/crates/federated-server/src/server/gateway.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use runtime_local::rate_limiting::in_memory::key_based::InMemoryRateLimiter; use runtime_local::rate_limiting::redis::RedisRateLimiter; +use runtime_local::redis::{RedisPoolFactory, RedisTlsConfig}; use tokio::sync::watch; use engine_v2::Engine; @@ -64,12 +65,30 @@ pub(super) async fn generate( runtime::trusted_documents_client::Client::new(NoopTrustedDocuments) }; + let mut redis_factory = RedisPoolFactory::default(); + let watcher = ConfigWatcher::init(gateway_config.clone(), hot_reload_config_path)?; - let rate_limiter = match &gateway_config.gateway.rate_limit { - Some(config) if config.storage.is_redis() => RedisRateLimiter::runtime(&config.redis, watcher) - .await - .map_err(|e| crate::Error::InternalError(e.to_string()))?, + let rate_limiter = match config.rate_limit_config() { + Some(config) if config.storage.is_redis() => { + let tls = config.redis.tls.map(|tls| RedisTlsConfig { + cert: tls.cert, + key: tls.key, + ca: tls.ca, + }); + + let pool = redis_factory + .pool(config.redis.url, tls) + .map_err(|e| crate::Error::InternalError(e.to_string()))?; + + let global_config = runtime_local::rate_limiting::redis::RateLimitRedisConfig { + key_prefix: config.redis.key_prefix, + }; + + RedisRateLimiter::runtime(global_config, pool, watcher) + .await + .map_err(|e| crate::Error::InternalError(e.to_string()))? + } _ => InMemoryRateLimiter::runtime_with_watcher(watcher), };