From c07011a9e5f7bb3975523df5810c3515a5c1f610 Mon Sep 17 00:00:00 2001 From: Arnoldas Seputis Date: Thu, 15 Jan 2026 13:08:41 +0100 Subject: [PATCH] [JAY-732] Add bucket_selector aggregation to QueryBuilder This change adds support for Elasticsearch's `bucket_selector` pipeline aggregation to the QueryBuilder DSL. The new aggregation allows filtering buckets based on computed metrics (e.g. retaining only those buckets where a sum or average exceeds a threshold), a capability not previously exposed through the DSL. --- CHANGELOG.md | 8 +- .../elasticsearch/aggregations.rst | 75 ++++++++ .../query_builder/aggregations.rb | 11 ++ .../aggregations/bucket_selector.rb | 67 +++++++ .../aggregations/bucket_selector_spec.rb | 164 ++++++++++++++++++ .../query_builder/aggregations_spec.rb | 66 +++++++ 6 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 lib/jay_api/elasticsearch/query_builder/aggregations/bucket_selector.rb create mode 100644 spec/jay_api/elasticsearch/query_builder/aggregations/bucket_selector_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a89e69..a33e1d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ Please mark backwards incompatible changes with an exclamation mark at the start ## [Unreleased] +### Added +- Support for the `bucket_selector` pipeline aggregation in + `Elasticsearch::QueryBuilder::Aggregations`. This allows filtering + buckets based on computed metrics (e.g., filtering terms buckets by + aggregated values). + ## [29.3.1] - 2025-12-15 ### Fixed @@ -35,7 +41,7 @@ Please mark backwards incompatible changes with an exclamation mark at the start allows boolean clauses to be nested. - `QueryBuilder#sort` can now receive either the direction of the sorting (`asc` or `desc`) or a `Hash` with advanced sorting options. These are relayed - directly to Elasticsearch. + directly to Elasticsearch. ## [29.0.0] - 2025-08-28 diff --git a/documentation/source/user_guidelines/elasticsearch/aggregations.rst b/documentation/source/user_guidelines/elasticsearch/aggregations.rst index bd75b4c..e0f0d61 100644 --- a/documentation/source/user_guidelines/elasticsearch/aggregations.rst +++ b/documentation/source/user_guidelines/elasticsearch/aggregations.rst @@ -377,6 +377,79 @@ and ``brand.name`` in the index. The buckets will only say how many documents (``doc_count``) exist for each combination. Nested aggregations could be added to get other information out of the documents in each bucket. +bucket_selector +--------------- + +This is a pipeline aggregation that can select (or filter out, depending on how +you see it) some of the buckets produced by a multi-bucket aggregation. + +Detailed information on how to use this aggregation can be found on +`Elasticsearch's documentation on the Bucket Selector aggregation`_ + +Code example: + +.. code-block:: ruby + + query_builder = JayAPI::Elasticsearch::QueryBuilder.new + query_builder.size(0) + query_builder.aggregations.date_histogram('sales_per_month', field: 'date', calendar_interval: 'month').aggs do |aggs| + aggs.sum('total_sales', field: 'price') + aggs.bucket_selector( + 'sales_bucket_filter', buckets_path: { totalSales: 'total_sales' }, + script: JayAPI::Elasticsearch::QueryBuilder::Script.new(source: 'params.totalSales > 200') + ) + end + +This would generate the following query: + +.. code-block:: json + + { + "size": 0, + "query": { + "match_all": {} + }, + "aggs": { + "sales_per_month": { + "date_histogram": { + "field": "date", + "calendar_interval": "month" + }, + "aggs": { + "total_sales": { + "sum": { + "field": "price" + } + }, + "sales_bucket_filter": { + "bucket_selector": { + "buckets_path": { + "totalSales": "total_sales" + }, + "script": { + "source": "params.totalSales > 200", + "lang": "painless" + } + } + } + } + } + } + } + +This query tells Elasticsearch to create a Date Histogram divided by month. +In each of the buckets of the histogram it uses a Sum aggregation to calculate +the total sales amount for that month, finally the bucket_selector aggregation +picks only the buckets that have ``total_sales`` greater than 200. + +Note that the Bucket Selector aggregation is a sibling of the ``sum`` +aggregation and **NOT** a nested aggregation, which ``sum`` cannot have. + +Also, note that the ``buckets_path`` expression is just ``total_sales``. This +works because ``sum`` is a single-value aggregation. The syntax would need to +be different if the filtering was happening on a multi-bucket / multi-value +aggregation. Please see `Elasticsearch's documentation for buckets_path`_. + .. _`Elasticsearch's documentation on the Terms aggregation`: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html .. _`Elasticsearch's documentation on the Avg aggregation`: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-avg-aggregation.html .. _`Elasticsearch's documentation on the Sum aggregation`: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html @@ -386,4 +459,6 @@ to get other information out of the documents in each bucket. .. _`Elasticsearch's documentation on the Date Histogram aggregation`: https://www.elastic.co/docs/reference/aggregations/search-aggregations-bucket-datehistogram-aggregation .. _`Elasticsearch's documentation on the Scripted Metric aggregation`: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-scripted-metric-aggregation.html .. _`Elasticsearch's documentation on the Composite aggregation`: https://www.elastic.co/docs/reference/aggregations/search-aggregations-bucket-composite-aggregation +.. _`Elasticsearch's documentation on the Bucket Selector aggregation`: https://www.elastic.co/docs/reference/aggregations/search-aggregations-pipeline-bucket-selector-aggregation +.. _`Elasticsearch's documentation for buckets_path`: https://www.elastic.co/docs/reference/aggregations/pipeline#buckets-path-syntax .. _`Painless`: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-painless.html diff --git a/lib/jay_api/elasticsearch/query_builder/aggregations.rb b/lib/jay_api/elasticsearch/query_builder/aggregations.rb index d6d192b..0af2c6c 100644 --- a/lib/jay_api/elasticsearch/query_builder/aggregations.rb +++ b/lib/jay_api/elasticsearch/query_builder/aggregations.rb @@ -9,6 +9,7 @@ require_relative 'aggregations/date_histogram' require_relative 'aggregations/filter' require_relative 'aggregations/scripted_metric' +require_relative 'aggregations/bucket_selector' require_relative 'aggregations/sum' require_relative 'aggregations/max' require_relative 'aggregations/terms' @@ -86,6 +87,16 @@ def scripted_metric(name, map_script:, combine_script:, reduce_script:, init_scr ) end + # Adds an +bucket_selector+ type aggregation. For information about the parameters + # @see JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector#initialize + def bucket_selector(name, buckets_path:, script:, gap_policy: nil) + add( + ::JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector.new( + name, buckets_path:, script:, gap_policy: + ) + ) + end + # Adds a +max+ type aggregation. For information about the parameters # @see JayAPI::Elasticsearch::QueryBuilder::Aggregations::Max#initialize def max(name, field:) diff --git a/lib/jay_api/elasticsearch/query_builder/aggregations/bucket_selector.rb b/lib/jay_api/elasticsearch/query_builder/aggregations/bucket_selector.rb new file mode 100644 index 0000000..079305f --- /dev/null +++ b/lib/jay_api/elasticsearch/query_builder/aggregations/bucket_selector.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require_relative 'aggregation' + +module JayAPI + module Elasticsearch + class QueryBuilder + class Aggregations + # Represents a +bucket_selector+ pipeline aggregation in Elasticsearch. + # Docs: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline-bucket-selector-aggregation.html + class BucketSelector < ::JayAPI::Elasticsearch::QueryBuilder::Aggregations::Aggregation + attr_reader :buckets_path, :script, :gap_policy + + # @param [String] name The name used by Elasticsearch to identify the + # aggregation. + # @param [Hash] buckets_path Path(s) to the metric or metrics + # over which the bucket_selector aggregation's script will operate. + # The keys are the names of the script variables, the values the + # paths to the metrics (relative to the parent aggregation). + # The script will receive these variables in its +params+. + # @param [JayAPI::Elasticsearch::QueryBuilder::Script] script + # Script used to decide whether to keep each bucket. + # @param [String, nil] gap_policy Optional gap policy (e.g. "skip", + # "insert_zeros"). + def initialize(name, buckets_path:, script:, gap_policy: nil) + super(name) + + @buckets_path = buckets_path + @script = script + @gap_policy = gap_policy + end + + # Bucket selector is a pipeline agg and cannot have nested aggregations. + # @raise [JayAPI::Elasticsearch::QueryBuilder::Aggregations::Errors::AggregationsError] + def aggs + no_nested_aggregations('Bucket Selector') + end + + # @return [self] A copy of the receiver. + def clone + self.class.new( + name, + buckets_path: buckets_path.is_a?(Hash) ? buckets_path.dup : buckets_path, + script:, # Script is immutable-ish, ok to reuse + gap_policy: + ) + end + + # @return [Hash] The Hash representation of the +Aggregation+. + # Properly formatted for Elasticsearch. + def to_h + super do + { + bucket_selector: { + buckets_path: buckets_path, + script: script.to_h, + gap_policy: gap_policy + }.compact + } + end + end + end + end + end + end +end diff --git a/spec/jay_api/elasticsearch/query_builder/aggregations/bucket_selector_spec.rb b/spec/jay_api/elasticsearch/query_builder/aggregations/bucket_selector_spec.rb new file mode 100644 index 0000000..4d44fc0 --- /dev/null +++ b/spec/jay_api/elasticsearch/query_builder/aggregations/bucket_selector_spec.rb @@ -0,0 +1,164 @@ +# frozen_string_literal: true + +require 'jay_api/elasticsearch/query_builder/aggregations/bucket_selector' +require 'jay_api/elasticsearch/query_builder/script' + +require_relative 'aggregation_shared' + +RSpec.describe JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector do + subject(:bucket_selector) { described_class.new(name, **constructor_params) } + + let(:name) { 'expensive_genres' } + + let(:buckets_path) do + { 'avgPrice' => 'avg_price' } + end + + let(:script) do + instance_double( + JayAPI::Elasticsearch::QueryBuilder::Script, + to_h: { + source: 'params.avgPrice > params.threshold', + lang: 'painless', + params: { threshold: 10 } + } + ) + end + + let(:constructor_params) do + { + buckets_path: buckets_path, + script: script + } + end + + describe '#aggs' do + subject(:method_call) { bucket_selector.aggs } + + let(:expected_message) { 'The Bucket Selector aggregation cannot have nested aggregations.' } + + it_behaves_like 'JayAPI::Elasticsearch::QueryBuilder::Aggregations::Aggregation#no_nested_aggregations' + end + + describe '#clone' do + subject(:method_call) { bucket_selector.clone } + + it 'returns an instance of the same class' do + expect(method_call).to be_an_instance_of(described_class) + end + + it 'does not return the same object' do + expect(method_call).not_to be(bucket_selector) + end + + it "has the same 'name'" do + expect(method_call.name).to be(bucket_selector.name) + end + + context "when 'buckets_path' is a String" do + let(:buckets_path) { 'avg_price' } + + it "has the same 'buckets_path' (same object is fine)" do + expect(method_call.buckets_path).to be(bucket_selector.buckets_path).and eq('avg_price') + end + end + + context "when 'buckets_path' is a Hash" do + let(:buckets_path) do + { 'avgPrice' => 'avg_price' } + end + + it "has an equal 'buckets_path' but not the same object" do + expect(method_call.buckets_path).to eq(bucket_selector.buckets_path) + expect(method_call.buckets_path).not_to be(bucket_selector.buckets_path) + end + end + + it "has the same 'script'" do + expect(method_call.script).to be(bucket_selector.script).and be(script) + end + + context "when no 'gap_policy' is given" do + it "has the same 'gap_policy' (nil)" do + expect(method_call.gap_policy).to be(bucket_selector.gap_policy).and be_nil + end + end + + context "when a 'gap_policy' is given" do + let(:constructor_params) do + { + buckets_path: buckets_path, + script: script, + gap_policy: 'skip' + } + end + + it "has the same 'gap_policy'" do + expect(method_call.gap_policy).to be(bucket_selector.gap_policy).and eq('skip') + end + end + end + + describe '#to_h' do + subject(:method_call) { aggregation.to_h } + + let(:aggregation) { bucket_selector } + + it_behaves_like 'JayAPI::Elasticsearch::QueryBuilder::Aggregations::Aggregation#to_h' + + context "when no 'gap_policy' is given" do + let(:expected_hash) do + { + 'expensive_genres' => { + bucket_selector: { + buckets_path: { + 'avgPrice' => 'avg_price' + }, + script: { + source: 'params.avgPrice > params.threshold', + lang: 'painless', + params: { threshold: 10 } + } + } + } + } + end + + it 'returns the expected Hash' do + expect(method_call).to eq(expected_hash) + end + end + + context "when a 'gap_policy' is given" do + let(:constructor_params) do + { + buckets_path: buckets_path, + script: script, + gap_policy: 'skip' + } + end + + let(:expected_hash) do + { + 'expensive_genres' => { + bucket_selector: { + buckets_path: { + 'avgPrice' => 'avg_price' + }, + script: { + source: 'params.avgPrice > params.threshold', + lang: 'painless', + params: { threshold: 10 } + }, + gap_policy: 'skip' + } + } + } + end + + it 'returns the expected Hash (including the given gap_policy)' do + expect(method_call).to eq(expected_hash) + end + end + end +end diff --git a/spec/jay_api/elasticsearch/query_builder/aggregations_spec.rb b/spec/jay_api/elasticsearch/query_builder/aggregations_spec.rb index 8978df9..4e9c992 100644 --- a/spec/jay_api/elasticsearch/query_builder/aggregations_spec.rb +++ b/spec/jay_api/elasticsearch/query_builder/aggregations_spec.rb @@ -505,6 +505,72 @@ end end + describe '#bucket_selector' do + subject(:method_call) do + aggregations.bucket_selector(name, **method_params) + end + + let(:method_params) do + { buckets_path:, script: } + end + + let(:name) { 'only_slow_tests' } + let(:buckets_path) { { total: 'total_duration_ms' } } + + let(:script) do + instance_double( + JayAPI::Elasticsearch::QueryBuilder::Script + ) + end + + let(:bucket_selector) do + instance_double( + JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector, + to_h: { 'bucket_selector' => { '#to_h' => {} } } + ) + end + + before do + allow(JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector) + .to receive(:new).and_return(bucket_selector) + end + + shared_examples_for '#bucket_selector when no gap_policy is given' do + it 'creates the BucketSelector instance with the expected parameters' do + expect(JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector) + .to receive(:new).with(name, buckets_path:, script:, gap_policy: nil) + + method_call + end + end + + context 'when no gap_policy is given' do + it_behaves_like '#bucket_selector when no gap_policy is given' + end + + context 'when gap_policy is given as nil' do + let(:method_params) { super().merge(gap_policy: nil) } + + it_behaves_like '#bucket_selector when no gap_policy is given' + end + + context 'when a gap_policy is provided' do + let(:method_params) { super().merge(gap_policy: 'skip') } + + it 'creates the BucketSelector instance with the expected parameters' do + expect(JayAPI::Elasticsearch::QueryBuilder::Aggregations::BucketSelector) + .to receive(:new).with(name, buckets_path:, script:, gap_policy: 'skip') + + method_call + end + end + + it 'adds the BucketSelector instance to the array of aggregations' do + expect { method_call }.to change(aggregations, :to_h) + .to(aggs: { 'bucket_selector' => { '#to_h' => {} } }) + end + end + describe '#to_h' do subject(:method_call) { aggregations.to_h }