From a36a010ad4e569a6cfbc49213390cd7241acfdc3 Mon Sep 17 00:00:00 2001 From: rohitkumar Date: Wed, 29 Jan 2025 17:33:14 -0800 Subject: [PATCH 1/3] Adding table properties to signal replication run state --- .../openhouse/jobs/client/TablesClient.java | 8 ++ .../openhouse/jobs/util/AppConstants.java | 1 + .../jobs/util/ReplicationConfig.java | 1 + .../impl/InternalRepositoryUtils.java | 1 + .../impl/OpenHouseInternalRepositoryImpl.java | 30 ++++++++ .../tables/e2e/h2/RepositoryTest.java | 77 +++++++++++++++++++ 6 files changed, 118 insertions(+) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java index 0ea533ade..f324b9c8f 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java @@ -2,6 +2,7 @@ import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoTableProps; import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy; +import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.DatabaseTableFilter; import com.linkedin.openhouse.jobs.util.DirectoryMetadata; import com.linkedin.openhouse.jobs.util.ReplicationConfig; @@ -113,11 +114,18 @@ private Optional> getTableReplication(GetTableResponseBo .cluster(rc.getDestination()) .tableOwner(response.getTableCreator()) .schedule(rc.getCronSchedule()) + .enableSetup( + getTableProperty( + AppConstants.REPLICATION_SETUP_KEY, response.getTableProperties())) .build())); // since replicationConfigList is initialized, it cannot be null. return Optional.of(replicationConfigList); } + protected String getTableProperty(String propertyName, Map tblProperties) { + return tblProperties.getOrDefault(propertyName, null); + } + protected GetTableResponseBody getTable(TableMetadata tableMetadata) { return getTable(tableMetadata.getDbName(), tableMetadata.getTableName()); } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java index 8185d6609..e4f803ab0 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java @@ -47,6 +47,7 @@ public final class AppConstants { public static final String JOB_ID = "job_id"; public static final String QUEUED_TIME = "queued_time"; public static final String DATABASE_NAME = "database_name"; + public static final String REPLICATION_SETUP_KEY = "replication.enableSetup"; private AppConstants() {} } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java index 15c6094a0..771304888 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java @@ -14,4 +14,5 @@ public class ReplicationConfig { private final String schedule; private final String tableOwner; private final String cluster; + private final String enableSetup; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index 223faf1c9..249679846 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -26,6 +26,7 @@ public final class InternalRepositoryUtils { protected static final String POLICIES_KEY = "policies"; + protected static final String REPLICATION_SETUP_KEY = "replication.enableSetup"; private static final Set EXCLUDE_PROPERTIES_LIST = new HashSet<>(Collections.singletonList(POLICIES_KEY)); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java index ef04d9e28..6afae3d64 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java @@ -18,6 +18,7 @@ import com.linkedin.openhouse.common.schema.IcebergSchemaHelper; import com.linkedin.openhouse.internal.catalog.SnapshotsUtil; import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; import com.linkedin.openhouse.tables.common.TableType; import com.linkedin.openhouse.tables.dto.mapper.TablesMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper; @@ -31,6 +32,7 @@ import io.micrometer.core.instrument.MeterRegistry; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -136,10 +138,16 @@ public TableDto save(TableDto tableDto) { doUpdateSchemaIfNeeded(transaction, writeSchema, table.schema(), tableDto); UpdateProperties updateProperties = transaction.updateProperties(); + // check if replicationPolicy has any change. update property replication.setupNeeded + // accordingly + if (checkIfReplicationPolicyUpdated(table.properties(), tableDto.getPolicies())) { + updateProperties.set(REPLICATION_SETUP_KEY, "true"); + } boolean propsUpdated = doUpdateUserPropsIfNeeded(updateProperties, tableDto, table); boolean snapshotsUpdated = doUpdateSnapshotsIfNeeded(updateProperties, tableDto); boolean policiesUpdated = doUpdatePoliciesIfNeeded(updateProperties, tableDto, table.properties()); + // TODO remove tableTypeAdded after all existing tables have been back-filled to have a // tableType boolean tableTypeAdded = checkIfTableTypeAdded(updateProperties, table.properties()); @@ -160,6 +168,25 @@ public TableDto save(TableDto tableDto) { table, fileIOManager, partitionSpecMapper, policiesMapper, tableTypeMapper); } + private boolean checkIfReplicationPolicyUpdated( + Map existingTableProps, Policies policyFromTableDTO) { + String existingPolicies = existingTableProps.getOrDefault(POLICIES_KEY, ""); + // If both are empty or null, no update + if (existingPolicies.isEmpty() && policyFromTableDTO == null) { + return false; + } + // If existing policies exist and policyFromTableDTO is not null, compare replication + if (!existingPolicies.isEmpty() && policyFromTableDTO != null) { + Policies existingPoliciesObj = + new GsonBuilder().create().fromJson(existingPolicies, Policies.class); + return !Objects.equals( + existingPoliciesObj.getReplication(), policyFromTableDTO.getReplication()); + } + // If existing policies are empty but policyFromTableDTO is not null, update needed if + // replication is set + return existingPolicies.isEmpty() && policyFromTableDTO.getReplication() != null; + } + private boolean skipEligibilityCheck( Map existingTableProps, Map newTableprops) { TableType existingTableType = @@ -298,6 +325,9 @@ private Map computePropsForTableCreation(TableDto tableDto) { // Populate policies String policiesString = policiesMapper.toPoliciesJsonString(tableDto); propertiesMap.put(InternalRepositoryUtils.POLICIES_KEY, policiesString); + if (tableDto.getPolicies() != null && tableDto.getPolicies().getReplication() != null) { + propertiesMap.put(REPLICATION_SETUP_KEY, "true"); + } if (!CollectionUtils.isEmpty(tableDto.getJsonSnapshots())) { meterRegistry.counter(MetricsConstant.REPO_TABLE_CREATED_WITH_DATA_CTR).increment(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java index 71ffcb8e6..c509566ee 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java @@ -13,6 +13,7 @@ import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey; import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import com.linkedin.openhouse.tables.common.TableType; import com.linkedin.openhouse.tables.model.TableDto; @@ -24,6 +25,7 @@ import com.linkedin.openhouse.tables.repository.impl.InternalRepositoryUtils; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -442,6 +444,81 @@ public void testCreateTableWithTableTypeProperty() { } } + @Test + public void testCreateTableWithReplicationProperty() { + String replicationKey = "replication.enableSetup"; + TableDto tableDTO = + TABLE_DTO + .toBuilder() + .policies(TABLE_POLICIES.toBuilder().replication(null).build()) + .tableVersion(INITIAL_TABLE_VERSION) + .build(); + + TableDto createdDTO = openHouseInternalRepository.save(tableDTO); + Assertions.assertFalse(createdDTO.getTableProperties().containsKey(replicationKey)); + + TableDtoPrimaryKey primaryKey = + TableDtoPrimaryKey.builder() + .tableId(TABLE_DTO.getTableId()) + .databaseId(TABLE_DTO.getDatabaseId()) + .build(); + openHouseInternalRepository.deleteById(primaryKey); + + // create table with some replication config and assert that tblProperty has key + TableDto tableDTOWithReplicationPolicy = + TABLE_DTO.toBuilder().policies(TABLE_POLICIES).tableVersion(INITIAL_TABLE_VERSION).build(); + + TableDto createdDTOWithReplicationPolicy = + openHouseInternalRepository.save(tableDTOWithReplicationPolicy); + Assertions.assertTrue(createdDTOWithReplicationPolicy.getTableProperties().containsKey()); + Assertions.assertTrue( + Boolean.parseBoolean( + createdDTOWithReplicationPolicy.getTableProperties().get(replicationKey))); + + Map modifiedProperties = + new HashMap<>(createdDTOWithReplicationPolicy.getTableProperties()); + modifiedProperties.put(replicationKey, "false"); + + // update tblProperty, setting to false + TableDto tableDTOWithTblProperties = + createdDTOWithReplicationPolicy + .toBuilder() + .tableType(TableType.PRIMARY_TABLE) + .tableVersion(createdDTOWithReplicationPolicy.getTableLocation()) + .tableProperties(modifiedProperties) + .build(); + + TableDto createdDTOWithTblProps = openHouseInternalRepository.save(tableDTOWithTblProperties); + Assertions.assertTrue(createdDTOWithTblProps.getTableProperties().containsKey(replicationKey)); + Assertions.assertFalse( + Boolean.parseBoolean(createdDTOWithTblProps.getTableProperties().get(replicationKey))); + + // Update replication policy and assert that tblProperty values is set back to true + ArrayList configs = new ArrayList<>(); + configs.add(ReplicationConfig.builder().destination("CLUSTER1").interval("15H").build()); + TableDto tableDTOWithUpdatedReplicationPolicy = + createdDTOWithTblProps + .toBuilder() + .tableVersion(createdDTOWithTblProps.getTableLocation()) + .policies( + TABLE_POLICIES + .toBuilder() + .replication(REPLICATION_POLICY.toBuilder().config(configs).build()) + .build()) + .build(); + + TableDto createdDTOWithUpdatedReplicationPolicy = + openHouseInternalRepository.save(tableDTOWithUpdatedReplicationPolicy); + Assertions.assertTrue( + createdDTOWithUpdatedReplicationPolicy.getTableProperties().containsKey("replicationKey")); + Assertions.assertTrue( + Boolean.parseBoolean( + createdDTOWithUpdatedReplicationPolicy.getTableProperties().get(replicationKey))); + + openHouseInternalRepository.deleteById(primaryKey); + Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); + } + @Test void testSchemaEvolutionBasic() { Schema oldSchema = From 4cc80e6e3220f86021e44ff35d40a5aa4f26b029 Mon Sep 17 00:00:00 2001 From: rohitkumar Date: Wed, 29 Jan 2025 18:55:58 -0800 Subject: [PATCH 2/3] Making enableSetup field boolean and improved test --- .../openhouse/jobs/client/TablesClient.java | 6 ++-- .../jobs/util/ReplicationConfig.java | 2 +- .../impl/OpenHouseInternalRepositoryImpl.java | 4 +-- .../tables/e2e/h2/RepositoryTest.java | 29 +++++++++++++++---- 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java index f324b9c8f..c8992f38a 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java @@ -115,8 +115,10 @@ private Optional> getTableReplication(GetTableResponseBo .tableOwner(response.getTableCreator()) .schedule(rc.getCronSchedule()) .enableSetup( - getTableProperty( - AppConstants.REPLICATION_SETUP_KEY, response.getTableProperties())) + Boolean.parseBoolean( + response + .getTableProperties() + .getOrDefault(AppConstants.REPLICATION_SETUP_KEY, null))) .build())); // since replicationConfigList is initialized, it cannot be null. return Optional.of(replicationConfigList); diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java index 771304888..30cd6eb26 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java @@ -14,5 +14,5 @@ public class ReplicationConfig { private final String schedule; private final String tableOwner; private final String cluster; - private final String enableSetup; + private final boolean enableSetup; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java index 6afae3d64..06e65e1b2 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java @@ -141,7 +141,7 @@ public TableDto save(TableDto tableDto) { // check if replicationPolicy has any change. update property replication.setupNeeded // accordingly if (checkIfReplicationPolicyUpdated(table.properties(), tableDto.getPolicies())) { - updateProperties.set(REPLICATION_SETUP_KEY, "true"); + updateProperties.set(REPLICATION_SETUP_KEY, Boolean.TRUE.toString()); } boolean propsUpdated = doUpdateUserPropsIfNeeded(updateProperties, tableDto, table); boolean snapshotsUpdated = doUpdateSnapshotsIfNeeded(updateProperties, tableDto); @@ -326,7 +326,7 @@ private Map computePropsForTableCreation(TableDto tableDto) { String policiesString = policiesMapper.toPoliciesJsonString(tableDto); propertiesMap.put(InternalRepositoryUtils.POLICIES_KEY, policiesString); if (tableDto.getPolicies() != null && tableDto.getPolicies().getReplication() != null) { - propertiesMap.put(REPLICATION_SETUP_KEY, "true"); + propertiesMap.put(REPLICATION_SETUP_KEY, Boolean.TRUE.toString()); } if (!CollectionUtils.isEmpty(tableDto.getJsonSnapshots())) { diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java index c509566ee..862a27377 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java @@ -470,7 +470,8 @@ public void testCreateTableWithReplicationProperty() { TableDto createdDTOWithReplicationPolicy = openHouseInternalRepository.save(tableDTOWithReplicationPolicy); - Assertions.assertTrue(createdDTOWithReplicationPolicy.getTableProperties().containsKey()); + Assertions.assertTrue( + createdDTOWithReplicationPolicy.getTableProperties().containsKey(replicationKey)); Assertions.assertTrue( Boolean.parseBoolean( createdDTOWithReplicationPolicy.getTableProperties().get(replicationKey))); @@ -493,13 +494,31 @@ public void testCreateTableWithReplicationProperty() { Assertions.assertFalse( Boolean.parseBoolean(createdDTOWithTblProps.getTableProperties().get(replicationKey))); - // Update replication policy and assert that tblProperty values is set back to true + // Update replication policy to different values and assert that tblProperty values is still set + // to False + TableDto tableDTOWithUpdatedRetentionPolicy = + createdDTOWithTblProps + .toBuilder() + .tableVersion(createdDTOWithTblProps.getTableLocation()) + .policies(TABLE_POLICIES.toBuilder().retention(RETENTION_POLICY).build()) + .build(); + + TableDto createdDTOWithUpdatedRetentionPolicy = + openHouseInternalRepository.save(tableDTOWithUpdatedRetentionPolicy); + Assertions.assertTrue( + createdDTOWithUpdatedRetentionPolicy.getTableProperties().containsKey(replicationKey)); + Assertions.assertFalse( + Boolean.parseBoolean( + createdDTOWithUpdatedRetentionPolicy.getTableProperties().get(replicationKey))); + + // Update replication policy to different values and assert that tblProperty values is set back + // to true ArrayList configs = new ArrayList<>(); configs.add(ReplicationConfig.builder().destination("CLUSTER1").interval("15H").build()); TableDto tableDTOWithUpdatedReplicationPolicy = - createdDTOWithTblProps + createdDTOWithUpdatedRetentionPolicy .toBuilder() - .tableVersion(createdDTOWithTblProps.getTableLocation()) + .tableVersion(createdDTOWithUpdatedRetentionPolicy.getTableLocation()) .policies( TABLE_POLICIES .toBuilder() @@ -510,7 +529,7 @@ public void testCreateTableWithReplicationProperty() { TableDto createdDTOWithUpdatedReplicationPolicy = openHouseInternalRepository.save(tableDTOWithUpdatedReplicationPolicy); Assertions.assertTrue( - createdDTOWithUpdatedReplicationPolicy.getTableProperties().containsKey("replicationKey")); + createdDTOWithUpdatedReplicationPolicy.getTableProperties().containsKey(replicationKey)); Assertions.assertTrue( Boolean.parseBoolean( createdDTOWithUpdatedReplicationPolicy.getTableProperties().get(replicationKey))); From 48e994719b1f334823e0cc8d4267521ffec1d754 Mon Sep 17 00:00:00 2001 From: rohitkumar Date: Thu, 30 Jan 2025 09:49:31 -0800 Subject: [PATCH 3/3] Minor refactor --- .../java/com/linkedin/openhouse/jobs/client/TablesClient.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java index c8992f38a..d243ccd43 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java @@ -124,10 +124,6 @@ private Optional> getTableReplication(GetTableResponseBo return Optional.of(replicationConfigList); } - protected String getTableProperty(String propertyName, Map tblProperties) { - return tblProperties.getOrDefault(propertyName, null); - } - protected GetTableResponseBody getTable(TableMetadata tableMetadata) { return getTable(tableMetadata.getDbName(), tableMetadata.getTableName()); }