Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoo
LOG.debug("Provider already exists");
}

// Tell the ABFS driver to use the custom token provider instead of defaulting to SharedKey.
// DynamicTemporaryAzureCredentialsProvider implements CustomTokenProviderAdaptee, which
// requires auth.type=Custom to be activated.
hadoopConfig.set("fs.azure.account.auth.type", "Custom");

// then, set addition info
if (additionInfos == null) {
// if addition info is null, it also means we have not received any token,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs.azure;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FileSystemBehaviorTestSuite;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver;
import org.apache.fluss.fs.azure.token.AzureDelegationTokenReceiver;
import org.apache.fluss.fs.token.Credentials;
import org.apache.fluss.fs.token.CredentialsJsonSerde;
import org.apache.fluss.fs.token.ObtainedSecurityToken;

import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
* Tests that validate the Azure File System Plugin initializes correctly via the delegation token
* path — i.e., when no {@code fs.azure.account.key} is configured and the client receives an OAuth
* token from the server instead. This is the code path exercised by Fluss clients reading KV
* snapshots from remote Azure storage.
*/
class AbfsFileSystemDelegationTokenITCase extends FileSystemBehaviorTestSuite {

private static final String ABFS_FS_PATH = "abfs://flus@test.dfs.core.windows.net/test";

@BeforeAll
static void setup() throws Exception {
// Simulate the client receiving a delegation token from the server.
// No fs.azure.account.key is set — this is the client-side path.
Credentials credentials = new Credentials(null, null, "fake-oauth-access-token");
Map<String, String> additionInfos = new HashMap<>();
additionInfos.put(
AzureFileSystemOptions.ENDPOINT_KEY.key(),
"https://login.microsoftonline.com/fake-tenant/oauth2/token");
ObtainedSecurityToken token =
new ObtainedSecurityToken(
"abfs",
CredentialsJsonSerde.toJson(credentials),
System.currentTimeMillis() + 3_600_000L,
additionInfos);
new AbfsDelegationTokenReceiver().onNewTokensObtained(token);

// Initialize without account key.
FileSystem.initialize(new Configuration(), null);
}

@AfterAll
static void tearDown() throws Exception {
// Reset static token state so other tests in the same JVM are not affected.
FieldUtils.writeStaticField(
AzureDelegationTokenReceiver.class, "additionInfos", null, true);
FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class, "credentials", null, true);
FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class, "validUntil", null, true);
}

@Override
protected FileSystem getFileSystem() throws IOException {
return getBasePath().getFileSystem();
}

@Override
protected FsPath getBasePath() throws IOException {
FsPath fsPath = new FsPath(ABFS_FS_PATH);
applyMockStorage(fsPath.getFileSystem());
return fsPath;
}

private static void applyMockStorage(FileSystem fileSystem) throws IOException {
try {
MemoryFileSystem memoryFileSystem = new MemoryFileSystem(URI.create(ABFS_FS_PATH));
FieldUtils.writeField(fileSystem, "fs", memoryFileSystem, true);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,17 @@ void testCreateWithoutAccountKey() throws Exception {
try {
plugin.create(uri, flussConfig);
} catch (Exception e) {
// expected or ignored
// ABFS will fail to connect to Azure in a test environment, but it must NOT fail
// with a SharedKey/SimpleKeyProvider error — that would mean fs.azure.account.auth.type
// was not set to "Custom", causing the driver to ignore the custom token provider.
Throwable cause = e;
while (cause != null) {
assertThat(cause.getMessage())
.as(
"ABFS should not fall back to SharedKey auth when delegation tokens are configured")
.doesNotContain("Failure to initialize configuration");
cause = cause.getCause();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void updateHadoopConfigShouldSetProviderWhenEmpty() {
AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key()))
.isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
}

@Test
Expand All @@ -73,6 +74,7 @@ void updateHadoopConfigShouldPrependProviderWhenNotEmpty() {
assertThat(providers.length).isEqualTo(2);
assertThat(providers[0]).isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
assertThat(providers[1]).isEqualTo(PROVIDER_CLASS_NAME);
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
}

@Test
Expand All @@ -84,5 +86,6 @@ void updateHadoopConfigShouldNotAddProviderWhenAlreadyExists() {
AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key()))
.isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
}
}