Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
- Generic Table is no longer in beta and is generally-available.
- Added Windows support for Python client.
- (Before/After)UpdateTableEvent is emitted for all table updates within a transaction.
- Added KMS options to Polaris CLI
- Changed from Poetry to UV for Python package management
- Added KMS options to Polaris CLI.
- Changed from Poetry to UV for Python package management.
- Exclude KMS policies when KMS is not being used for S3.
- Improved default KMS permission handling to better distinguish read-only and read-write access.

### Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,16 @@ private static void addKmsKeyPolicy(
String region,
String accountId) {

IamStatement.Builder allowKms = buildBaseKmsStatement(canWrite);
boolean hasCurrentKey = kmsKeyArn != null;
boolean hasAllowedKeys = hasAllowedKmsKeys(allowedKmsKeys);
boolean isAwsS3 = region != null && accountId != null;

// Nothing to do if no keys are configured and not AWS S3
if (!hasCurrentKey && !hasAllowedKeys && !isAwsS3) {
return;
}

IamStatement.Builder allowKms = buildBaseKmsStatement(canWrite);

if (hasCurrentKey) {
addKmsKeyResource(kmsKeyArn, allowKms);
Expand All @@ -311,30 +318,31 @@ private static void addKmsKeyPolicy(
addAllowedKmsKeyResources(allowedKmsKeys, allowKms);
}

// Add KMS statement if we have any KMS key configuration
if (hasCurrentKey || hasAllowedKeys) {
// Only add wildcard KMS access for read-only operations on AWS S3 when no specific keys are
// configured. This does not apply to services like Minio where region and accountId are not
// available.
boolean shouldAddWildcard = !hasCurrentKey && !hasAllowedKeys && !canWrite && isAwsS3;
if (shouldAddWildcard) {
addAllKeysResource(region, accountId, allowKms);
}

if (hasCurrentKey || hasAllowedKeys || shouldAddWildcard) {
policyBuilder.addStatement(allowKms.build());
} else if (!canWrite) {
// Only add wildcard KMS access for read-only operations when no specific keys are configured
// this check is for minio because it doesn't have region or account id
if (region != null && accountId != null) {
addAllKeysResource(region, accountId, allowKms);
policyBuilder.addStatement(allowKms.build());
}
}
}

private static IamStatement.Builder buildBaseKmsStatement(boolean canEncrypt) {
IamStatement.Builder allowKms =
IamStatement.builder()
.effect(IamEffect.ALLOW)
.addAction("kms:GenerateDataKeyWithoutPlaintext")
.addAction("kms:DescribeKey")
.addAction("kms:Decrypt")
.addAction("kms:GenerateDataKey");
.addAction("kms:Decrypt");

if (canEncrypt) {
allowKms.addAction("kms:Encrypt");
allowKms
.addAction("kms:Encrypt")
.addAction("kms:GenerateDataKey")
.addAction("kms:GenerateDataKeyWithoutPlaintext");
}

return allowKms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,11 +468,8 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() {
.returns(IamEffect.ALLOW, IamStatement::effect)
.returns(
List.of(
IamAction.create(
"kms:GenerateDataKeyWithoutPlaintext"),
IamAction.create("kms:DescribeKey"),
IamAction.create("kms:Decrypt"),
IamAction.create("kms:GenerateDataKey")),
IamAction.create("kms:Decrypt")),
IamStatement::actions)
.returns(
List.of(
Expand Down Expand Up @@ -583,11 +580,8 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() {
.returns(IamEffect.ALLOW, IamStatement::effect)
.returns(
List.of(
IamAction.create(
"kms:GenerateDataKeyWithoutPlaintext"),
IamAction.create("kms:DescribeKey"),
IamAction.create("kms:Decrypt"),
IamAction.create("kms:GenerateDataKey")),
IamAction.create("kms:Decrypt")),
IamStatement::actions)
.returns(
List.of(
Expand Down Expand Up @@ -824,11 +818,14 @@ public void testKmsKeyPolicyLogic() {
assertThat(stmt.actions())
.containsAll(
List.of(
IamAction.create("kms:GenerateDataKeyWithoutPlaintext"),
IamAction.create("kms:DescribeKey"),
IamAction.create("kms:Decrypt"),
IamAction.create("kms:GenerateDataKey")));
assertThat(stmt.actions()).doesNotContain(IamAction.create("kms:Encrypt"));
IamAction.create("kms:Decrypt")));
assertThat(stmt.actions())
.doesNotContainAnyElementsOf(
List.of(
IamAction.create("kms:Encrypt"),
IamAction.create("kms:GenerateDataKey"),
IamAction.create("kms:GenerateDataKeyWithoutPlaintext")));
assertThat(stmt.resources())
.containsExactlyInAnyOrder(
IamResource.create(allowedKmsKeys.get(0)),
Expand Down Expand Up @@ -1473,4 +1470,45 @@ public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
.isInstanceOf(software.amazon.awssdk.services.sts.model.StsException.class)
.hasMessageContaining("sts:TagSession");
}

@Test
public void testNoKmsForNonAwsReadOnly() {
StsClient stsClient = Mockito.mock(StsClient.class);
String roleARN = "arn:aws:iam::012345678901:role/jdoe";
String externalId = "externalId";
String bucket = "bucket";
String warehouseKeyPrefix = "path/to/warehouse";

// Test with no KMS keys and read-only for non-AWS (should not add any KMS statement)
Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
.thenAnswer(
invocation -> {
AssumeRoleRequest request = invocation.getArgument(0);
IamPolicy policy = IamPolicy.fromJson(request.policy());

// Verify no KMS statement exists
assertThat(policy.statements())
.noneMatch(
stmt ->
stmt.actions().stream()
.anyMatch(action -> action.value().startsWith("kms:")));
return ASSUME_ROLE_RESPONSE;
});

new AwsCredentialsStorageIntegration(
AwsStorageConfigurationInfo.builder()
.addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
.roleARN(roleARN)
.externalId(externalId)
.build(),
stsClient)
.getSubscopedCreds(
EMPTY_REALM_CONFIG,
true,
Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
Set.of(),
POLARIS_PRINCIPAL,
Optional.empty(),
CredentialVendingContext.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,22 @@ public void doRefresh() {
Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
return TableMetadataParser.read(fileIO, metadataLocation);
});

// After a refresh, re-load the FileIO with the new table metadata properties to
// ensure the right permissions are present for subsequent file system interactions.
if (currentMetadata != null) {
tableFileIO =
loadFileIOForTableLike(
Comment on lines +1415 to +1417
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is that related to KMS policies?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's just a parallel bugfix, I'd prefer to make it in a separate PR for the sake of clarity 🤔 WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appear to be a parallel bug. I am waiting for testing from reporter then I can split this into two PRs. Currently if we merged current PR, it will trigger diff error from reporter.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good - thx!

tableIdentifier,
StorageUtil.getLocationsUsedByTable(currentMetadata),
resolvedEntities,
new HashMap<>(currentMetadata.properties()),
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST));
}

polarisEventListener.onEvent(
new PolarisEvent(
PolarisEventType.AFTER_REFRESH_TABLE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.cache.EntityCache;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
Expand All @@ -124,6 +125,7 @@
import org.apache.polaris.core.persistence.resolver.ResolverFactory;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.storage.CredentialVendingContext;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
import org.apache.polaris.core.storage.StorageAccessConfig;
Expand Down Expand Up @@ -2503,4 +2505,66 @@ public void testPaginatedListNamespaces() {
}
}
}

@Test
public void testFileIOIsRefreshedOnTableRefresh() {
// Catalog use the spied provider to verify internal behavior
StorageAccessConfigProvider spiedProvider = spy(storageAccessConfigProvider);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
resolutionManifestFactory, authenticatedRoot, CATALOG_NAME);
TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
IcebergCatalog catalog =
new IcebergCatalog(
diagServices,
resolverFactory,
metaStoreManager,
polarisContext,
passthroughView,
authenticatedRoot,
taskExecutor,
spiedProvider,
fileIOFactory,
polarisEventListener,
eventMetadataFactory);
catalog.setCatalogFileIo(new InMemoryFileIO());
catalog.initialize(
CATALOG_NAME,
Map.of(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));

// Create a table
TableIdentifier tableIdentifier = TableIdentifier.of(NS, "refresh_test_table");
if (requiresNamespaceCreate()) {
catalog.createNamespace(NS);
}
catalog.buildTable(tableIdentifier, SCHEMA).create();

// Get the table operations
IcebergCatalog.BasePolarisTableOperations operations =
(IcebergCatalog.BasePolarisTableOperations)
((BaseTable) catalog.loadTable(tableIdentifier)).operations();

// Verify initial state
assertThat(operations.io()).isNotNull();

// refresh the table
operations.refresh();

// Verify that getStorageAccessConfig was called with WRITE permissions at least twice:
// 1. during table creation
// 2. during table refresh, to update the FileIO with WRITE permissions
Mockito.verify(spiedProvider, Mockito.atLeast(2))
.getStorageAccessConfig(
Mockito.eq(tableIdentifier),
Mockito.anySet(),
Mockito.argThat(
actions ->
actions.containsAll(
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST))),
Mockito.any(),
Mockito.any(PolarisResolvedPathWrapper.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ public void testLoadFileIOForCleanupTask(String scheme) {
.isInstanceOf(InMemoryFileIO.class);

// 1. BasePolarisCatalog:doCommit: for writing the table during the creation
// 2. BasePolarisCatalog:doRefresh: for reading the table during the drop
// 2. BasePolarisCatalog:doRefresh: for reading the table during the drop (2 calls: 1 to read, 1
// to reload)
// 3. TaskFileIOSupplier:apply: for clean up metadata files and merge files
Mockito.verify(testServices.fileIOFactory(), Mockito.times(3))
Mockito.verify(testServices.fileIOFactory(), Mockito.times(4))
.loadFileIO(Mockito.any(), Mockito.any(), Mockito.any());
}

Expand Down