Skip to content

Commit 49de425

Browse files
committed
More work in progress
1 parent d02ba26 commit 49de425

23 files changed

+842
-208
lines changed

src/main/java/org/sourcelab/kafka/connect/apiclient/ApiClient.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,25 @@
22

33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
5+
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
6+
import org.sourcelab.kafka.connect.apiclient.request.Request;
7+
import org.sourcelab.kafka.connect.apiclient.request.RequestErrorResponse;
8+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
9+
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnector;
10+
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectorConfig;
11+
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectorStatus;
12+
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectors;
13+
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
14+
import org.sourcelab.kafka.connect.apiclient.request.post.connector.PostConnectors;
15+
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorConfig;
516
import org.sourcelab.kafka.connect.apiclient.rest.HttpClientRestClient;
17+
import org.sourcelab.kafka.connect.apiclient.rest.InvalidRequestException;
618
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
19+
import org.sourcelab.kafka.connect.apiclient.rest.RestResponse;
20+
21+
import java.io.IOException;
22+
import java.util.Collection;
23+
import java.util.Map;
724

825
public class ApiClient {
926
private static final Logger logger = LoggerFactory.getLogger(ApiClient.class);
@@ -18,6 +35,12 @@ public class ApiClient {
1835
*/
1936
private final RestClient restClient;
2037

38+
/**
39+
* Internal State flag.
40+
*/
41+
private boolean isInitialized = false;
42+
43+
2144
/**
2245
* Default Constructor.
2346
* @param configuration Api Client Configuration.
@@ -37,4 +60,86 @@ public ApiClient(final Configuration configuration, final RestClient restClient)
3760
this.configuration = configuration;
3861
this.restClient = restClient;
3962
}
63+
64+
private <T> T submitRequest(final Request<T> request) {
65+
// Submit request
66+
final RestResponse restResponse = getRestClient().submitRequest(request);
67+
final int responseCode = restResponse.getHttpCode();
68+
String responseStr = restResponse.getResponseStr();
69+
70+
// If we have a valid response
71+
logger.info("Response: {}", restResponse);
72+
73+
// Check for invalid http status codes
74+
if (responseCode >= 200 && responseCode < 300) {
75+
// These response codes have no values
76+
if ((responseCode == 204 || responseCode == 205) && responseStr == null) {
77+
// Avoid NPE
78+
responseStr = "";
79+
}
80+
81+
try {
82+
return request.parseResponse(restResponse.getResponseStr());
83+
} catch (IOException exception) {
84+
throw new RuntimeException(exception.getMessage(), exception);
85+
}
86+
}
87+
88+
// Attempt to parse error response
89+
try {
90+
final RequestErrorResponse errorResponse = JacksonFactory.newInstance().readValue(responseStr, RequestErrorResponse.class);
91+
throw new InvalidRequestException(errorResponse.getMessage(), errorResponse.getErrorCode());
92+
} catch (IOException e) {
93+
// swallow
94+
}
95+
throw new InvalidRequestException("Invalid response from server: " + responseStr, restResponse.getHttpCode());
96+
97+
}
98+
99+
public Collection<String> getConnectors() {
100+
return submitRequest(new GetConnectors());
101+
}
102+
103+
public ConnectorDefinition getConnector(final String connectorName) {
104+
return submitRequest(new GetConnector(connectorName));
105+
}
106+
107+
public Map<String, String> getConnectorConfig(final String connectorName) {
108+
return submitRequest(new GetConnectorConfig(connectorName));
109+
}
110+
111+
public ConnectorStatus getConnectorStatus(final String connectorName) {
112+
return submitRequest(new GetConnectorStatus(connectorName));
113+
}
114+
115+
// TODO Add return value
116+
public String addConnector(final ConnectorDefinition connectorDefinition) {
117+
return submitRequest(new PostConnectors(connectorDefinition));
118+
}
119+
120+
public ConnectorDefinition updateConnectorConfig(final String connectorName, final Map<String, String> config) {
121+
return submitRequest(new PutConnectorConfig(connectorName, config));
122+
}
123+
124+
/**
125+
* package protected for access in tests.
126+
* @return Rest Client.
127+
*/
128+
private RestClient getRestClient() {
129+
// If we haven't initialized.
130+
if (!isInitialized) {
131+
// Call Init.
132+
restClient.init(getConfiguration());
133+
134+
// Flip state flag
135+
isInitialized = true;
136+
}
137+
138+
// return our rest client.
139+
return restClient;
140+
}
141+
142+
private Configuration getConfiguration() {
143+
return configuration;
144+
}
40145
}

src/main/java/org/sourcelab/kafka/connect/apiclient/Configuration.java

Lines changed: 18 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,13 @@
1-
/**
2-
* Copyright 2017 Stephen Powis https://github.com/Crim/pardot-java-client
3-
*
4-
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
5-
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
6-
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
7-
* persons to whom the Software is furnished to do so, subject to the following conditions:
8-
*
9-
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
10-
* Software.
11-
*
12-
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
13-
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
14-
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
15-
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
16-
*/
17-
181
package org.sourcelab.kafka.connect.apiclient;
192

203
/**
21-
* Configure your Pardot API credentials.
4+
* Configure your Kafka Connect API client.
225
*
236
* Also allows for configuring an optional proxy with or without authentication.
247
*/
258
public class Configuration {
26-
private final String email;
27-
private final String password;
28-
private final String userKey;
29-
30-
/**
31-
* Optionally you can re-use an existing known good api key.
32-
*/
33-
private String apiKey = null;
9+
// Defines the URL/Hostname of Kafka-Connect
10+
private final String apiHost;
3411

3512
// Optional Proxy Configuration
3613
private String proxyHost = null;
@@ -41,32 +18,21 @@ public class Configuration {
4118
private String proxyUsername = null;
4219
private String proxyPassword = null;
4320

44-
// If you want to override the Pardot API url or version.
45-
private String pardotApiHost = "https://pi.pardot.com/api";
46-
private String pardotApiVersion = "3";
47-
4821
/**
49-
* Constructor.
50-
* @param email Pardot user's email address.
51-
* @param password Pardot user's password.
52-
* @param userKey Pardot user's userKey.
22+
* Default Constructor.
23+
* @param kafkaConnectHost Hostname of Kafka-Connect
5324
*/
54-
public Configuration(final String email, final String password, final String userKey) {
55-
this.email = email;
56-
this.password = password;
57-
this.userKey = userKey;
58-
}
59-
60-
public String getEmail() {
61-
return email;
62-
}
63-
64-
public String getPassword() {
65-
return password;
66-
}
25+
public Configuration(final String kafkaConnectHost) {
26+
if (kafkaConnectHost == null) {
27+
throw new NullPointerException("Kafka Connect Host parameter cannot be null!");
28+
}
6729

68-
public String getUserKey() {
69-
return userKey;
30+
// Normalize into "http://<hostname>"
31+
if (!kafkaConnectHost.startsWith("http://")) {
32+
this.apiHost = "http://" + kafkaConnectHost;
33+
} else {
34+
this.apiHost = kafkaConnectHost;
35+
}
7036
}
7137

7238
/**
@@ -97,24 +63,6 @@ public Configuration useProxyAuthentication(final String proxyUsername, final St
9763
return this;
9864
}
9965

100-
/**
101-
* Configure library to use Pardot Api Version 4.
102-
* @return Configuration instance.
103-
*/
104-
public Configuration withApiVersion4() {
105-
this.pardotApiVersion = "4";
106-
return this;
107-
}
108-
109-
/**
110-
* Configure library to use Pardot Api Version 4.
111-
* @return Configuration instance.
112-
*/
113-
public Configuration withApiVersion3() {
114-
this.pardotApiVersion = "3";
115-
return this;
116-
}
117-
11866
public String getProxyHost() {
11967
return proxyHost;
12068
}
@@ -135,37 +83,14 @@ public String getProxyPassword() {
13583
return proxyPassword;
13684
}
13785

138-
public String getPardotApiHost() {
139-
return pardotApiHost;
140-
}
141-
142-
public void setPardotApiHost(final String pardotApiHost) {
143-
this.pardotApiHost = pardotApiHost;
144-
}
145-
146-
public String getPardotApiVersion() {
147-
return pardotApiVersion;
148-
}
149-
150-
public void setPardotApiVersion(final String pardotApiVersion) {
151-
this.pardotApiVersion = pardotApiVersion;
152-
}
153-
154-
public String getApiKey() {
155-
return apiKey;
156-
}
157-
158-
public void setApiKey(final String apiKey) {
159-
this.apiKey = apiKey;
86+
public String getApiHost() {
87+
return apiHost;
16088
}
16189

16290
@Override
16391
public String toString() {
16492
final StringBuilder stringBuilder = new StringBuilder("Configuration{")
165-
.append("email='").append(email).append('\'')
166-
.append(", password='XXXXX'")
167-
.append(", userKey='").append(userKey.substring(0,3)).append("...'");
168-
93+
.append("apiHost='").append(apiHost).append('\'');
16994
if (proxyHost != null) {
17095
stringBuilder
17196
.append(", proxy='").append(proxyScheme).append("://");
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.sourcelab.kafka.connect.apiclient.request;
2+
3+
import com.fasterxml.jackson.databind.DeserializationFeature;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
6+
import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule;
7+
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
8+
import com.fasterxml.jackson.datatype.joda.JodaModule;
9+
10+
import java.text.SimpleDateFormat;
11+
12+
/**
13+
* Creates properly configured Jackson XML Mapper instances.
14+
*/
15+
public class JacksonFactory {
16+
17+
/**
18+
* Creates properly configured Jackson Object Mapper instances.
19+
* @return ObjectMapper instance.
20+
*/
21+
public static ObjectMapper newInstance() {
22+
// Create new mapper
23+
final ObjectMapper mapper = new ObjectMapper();
24+
25+
// Configure it
26+
mapper
27+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
28+
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
29+
30+
return mapper;
31+
}
32+
}
Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
11
package org.sourcelab.kafka.connect.apiclient.request;
22

3+
import java.io.IOException;
4+
import java.util.Collection;
35
import java.util.Map;
46

57
/**
68
* Interface for all Requests to implement.
79
*/
8-
public interface Request {
10+
public interface Request<T> {
911

1012
/**
1113
* @return The name of the end point this request uses. Example: "campaign", "user", etc..
1214
*/
1315
String getApiEndpoint();
1416

17+
/**
18+
* @return The type of HTTP Request.
19+
*/
20+
RequestMethod getRequestMethod();
21+
1522
/**
1623
* @return correctly formatted request parameters.
1724
*/
18-
Map<String, String> getRequestParameters();
25+
Object getRequestBody();
26+
27+
T parseResponse(final String responseStr) throws IOException;
1928
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.sourcelab.kafka.connect.apiclient.request;
2+
3+
public class RequestErrorResponse {
4+
private int errorCode;
5+
private String message;
6+
7+
public int getErrorCode() {
8+
return errorCode;
9+
}
10+
11+
public String getMessage() {
12+
return message;
13+
}
14+
15+
@Override
16+
public String toString() {
17+
return "RequestErrorResponse{"
18+
+ "errorCode=" + errorCode
19+
+ ", message='" + message + '\''
20+
+ '}';
21+
}
22+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.sourcelab.kafka.connect.apiclient.request;
2+
3+
public enum RequestMethod {
4+
GET,
5+
POST,
6+
PUT;
7+
}

0 commit comments

Comments
 (0)