Skip to content

Commit 8321819

Browse files
committed
implement more end points
1 parent 294b716 commit 8321819

31 files changed

+516
-167
lines changed

src/main/java/org/sourcelab/kafka/connect/apiclient/ApiClient.java

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,24 @@
55
import org.sourcelab.kafka.connect.apiclient.request.JacksonFactory;
66
import org.sourcelab.kafka.connect.apiclient.request.Request;
77
import org.sourcelab.kafka.connect.apiclient.request.RequestErrorResponse;
8-
import org.sourcelab.kafka.connect.apiclient.request.delete.connector.DeleteConnector;
8+
import org.sourcelab.kafka.connect.apiclient.request.delete.DeleteConnector;
99
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
10-
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnector;
11-
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectorConfig;
12-
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectorStatus;
13-
import org.sourcelab.kafka.connect.apiclient.request.get.connector.GetConnectors;
10+
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
11+
import org.sourcelab.kafka.connect.apiclient.request.dto.Task;
12+
import org.sourcelab.kafka.connect.apiclient.request.dto.TaskStatus;
13+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnector;
14+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorConfig;
15+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorStatus;
16+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTaskStatus;
17+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectorTasks;
18+
import org.sourcelab.kafka.connect.apiclient.request.get.GetConnectors;
1419
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
15-
import org.sourcelab.kafka.connect.apiclient.request.post.connector.PostConnector;
16-
import org.sourcelab.kafka.connect.apiclient.request.post.connector.PostConnectorRestart;
17-
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorConfig;
18-
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorPause;
19-
import org.sourcelab.kafka.connect.apiclient.request.put.connector.PutConnectorResume;
20+
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnector;
21+
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorRestart;
22+
import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorTaskRestart;
23+
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorConfig;
24+
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPause;
25+
import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorResume;
2026
import org.sourcelab.kafka.connect.apiclient.rest.HttpClientRestClient;
2127
import org.sourcelab.kafka.connect.apiclient.rest.InvalidRequestException;
2228
import org.sourcelab.kafka.connect.apiclient.rest.RestClient;
@@ -123,8 +129,14 @@ public ConnectorStatus getConnectorStatus(final String connectorName) {
123129
return submitRequest(new GetConnectorStatus(connectorName));
124130
}
125131

126-
// TODO Add return value
127-
public String addConnector(final ConnectorDefinition connectorDefinition) {
132+
/**
133+
* Create a new connector, returning the current connector info if successful.
134+
* https://docs.confluent.io/current/connect/restapi.html#post--connectors
135+
*
136+
* @param connectorDefinition Defines the new connector to deploy
137+
* @return connector info.
138+
*/
139+
public ConnectorDefinition addConnector(final NewConnectorDefinition connectorDefinition) {
128140
return submitRequest(new PostConnector(connectorDefinition));
129141
}
130142

@@ -184,6 +196,41 @@ public Boolean deleteConnector(final String connectorName) {
184196
return submitRequest(new DeleteConnector(connectorName));
185197
}
186198

199+
/**
200+
* Get a list of tasks currently running for the connector.
201+
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-tasks
202+
*
203+
* @param connectorName Name of connector to retrieve tasks for.
204+
* @return Collection of details about each task.
205+
*/
206+
public Collection<Task> getConnectorTasks(final String connectorName) {
207+
return submitRequest(new GetConnectorTasks(connectorName));
208+
}
209+
210+
/**
211+
* Get a task’s status.
212+
* https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status
213+
*
214+
* @param connectorName Name of connector to retrieve tasks for.
215+
* @param taskId Id of task to get status for.
216+
* @return Details about task.
217+
*/
218+
public TaskStatus getConnectorTaskStatus(final String connectorName, final int taskId) {
219+
return submitRequest(new GetConnectorTaskStatus(connectorName, taskId));
220+
}
221+
222+
/**
223+
* Restart an individual task.
224+
* https://docs.confluent.io/current/connect/restapi.html#post--connectors-(string-name)-tasks-(int-taskid)-restart
225+
*
226+
* @param connectorName Name of connector to restart tasks for.
227+
* @param taskId Id of task to restart
228+
* @return True if a success.
229+
*/
230+
public Boolean restartConnectorTask(final String connectorName, final int taskId) {
231+
return submitRequest(new PostConnectorTaskRestart(connectorName, taskId));
232+
}
233+
187234
/**
188235
* package protected for access in tests.
189236
* @return Rest Client.

src/main/java/org/sourcelab/kafka/connect/apiclient/request/JacksonFactory.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,26 @@
1515
public class JacksonFactory {
1616

1717
/**
18-
* Creates properly configured Jackson Object Mapper instances.
19-
* @return ObjectMapper instance.
18+
* Holds our jackson singleton mapper. ObjectMapper is defined as being
19+
* ThreadSafe so this should be OK to stash as a static and shared.
2020
*/
21-
public static ObjectMapper newInstance() {
22-
// Create new mapper
23-
final ObjectMapper mapper = new ObjectMapper();
21+
private static final ObjectMapper mapper = new ObjectMapper();
2422

25-
// Configure it
23+
/*
24+
* Statically configure the instance.
25+
*/
26+
static {
27+
// Configure mapper
2628
mapper
2729
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
2830
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
31+
}
2932

33+
/**
34+
* Creates properly configured Jackson Object Mapper instances.
35+
* @return ObjectMapper instance.
36+
*/
37+
public static ObjectMapper newInstance() {
3038
return mapper;
3139
}
3240
}

src/main/java/org/sourcelab/kafka/connect/apiclient/request/delete/connector/DeleteConnector.java renamed to src/main/java/org/sourcelab/kafka/connect/apiclient/request/delete/DeleteConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.sourcelab.kafka.connect.apiclient.request.delete.connector;
1+
package org.sourcelab.kafka.connect.apiclient.request.delete;
22

33
import com.sun.xml.internal.rngom.util.Uri;
44
import org.sourcelab.kafka.connect.apiclient.request.delete.DeleteRequest;

src/main/java/org/sourcelab/kafka/connect/apiclient/request/dto/ConnectorDefinition.java

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package org.sourcelab.kafka.connect.apiclient.request.dto;
22

3-
import java.util.ArrayList;
4-
import java.util.Collections;
5-
import java.util.HashMap;
63
import java.util.List;
74
import java.util.Map;
85

@@ -12,26 +9,6 @@ public class ConnectorDefinition {
129
private Map<String, String> config;
1310
private List<TaskDefinition> tasks;
1411

15-
/**
16-
* Default constructor.
17-
*/
18-
public ConnectorDefinition() {
19-
}
20-
21-
/**
22-
* Constructor
23-
* @param name Name of Connector.
24-
* @param config Configuration values for connector.
25-
*/
26-
public ConnectorDefinition(final String name, final Map<String, String> config) {
27-
this.name = name;
28-
this.config = Collections.unmodifiableMap(new HashMap<>(config));
29-
30-
// Not used.
31-
this.type = null;
32-
this.tasks = Collections.unmodifiableList(new ArrayList<>());
33-
}
34-
3512
public String getName() {
3613
return name;
3714
}
@@ -48,10 +25,6 @@ public List<TaskDefinition> getTasks() {
4825
return tasks;
4926
}
5027

51-
public static Builder newBuilder() {
52-
return new Builder();
53-
}
54-
5528
@Override
5629
public String toString() {
5730
return "ConnectorDefinition{"
@@ -62,44 +35,6 @@ public String toString() {
6235
+ '}';
6336
}
6437

65-
public static final class Builder {
66-
private String name;
67-
private String type = null;
68-
private Map<String, String> config = new HashMap<>();
69-
70-
private Builder() {
71-
}
72-
73-
public Builder withName(final String name) {
74-
this.name = name;
75-
return this;
76-
}
77-
78-
public Builder withConfig(final Map<String, String> config) {
79-
this.config = new HashMap<>(config);
80-
return this;
81-
}
82-
83-
public Builder withConfig(final String key, final String value) {
84-
this.config.put(key, value);
85-
return this;
86-
}
87-
88-
public Builder withConfig(final String key, final Object value) {
89-
this.config.put(key, value.toString());
90-
return this;
91-
}
92-
93-
public Builder withType(final String type) {
94-
this.type = type;
95-
return this;
96-
}
97-
98-
public ConnectorDefinition build() {
99-
return new ConnectorDefinition(name, config);
100-
}
101-
}
102-
10338
private static class TaskDefinition {
10439
private String connector;
10540
private int task;
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
import com.fasterxml.jackson.annotation.JsonAlias;
4+
5+
public class ConnectorPlugin {
6+
@JsonAlias("class")
7+
private String className;
8+
9+
private String getClassName() {
10+
return className;
11+
}
12+
13+
@Override
14+
public String toString() {
15+
return "ConnectorPlugin{"
16+
+ "className='" + className + '\''
17+
+ '}';
18+
}
19+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
public class NewConnectorDefinition {
8+
private final String name;
9+
private final Map<String, String> config;
10+
11+
/**
12+
* Constructor
13+
* @param name Name of Connector.
14+
* @param config Configuration values for connector.
15+
*/
16+
public NewConnectorDefinition(final String name, final Map<String, String> config) {
17+
this.name = name;
18+
this.config = Collections.unmodifiableMap(new HashMap<>(config));
19+
}
20+
21+
public String getName() {
22+
return name;
23+
}
24+
25+
public Map<String, String> getConfig() {
26+
return config;
27+
}
28+
29+
public static Builder newBuilder() {
30+
return new Builder();
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return "NewConnectorDefinition{"
36+
+ "name='" + name + '\''
37+
+ ", config=" + config
38+
+ '}';
39+
}
40+
41+
/**
42+
* Builder for NewConnectorDefinition.
43+
*/
44+
public static final class Builder {
45+
private String name;
46+
private Map<String, String> config = new HashMap<>();
47+
48+
private Builder() {
49+
}
50+
51+
public Builder withName(final String name) {
52+
this.name = name;
53+
return this;
54+
}
55+
56+
public Builder withConfig(final Map<String, String> config) {
57+
this.config = new HashMap<>(config);
58+
return this;
59+
}
60+
61+
public Builder withConfig(final String key, final String value) {
62+
this.config.put(key, value);
63+
return this;
64+
}
65+
66+
public Builder withConfig(final String key, final Object value) {
67+
this.config.put(key, value.toString());
68+
return this;
69+
}
70+
71+
public NewConnectorDefinition build() {
72+
return new NewConnectorDefinition(name, config);
73+
}
74+
}
75+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.sourcelab.kafka.connect.apiclient.request.dto;
2+
3+
import java.util.Map;
4+
5+
/**
6+
* Represents Details about a Task.
7+
*/
8+
public class Task {
9+
private TaskId id;
10+
private Map<String, String> config;
11+
12+
public TaskId getId() {
13+
return id;
14+
}
15+
16+
public Map<String, String> getConfig() {
17+
return config;
18+
}
19+
20+
@Override
21+
public String toString() {
22+
return "Task{"
23+
+ "id=" + id
24+
+ ", config=" + config
25+
+ '}';
26+
}
27+
28+
/**
29+
* Defines a Task Id.
30+
*/
31+
private static class TaskId {
32+
private String connector;
33+
private int task;
34+
35+
public String getConnector() {
36+
return connector;
37+
}
38+
39+
public int getTask() {
40+
return task;
41+
}
42+
43+
@Override
44+
public String toString() {
45+
return "TaskId{"
46+
+ "connector='" + connector + '\''
47+
+ ", task=" + task
48+
+ '}';
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)