-
Notifications
You must be signed in to change notification settings - Fork 11
PLUGIN-1936: Read data in a streaming manner #131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
1f2d9d7
4235153
80033a3
5e99944
cf89907
2274e97
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,8 @@ | |
| import com.google.gson.JsonArray; | ||
| import com.google.gson.JsonObject; | ||
| import com.google.gson.reflect.TypeToken; | ||
| import com.google.gson.stream.JsonReader; | ||
| import com.google.gson.stream.JsonToken; | ||
| import io.cdap.cdap.api.data.schema.Schema; | ||
| import io.cdap.cdap.etl.api.FailureCollector; | ||
| import io.cdap.plugin.servicenow.connector.ServiceNowConnectorConfig; | ||
|
|
@@ -55,8 +57,12 @@ | |
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.InputStreamReader; | ||
| import java.lang.reflect.Type; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
@@ -131,7 +137,7 @@ public String getAccessTokenRetryableMode() throws ExecutionException, RetryExce | |
| * @param limit The number of records to be fetched | ||
| * @return The list of Map; each Map representing a table row | ||
| */ | ||
| public List<Map<String, String>> fetchTableRecords( | ||
| public RestAPIResponse fetchTableRecords( | ||
| String tableName, | ||
| SourceValueType valueType, | ||
| String startDate, | ||
|
|
@@ -154,7 +160,7 @@ public List<Map<String, String>> fetchTableRecords( | |
| String accessToken = getAccessToken(); | ||
| requestBuilder.setAuthHeader(accessToken); | ||
| RestAPIResponse apiResponse = executeGetWithRetries(requestBuilder.build()); | ||
| return parseResponseToResultListOfMap(apiResponse.getResponseBody()); | ||
| return apiResponse; | ||
| } | ||
|
|
||
| private void applyDateRangeToRequest(ServiceNowTableAPIRequestBuilder requestBuilder, String startDate, | ||
|
|
@@ -197,6 +203,40 @@ public List<Map<String, String>> parseResponseToResultListOfMap(String responseB | |
| return GSON.fromJson(ja, type); | ||
| } | ||
|
|
||
| public List<Map<String, String>> parseResponseToResultListOfMap(InputStream in) throws ServiceNowAPIException { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we creating a List of map? Instead we can directly read it as a JsonObject, something like: gson.fromJson(reader, ServiceNowRecordObject.class); |
||
| try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8); | ||
| JsonReader jsonReader = new JsonReader(reader)) { | ||
| jsonReader.setLenient(true); | ||
| jsonReader.beginObject(); | ||
|
|
||
| List<Map<String, String>> records = new ArrayList<>(); | ||
| while (jsonReader.hasNext()) { | ||
| String name = jsonReader.nextName(); | ||
| if (ServiceNowConstants.RESULT.equals(name) && jsonReader.peek() == JsonToken.BEGIN_ARRAY) { | ||
| jsonReader.beginArray(); | ||
| while (jsonReader.hasNext()) { | ||
| jsonReader.beginObject(); | ||
| while (jsonReader.hasNext()) { | ||
| Map<String, String> record = new HashMap<>(); | ||
| String field = jsonReader.nextName(); | ||
| JsonToken token = jsonReader.peek(); | ||
| record.put(field, token == JsonToken.NULL ? null : jsonReader.nextString()); | ||
| records.add(record); | ||
| } | ||
| jsonReader.endObject(); | ||
| } | ||
| jsonReader.endArray(); | ||
| } else { | ||
| jsonReader.skipValue(); | ||
| } | ||
| } | ||
| jsonReader.endObject(); | ||
| return records; | ||
| } catch (IOException e) { | ||
| throw new ServiceNowAPIException(e, null); | ||
| } | ||
| } | ||
|
|
||
| private String getErrorMessage(String responseBody) { | ||
| try { | ||
| JsonObject jo = GSON.fromJson(responseBody, JsonObject.class); | ||
|
|
@@ -231,12 +271,14 @@ private String getErrorMessage(String responseBody) { | |
| * @param limit The number of records to be fetched | ||
| * @return The list of Map; each Map representing a table row | ||
| */ | ||
| public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType, | ||
| public RestAPIResponse fetchTableRecordsRetryableMode(String tableName, SourceValueType valueType, | ||
| String startDate, String endDate, int offset, | ||
| int limit) throws ServiceNowAPIException { | ||
| final List<Map<String, String>> results = new ArrayList<>(); | ||
| //final List<Map<String, String>> results = new ArrayList<>(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please remove |
||
| final RestAPIResponse[] restAPIResponse = new RestAPIResponse[1]; | ||
| Callable<Boolean> fetchRecords = () -> { | ||
| results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); | ||
| // results.addAll(fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please remove |
||
| restAPIResponse[0] = fetchTableRecords(tableName, valueType, startDate, endDate, offset, limit); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you please explain what are we trying to do here? why restAPIResponse is an array |
||
| return true; | ||
| }; | ||
|
|
||
|
|
@@ -254,7 +296,7 @@ public List<Map<String, String>> fetchTableRecordsRetryableMode(String tableName | |
| e, null, false); | ||
| } | ||
|
|
||
| return results; | ||
| return restAPIResponse[0]; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -277,8 +319,8 @@ public Schema fetchTableSchema(String tableName, FailureCollector collector) { | |
| } | ||
|
|
||
| @VisibleForTesting | ||
| public MetadataAPISchemaResponse parseSchemaResponse(String responseBody) { | ||
| return GSON.fromJson(responseBody, MetadataAPISchemaResponse.class); | ||
| public MetadataAPISchemaResponse parseSchemaResponse(InputStream responseStream) { | ||
| return GSON.fromJson(createJsonReader(responseStream), MetadataAPISchemaResponse.class); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -352,7 +394,7 @@ public Schema fetchTableSchema(String tableName, String accessToken, SourceValue | |
| private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns, | ||
| String tableName) throws ServiceNowAPIException { | ||
| SchemaAPISchemaResponse schemaAPISchemaResponse = | ||
| GSON.fromJson(restAPIResponse.getResponseBody(), SchemaAPISchemaResponse.class); | ||
| GSON.fromJson(createJsonReader(restAPIResponse.getBodyAsStream()), SchemaAPISchemaResponse.class); | ||
|
|
||
| if (schemaAPISchemaResponse.getResult() == null || schemaAPISchemaResponse.getResult().isEmpty()) { | ||
| throw new ServiceNowAPIException( | ||
|
|
@@ -386,8 +428,7 @@ private Schema prepareSchemaWithSchemaAPI(RestAPIResponse restAPIResponse, List< | |
| private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns, | ||
| String tableName, SourceValueType valueType) throws | ||
| ServiceNowAPIException { | ||
| MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getResponseBody()); | ||
|
|
||
| MetadataAPISchemaResponse metadataAPISchemaResponse = parseSchemaResponse(restAPIResponse.getBodyAsStream()); | ||
| if (metadataAPISchemaResponse.getResult() == null || metadataAPISchemaResponse.getResult().getColumns() == null || | ||
| metadataAPISchemaResponse.getResult().getColumns().isEmpty()) { | ||
| throw new ServiceNowAPIException( | ||
|
|
@@ -413,6 +454,11 @@ private Schema prepareSchemaWithMetadataAPI(RestAPIResponse restAPIResponse, Lis | |
| return SchemaBuilder.constructSchema(tableName, columns); | ||
| } | ||
|
|
||
| public JsonReader createJsonReader(InputStream inputStream) { | ||
| Objects.requireNonNull(inputStream, "InputStream must not be null"); | ||
| return new JsonReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); | ||
| } | ||
|
|
||
| /** | ||
| * Get the total number of records in the table | ||
| * | ||
|
|
@@ -503,8 +549,8 @@ public String createRecordInDisplayMode(String tableName, HttpEntity entity) thr | |
| } | ||
|
|
||
| private String getSystemId(RestAPIResponse restAPIResponse) { | ||
| CreateRecordAPIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), | ||
| CreateRecordAPIResponse.class); | ||
| CreateRecordAPIResponse apiResponse = GSON.fromJson( | ||
| new InputStreamReader(restAPIResponse.getBodyAsStream(), StandardCharsets.UTF_8), CreateRecordAPIResponse.class); | ||
| return apiResponse.getResult().get(ServiceNowConstants.SYSTEM_ID).toString(); | ||
| } | ||
|
|
||
|
|
@@ -527,7 +573,8 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String | |
| requestBuilder.setAuthHeader(accessToken); | ||
| restAPIResponse = executeGetWithRetries(requestBuilder.build()); | ||
|
|
||
| APIResponse apiResponse = GSON.fromJson(restAPIResponse.getResponseBody(), APIResponse.class); | ||
| APIResponse apiResponse = GSON.fromJson( | ||
| new InputStreamReader(restAPIResponse.getBodyAsStream(), StandardCharsets.UTF_8), APIResponse.class); | ||
| return apiResponse.getResult().get(0); | ||
| } | ||
|
|
||
|
|
@@ -545,8 +592,9 @@ public Map<String, String> getRecordFromServiceNowTable(String tableName, String | |
| * @throws RuntimeException if the schema response is null or contains no result. | ||
| */ | ||
| private Schema prepareStringBasedSchema(RestAPIResponse restAPIResponse, List<ServiceNowColumn> columns, | ||
| String tableName) { | ||
| List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody()); | ||
| String tableName) throws ServiceNowAPIException { | ||
| List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getBodyAsStream()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be List of Records/objects instead of map |
||
| // List<Map<String, String>> result = parseResponseToResultListOfMap(restAPIResponse.getResponseBody()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove |
||
| if (result != null && !result.isEmpty()) { | ||
| Map<String, String> firstRecord = result.get(0); | ||
| for (String key : firstRecord.keySet()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also explain how are we checking is result is empty