Skip to content

Commit 0ed8335

Browse files
authored
Migrate/es os aggregator methods (open-metadata#24068)
* Setup new client for ES/OS * Migrated createIndex and addIndexAlias methods * Migrated createAliases method to new ES/OS client * Migrated updateIndex and deleteIndex methods * Updated indexExists methods * refactor: extract index management operations into dedicated manager classes * Add tests for ElasticSearch and OpenSearch index managers * chore: fix code style issues" * Added integeration tests for ES/OS index manager * Fix log level * Fixed priority key mapping for test_case_result_index * fix test to use field name in the query as domains.id not domain.id * Migrated createEntity and createEntities method * Fixed failing tests * Set headers so 8.x client can work with 7.17x or higher server for ES * Added OpenSearch compatible stemmer configuration * Fix java code style * Fixed java checkstyle issue * Added support for 7.17.x backward compatibility * Fixed failing tests * Migrated createTimeSeriesEntity and deleteEntity methods * Refactor search client architecture with entity management abstraction * Updated log level to ERROR form WARN * deleteEntity method impl removed from ES client * migrated deleteEntityByFields method to new es/os client * migrated deleteEntityByFQNPrefix method to use new ES/OS client * deleteEntityByFQNPrefix method removed * Migrated deleteByScript method to new ES/OS client * Removed deleteByScript method from SearchClient * Migrated softDeleteOrRestoreEntity method to new ES/OS client * Reverted BulkResponse static import * Migrated softDeleteOrRestoreChildren method to new ES/OS client * Migrated updateEntity method to new ES/OS client * Migrated updateChildren method to new ES/OS client * Removed unused methods * Migrated getDocByID method to new ES/OS client * Added info logging * Fixed parsing issue while passing string doc * Migrated updateEntityRelationship method to new ES/OS client * Migrated reindexWithEntityIds method to new ES/OS client * Added log to show error message for updateEntityRelationship * Refactored entity manager methods * Added async client, fixed null obj parsing issue * Refactoring * Refactored duplicate methods createEntity and createTimeSeriesEntity * Added integration tests for ES/OS entity managers * Updated client availability checks in both index manager classes to use the consistent !isClientAvailable pattern * Removed comment * chore: Fix line formatting in Playwright test files - Adjusted line length in Domains.spec.ts for better readability - Fixed indentation and formatting in Lineage.spec.ts - Added missing newline at end of files * Added new line * Migrated other index related methods and add tests * MIgrated updateByFqnPrefix and updateLineage and deleteByRangeQuery methods * migrated deleteByRangeAndTerm method * Added integration tests * Migrated updateColumnsInUpstreamLineage and deleteColumnsInUpstreamLineage methods * Migrated updateGlossaryTermByFqnPrefix method * Removed unused method and updated error handling * Migrated reindexEntities method to new ES/OS client * Added integration tests for reindexEntities method * Added interface and impl for generic methods migration * added default impl * Migrated deleteDataStream method * Migrated deleteILMPolicy method * Migrated deleteIndexTemplate method * Migrated deleteComponentTemplate emthod * migrated dettachIlmPolicyFromIndexes method * migrated removeILMFromComponentTemplate method * Migrated cluster metric methods * Removed from ESClient/OSClient to GenericManager * Migrated getSearchHealthStatus * Add integration tests * Migrated buildDIChart and fetchDIChartFields methods * Migrated listDataInsightChartResult emthod * Migrated getQueryCostRecords * Migrated getSchemaEntityRelationship * Added data insight aggregator tests * Fixed failing tests * Migrated aggregate method * Migrated genericAggregation and aggregate methods * Fixed failing data insight query * Updated ref for OS aggregation manager * Fixed issues with aggregations migration * Fixed tests * fix aggregate api response * Fixed aggregation response structure
1 parent 2de0800 commit 0ed8335

35 files changed

+2785
-1131
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.openmetadata.service.search;
2+
3+
import jakarta.json.JsonObject;
4+
import jakarta.ws.rs.core.Response;
5+
import java.io.IOException;
6+
import org.openmetadata.schema.search.AggregationRequest;
7+
import org.openmetadata.schema.tests.DataQualityReport;
8+
9+
/**
10+
* Interface for search aggregation operations.
11+
* Provides methods for executing aggregation queries against search indices.
12+
*/
13+
public interface AggregationManagementClient {
14+
15+
/**
16+
* Execute an aggregation query based on the provided request.
17+
*
18+
* @param request the aggregation request containing query parameters, field names, and other
19+
* aggregation settings
20+
* @return the response containing aggregation results
21+
* @throws IOException if the aggregation operation fails
22+
*/
23+
Response aggregate(AggregationRequest request) throws IOException;
24+
25+
/**
26+
* Execute a generic aggregation for data quality reporting.
27+
*
28+
* @param query the search query
29+
* @param index the index to search
30+
* @param aggregationMetadata the aggregation metadata
31+
* @return the data quality report
32+
* @throws IOException if the aggregation operation fails
33+
*/
34+
DataQualityReport genericAggregation(
35+
String query, String index, SearchAggregation aggregationMetadata) throws IOException;
36+
37+
/**
38+
* Execute an aggregation query and return aggregation results.
39+
*
40+
* @param query the search query
41+
* @param index the index to search
42+
* @param searchAggregation the search aggregation configuration
43+
* @param filter additional filter query
44+
* @return the aggregation results as JsonObject
45+
* @throws IOException if the aggregation operation fails
46+
*/
47+
JsonObject aggregate(
48+
String query, String index, SearchAggregation searchAggregation, String filter)
49+
throws IOException;
50+
}

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static org.openmetadata.service.exception.CatalogExceptionMessage.NOT_IMPLEMENTED_METHOD;
44

5-
import jakarta.json.JsonObject;
65
import jakarta.ws.rs.core.Response;
76
import java.io.IOException;
87
import java.util.Set;
@@ -15,10 +14,8 @@
1514
import org.openmetadata.schema.api.lineage.SearchLineageRequest;
1615
import org.openmetadata.schema.api.lineage.SearchLineageResult;
1716
import org.openmetadata.schema.api.search.SearchSettings;
18-
import org.openmetadata.schema.search.AggregationRequest;
1917
import org.openmetadata.schema.search.SearchRequest;
2018
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
21-
import org.openmetadata.schema.tests.DataQualityReport;
2219
import org.openmetadata.schema.type.EntityReference;
2320
import org.openmetadata.schema.utils.ResultList;
2421
import org.openmetadata.service.exception.CustomExceptionMessage;
@@ -31,6 +28,7 @@ public interface SearchClient<T>
3128
extends IndexManagementClient,
3229
EntityManagementClient,
3330
GenericClient,
31+
AggregationManagementClient,
3432
DataInsightAggregatorClient {
3533
String UPSTREAM_LINEAGE_FIELD = "upstreamLineage";
3634
String UPSTREAM_ENTITY_RELATIONSHIP_FIELD = "upstreamEntityRelationship";
@@ -496,17 +494,8 @@ default ResultList searchPageHierarchy(String query, String pageType, int offset
496494
Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
497495
throws IOException;
498496

499-
Response aggregate(AggregationRequest request) throws IOException;
500-
501-
JsonObject aggregate(
502-
String query, String index, SearchAggregation searchAggregation, String filters)
503-
throws IOException;
504-
505497
Response getEntityTypeCounts(SearchRequest request, String index) throws IOException;
506498

507-
DataQualityReport genericAggregation(
508-
String query, String index, SearchAggregation aggregationMetadata) throws IOException;
509-
510499
/* This function takes in Entity Reference, Search for occurances of those entity across ES, and perform an update for that with reindexing the data from the database to ES */
511500
void reindexAcrossIndices(String matchingKey, EntityReference sourceRef);
512501

0 commit comments

Comments
 (0)