Skip to content

Commit 20fe987

Browse files
authored
Handle LocalDateTime for min and max aggregator (opensearch-project#20074)
1 parent 863ac1d commit 20fe987

3 files changed

Lines changed: 26 additions & 4 deletions

File tree

server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@
6262
import org.opensearch.search.streaming.StreamingCostMetrics;
6363

6464
import java.io.IOException;
65-
import java.util.ArrayList;
65+
import java.time.LocalDateTime;
6666
import java.util.Arrays;
67-
import java.util.List;
6867
import java.util.Map;
6968
import java.util.concurrent.atomic.AtomicReference;
7069
import java.util.function.Function;
@@ -294,6 +293,10 @@ public StreamingCostMetrics getStreamingCostMetrics() {
294293
@Override
295294
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
296295
Object[] values = shardResult.get(name);
296+
if (values[row].getClass().equals(LocalDateTime.class)) {
297+
LocalDateTime value = (LocalDateTime) values[row];
298+
return new InternalMax(name, convertLocalDateTimeToEpochMillis(value), formatter, metadata());
299+
}
297300
return new InternalMax(name, ((Number) values[row]).doubleValue(), formatter, metadata());
298301
}
299302
}

server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@
6262
import org.opensearch.search.streaming.StreamingCostMetrics;
6363

6464
import java.io.IOException;
65-
import java.util.ArrayList;
66-
import java.util.List;
65+
import java.time.LocalDateTime;
6766
import java.util.Map;
6867
import java.util.concurrent.atomic.AtomicReference;
6968
import java.util.function.Function;
@@ -280,6 +279,10 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
280279
@Override
281280
public InternalAggregation convertRow(Map<String, Object[]> shardResult, int row, SearchContext searchContext) {
282281
Object[] values = shardResult.get(name);
282+
if (values[row].getClass().equals(LocalDateTime.class)) {
283+
LocalDateTime value = (LocalDateTime) values[row];
284+
return new InternalMin(name, convertLocalDateTimeToEpochMillis(value), format, metadata());
285+
}
283286
return new InternalMin(name, ((Number) values[row]).doubleValue(), format, metadata());
284287
}
285288

server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import org.opensearch.search.sort.SortOrder;
3838

3939
import java.io.IOException;
40+
import java.time.Instant;
41+
import java.time.LocalDateTime;
42+
import java.time.ZoneOffset;
4043
import java.util.Map;
4144

4245
/**
@@ -64,6 +67,19 @@ protected SingleValue(String name, SearchContext context, Aggregator parent, Map
6467

6568
public abstract double metric(long owningBucketOrd);
6669

70+
/**
71+
* Converts a LocalDateTime value to epoch milliseconds for use in aggregation results.
72+
* The LocalDateTime is treated as UTC to preserve the exact date-time values
73+
* without any timezone conversion.
74+
*
75+
* @param value the LocalDateTime value to convert
76+
* @return the epoch milliseconds representation of the LocalDateTime treated as UTC
77+
*/
78+
protected static double convertLocalDateTimeToEpochMillis(LocalDateTime value) {
79+
Instant instant = value.atZone(ZoneOffset.UTC).toInstant();
80+
return instant.toEpochMilli();
81+
}
82+
6783
@Override
6884
public BucketComparator bucketComparator(String key, SortOrder order) {
6985
if (key != null && false == "value".equals(key)) {

0 commit comments

Comments
 (0)