From 5ad89d03024419641e3aefe123647e01dedd8b6e Mon Sep 17 00:00:00 2001 From: Lorenzo Affetti Date: Fri, 27 Mar 2026 15:14:16 +0100 Subject: [PATCH] [fs/azure] Set auth.type=Custom for token delegation --- .../token/AzureDelegationTokenReceiver.java | 5 + .../AbfsFileSystemDelegationTokenITCase.java | 99 +++++++++++++++++++ .../fs/azure/AzureFileSystemPluginTest.java | 12 ++- .../AzureDelegationTokenReceiverTest.java | 3 + 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java diff --git a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java index f59444e837..cf9fb43b31 100644 --- a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java +++ b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java @@ -56,6 +56,11 @@ public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoo LOG.debug("Provider already exists"); } + // Tell the ABFS driver to use the custom token provider instead of defaulting to SharedKey. + // DynamicTemporaryAzureCredentialsProvider implements CustomTokenProviderAdaptee, which + // requires auth.type=Custom to be activated. + hadoopConfig.set("fs.azure.account.auth.type", "Custom"); + // then, set addition info if (additionInfos == null) { // if addition info is null, it also means we have not received any token, diff --git a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java new file mode 100644 index 0000000000..8890239277 --- /dev/null +++ b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.azure; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FileSystemBehaviorTestSuite; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver; +import org.apache.fluss.fs.azure.token.AzureDelegationTokenReceiver; +import org.apache.fluss.fs.token.Credentials; +import org.apache.fluss.fs.token.CredentialsJsonSerde; +import org.apache.fluss.fs.token.ObtainedSecurityToken; + +import org.apache.commons.lang3.reflect.FieldUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +/** + * Tests that validate the Azure File System Plugin initializes correctly via the delegation token + * path — i.e., when no {@code fs.azure.account.key} is configured and the client receives an OAuth + * token from the server instead. This is the code path exercised by Fluss clients reading KV + * snapshots from remote Azure storage. + */ +class AbfsFileSystemDelegationTokenITCase extends FileSystemBehaviorTestSuite { + + private static final String ABFS_FS_PATH = "abfs://flus@test.dfs.core.windows.net/test"; + + @BeforeAll + static void setup() throws Exception { + // Simulate the client receiving a delegation token from the server. + // No fs.azure.account.key is set — this is the client-side path. + Credentials credentials = new Credentials(null, null, "fake-oauth-access-token"); + Map additionInfos = new HashMap<>(); + additionInfos.put( + AzureFileSystemOptions.ENDPOINT_KEY.key(), + "https://login.microsoftonline.com/fake-tenant/oauth2/token"); + ObtainedSecurityToken token = + new ObtainedSecurityToken( + "abfs", + CredentialsJsonSerde.toJson(credentials), + System.currentTimeMillis() + 3_600_000L, + additionInfos); + new AbfsDelegationTokenReceiver().onNewTokensObtained(token); + + // Initialize without account key. + FileSystem.initialize(new Configuration(), null); + } + + @AfterAll + static void tearDown() throws Exception { + // Reset static token state so other tests in the same JVM are not affected. + FieldUtils.writeStaticField( + AzureDelegationTokenReceiver.class, "additionInfos", null, true); + FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class, "credentials", null, true); + FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class, "validUntil", null, true); + } + + @Override + protected FileSystem getFileSystem() throws IOException { + return getBasePath().getFileSystem(); + } + + @Override + protected FsPath getBasePath() throws IOException { + FsPath fsPath = new FsPath(ABFS_FS_PATH); + applyMockStorage(fsPath.getFileSystem()); + return fsPath; + } + + private static void applyMockStorage(FileSystem fileSystem) throws IOException { + try { + MemoryFileSystem memoryFileSystem = new MemoryFileSystem(URI.create(ABFS_FS_PATH)); + FieldUtils.writeField(fileSystem, "fs", memoryFileSystem, true); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } +} diff --git a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java index 4a25bb4bd6..6d19867735 100644 --- a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java +++ b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java @@ -97,7 +97,17 @@ void testCreateWithoutAccountKey() throws Exception { try { plugin.create(uri, flussConfig); } catch (Exception e) { - // expected or ignored + // ABFS will fail to connect to Azure in a test environment, but it must NOT fail + // with a SharedKey/SimpleKeyProvider error — that would mean fs.azure.account.auth.type + // was not set to "Custom", causing the driver to ignore the custom token provider. + Throwable cause = e; + while (cause != null) { + assertThat(cause.getMessage()) + .as( + "ABFS should not fall back to SharedKey auth when delegation tokens are configured") + .doesNotContain("Failure to initialize configuration"); + cause = cause.getCause(); + } } } diff --git a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java index c17ebc6c0f..f839b90a54 100644 --- a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java +++ b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java @@ -61,6 +61,7 @@ void updateHadoopConfigShouldSetProviderWhenEmpty() { AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration); assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key())) .isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME); + assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom"); } @Test @@ -73,6 +74,7 @@ void updateHadoopConfigShouldPrependProviderWhenNotEmpty() { assertThat(providers.length).isEqualTo(2); assertThat(providers[0]).isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME); assertThat(providers[1]).isEqualTo(PROVIDER_CLASS_NAME); + assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom"); } @Test @@ -84,5 +86,6 @@ void updateHadoopConfigShouldNotAddProviderWhenAlreadyExists() { AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration); assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key())) .isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME); + assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom"); } }