diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowpipeIngestClient.java b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowpipeIngestClient.java index c94065568692..572025a3c0fb 100644 --- a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowpipeIngestClient.java +++ b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowpipeIngestClient.java @@ -24,7 +24,9 @@ import org.apache.nifi.processors.snowflake.snowpipe.InsertReport; import java.io.IOException; +import java.net.Authenticator; import java.net.HttpURLConnection; +import java.net.ProxySelector; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -75,6 +77,25 @@ class SnowpipeIngestClient implements AutoCloseable { final URI baseUri, final String pipeName, final RSAKeyAuthorizationProvider authorizationProvider + ) { + this(baseUri, pipeName, authorizationProvider, null, null); + } + + /** + * Snowpipe Ingest Client with proxy support + * + * @param baseUri Base URI for the Snowpipe REST API + * @param pipeName Fully qualified pipe name + * @param authorizationProvider RSA Key Authorization Provider for JWT authentication + * @param proxySelector Optional proxy selector; {@code null} uses the system default + * @param proxyAuthenticator Optional authenticator for proxy credentials; {@code null} when not required + */ + SnowpipeIngestClient( + final URI baseUri, + final String pipeName, + final RSAKeyAuthorizationProvider authorizationProvider, + final ProxySelector proxySelector, + final Authenticator proxyAuthenticator ) { Objects.requireNonNull(baseUri, "Base URI required"); Objects.requireNonNull(pipeName, "Pipe Name required"); @@ -87,9 +108,16 @@ class SnowpipeIngestClient implements AutoCloseable { this.insertReportUri = baseUri.resolve(insertReportPath); this.authorizationProvider = authorizationProvider; - this.httpClient = HttpClient.newBuilder() - .connectTimeout(CONNECT_TIMEOUT) - .build(); + + final HttpClient.Builder builder = HttpClient.newBuilder() + .connectTimeout(CONNECT_TIMEOUT); + if (proxySelector != null) { + builder.proxy(proxySelector); + } + if (proxyAuthenticator != null) { + builder.authenticator(proxyAuthenticator); + } + this.httpClient = builder.build(); } @Override diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java index d7d0b049f866..7ce7eef17375 100644 --- a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java +++ b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java @@ -22,6 +22,8 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -32,14 +34,24 @@ import org.apache.nifi.processors.snowflake.snowpipe.InsertFiles; import org.apache.nifi.processors.snowflake.snowpipe.InsertReport; import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat; import org.apache.nifi.snowflake.service.util.AccountIdentifierFormatParameters; import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat; +import java.io.IOException; +import java.net.Authenticator; +import java.net.PasswordAuthentication; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.SocketAddress; import java.net.URI; import java.security.PrivateKey; import java.security.interfaces.RSAPrivateCrtKey; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; @Tags({"snowflake", "snowpipe", "database", "connection"}) @@ -124,6 +136,19 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr .required(true) .build(); + // Only HTTP proxy type is supported. SOCKS is silently ignored by java.net.http.HttpClient. + // For authenticated proxies: the JDK disables Basic auth over CONNECT tunneling by default + // (jdk.http.auth.tunneling.disabledSchemes=Basic). To use proxy credentials with an HTTPS + // target like Snowflake, add -Djdk.http.auth.tunneling.disabledSchemes="" to bootstrap.conf. + static final PropertyDescriptor PROXY_CONFIGURATION = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP_AUTH)) + .description("Specifies the Proxy Configuration Controller Service to proxy network requests." + + " Only HTTP proxy type is supported." + + " For authenticated proxy with HTTPS targets, the JDK disables Basic authentication" + + " over CONNECT tunneling by default. To enable it, add" + + " -Djdk.http.auth.tunneling.disabledSchemes=\"\" to bootstrap.conf.") + .build(); + static final List PROPERTY_DESCRIPTORS = List.of( ACCOUNT_IDENTIFIER_FORMAT, HOST_URL, @@ -136,7 +161,8 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr PRIVATE_KEY_SERVICE, DATABASE, SCHEMA, - PIPE + PIPE, + PROXY_CONFIGURATION ); private static final String HTTPS_URI_FORMAT = "https://%s"; @@ -147,6 +173,13 @@ public class StandardSnowflakeIngestManagerProviderService extends AbstractContr private static final char HYPHEN = '-'; + @Override + protected Collection customValidate(final ValidationContext context) { + final List results = new ArrayList<>(super.customValidate(context)); + ProxyConfiguration.validateProxySpec(context, results, ProxySpec.HTTP_AUTH); + return results; + } + @Override protected List getSupportedPropertyDescriptors() { return PROPERTY_DESCRIPTORS; @@ -173,7 +206,11 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE final URI baseUri = URI.create(HTTPS_URI_FORMAT.formatted(hostNormalized)); final RSAKeyAuthorizationProvider authorizationProvider = new RSAKeyAuthorizationProvider(account, user, rsaPrivateKey); - ingestClient = new SnowpipeIngestClient(baseUri, qualifiedPipeName, authorizationProvider); + + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); + final ProxySelector proxySelector = buildProxySelector(proxyConfiguration); + final Authenticator proxyAuthenticator = buildProxyAuthenticator(proxyConfiguration); + ingestClient = new SnowpipeIngestClient(baseUri, qualifiedPipeName, authorizationProvider, proxySelector, proxyAuthenticator); } else { throw new InitializationException("RSA Private Key not provided"); } @@ -213,6 +250,37 @@ public void migrateProperties(PropertyConfiguration config) { config.renameProperty(SnowflakeProperties.OLD_SCHEMA_PROPERTY_NAME, SnowflakeProperties.SCHEMA.getName()); } + private ProxySelector buildProxySelector(final ProxyConfiguration proxyConfiguration) { + final Proxy proxy = proxyConfiguration.createProxy(); + return new ProxySelector() { + @Override + public List select(final URI uri) { + return List.of(proxy); + } + + @Override + public void connectFailed(final URI uri, final SocketAddress sa, final IOException ioe) { + } + }; + } + + private Authenticator buildProxyAuthenticator(final ProxyConfiguration proxyConfiguration) { + if (!proxyConfiguration.hasCredential()) { + return null; + } + final String proxyUser = proxyConfiguration.getProxyUserName(); + final String proxyPassword = proxyConfiguration.getProxyUserPassword(); + return new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + if (getRequestorType() == RequestorType.PROXY) { + return new PasswordAuthentication(proxyUser, proxyPassword.toCharArray()); + } + return super.getPasswordAuthentication(); + } + }; + } + private AccountIdentifierFormatParameters getAccountIdentifierFormatParameters(ConfigurationContext context) { return new AccountIdentifierFormatParameters( context.getProperty(HOST_URL).evaluateAttributeExpressions().getValue(), diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/SnowpipeIngestClientTest.java b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/SnowpipeIngestClientTest.java index 57cb5dd082bd..9a3cc54dd5f1 100644 --- a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/SnowpipeIngestClientTest.java +++ b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/SnowpipeIngestClientTest.java @@ -31,7 +31,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.io.IOException; import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.SocketAddress; import java.net.URI; import java.security.KeyPair; import java.security.KeyPairGenerator; @@ -264,6 +269,49 @@ void testGetInsertReportErrorResponse() { assertTrue(exception.getMessage().contains(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR))); } + @Test + void testInsertFilesViaProxy() throws InterruptedException, NoSuchAlgorithmException { + // MockWebServer acts as the HTTP proxy. The base URI points to a distinct fake host + // so that the request only reaches MockWebServer if the proxy selector is honoured. + // When Java's HttpClient routes a request through an HTTP proxy it sends the target + // in absolute form (e.g. "http://fake-snowflake.example.com/v1/..."), which lets us + // assert that the proxy path was actually exercised. + mockWebServer.enqueue(new MockResponse.Builder() + .code(HttpURLConnection.HTTP_OK) + .addHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON) + .body(INSERT_FILES_SUCCESS_RESPONSE) + .build()); + + final InetSocketAddress proxyAddress = new InetSocketAddress(mockWebServer.getHostName(), mockWebServer.getPort()); + final Proxy proxy = new Proxy(Proxy.Type.HTTP, proxyAddress); + final ProxySelector proxySelector = new ProxySelector() { + @Override + public List select(final URI uri) { + return List.of(proxy); + } + + @Override + public void connectFailed(final URI uri, final SocketAddress sa, final IOException ioe) { + } + }; + + // Intentionally different from the proxy address to prove routing goes via the proxy + final URI targetBaseUri = URI.create("http://fake-snowflake.example.com"); + final RSAPrivateCrtKey privateKey = generatePrivateKey(); + final RSAKeyAuthorizationProvider authProvider = new RSAKeyAuthorizationProvider(ACCOUNT, USER, privateKey); + + try (final SnowpipeIngestClient proxyClient = new SnowpipeIngestClient(targetBaseUri, PIPE_NAME, authProvider, proxySelector, null)) { + proxyClient.insertFiles(new InsertFiles(List.of(new InsertFile(STAGED_FILE_PATH)))); + } + + final RecordedRequest request = mockWebServer.takeRequest(); + // Absolute-form target confirms the request was routed through the configured proxy + final String target = request.getTarget(); + assertNotNull(target); + assertTrue(target.startsWith("http://fake-snowflake.example.com"), "Expected absolute-form proxy target but got: " + target); + assertNotNull(request.getHeaders().get(AUTHORIZATION_HEADER)); + } + private static RSAPrivateCrtKey generatePrivateKey() throws NoSuchAlgorithmException { final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance(KEY_ALGORITHM); final KeyPair keyPair = keyPairGenerator.generateKeyPair(); diff --git a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/TestStandardSnowflakeIngestManagerProviderService.java b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/TestStandardSnowflakeIngestManagerProviderService.java index cdb25ff4a021..4cf66862b9ff 100644 --- a/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/TestStandardSnowflakeIngestManagerProviderService.java +++ b/nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/test/java/org/apache/nifi/snowflake/service/TestStandardSnowflakeIngestManagerProviderService.java @@ -136,6 +136,15 @@ void testMigrateProperties() { assertEquals(expectedRenamed, propertiesRenamed); } + @Test + void testProxyConfigurationServicePropertyPresent() { + final StandardSnowflakeIngestManagerProviderService service = new StandardSnowflakeIngestManagerProviderService(); + final boolean hasProxyProperty = service.getSupportedPropertyDescriptors() + .stream() + .anyMatch(pd -> pd.getName().equals(StandardSnowflakeIngestManagerProviderService.PROXY_CONFIGURATION.getName())); + assertTrue(hasProxyProperty, "Proxy Configuration Service property should be present"); + } + @Test void testInsertFilesRequest() throws Exception { mockWebServer.enqueue(new MockResponse.Builder()