diff --git a/src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java b/src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java
index c4986bac..234d6ae6 100644
--- a/src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java
+++ b/src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java
@@ -1,12 +1,12 @@
package com.getindata.connectors.http;
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
/**
- * An enhancement for Flink's {@link ElementConverter} that expose {@link #open(InitContext)} method
- * that will be called by HTTP connect code to ensure that element converter is initialized
+ * An enhancement for Flink's {@link ElementConverter} that expose {@link #open(WriterInitContext)}
+ * method that will be called by HTTP connect code to ensure that element converter is initialized
* properly. This is required for cases when Flink's SerializationSchema and DeserializationSchema
* objects like JsonRowDataSerializationSchema are used.
*
@@ -29,6 +29,6 @@ public interface SchemaLifecycleAwareElementConverter
*
* @param context Contextual information that can be used during initialization.
*/
- void open(InitContext context);
+ void open(WriterInitContext context);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java
index de37faac..e42be88b 100644
--- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java
+++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java
@@ -5,9 +5,12 @@
import java.util.Collections;
import java.util.Properties;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -115,7 +118,7 @@ protected HttpSinkInternal(
@Override
public StatefulSinkWriter> createWriter(
- InitContext context) throws IOException {
+ WriterInitContext context) throws IOException {
ElementConverter elementConverter = getElementConverter();
if (elementConverter instanceof SchemaLifecycleAwareElementConverter) {
@@ -126,12 +129,7 @@ public StatefulSinkWriter> cr
return new HttpSinkWriter<>(
elementConverter,
context,
- getMaxBatchSize(),
- getMaxInFlightRequests(),
- getMaxBufferedRequests(),
- getMaxBatchSizeInBytes(),
- getMaxTimeInBufferMS(),
- getMaxRecordSizeInBytes(),
+ getAsyncSinkWriterConfiguration(),
endpointUrl,
sinkHttpClientBuilder.build(
properties,
@@ -146,19 +144,14 @@ public StatefulSinkWriter> cr
@Override
public StatefulSinkWriter> restoreWriter(
- InitContext context,
+ WriterInitContext context,
Collection> recoveredState)
throws IOException {
return new HttpSinkWriter<>(
getElementConverter(),
context,
- getMaxBatchSize(),
- getMaxInFlightRequests(),
- getMaxBufferedRequests(),
- getMaxBatchSizeInBytes(),
- getMaxTimeInBufferMS(),
- getMaxRecordSizeInBytes(),
+ getAsyncSinkWriterConfiguration(),
endpointUrl,
sinkHttpClientBuilder.build(
properties,
@@ -171,6 +164,17 @@ public StatefulSinkWriter> re
);
}
+ private AsyncSinkWriterConfiguration getAsyncSinkWriterConfiguration() {
+ return AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(getMaxBatchSize())
+ .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
+ .setMaxInFlightRequests(getMaxInFlightRequests())
+ .setMaxBufferedRequests(getMaxBufferedRequests())
+ .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
+ .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
+ .build();
+ }
+
@Override
public SimpleVersionedSerializer>
getWriterStateSerializer() {
diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java
index d17e9213..17ad21af 100644
--- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java
+++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java
@@ -1,18 +1,18 @@
package com.getindata.connectors.http.internal.sink;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -47,20 +47,13 @@ public class HttpSinkWriter extends AsyncSinkWriter elementConverter,
- Sink.InitContext context,
- int maxBatchSize,
- int maxInFlightRequests,
- int maxBufferedRequests,
- long maxBatchSizeInBytes,
- long maxTimeInBufferMS,
- long maxRecordSizeInBytes,
+ WriterInitContext context,
+ AsyncSinkWriterConfiguration writerConfiguration,
String endpointUrl,
SinkHttpClient sinkHttpClient,
Collection> bufferedRequestStates,
Properties properties) {
-
- super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
- maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
+ super(elementConverter, context, writerConfiguration, bufferedRequestStates);
this.endpointUrl = endpointUrl;
this.sinkHttpClient = sinkHttpClient;
@@ -83,7 +76,7 @@ public HttpSinkWriter(
@Override
protected void submitRequestEntries(
List requestEntries,
- Consumer> requestResult) {
+ ResultHandler resultHandler) {
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
future.whenCompleteAsync((response, err) -> {
if (err != null) {
@@ -114,7 +107,7 @@ protected void submitRequestEntries(
//requestResult.accept(Collections.emptyList());
//}
}
- requestResult.accept(Collections.emptyList());
+ resultHandler.complete();
}, sinkWriterThreadPool);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java b/src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java
index 5fb00719..dc41ce26 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java
@@ -1,8 +1,8 @@
package com.getindata.connectors.http.internal.table;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;
@@ -27,7 +27,7 @@ public SerializationSchemaElementConverter(
}
@Override
- public void open(InitContext context) {
+ public void open(WriterInitContext context) {
if (!schemaOpened) {
try {
serializationSchema.open(context.asSerializationSchemaInitializationContext());
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
index 4338aaeb..9fc6136d 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
@@ -115,7 +115,7 @@ private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws Exception
var request = requestFactory.buildLookupRequest(lookupData);
var oidcProcessor = HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig());
- HttpResponse response =null;
+ HttpResponse response = null;
HttpRowDataWrapper httpRowDataWrapper = null;
try {
response = httpClient.send(
@@ -129,7 +129,7 @@ private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws Exception
} catch (Exception e) {
// Case 2 Exception occurred
if (!this.continueOnError) throw e;
- String errMessage = e.getMessage();
+ String errMessage = e.getMessage();
// some exceptions do not have messages including the java.net.ConnectException we can get here if
// the connection is bad.
if (errMessage == null) {
@@ -141,7 +141,7 @@ private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws Exception
.httpCompletionState(HttpCompletionState.EXCEPTION)
.build();
}
- if (httpRowDataWrapper == null) {
+ if (httpRowDataWrapper == null) {
// Case 3 Successful path.
httpRowDataWrapper = processHttpResponse(response, request, false);
}
@@ -152,7 +152,8 @@ private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws Exception
/**
* If using OIDC, update the http request using the oidc header pre processor to supply the
* authentication header, with a short lived bearer token.
- * @param request http reauest to amend
+ *
+ * @param request http reauest to amend
* @param oidcHeaderPreProcessor OIDC header pre processor
* @return http request, which for OIDC will have the bearer token as the authentication header
*/
@@ -176,8 +177,8 @@ protected HttpRequest updateHttpRequestIfRequired(HttpLookupSourceRequestEntry r
Map headerMap = new HashMap<>();
if (httpRequest.headers() != null && !httpRequest.headers().map().isEmpty()) {
for (Map.Entry> header
- :httpRequest.headers().map().entrySet()) {
- List values = header.getValue();
+ : httpRequest.headers().map().entrySet()) {
+ List values = header.getValue();
if (values.size() == 1) {
headerMap.put(header.getKey(), header.getValue().get(0));
}
@@ -187,7 +188,7 @@ protected HttpRequest updateHttpRequestIfRequired(HttpLookupSourceRequestEntry r
Optional oidcTokenRequest = readableConfig
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST);
String bearerToken = oidcHeaderPreProcessor.preprocessValueForHeader(
- HttpHeaderUtils.AUTHORIZATION, oidcTokenRequest.get());
+ HttpHeaderUtils.AUTHORIZATION, oidcTokenRequest.get());
headerMap.put(HttpHeaderUtils.AUTHORIZATION, bearerToken);
String[] headerAndValueArray = HttpHeaderUtils.toHeaderAndValueArray(headerMap);
builder.headers(headerAndValueArray);
@@ -198,10 +199,11 @@ protected HttpRequest updateHttpRequestIfRequired(HttpLookupSourceRequestEntry r
/**
* Process the http response.
+ *
* @param response http response
- * @param request http request
- * @param isError whether the http response is an error (i.e. not successful after the retry
- * processing and accounting for the config)
+ * @param request http request
+ * @param isError whether the http response is an error (i.e. not successful after the retry
+ * processing and accounting for the config)
* @return HttpRowDataWrapper http row information and http error information
*/
private HttpRowDataWrapper processHttpResponse(
@@ -229,7 +231,7 @@ private HttpRowDataWrapper processHttpResponse(
.build();
} else {
Collection rowData = Collections.emptyList();
- HttpCompletionState httpCompletionState= HttpCompletionState.SUCCESS;
+ HttpCompletionState httpCompletionState = HttpCompletionState.SUCCESS;
String errMessage = null;
try {
rowData = deserialize(responseBody);
@@ -243,7 +245,7 @@ private HttpRowDataWrapper processHttpResponse(
.errorMessage(errMessage)
.httpHeadersMap(response.headers().map())
.httpStatusCode(response.statusCode())
- .httpCompletionState( httpCompletionState)
+ .httpCompletionState(httpCompletionState)
.build();
}
}
@@ -257,32 +259,32 @@ HttpRequestFactory getRequestFactory() {
private Collection deserialize(String responseBody) throws IOException {
byte[] rawBytes = responseBody.getBytes();
String resultType =
- options.getProperties().getProperty(RESULT_TYPE, RESULT_TYPE_SINGLE_VALUE);
+ options.getProperties().getProperty(RESULT_TYPE, RESULT_TYPE_SINGLE_VALUE);
if (resultType.equals(RESULT_TYPE_SINGLE_VALUE)) {
return deserializeSingleValue(rawBytes);
} else if (resultType.equals(RESULT_TYPE_ARRAY)) {
return deserializeArray(rawBytes);
} else {
throw new IllegalStateException(
- String.format("Unknown lookup source result type '%s'.", resultType));
+ String.format("Unknown lookup source result type '%s'.", resultType));
}
}
private List deserializeSingleValue(byte[] rawBytes) throws IOException {
return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
- .map(Collections::singletonList)
- .orElse(Collections.emptyList());
+ .map(Collections::singletonList)
+ .orElse(Collections.emptyList());
}
private List deserializeArray(byte[] rawBytes) throws IOException {
List rawObjects =
- objectMapper.readValue(rawBytes, new TypeReference<>() {
- });
+ objectMapper.readValue(rawBytes, new TypeReference<>() {
+ });
List result = new ArrayList<>();
for (JsonNode rawObject : rawObjects) {
if (!(rawObject instanceof NullNode)) {
RowData deserialized =
- responseBodyDecoder.deserialize(rawObject.toString().getBytes());
+ responseBodyDecoder.deserialize(rawObject.toString().getBytes());
// deserialize() returns null if deserialization fails
if (deserialized != null) {
result.add(deserialized);
diff --git a/src/test/java/com/getindata/StreamTableJob.java b/src/test/java/com/getindata/StreamTableJob.java
index 95fea0f5..c55ae76b 100644
--- a/src/test/java/com/getindata/StreamTableJob.java
+++ b/src/test/java/com/getindata/StreamTableJob.java
@@ -1,10 +1,13 @@
package com.getindata;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.utils.ParameterTool;
+import java.time.Duration;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.ParameterTool;
public class StreamTableJob {
@@ -13,9 +16,13 @@ public static void main(String[] args) {
ParameterTool parameters = ParameterTool.fromSystemProperties();
parameters = parameters.mergeWith(ParameterTool.fromArgs(args));
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+ config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1000);
+ config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(1000));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
// env.enableCheckpointing(5000);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.getConfig().setGlobalJobParameters(parameters);
diff --git a/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java b/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java
index db0fa8a9..ef324c78 100644
--- a/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java
@@ -3,6 +3,7 @@
import java.util.stream.IntStream;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -16,7 +17,7 @@ void verifyFixedDelayRetryConfig() {
var config = new Configuration();
config.setString("gid.connector.http.source.lookup.retry-strategy.type", "fixed-delay");
config.setString("gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay", "10s");
- config.setInteger("lookup.max-retries", 12);
+ config.set(LookupOptions.MAX_RETRIES, 12);
var retryConfig = RetryConfigProvider.create(config);
@@ -32,8 +33,8 @@ void verifyExponentialDelayConfig() {
config.setString("gid.connector.http.source.lookup.retry-strategy.type", "exponential-delay");
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff", "15ms");
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff", "120ms");
- config.setInteger("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", 2);
- config.setInteger("lookup.max-retries", 6);
+ config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", "2");
+ config.set(LookupOptions.MAX_RETRIES, 6);
var retryConfig = RetryConfigProvider.create(config);
var intervalFunction = retryConfig.getIntervalFunction();
diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java
index db1975ed..22125630 100644
--- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java
@@ -6,12 +6,13 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -38,7 +39,7 @@ class HttpSinkWriterTest {
private ElementConverter elementConverter;
@Mock
- private InitContext context;
+ private WriterInitContext context;
@Mock
private SinkHttpClient httpClient;
@@ -64,12 +65,14 @@ public void setUp() {
this.httpSinkWriter = new HttpSinkWriter<>(
elementConverter,
context,
- 10,
- 10,
- 100,
- 10,
- 10,
- 10,
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(10)
+ .setMaxBatchSizeInBytes(10)
+ .setMaxInFlightRequests(10)
+ .setMaxBufferedRequests(100)
+ .setMaxTimeInBufferMS(10)
+ .setMaxRecordSizeInBytes(10)
+ .build(),
"http://localhost/client",
httpClient,
stateBuffer,
@@ -85,11 +88,25 @@ public void testErrorMetric() throws InterruptedException {
when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);
HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes());
- Consumer> requestResult =
- httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries));
+ ResultHandler resultHandler = new ResultHandler() {
+ @Override
+ public void complete() {
+ log.info("Request completed successfully");
+ }
+
+ @Override
+ public void completeExceptionally(Exception e) {
+ log.error("Request failed.", e);
+ }
+
+ @Override
+ public void retryForEntries(List requestEntriesToRetry) {
+ log.warn("Request failed partially.");
+ }
+ };
List requestEntries = Collections.singletonList(request);
- this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);
+ this.httpSinkWriter.submitRequestEntries(requestEntries, resultHandler);
// would be good to use Countdown Latch instead sleep...
Thread.sleep(2000);
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java
index f65b94ca..ec2845d9 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java
@@ -1,6 +1,5 @@
package com.getindata.connectors.http.internal.table.lookup;
-
import java.net.URI;
import java.net.http.HttpClient;
import java.util.Collection;
@@ -21,19 +20,19 @@ public class BodyBasedRequestFactoryTest {
@ParameterizedTest
@MethodSource("configProvider")
- void testconstructUri(TestSpec testSpec) throws Exception {
- Set configs = new HashSet();
+ void testConstructUri(TestSpec testSpec) throws Exception {
+ Set configs = new HashSet<>();
- Configuration configuration= new Configuration();
- Configuration configuration_http11 = new Configuration();
- Configuration configuration_http2 = new Configuration();
+ Configuration configuration = new Configuration();
+ Configuration configurationHttp11 = new Configuration();
+ Configuration configurationHttp2 = new Configuration();
- configuration_http2.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_2));
- configuration_http11.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_1_1));
+ configurationHttp2.setString(LOOKUP_HTTP_VERSION.key(), String.valueOf(HttpClient.Version.HTTP_2));
+ configurationHttp11.setString(LOOKUP_HTTP_VERSION.key(), String.valueOf(HttpClient.Version.HTTP_1_1));
configs.add(configuration);
- configs.add(configuration_http11);
- configs.add(configuration_http2);
+ configs.add(configurationHttp11);
+ configs.add(configurationHttp2);
for(Configuration config: configs) {
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url,
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java
index 82811004..b0464e1e 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java
@@ -2,8 +2,8 @@
import java.util.*;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -37,13 +37,12 @@ public class HttpLookupTableSourceFactoryTest {
@Test
void validateHttpLookupSourceOptions() {
-
HttpLookupTableSourceFactory httpLookupTableSourceFactory
= new HttpLookupTableSourceFactory();
- TableConfig tableConfig = new TableConfig();
+ Configuration tableConfig = new Configuration();
httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig);
tableConfig.set(HttpLookupConnectorOptions
- .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "aaa");
+ .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL, "aaa");
try {
httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig);
@@ -53,7 +52,7 @@ void validateHttpLookupSourceOptions() {
}
// should now work.
tableConfig.set(HttpLookupConnectorOptions
- .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), "bbb");
+ .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, "bbb");
httpLookupTableSourceFactory.validateHttpLookupSourceOptions(tableConfig);
}
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
index 64ca4ab3..3a900d21 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -23,10 +23,9 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -106,11 +105,14 @@ void setup() {
wireMockServer.start();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY,
+ RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.configure(config, getClass().getClassLoader());
- env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setCheckpointInterval(1000L);
+ env.getCheckpointConfig().setCheckpointingConsistencyMode(
+ org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1); // wire mock server has problem with scenario state during parallel execution
tEnv = StreamTableEnvironment.create(env);
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java
index d4684cd4..3ebbf750 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java
@@ -1,6 +1,13 @@
package com.getindata.connectors.http.internal.table.lookup;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.metrics.groups.CacheMetricGroup;
@@ -172,8 +179,9 @@ void shouldCreateTableSourceWithParams() {
HttpLookupTableSource tableSource =
(HttpLookupTableSource) createTableSource(SCHEMA, getOptions());
+ // FIXME: preferCustomShuffle
LookupTableSource.LookupRuntimeProvider lookupProvider =
- tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey));
+ tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey, false));
HttpTableLookupFunction tableFunction = (HttpTableLookupFunction)
((LookupFunctionProvider) lookupProvider).createLookupFunction();
@@ -205,7 +213,7 @@ void shouldCreateAsyncTableSourceWithParams() {
AsyncLookupFunctionProvider lookupProvider =
(AsyncLookupFunctionProvider)
tableSource.getLookupRuntimeProvider(
- new LookupRuntimeProviderContext(lookupKey));
+ new LookupRuntimeProviderContext(lookupKey, false));
AsyncHttpTableLookupFunction tableFunction =
(AsyncHttpTableLookupFunction) lookupProvider.createAsyncLookupFunction();
@@ -322,9 +330,6 @@ private static ImmutableList getTestSpecs() {
HttpLookupTableSource tableSource = new HttpLookupTableSource(
null, options,
null, null, cache);
- int[][] lookupKeys = {{1, 2}};
- LookupTableSource.LookupContext lookupContext =
- new LookupRuntimeProviderContext(lookupKeys);
return tableSource.getLookupRuntimeProvider(null, null, null);
}
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
index a4b0293f..8180eeac 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
@@ -103,7 +103,8 @@ static void cleanUpAll() {
@BeforeEach
void setUp() {
int[][] lookupKey = {{}};
- this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey);
+ // TODO: what is preferCustomShuffle?
+ this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey, false);
this.lookupRowData = GenericRowData.of(
StringData.fromString("1"),
@@ -273,8 +274,8 @@ void shouldHandleCodeBasedOnConfiguration(
// GIVEN
this.stubMapping = setUpServerStub(201);
- configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, successCodesExpression);
- configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, ignoredResponseCodesExpression);
+ configuration.set(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, successCodesExpression);
+ configuration.set(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, ignoredResponseCodesExpression);
JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
index 3f88f936..8414dddf 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
@@ -67,7 +67,8 @@ public class JavaNetHttpPollingClientHttpsConnectionTest extends HttpsConnection
public void setUp() {
super.setUp();
int[][] lookupKey = {{0, 1}};
- this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey);
+ // FIXME: What is preferCustomShuffle?
+ this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey, false);
this.lookupRowData = GenericRowData.of(
StringData.fromString("1"),
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java
index ae4997ab..fd13c156 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java
@@ -9,9 +9,9 @@
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.ConfigurationException;
@@ -73,8 +73,9 @@ public void setup() {
wireMockServer.start();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
Configuration config = new Configuration();
+ config.set(RestartStrategyOptions.RESTART_STRATEGY,
+ RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.configure(config, getClass().getClassLoader());
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
@@ -128,8 +129,8 @@ private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) throws C
HttpRequest newHttpRequest = client.updateHttpRequestIfRequired(request,
oidcHeaderPreProcessor);
assertThat(httpRequest).isEqualTo(newHttpRequest);
- configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "http://localhost:9090/auth");
- configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, BEARER_REQUEST);
+ configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL, "http://localhost:9090/auth");
+ configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, BEARER_REQUEST);
configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
Duration.ofSeconds(1L));
client = new JavaNetHttpPollingClient(mock(HttpClient.class),
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
index 83fa3826..ff83ca62 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java
@@ -78,7 +78,7 @@ public void lookupQueryInfoTestRequiredConfig() {
}
private void createUsingFactory(boolean async) {
- this.config.setBoolean(HttpLookupConnectorOptions.ASYNC_POLLING, async);
+ this.config.set(HttpLookupConnectorOptions.ASYNC_POLLING, async);
LookupRow lookupRow= new LookupRow()
.addLookupEntry(
new RowDataSingleValueLookupSchemaEntry(
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index ae3b85ee..8d81ab25 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -222,7 +222,7 @@ private static void validateCreatedQueryForPutAndPost(LookupQueryInfo createdQue
QUERY_PARAMS);
}
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, URL_PARAMS);
- config.setString(LOOKUP_METHOD, operation);
+ config.set(LOOKUP_METHOD, operation);
return config;
}
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreatorFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreatorFactoryTest.java
index ce3c87de..49b0e655 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreatorFactoryTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreatorFactoryTest.java
@@ -49,11 +49,9 @@ public void setUp() {
this.tableContext = new FactoryUtil.DefaultDynamicTableContext(
ObjectIdentifier.of("default", "default", "test"),
new ResolvedCatalogTable(
- CatalogTable.of(
- Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- null,
- Collections.emptyList(),
- Collections.emptyMap()),
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .build(),
resolvedSchema),
Collections.emptyMap(),
config,
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java
index 6ccdb444..6328e604 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/QueryCreatorUtils.java
@@ -24,11 +24,9 @@ public static DynamicTableFactory.Context getTableContext(Configuration config,
return new FactoryUtil.DefaultDynamicTableContext(
ObjectIdentifier.of("default", "default", "test"),
new ResolvedCatalogTable(
- CatalogTable.of(
- Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- null,
- Collections.emptyList(),
- Collections.emptyMap()),
+ CatalogTable.newBuilder()
+ .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .build(),
resolvedSchema),
Collections.emptyMap(),
config,