diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java index cd563dae..1b6d75b8 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java @@ -86,4 +86,11 @@ public Response dropTask( ? RuntimeService.task().get().dropTask(dropTaskRequest.getTaskId()) : Response.serverError().entity("Task runtime is down").build(); } + + @Override + public Response showTask(final SecurityContext securityContext) { + return RuntimeService.task().isPresent() + ? RuntimeService.task().get().showTask() + : Response.serverError().entity("Task runtime is down").build(); + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java index 2cf0c6cf..88c4d9ca 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java @@ -40,6 +40,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -222,4 +225,34 @@ public Optional> getTasksProgress(final String taskI return Optional.empty(); } + + public List> showTasks() { + final List> result = new ArrayList<>(); + final String queryAllTaskSQL = + "SELECT task_id, task_state, source_attribute, processor_attribute, sink_attribute, create_time FROM task"; + + try (final Connection connection = getConnection(); + final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL); + final ResultSet taskResultSet = statement.executeQuery()) { + while (taskResultSet.next()) { + final Map taskInfo = new LinkedHashMap<>(); + + taskInfo.put("taskId", taskResultSet.getString(1)); + taskInfo.put("taskState", TaskStateEnum.values()[taskResultSet.getInt(2)].name()); + taskInfo.put("sourceAttribute", SerializationUtil.deserialize(taskResultSet.getBytes(3))); + taskInfo.put( + "processorAttribute", SerializationUtil.deserialize(taskResultSet.getBytes(4))); + taskInfo.put("sinkAttribute", SerializationUtil.deserialize(taskResultSet.getBytes(5))); + taskInfo.put("createTime", taskResultSet.getTimestamp(6)); + + result.add(taskInfo); + } + + return result; + } catch (final SQLException e) { + LOGGER.warn("Failed to show tasks because {}", e.getMessage()); + } + + return result; + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java index 8996f473..ba75247a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java @@ -25,8 +25,8 @@ import org.apache.iotdb.collector.plugin.builtin.sink.protocol.IoTDBDataRegionSyncConnector; import org.apache.iotdb.collector.plugin.builtin.source.HttpPullSource; import org.apache.iotdb.collector.plugin.builtin.source.HttpPushSource; -import org.apache.iotdb.collector.plugin.builtin.source.IoTDBPushSource; -import org.apache.iotdb.collector.plugin.builtin.source.KafkaSource; +import org.apache.iotdb.collector.plugin.builtin.source.iotdb.IoTDBPushSource; +import org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSource; public enum BuiltinPlugin { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/SubscriptionProcessor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/SubscriptionProcessor.java index fdf2c9eb..1b933325 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/SubscriptionProcessor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/SubscriptionProcessor.java @@ -19,7 +19,7 @@ package org.apache.iotdb.collector.plugin.builtin.processor; -import org.apache.iotdb.collector.plugin.builtin.source.event.SubDemoEvent; +import org.apache.iotdb.collector.plugin.builtin.source.iotdb.SubDemoEvent; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/KafkaSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/KafkaSource.java deleted file mode 100644 index e1662023..00000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/KafkaSource.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.plugin.builtin.source; - -import org.apache.iotdb.collector.plugin.api.PushSource; -import org.apache.iotdb.collector.plugin.api.customizer.CollectorRuntimeEnvironment; -import org.apache.iotdb.collector.runtime.progress.ProgressIndex; -import org.apache.iotdb.collector.service.RuntimeService; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_GROUP_ID_KEY; -import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_GROUP_ID_VALUE; -import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_TOPIC_KEY; -import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_TOPIC_VALUE; -import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_URL_KEY; -import static org.apache.iotdb.collector.plugin.builtin.source.constant.KafkaSourceConstant.KAFKA_SOURCE_URL_VALUE; - -public class KafkaSource extends PushSource { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); - - private ProgressIndex startIndex; - private int instanceIndex; - - private Thread workerThread; - private volatile boolean isStarted = false; - - // kafka config - private String topic; - private String kafkaServiceURL; - private String groupId; - private long offset; - - @Override - public void validate(PipeParameterValidator validator) throws Exception {} - - @Override - public void customize( - PipeParameters pipeParameters, PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration) - throws Exception { - final CollectorRuntimeEnvironment environment = - (CollectorRuntimeEnvironment) pipeSourceRuntimeConfiguration.getRuntimeEnvironment(); - - final String taskId = environment.getPipeName(); - instanceIndex = environment.getInstanceIndex(); - startIndex = - RuntimeService.progress().isPresent() - ? RuntimeService.progress().get().getInstanceProgressIndex(taskId, instanceIndex) - == null - ? new ProgressIndex(instanceIndex, new HashMap<>()) - : RuntimeService.progress().get().getInstanceProgressIndex(taskId, instanceIndex) - : new ProgressIndex(instanceIndex, new HashMap<>()); - - topic = pipeParameters.getStringOrDefault(KAFKA_SOURCE_TOPIC_KEY, KAFKA_SOURCE_TOPIC_VALUE); - kafkaServiceURL = - pipeParameters.getStringOrDefault(KAFKA_SOURCE_URL_KEY, KAFKA_SOURCE_URL_VALUE); - groupId = - pipeParameters.getStringOrDefault(KAFKA_SOURCE_GROUP_ID_KEY, KAFKA_SOURCE_GROUP_ID_VALUE); - } - - @Override - public void start() throws Exception { - if (workerThread == null || !workerThread.isAlive()) { - isStarted = true; - workerThread = new Thread(this::doWork); - workerThread.start(); - } - } - - public void doWork() { - final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaServiceURL); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - props.put("group.id", groupId); - props.put("auto.offset.reset", "none"); - props.put("enable.auto.commit", "false"); - - try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final TopicPartition currentWorkTopicPartition = new TopicPartition(topic, instanceIndex); - offset = Long.parseLong(startIndex.getProgressInfo().getOrDefault("offset", "0")); - - consumer.assign(Collections.singleton(currentWorkTopicPartition)); - consumer.seek(currentWorkTopicPartition, offset); - - while (isStarted && !Thread.currentThread().isInterrupted()) { - markPausePosition(); - - final ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (final ConsumerRecord record : records) { - LOGGER.info( - "Partition{} consumed offset={} key={} value={}", - instanceIndex, - record.offset(), - record.key(), - record.value()); - - offset = record.offset() + 1; - TimeUnit.SECONDS.sleep(1); - } - } - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws Exception { - isStarted = false; - if (workerThread != null) { - workerThread.interrupt(); - try { - workerThread.join(1000); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - workerThread = null; - } - } - - @Override - public Optional report() { - final HashMap progressInfo = new HashMap<>(); - progressInfo.put("offset", String.valueOf(offset)); - - return Optional.of(new ProgressIndex(instanceIndex, progressInfo)); - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/KafkaSourceConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/SourceConstant.java similarity index 66% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/KafkaSourceConstant.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/SourceConstant.java index b8b3ec66..1da7ff65 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/KafkaSourceConstant.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/SourceConstant.java @@ -19,12 +19,9 @@ package org.apache.iotdb.collector.plugin.builtin.source.constant; -public class KafkaSourceConstant { - public static final String KAFKA_SOURCE_TOPIC_KEY = "topic"; - public static final String KAFKA_SOURCE_URL_KEY = "url"; - public static final String KAFKA_SOURCE_GROUP_ID_KEY = "group-id"; +public class SourceConstant { - public static final String KAFKA_SOURCE_TOPIC_VALUE = "my_topic"; - public static final String KAFKA_SOURCE_URL_VALUE = "localhost:9092"; - public static final String KAFKA_SOURCE_GROUP_ID_VALUE = "multi-thread-group"; + public static final String REPORT_TIME_INTERVAL_KEY = "report-time-interval"; + + public static final String REPORT_TIME_INTERVAL_DEFAULT_VALUE = "60"; } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/IoTDBPushSource.java similarity index 95% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/IoTDBPushSource.java index affb5cc1..7b97703a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/IoTDBPushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/IoTDBPushSource.java @@ -17,11 +17,9 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.builtin.source; +package org.apache.iotdb.collector.plugin.builtin.source.iotdb; import org.apache.iotdb.collector.plugin.api.PushSource; -import org.apache.iotdb.collector.plugin.builtin.source.constant.IoTDBPushSourceConstant; -import org.apache.iotdb.collector.plugin.builtin.source.event.SubDemoEvent; import org.apache.iotdb.collector.runtime.progress.ProgressIndex; import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/IoTDBPushSourceConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/IoTDBPushSourceConstant.java similarity index 95% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/IoTDBPushSourceConstant.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/IoTDBPushSourceConstant.java index 9bc516e1..aefa0198 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/constant/IoTDBPushSourceConstant.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/IoTDBPushSourceConstant.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.builtin.source.constant; +package org.apache.iotdb.collector.plugin.builtin.source.iotdb; public class IoTDBPushSourceConstant { public static final String HOST_KEY = "host"; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SubDemoEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/SubDemoEvent.java similarity index 96% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SubDemoEvent.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/SubDemoEvent.java index 8eb95a58..ea547f8e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SubDemoEvent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/iotdb/SubDemoEvent.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.builtin.source.event; +package org.apache.iotdb.collector.plugin.builtin.source.iotdb; import org.apache.iotdb.collector.plugin.builtin.sink.event.PipeRawTabletInsertionEvent; import org.apache.iotdb.collector.plugin.builtin.source.event.common.PipeRowCollector; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/kafka/KafkaSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/kafka/KafkaSource.java new file mode 100644 index 00000000..54bdba0a --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/kafka/KafkaSource.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.builtin.source.kafka; + +import org.apache.iotdb.collector.plugin.api.PushSource; +import org.apache.iotdb.collector.plugin.api.customizer.CollectorRuntimeEnvironment; +import org.apache.iotdb.collector.runtime.progress.ProgressIndex; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.function.Predicate; + +import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.REPORT_TIME_INTERVAL_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.REPORT_TIME_INTERVAL_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.AUTO_OFFSET_RESET_SET; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.BOOLEAN_SET; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_AUTO_OFFSET_RESET_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_AUTO_OFFSET_RESET_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_BOOTSTRAP_SERVERS_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_BOOTSTRAP_SERVERS_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_ENABLE_AUTO_COMMIT_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_ENABLE_AUTO_COMMIT_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_GROUP_ID_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_GROUP_ID_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_KEY_DESERIALIZER_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_KEY_DESERIALIZER_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_MAX_POLL_RECORDS_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_MAX_POLL_RECORDS_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_SESSION_TIMEOUT_MS_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_SESSION_TIMEOUT_MS_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_TOPIC_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_TOPIC_KEY; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_VALUE_DESERIALIZER_DEFAULT_VALUE; +import static org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSourceConstant.KAFKA_SOURCE_VALUE_DESERIALIZER_KEY; + +public class KafkaSource extends PushSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); + + private ProgressIndex startIndex; + private int instanceIndex; + + private Thread workerThread; + private volatile boolean isStarted = false; + private volatile KafkaConsumer consumer; + + // kafka config + private String topic; + private String bootstrapServers; + private String groupId; + private String keyDeserializer; + private String valueDeserializer; + private String autoOffsetReset; + private boolean enableAutoCommit; + private int sessionTimeoutMs; + private int maxPollIntervalMs; + private int maxPollRecords; + private String partitionAssignmentStrategy; + + private long offset; + + private int reportTimeInterval; + + @Override + public void validate(PipeParameterValidator validator) throws Exception { + validateRequiredParam(validator, KAFKA_SOURCE_TOPIC_KEY); + validateRequiredParam(validator, KAFKA_SOURCE_GROUP_ID_KEY); + + validateParam( + validator, + KAFKA_SOURCE_AUTO_OFFSET_RESET_KEY, + autoOffsetReset -> AUTO_OFFSET_RESET_SET.contains(String.valueOf(autoOffsetReset)), + KAFKA_SOURCE_AUTO_OFFSET_RESET_DEFAULT_VALUE); + + validateParam( + validator, + KAFKA_SOURCE_ENABLE_AUTO_COMMIT_KEY, + enableAutoCommit -> BOOLEAN_SET.contains(String.valueOf(enableAutoCommit)), + KAFKA_SOURCE_ENABLE_AUTO_COMMIT_DEFAULT_VALUE); + + validateIntegerParam(validator, KAFKA_SOURCE_SESSION_TIMEOUT_MS_KEY, value -> value > 0); + validateIntegerParam(validator, KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_KEY, value -> value > 0); + validateIntegerParam(validator, KAFKA_SOURCE_MAX_POLL_RECORDS_KEY, value -> value > 0); + validateIntegerParam(validator, REPORT_TIME_INTERVAL_KEY, value -> value > 0); + } + + private void validateParam( + final PipeParameterValidator validator, + final String paramKey, + final Predicate validationCondition, + final String defaultValue) { + final String paramValue = validator.getParameters().getStringOrDefault(paramKey, defaultValue); + + validator.validate( + validationCondition::test, + String.format("%s must be one of %s, but got %s", paramKey, BOOLEAN_SET, paramValue), + paramValue); + } + + private void validateRequiredParam( + final PipeParameterValidator validator, final String paramKey) { + validator.validate(Objects::nonNull, String.format("%s is required", paramKey)); + } + + private void validateIntegerParam( + final PipeParameterValidator validator, + final String paramKey, + final Predicate validationCondition) { + final int paramValue = + validator.getParameters().getIntOrDefault(paramKey, Integer.parseInt(paramKey)); + + validator.validate( + value -> validationCondition.test((Integer) value), + String.format("%s must be > 0, but got %d", paramKey, paramValue), + paramValue); + } + + @Override + public void customize( + PipeParameters pipeParameters, PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration) + throws Exception { + final CollectorRuntimeEnvironment environment = + (CollectorRuntimeEnvironment) pipeSourceRuntimeConfiguration.getRuntimeEnvironment(); + + final String taskId = environment.getPipeName(); + instanceIndex = environment.getInstanceIndex(); + startIndex = + RuntimeService.progress().isPresent() + ? RuntimeService.progress().get().getInstanceProgressIndex(taskId, instanceIndex) + == null + ? new ProgressIndex(instanceIndex, new HashMap<>()) + : RuntimeService.progress().get().getInstanceProgressIndex(taskId, instanceIndex) + : new ProgressIndex(instanceIndex, new HashMap<>()); + + topic = + pipeParameters.getStringOrDefault(KAFKA_SOURCE_TOPIC_KEY, KAFKA_SOURCE_TOPIC_DEFAULT_VALUE); + bootstrapServers = + pipeParameters.getStringOrDefault( + KAFKA_SOURCE_BOOTSTRAP_SERVERS_KEY, KAFKA_SOURCE_BOOTSTRAP_SERVERS_DEFAULT_VALUE); + groupId = + pipeParameters.getStringOrDefault( + KAFKA_SOURCE_GROUP_ID_KEY, KAFKA_SOURCE_GROUP_ID_DEFAULT_VALUE); + keyDeserializer = + pipeParameters.getStringOrDefault( + KAFKA_SOURCE_KEY_DESERIALIZER_KEY, KAFKA_SOURCE_KEY_DESERIALIZER_DEFAULT_VALUE); + valueDeserializer = + pipeParameters.getStringOrDefault( + KAFKA_SOURCE_VALUE_DESERIALIZER_KEY, KAFKA_SOURCE_VALUE_DESERIALIZER_DEFAULT_VALUE); + autoOffsetReset = + pipeParameters.getStringOrDefault( + KAFKA_SOURCE_AUTO_OFFSET_RESET_KEY, KAFKA_SOURCE_AUTO_OFFSET_RESET_DEFAULT_VALUE); + enableAutoCommit = + pipeParameters.getBooleanOrDefault(KAFKA_SOURCE_ENABLE_AUTO_COMMIT_KEY, false); + sessionTimeoutMs = + pipeParameters.getIntOrDefault( + KAFKA_SOURCE_SESSION_TIMEOUT_MS_KEY, + Integer.parseInt(KAFKA_SOURCE_SESSION_TIMEOUT_MS_DEFAULT_VALUE)); + maxPollRecords = + pipeParameters.getIntOrDefault( + KAFKA_SOURCE_MAX_POLL_RECORDS_KEY, + Integer.parseInt(KAFKA_SOURCE_MAX_POLL_RECORDS_DEFAULT_VALUE)); + maxPollIntervalMs = + pipeParameters.getIntOrDefault( + KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_KEY, + Integer.parseInt(KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_DEFAULT_VALUE)); + partitionAssignmentStrategy = + pipeParameters.getStringOrDefault( + KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_KEY, + KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_DEFAULT_VALUE); + reportTimeInterval = + pipeParameters.getIntOrDefault( + REPORT_TIME_INTERVAL_KEY, Integer.parseInt(REPORT_TIME_INTERVAL_DEFAULT_VALUE)); + } + + @Override + public void start() throws Exception { + if (workerThread == null || !workerThread.isAlive()) { + isStarted = true; + + workerThread = new Thread(this::doWork); + workerThread.setName("kafka-source-worker-" + instanceIndex); + workerThread.start(); + } + } + + public void doWork() { + initConsumer(); + + try { + final TopicPartition currentWorkTopicPartition = new TopicPartition(topic, instanceIndex); + offset = + Long.parseLong( + startIndex + .getProgressInfo() + .getOrDefault( + "offset", + report().isPresent() + ? report().get().getProgressInfo().getOrDefault("offset", "0") + : "0")); + + consumer.assign(Collections.singleton(currentWorkTopicPartition)); + if (!enableAutoCommit) { + consumer.seek(currentWorkTopicPartition, offset); + } + + while (isStarted && !Thread.currentThread().isInterrupted()) { + markPausePosition(); + + processRecords(consumer.poll(Duration.ofMillis(100))); + + if (enableAutoCommit) { + consumer.commitSync(); + } + } + } catch (final WakeupException e) { + LOGGER.warn("The kafka-consumer-worker-{} wakeup triggered", instanceIndex, e); + } catch (final Exception e) { + LOGGER.warn("Error occurred in kafka-consumer-worker-{}", instanceIndex, e); + } finally { + if (consumer != null) { + consumer.close(); + consumer = null; + } + } + } + + private void initConsumer() { + final Properties props = new Properties(); + + props.put(KAFKA_SOURCE_BOOTSTRAP_SERVERS_KEY, bootstrapServers); + props.put(KAFKA_SOURCE_KEY_DESERIALIZER_KEY, keyDeserializer); + props.put(KAFKA_SOURCE_VALUE_DESERIALIZER_KEY, valueDeserializer); + props.put(KAFKA_SOURCE_GROUP_ID_KEY, groupId); + props.put(KAFKA_SOURCE_AUTO_OFFSET_RESET_KEY, autoOffsetReset); + props.put(KAFKA_SOURCE_ENABLE_AUTO_COMMIT_KEY, enableAutoCommit); + props.put(KAFKA_SOURCE_SESSION_TIMEOUT_MS_KEY, sessionTimeoutMs); + props.put(KAFKA_SOURCE_MAX_POLL_RECORDS_KEY, maxPollRecords); + props.put(KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_KEY, maxPollIntervalMs); + props.put(KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_KEY, partitionAssignmentStrategy); + + consumer = new KafkaConsumer<>(props); + } + + private void processRecords(final ConsumerRecords records) { + records.forEach( + record -> { + try { + supplyRecord(record); + } catch (final Exception e) { + LOGGER.warn("Failed to process record at offset {}", record.offset(), e); + } + }); + } + + private void supplyRecord(final ConsumerRecord record) { + offset = record.offset(); + + LOGGER.debug( + "Consumed record: partition={}, offset={}, key={}, value={}", + instanceIndex, + record.offset(), + record.key(), + record.value()); + } + + @Override + public void close() throws Exception { + isStarted = false; + if (consumer != null) { + consumer.wakeup(); + } + + if (workerThread != null) { + workerThread.interrupt(); + try { + workerThread.join(1000); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + workerThread = null; + } + } + + @Override + public Optional report() { + final HashMap progressInfo = new HashMap<>(); + progressInfo.put("offset", String.valueOf(offset)); + progressInfo.put(REPORT_TIME_INTERVAL_KEY, String.valueOf(reportTimeInterval)); + + return Optional.of(new ProgressIndex(instanceIndex, progressInfo)); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/kafka/KafkaSourceConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/kafka/KafkaSourceConstant.java new file mode 100644 index 00000000..bf9d6505 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/kafka/KafkaSourceConstant.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.builtin.source.kafka; + +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class KafkaSourceConstant { + + public static final String KAFKA_SOURCE_TOPIC_KEY = "topic"; + public static final String KAFKA_SOURCE_BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; + public static final String KAFKA_SOURCE_GROUP_ID_KEY = "group.id"; + public static final String KAFKA_SOURCE_KEY_DESERIALIZER_KEY = "key.deserializer"; + public static final String KAFKA_SOURCE_VALUE_DESERIALIZER_KEY = "value.deserializer"; + public static final String KAFKA_SOURCE_AUTO_OFFSET_RESET_KEY = "auto.offset.reset"; + public static final String KAFKA_SOURCE_ENABLE_AUTO_COMMIT_KEY = "enable.auto.commit"; + public static final String KAFKA_SOURCE_SESSION_TIMEOUT_MS_KEY = "session.timeout.ms"; + public static final String KAFKA_SOURCE_MAX_POLL_RECORDS_KEY = "max.poll.records"; + public static final String KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_KEY = "max.poll.interval.ms"; + public static final String KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_KEY = + "partition.assign.strategy"; + + public static final String KAFKA_SOURCE_TOPIC_DEFAULT_VALUE = ""; + public static final String KAFKA_SOURCE_BOOTSTRAP_SERVERS_DEFAULT_VALUE = "localhost:9092"; + public static final String KAFKA_SOURCE_GROUP_ID_DEFAULT_VALUE = ""; + public static final String KAFKA_SOURCE_KEY_DESERIALIZER_DEFAULT_VALUE = + StringDeserializer.class.getName(); + public static final String KAFKA_SOURCE_VALUE_DESERIALIZER_DEFAULT_VALUE = + StringDeserializer.class.getName(); + public static final String KAFKA_SOURCE_AUTO_OFFSET_RESET_DEFAULT_VALUE = "none"; + public static final String KAFKA_SOURCE_ENABLE_AUTO_COMMIT_DEFAULT_VALUE = "false"; + public static final String KAFKA_SOURCE_SESSION_TIMEOUT_MS_DEFAULT_VALUE = "10000"; + public static final String KAFKA_SOURCE_MAX_POLL_RECORDS_DEFAULT_VALUE = "500"; + public static final String KAFKA_SOURCE_MAX_POLL_INTERVAL_MS_DEFAULT_VALUE = "300000"; + public static final String KAFKA_SOURCE_PARTITION_ASSIGN_STRATEGY_DEFAULT_VALUE = + RangeAssignor.class.getName(); + + public static final Set AUTO_OFFSET_RESET_SET = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("none", "earliest", "latest"))); + public static final Set BOOLEAN_SET = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("true", "false"))); + + private KafkaSourceConstant() { + throw new IllegalStateException("Utility class"); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java index 69a80f8f..d85a1952 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java @@ -24,8 +24,8 @@ import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin; import org.apache.iotdb.collector.plugin.builtin.source.HttpPullSource; import org.apache.iotdb.collector.plugin.builtin.source.HttpPushSource; -import org.apache.iotdb.collector.plugin.builtin.source.IoTDBPushSource; -import org.apache.iotdb.collector.plugin.builtin.source.KafkaSource; +import org.apache.iotdb.collector.plugin.builtin.source.iotdb.IoTDBPushSource; +import org.apache.iotdb.collector.plugin.builtin.source.kafka.KafkaSource; import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; import org.apache.iotdb.pipe.api.PipeSource; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java index 144af7d4..0dd1d355 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java @@ -103,6 +103,8 @@ public synchronized Response createTask( .build(); } catch (final Exception e) { RuntimeService.progress().ifPresent(progress -> progress.removeTaskProgress(taskId)); + PersistenceService.task().ifPresent(task -> tasks.remove(taskId)); + tasks.remove(taskId); LOGGER.warn("Failed to create task {} because {}", taskId, e.getMessage(), e); @@ -193,6 +195,12 @@ public synchronized Response dropTask(final String taskId) { } } + public Response showTask() { + return PersistenceService.task().isPresent() + ? Response.ok(PersistenceService.task().get().showTasks()).build() + : Response.serverError().build(); + } + @Override public synchronized void close() throws Exception { final long currentTime = System.currentTimeMillis(); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java index ffa48a0a..5cf7468e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java @@ -44,6 +44,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.collector.plugin.builtin.source.constant.SourceConstant.REPORT_TIME_INTERVAL_KEY; + public class PullSourceTask extends SourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(PullSourceTask.class); @@ -142,7 +144,18 @@ public void createInternal() throws Exception { new ProgressReportEvent(taskId, progresses))); } }, - TaskRuntimeOptions.TASK_PROGRESS_REPORT_INTERVAL.value()); + consumers[0].consumer().report().isPresent() + ? Integer.parseInt( + consumers[0] + .consumer() + .report() + .get() + .getProgressInfo() + .getOrDefault( + REPORT_TIME_INTERVAL_KEY, + String.valueOf( + TaskRuntimeOptions.TASK_PROGRESS_REPORT_INTERVAL.value()))) + : TaskRuntimeOptions.TASK_PROGRESS_REPORT_INTERVAL.value()); }); } diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/v1/task.yaml b/iotdb-collector/collector-openapi/src/main/openapi3/v1/task.yaml index f2ba0f5b..56f46ed8 100644 --- a/iotdb-collector/collector-openapi/src/main/openapi3/v1/task.yaml +++ b/iotdb-collector/collector-openapi/src/main/openapi3/v1/task.yaml @@ -87,6 +87,13 @@ paths: "200": $ref: '#/components/responses/SuccessExecutionStatus' + /task/v1/show: + post: + operationId: showTask + responses: + "200": + $ref: '#/components/responses/SuccessExecutionStatus' + components: schemas: CreateTaskRequest: