diff --git a/iotdb-collector/collector-core/pom.xml b/iotdb-collector/collector-core/pom.xml index 7d9c42d3..a0e45028 100644 --- a/iotdb-collector/collector-core/pom.xml +++ b/iotdb-collector/collector-core/pom.xml @@ -156,6 +156,10 @@ commons-codec commons-codec + + org.apache.kafka + kafka-clients + diff --git a/iotdb-collector/collector-core/src/assembly/resources/conf/application.properties b/iotdb-collector/collector-core/src/assembly/resources/conf/application.properties index 7cfcea42..d72b18c1 100644 --- a/iotdb-collector/collector-core/src/assembly/resources/conf/application.properties +++ b/iotdb-collector/collector-core/src/assembly/resources/conf/application.properties @@ -33,7 +33,7 @@ api_service_port=17070 # The number of concurrent threads for the source task. # Effective mode: on every start # Data type: int -task_source_parallelism_num=1 +task_source_parallelism_num=4 # The number of concurrent threads for the process task. # Effective mode: on every start @@ -60,6 +60,16 @@ task_sink_ring_buffer_size=1024 # Data type: string task_database_file_path=system/database/task.db +# Proactively triggers the interval for batch deliveries +# Effective mode: on every start +# Data type: long +executor_cron_heartbeat_event_interval_seconds=20 + +# Task progress storage interval +# Effective mode: on every start +# Data type: int +task_progress_report_interval=60 + #################### ### Plugin Configuration #################### @@ -146,9 +156,4 @@ pipe_leader_cache_memory_usage_percentage=0.1 # Enable/disable reference tracking for pipe events # Effective mode: on every start # Data type: boolean -pipe_event_reference_tracking_enabled=true - -# Proactively triggers the interval for batch deliveries -# Effective mode: on every start -# Data type: long -executor_cron_heartbeat_event_interval_seconds=20 \ No newline at end of file +pipe_event_reference_tracking_enabled=true \ No newline at end of file diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java index 80b5b42e..26174a13 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java @@ -22,9 +22,9 @@ import org.apache.iotdb.collector.config.Configuration; import org.apache.iotdb.collector.service.ApiService; import org.apache.iotdb.collector.service.IService; -import org.apache.iotdb.collector.service.PeriodicalJobService; import org.apache.iotdb.collector.service.PersistenceService; import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.collector.service.ScheduleService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +41,8 @@ public class Application { private Application() { services.add(new RuntimeService()); services.add(new ApiService()); + services.add(new ScheduleService()); services.add(new PersistenceService()); - services.add(new PeriodicalJobService()); } public static void main(String[] args) { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PipeRuntimeOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PipeRuntimeOptions.java index 0abd49ab..c5f42020 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PipeRuntimeOptions.java +++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PipeRuntimeOptions.java @@ -272,12 +272,4 @@ public void setValue(final String valueString) { value = Integer.parseInt(valueString); } }; - - public static final Option EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS = - new Option("executor_cron_heartbeat_event_interval_seconds", 20L) { - @Override - public void setValue(final String valueString) { - value = Long.parseLong(valueString); - } - }; } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java index dc35f339..35dda43c 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java @@ -72,4 +72,20 @@ public void setValue(final String valueString) { value = addHomeDir(valueString); } }; + + public static final Option EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS = + new Option("executor_cron_heartbeat_event_interval_seconds", 20L) { + @Override + public void setValue(final String valueString) { + value = Long.parseLong(valueString); + } + }; + + public static final Option TASK_PROGRESS_REPORT_INTERVAL = + new Option("task_progress_report_interval", 60) { + @Override + public void setValue(String valueString) { + value = Integer.parseInt(valueString); + } + }; } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java index 5e18ce13..6928d62e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java @@ -41,6 +41,7 @@ public class DBConstant { + " source_attribute BLOB NOT NULL,\n" + " processor_attribute BLOB NOT NULL,\n" + " sink_attribute BLOB NOT NULL,\n" + + " task_progress BLOB NOT NULL,\n" + " create_time TEXT NOT NULL\n" + ");"; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java index 573f5b4f..2cf0c6cf 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java @@ -20,19 +20,18 @@ package org.apache.iotdb.collector.persistence; import org.apache.iotdb.collector.config.TaskRuntimeOptions; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import org.apache.iotdb.collector.runtime.task.TaskStateEnum; +import org.apache.iotdb.collector.runtime.task.event.ProgressReportEvent; import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.collector.utils.SerializationUtil; -import org.apache.tsfile.utils.PublicBAOS; -import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; -import java.io.DataOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -41,9 +40,9 @@ 
import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; public class TaskPersistence extends Persistence { @@ -68,9 +67,9 @@ protected void initDatabaseFileIfPossible() { @Override protected void initTableIfPossible() { - try (final Connection connection = getConnection()) { - final PreparedStatement statement = - connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL); + try (final Connection connection = getConnection(); + final PreparedStatement statement = + connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL)) { statement.executeUpdate(); } catch (final SQLException e) { LOGGER.warn("Failed to create task database", e); @@ -82,10 +81,9 @@ public void tryResume() { final String queryAllTaskSQL = "SELECT task_id, task_state, source_attribute, processor_attribute, sink_attribute, create_time FROM task"; - try (final Connection connection = getConnection()) { - final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL); - final ResultSet taskResultSet = statement.executeQuery(); - + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL); + final ResultSet taskResultSet = statement.executeQuery()) { while (taskResultSet.next()) { final String taskId = taskResultSet.getString(1); final TaskStateEnum taskState = TaskStateEnum.values()[taskResultSet.getInt(2)]; @@ -96,9 +94,9 @@ public void tryResume() { tryRecoverTask( taskId, taskState, - deserialize(sourceAttribute), - deserialize(processorAttribute), - deserialize(sinkAttribute)); + SerializationUtil.deserialize(sourceAttribute), + SerializationUtil.deserialize(processorAttribute), + SerializationUtil.deserialize(sinkAttribute)); } } catch (final SQLException e) { LOGGER.warn("Failed to resume task persistence message, because {}", e.getMessage()); @@ -125,21 +123,6 @@ public void tryRecoverTask( } } - private Map deserialize(final byte[] buffer) { - final Map attribute = new HashMap<>(); - final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer); - - final int size = ReadWriteIOUtils.readInt(attributeBuffer); - for (int i = 0; i < size; i++) { - final String key = ReadWriteIOUtils.readString(attributeBuffer); - final String value = ReadWriteIOUtils.readString(attributeBuffer); - - attribute.put(key, value); - } - - return attribute; - } - public void tryPersistenceTask( final String taskId, final TaskStateEnum taskState, @@ -147,21 +130,22 @@ public void tryPersistenceTask( final Map processorAttribute, final Map sinkAttribute) { final String insertSQL = - "INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute, create_time) values(?, ?, ?, ?, ?, ?)"; + "INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute,task_progress, create_time) values(?, ?,?, ?, ?, ?, ?)"; - try (final Connection connection = getConnection()) { - final PreparedStatement statement = connection.prepareStatement(insertSQL); + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(insertSQL)) { - final byte[] sourceAttributeBuffer = serialize(sourceAttribute); - final byte[] processorAttributeBuffer = serialize(processorAttribute); - final byte[] sinkAttributeBuffer = serialize(sinkAttribute); + final byte[] sourceAttributeBuffer = SerializationUtil.serialize(sourceAttribute); + final 
byte[] processorAttributeBuffer = SerializationUtil.serialize(processorAttribute); + final byte[] sinkAttributeBuffer = SerializationUtil.serialize(sinkAttribute); statement.setString(1, taskId); statement.setInt(2, taskState.getTaskState()); statement.setBytes(3, sourceAttributeBuffer); statement.setBytes(4, processorAttributeBuffer); statement.setBytes(5, sinkAttributeBuffer); - statement.setString(6, String.valueOf(new Timestamp(System.currentTimeMillis()))); + statement.setBytes(6, null); + statement.setString(7, String.valueOf(new Timestamp(System.currentTimeMillis()))); statement.executeUpdate(); LOGGER.info("successfully persisted task {} info", taskId); @@ -170,25 +154,11 @@ public void tryPersistenceTask( } } - private byte[] serialize(final Map attribute) throws IOException { - try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); - final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - ReadWriteIOUtils.write(attribute.size(), outputStream); - for (final Map.Entry entry : attribute.entrySet()) { - ReadWriteIOUtils.write(entry.getKey(), outputStream); - ReadWriteIOUtils.write(entry.getValue(), outputStream); - } - - return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()) - .array(); - } - } - public void tryDeleteTask(final String taskId) { final String deleteSQL = "DELETE FROM task WHERE task_id = ?"; - try (final Connection connection = getConnection()) { - final PreparedStatement statement = connection.prepareStatement(deleteSQL); + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(deleteSQL)) { statement.setString(1, taskId); statement.executeUpdate(); @@ -201,15 +171,55 @@ public void tryDeleteTask(final String taskId) { public void tryAlterTaskState(final String taskId, final TaskStateEnum taskState) { final String alterSQL = "UPDATE task SET task_state = ? WHERE task_id = ?"; - try (final Connection connection = getConnection()) { - final PreparedStatement statement = connection.prepareStatement(alterSQL); + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(alterSQL)) { statement.setInt(1, taskState.getTaskState()); statement.setString(2, taskId); statement.executeUpdate(); LOGGER.info("successfully altered task {}", taskId); - } catch (SQLException e) { + } catch (final SQLException e) { LOGGER.warn("Failed to alter task persistence message, because {}", e.getMessage()); } } + + public void tryReportTaskProgress(final ProgressReportEvent reportEvent) { + if (reportEvent.getInstancesProgress() == null + || reportEvent.getInstancesProgress().isEmpty()) { + return; + } + + final String reportSQL = "UPDATE task SET task_progress = ? 
WHERE task_id = ?"; + + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(reportSQL)) { + statement.setBytes( + 1, SerializationUtil.serializeInstances(reportEvent.getInstancesProgress())); + statement.setString(2, reportEvent.getTaskId()); + statement.executeUpdate(); + } catch (final SQLException | IOException e) { + LOGGER.warn("Failed to report task progress because {}", e.getMessage()); + } + } + + public Optional> getTasksProgress(final String taskId) { + final String queryProgressSQL = "SELECT task_progress FROM task WHERE task_id = ?"; + + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(queryProgressSQL)) { + statement.setString(1, taskId); + + try (final ResultSet resultSet = statement.executeQuery()) { + final byte[] bytes = resultSet.getBytes("task_progress"); + + return bytes == null + ? Optional.empty() + : Optional.of(SerializationUtil.deserializeInstances(bytes)); + } + } catch (final SQLException e) { + LOGGER.warn("Failed to retrieve task progress because {}", e.getMessage()); + } + + return Optional.empty(); + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java index 47313ee6..8e5030fb 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java @@ -19,10 +19,13 @@ package org.apache.iotdb.collector.plugin.api; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import org.apache.iotdb.pipe.api.PipeSource; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import java.util.Optional; + public abstract class PullSource implements PipeSource { @Override @@ -31,4 +34,6 @@ public final void customize( PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) { throw new UnsupportedOperationException(); } + + public abstract Optional report(); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java index 7663c096..90daa8f5 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java @@ -19,14 +19,20 @@ package org.apache.iotdb.collector.plugin.api; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; +import org.apache.iotdb.collector.runtime.task.TaskDispatch; import org.apache.iotdb.pipe.api.PipeSource; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; +import java.util.Optional; + public abstract class PushSource implements PipeSource { + private TaskDispatch dispatch; + protected EventCollector collector; public PushSource() { @@ -50,7 +56,25 @@ public final Event supply() { throw new UnsupportedOperationException(); } + public final void 
markPausePosition() { + dispatch.waitUntilRunningOrDropped(); + } + + public final void pause() { + dispatch.pause(); + } + + public final void resume() { + dispatch.resume(); + } + public final void supply(final Event event) throws Exception { collector.collect(event); } + + public final void setDispatch(final TaskDispatch dispatch) { + this.dispatch = dispatch; + } + + public abstract Optional report(); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java index 32ea6642..6a7fe4b1 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java @@ -19,18 +19,16 @@ package org.apache.iotdb.collector.plugin.api.customizer; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; -import java.util.Map; +public class CollectorParameters { + private static final Set PARAM_SET = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("source", "processor", "sink"))); -public class CollectorParameters extends PipeParameters { - public CollectorParameters(final Map attributes) { - super(attributes); - this.attributes.forEach( - (key, value) -> { - if (!"taskId".equals(key)) { - attributes.put(key, value.replace("_", "-")); - } - }); + public static boolean matchAnyParam(final String param) { + return PARAM_SET.contains(param); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java index 3ae111f9..8996f473 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java @@ -26,11 +26,13 @@ import org.apache.iotdb.collector.plugin.builtin.source.HttpPullSource; import org.apache.iotdb.collector.plugin.builtin.source.HttpPushSource; import org.apache.iotdb.collector.plugin.builtin.source.IoTDBPushSource; +import org.apache.iotdb.collector.plugin.builtin.source.KafkaSource; public enum BuiltinPlugin { // Push Sources HTTP_PUSH_SOURCE("http-push-source", HttpPushSource.class), + KAFKA_SOURCE("kafka-source", KafkaSource.class), // Pull Sources HTTP_PULL_SOURCE("http-pull-source", HttpPullSource.class), diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java index ff4cffeb..1fe10fd2 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java @@ -21,6 +21,7 @@ import org.apache.iotdb.collector.plugin.api.PullSource; import org.apache.iotdb.collector.plugin.api.event.DemoEvent; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; @@ -58,4 +60,9 @@ public Event supply() { @Override public void close() {} + + @Override + public Optional report() { + return Optional.empty(); + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java index 22258292..495bb854 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java @@ -21,6 +21,7 @@ import org.apache.iotdb.collector.plugin.api.PushSource; import org.apache.iotdb.collector.plugin.api.event.DemoEvent; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -58,6 +60,8 @@ public void start() { private void doWork() { try { while (isStarted && !Thread.currentThread().isInterrupted()) { + markPausePosition(); + final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000))); LOGGER.info("{} created successfully ...", event); supply(event); @@ -83,4 +87,9 @@ public void close() { workerThread = null; } } + + @Override + public Optional report() { + return Optional.empty(); + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java index 66315690..affb5cc1 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java @@ -22,6 +22,7 @@ import org.apache.iotdb.collector.plugin.api.PushSource; import org.apache.iotdb.collector.plugin.builtin.source.constant.IoTDBPushSourceConstant; import org.apache.iotdb.collector.plugin.builtin.source.event.SubDemoEvent; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Optional; import java.util.Properties; public class IoTDBPushSource extends PushSource { @@ -96,6 +98,8 @@ private void doWork() { consumer.subscribe(topic); while (isStarted && !Thread.currentThread().isInterrupted()) { + 
markPausePosition(); + final List messages = consumer.poll(timeout); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); @@ -126,4 +130,9 @@ public void close() throws Exception { workerThread = null; } } + + @Override + public Optional report() { + return Optional.empty(); + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/KafkaSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/KafkaSource.java new file mode 100644 index 00000000..e1662023 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/KafkaSource.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.builtin.source; + +import org.apache.iotdb.collector.plugin.api.PushSource; +import org.apache.iotdb.collector.plugin.api.customizer.CollectorRuntimeEnvironment; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_GROUP_ID_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_GROUP_ID_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_TOPIC_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_TOPIC_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_URL_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_URL_VALUE; + +public class KafkaSource extends PushSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); + + private 
ProgressIndex startIndex; + private int instanceIndex; + + private Thread workerThread; + private volatile boolean isStarted = false; + + // kafka config + private String topic; + private String kafkaServiceURL; + private String groupId; + private long offset; + + @Override + public void validate(PipeParameterValidator validator) throws Exception {} + + @Override + public void customize( + PipeParameters pipeParameters, PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration) + throws Exception { + final CollectorRuntimeEnvironment environment = + (CollectorRuntimeEnvironment) pipeSourceRuntimeConfiguration.getRuntimeEnvironment(); + + final String taskId = environment.getPipeName(); + instanceIndex = environment.getInstanceIndex(); + startIndex = + RuntimeService.progress().isPresent() + ? RuntimeService.progress().get().getInstanceProgressIndex(taskId, instanceIndex) + == null + ? new ProgressIndex(instanceIndex, new HashMap<>()) + : RuntimeService.progress().get().getInstanceProgressIndex(taskId, instanceIndex) + : new ProgressIndex(instanceIndex, new HashMap<>()); + + topic = pipeParameters.getStringOrDefault(KAFKA_SOURCE_TOPIC_KEY, KAFKA_SOURCE_TOPIC_VALUE); + kafkaServiceURL = + pipeParameters.getStringOrDefault(KAFKA_SOURCE_URL_KEY, KAFKA_SOURCE_URL_VALUE); + groupId = + pipeParameters.getStringOrDefault(KAFKA_SOURCE_GROUP_ID_KEY, KAFKA_SOURCE_GROUP_ID_VALUE); + } + + @Override + public void start() throws Exception { + if (workerThread == null || !workerThread.isAlive()) { + isStarted = true; + workerThread = new Thread(this::doWork); + workerThread.start(); + } + } + + public void doWork() { + final Properties props = new Properties(); + props.put("bootstrap.servers", kafkaServiceURL); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("group.id", groupId); + props.put("auto.offset.reset", "none"); + props.put("enable.auto.commit", "false"); + + try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { + final TopicPartition currentWorkTopicPartition = new TopicPartition(topic, instanceIndex); + offset = Long.parseLong(startIndex.getProgressInfo().getOrDefault("offset", "0")); + + consumer.assign(Collections.singleton(currentWorkTopicPartition)); + consumer.seek(currentWorkTopicPartition, offset); + + while (isStarted && !Thread.currentThread().isInterrupted()) { + markPausePosition(); + + final ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (final ConsumerRecord record : records) { + LOGGER.info( + "Partition{} consumed offset={} key={} value={}", + instanceIndex, + record.offset(), + record.key(), + record.value()); + + offset = record.offset() + 1; + TimeUnit.SECONDS.sleep(1); + } + } + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + isStarted = false; + if (workerThread != null) { + workerThread.interrupt(); + try { + workerThread.join(1000); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + workerThread = null; + } + } + + @Override + public Optional report() { + final HashMap progressInfo = new HashMap<>(); + progressInfo.put("offset", String.valueOf(offset)); + + return Optional.of(new ProgressIndex(instanceIndex, progressInfo)); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/KafkaSourceConstant.java 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/KafkaSourceConstant.java new file mode 100644 index 00000000..b8b3ec66 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/KafkaSourceConstant.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.builtin.source.constant; + +public class KafkaSourceConstant { + public static final String KAFKA_SOURCE_TOPIC_KEY = "topic"; + public static final String KAFKA_SOURCE_URL_KEY = "url"; + public static final String KAFKA_SOURCE_GROUP_ID_KEY = "group-id"; + + public static final String KAFKA_SOURCE_TOPIC_VALUE = "my_topic"; + public static final String KAFKA_SOURCE_URL_VALUE = "localhost:9092"; + public static final String KAFKA_SOURCE_GROUP_ID_VALUE = "multi-thread-group"; +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java index 7097ce50..69a80f8f 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.collector.plugin.builtin.source.HttpPullSource; import org.apache.iotdb.collector.plugin.builtin.source.HttpPushSource; import org.apache.iotdb.collector.plugin.builtin.source.IoTDBPushSource; +import org.apache.iotdb.collector.plugin.builtin.source.KafkaSource; import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; import org.apache.iotdb.pipe.api.PipeSource; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -40,6 +41,7 @@ protected void initConstructors() { pluginConstructors.put(BuiltinPlugin.HTTP_PULL_SOURCE.getPluginName(), HttpPullSource::new); pluginConstructors.put(BuiltinPlugin.HTTP_PUSH_SOURCE.getPluginName(), HttpPushSource::new); pluginConstructors.put(BuiltinPlugin.SUBSCRIPTION_SOURCE.getPluginName(), IoTDBPushSource::new); + pluginConstructors.put(BuiltinPlugin.KAFKA_SOURCE.getPluginName(), KafkaSource::new); } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/progress/ProgressIndex.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/progress/ProgressIndex.java new file mode 100644 index 00000000..0feadb45 --- /dev/null +++ 
b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/progress/ProgressIndex.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.progress; + +import java.util.Map; + +public class ProgressIndex { + private final Map progressInfo; + private final Integer instanceIndex; + + public ProgressIndex(final Integer instanceIndex, final Map progressInfo) { + this.instanceIndex = instanceIndex; + this.progressInfo = progressInfo; + } + + public Integer getInstanceIndex() { + return instanceIndex; + } + + public Map getProgressInfo() { + return progressInfo; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/progress/ProgressRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/progress/ProgressRuntime.java new file mode 100644 index 00000000..5a8cf020 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/progress/ProgressRuntime.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.collector.runtime.progress; + +import org.apache.iotdb.collector.runtime.task.event.ProgressReportEvent; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ProgressRuntime implements Closeable { + private static final Map TASKS_PROGRESS = new ConcurrentHashMap<>(); + + public void addTaskProgress(final String taskId, final ProgressReportEvent event) { + TASKS_PROGRESS.putIfAbsent(taskId, event); + } + + public ProgressIndex getInstanceProgressIndex(final String taskId, final Integer instanceIndex) { + return TASKS_PROGRESS.get(taskId).getInstanceProgress(instanceIndex); + } + + public void removeTaskProgress(final String taskId) { + TASKS_PROGRESS.remove(taskId); + } + + @Override + public void close() throws IOException { + TASKS_PROGRESS.clear(); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java index 2dbcc71a..2783addd 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java @@ -19,14 +19,14 @@ package org.apache.iotdb.collector.runtime.task; -import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import java.util.Map; public abstract class Task { protected final String taskId; - protected final CollectorParameters parameters; + protected final PipeParameters parameters; protected final int parallelism; @@ -38,7 +38,7 @@ protected Task( final String parallelismKey, final int parallelismValue) { this.taskId = taskId; - this.parameters = new CollectorParameters(attributes); + this.parameters = new PipeParameters(attributes); this.parallelism = parameters.getIntOrDefault(parallelismKey, parallelismValue); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java index a3e4204b..144af7d4 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java @@ -19,16 +19,21 @@ package org.apache.iotdb.collector.runtime.task; +import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters; +import org.apache.iotdb.collector.runtime.task.event.ProgressReportEvent; import org.apache.iotdb.collector.runtime.task.processor.ProcessorTask; import org.apache.iotdb.collector.runtime.task.sink.SinkTask; import org.apache.iotdb.collector.runtime.task.source.SourceTask; import org.apache.iotdb.collector.service.PersistenceService; +import org.apache.iotdb.collector.service.RuntimeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.core.Response; +import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -53,11 +58,26 @@ public synchronized Response createTask( .build(); } - final SinkTask sinkTask = new SinkTask(taskId, sinkAttribute); + RuntimeService.progress() + .ifPresent( + progress -> + progress.addTaskProgress( + taskId, + new 
ProgressReportEvent( + taskId, + PersistenceService.task().isPresent() + ? PersistenceService.task() + .get() + .getTasksProgress(taskId) + .orElseGet(HashMap::new) + : new HashMap<>()))); + + final SinkTask sinkTask = new SinkTask(taskId, convert(sinkAttribute)); final ProcessorTask processorTask = - new ProcessorTask(taskId, processorAttribute, sinkTask.makeProducer()); + new ProcessorTask(taskId, convert(processorAttribute), sinkTask.makeProducer()); final SourceTask sourceTask = - SourceTask.construct(taskId, sourceAttribute, processorTask.makeProducer(), taskState); + SourceTask.construct( + taskId, convert(sourceAttribute), processorTask.makeProducer(), taskState); final TaskCombiner taskCombiner = new TaskCombiner(sourceTask, processorTask, sinkTask); taskCombiner.create(); @@ -82,6 +102,7 @@ public synchronized Response createTask( .entity(String.format("Successfully created task %s", taskId)) .build(); } catch (final Exception e) { + RuntimeService.progress().ifPresent(progress -> progress.removeTaskProgress(taskId)); tasks.remove(taskId); LOGGER.warn("Failed to create task {} because {}", taskId, e.getMessage(), e); @@ -131,6 +152,8 @@ public synchronized Response stopTask(final String taskId) { .ifPresent( taskPersistence -> taskPersistence.tryAlterTaskState(taskId, TaskStateEnum.STOPPED)); + RuntimeService.progress().ifPresent(progress -> progress.removeTaskProgress(taskId)); + LOGGER.info("Task {} stop successfully", taskId); return Response.status(Response.Status.OK) .entity(String.format("task %s stop successfully", taskId)) @@ -179,4 +202,21 @@ public synchronized void close() throws Exception { tasks.clear(); LOGGER.info("Task runtime closed in {}ms", System.currentTimeMillis() - currentTime); } + + private Map convert(final Map attributes) { + final Map modifyParamMap = new HashMap<>(); + final Iterator> paramIterator = attributes.entrySet().iterator(); + + while (paramIterator.hasNext()) { + final Map.Entry param = paramIterator.next(); + + if (!CollectorParameters.matchAnyParam(param.getKey())) { + modifyParamMap.put(param.getKey().replace("-", "_"), param.getValue()); + paramIterator.remove(); + } + } + attributes.putAll(modifyParamMap); + + return attributes; + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/ProgressReportEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/ProgressReportEvent.java new file mode 100644 index 00000000..90ea81d6 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/ProgressReportEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.collector.runtime.task.event; + +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; + +import java.util.Map; + +public class ProgressReportEvent { + private final String taskId; + private final Map instancesProgress; + + public ProgressReportEvent( + final String taskId, final Map instancesProgress) { + this.taskId = taskId; + this.instancesProgress = instancesProgress; + } + + public String getTaskId() { + return taskId; + } + + public ProgressIndex getInstanceProgress(final int instanceId) { + return instancesProgress.get(instanceId); + } + + public Map getInstancesProgress() { + return instancesProgress; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java index 7b989227..03353a7f 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java @@ -19,14 +19,15 @@ package org.apache.iotdb.collector.runtime.task.processor; +import org.apache.iotdb.collector.config.TaskRuntimeOptions; import org.apache.iotdb.collector.plugin.api.customizer.CollectorProcessorRuntimeConfiguration; import org.apache.iotdb.collector.plugin.api.event.PeriodicalEvent; import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; import org.apache.iotdb.collector.runtime.task.Task; import org.apache.iotdb.collector.runtime.task.event.EventCollector; import org.apache.iotdb.collector.runtime.task.event.EventContainer; -import org.apache.iotdb.collector.service.PeriodicalJobService; import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.collector.service.ScheduleService; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import com.lmax.disruptor.BlockingWaitStrategy; @@ -64,7 +65,9 @@ public ProcessorTask( taskId, attributes, TASK_PROCESS_PARALLELISM_NUM.key(), - TASK_PROCESS_PARALLELISM_NUM.value()); + attributes.containsKey(TASK_PROCESS_PARALLELISM_NUM.key()) + ? 
Integer.parseInt(TASK_PROCESS_PARALLELISM_NUM.key()) + : TASK_PROCESS_PARALLELISM_NUM.value()); REGISTERED_EXECUTOR_SERVICES.putIfAbsent( taskId, @@ -122,19 +125,25 @@ public void createInternal() throws Exception { disruptor.start(); // Scheduled and proactive sink actions - PeriodicalJobService.register(taskId, () -> sinkProducer.collect(new PeriodicalEvent())); + ScheduleService.pushEvent() + .ifPresent( + event -> + event.register( + taskId, + () -> sinkProducer.collect(new PeriodicalEvent()), + TaskRuntimeOptions.EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS.value())); } @Override public void startInternal() { // resume proactive sink actions - PeriodicalJobService.resumeSingleTask(taskId); + ScheduleService.pushEvent().ifPresent(event -> event.resumeSingleJob(taskId)); } @Override public void stopInternal() { // pause proactive sink actions - PeriodicalJobService.pauseSingleTask(taskId); + ScheduleService.pushEvent().ifPresent(event -> event.pauseSingleJob(taskId)); } @Override @@ -150,7 +159,7 @@ public void dropInternal() { } // remove proactive sink actions - PeriodicalJobService.deregister(taskId); + ScheduleService.pushEvent().ifPresent(event -> event.deregister(taskId)); disruptor.shutdown(); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java index e63ae842..3def6d9a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java @@ -54,7 +54,13 @@ public class SinkTask extends Task { private SinkConsumer[] consumers; public SinkTask(final String taskId, final Map attributes) { - super(taskId, attributes, TASK_SINK_PARALLELISM_NUM.key(), TASK_SINK_PARALLELISM_NUM.value()); + super( + taskId, + attributes, + TASK_SINK_PARALLELISM_NUM.key(), + attributes.containsKey(TASK_SINK_PARALLELISM_NUM.key()) + ? Integer.parseInt(TASK_SINK_PARALLELISM_NUM.key()) + : TASK_SINK_PARALLELISM_NUM.value()); REGISTERED_EXECUTOR_SERVICES.putIfAbsent( taskId, diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java index 2cf5f67c..206b4ca8 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java @@ -43,7 +43,12 @@ protected SourceTask( final EventCollector processorProducer, final TaskStateEnum taskState) { super( - taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(), TASK_SOURCE_PARALLELISM_NUM.value()); + taskId, + attributes, + TASK_SOURCE_PARALLELISM_NUM.key(), + attributes.containsKey(TASK_SOURCE_PARALLELISM_NUM.key()) + ? 
Integer.parseInt(attributes.get(TASK_SOURCE_PARALLELISM_NUM.key())) + : TASK_SOURCE_PARALLELISM_NUM.value()); this.processorProducer = processorProducer; this.taskState = taskState; } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java index 219dfcf2..ffa48a0a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java @@ -19,18 +19,24 @@ package org.apache.iotdb.collector.runtime.task.source.pull; +import org.apache.iotdb.collector.config.TaskRuntimeOptions; import org.apache.iotdb.collector.plugin.api.PullSource; import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration; import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import org.apache.iotdb.collector.runtime.task.TaskStateEnum; import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.event.ProgressReportEvent; import org.apache.iotdb.collector.runtime.task.source.SourceTask; +import org.apache.iotdb.collector.service.PersistenceService; import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.collector.service.ScheduleService; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -111,6 +117,33 @@ public void createInternal() throws Exception { } }); } + + // register storage progress schedule job + ScheduleService.reportProgress() + .ifPresent( + reportEvent -> { + reportEvent.register( + taskId, + () -> { + if (consumers != null && consumers.length > 0) { + Map progresses = new HashMap<>(); + for (int i = 0; i < consumers.length; i++) { + final int finalI = i; + consumers[i] + .consumer() + .report() + .ifPresent(progressIndex -> progresses.put(finalI, progressIndex)); + } + + PersistenceService.task() + .ifPresent( + task -> + task.tryReportTaskProgress( + new ProgressReportEvent(taskId, progresses))); + } + }, + TaskRuntimeOptions.TASK_PROGRESS_REPORT_INTERVAL.value()); + }); } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java index 7419ae74..1c74234b 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java @@ -19,18 +19,24 @@ package org.apache.iotdb.collector.runtime.task.source.push; +import org.apache.iotdb.collector.config.TaskRuntimeOptions; import org.apache.iotdb.collector.plugin.api.PushSource; import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration; import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import 
org.apache.iotdb.collector.runtime.task.TaskStateEnum; import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.event.ProgressReportEvent; import org.apache.iotdb.collector.runtime.task.source.SourceTask; +import org.apache.iotdb.collector.service.PersistenceService; import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.collector.service.ScheduleService; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; public class PushSourceTask extends SourceTask { @@ -60,15 +66,16 @@ public void createInternal() throws Exception { for (int i = 0; i < parallelism; i++) { pushSources[i] = (PushSource) pluginRuntime.constructSource(parameters); pushSources[i].setCollector(processorProducer); - + pushSources[i].setDispatch(dispatch); try { pushSources[i].validate(new PipeParameterValidator(parameters)); pushSources[i].customize( parameters, new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i)); - if (TaskStateEnum.RUNNING.equals(taskState)) { - pushSources[i].start(); + if (TaskStateEnum.STOPPED.equals(taskState)) { + pushSources[i].pause(); } + pushSources[i].start(); } catch (final Exception e) { try { pushSources[i].close(); @@ -78,6 +85,33 @@ public void createInternal() throws Exception { } } } + + // register storage progress schedule job + ScheduleService.reportProgress() + .ifPresent( + reportEvent -> { + reportEvent.register( + taskId, + () -> { + if (pushSources != null && pushSources.length > 0) { + Map progresses = new HashMap<>(); + for (int i = 0; i < pushSources.length; i++) { + final int finalI = i; + pushSources[i] + .report() + .ifPresent(progressIndex -> progresses.put(finalI, progressIndex)); + } + + PersistenceService.task() + .ifPresent( + task -> + task.tryReportTaskProgress( + new ProgressReportEvent(taskId, progresses))); + LOGGER.info("successfully reported task progress {}", progresses); + } + }, + TaskRuntimeOptions.TASK_PROGRESS_REPORT_INTERVAL.value()); + }); } @Override @@ -89,7 +123,7 @@ public void startInternal() { if (pushSources != null) { for (int i = 0; i < parallelism; i++) { try { - pushSources[i].start(); + pushSources[i].resume(); } catch (final Exception e) { LOGGER.warn("Failed to restart push source", e); return; @@ -109,7 +143,7 @@ public void stopInternal() { if (pushSources != null) { for (int i = 0; i < parallelism; i++) { try { - pushSources[i].close(); + pushSources[i].pause(); } catch (final Exception e) { LOGGER.warn("Failed to stop source", e); return; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ScheduleJob.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ScheduleJob.java new file mode 100644 index 00000000..ed5c87f9 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ScheduleJob.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.schedule; + +import org.apache.iotdb.collector.utils.Triple; +import org.apache.iotdb.collector.utils.preiodical.ScheduledExecutorUtil; +import org.apache.iotdb.collector.utils.preiodical.WrappedRunnable; + +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public abstract class ScheduleJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleJob.class); + + private final ScheduledExecutorService executorService; + private final long minIntervalSeconds; + + private long rounds; + private Future executorFuture; + + private final List>> periodicalJobs = + new CopyOnWriteArrayList<>(); + + public ScheduleJob( + final ScheduledExecutorService executorService, final long minIntervalSeconds) { + this.executorService = executorService; + this.minIntervalSeconds = minIntervalSeconds; + } + + public synchronized void start() { + if (executorFuture == null) { + rounds = 0; + + executorFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + executorService, + this::execute, + minIntervalSeconds, + minIntervalSeconds, + TimeUnit.SECONDS); + LOGGER.info("Periodical job executor is started successfully."); + } + } + + protected void execute() { + ++rounds; + + for (final Pair> periodicalJob : + periodicalJobs) { + if (rounds % periodicalJob.right.right == 0 && periodicalJob.right.middle) { + periodicalJob.right.left.run(); + } + } + } + + public synchronized void register( + final String id, final Runnable periodicalJob, final long intervalInSeconds) { + periodicalJobs.add( + new Pair<>( + id, + new Triple<>( + new WrappedRunnable() { + @Override + public void runMayThrow() { + try { + periodicalJob.run(); + } catch (Exception e) { + LOGGER.warn("Periodical job {} failed.", id, e); + } + } + }, + true, + Math.max(intervalInSeconds / minIntervalSeconds, 1)))); + LOGGER.info( + "Periodical job {} is registered successfully. 
Interval: {} seconds.", + id, + Math.max(intervalInSeconds / minIntervalSeconds, 1) * minIntervalSeconds); + } + + public synchronized void deregister(final String id) { + periodicalJobs.removeIf(pair -> pair.left.equals(id)); + } + + public synchronized void pauseSingleJob(final String id) { + periodicalJobs.forEach( + pair -> { + if (pair.left.equals(id)) { + pair.right.middle = false; + } + }); + } + + public synchronized void resumeSingleJob(final String id) { + periodicalJobs.forEach( + pair -> { + if (pair.left.equals(id)) { + pair.right.middle = true; + } + }); + } + + public synchronized void stop() { + if (executorFuture != null) { + executorFuture.cancel(false); + executorFuture = null; + + LOGGER.info("Periodical job executor is stopped successfully."); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/SchedulePushEventJob.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/SchedulePushEventJob.java new file mode 100644 index 00000000..06a3bd38 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/SchedulePushEventJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.schedule; + +import org.apache.iotdb.collector.config.TaskRuntimeOptions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledThreadPoolExecutor; + +public class SchedulePushEventJob extends ScheduleJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulePushEventJob.class); + + public SchedulePushEventJob() { + super( + new ScheduledThreadPoolExecutor(1), + TaskRuntimeOptions.EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS.value()); + + LOGGER.info("new single scheduled thread pool: {}", ThreadName.SCHEDULE_PUSH_EVENT_JOB); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ScheduleReportProgressJob.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ScheduleReportProgressJob.java new file mode 100644 index 00000000..5b8cc3a1 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ScheduleReportProgressJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.schedule; + +import org.apache.iotdb.collector.config.TaskRuntimeOptions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledThreadPoolExecutor; + +public class ScheduleReportProgressJob extends ScheduleJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleReportProgressJob.class); + + public ScheduleReportProgressJob() { + super( + new ScheduledThreadPoolExecutor(1), + TaskRuntimeOptions.TASK_PROGRESS_REPORT_INTERVAL.value()); + + LOGGER.info("new single scheduled thread pool: {}", ThreadName.SCHEDULE_REPORT_PROGRESS_JOB); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ThreadName.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ThreadName.java new file mode 100644 index 00000000..2f06c271 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/schedule/ThreadName.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.collector.schedule; + +public enum ThreadName { + SCHEDULE_PUSH_EVENT_JOB("Schedule-Push-Event-Job"), + SCHEDULE_REPORT_PROGRESS_JOB("Schedule-Report-ProgressIndex-Job"); + + private final String name; + + ThreadName(final String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PeriodicalJobService.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PeriodicalJobService.java deleted file mode 100644 index f04cbed7..00000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PeriodicalJobService.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.service; - -import org.apache.iotdb.collector.config.PipeRuntimeOptions; -import org.apache.iotdb.collector.utils.preiodical.ScheduledExecutorUtil; -import org.apache.iotdb.collector.utils.preiodical.WrappedRunnable; - -import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class PeriodicalJobService implements IService { - - private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicalJobService.class); - - private static final ScheduledExecutorService executorService = - new ScheduledThreadPoolExecutor(1); - - private Future<?> executorFuture; - - // String: task id - // Boolean: task status, if task pause, skip execute - // WrappedRunnable: periodical job - private static final List<Pair<String, Pair<Boolean, WrappedRunnable>>> PERIODICAL_JOBS = - new CopyOnWriteArrayList<>(); - - @Override - public void start() { - if (executorFuture == null) { - - executorFuture = - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - executorService, - this::execute, - PipeRuntimeOptions.EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS.value(), - PipeRuntimeOptions.EXECUTOR_CRON_HEARTBEAT_EVENT_INTERVAL_SECONDS.value(), - TimeUnit.SECONDS); - - LOGGER.info("Periodical Job Service started successfully."); - } - } - - protected void execute() { - for (final Pair<String, Pair<Boolean, WrappedRunnable>> periodicalJob : PERIODICAL_JOBS) { - if (periodicalJob.right.left) { - periodicalJob.right.right.run(); - } - } - } - - public static synchronized void register(final String taskId, final Runnable periodicalJob) { - PERIODICAL_JOBS.add( - new Pair<>( - taskId, - new Pair<>( - true, - new WrappedRunnable() { - @Override - public void runMayThrow() { - try { - periodicalJob.run(); - } catch (final Exception e) { - LOGGER.warn("Periodical job {} failed.", taskId, e); - } - } - }))); - } - - public static synchronized void deregister(final String taskId) { - PERIODICAL_JOBS.removeIf(pair -> pair.left.equals(taskId)); - } - - public static synchronized void resumeSingleTask(final String taskId) { - PERIODICAL_JOBS.forEach( - pair -> { - if (pair.getLeft().equals(taskId)) { - pair.right.left = true; - } - }); - } - - public static synchronized void pauseSingleTask(final String taskId) { - PERIODICAL_JOBS.forEach( - pair -> { - if (pair.getLeft().equals(taskId)) { - pair.right.left = false; - } - }); - } - - @Override - public synchronized void stop() { - if (executorFuture != null) { - executorFuture.cancel(false); - executorFuture = null; - LOGGER.info("Periodical Job Service stopped successfully."); - } - } - - @Override - public String name() { - return "PeriodicalJobService"; - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java index
001df2da..cf19627e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java @@ -20,11 +20,13 @@ package org.apache.iotdb.collector.service; import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.progress.ProgressRuntime; import org.apache.iotdb.collector.runtime.task.TaskRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -34,11 +36,13 @@ public class RuntimeService implements IService { private static final AtomicReference<TaskRuntime> TASK = new AtomicReference<>(); private static final AtomicReference<PluginRuntime> PLUGIN = new AtomicReference<>(); + private static final AtomicReference<ProgressRuntime> PROGRESS = new AtomicReference<>(); @Override public synchronized void start() { TASK.set(new TaskRuntime()); PLUGIN.set(new PluginRuntime()); + PROGRESS.set(new ProgressRuntime()); } public static Optional<TaskRuntime> task() { @@ -49,6 +53,10 @@ public static Optional<PluginRuntime> plugin() { return Optional.of(PLUGIN.get()); } + public static Optional<ProgressRuntime> progress() { + return Optional.of(PROGRESS.get()); + } + @Override public synchronized void stop() { task() @@ -73,6 +81,18 @@ public synchronized void stop() { } }); PLUGIN.set(null); + + progress() + .ifPresent( + progressRuntime -> { + try { + progressRuntime.close(); + } catch (final IOException e) { + LOGGER.warn( + "[RuntimeService] Failed to close progress runtime: {}", e.getMessage(), e); + } + }); + PROGRESS.set(null); } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ScheduleService.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ScheduleService.java new file mode 100644 index 00000000..0687decc --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/ScheduleService.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.collector.service; + +import org.apache.iotdb.collector.schedule.ScheduleJob; +import org.apache.iotdb.collector.schedule.SchedulePushEventJob; +import org.apache.iotdb.collector.schedule.ScheduleReportProgressJob; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +public class ScheduleService implements IService { + + private static final AtomicReference<ScheduleJob> SCHEDULE_PUSH_EVENT_JOB = + new AtomicReference<>(); + private static final AtomicReference<ScheduleJob> SCHEDULE_REPORT_PROGRESS_JOB = + new AtomicReference<>(); + + @Override + public void start() { + SCHEDULE_PUSH_EVENT_JOB.set(new SchedulePushEventJob()); + SCHEDULE_PUSH_EVENT_JOB.get().start(); + + SCHEDULE_REPORT_PROGRESS_JOB.set(new ScheduleReportProgressJob()); + SCHEDULE_REPORT_PROGRESS_JOB.get().start(); + } + + public static Optional<ScheduleJob> pushEvent() { + return Optional.of(SCHEDULE_PUSH_EVENT_JOB.get()); + } + + public static Optional<ScheduleJob> reportProgress() { + return Optional.of(SCHEDULE_REPORT_PROGRESS_JOB.get()); + } + + @Override + public synchronized void stop() { + pushEvent().ifPresent(ScheduleJob::stop); + SCHEDULE_PUSH_EVENT_JOB.set(null); + + reportProgress().ifPresent(ScheduleJob::stop); + SCHEDULE_REPORT_PROGRESS_JOB.set(null); + } + + @Override + public String name() { + return "ScheduleService"; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/SerializationUtil.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/SerializationUtil.java new file mode 100644 index 00000000..38f62a96 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/SerializationUtil.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iotdb.collector.utils; + +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; + +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class SerializationUtil { + + public static byte[] serialize(final Map<String, String> map) throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(map.size(), outputStream); + for (final Map.Entry<String, String> entry : map.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()) + .array(); + } + } + + public static Map<String, String> deserialize(final byte[] buffer) { + final Map<String, String> attribute = new HashMap<>(); + final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer); + + final int size = ReadWriteIOUtils.readInt(attributeBuffer); + for (int i = 0; i < size; i++) { + final String key = ReadWriteIOUtils.readString(attributeBuffer); + final String value = ReadWriteIOUtils.readString(attributeBuffer); + + attribute.put(key, value); + } + + return attribute; + } + + public static byte[] serializeInstances(final Map<Integer, ProgressIndex> map) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(map.size(), outputStream); + for (final Map.Entry<Integer, ProgressIndex> entry : map.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue().getProgressInfo(), outputStream); + } + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()) + .array(); + } + } + + public static Map<Integer, ProgressIndex> deserializeInstances(final byte[] buffer) { + final Map<Integer, ProgressIndex> attribute = new HashMap<>(); + final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer); + + final int size = ReadWriteIOUtils.readInt(attributeBuffer); + for (int i = 0; i < size; i++) { + final Integer instanceIndex = ReadWriteIOUtils.readInt(attributeBuffer); + final Map<String, String> attributeMap = ReadWriteIOUtils.readMap(attributeBuffer); + + attribute.put(instanceIndex, new ProgressIndex(instanceIndex, attributeMap)); + } + + return attribute; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/Triple.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/Triple.java new file mode 100644 index 00000000..79671dd3 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/Triple.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.utils; + +public class Triple<L, M, R> { + public L left; + public M middle; + public R right; + + public Triple(final L left, final M middle, final R right) { + this.left = left; + this.middle = middle; + this.right = right; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((left == null) ? 0 : left.hashCode()); + result = prime * result + ((middle == null) ? 0 : middle.hashCode()); + result = prime * result + ((right == null) ? 0 : right.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Triple<?, ?, ?> other = (Triple<?, ?, ?>) obj; + if (left == null) { + if (other.left != null) { + return false; + } + } else if (!left.equals(other.left)) { + return false; + } + if (middle == null) { + if (other.middle != null) { + return false; + } + } else if (!middle.equals(other.middle)) { + return false; + } + if (right == null) { + return other.right == null; + } else return right.equals(other.right); + } + + @Override + public String toString() { + return "<" + left + ", " + middle + ", " + right + ">"; + } +} diff --git a/pom.xml b/pom.xml index 34890370..60c940a8 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,7 @@ 2.5.0 3.1 4.13.2 + <kafka.version>2.8.2</kafka.version> 1.3.15 1.8.0 @@ -980,6 +981,11 @@ iotdb-session ${iotdb.version} + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency>
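For reference, a minimal usage sketch of the scheduling and progress-serialization pieces introduced by this patch. It is illustrative only: the task id and progress payload are hypothetical, ScheduleService is assumed to have been started (normally done by Application), and ProgressIndex is used through the (instanceIndex, Map<String, String>) constructor that SerializationUtil itself relies on.

import org.apache.iotdb.collector.runtime.progress.ProgressIndex;
import org.apache.iotdb.collector.service.ScheduleService;
import org.apache.iotdb.collector.utils.SerializationUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class ProgressReportSketch {

  public static void main(final String[] args) throws IOException {
    final String taskId = "example-task"; // hypothetical task id

    new ScheduleService().start(); // normally started by Application

    // Register a periodical job for the task; ScheduleJob converts the requested interval
    // into a whole number of rounds of the report job's base interval
    // (task_progress_report_interval, 60 seconds by default).
    ScheduleService.reportProgress()
        .ifPresent(
            job ->
                job.register(
                    taskId,
                    () -> System.out.println("collect and persist progress for " + taskId),
                    60L));

    // Round-trip per-instance progress the way TaskPersistence stores it in the task table.
    final Map<String, String> info = new HashMap<>();
    info.put("offset", "42"); // hypothetical progress payload
    final Map<Integer, ProgressIndex> progresses = new HashMap<>();
    progresses.put(0, new ProgressIndex(0, info));

    final byte[] blob = SerializationUtil.serializeInstances(progresses);
    final Map<Integer, ProgressIndex> restored = SerializationUtil.deserializeInstances(blob);
    System.out.println("restored instance indexes: " + restored.keySet());

    // Pause, resume, and finally remove the job when the task is stopped or dropped.
    ScheduleService.reportProgress().ifPresent(job -> job.pauseSingleJob(taskId));
    ScheduleService.reportProgress().ifPresent(job -> job.resumeSingleJob(taskId));
    ScheduleService.reportProgress().ifPresent(job -> job.deregister(taskId));
  }
}

In the collector itself this wiring is done by PushSourceTask#createInternal, which registers the reporting runnable with ScheduleService.reportProgress() and hands the collected Map<Integer, ProgressIndex> to PersistenceService as a ProgressReportEvent.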