Skip to content

Commit 294b716

Browse files
committed
Implement a few more end points
1 parent acb6ce1 commit 294b716

File tree

10 files changed

+284
-8
lines changed

10 files changed

+284
-8
lines changed

src/main/java/org/sourcelab/kafka/connect/apiclient/ApiClient.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
66
import org.sourcelab.kafka.connect.apiclient.request.Request;
77
import org.sourcelab.kafka.connect.apiclient.request.RequestErrorResponse;
8+
import org.sourcelab.kafka.connect.apiclient.request.delete.connector.DeleteConnector;
89
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
910
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnector;
1011
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectorConfig;
1112
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectorStatus;
1213
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectors;
1314
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
14-
import org.sourcelab.kafka.connect.apiclient.request.post.connector.PostConnectors;
15+
import org.sourcelab.kafka.connect.apiclient.request.post.connector.PostConnector;
16+
import org.sourcelab.kafka.connect.apiclient.request.post.connector.PostConnectorRestart;
1517
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorConfig;
18+
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorPause;
19+
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorResume;
1620
import org.sourcelab.kafka.connect.apiclient.rest.HttpClientRestClient;
1721
import org.sourcelab.kafka.connect.apiclient.rest.InvalidRequestException;
1822
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
@@ -79,8 +83,8 @@ private <T> T submitRequest(final Request<T> request) {
7983
}
8084

8185
try {
82-
return request.parseResponse(restResponse.getResponseStr());
83-
} catch (IOException exception) {
86+
return request.parseResponse(responseStr);
87+
} catch (final IOException exception) {
8488
throw new RuntimeException(exception.getMessage(), exception);
8589
}
8690
}
@@ -108,19 +112,78 @@ public Map<String, String> getConnectorConfig(final String connectorName) {
108112
return submitRequest(new GetConnectorConfig(connectorName));
109113
}
110114

115+
/**
116+
* Get the status of specified connector by name.
117+
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-config
118+
*
119+
* @param connectorName Name of connector.
120+
* @return Status details of the connector.
121+
*/
111122
public ConnectorStatus getConnectorStatus(final String connectorName) {
112123
return submitRequest(new GetConnectorStatus(connectorName));
113124
}
114125

115126
// TODO Add return value
116127
public String addConnector(final ConnectorDefinition connectorDefinition) {
117-
return submitRequest(new PostConnectors(connectorDefinition));
128+
return submitRequest(new PostConnector(connectorDefinition));
118129
}
119130

131+
/**
132+
* Update a connector's configuration.
133+
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-config
134+
*
135+
* @param connectorName Name of connector to update.
136+
* @param config Configuration values to set.
137+
* @return ConnectorDefinition describing the connectors configuration.
138+
*/
120139
public ConnectorDefinition updateConnectorConfig(final String connectorName, final Map<String, String> config) {
121140
return submitRequest(new PutConnectorConfig(connectorName, config));
122141
}
123142

143+
/**
144+
* Restart a connector.
145+
* https://docs.confluent.io/current/connect/restapi.html#post--connectors-(string-name)-restart
146+
*
147+
* @param connectorName Name of connector to restart.
148+
* @return Boolean true if success.
149+
*/
150+
public Boolean restartConnector(final String connectorName) {
151+
return submitRequest(new PostConnectorRestart(connectorName));
152+
}
153+
154+
/**
155+
* Pause a connector.
156+
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-pause
157+
*
158+
* @param connectorName Name of connector to pause.
159+
* @return Boolean true if success.
160+
*/
161+
public Boolean pauseConnector(final String connectorName) {
162+
return submitRequest(new PutConnectorPause(connectorName));
163+
}
164+
165+
/**
166+
* Resume a connector.
167+
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-resume
168+
*
169+
* @param connectorName Name of connector to resume.
170+
* @return Boolean true if success.
171+
*/
172+
public Boolean resumeConnector(final String connectorName) {
173+
return submitRequest(new PutConnectorResume(connectorName));
174+
}
175+
176+
/**
177+
* Resume a connector.
178+
* https://docs.confluent.io/current/connect/restapi.html#put--connectors-(string-name)-resume
179+
*
180+
* @param connectorName Name of connector to resume.
181+
* @return Boolean true if success.
182+
*/
183+
public Boolean deleteConnector(final String connectorName) {
184+
return submitRequest(new DeleteConnector(connectorName));
185+
}
186+
124187
/**
125188
* package protected for access in tests.
126189
* @return Rest Client.
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package org.sourcelab.kafka.connect.apiclient.request;
22

33
public enum RequestMethod {
4+
DELETE,
45
GET,
56
POST,
6-
PUT;
7+
PUT
78
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.delete;
2+
3+
import org.sourcelab.kafka.connect.apiclient.request.Request;
4+
import org.sourcelab.kafka.connect.apiclient.request.RequestMethod;
5+
6+
public interface DeleteRequest<T> extends Request<T> {
7+
@Override
8+
default RequestMethod getRequestMethod() {
9+
return RequestMethod.DELETE;
10+
}
11+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.delete.connector;
2+
3+
import com.sun.xml.internal.rngom.util.Uri;
4+
import org.sourcelab.kafka.connect.apiclient.request.delete.DeleteRequest;
5+
6+
import java.io.IOException;
7+
8+
public class DeleteConnector implements DeleteRequest<Boolean> {
9+
private final String name;
10+
11+
public DeleteConnector(final String name) {
12+
this.name = name;
13+
}
14+
15+
@Override
16+
public String getApiEndpoint() {
17+
return "/connectors/" + Uri.escapeDisallowedChars(name);
18+
}
19+
20+
@Override
21+
public Object getRequestBody() {
22+
return "";
23+
}
24+
25+
@Override
26+
public Boolean parseResponse(final String responseStr) throws IOException {
27+
return true;
28+
}
29+
}

src/main/java/org/sourcelab/kafka/connect/apiclient/request/post/connector/PostConnectors.java renamed to src/main/java/org/sourcelab/kafka/connect/apiclient/request/post/connector/PostConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
import java.io.IOException;
77

8-
public class PostConnectors implements PostRequest<String> {
8+
public class PostConnector implements PostRequest<String> {
99
private final ConnectorDefinition connectorDefinition;
1010

11-
public PostConnectors(final ConnectorDefinition connectorDefinition) {
11+
public PostConnector(final ConnectorDefinition connectorDefinition) {
1212
if (connectorDefinition == null) {
1313
throw new NullPointerException("ConnectorDefinition parameter cannot be a null reference!");
1414
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.post.connector;
2+
3+
import com.sun.xml.internal.rngom.util.Uri;
4+
import org.sourcelab.kafka.connect.apiclient.request.post.PostRequest;
5+
6+
import java.io.IOException;
7+
8+
public class PostConnectorRestart implements PostRequest<Boolean> {
9+
private final String name;
10+
11+
public PostConnectorRestart(final String name) {
12+
this.name = name;
13+
}
14+
15+
@Override
16+
public String getApiEndpoint() {
17+
return "/connectors/" + Uri.escapeDisallowedChars(name) + "/restart";
18+
}
19+
20+
@Override
21+
public Object getRequestBody() {
22+
return "";
23+
}
24+
25+
@Override
26+
public Boolean parseResponse(final String responseStr) throws IOException {
27+
return true;
28+
}
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.put.connector;
2+
3+
import com.sun.xml.internal.rngom.util.Uri;
4+
import org.sourcelab.kafka.connect.apiclient.request.put.PutRequest;
5+
6+
import java.io.IOException;
7+
8+
public class PutConnectorPause implements PutRequest<Boolean> {
9+
private final String name;
10+
11+
public PutConnectorPause(final String name) {
12+
this.name = name;
13+
}
14+
15+
@Override
16+
public String getApiEndpoint() {
17+
return "/connectors/" + Uri.escapeDisallowedChars(name) + "/pause";
18+
}
19+
20+
@Override
21+
public Object getRequestBody() {
22+
return null;
23+
}
24+
25+
@Override
26+
public Boolean parseResponse(final String responseStr) throws IOException {
27+
return true;
28+
}
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.put.connector;
2+
3+
import com.sun.xml.internal.rngom.util.Uri;
4+
import org.sourcelab.kafka.connect.apiclient.request.put.PutRequest;
5+
6+
import java.io.IOException;
7+
8+
public class PutConnectorResume implements PutRequest<Boolean> {
9+
private final String name;
10+
11+
public PutConnectorResume(final String name) {
12+
this.name = name;
13+
}
14+
15+
@Override
16+
public String getApiEndpoint() {
17+
return "/connectors/" + Uri.escapeDisallowedChars(name) + "/resume";
18+
}
19+
20+
@Override
21+
public Object getRequestBody() {
22+
return null;
23+
}
24+
25+
@Override
26+
public Boolean parseResponse(final String responseStr) throws IOException {
27+
return true;
28+
}
29+
}

src/main/java/org/sourcelab/kafka/connect/apiclient/rest/HttpClientRestClient.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.http.client.ResponseHandler;
1010
import org.apache.http.client.config.RequestConfig;
1111
import org.apache.http.client.entity.UrlEncodedFormEntity;
12+
import org.apache.http.client.methods.HttpDelete;
1213
import org.apache.http.client.methods.HttpGet;
1314
import org.apache.http.client.methods.HttpPost;
1415
import org.apache.http.client.methods.HttpPut;
@@ -157,6 +158,8 @@ public RestResponse submitRequest(final Request request) throws RestException {
157158
return submitPostRequest(url, request.getRequestBody(), responseHandler);
158159
case PUT:
159160
return submitPutRequest(url, request.getRequestBody(), responseHandler);
161+
case DELETE:
162+
return submitDeleteRequest(url, request.getRequestBody(), responseHandler);
160163
default:
161164
throw new IllegalArgumentException("Unknown Request Method: " + request.getRequestMethod());
162165
}
@@ -294,6 +297,50 @@ private <T> T submitPutRequest(final String url, final Object requestBody, final
294297
return null;
295298
}
296299

300+
/**
301+
* Internal DELETE method.
302+
* @param url Url to DELETE to.
303+
* @param requestBody POST entity include in the request body
304+
* @param responseHandler The response Handler to use to parse the response
305+
* @param <T> The type that ResponseHandler returns.
306+
* @return Parsed response.
307+
*/
308+
private <T> T submitDeleteRequest(final String url, final Object requestBody, final ResponseHandler<T> responseHandler) throws IOException {
309+
try {
310+
// Construct URI including our request parameters.
311+
final URIBuilder uriBuilder = new URIBuilder(url)
312+
.setCharset(StandardCharsets.UTF_8);
313+
314+
final HttpDelete delete = new HttpDelete(url);
315+
316+
// Add Accept header.
317+
delete.addHeader(new BasicHeader("Accept", "application/json"));
318+
319+
// Conditionally add content-type header?
320+
delete.addHeader(new BasicHeader("Content-Type", "application/json"));
321+
322+
// Define required auth params
323+
final List<NameValuePair> params = new ArrayList<>();
324+
325+
// Convert to Json
326+
final String jsonPayloadStr = JacksonFactory.newInstance().writeValueAsString(requestBody);
327+
328+
logger.info("Executing request {} with {}", delete.getRequestLine(), jsonPayloadStr);
329+
330+
// Execute and return
331+
return httpClient.execute(delete, responseHandler);
332+
} catch (ClientProtocolException e) {
333+
e.printStackTrace();
334+
} catch (IOException e) {
335+
// Typically this is a parse error.
336+
e.printStackTrace();
337+
} catch (URISyntaxException e) {
338+
// Bad URI building
339+
e.printStackTrace();
340+
}
341+
return null;
342+
}
343+
297344
/**
298345
* Internal helper method for generating URLs w/ the appropriate API host and API version.
299346
* @param endPoint The end point you want to hit.

src/test/java/org/sourcelab/kafka/connect/apiclient/ApiClientTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,49 @@ public void testAddConnector() {
8888
*/
8989
@Test
9090
public void testUpdateConnectorConfig() {
91+
final String connectorName = "My Test Connector";
92+
9193
final Map<String, String> config = new HashMap<>();
9294
config.put("connector.class", "org.apache.kafka.connect.tools.MockConnector");
9395
config.put("tasks.max", "10");
9496
config.put("topics", "test-topic");
9597

96-
logger.info("Result: {}", apiClient.updateConnectorConfig("My Test Connector", config));
98+
logger.info("Result: {}", apiClient.updateConnectorConfig(connectorName, config));
99+
}
100+
101+
/**
102+
* Test restarting a connector.
103+
*/
104+
@Test
105+
public void testRestartConnector() {
106+
final String connectorName = "My Test Connector";
107+
logger.info("Result: {}", apiClient.restartConnector(connectorName));
108+
}
109+
110+
/**
111+
* Test pausing a connector.
112+
*/
113+
@Test
114+
public void testPauseConnector() {
115+
final String connectorName = "My Test Connector";
116+
logger.info("Result: {}", apiClient.pauseConnector(connectorName));
117+
}
118+
119+
/**
120+
* Test pausing a connector.
121+
*/
122+
@Test
123+
public void testResumeConnector() {
124+
final String connectorName = "My Test Connector";
125+
logger.info("Result: {}", apiClient.resumeConnector(connectorName));
126+
}
127+
128+
/**
129+
* Test pausing a connector.
130+
*/
131+
@Test
132+
public void testDeleteConnector() {
133+
final String connectorName = "My Test Connector";
134+
logger.info("Result: {}", apiClient.deleteConnector(connectorName));
97135
}
98136
}

0 commit comments

Comments
 (0)