diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 04fa0cedf2e..12443855239 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -247,6 +247,10 @@ message SearchRequest { // When an exact index ID is provided (not a pattern), the query fails only if // that index is not found and this parameter is set to `false`. bool ignore_missing_indexes = 18; + + // When true, skip finalization of aggregation results and return + // the raw IntermediateAggregationResults bytes instead. + bool skip_aggregation_finalization = 19; } enum CountHits { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index e1201ce7a0e..191110365f5 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -185,6 +185,10 @@ pub struct SearchRequest { /// that index is not found and this parameter is set to `false`. #[prost(bool, tag = "18")] pub ignore_missing_indexes: bool, + /// When true, skip finalization of aggregation results and return + /// the raw IntermediateAggregationResults bytes instead. + #[prost(bool, tag = "19")] + pub skip_aggregation_finalization: bool, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 581665fa14f..b5b03882ec0 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -372,6 +372,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< // to recompute it afterward. count_hits: quickwit_proto::search::CountHits::Underestimate as i32, ignore_missing_indexes: req.ignore_missing_indexes, + skip_aggregation_finalization: false, }) } @@ -1062,6 +1063,9 @@ fn finalize_aggregation_if_any( let Some(aggregations_json) = search_request.aggregation_request.as_ref() else { return Ok(None); }; + if search_request.skip_aggregation_finalization { + return Ok(intermediate_aggregation_result_bytes_opt); + } let aggregations: QuickwitAggregations = serde_json::from_str(aggregations_json)?; let aggregation_result_postcard = finalize_aggregation( intermediate_aggregation_result_bytes_opt, @@ -5301,4 +5305,93 @@ mod tests { assert!(matches!(search_error, SearchError::InvalidArgument { .. })); Ok(()) } + + #[tokio::test] + async fn test_finalize_aggregation_if_any_no_aggregation_request() { + let search_request = SearchRequest { + aggregation_request: None, + skip_aggregation_finalization: false, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = + finalize_aggregation_if_any(&search_request, Some(vec![1, 2, 3]), &searcher_context) + .unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_finalize_aggregation_if_any_skip_finalization_returns_intermediate_bytes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let intermediate_bytes = vec![42, 43, 44]; + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: true, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(intermediate_bytes.clone()), + &searcher_context, + ) + .unwrap(); + assert_eq!(result, Some(intermediate_bytes)); + } + + #[tokio::test] + async fn test_finalize_aggregation_if_any_skip_finalization_none_bytes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: true, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any(&search_request, None, &searcher_context).unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_finalize_aggregation_if_any_default_finalizes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let intermediate_results = IntermediateAggregationResults::default(); + let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: false, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(intermediate_bytes.clone()), + &searcher_context, + ) + .unwrap(); + // Result should be Some (finalized), but different from intermediate bytes + assert!(result.is_some()); + assert_ne!(result.unwrap(), intermediate_bytes); + } + + #[tokio::test] + async fn test_finalize_aggregation_if_any_false_flag_finalizes() { + let agg_req = r#"{"avg_price": {"avg": {"field": "price"}}}"#; + let intermediate_results = IntermediateAggregationResults::default(); + let intermediate_bytes = postcard::to_stdvec(&intermediate_results).unwrap(); + let search_request = SearchRequest { + aggregation_request: Some(agg_req.to_string()), + skip_aggregation_finalization: false, + ..Default::default() + }; + let searcher_context = SearcherContext::for_test(); + let result = finalize_aggregation_if_any( + &search_request, + Some(intermediate_bytes.clone()), + &searcher_context, + ) + .unwrap(); + assert!(result.is_some()); + assert_ne!(result.unwrap(), intermediate_bytes); + } } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index db14c37700a..15663a35cee 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -412,6 +412,7 @@ fn build_request_for_es_api( search_after, count_hits, ignore_missing_indexes, + skip_aggregation_finalization: false, }, has_doc_id_field, )) diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index cfdf46c61ef..b1400fa12c0 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -265,6 +265,7 @@ pub fn search_request_from_api_request( search_after: None, count_hits: search_request.count_all.into(), ignore_missing_indexes: false, + skip_aggregation_finalization: false, }; Ok(search_request) }