Skip to content

Commit e8c86d7

Browse files
authored
Collector: Fixed task not being able to be dropped when the task is stopped & Modify PipeParameters & Supports registration of plugins and reflection usage & Add SQLite support for persistence of plugin and task meta. (#57)
1 parent 75aecc9 commit e8c86d7

37 files changed

Lines changed: 1330 additions & 181 deletions

iotdb-collector/collector-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@
107107
<groupId>com.google.code.findbugs</groupId>
108108
<artifactId>jsr305</artifactId>
109109
</dependency>
110+
<dependency>
111+
<groupId>org.xerial</groupId>
112+
<artifactId>sqlite-jdbc</artifactId>
113+
</dependency>
110114
</dependencies>
111115
<build>
112116
<plugins>

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.collector.config.Configuration;
2323
import org.apache.iotdb.collector.service.ApiService;
2424
import org.apache.iotdb.collector.service.IService;
25+
import org.apache.iotdb.collector.service.PersistenceService;
2526
import org.apache.iotdb.collector.service.RuntimeService;
2627

2728
import org.slf4j.Logger;
@@ -39,6 +40,7 @@ public class Application {
3940
private Application() {
4041
services.add(new RuntimeService());
4142
services.add(new ApiService());
43+
services.add(new PersistenceService());
4244
}
4345

4446
public static void main(String[] args) {

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@
2020
package org.apache.iotdb.collector.api.v1.plugin.impl;
2121

2222
import org.apache.iotdb.collector.api.v1.plugin.PluginApiService;
23-
import org.apache.iotdb.collector.api.v1.plugin.model.AlterPluginRequest;
2423
import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
2524
import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
26-
import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
27-
import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
25+
import org.apache.iotdb.collector.service.RuntimeService;
2826

2927
import javax.ws.rs.core.Response;
3028
import javax.ws.rs.core.SecurityContext;
@@ -34,30 +32,34 @@ public class PluginApiServiceImpl extends PluginApiService {
3432
@Override
3533
public Response createPlugin(
3634
final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) {
37-
return Response.ok("create plugin").build();
38-
}
35+
PluginApiServiceRequestValidationHandler.validateCreatePluginRequest(createPluginRequest);
3936

40-
@Override
41-
public Response alterPlugin(
42-
final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) {
43-
return Response.ok("alter plugin").build();
37+
return RuntimeService.plugin().isPresent()
38+
? RuntimeService.plugin()
39+
.get()
40+
.createPlugin(
41+
createPluginRequest.getPluginName().toUpperCase(),
42+
createPluginRequest.getClassName(),
43+
createPluginRequest.getJarName(),
44+
null,
45+
true)
46+
: Response.ok("create plugin").build();
4447
}
4548

4649
@Override
47-
public Response startPlugin(
48-
final StartPluginRequest startPluginRequest, final SecurityContext securityContext) {
49-
return Response.ok("start plugin").build();
50-
}
50+
public Response dropPlugin(
51+
final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
52+
PluginApiServiceRequestValidationHandler.validateDropPluginRequest(dropPluginRequest);
5153

52-
@Override
53-
public Response stopPlugin(
54-
final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) {
55-
return Response.ok("stop plugin").build();
54+
return RuntimeService.plugin().isPresent()
55+
? RuntimeService.plugin().get().dropPlugin(dropPluginRequest.getPluginName().toUpperCase())
56+
: Response.ok("drop plugin").build();
5657
}
5758

5859
@Override
59-
public Response dropPlugin(
60-
final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
61-
return Response.ok("drop plugin").build();
60+
public Response showPlugin(final SecurityContext securityContext) {
61+
return RuntimeService.plugin().isPresent()
62+
? RuntimeService.plugin().get().showPlugin()
63+
: Response.ok("show plugin").build();
6264
}
6365
}

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,21 @@
1919

2020
package org.apache.iotdb.collector.api.v1.plugin.impl;
2121

22-
public class PluginApiServiceRequestValidationHandler {}
22+
import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
23+
import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
24+
25+
import java.util.Objects;
26+
27+
public class PluginApiServiceRequestValidationHandler {
28+
private PluginApiServiceRequestValidationHandler() {}
29+
30+
public static void validateCreatePluginRequest(final CreatePluginRequest createPluginRequest) {
31+
Objects.requireNonNull(createPluginRequest.getPluginName(), "plugin name cannot be null");
32+
Objects.requireNonNull(createPluginRequest.getClassName(), "class name cannot be null");
33+
Objects.requireNonNull(createPluginRequest.getJarName(), "jar name cannot be null");
34+
}
35+
36+
public static void validateDropPluginRequest(final DropPluginRequest dropPluginRequest) {
37+
Objects.requireNonNull(dropPluginRequest.getPluginName(), "plugin name cannot be null");
38+
}
39+
}

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest;
2626
import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest;
2727
import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest;
28+
import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
2829
import org.apache.iotdb.collector.service.RuntimeService;
2930

3031
import javax.ws.rs.core.Response;
@@ -42,9 +43,11 @@ public Response createTask(
4243
.get()
4344
.createTask(
4445
createTaskRequest.getTaskId(),
46+
TaskStateEnum.RUNNING,
4547
createTaskRequest.getSourceAttribute(),
4648
createTaskRequest.getProcessorAttribute(),
47-
createTaskRequest.getSinkAttribute())
49+
createTaskRequest.getSinkAttribute(),
50+
true)
4851
: Response.serverError().entity("Task runtime is down").build();
4952
}
5053

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class Options {
3636
try {
3737
Class.forName(ApiServiceOptions.class.getName());
3838
Class.forName(TaskRuntimeOptions.class.getName());
39+
Class.forName(PluginRuntimeOptions.class.getName());
3940
} catch (final ClassNotFoundException e) {
4041
throw new RuntimeException("Failed to load options", e);
4142
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.config;
21+
22+
import java.io.File;
23+
24+
public class PluginRuntimeOptions extends Options {
25+
public static final Option<String> PLUGIN_LIB_DIR =
26+
new Option<String>("plugin_lib_dir", "ext" + File.separator + "plugin") {
27+
@Override
28+
public void setValue(final String valueString) {
29+
value = valueString;
30+
}
31+
};
32+
33+
public static final Option<String> PLUGIN_INSTALL_LIB_DIR =
34+
new Option<String>(
35+
"plugin_install_lib_dir", PLUGIN_LIB_DIR.value() + File.separator + "install") {
36+
@Override
37+
public void setValue(final String valueString) {
38+
value = valueString;
39+
}
40+
};
41+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.persistence;
21+
22+
public class DBConstant {
23+
24+
public static final String CREATE_PLUGIN_TABLE_SQL =
25+
"CREATE TABLE IF NOT EXISTS plugin\n"
26+
+ "(\n"
27+
+ " plugin_name TEXT PRIMARY KEY,\n"
28+
+ " class_name TEXT NOT NULL,\n"
29+
+ " jar_name TEXT NOT NULL,\n"
30+
+ " jar_md5 TEXT NOT NULL,\n"
31+
+ " create_time TEXT NOT NULL\n"
32+
+ ");";
33+
public static final String CREATE_TASK_TABLE_SQL =
34+
"CREATE TABLE IF NOT EXISTS task\n"
35+
+ "(\n"
36+
+ " task_id TEXT PRIMARY KEY,\n"
37+
+ " task_state INT NOT NULL,\n"
38+
+ " source_attribute BLOB NOT NULL,\n"
39+
+ " processor_attribute BLOB NOT NULL,\n"
40+
+ " sink_attribute BLOB NOT NULL,\n"
41+
+ " create_time TEXT NOT NULL\n"
42+
+ ");";
43+
44+
public static final String PLUGIN_DATABASE_FILE_PATH = "ext/db/plugin.db";
45+
public static final String TASK_DATABASE_FILE_PATH = "ext/db/task.db";
46+
47+
public static final String PLUGIN_DATABASE_URL = "jdbc:sqlite:" + PLUGIN_DATABASE_FILE_PATH;
48+
public static final String TASK_DATABASE_URL = "jdbc:sqlite:" + TASK_DATABASE_FILE_PATH;
49+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.collector.persistence;
21+
22+
import java.sql.Connection;
23+
import java.sql.DriverManager;
24+
import java.sql.SQLException;
25+
26+
public abstract class Persistence {
27+
28+
private final String databaseUrl;
29+
30+
public Persistence(final String databaseUrl) {
31+
this.databaseUrl = databaseUrl;
32+
initDatabaseFileIfPossible();
33+
initTableIfPossible();
34+
}
35+
36+
protected abstract void initDatabaseFileIfPossible();
37+
38+
protected abstract void initTableIfPossible();
39+
40+
public abstract void tryResume();
41+
42+
protected Connection getConnection() throws SQLException {
43+
return DriverManager.getConnection(databaseUrl);
44+
}
45+
}

0 commit comments

Comments
 (0)