4 changes: 4 additions & 0 deletions iotdb-collector/collector-core/pom.xml
@@ -156,6 +156,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
<build>
<resources>
@@ -33,7 +33,7 @@ api_service_port=17070
# The number of concurrent threads for the source task.
# Effective mode: on every start
# Data type: int
task_source_parallelism_num=1
task_source_parallelism_num=4

# The number of concurrent threads for the process task.
# Effective mode: on every start
@@ -60,6 +60,16 @@ task_sink_ring_buffer_size=1024
# Data type: string
task_database_file_path=system/database/task.db

# Interval (in seconds) at which batch deliveries are proactively triggered
# Effective mode: on every start
# Data type: long
executor_cron_heartbeat_event_interval_seconds=20

# Interval at which task progress is persisted
# Effective mode: on every start
# Data type: int
task_progress_report_interval=60

####################
### Plugin Configuration
####################
@@ -146,9 +156,4 @@ pipe_leader_cache_memory_usage_percentage=0.1
# Enable/disable reference tracking for pipe events
# Effective mode: on every start
# Data type: boolean
pipe_event_reference_tracking_enabled=true

# Proactively triggers the interval for batch deliveries
# Effective mode: on every start
# Data type: long
executor_cron_heartbeat_event_interval_seconds=20
pipe_event_reference_tracking_enabled=true
@@ -22,9 +22,9 @@
import org.apache.iotdb.collector.config.Configuration;
import org.apache.iotdb.collector.service.ApiService;
import org.apache.iotdb.collector.service.IService;
import org.apache.iotdb.collector.service.PeriodicalJobService;
import org.apache.iotdb.collector.service.PersistenceService;
import org.apache.iotdb.collector.service.RuntimeService;
import org.apache.iotdb.collector.service.ScheduleService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +41,8 @@ public class Application {
private Application() {
services.add(new RuntimeService());
services.add(new ApiService());
services.add(new ScheduleService());
services.add(new PersistenceService());
services.add(new PeriodicalJobService());
}

public static void main(String[] args) {
@@ -272,12 +272,4 @@ public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};

public static final Option<Long> EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS =
new Option<Long>("executor_cron_heartbeat_event_interval_seconds", 20L) {
@Override
public void setValue(final String valueString) {
value = Long.parseLong(valueString);
}
};
}
@@ -72,4 +72,20 @@
value = addHomeDir(valueString);
}
};

public static final Option<Long> EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS =
new Option<Long>("executor_cron_heartbeat_event_interval_seconds", 20L) {
@Override
public void setValue(final String valueString) {
// CodeQL notice (code scanning): missing catch of NumberFormatException;
// a malformed value would propagate an uncaught java.lang.NumberFormatException.
value = Long.parseLong(valueString);
}
};

public static final Option<Integer> TASK_PROGRESS_REPORT_INTERVAL =
new Option<Integer>("task_progress_report_interval", 60) {
@Override
public void setValue(String valueString) {
// CodeQL notice (code scanning): missing catch of NumberFormatException;
// a malformed value would propagate an uncaught java.lang.NumberFormatException.
value = Integer.parseInt(valueString);
}
};
}
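
If the CodeQL notices above were to be addressed, one option is to catch the parse failure and fall back to the option's default. The following is a minimal sketch, not part of this PR, assuming the Option base class exposes the protected value field exactly as the anonymous subclasses above use it:

public static final Option<Integer> TASK_PROGRESS_REPORT_INTERVAL =
    new Option<Integer>("task_progress_report_interval", 60) {
      @Override
      public void setValue(final String valueString) {
        try {
          value = Integer.parseInt(valueString);
        } catch (final NumberFormatException e) {
          // Keep the default value (60) when the configured string is not a valid integer.
        }
      }
    };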
@@ -41,6 +41,7 @@ public class DBConstant {
+ " source_attribute BLOB NOT NULL,\n"
+ " processor_attribute BLOB NOT NULL,\n"
+ " sink_attribute BLOB NOT NULL,\n"
+ " task_progress BLOB NOT NULL,\n"
+ " create_time TEXT NOT NULL\n"
+ ");";

@@ -20,19 +20,18 @@
package org.apache.iotdb.collector.persistence;

import org.apache.iotdb.collector.config.TaskRuntimeOptions;
import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.ProgressReportEvent;
import org.apache.iotdb.collector.service.RuntimeService;
import org.apache.iotdb.collector.utils.SerializationUtil;

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -41,9 +40,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class TaskPersistence extends Persistence {

@@ -68,9 +67,9 @@ protected void initDatabaseFileIfPossible() {

@Override
protected void initTableIfPossible() {
try (final Connection connection = getConnection()) {
final PreparedStatement statement =
connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL);
try (final Connection connection = getConnection();
final PreparedStatement statement =
connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL)) {
statement.executeUpdate();
} catch (final SQLException e) {
LOGGER.warn("Failed to create task database", e);
@@ -82,10 +81,9 @@ public void tryResume() {
final String queryAllTaskSQL =
"SELECT task_id, task_state, source_attribute, processor_attribute, sink_attribute, create_time FROM task";

try (final Connection connection = getConnection()) {
final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL);
final ResultSet taskResultSet = statement.executeQuery();

try (final Connection connection = getConnection();
final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL);
final ResultSet taskResultSet = statement.executeQuery()) {
while (taskResultSet.next()) {
final String taskId = taskResultSet.getString(1);
final TaskStateEnum taskState = TaskStateEnum.values()[taskResultSet.getInt(2)];
@@ -96,9 +94,9 @@ public void tryResume() {
tryRecoverTask(
taskId,
taskState,
deserialize(sourceAttribute),
deserialize(processorAttribute),
deserialize(sinkAttribute));
SerializationUtil.deserialize(sourceAttribute),
SerializationUtil.deserialize(processorAttribute),
SerializationUtil.deserialize(sinkAttribute));
}
} catch (final SQLException e) {
LOGGER.warn("Failed to resume task persistence message, because {}", e.getMessage());
@@ -125,43 +123,29 @@ public void tryRecoverTask(
}
}

private Map<String, String> deserialize(final byte[] buffer) {
final Map<String, String> attribute = new HashMap<>();
final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer);

final int size = ReadWriteIOUtils.readInt(attributeBuffer);
for (int i = 0; i < size; i++) {
final String key = ReadWriteIOUtils.readString(attributeBuffer);
final String value = ReadWriteIOUtils.readString(attributeBuffer);

attribute.put(key, value);
}

return attribute;
}

public void tryPersistenceTask(
final String taskId,
final TaskStateEnum taskState,
final Map<String, String> sourceAttribute,
final Map<String, String> processorAttribute,
final Map<String, String> sinkAttribute) {
final String insertSQL =
"INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute, create_time) values(?, ?, ?, ?, ?, ?)";
"INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute,task_progress, create_time) values(?, ?,?, ?, ?, ?, ?)";

try (final Connection connection = getConnection()) {
final PreparedStatement statement = connection.prepareStatement(insertSQL);
try (final Connection connection = getConnection();
final PreparedStatement statement = connection.prepareStatement(insertSQL)) {

final byte[] sourceAttributeBuffer = serialize(sourceAttribute);
final byte[] processorAttributeBuffer = serialize(processorAttribute);
final byte[] sinkAttributeBuffer = serialize(sinkAttribute);
final byte[] sourceAttributeBuffer = SerializationUtil.serialize(sourceAttribute);
final byte[] processorAttributeBuffer = SerializationUtil.serialize(processorAttribute);
final byte[] sinkAttributeBuffer = SerializationUtil.serialize(sinkAttribute);

statement.setString(1, taskId);
statement.setInt(2, taskState.getTaskState());
statement.setBytes(3, sourceAttributeBuffer);
statement.setBytes(4, processorAttributeBuffer);
statement.setBytes(5, sinkAttributeBuffer);
statement.setString(6, String.valueOf(new Timestamp(System.currentTimeMillis())));
statement.setBytes(6, null);
statement.setString(7, String.valueOf(new Timestamp(System.currentTimeMillis())));
statement.executeUpdate();

LOGGER.info("successfully persisted task {} info", taskId);
@@ -170,25 +154,11 @@ public void tryPersistenceTask(
}
}

private byte[] serialize(final Map<String, String> attribute) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(attribute.size(), outputStream);
for (final Map.Entry<String, String> entry : attribute.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}

return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())
.array();
}
}

public void tryDeleteTask(final String taskId) {
final String deleteSQL = "DELETE FROM task WHERE task_id = ?";

try (final Connection connection = getConnection()) {
final PreparedStatement statement = connection.prepareStatement(deleteSQL);
try (final Connection connection = getConnection();
final PreparedStatement statement = connection.prepareStatement(deleteSQL)) {
statement.setString(1, taskId);
statement.executeUpdate();

@@ -201,15 +171,55 @@ public void tryDeleteTask(final String taskId) {
public void tryAlterTaskState(final String taskId, final TaskStateEnum taskState) {
final String alterSQL = "UPDATE task SET task_state = ? WHERE task_id = ?";

try (final Connection connection = getConnection()) {
final PreparedStatement statement = connection.prepareStatement(alterSQL);
try (final Connection connection = getConnection();
final PreparedStatement statement = connection.prepareStatement(alterSQL)) {
statement.setInt(1, taskState.getTaskState());
statement.setString(2, taskId);
statement.executeUpdate();

LOGGER.info("successfully altered task {}", taskId);
} catch (SQLException e) {
} catch (final SQLException e) {
LOGGER.warn("Failed to alter task persistence message, because {}", e.getMessage());
}
}

public void tryReportTaskProgress(final ProgressReportEvent reportEvent) {
if (reportEvent.getInstancesProgress() == null
|| reportEvent.getInstancesProgress().isEmpty()) {
return;
}

final String reportSQL = "UPDATE task SET task_progress = ? WHERE task_id = ?";

try (final Connection connection = getConnection();
final PreparedStatement statement = connection.prepareStatement(reportSQL)) {
statement.setBytes(
1, SerializationUtil.serializeInstances(reportEvent.getInstancesProgress()));
statement.setString(2, reportEvent.getTaskId());
statement.executeUpdate();
} catch (final SQLException | IOException e) {
LOGGER.warn("Failed to report task progress because {}", e.getMessage());
}
}

public Optional<Map<Integer, ProgressIndex>> getTasksProgress(final String taskId) {
final String queryProgressSQL = "SELECT task_progress FROM task WHERE task_id = ?";

try (final Connection connection = getConnection();
final PreparedStatement statement = connection.prepareStatement(queryProgressSQL)) {
statement.setString(1, taskId);

try (final ResultSet resultSet = statement.executeQuery()) {
if (!resultSet.next()) {
return Optional.empty();
}

final byte[] bytes = resultSet.getBytes("task_progress");

return bytes == null
? Optional.empty()
: Optional.of(SerializationUtil.deserializeInstances(bytes));
}
} catch (final SQLException e) {
LOGGER.warn("Failed to retrieve task progress because {}", e.getMessage());
}

return Optional.empty();
}
}
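
For context, the SerializationUtil.serialize / SerializationUtil.deserialize calls above replace the inline helpers removed from this class. Below is a minimal sketch of such a utility, reconstructed from the removed helper code; the actual org.apache.iotdb.collector.utils.SerializationUtil shipped in this PR is not shown in the diff, so treat this as an assumption about its shape, and note that the serializeInstances/deserializeInstances variants for progress maps are omitted:

import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class SerializationUtil {

  // Writes the map size followed by each key/value pair, mirroring the removed inline helper.
  public static byte[] serialize(final Map<String, String> attribute) throws IOException {
    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
      ReadWriteIOUtils.write(attribute.size(), outputStream);
      for (final Map.Entry<String, String> entry : attribute.entrySet()) {
        ReadWriteIOUtils.write(entry.getKey(), outputStream);
        ReadWriteIOUtils.write(entry.getValue(), outputStream);
      }
      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())
          .array();
    }
  }

  // Reads the size prefix and then each key/value pair back into a map.
  public static Map<String, String> deserialize(final byte[] buffer) {
    final Map<String, String> attribute = new HashMap<>();
    final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer);

    final int size = ReadWriteIOUtils.readInt(attributeBuffer);
    for (int i = 0; i < size; i++) {
      final String key = ReadWriteIOUtils.readString(attributeBuffer);
      final String value = ReadWriteIOUtils.readString(attributeBuffer);
      attribute.put(key, value);
    }

    return attribute;
  }
}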
@@ -19,10 +19,13 @@

package org.apache.iotdb.collector.plugin.api;

import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

import java.util.Optional;

public abstract class PullSource implements PipeSource {

@Override
Expand All @@ -31,4 +34,6 @@ public final void customize(
PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) {
throw new UnsupportedOperationException();
}

public abstract Optional<ProgressIndex> report();
}
@@ -19,14 +19,20 @@

package org.apache.iotdb.collector.plugin.api;

import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.collector.runtime.task.TaskDispatch;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;

import java.util.Optional;

public abstract class PushSource implements PipeSource {

private TaskDispatch dispatch;

protected EventCollector collector;

public PushSource() {
@@ -50,7 +56,25 @@ public final Event supply() {
throw new UnsupportedOperationException();
}

public final void markPausePosition() {
dispatch.waitUntilRunningOrDropped();
}

public final void pause() {
dispatch.pause();
}

public final void resume() {
dispatch.resume();
}

public final void supply(final Event event) throws Exception {
collector.collect(event);
}

public final void setDispatch(final TaskDispatch dispatch) {
this.dispatch = dispatch;
}

public abstract Optional<ProgressIndex> report();
}
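
To illustrate how the new hooks fit together, here is a hypothetical PushSource subclass that is not part of this PR; DemoPushSource, its produce method, and its lastProgress field are invented for illustration, and the class is left abstract so the remaining PipeSource methods stay out of scope:

import org.apache.iotdb.collector.plugin.api.PushSource;
import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.pipe.api.event.Event;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public abstract class DemoPushSource extends PushSource {

  // Last progress recorded by this source; report() exposes it to the runtime's progress reporting.
  private final AtomicReference<ProgressIndex> lastProgress = new AtomicReference<>();

  // Hypothetical producer hook: honor pause/drop requests before emitting each event.
  protected void produce(final Event event) throws Exception {
    markPausePosition(); // blocks while the task is paused, returns once running or dropped
    supply(event);       // forwards the event to the framework's EventCollector
  }

  @Override
  public Optional<ProgressIndex> report() {
    return Optional.ofNullable(lastProgress.get());
  }
}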