From 651af4dfc57187c1cc545b15b7249518b607e29b Mon Sep 17 00:00:00 2001 From: davidradl Date: Tue, 18 Nov 2025 12:02:54 +0000 Subject: [PATCH 1/3] http187 HTTP content tracing Signed-off-by: davidradl --- CHANGELOG.md | 1 + README.md | 23 +- .../connectors/http/internal/HttpLogger.java | 115 ++++++++++ .../http/internal/HttpLoggingLevelType.java | 15 ++ .../config/HttpConnectorConfigConstants.java | 4 + .../httpclient/BatchRequestSubmitter.java | 6 +- .../httpclient/JavaNetSinkHttpClient.java | 7 +- .../sink/httpclient/PerRequestSubmitter.java | 3 +- .../table/lookup/BodyBasedRequestFactory.java | 9 +- .../lookup/HttpLookupConnectorOptions.java | 17 ++ .../lookup/JavaNetHttpPollingClient.java | 10 +- .../sink/Slf4jHttpPostRequestCallback.java | 6 - .../http/internal/HttpLoggerRequestTest.java | 127 +++++++++++ .../http/internal/HttpLoggerResponseTest.java | 211 ++++++++++++++++++ src/test/resources/simpleLogger.properties | 2 +- 15 files changed, 539 insertions(+), 17 deletions(-) create mode 100644 src/main/java/com/getindata/connectors/http/internal/HttpLogger.java create mode 100644 src/main/java/com/getindata/connectors/http/internal/HttpLoggingLevelType.java create mode 100644 src/test/java/com/getindata/connectors/http/internal/HttpLoggerRequestTest.java create mode 100644 src/test/java/com/getindata/connectors/http/internal/HttpLoggerResponseTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 0672dcf8..d6be70e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## [Unreleased] +- Allow config control of log HTTP request, response and header logging content ## [0.23.0] - 2025-11-07 diff --git a/README.md b/README.md index 53f74a5c..5c3b3683 100644 --- a/README.md +++ b/README.md @@ -549,6 +549,26 @@ an example of a customised grant type token request. The supplied `token request a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`. +## Logging the http content +Debug level logging has been added for class `com.getindata.connectors.http.internal.HttpLogger`. To enable this, alter the log4j properties. +This logging puts out log entries for the HTTP requests and responses. This can be useful for diagnostics to confirm that HTTP requests have been issued and what +that HTTP responses or an exception has occurred (for example connection Refused). + +Logging HTTP may not be appropriate for production systems; where sensitive information is not allowed into the logs. But in development environments it is useful +to be able to see HTTP content. Sensitive information can occur in the headers for example authentication tokens and passwords. Also the HTTP request and response bodies +could sensitive. The default minimal logging should be used in production. For development, you can specify config option `gid.connector.http.logging.level`. +This dictates the amount of content that debug logging will show around HTTP calls; the valid values are: + +| log level | Request method | URI | HTTP Body | Response status code | Headers | +|-------------|----------------|-----|-----------|----------------------|---------| +| MIN | Y | Y | N | Y | N | +| REQRESPONSE | Y | Y | Y | Y | N | +| MAX | Y | Y | Y | Y | Y | + +Notes: +- you can customize what is traced for lookups using the `gid.connector.http.source.lookup.request-callback`. +- where there is an N in the table the output is obfuscated. + ### Restrictions at this time * No authentication is applied to the token request. * The processing does not use the refresh token if it present. @@ -561,7 +581,7 @@ be requested if the current time is later than the cached token expiry time minu | connector | required | The Value should be set to _rest-lookup_ | | format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. | | url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ | -| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. | +| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. | | lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. | | lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). | | lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | @@ -609,6 +629,7 @@ be requested if the current time is later than the cached token expiry time minu | connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. | | format | required | Specify what format to use. | | url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | +| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. | | insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. | | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | diff --git a/src/main/java/com/getindata/connectors/http/internal/HttpLogger.java b/src/main/java/com/getindata/connectors/http/internal/HttpLogger.java new file mode 100644 index 00000000..22dbdc44 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/HttpLogger.java @@ -0,0 +1,115 @@ +package com.getindata.connectors.http.internal; + +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.StringJoiner; + +import lombok.extern.slf4j.Slf4j; + +import com.getindata.connectors.http.internal.table.lookup.HttpLookupSourceRequestEntry; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL; + +@Slf4j +public class HttpLogger { + + private final HttpLoggingLevelType httpLoggingLevelType; + + public static HttpLogger getHttpLogger(Properties properties) { + return new HttpLogger(properties); + } + + public void logRequest(HttpRequest httpRequest) { + log.debug(createStringForRequest(httpRequest)); + } + + public void logResponse(HttpResponse response) { + log.debug(createStringForResponse(response)); + } + + public void logRequestBody(String body) { + log.debug(createStringForBody(body)); + } + + public void logExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) { + log.debug(createStringForExceptionResponse(request, e)); + } + + private HttpLogger(Properties properties) { + String code = (String) properties.get(HTTP_LOGGING_LEVEL); + this.httpLoggingLevelType = HttpLoggingLevelType.valueOfStr(code); + } + + String createStringForRequest(HttpRequest httpRequest) { + String headersForLog = getHeadersForLog(httpRequest.headers()); + return String.format("HTTP %s Request: URL: %s, Headers: %s", + httpRequest.method(), + httpRequest.uri().toString(), + headersForLog + ); + } + + private String getHeadersForLog(HttpHeaders httpHeaders) { + if (httpHeaders == null) return "None"; + Map> headersMap = httpHeaders.map(); + if (headersMap.isEmpty()) return "None"; + if (this.httpLoggingLevelType == HttpLoggingLevelType.MAX) { + StringJoiner headers = new StringJoiner(";"); + for (Map.Entry> reqHeaders : headersMap.entrySet()) { + StringJoiner values = new StringJoiner(";"); + for (String value : reqHeaders.getValue()) { + values.add(value); + } + String header = reqHeaders.getKey() + ":[" + values + "]"; + headers.add(header); + } + return headers.toString(); + } + return "***"; + } + + String createStringForResponse(HttpResponse response) { + String headersForLog = getHeadersForLog(response.headers()); + + String bodyForLog = "***"; + if (response.body() == null || response.body().isEmpty()) { + bodyForLog = "None"; + } else { + if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) { + bodyForLog = response.body().toString(); + } + } + return String.format("HTTP %s Response: URL: %s," + + " Response Headers: %s, status code: %s, Response Body: %s", + response.request().method(), + response.uri(), + headersForLog, + response.statusCode(), + bodyForLog + ); + } + + private String createStringForExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) { + HttpRequest httpRequest = request.getHttpRequest(); + return String.format("HTTP %s Exception Response: URL: %s Exception %s", + httpRequest.method(), + httpRequest.uri(), + e + ); + } + + String createStringForBody(String body) { + String bodyForLog = "***"; + if (body == null || body.isEmpty()) { + bodyForLog = "None"; + } else { + if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) { + bodyForLog = body.toString(); + } + } + return bodyForLog; + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/HttpLoggingLevelType.java b/src/main/java/com/getindata/connectors/http/internal/HttpLoggingLevelType.java new file mode 100644 index 00000000..03a50944 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/HttpLoggingLevelType.java @@ -0,0 +1,15 @@ +package com.getindata.connectors.http.internal; + +public enum HttpLoggingLevelType { + MIN, + REQRESPONSE, + MAX; + + public static HttpLoggingLevelType valueOfStr(String code) { + if (code == null) { + return MIN; + } else { + return valueOf(code); + } + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index 67f4f013..1450c313 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -106,6 +106,9 @@ public final class HttpConnectorConfigConstants { public static final String SOURCE_PROXY_PASSWORD = SOURCE_LOOKUP_PREFIX + "proxy.password"; + public static final String HTTP_LOGGING_LEVEL = + GID_CONNECTOR_HTTP + "logging.level"; + public static final String SINK_HTTP_TIMEOUT_SECONDS = GID_CONNECTOR_HTTP + "sink.request.timeout"; @@ -118,6 +121,7 @@ public final class HttpConnectorConfigConstants { public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE = GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size"; + // ----------------------------------------------------- diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java index 19852f69..be8561dd 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java @@ -58,18 +58,18 @@ public List> submit( } var responseFutures = new ArrayList>(); - String previousReqeustMethod = requestsToSubmit.get(0).method; + String previousRequestMethod = requestsToSubmit.get(0).method; List requestBatch = new ArrayList<>(httpRequestBatchSize); for (var entry : requestsToSubmit) { if (requestBatch.size() == httpRequestBatchSize - || !previousReqeustMethod.equalsIgnoreCase(entry.method)) { + || !previousRequestMethod.equalsIgnoreCase(entry.method)) { // break batch and submit responseFutures.add(sendBatch(endpointUrl, requestBatch)); requestBatch.clear(); } requestBatch.add(entry); - previousReqeustMethod = entry.method; + previousRequestMethod = entry.method; } // submit anything that left diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..a7b2ecbd 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -14,6 +14,7 @@ import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.HttpLogger; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; @@ -40,6 +41,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient { private final RequestSubmitter requestSubmitter; + private final Properties properties; + public JavaNetSinkHttpClient( Properties properties, HttpPostRequestCallback httpPostRequestCallback, @@ -69,6 +72,7 @@ public JavaNetSinkHttpClient( properties, headersAndValues ); + this.properties = properties; } @Override @@ -98,10 +102,9 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); var optResponse = response.getResponse(); - + HttpLogger.getHttpLogger(properties).logResponse(response.getResponse().get()); httpPostRequestCallback.call( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - // TODO Add response processor here and orchestrate it with statusCodeChecker. if (optResponse.isEmpty() || statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java index 7fd23ed1..12439bd8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java @@ -40,8 +40,7 @@ public List> submit( for (var entry : requestToSubmit) { HttpRequest httpRequest = buildHttpRequest(entry, endpointUri); - var response = httpClient - .sendAsync( + var response = httpClient.sendAsync( httpRequest.getHttpRequest(), HttpResponse.BodyHandlers.ofString()) .exceptionally(ex -> { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java index ae36be8f..08fc2300 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java @@ -11,6 +11,7 @@ import com.getindata.connectors.http.LookupQueryCreator; import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.HttpLogger; import com.getindata.connectors.http.internal.utils.uri.URIBuilder; /** @@ -21,6 +22,7 @@ public class BodyBasedRequestFactory extends RequestFactoryBase { private final String methodName; + private final HttpLookupConfig options; public BodyBasedRequestFactory( String methodName, @@ -30,6 +32,7 @@ public BodyBasedRequestFactory( super(lookupQueryCreator, headerPreprocessor, options); this.methodName = methodName.toUpperCase(); + this.options = options; } /** @@ -42,9 +45,13 @@ public BodyBasedRequestFactory( @Override protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) { HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo); + String body = lookupQueryInfo.getLookupQuery(); builder .uri(constructUri(lookupQueryInfo)) - .method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery())); + .method(methodName, BodyPublishers.ofString(body)); + // we do not view the lookup keys as sensitive; therefore the request body is + // not obfuscated. + HttpLogger.getHttpLogger(options.getProperties()).logRequestBody(body); return builder; } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index c3524881..51270522 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -5,6 +5,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import com.getindata.connectors.http.internal.HttpLoggingLevelType; import com.getindata.connectors.http.internal.retry.RetryStrategyType; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.*; @@ -129,6 +130,22 @@ public class HttpLookupConnectorOptions { .defaultValue(RetryStrategyType.FIXED_DELAY.getCode()) .withDescription("Auto retry strategy type: fixed-delay (default) or exponential-delay."); + public static final ConfigOption LOGGING_LEVEL_FOR_HTTP = + ConfigOptions.key(HTTP_LOGGING_LEVEL) + .stringType() + .defaultValue(String.valueOf(HttpLoggingLevelType.MIN)) + .withDescription("VALID values are " + HttpLoggingLevelType.MIN.name() + ", " + + HttpLoggingLevelType.REQRESPONSE.name() + " and " + HttpLoggingLevelType.MAX.name() + + ". This dictates the amount of content that the debug logging will show around HTTP calls." + + " This logging will be issued before HTTP requests and on receipt of responses, so you can see" + + " diagnostically when the HTTP calls were made." + + " HTTP calls and responses can contain sensitive information, so by default the responses" + + " and header information is not logged. This minimal logging is default and corresponds to a config" + + " value of " + HttpLoggingLevelType.MIN.name() + ". If you are not in secure environment would like" + + " to see the HTTP request and response bodies in the log then specify " + + HttpLoggingLevelType.REQRESPONSE.name() + ". If you would also like to see the header values," + + " specify " + HttpLoggingLevelType.MAX.name() + "."); + public static final ConfigOption SOURCE_LOOKUP_HTTP_SUCCESS_CODES = ConfigOptions.key(SOURCE_RETRY_SUCCESS_CODES) .stringType() diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 81a75674..9da658a1 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -31,6 +31,7 @@ import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.HttpStatusCodeValidationFailedException; import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.HttpLogger; import com.getindata.connectors.http.internal.PollingClient; import com.getindata.connectors.http.internal.retry.HttpClientWithRetry; import com.getindata.connectors.http.internal.retry.RetryConfigProvider; @@ -63,6 +64,7 @@ public class JavaNetHttpPollingClient implements PollingClient { private final HttpLookupConfig options; private final Set ignoredErrorCodes; private final boolean continueOnError; + private final HttpLogger httpLogger; public JavaNetHttpPollingClient( HttpClient httpClient, @@ -89,6 +91,7 @@ public JavaNetHttpPollingClient( .retryConfig(RetryConfigProvider.create(config)) .responseChecker(new HttpResponseChecker(successCodes, errorCodes)) .build(); + this.httpLogger = HttpLogger.getHttpLogger(options.getProperties()); } public void open(FunctionContext context) { @@ -118,15 +121,20 @@ private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws Exception HttpResponse response =null; HttpRowDataWrapper httpRowDataWrapper = null; try { + httpLogger.logRequest(request.getHttpRequest()); response = httpClient.send( () -> updateHttpRequestIfRequired(request, oidcProcessor), BodyHandlers.ofString()); + httpLogger.logResponse(response); } catch (HttpStatusCodeValidationFailedException e) { + // log if we fail for status code reasons. + httpLogger.logResponse((HttpResponse) e.getResponse()); // Case 1 http non successful response if (!this.continueOnError) throw e; // use the response in the Exception response = (HttpResponse) e.getResponse(); httpRowDataWrapper = processHttpResponse(response, request, true); } catch (Exception e) { + httpLogger.logExceptionResponse(request, e); // Case 2 Exception occurred if (!this.continueOnError) throw e; String errMessage = e.getMessage(); @@ -210,9 +218,9 @@ private HttpRowDataWrapper processHttpResponse( boolean isError) throws IOException { this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); + this.httpLogger.logResponse(response); var responseBody = response.body(); - log.debug("Received status code [{}] for RestTableSource request", response.statusCode()); if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) { return HttpRowDataWrapper.builder() .data(Collections.emptyList()) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java index 65d6a61c..fc3b3220 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java @@ -25,12 +25,6 @@ public void call( HttpRequest requestEntry, String endpointUrl, Map headerMap) { - - // Uncomment if you want to see the requestBody in the log - //String requestBody = requestEntry.getElements().stream() - // .map(element -> new String(element, StandardCharsets.UTF_8)) - // .collect(Collectors.joining()); - if (response == null) { log.info( "Got response for a request.\n Request:\n " + diff --git a/src/test/java/com/getindata/connectors/http/internal/HttpLoggerRequestTest.java b/src/test/java/com/getindata/connectors/http/internal/HttpLoggerRequestTest.java new file mode 100644 index 00000000..8623fd03 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/HttpLoggerRequestTest.java @@ -0,0 +1,127 @@ +package com.getindata.connectors.http.internal; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpRequest; +import java.util.Collection; +import java.util.Properties; + +import com.google.common.collect.ImmutableList; +import lombok.Data; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.assertj.core.api.Assertions.assertThat; + +import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; + +public class HttpLoggerRequestTest { + @Test + void testCreateStringForBody() { + } + + @ParameterizedTest + @MethodSource("configProvider") + void testCreateStringForRequest(HttpLoggerRequestTest.TestSpec testSpec) throws URISyntaxException { + + Properties properties = new Properties(); + if (testSpec.httpLoggingLevelType != null) { + properties.put(HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL, testSpec.getHttpLoggingLevelType().name()); + } + + HttpLogger httpLogger = HttpLogger.getHttpLogger(properties); + URI uri = new URI("http://aaa"); + HttpRequest.Builder httpRequestBuilder = HttpRequest.newBuilder().uri(uri); + if (testSpec.isHasHeaders()) { + httpRequestBuilder.headers("bbb","ccc","bbb","ddd","eee","fff"); + } + if (testSpec.getMethod().equals("POST")) { + if (testSpec.isHasBody()) { + httpRequestBuilder.method("POST", HttpRequest.BodyPublishers.ofString("my body")); + } else { + httpRequestBuilder.method("POST", HttpRequest.BodyPublishers.noBody()); + } + } + assertThat(httpLogger.createStringForRequest(httpRequestBuilder.build())) + .isEqualTo(testSpec.getExpectedOutput()); + + } + + @Data + static class TestSpec { + final HttpLoggingLevelType httpLoggingLevelType; + final String method; + final boolean hasBody; + final boolean hasHeaders; + final String expectedOutput; + } + + static Collection configProvider() { + return ImmutableList.builder() + .addAll(getTestSpecs()) + .build(); + } + + private static ImmutableList getTestSpecs() { + return ImmutableList.of( + // GET no headers + new TestSpec(null, "GET", false, false, + "HTTP GET Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.MIN, "GET", false, false, + "HTTP GET Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.REQRESPONSE, "GET", false, false, + "HTTP GET Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.MAX, "GET", false, false, + "HTTP GET Request: URL: http://aaa, Headers: None"), + // GET with headers + new TestSpec(null, "GET", false, true, + "HTTP GET Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.MIN, "GET", false, true, + "HTTP GET Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.REQRESPONSE, "GET", false, true, + "HTTP GET Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.MAX, "GET", false, true, + "HTTP GET Request: URL: http://aaa, Headers: bbb:[ccc;ddd];eee:[fff]"), + + // POST no headers + new TestSpec(null, "POST", false, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.MIN, "POST", false, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.REQRESPONSE, "POST", false, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.MAX, "POST", false, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + // POST with headers + new TestSpec(null, "POST", false, true, + "HTTP POST Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.MIN, "POST", false, true, + "HTTP POST Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.REQRESPONSE, "POST", false, true, + "HTTP POST Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.MAX, "POST", false, true, + "HTTP POST Request: URL: http://aaa, Headers: bbb:[ccc;ddd];eee:[fff]"), + + // POST no headers with body + new TestSpec(null, "POST", true, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.MIN, "POST", true, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.REQRESPONSE, "POST", true, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + new TestSpec(HttpLoggingLevelType.MAX, "POST", true, false, + "HTTP POST Request: URL: http://aaa, Headers: None"), + // POST with headers with body + new TestSpec(null, "POST", true, true, + "HTTP POST Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.MIN, "POST", true, true, + "HTTP POST Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.REQRESPONSE, "POST", true, true, + "HTTP POST Request: URL: http://aaa, Headers: ***"), + new TestSpec(HttpLoggingLevelType.MAX, "POST", true, true, + "HTTP POST Request: URL: http://aaa, Headers: bbb:[ccc;ddd];eee:[fff]") + + ); + } + +} diff --git a/src/test/java/com/getindata/connectors/http/internal/HttpLoggerResponseTest.java b/src/test/java/com/getindata/connectors/http/internal/HttpLoggerResponseTest.java new file mode 100644 index 00000000..bddd111b --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/HttpLoggerResponseTest.java @@ -0,0 +1,211 @@ +package com.getindata.connectors.http.internal; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import javax.net.ssl.SSLSession; + +import com.google.common.collect.ImmutableList; +import lombok.Data; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.assertj.core.api.Assertions.assertThat; + +import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; + +public class HttpLoggerResponseTest { + @ParameterizedTest + @MethodSource("configProvider") + void testCreateStringForResponse(HttpLoggerResponseTest.TestSpec testSpec) throws URISyntaxException { + + Properties properties = new Properties(); + if (testSpec.httpLoggingLevelType != null) { + properties.put(HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL, testSpec.getHttpLoggingLevelType().name()); + } + + HttpLogger httpLogger = HttpLogger.getHttpLogger(properties); + URI uri = new URI("http://aaa"); + MockHttpResponse response = new MockHttpResponse(); + response.setStatusCode(testSpec.getStatusCode()); + if (testSpec.method.equals("GET")) { + response.setRequest(HttpRequest.newBuilder().GET().uri(uri).build()); + } else { + // dummy request so we can populate the method in the log + response.setRequest(HttpRequest.newBuilder().POST(HttpRequest.BodyPublishers.noBody()).uri(uri).build()); + } + + if (testSpec.isHasHeaders()) { + // "bbb","ccc","bbb","ddd","eee","fff" + Map> headersMap = new HashMap<>(); + headersMap.put("bbb", List.of("ccc", "ddd")); + headersMap.put("eee", List.of("fff")); + + HttpHeaders headers = HttpHeaders.of(headersMap, (name, value) -> true); + response.setHeaders(headers); + } + + if (testSpec.isHasBody()) { + response.setBody("my body"); + } + + assertThat(httpLogger.createStringForResponse(response)).isEqualTo(testSpec.getExpectedOutput()); + + } + + @Data + static class TestSpec { + final String method; + final int statusCode; + final HttpLoggingLevelType httpLoggingLevelType; + final boolean hasBody; + final boolean hasHeaders; + final String expectedOutput; + } + + static Collection configProvider() { + return ImmutableList.builder() + .addAll(getTestSpecs("GET", 200)) + .addAll(getTestSpecs("POST", 500)) + .build(); + } + + private static ImmutableList getTestSpecs(String method, int statusCode) { + return ImmutableList.of( + // no headers no body + new TestSpec(method, statusCode, null, false, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: None"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MIN, false, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: None"), + new TestSpec(method, statusCode, HttpLoggingLevelType.REQRESPONSE, false, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: None"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MAX, false, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: None"), + // with headers + + new TestSpec(method, statusCode, null, false, true, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: ***, status code: " + statusCode + ", Response Body: None"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MIN, false, true, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: ***, status code: " + statusCode + ", Response Body: None"), + new TestSpec(method, statusCode, HttpLoggingLevelType.REQRESPONSE, false, true, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: ***, status code: " + statusCode + ", Response Body: None"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MAX, false, true, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: bbb:[ccc;ddd];eee:[fff], status code: " + statusCode + ", " + + "Response Body: None"), + + // no headers with body + new TestSpec(method, statusCode, null, true, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: ***"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MIN, true, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: ***"), + new TestSpec(method, statusCode, HttpLoggingLevelType.REQRESPONSE, true, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: my body"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MAX, true, false, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: None, status code: " + statusCode + ", Response Body: my body"), + + // headers with body + new TestSpec(method, statusCode, null, true, true, + "HTTP " + method + " Response: URL: http://aaa, Response Headers: ***" + + ", status code: " + statusCode + ", Response Body: ***"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MIN, true, true, + "HTTP " + method + " Response: URL: http://aaa, Response Headers: ***" + + ", status code: " + statusCode + ", Response Body: ***"), + new TestSpec(method, statusCode, HttpLoggingLevelType.REQRESPONSE, true, true, + "HTTP " + method + " Response: URL: http://aaa, Response Headers: ***" + + ", status code: " + statusCode + ", Response Body: my body"), + new TestSpec(method, statusCode, HttpLoggingLevelType.MAX, true, true, + "HTTP " + method + " Response: URL: http://aaa, " + + "Response Headers: bbb:[ccc;ddd];eee:[fff]" + + ", status code: " + statusCode + ", Response Body: my body") + ); + } + + private class MockHttpResponse implements HttpResponse { + private int statusCode = 0; + private HttpRequest request = null; + private HttpHeaders headers = null; + private String body = null; + + @Override + public int statusCode() { + return this.statusCode; + } + + @Override + public HttpRequest request() { + return this.request; + } + + @Override + public Optional previousResponse() { + return Optional.empty(); + } + + @Override + public HttpHeaders headers() { + return this.headers; + } + + @Override + public Object body() { + return this.body; + } + + @Override + public Optional sslSession() { + return Optional.empty(); + } + + @Override + public URI uri() { + URI uri; + try { + uri = new URI("http://aaa"); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + return uri; + } + + @Override + public HttpClient.Version version() { + return null; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public void setRequest(HttpRequest request) { + this.request = request; + } + + public void setHeaders(HttpHeaders headers) { + this.headers = headers; + } + + public void setBody(String body) { + this.body = body; + } + } +} diff --git a/src/test/resources/simpleLogger.properties b/src/test/resources/simpleLogger.properties index 3e7cd429..dc9dd60c 100644 --- a/src/test/resources/simpleLogger.properties +++ b/src/test/resources/simpleLogger.properties @@ -1,2 +1,2 @@ org.slf4j.simpleLogger.defaultLogLevel=INFO -org.slf4j.simpleLogger.log.com.getindata.connectors.http.internal.table.lookup.RequestAndResponseLogger=DEBUG +com.getindata.connectors.http.internal.HttpLogger=DEBUG From ba4b32483654f2ef2eeeda0bbe364e96242ae650 Mon Sep 17 00:00:00 2001 From: davidradl Date: Tue, 18 Nov 2025 14:20:24 +0000 Subject: [PATCH 2/3] http187 correct some variable names Signed-off-by: davidradl --- .../sink/httpclient/BatchRequestSubmitter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java index be8561dd..a5806733 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java @@ -84,9 +84,9 @@ int getBatchSize() { private CompletableFuture sendBatch( String endpointUrl, - List reqeustBatch) { + List requestBatch) { - HttpRequest httpRequest = buildHttpRequest(reqeustBatch, URI.create(endpointUrl)); + HttpRequest httpRequest = buildHttpRequest(requestBatch, URI.create(endpointUrl)); return httpClient .sendAsync( httpRequest.getHttpRequest(), @@ -102,11 +102,11 @@ private CompletableFuture sendBatch( ); } - private HttpRequest buildHttpRequest(List reqeustBatch, URI endpointUri) { + private HttpRequest buildHttpRequest(List requestBatch, URI endpointUri) { try { - var method = reqeustBatch.get(0).method; - List elements = new ArrayList<>(reqeustBatch.size()); + var method = requestBatch.get(0).method; + List elements = new ArrayList<>(requestBatch.size()); BodyPublisher publisher; // By default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons @@ -114,7 +114,7 @@ private HttpRequest buildHttpRequest(List reqeustBatch, UR // What we do here is we pack every Json/byteArray into Json Array hence '[' and ']' // at the end, and we separate every element with comma. elements.add(BATCH_START_BYTES); - for (HttpSinkRequestEntry entry : reqeustBatch) { + for (HttpSinkRequestEntry entry : requestBatch) { elements.add(entry.element); elements.add(BATCH_ELEMENT_DELIM_BYTES); } From 08c19b76018e78555c0894e22a5327cb331dafab Mon Sep 17 00:00:00 2001 From: davidradl Date: Wed, 19 Nov 2025 14:15:50 +0000 Subject: [PATCH 3/3] http187 fixup Signed-off-by: davidradl --- README.md | 1 + .../http/internal/sink/httpclient/PerRequestSubmitter.java | 1 + .../http/internal/table/lookup/BodyBasedRequestFactory.java | 2 -- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5c3b3683..fe3d8b7a 100644 --- a/README.md +++ b/README.md @@ -581,6 +581,7 @@ Notes: | connector | required | The Value should be set to _rest-lookup_ | | format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. | | url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ | +| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. | | gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. | | lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. | | lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). | diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java index 12439bd8..14ff6cac 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java @@ -28,6 +28,7 @@ public PerRequestSubmitter( HttpClient httpClient) { super(properties, headersAndValues, httpClient); + } @Override diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java index 08fc2300..e979d6f3 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java @@ -49,8 +49,6 @@ protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) { builder .uri(constructUri(lookupQueryInfo)) .method(methodName, BodyPublishers.ofString(body)); - // we do not view the lookup keys as sensitive; therefore the request body is - // not obfuscated. HttpLogger.getHttpLogger(options.getProperties()).logRequestBody(body); return builder; }