From 1fdd0593cb8f61aaee65f11bfb08b078ec2ca69f Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Fri, 6 Feb 2026 14:19:44 -0500 Subject: [PATCH 1/5] feat(search): add skip_aggregation_finalization to SearchRequest Add optional bool field to SearchRequest that, when true, skips finalization of aggregation results in root.rs and returns the raw IntermediateAggregationResults bytes instead. This enables callers to handle intermediate results directly for multi-step query merging. --- quickwit/quickwit-proto/protos/quickwit/search.proto | 4 ++++ .../quickwit-proto/src/codegen/quickwit/quickwit.search.rs | 4 ++++ quickwit/quickwit-search/src/root.rs | 4 ++++ quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs | 1 + quickwit/quickwit-serve/src/search_api/rest_handler.rs | 1 + 5 files changed, 14 insertions(+) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 04fa0cedf2e..93cd0ae2da6 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -247,6 +247,10 @@ message SearchRequest { // When an exact index ID is provided (not a pattern), the query fails only if // that index is not found and this parameter is set to `false`. bool ignore_missing_indexes = 18; + + // When true, skip finalization of aggregation results and return + // the raw IntermediateAggregationResults bytes instead. + optional bool skip_aggregation_finalization = 19; } enum CountHits { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index e1201ce7a0e..5da02bea06d 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -185,6 +185,10 @@ pub struct SearchRequest { /// that index is not found and this parameter is set to `false`. #[prost(bool, tag = "18")] pub ignore_missing_indexes: bool, + /// When true, skip finalization of aggregation results and return + /// the raw IntermediateAggregationResults bytes instead. + #[prost(bool, optional, tag = "19")] + pub skip_aggregation_finalization: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 581665fa14f..80706df4e61 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -372,6 +372,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< // to recompute it afterward. count_hits: quickwit_proto::search::CountHits::Underestimate as i32, ignore_missing_indexes: req.ignore_missing_indexes, + skip_aggregation_finalization: None, }) } @@ -1062,6 +1063,9 @@ fn finalize_aggregation_if_any( let Some(aggregations_json) = search_request.aggregation_request.as_ref() else { return Ok(None); }; + if search_request.skip_aggregation_finalization == Some(true) { + return Ok(intermediate_aggregation_result_bytes_opt); + } let aggregations: QuickwitAggregations = serde_json::from_str(aggregations_json)?; let aggregation_result_postcard = finalize_aggregation( intermediate_aggregation_result_bytes_opt, diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index db14c37700a..55d8876ca3b 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -412,6 +412,7 @@ fn build_request_for_es_api( search_after, count_hits, ignore_missing_indexes, + skip_aggregation_finalization: None, }, has_doc_id_field, )) diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index cfdf46c61ef..c0313379b64 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -265,6 +265,7 @@ pub fn search_request_from_api_request( search_after: None, count_hits: search_request.count_all.into(), ignore_missing_indexes: false, + skip_aggregation_finalization: None, }; Ok(search_request) } From 144f680a1077ea98c6edc215bb3307dafff45fc9 Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Fri, 6 Feb 2026 14:56:54 -0500 Subject: [PATCH 2/5] test(search): add unit tests for finalize_aggregation_if_any with skip_aggregation_finalization --- quickwit/quickwit-search/src/root.rs | 97 ++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 80706df4e61..9aa665611c7 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -5305,4 +5305,101 @@ mod tests { assert!(matches!(search_error, SearchError::InvalidArgument { .. })); Ok(()) } + + #[test] + fn test_finalize_aggregation_if_any_no_aggregation_request() { + let search_request = SearchRequest { + aggregation_request: None, + skip_aggregation_finalization: None, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(vec![1, 2, 3]), + &searcher_context, + ) + .unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_finalize_aggregation_if_any_skip_finalization_returns_intermediate_bytes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let intermediate_bytes = vec![42, 43, 44]; + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: Some(true), + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(intermediate_bytes.clone()), + &searcher_context, + ) + .unwrap(); + assert_eq!(result, Some(intermediate_bytes)); + } + + #[test] + fn test_finalize_aggregation_if_any_skip_finalization_none_bytes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: Some(true), + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + None, + &searcher_context, + ) + .unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_finalize_aggregation_if_any_default_finalizes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let intermediate_results = IntermediateAggregationResults::default(); + let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: None, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(intermediate_bytes.clone()), + &searcher_context, + ) + .unwrap(); + // Result should be Some (finalized), but different from intermediate bytes + assert!(result.is_some()); + assert_ne!(result.unwrap(), intermediate_bytes); + } + + #[test] + fn test_finalize_aggregation_if_any_false_flag_finalizes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let intermediate_results = IntermediateAggregationResults::default(); + let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: Some(false), + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(intermediate_bytes.clone()), + &searcher_context, + ) + .unwrap(); + assert!(result.is_some()); + assert_ne!(result.unwrap(), intermediate_bytes); + } } From cbf050b785934f600b9e96f6352a0e543be5d26d Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Fri, 6 Feb 2026 15:04:57 -0500 Subject: [PATCH 3/5] fix fmt --- quickwit/quickwit-search/src/root.rs | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 9aa665611c7..4549b8fcf7d 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -5314,12 +5314,9 @@ mod tests { ..Default::default() }; let searcher_context = SearcherContext::for_test(); - let result = finalize_aggregation_if_any( - &search_request, - Some(vec![1, 2, 3]), - &searcher_context, - ) - .unwrap(); + let result = + finalize_aggregation_if_any(&search_request, Some(vec![1, 2, 3]), &searcher_context) + .unwrap(); assert!(result.is_none()); } @@ -5351,12 +5348,7 @@ mod tests { ..Default::default() }; let searcher_context = SearcherContext::for_test(); - let result = finalize_aggregation_if_any( - &search_request, - None, - &searcher_context, - ) - .unwrap(); + let result = finalize_aggregation_if_any(&search_request, None, &searcher_context).unwrap(); assert!(result.is_none()); } From 82186511d3a6192d47c8b4ac5007a63823ca67c9 Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Fri, 6 Feb 2026 15:11:52 -0500 Subject: [PATCH 4/5] refactor: change skip_aggregation_finalization from optional bool to bool in proto --- .../quickwit-proto/protos/quickwit/search.proto | 2 +- .../src/codegen/quickwit/quickwit.search.rs | 4 ++-- quickwit/quickwit-search/src/root.rs | 14 +++++++------- .../src/elasticsearch_api/rest_handler.rs | 2 +- .../quickwit-serve/src/search_api/rest_handler.rs | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 93cd0ae2da6..12443855239 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -250,7 +250,7 @@ message SearchRequest { // When true, skip finalization of aggregation results and return // the raw IntermediateAggregationResults bytes instead. - optional bool skip_aggregation_finalization = 19; + bool skip_aggregation_finalization = 19; } enum CountHits { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 5da02bea06d..191110365f5 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -187,8 +187,8 @@ pub struct SearchRequest { pub ignore_missing_indexes: bool, /// When true, skip finalization of aggregation results and return /// the raw IntermediateAggregationResults bytes instead. - #[prost(bool, optional, tag = "19")] - pub skip_aggregation_finalization: ::core::option::Option, + #[prost(bool, tag = "19")] + pub skip_aggregation_finalization: bool, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 4549b8fcf7d..1a385378aba 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -372,7 +372,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< // to recompute it afterward. count_hits: quickwit_proto::search::CountHits::Underestimate as i32, ignore_missing_indexes: req.ignore_missing_indexes, - skip_aggregation_finalization: None, + skip_aggregation_finalization: false, }) } @@ -1063,7 +1063,7 @@ fn finalize_aggregation_if_any( let Some(aggregations_json) = search_request.aggregation_request.as_ref() else { return Ok(None); }; - if search_request.skip_aggregation_finalization == Some(true) { + if search_request.skip_aggregation_finalization { return Ok(intermediate_aggregation_result_bytes_opt); } let aggregations: QuickwitAggregations = serde_json::from_str(aggregations_json)?; @@ -5310,7 +5310,7 @@ mod tests { fn test_finalize_aggregation_if_any_no_aggregation_request() { let search_request = SearchRequest { aggregation_request: None, - skip_aggregation_finalization: None, + skip_aggregation_finalization: false, ..Default::default() }; let searcher_context = SearcherContext::for_test(); @@ -5326,7 +5326,7 @@ mod tests { let intermediate_bytes = vec![42, 43, 44]; let search_request = SearchRequest { aggregation_request: Some(agg_req.to_string()), - skip_aggregation_finalization: Some(true), + skip_aggregation_finalization: true, ..Default::default() }; let searcher_context = SearcherContext::for_test(); @@ -5344,7 +5344,7 @@ mod tests { let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; let search_request = SearchRequest { aggregation_request: Some(agg_req.to_string()), - skip_aggregation_finalization: Some(true), + skip_aggregation_finalization: true, ..Default::default() }; let searcher_context = SearcherContext::for_test(); @@ -5359,7 +5359,7 @@ mod tests { let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); let search_request = SearchRequest { aggregation_request: Some(agg_req.to_string()), - skip_aggregation_finalization: None, + skip_aggregation_finalization: false, ..Default::default() }; let searcher_context = SearcherContext::for_test(); @@ -5381,7 +5381,7 @@ mod tests { let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); let search_request = SearchRequest { aggregation_request: Some(agg_req.to_string()), - skip_aggregation_finalization: Some(false), + skip_aggregation_finalization: false, ..Default::default() }; let searcher_context = SearcherContext::for_test(); diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 55d8876ca3b..15663a35cee 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -412,7 +412,7 @@ fn build_request_for_es_api( search_after, count_hits, ignore_missing_indexes, - skip_aggregation_finalization: None, + skip_aggregation_finalization: false, }, has_doc_id_field, )) diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index c0313379b64..b1400fa12c0 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -265,7 +265,7 @@ pub fn search_request_from_api_request( search_after: None, count_hits: search_request.count_all.into(), ignore_missing_indexes: false, - skip_aggregation_finalization: None, + skip_aggregation_finalization: false, }; Ok(search_request) } From b4085d573827d9067c9e9246bd2a701d62cdbb11 Mon Sep 17 00:00:00 2001 From: "cong.xie" Date: Fri, 6 Feb 2026 15:25:42 -0500 Subject: [PATCH 5/5] fix(test): use tokio::test for SearcherContext which requires Tokio runtime --- quickwit/quickwit-search/src/root.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 1a385378aba..b5b03882ec0 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -5306,8 +5306,8 @@ mod tests { Ok(()) } - #[test] - fn test_finalize_aggregation_if_any_no_aggregation_request() { + #[tokio::test] + async fn test_finalize_aggregation_if_any_no_aggregation_request() { let search_request = SearchRequest { aggregation_request: None, skip_aggregation_finalization: false, @@ -5320,8 +5320,8 @@ mod tests { assert!(result.is_none()); } - #[test] - fn test_finalize_aggregation_if_any_skip_finalization_returns_intermediate_bytes() { + #[tokio::test] + async fn test_finalize_aggregation_if_any_skip_finalization_returns_intermediate_bytes() { let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; let intermediate_bytes = vec![42, 43, 44]; let search_request = SearchRequest { @@ -5339,8 +5339,8 @@ mod tests { assert_eq!(result, Some(intermediate_bytes)); } - #[test] - fn test_finalize_aggregation_if_any_skip_finalization_none_bytes() { + #[tokio::test] + async fn test_finalize_aggregation_if_any_skip_finalization_none_bytes() { let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; let search_request = SearchRequest { aggregation_request: Some(agg_req.to_string()), @@ -5352,8 +5352,8 @@ mod tests { assert!(result.is_none()); } - #[test] - fn test_finalize_aggregation_if_any_default_finalizes() { + #[tokio::test] + async fn test_finalize_aggregation_if_any_default_finalizes() { let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; let intermediate_results = IntermediateAggregationResults::default(); let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); @@ -5374,8 +5374,8 @@ mod tests { assert_ne!(result.unwrap(), intermediate_bytes); } - #[test] - fn test_finalize_aggregation_if_any_false_flag_finalizes() { + #[tokio::test] + async fn test_finalize_aggregation_if_any_false_flag_finalizes() { let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; let intermediate_results = IntermediateAggregationResults::default(); let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap();