diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSizeTieredCompactionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSizeTieredCompactionIT.java index 8912732108660..6cb6c5488f935 100644 --- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSizeTieredCompactionIT.java +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSizeTieredCompactionIT.java @@ -1021,7 +1021,7 @@ public void testSequenceInnerCompactionContinously() throws SQLException { CompactionPriority compactionPriority = IoTDBDescriptor.getInstance().getConfig().getCompactionPriority(); IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS); - long originCompactionNum = CompactionTaskManager.getInstance().getFinishTaskNum(); + long originCompactionNum = CompactionTaskManager.getInstance().getFinishedTaskNum(); try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { for (int i = 1; i <= 3; i++) { @@ -1044,7 +1044,7 @@ public void testSequenceInnerCompactionContinously() throws SQLException { } statement.execute("MERGE"); int totalWaitingTime = 0; - while (CompactionTaskManager.getInstance().getFinishTaskNum() - originCompactionNum < 2) { + while (CompactionTaskManager.getInstance().getFinishedTaskNum() - originCompactionNum < 2) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -1060,7 +1060,7 @@ public void testSequenceInnerCompactionContinously() throws SQLException { } } statement.execute("Merge"); - while (CompactionTaskManager.getInstance().getFinishTaskNum() - originCompactionNum < 3) { + while (CompactionTaskManager.getInstance().getFinishedTaskNum() - originCompactionNum < 3) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -1102,7 +1102,7 @@ public void testSequenceInnerCompactionConcurrently() throws SQLException { long oriTargetFileSize = IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize(); IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(600); - long originCompactionNum = CompactionTaskManager.getInstance().getFinishTaskNum(); + long originCompactionNum = CompactionTaskManager.getInstance().getFinishedTaskNum(); try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { for (int i = 1; i <= 3; i++) { @@ -1124,7 +1124,7 @@ public void testSequenceInnerCompactionConcurrently() throws SQLException { statement.execute("FLUSH"); } long totalWaitingTime = 0; - while (CompactionTaskManager.getInstance().getFinishTaskNum() - originCompactionNum < 1) { + while (CompactionTaskManager.getInstance().getFinishedTaskNum() - originCompactionNum < 1) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -1167,7 +1167,7 @@ public void testUnsequenceInnerCompactionContinously() throws SQLException { IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize(); IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(2); IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(600); - long originFinishCount = CompactionTaskManager.getInstance().getFinishTaskNum(); + long originFinishCount = CompactionTaskManager.getInstance().getFinishedTaskNum(); CompactionPriority compactionPriority = IoTDBDescriptor.getInstance().getConfig().getCompactionPriority(); IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS); @@ -1199,7 +1199,7 @@ public void testUnsequenceInnerCompactionContinously() throws SQLException { } long totalWaitingTime = 0; statement.execute("MERGE"); - while (CompactionTaskManager.getInstance().getFinishTaskNum() - originFinishCount < 1) { + while (CompactionTaskManager.getInstance().getFinishedTaskNum() - originFinishCount < 1) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -1239,7 +1239,7 @@ public void testUnsequenceInnerCompactionConcurrently() throws SQLException { long oriTargetFileSize = IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize(); IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(600); - long originCompactionNum = CompactionTaskManager.getInstance().getFinishTaskNum(); + long originCompactionNum = CompactionTaskManager.getInstance().getFinishedTaskNum(); try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { for (int i = 1; i <= 3; i++) { @@ -1267,7 +1267,7 @@ public void testUnsequenceInnerCompactionConcurrently() throws SQLException { statement.execute("FLUSH"); } int totalWaitingTime = 0; - while (CompactionTaskManager.getInstance().getFinishTaskNum() - originCompactionNum < 1) { + while (CompactionTaskManager.getInstance().getFinishedTaskNum() - originCompactionNum < 1) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -1305,7 +1305,7 @@ public void testSequenceAndUnsequenceInnerCompactionConcurrently() throws SQLExc long oriTargetFileSize = IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize(); IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(600); - long originCompactionNum = CompactionTaskManager.getInstance().getFinishTaskNum(); + long originCompactionNum = CompactionTaskManager.getInstance().getFinishedTaskNum(); try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { for (int i = 1; i <= 3; i++) { @@ -1336,7 +1336,7 @@ public void testSequenceAndUnsequenceInnerCompactionConcurrently() throws SQLExc statement.execute("FLUSH"); } int totalWaitingTime = 0; - while (CompactionTaskManager.getInstance().getFinishTaskNum() - originCompactionNum < 2) { + while (CompactionTaskManager.getInstance().getFinishedTaskNum() - originCompactionNum < 2) { try { Thread.sleep(100); } catch (InterruptedException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java index ac5349abbd8d7..cc4ad3d66e32f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java @@ -58,18 +58,18 @@ public static void scheduleCompaction(TsFileManager tsFileManager, long timePart try { tryToSubmitCrossSpaceCompactionTask( tsFileManager.getStorageGroupName(), - tsFileManager.getDataRegion(), + tsFileManager.getDataRegionId(), timePartition, tsFileManager); tryToSubmitInnerSpaceCompactionTask( tsFileManager.getStorageGroupName(), - tsFileManager.getDataRegion(), + tsFileManager.getDataRegionId(), timePartition, tsFileManager, true); tryToSubmitInnerSpaceCompactionTask( tsFileManager.getStorageGroupName(), - tsFileManager.getDataRegion(), + tsFileManager.getDataRegionId(), timePartition, tsFileManager, false); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index 34bcd01de8816..bb5af28d03009 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -21,7 +21,6 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.service.IService; @@ -41,15 +40,14 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** CompactionMergeTaskPoolManager provides a ThreadPool tPro queue and run all compaction tasks. */ @@ -69,20 +67,13 @@ public class CompactionTaskManager implements IService { private WrappedThreadPoolExecutor subCompactionTaskExecutionPool; public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0); - private FixedPriorityBlockingQueue candidateCompactionTaskQueue = + private final FixedPriorityBlockingQueue candidateCompactionTaskQueue = new FixedPriorityBlockingQueue<>(1024, new DefaultCompactionTaskComparatorImpl()); - // , it is used to store all compaction tasks under each + // , it is used to store all compaction tasks under each // virtualStorageGroup - private Map>> - storageGroupTasks = new HashMap<>(); - - // The thread pool that periodically fetches and executes the compaction task from - // candidateCompactionTaskQueue to taskExecutionPool. The default number of threads for this pool - // is 1. - private ScheduledExecutorService compactionTaskSubmissionThreadPool; - - private final long TASK_SUBMIT_INTERVAL = - IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionIntervalInMs(); + private final Map>> + storageGroupTasks = new ConcurrentHashMap<>(); + private final AtomicInteger finishedTaskNum = new AtomicInteger(0); private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE); @@ -99,46 +90,40 @@ public synchronized void start() { && (config.isEnableSeqSpaceCompaction() || config.isEnableUnseqSpaceCompaction() || config.isEnableCrossSpaceCompaction())) { - this.taskExecutionPool = - (WrappedThreadPoolExecutor) - IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(), - ThreadName.COMPACTION_SERVICE.getName()); - this.subCompactionTaskExecutionPool = - (WrappedThreadPoolExecutor) - IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() - * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(), - ThreadName.COMPACTION_SUB_SERVICE.getName()); + initThreadPool(); currentTaskNum = new AtomicInteger(0); - compactionTaskSubmissionThreadPool = - IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName()); candidateCompactionTaskQueue.regsitPollLastHook( AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles); candidateCompactionTaskQueue.regsitPollLastHook( x -> CompactionMetricsRecorder.recordTaskInfo( x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size())); - - // Periodically do the following: fetch the highest priority thread from the - // candidateCompactionTaskQueue, check that all tsfiles in the compaction task are valid, and - // if there is thread space available in the taskExecutionPool, put the compaction task thread - // into the taskExecutionPool and perform the compaction. - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - compactionTaskSubmissionThreadPool, - this::submitTaskFromTaskQueue, - TASK_SUBMIT_INTERVAL, - TASK_SUBMIT_INTERVAL, - TimeUnit.MILLISECONDS); } logger.info("Compaction task manager started."); } + private void initThreadPool() { + int compactionThreadNum = + IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(); + this.taskExecutionPool = + (WrappedThreadPoolExecutor) + IoTDBThreadPoolFactory.newFixedThreadPool( + compactionThreadNum, ThreadName.COMPACTION_SERVICE.getName()); + this.subCompactionTaskExecutionPool = + (WrappedThreadPoolExecutor) + IoTDBThreadPoolFactory.newFixedThreadPool( + IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(), + ThreadName.COMPACTION_SUB_SERVICE.getName()); + for (int i = 0; i < compactionThreadNum; ++i) { + taskExecutionPool.submit(new CompactionWorker(i, candidateCompactionTaskQueue)); + } + } + @Override public void stop() { if (taskExecutionPool != null) { taskExecutionPool.shutdownNow(); - compactionTaskSubmissionThreadPool.shutdownNow(); logger.info("Waiting for task taskExecutionPool to shut down"); waitTermination(); storageGroupTasks.clear(); @@ -150,7 +135,6 @@ public void stop() { public void waitAndStop(long milliseconds) { if (taskExecutionPool != null) { awaitTermination(taskExecutionPool, milliseconds); - awaitTermination(compactionTaskSubmissionThreadPool, milliseconds); logger.info("Waiting for task taskExecutionPool to shut down in {} ms", milliseconds); waitTermination(); storageGroupTasks.clear(); @@ -161,25 +145,28 @@ public void waitAndStop(long milliseconds) { public void waitAllCompactionFinish() { long sleepingStartTime = 0; if (taskExecutionPool != null) { - while (taskExecutionPool.getActiveCount() > 0 || taskExecutionPool.getQueue().size() > 0) { - // wait - try { - Thread.sleep(200); - sleepingStartTime += 200; - if (sleepingStartTime % 10000 == 0) { - logger.warn( - "Has waiting {} seconds for all compaction task finish", sleepingStartTime / 1000); - } - if (sleepingStartTime >= MAX_WAITING_TIME) { - return; + WrappedThreadPoolExecutor tmpThreadPool = taskExecutionPool; + taskExecutionPool = null; + candidateCompactionTaskQueue.clear(); + while (true) { + int totalSize = 0; + for (Map> taskMap : + storageGroupTasks.values()) { + totalSize += taskMap.size(); + } + if (totalSize > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("Interrupted when waiting all task finish", e); + break; } - } catch (InterruptedException e) { - logger.error("thread interrupted while waiting for compaction to end", e); - return; + } else { + break; } } storageGroupTasks.clear(); - candidateCompactionTaskQueue.clear(); + taskExecutionPool = tmpThreadPool; logger.info("All compaction task finish"); } } @@ -206,7 +193,7 @@ private void waitTermination() { private void awaitTermination(ExecutorService service, long milliseconds) { try { - service.shutdown(); + service.shutdownNow(); service.awaitTermination(milliseconds, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.warn("CompactionThreadPool can not be closed in {} ms", milliseconds); @@ -241,40 +228,12 @@ public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compact } private boolean isTaskRunning(AbstractCompactionTask task) { - String storageGroupName = task.getFullStorageGroupName(); + String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()); return storageGroupTasks - .computeIfAbsent(storageGroupName, x -> new HashMap<>()) + .computeIfAbsent(regionWithSG, x -> new ConcurrentHashMap<>()) .containsKey(task); } - /** - * This method will submit task cached in queue with most priority to execution thread pool if - * there is available thread. - */ - public synchronized void submitTaskFromTaskQueue() { - try { - while (currentTaskNum.get() - < IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() - && !candidateCompactionTaskQueue.isEmpty()) { - AbstractCompactionTask task = candidateCompactionTaskQueue.take(); - - // add metrics - CompactionMetricsRecorder.recordTaskInfo( - task, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size()); - - if (task != null && task.checkValidAndSetMerging()) { - submitTask(task); - CompactionMetricsRecorder.recordTaskInfo( - task, CompactionTaskStatus.READY_TO_EXECUTE, currentTaskNum.get()); - } else { - logger.warn("A task {} is not submitted", task); - } - } - } catch (InterruptedException e) { - logger.error("Exception occurs while submitting compaction task", e); - } - } - public RateLimiter getMergeWriteRateLimiter() { setWriteMergeRate( IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec()); @@ -303,35 +262,14 @@ public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength } public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) { - String storageGroupName = task.getFullStorageGroupName(); - if (storageGroupTasks.containsKey(storageGroupName)) { - storageGroupTasks.get(storageGroupName).remove(task); + String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()); + if (storageGroupTasks.containsKey(regionWithSG)) { + storageGroupTasks.get(regionWithSG).remove(task); } // add metrics CompactionMetricsRecorder.recordTaskInfo( task, CompactionTaskStatus.FINISHED, currentTaskNum.get()); - } - - /** - * This method will directly submit a task to thread pool if there is available thread. - * - * @return the future of the task. - */ - public synchronized Future submitTask( - AbstractCompactionTask compactionTask) throws RejectedExecutionException { - if (taskExecutionPool != null && !taskExecutionPool.isShutdown()) { - Future future = taskExecutionPool.submit(compactionTask); - storageGroupTasks - .computeIfAbsent(compactionTask.getFullStorageGroupName(), x -> new HashMap<>()) - .put(compactionTask, future); - return future; - } - logger.warn( - "A CompactionTask failed to be submitted to CompactionTaskManager because {}", - taskExecutionPool == null - ? "taskExecutionPool is null" - : "taskExecutionPool is terminated"); - return null; + finishedTaskNum.incrementAndGet(); } public synchronized Future submitSubTask(Callable subCompactionTask) { @@ -350,10 +288,10 @@ public synchronized Future submitSubTask(Callable subCompactionTask) public synchronized List abortCompaction(String storageGroupName) { List compactionTaskOfCurSG = new ArrayList<>(); if (storageGroupTasks.containsKey(storageGroupName)) { - for (Map.Entry> taskFutureEntry : + for (Map.Entry> compactionTaskEntry : storageGroupTasks.get(storageGroupName).entrySet()) { - taskFutureEntry.getValue().cancel(true); - compactionTaskOfCurSG.add(taskFutureEntry.getKey()); + compactionTaskEntry.getValue().cancel(true); + compactionTaskOfCurSG.add(compactionTaskEntry.getKey()); } } @@ -372,7 +310,12 @@ public boolean isAnyTaskInListStillRunning(List compacti } public int getExecutingTaskCount() { - return taskExecutionPool.getActiveCount() + taskExecutionPool.getQueue().size(); + int runningTaskCnt = 0; + for (Map> runningTaskMap : + storageGroupTasks.values()) { + runningTaskCnt += runningTaskMap.size(); + } + return runningTaskCnt; } public int getTotalTaskCount() { @@ -381,15 +324,27 @@ public int getTotalTaskCount() { public synchronized List getRunningCompactionTaskList() { List tasks = new ArrayList<>(); - for (Map> taskFutureMap : + for (Map> runningTaskMap : storageGroupTasks.values()) { - tasks.addAll(taskFutureMap.keySet()); + tasks.addAll(runningTaskMap.keySet()); } return tasks; } - public long getFinishTaskNum() { - return taskExecutionPool.getCompletedTaskCount(); + public long getFinishedTaskNum() { + return finishedTaskNum.get(); + } + + public void recordTask(AbstractCompactionTask task, Future summary) { + storageGroupTasks + .computeIfAbsent( + getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()), + x -> new ConcurrentHashMap<>()) + .put(task, summary); + } + + public static String getSGWithRegionId(String storageGroupName, String dataRegionId) { + return storageGroupName + "-" + dataRegionId; } @TestOnly @@ -421,21 +376,8 @@ public void restart() throws InterruptedException { throw new RuntimeException("Failed to shutdown subCompactionTaskExecutionPool"); } } - this.taskExecutionPool = - (WrappedThreadPoolExecutor) - IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(), - ThreadName.COMPACTION_SERVICE.getName()); - this.subCompactionTaskExecutionPool = - (WrappedThreadPoolExecutor) - IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() - * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(), - ThreadName.COMPACTION_SUB_SERVICE.getName()); - this.compactionTaskSubmissionThreadPool = - IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName()); - candidateCompactionTaskQueue.regsitPollLastHook( - AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles); + initThreadPool(); + finishedTaskNum.set(0); candidateCompactionTaskQueue.clear(); } currentTaskNum = new AtomicInteger(0); @@ -446,4 +388,19 @@ public void restart() throws InterruptedException { public void clearCandidateQueue() { candidateCompactionTaskQueue.clear(); } + + @TestOnly + public Future getCompactionTaskFutureMayBlock(AbstractCompactionTask task) + throws InterruptedException, TimeoutException { + String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()); + long startTime = System.currentTimeMillis(); + while (!storageGroupTasks.containsKey(regionWithSG) + || !storageGroupTasks.get(regionWithSG).containsKey(task)) { + Thread.sleep(10); + if (System.currentTimeMillis() - startTime > 20_000) { + throw new TimeoutException("Timeout when waiting for task future"); + } + } + return storageGroupTasks.get(regionWithSG).get(task); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java new file mode 100644 index 0000000000000..86ef4b74a813e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionWorker.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction; + +import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus; +import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class CompactionWorker implements Runnable { + private static final Logger log = LoggerFactory.getLogger("COMPACTION"); + private final int threadId; + private final FixedPriorityBlockingQueue compactionTaskQueue; + + public CompactionWorker( + int threadId, FixedPriorityBlockingQueue compactionTaskQueue) { + this.threadId = threadId; + this.compactionTaskQueue = compactionTaskQueue; + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + AbstractCompactionTask task = null; + try { + task = compactionTaskQueue.take(); + } catch (InterruptedException e) { + log.warn("CompactionThread-{} terminates because interruption", threadId); + return; + } + if (task != null) { + // add metrics + CompactionMetricsRecorder.recordTaskInfo( + task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size()); + if (task.checkValidAndSetMerging()) { + CompactionTaskSummary summary = task.getSummary(); + CompactionTaskFuture future = new CompactionTaskFuture(summary); + CompactionTaskManager.getInstance().recordTask(task, future); + task.start(); + } + } + } + } + + static class CompactionTaskFuture implements Future { + CompactionTaskSummary summary; + + public CompactionTaskFuture(CompactionTaskSummary summary) { + this.summary = summary; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + summary.cancel(); + return true; + } + + @Override + public boolean isCancelled() { + return summary.isCancel(); + } + + @Override + public boolean isDone() { + return summary.isFinished(); + } + + @Override + public CompactionTaskSummary get() throws InterruptedException, ExecutionException { + while (!summary.isFinished()) { + TimeUnit.MILLISECONDS.sleep(100); + } + return summary; + } + + @Override + public CompactionTaskSummary get(long timeout, @NotNull TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + long perSleepTime = timeout < 100 ? timeout : 100; + long totalSleepTime = 0L; + while (!summary.isFinished()) { + if (totalSleepTime >= timeout) { + throw new TimeoutException("Timeout when trying to get compaction task summary"); + } + unit.sleep(perSleepTime); + totalSleepTime += perSleepTime; + } + return summary; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java index ae47fc3916ac3..bd20b35bdbf9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java @@ -67,7 +67,8 @@ public CrossSpaceCompactionTask( AtomicInteger currentTaskNum, long serialId) { super( - tsFileManager.getStorageGroupName() + "-" + tsFileManager.getDataRegion(), + tsFileManager.getStorageGroupName(), + tsFileManager.getDataRegionId(), timePartition, tsFileManager, currentTaskNum, @@ -81,7 +82,7 @@ public CrossSpaceCompactionTask( } @Override - protected void doCompaction() throws Exception { + protected void doCompaction() { try { if (!tsFileManager.isAllowCompaction()) { return; @@ -94,8 +95,9 @@ protected void doCompaction() throws Exception { || selectedSequenceFiles.isEmpty() || selectedUnsequenceFiles.isEmpty()) { LOGGER.info( - "{} [Compaction] Cross space compaction file list is empty, end it", - fullStorageGroupName); + "{}-{} [Compaction] Cross space compaction file list is empty, end it", + storageGroupName, + dataRegionId); return; } @@ -107,8 +109,9 @@ protected void doCompaction() throws Exception { } LOGGER.info( - "{} [Compaction] CrossSpaceCompactionTask start. Sequence files : {}, unsequence files : {}, total size is {} MB", - fullStorageGroupName, + "{}-{} [Compaction] CrossSpaceCompactionTask start. Sequence files : {}, unsequence files : {}, total size is {} MB", + storageGroupName, + dataRegionId, selectedSequenceFiles, selectedUnsequenceFiles, ((double) selectedFileSize) / 1024.0 / 1024.0); @@ -130,9 +133,11 @@ protected void doCompaction() throws Exception { performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles); performer.setTargetFiles(targetTsfileResourceList); + performer.setSummary(summary); performer.perform(); - CompactionUtils.moveTargetFile(targetTsfileResourceList, false, fullStorageGroupName); + CompactionUtils.moveTargetFile( + targetTsfileResourceList, false, storageGroupName + "-" + dataRegionId); CompactionUtils.combineModsInCrossCompaction( selectedSequenceFiles, selectedUnsequenceFiles, targetTsfileResourceList); @@ -156,8 +161,9 @@ protected void doCompaction() throws Exception { } long costTime = (System.currentTimeMillis() - startTime) / 1000; LOGGER.info( - "{} [Compaction] CrossSpaceCompactionTask Costs {} s, compaction speed is {} MB/s", - fullStorageGroupName, + "{}-{} [Compaction] CrossSpaceCompactionTask Costs {} s, compaction speed is {} MB/s", + storageGroupName, + dataRegionId, costTime, ((double) selectedFileSize) / 1024.0d / 1024.0d / costTime); } @@ -165,12 +171,18 @@ protected void doCompaction() throws Exception { // catch throwable to handle OOM errors if (!(throwable instanceof InterruptedException)) { LOGGER.error( - "{} [Compaction] Meet errors in cross space compaction.", fullStorageGroupName); + "{}-{} [Compaction] Meet errors in cross space compaction.", + storageGroupName, + dataRegionId); + } else { + LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); + // clean the interrupted flag + Thread.interrupted(); } // handle exception CompactionExceptionHandler.handleException( - fullStorageGroupName, + storageGroupName + "-" + dataRegionId, logFile, targetTsfileResourceList, selectedSequenceFiles, @@ -179,7 +191,6 @@ protected void doCompaction() throws Exception { timePartition, false, true); - throw throwable; } finally { releaseAllLock(); } @@ -228,7 +239,9 @@ public List getSelectedUnsequenceFiles() { @Override public String toString() { - return fullStorageGroupName + return storageGroupName + + "-" + + dataRegionId + "-" + timePartition + " task seq files are " @@ -305,8 +318,4 @@ private boolean addReadLock(List tsFileResourceList) { } return true; } - - public String getStorageGroupName() { - return fullStorageGroupName; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java index 91e31fd126f96..c13394742175c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java @@ -57,6 +57,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { protected int maxCompactionCount; protected TsFileResourceList tsFileResourceList; + protected List targetTsFileList; protected boolean[] isHoldingReadLock; protected boolean[] isHoldingWriteLock; @@ -69,7 +70,8 @@ public InnerSpaceCompactionTask( AtomicInteger currentTaskNum, long serialId) { super( - tsFileManager.getStorageGroupName() + "-" + tsFileManager.getDataRegion(), + tsFileManager.getStorageGroupName(), + tsFileManager.getDataRegionId(), timePartition, tsFileManager, currentTaskNum, @@ -93,23 +95,26 @@ public InnerSpaceCompactionTask( } @Override - protected void doCompaction() throws Exception { + protected void doCompaction() { if (!tsFileManager.isAllowCompaction()) { return; } long startTime = System.currentTimeMillis(); // get resource of target file String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent(); - // Here is tmpTargetFile, which is xxx.target - targetTsFileResource = - TsFileNameGenerator.getInnerCompactionTargetFileResource( - selectedTsFileResourceList, sequence); - List targetTsFileList = - new ArrayList<>(Collections.singletonList(targetTsFileResource)); LOGGER.info( - "{} [Compaction] starting compaction task with {} files", - fullStorageGroupName, + "{}-{} [Compaction] starting compaction task with {} files", + storageGroupName, + dataRegionId, selectedTsFileResourceList.size()); + try { + targetTsFileResource = + TsFileNameGenerator.getInnerCompactionTargetFileResource( + selectedTsFileResourceList, sequence); + } catch (IOException e) { + LOGGER.error("Failed to get target file for {}", selectedTsFileResourceList, e); + return; + } File logFile = new File( dataDirectory @@ -117,29 +122,39 @@ protected void doCompaction() throws Exception { + targetTsFileResource.getTsFile().getName() + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX); try (CompactionLogger compactionLogger = new CompactionLogger(logFile)) { + // Here is tmpTargetFile, which is xxx.target + targetTsFileList = new ArrayList<>(Collections.singletonList(targetTsFileResource)); compactionLogger.logFiles(selectedTsFileResourceList, CompactionLogger.STR_SOURCE_FILES); compactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES); - LOGGER.info("{} [InnerSpaceCompactionTask] Close the logger", fullStorageGroupName); + LOGGER.info( + "{}-{} [InnerSpaceCompactionTask] Close the logger", storageGroupName, dataRegionId); compactionLogger.close(); LOGGER.info( - "{} [Compaction] compaction with {}", fullStorageGroupName, selectedTsFileResourceList); + "{}-{} [Compaction] compaction with {}", + storageGroupName, + dataRegionId, + selectedTsFileResourceList); // carry out the compaction performer.setSourceFiles(selectedTsFileResourceList); // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use a // mutable list instead of Collections.singletonList() performer.setTargetFiles(targetTsFileList); + performer.setSummary(summary); performer.perform(); - CompactionUtils.moveTargetFile(targetTsFileList, true, fullStorageGroupName); + CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName + "-" + dataRegionId); - LOGGER.info("{} [InnerSpaceCompactionTask] start to rename mods file", fullStorageGroupName); + LOGGER.info( + "{}-{} [InnerSpaceCompactionTask] start to rename mods file", + storageGroupName, + dataRegionId); CompactionUtils.combineModsInInnerCompaction( selectedTsFileResourceList, targetTsFileResource); - if (Thread.currentThread().isInterrupted()) { + if (Thread.currentThread().isInterrupted() || summary.isCancel()) { throw new InterruptedException( - String.format("%s [Compaction] abort", fullStorageGroupName)); + String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); } // replace the old files with new file, the new is in same position as the old @@ -160,8 +175,9 @@ protected void doCompaction() throws Exception { } LOGGER.info( - "{} [Compaction] Compacted target files, try to get the write lock of source files", - fullStorageGroupName); + "{}-{} [Compaction] Compacted target files, try to get the write lock of source files", + storageGroupName, + dataRegionId); // release the read lock of all source files, and get the write lock of them to delete them for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { @@ -182,17 +198,21 @@ protected void doCompaction() throws Exception { } LOGGER.info( - "{} [Compaction] compaction finish, start to delete old files", fullStorageGroupName); + "{}-{} [Compaction] compaction finish, start to delete old files", + storageGroupName, + dataRegionId); // delete the old files - CompactionUtils.deleteTsFilesInDisk(selectedTsFileResourceList, fullStorageGroupName); + CompactionUtils.deleteTsFilesInDisk( + selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); CompactionUtils.deleteModificationForSourceFile( - selectedTsFileResourceList, fullStorageGroupName); + selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; LOGGER.info( - "{} [InnerSpaceCompactionTask] all compaction task finish, target file is {}," + "{}-{} [InnerSpaceCompactionTask] all compaction task finish, target file is {}," + "time cost is {} s, compaction speed is {} MB/s", - fullStorageGroupName, + storageGroupName, + dataRegionId, targetTsFileResource.getTsFile().getName(), costTime, ((double) selectedFileSize) / 1024.0d / 1024.0d / costTime); @@ -204,15 +224,20 @@ protected void doCompaction() throws Exception { // catch throwable to handle OOM errors if (!(throwable instanceof InterruptedException)) { LOGGER.error( - "{} [Compaction] Meet errors in inner space compaction.", - fullStorageGroupName, + "{}-{} [Compaction] Meet errors in inner space compaction.", + storageGroupName, + dataRegionId, throwable); + } else { + // clean the interrupt flag + LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); + Thread.interrupted(); } // handle exception if (isSequence()) { CompactionExceptionHandler.handleException( - fullStorageGroupName, + storageGroupName + "-" + dataRegionId, logFile, targetTsFileList, selectedTsFileResourceList, @@ -223,7 +248,7 @@ protected void doCompaction() throws Exception { isSequence()); } else { CompactionExceptionHandler.handleException( - fullStorageGroupName, + storageGroupName + "-" + dataRegionId, logFile, targetTsFileList, Collections.emptyList(), @@ -233,7 +258,6 @@ protected void doCompaction() throws Exception { true, isSequence()); } - throw throwable; } finally { releaseFileLocksAndResetMergingStatus(); } @@ -302,7 +326,9 @@ public long getMaxFileVersion() { @Override public String toString() { - return fullStorageGroupName + return storageGroupName + + "-" + + dataRegionId + "-" + timePartition + " task file num is " diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java index 9ad1b5b310e61..172eb50ee7bfb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.compaction.performer; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; @@ -38,6 +39,8 @@ void perform() void setTargetFiles(List targetFiles); + void setSummary(CompactionTaskSummary summary); + default void setSourceFiles(List files) { throw new RuntimeException("Cannot set single type of source files to this kind of performer"); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java index 2a3d50d9ff672..1a94214848e54 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor; import org.apache.iotdb.db.engine.compaction.performer.ISeqCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; @@ -46,6 +47,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private TsFileResource targetResource; private List seqFiles; + private CompactionTaskSummary summary; public ReadChunkCompactionPerformer(List sourceFiles, TsFileResource targetFile) { this.seqFiles = sourceFiles; @@ -96,6 +98,11 @@ public void setTargetFiles(List targetFiles) { this.targetResource = targetFiles.get(0); } + @Override + public void setSummary(CompactionTaskSummary summary) { + this.summary = summary; + } + private void compactAlignedSeries( String device, TsFileResource targetResource, @@ -112,7 +119,7 @@ private void compactAlignedSeries( } private void checkThreadInterrupted() throws InterruptedException { - if (Thread.interrupted()) { + if (Thread.interrupted() || summary.isCancel()) { throw new InterruptedException( String.format( "[Compaction] compaction for target file %s abort", targetResource.toString())); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java index 499868e80b31a..eecb159196763 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.performer.ICrossCompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.IUnseqCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter; import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter; import org.apache.iotdb.db.engine.compaction.writer.InnerSpaceCompactionWriter; @@ -80,6 +81,7 @@ public class ReadPointCompactionPerformer private static final int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); private Map readerCacheMap = new HashMap<>(); + private CompactionTaskSummary summary; private List targetFiles = Collections.emptyList(); @@ -145,6 +147,11 @@ public void setTargetFiles(List targetFiles) { this.targetFiles = targetFiles; } + @Override + public void setSummary(CompactionTaskSummary summary) { + this.summary = summary; + } + private void compactAlignedSeries( String device, MultiTsFileDeviceIterator deviceIterator, @@ -408,7 +415,7 @@ private static void updatePlanIndexes( } private void checkThreadInterrupted() throws InterruptedException { - if (Thread.interrupted()) { + if (Thread.interrupted() || summary.isCancel()) { throw new InterruptedException( String.format( "[Compaction] compaction for target file %s abort", targetFiles.toString())); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java index 7a00e466be928..5b6597b4c6b1a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; /** @@ -37,27 +36,28 @@ * currentTaskNum in CompactionScheduler when the {@link AbstractCompactionTask#doCompaction()} is * finished. The future returns the {@link CompactionTaskSummary} of this task execution. */ -public abstract class AbstractCompactionTask implements Callable { +public abstract class AbstractCompactionTask { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); - protected String fullStorageGroupName; + protected String dataRegionId; + protected String storageGroupName; protected long timePartition; protected final AtomicInteger currentTaskNum; protected final TsFileManager tsFileManager; - protected long timeCost = 0L; - protected volatile boolean ran = false; - protected volatile boolean finished = false; protected ICompactionPerformer performer; protected int hashCode = -1; + protected CompactionTaskSummary summary = new CompactionTaskSummary(); protected long serialId; public AbstractCompactionTask( - String fullStorageGroupName, + String storageGroupName, + String dataRegionId, long timePartition, TsFileManager tsFileManager, AtomicInteger currentTaskNum, long serialId) { - this.fullStorageGroupName = fullStorageGroupName; + this.storageGroupName = storageGroupName; + this.dataRegionId = dataRegionId; this.timePartition = timePartition; this.tsFileManager = tsFileManager; this.currentTaskNum = currentTaskNum; @@ -66,33 +66,28 @@ public AbstractCompactionTask( public abstract void setSourceFilesToCompactionCandidate(); - protected abstract void doCompaction() throws Exception; + protected abstract void doCompaction(); - @Override - public CompactionTaskSummary call() throws Exception { - ran = true; - long startTime = System.currentTimeMillis(); + public void start() { currentTaskNum.incrementAndGet(); boolean isSuccess = false; try { + summary.start(); doCompaction(); isSuccess = true; - } catch (InterruptedException e) { - LOGGER.warn("{} [Compaction] Current task is interrupted", fullStorageGroupName); - } catch (Throwable e) { - // Use throwable to catch OOM exception. - LOGGER.error("{} [Compaction] Running compaction task failed", fullStorageGroupName, e); } finally { this.currentTaskNum.decrementAndGet(); - timeCost = System.currentTimeMillis() - startTime; + summary.finish(isSuccess); CompactionTaskManager.getInstance().removeRunningTaskFuture(this); - finished = true; } - return new CompactionTaskSummary(isSuccess); } - public String getFullStorageGroupName() { - return fullStorageGroupName; + public String getStorageGroupName() { + return this.storageGroupName; + } + + public String getDataRegionId() { + return this.dataRegionId; } public long getTimePartition() { @@ -120,21 +115,34 @@ public boolean equals(Object other) { public abstract void resetCompactionCandidateStatusForAllSourceFiles(); public long getTimeCost() { - return timeCost; + return summary.getTimeCost(); } protected void checkInterrupted() throws InterruptedException { if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(String.format("%s [Compaction] abort", fullStorageGroupName)); + throw new InterruptedException( + String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); } } public boolean isTaskRan() { - return ran; + return summary.isRan(); + } + + public void cancel() { + summary.cancel(); + } + + public boolean isSuccess() { + return summary.isSuccess(); + } + + public CompactionTaskSummary getSummary() { + return summary; } public boolean isTaskFinished() { - return finished; + return summary.isFinished(); } public long getSerialId() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java index a7380969ff49f..a4c9435533755 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionTaskSummary.java @@ -20,13 +20,56 @@ /** The summary of one {@link AbstractCompactionTask} execution */ public class CompactionTaskSummary { - private final boolean success; + private long timeCost = 0L; + private volatile Status status = Status.NOT_STARTED; + private long startTime = -1L; - public CompactionTaskSummary(boolean success) { - this.success = success; + public CompactionTaskSummary() {} + + public void start() { + this.status = Status.STARTED; + this.startTime = System.currentTimeMillis(); + } + + public void finish(boolean success) { + this.status = success ? Status.SUCCESS : Status.FAILED; + this.timeCost = System.currentTimeMillis() - this.startTime; + } + + public void cancel() { + if (this.status != Status.SUCCESS && this.status != Status.FAILED) { + this.status = Status.CANCELED; + if (this.startTime != -1) { + this.timeCost = System.currentTimeMillis() - this.startTime; + } + } + } + + public boolean isCancel() { + return this.status == Status.CANCELED; + } + + public boolean isFinished() { + return this.status == Status.SUCCESS || this.status == Status.FAILED; + } + + public boolean isRan() { + return this.status != Status.NOT_STARTED; } public boolean isSuccess() { - return success; + return this.status == Status.SUCCESS; + } + + public long getTimeCost() { + return timeCost; + } + + enum Status { + NOT_STARTED, + STARTED, + SUCCESS, + FAILED, + CANCELED } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 3514b38a8cda3..54834733e783a 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -2447,7 +2447,6 @@ private void executeCompaction() { for (long timePartition : timePartitions) { CompactionScheduler.scheduleCompaction(tsFileManager, timePartition); } - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java index cd76c44a72173..11df2f8b4bee4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java @@ -46,7 +46,7 @@ public class TsFileManager { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileManager.class); private String storageGroupName; - private String dataRegion; + private String dataRegionId; private String storageGroupDir; /** Serialize queries, delete resource files, compaction cleanup files */ @@ -63,10 +63,10 @@ public class TsFileManager { private boolean allowCompaction = true; private AtomicLong currentCompactionTaskSerialId = new AtomicLong(0); - public TsFileManager(String storageGroupName, String dataRegion, String storageGroupDir) { + public TsFileManager(String storageGroupName, String dataRegionId, String storageGroupDir) { this.storageGroupName = storageGroupName; this.storageGroupDir = storageGroupDir; - this.dataRegion = dataRegion; + this.dataRegionId = dataRegionId; } public List getTsFileList(boolean sequence) { @@ -357,12 +357,12 @@ public void setAllowCompaction(boolean allowCompaction) { this.allowCompaction = allowCompaction; } - public String getDataRegion() { - return dataRegion; + public String getDataRegionId() { + return dataRegionId; } - public void setDataRegion(String dataRegion) { - this.dataRegion = dataRegion; + public void setDataRegionId(String dataRegionId) { + this.dataRegionId = dataRegionId; } public List getSequenceRecoverTsFileResources() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 715756cd9a9bc..d7a48abd53a8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -591,9 +591,6 @@ public void setStatus(TsFileResourceStatus status) { } break; case UNCLOSED: - // Print a stack trace in a warn statement. - RuntimeException e = new RuntimeException(); - LOGGER.error("Setting the status of a TsFileResource to UNCLOSED", e); this.status = TsFileResourceStatus.UNCLOSED; break; case DELETED: diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java index d8abaa888a808..1716a6316d72c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java @@ -203,7 +203,6 @@ public void test1() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); } catch (InterruptedException e) { e.printStackTrace(); } @@ -220,7 +219,6 @@ public void test1() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -230,7 +228,6 @@ public void test1() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 0) { @@ -238,7 +235,6 @@ public void test1() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -338,7 +334,6 @@ public void test2() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -348,7 +343,6 @@ public void test2() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 0) { try { @@ -459,7 +453,7 @@ public void test3() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -475,7 +469,7 @@ public void test3() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + } catch (InterruptedException e) { e.printStackTrace(); } @@ -569,7 +563,7 @@ public void test4() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -665,7 +659,7 @@ public void test5() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -680,7 +674,7 @@ public void test5() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -696,7 +690,7 @@ public void test5() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -788,7 +782,7 @@ public void test6() throws IOException, MetadataException, InterruptedException assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 1) { assertEquals(100, tsFileManager.getTsFileList(true).size()); @@ -796,7 +790,7 @@ public void test6() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -806,7 +800,7 @@ public void test6() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 0) { assertEquals(100, tsFileManager.getTsFileList(true).size()); @@ -814,7 +808,7 @@ public void test6() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -904,14 +898,14 @@ public void test7() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 1) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -921,14 +915,14 @@ public void test7() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 0) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1019,14 +1013,14 @@ public void test8() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 0) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1118,14 +1112,14 @@ public void test9() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 50) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1135,13 +1129,13 @@ public void test9() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 25) { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); } @@ -1233,14 +1227,14 @@ public void test10() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 50) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1251,14 +1245,14 @@ public void test10() throws IOException, MetadataException, InterruptedException } assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 25) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1352,14 +1346,14 @@ public void test11() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 50) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1369,14 +1363,14 @@ public void test11() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 25) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1468,7 +1462,6 @@ public void test12() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); long totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 98) { @@ -1476,7 +1469,7 @@ public void test12() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1487,14 +1480,14 @@ public void test12() throws IOException, MetadataException, InterruptedException } assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 96) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1588,7 +1581,7 @@ public void test14() throws IOException, MetadataException, InterruptedException assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + assertEquals(100, tsFileManager.getTsFileList(true).size()); long totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 99) { @@ -1597,7 +1590,7 @@ public void test14() throws IOException, MetadataException, InterruptedException totalWaitingTime += 100; assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1608,7 +1601,7 @@ public void test14() throws IOException, MetadataException, InterruptedException } assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 98) { try { @@ -1616,7 +1609,7 @@ public void test14() throws IOException, MetadataException, InterruptedException assertEquals(100, tsFileManager.getTsFileList(true).size()); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1708,14 +1701,14 @@ public void test15() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 99) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1725,14 +1718,14 @@ public void test15() throws IOException, MetadataException, InterruptedException } } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(true).size() > 98) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1746,7 +1739,7 @@ public void test15() throws IOException, MetadataException, InterruptedException Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1837,14 +1830,14 @@ public void test16() throws IOException, MetadataException, InterruptedException } CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + long totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 98) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; @@ -1855,14 +1848,14 @@ public void test16() throws IOException, MetadataException, InterruptedException } assertEquals(100, tsFileManager.getTsFileList(true).size()); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + totalWaitingTime = 0; while (tsFileManager.getTsFileList(false).size() > 96) { try { Thread.sleep(100); totalWaitingTime += 100; CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); + if (totalWaitingTime > MAX_WAITING_TIME) { fail(); break; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java index 6bbba1a83c2a8..8b85696e78c48 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java @@ -301,7 +301,11 @@ public void testSerialId() throws InterruptedException { long cnt = 0; while (compactionTaskQueue.size() > 0) { for (int i = 0; i < 10; ++i) { - taskCount.get(compactionTaskQueue.take().getFullStorageGroupName()).incrementAndGet(); + AbstractCompactionTask task = compactionTaskQueue.take(); + String id = + CompactionTaskManager.getSGWithRegionId( + task.getStorageGroupName(), task.getDataRegionId()); + taskCount.get(id).incrementAndGet(); } cnt++; for (int i = 0; i < 10; ++i) { @@ -331,7 +335,7 @@ public FakedInnerSpaceCompactionTask( } @Override - protected void doCompaction() throws Exception {} + protected void doCompaction() {} @Override public boolean equalsOtherTask(AbstractCompactionTask other) { @@ -365,7 +369,7 @@ public FakeCrossSpaceCompactionTask( } @Override - protected void doCompaction() throws Exception {} + protected void doCompaction() {} @Override public boolean equalsOtherTask(AbstractCompactionTask other) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java index 620c40137a339..eda36fca69508 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; @@ -40,11 +41,9 @@ import java.io.File; import java.io.IOException; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.fail; - public class CompactionTaskManagerTest extends InnerCompactionTest { static final Logger logger = LoggerFactory.getLogger(CompactionTaskManagerTest.class); File tempSGDir; @@ -93,41 +92,34 @@ public void testRepeatedSubmitBeforeExecution() throws Exception { 0); seqResources.get(0).readLock(); CompactionTaskManager manager = CompactionTaskManager.getInstance(); + Future summaryFuture = null; try { for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); } Assert.assertTrue(manager.addTaskToWaitingQueue(task1)); + summaryFuture = CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task1); Assert.assertEquals(manager.getTotalTaskCount(), 1); for (TsFileResource resource : seqResources) { - Assert.assertTrue(resource.isCompactionCandidate()); + Assert.assertTrue(resource.isCompacting()); } // a same task should not be submitted compaction task manager Assert.assertFalse(manager.addTaskToWaitingQueue(task2)); Assert.assertEquals(manager.getTotalTaskCount(), 1); for (TsFileResource resource : seqResources) { - Assert.assertTrue(resource.isCompactionCandidate()); + Assert.assertTrue(resource.isCompacting()); } - manager.submitTaskFromTaskQueue(); } finally { seqResources.get(0).readUnlock(); } - Thread.sleep(5000); + if (summaryFuture != null) { + summaryFuture.get(); + } Assert.assertEquals(0, manager.getTotalTaskCount()); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); } - long waitingTime = 0; - while (manager.getRunningCompactionTaskList().size() > 0) { - Thread.sleep(50); - waitingTime += 50; - if (waitingTime % 10000 == 0) { - logger.warn("{}", manager.getRunningCompactionTaskList()); - } - if (waitingTime > MAX_WAITING_TIME) { - fail(); - } - } + manager.waitAllCompactionFinish(); } @Test @@ -155,17 +147,15 @@ public void testRepeatedSubmitWhenExecuting() throws Exception { new AtomicInteger(0), 0); seqResources.get(0).readLock(); + Future summaryFuture = null; try { CompactionTaskManager manager = CompactionTaskManager.getInstance(); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); } manager.addTaskToWaitingQueue(task1); - for (TsFileResource resource : seqResources) { - Assert.assertTrue(resource.isCompactionCandidate()); - } - manager.submitTaskFromTaskQueue(); - Thread.sleep(2000); + + summaryFuture = CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task1); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); } @@ -178,16 +168,8 @@ public void testRepeatedSubmitWhenExecuting() throws Exception { } finally { seqResources.get(0).readUnlock(); } - long waitingTime = 0; - while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) { - Thread.sleep(100); - waitingTime += 100; - if (waitingTime % 10000 == 0) { - logger.warn("{}", CompactionTaskManager.getInstance().getRunningCompactionTaskList()); - } - if (waitingTime > MAX_WAITING_TIME) { - fail(); - } + if (summaryFuture != null) { + summaryFuture.get(); } for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); @@ -219,31 +201,18 @@ public void testRepeatedSubmitAfterExecution() throws Exception { new AtomicInteger(0), 0); CompactionTaskManager manager = CompactionTaskManager.getInstance(); - Assert.assertTrue(manager.addTaskToWaitingQueue(task1)); - manager.submitTaskFromTaskQueue(); - while (manager.getTotalTaskCount() > 0) { - Thread.sleep(10); - } seqResources.get(0).readLock(); + Assert.assertTrue(manager.addTaskToWaitingQueue(task1)); + Future future = CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task1); + seqResources.get(0).readUnlock(); + CompactionTaskManager.getInstance().waitAllCompactionFinish(); // an invalid task can be submitted to waiting queue, but should not be submitted to thread pool try { Assert.assertTrue(manager.addTaskToWaitingQueue(task2)); - manager.submitTaskFromTaskQueue(); Assert.assertEquals(manager.getExecutingTaskCount(), 0); - seqResources.get(0).readUnlock(); } finally { - long waitingTime = 0; - while (manager.getRunningCompactionTaskList().size() > 0) { - Thread.sleep(100); - waitingTime += 100; - if (waitingTime % 10000 == 0) { - logger.warn("{}", manager.getRunningCompactionTaskList()); - } - if (waitingTime > MAX_WAITING_TIME) { - fail(); - } - } + CompactionTaskManager.getInstance().waitAllCompactionFinish(); } } @@ -265,10 +234,10 @@ public void testRemoveSelfFromRunningList() throws Exception { CompactionTaskManager manager = CompactionTaskManager.getInstance(); manager.restart(); seqResources.get(0).readLock(); + Future future = null; try { manager.addTaskToWaitingQueue(task1); - manager.submitTaskFromTaskQueue(); - Thread.sleep(5000); + future = CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task1); List runningList = manager.getRunningCompactionTaskList(); // compaction task should add itself to running list Assert.assertEquals(1, runningList.size()); @@ -277,20 +246,11 @@ public void testRemoveSelfFromRunningList() throws Exception { seqResources.get(0).readUnlock(); } // after execution, task should remove itself from running list - Thread.sleep(5000); + future.get(); + Thread.sleep(10); List runningList = manager.getRunningCompactionTaskList(); Assert.assertEquals(0, runningList.size()); - long waitingTime = 0; - while (manager.getRunningCompactionTaskList().size() > 0) { - Thread.sleep(100); - waitingTime += 100; - if (waitingTime % 10000 == 0) { - logger.warn("{}", manager.getRunningCompactionTaskList()); - } - if (waitingTime > MAX_WAITING_TIME) { - fail(); - } - } + manager.waitAllCompactionFinish(); } @Test @@ -307,14 +267,16 @@ public void testSizeTieredCompactionStatus() throws Exception { new ReadChunkCompactionPerformer(seqResources), new AtomicInteger(0), 0); + seqResources.get(0).readLock(); CompactionTaskManager.getInstance().addTaskToWaitingQueue(task); for (TsFileResource resource : seqResources) { - Assert.assertTrue(resource.isCompactionCandidate()); + Assert.assertTrue(resource.isCompactionCandidate() || resource.isCompacting()); } + Future future = CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task); + seqResources.get(0).readUnlock(); + future.get(); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); - Thread.sleep(50); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); } @@ -345,22 +307,17 @@ public void testRewriteCrossCompactionFileStatus() throws Exception { } CompactionTaskManager.getInstance().addTaskToWaitingQueue(task); + seqResources.get(0).readLock(); for (TsFileResource resource : seqResources) { - Assert.assertTrue(resource.isCompactionCandidate()); + Assert.assertTrue(resource.isCompactionCandidate() || resource.isCompacting()); } for (TsFileResource resource : unseqResources) { - Assert.assertTrue(resource.isCompactionCandidate()); + Assert.assertTrue(resource.isCompactionCandidate() || resource.isCompacting()); } - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); - long waitingTime = 0; - while (!task.isTaskFinished()) { - TimeUnit.MILLISECONDS.sleep(200); - waitingTime += 200; - if (waitingTime > 10_000) { - fail(); - } - } + Future future = CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task); + seqResources.get(0).readUnlock(); + future.get(); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.isCompactionCandidate()); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java index 8a299ed9debad..c0064917f8eea 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java @@ -22,7 +22,9 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; @@ -121,7 +123,10 @@ public void testSeqInnerSpaceCompactionWithSameTimeseries() List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); tsFilesReader = @@ -206,7 +211,10 @@ public void testSeqInnerSpaceCompactionWithDifferentTimeseries() List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); assertEquals( 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); @@ -308,7 +316,10 @@ public void testUnSeqInnerSpaceCompactionWithSameTimeseries() List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = 0; i < 2; i++) { @@ -413,7 +424,10 @@ public void testUnSeqInnerSpaceCompactionWithDifferentTimeseries() List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = 0; i < 9; i++) { @@ -548,7 +562,10 @@ public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = 0; i < 5; i++) { @@ -673,7 +690,10 @@ public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = 0; i < 5; i++) { @@ -784,7 +804,10 @@ public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTargetFile() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = 0; i < 5; i++) { @@ -864,7 +887,10 @@ public void testAlignedSeqInnerSpaceCompactionWithSameTimeseries() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -972,7 +998,10 @@ public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPag } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -1090,7 +1119,10 @@ public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyChu } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -1214,7 +1246,10 @@ public void testAlignedUnSeqInnerSpaceCompactionWithEmptyChunkAndEmptyPage() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -1387,7 +1422,10 @@ public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -1541,7 +1579,10 @@ public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -1647,7 +1688,10 @@ public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries() } List targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources, false); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -1740,7 +1784,10 @@ public void testCrossSpaceCompactionWithSameTimeseries() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); tsFilesReader = @@ -1853,7 +1900,10 @@ public void testCrossSpaceCompactionWithDifferentTimeseries() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); List deviceIdList = new ArrayList<>(); @@ -2047,7 +2097,10 @@ public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); List deviceIdList = new ArrayList<>(); @@ -2238,7 +2291,10 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDevice() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); List deviceIdList = new ArrayList<>(); @@ -2419,7 +2475,10 @@ public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); Assert.assertEquals(2, targetResources.size()); @@ -2591,7 +2650,10 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); Assert.assertEquals(4, targetResources.size()); @@ -2762,7 +2824,10 @@ public void testAlignedCrossSpaceCompactionWithSameTimeseries() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); tsFilesReader = @@ -2889,7 +2954,10 @@ public void testAlignedCrossSpaceCompactionWithDifferentTimeseries() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -3089,7 +3157,10 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); Assert.assertEquals(4, targetResources.size()); @@ -3324,7 +3395,10 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -3534,7 +3608,10 @@ public void testAlignedCrossSpaceCompactionWithFileTimeIndexResource() List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); @@ -3619,7 +3696,10 @@ public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() { List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); Assert.assertEquals(4, targetResources.size()); @@ -3689,7 +3769,10 @@ public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile() { List targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); Assert.assertEquals(2, targetResources.size()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java index 5044533d25c8c..53067b3c5b523 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java @@ -25,7 +25,9 @@ import org.apache.iotdb.db.engine.compaction.CompactionExceptionHandler; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.modification.ModificationFile; @@ -91,7 +93,10 @@ public void testHandleWithAllSourceFilesExisted() throws Exception { compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionExceptionHandler.handleException( COMPACTION_TEST_SG, @@ -162,7 +167,10 @@ public void testHandleWithAllSourceFilesExistedAndTargetFilesMoved() throws Exce compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); CompactionExceptionHandler.handleException( @@ -234,7 +242,10 @@ public void testHandleWithSomeSourceFilesExisted() throws Exception { compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); for (TsFileResource resource : seqResources) { tsFileManager.getSequenceListByTimePartition(0).remove(resource); @@ -316,7 +327,10 @@ public void testHandleWithoutAllSourceFilesAndModFilesExist() throws Exception { compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); compactionLogger.close(); for (int i = 0; i < seqResources.size(); i++) { @@ -430,7 +444,10 @@ public void testHandleWithAllSourcesFileAndCompactonModFileExist() throws Except compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); compactionLogger.close(); for (int i = 0; i < seqResources.size(); i++) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java index 1d8b252fbdf12..f16612a34c85b 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java @@ -443,7 +443,7 @@ public void testOneSeqFileAndSixUnseqFile() throws Exception { .createInstance(), new AtomicInteger(0), 0); - compactionTask.call(); + compactionTask.start(); List targetTsfileResourceList = new ArrayList<>(); for (TsFileResource seqResource : seqResources) { TsFileResource targetResource = @@ -748,7 +748,7 @@ public void testFiveSeqFileAndOneUnseqFileWithSomeDeviceNotInSeqFiles() throws E .createInstance(), new AtomicInteger(0), 0); - compactionTask.call(); + compactionTask.start(); List targetTsfileResourceList = new ArrayList<>(); for (TsFileResource seqResource : seqResources.subList(1, 4)) { TsFileResource targetResource = @@ -1052,7 +1052,7 @@ public void testFiveSeqFileAndOneUnseqFile() throws Exception { .createInstance(), new AtomicInteger(0), 0); - compactionTask.call(); + compactionTask.start(); List targetTsfileResourceList = new ArrayList<>(); for (TsFileResource seqResource : seqResources.subList(1, 4)) { TsFileResource targetResource = diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java index d229dac9e9705..77731b19bd2ec 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java @@ -24,8 +24,10 @@ import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; @@ -88,7 +90,10 @@ public void testRecoverWithAllSourceFilesExisted() throws Exception { compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false) .doCompaction(); @@ -148,7 +153,10 @@ public void testRecoverWithAllSourceFilesExistedAndSomeTargetFilesNotExist() thr compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); // Target files may not exist for (int i = 0; i < targetResources.size(); i++) { if (i < 2) { @@ -216,7 +224,10 @@ public void testRecoverWithAllSourceFilesExistedAndTargetFilesMoved() throws Exc compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false) @@ -277,7 +288,10 @@ public void testRecoverWithSomeSourceFilesExisted() throws Exception { compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); seqResources.get(0).getTsFile().delete(); compactionLogger.close(); @@ -339,7 +353,10 @@ public void testRecoverWithoutAllSourceFilesAndModFilesExist() throws Exception compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); compactionLogger.close(); for (int i = 0; i < seqResources.size(); i++) { @@ -434,7 +451,10 @@ public void testRecoverWithAllSourcesFileAndCompactonModFileExist() throws Excep compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); for (int i = 0; i < seqResources.size(); i++) { @@ -541,7 +561,10 @@ public void testRecoverWithAllSourcesFileAndCompactonModFileExistAndSomeTargetFi compactionLogger.logFiles(targetResources, STR_TARGET_FILES); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES); - new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources).perform(); + ICompactionPerformer performer = + new ReadPointCompactionPerformer(seqResources, unseqResources, targetResources); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); // Target files may not exist for (int i = 0; i < targetResources.size(); i++) { if (i < 2) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java index f4f4527daf782..fa3181d854c9e 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java @@ -228,7 +228,7 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() thro new ReadPointCompactionPerformer(), new AtomicInteger(0), 0); - task.call(); + task.start(); for (TsFileResource resource : seqResources) { resource.resetModFile(); @@ -465,7 +465,7 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() t new ReadPointCompactionPerformer(), new AtomicInteger(0), 0); - task.call(); + task.start(); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.getModFile().exists()); @@ -646,7 +646,7 @@ public void testOneDeletionDuringCompaction() throws Exception { Assert.assertTrue(resource.getModFile().exists()); Assert.assertEquals(2, resource.getModFile().getModifications().size()); } - task.call(); + task.start(); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.getTsFile().exists()); Assert.assertFalse(resource.getModFile().exists()); @@ -768,7 +768,7 @@ public void testSeveralDeletionsDuringCompaction() throws Exception { Assert.assertTrue(resource.getModFile().exists()); Assert.assertEquals(3, resource.getModFile().getModifications().size()); } - task.call(); + task.start(); for (TsFileResource resource : seqResources) { Assert.assertFalse(resource.getTsFile().exists()); Assert.assertFalse(resource.getModFile().exists()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java index 407c57f5afb5e..e93c0ff687837 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java @@ -84,7 +84,11 @@ public void testCompactWithPartialEmptyUnseqFiles() throws Exception { new ReadPointCompactionPerformer(), new AtomicInteger(0), 0); - Future future = CompactionTaskManager.getInstance().submitTask(task); + unseqResources.get(0).readLock(); + CompactionTaskManager.getInstance().addTaskToWaitingQueue(task); + Future future = + CompactionTaskManager.getInstance().getCompactionTaskFutureMayBlock(task); + unseqResources.get(0).readUnlock(); Assert.assertTrue(future.get().isSuccess()); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java index bb6a7b7b0af0d..53d99520d5be1 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionLogTest.java @@ -67,7 +67,6 @@ public void testCompactionLog() { tsFileManager.addAll(seqResources, true); tsFileManager.addAll(unseqResources, false); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); try { Thread.sleep(1000); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java index 6624d8bd83dc5..7b35d35aa2f09 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionMoreDataTest.java @@ -207,15 +207,12 @@ public void testSensorWithTwoOrThreeNode() throws MetadataException, IOException tsFileManager.addAll(seqResources, true); tsFileManager.addAll(unseqResources, false); CompactionScheduler.scheduleCompaction(tsFileManager, 0); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); try { Thread.sleep(500); } catch (Exception e) { } - while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) { - // wait - } + CompactionTaskManager.getInstance().waitAllCompactionFinish(); QueryContext context = new QueryContext(); MeasurementPath path = SchemaTestUtils.getMeasurementPath( diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java index b3c103ee64e22..c68db5edcdccd 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java @@ -94,7 +94,6 @@ public void testFileSelector1() tsFileManager.addAll(seqResources, true); CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); try { Thread.sleep(5000); } catch (Exception e) { @@ -116,7 +115,6 @@ public void testFileSelector2() TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp"); tsFileManager.addAll(seqResources, true); CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); long waitingTime = 0; while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) { @@ -146,8 +144,6 @@ public void testFileSelectorWithUnclosedFile() TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp"); tsFileManager.addAll(seqResources, true); CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true); - CompactionTaskManager.getInstance().submitTaskFromTaskQueue(); - long waitingTime = 0; while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) { try { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java index 3e0c81d9441ee..8c52951785383 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java @@ -26,7 +26,9 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.compaction.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils; import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; @@ -229,7 +231,10 @@ public void testDeserializePage() throws MetadataException, IOException, WritePr timeValuePair.getTimestamp() >= 250L && timeValuePair.getTimestamp() <= 300L); } - new ReadChunkCompactionPerformer(sourceResources, targetTsFileResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceResources, targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); CompactionUtils.combineModsInInnerCompaction(sourceResources, targetTsFileResource); @@ -456,7 +461,10 @@ public void testAppendPage() timeValuePair -> timeValuePair.getTimestamp() >= 250L && timeValuePair.getTimestamp() <= 300L); } - new ReadChunkCompactionPerformer(toMergeResources, targetTsFileResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(toMergeResources, targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); CompactionUtils.combineModsInInnerCompaction(toMergeResources, targetTsFileResource); @@ -733,7 +741,10 @@ public void testAppendChunk() timeValuePair.getTimestamp() >= 250L && timeValuePair.getTimestamp() <= 300L); } - new ReadChunkCompactionPerformer(toMergeResources, targetTsFileResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(toMergeResources, targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); CompactionUtils.combineModsInInnerCompaction(toMergeResources, targetTsFileResource); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java index 1b4ad64e6cc11..decd692c81ea2 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java @@ -22,7 +22,9 @@ import org.apache.iotdb.db.engine.compaction.CompactionExceptionHandler; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; @@ -64,7 +66,9 @@ public void testWhenAllSourceExistsAndTargetNotComplete() throws Exception { compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES); compactionLogger.logFiles( Collections.singletonList(targetResource), CompactionLogger.STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); try (FileOutputStream os = new FileOutputStream(targetResource.getTsFile(), true); @@ -113,7 +117,9 @@ public void testWhenAllSourceExistsAndTargetComplete() throws Exception { compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES); compactionLogger.logFiles( Collections.singletonList(targetResource), CompactionLogger.STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); compactionLogger.close(); @@ -158,7 +164,9 @@ public void testWhenSomeSourceLostAndTargetComplete() throws Exception { compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES); compactionLogger.logFiles( Collections.singletonList(targetResource), CompactionLogger.STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); for (TsFileResource resource : seqResources) { @@ -214,7 +222,9 @@ public void testWhenSomeSourceLostAndTargetNotComplete() throws Exception { compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES); compactionLogger.logFiles( Collections.singletonList(targetResource), CompactionLogger.STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); seqResources.get(0).remove(); @@ -268,7 +278,9 @@ public void testHandleWithCompactionMods() throws Exception { compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES); compactionLogger.logFiles( Collections.singletonList(targetResource), CompactionLogger.STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); for (int i = 0; i < seqResources.size(); i++) { @@ -343,7 +355,9 @@ public void testHandleWithNormalMods() throws Exception { new Pair<>(i * ptNum, i * ptNum + 10)); CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false); } - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); seqResources.get(0).remove(); @@ -402,7 +416,9 @@ public void testHandleWithCompactionModsAndNormalMods() throws Exception { new Pair<>(i * ptNum, i * ptNum + 5)); CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false); } - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); for (int i = 0; i < seqResources.size(); i++) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java index e0419f721b6e8..588b1af97e056 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.compaction.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils; import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; @@ -368,11 +370,13 @@ public void test() timeValuePair.getTimestamp() >= 250L && timeValuePair.getTimestamp() <= 300L); } - new ReadPointCompactionPerformer( + ICompactionPerformer performer = + new ReadPointCompactionPerformer( Collections.emptyList(), toMergeResources, - Collections.singletonList(targetTsFileResource)) - .perform(); + Collections.singletonList(targetTsFileResource)); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); CompactionUtils.combineModsInInnerCompaction(toMergeResources, targetTsFileResource); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java index 6e2f0578f8dff..f786cc00e1dbb 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.compaction.TestUtilsForAlignedSeries; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; @@ -133,7 +135,9 @@ public void testSimpleAlignedTsFileCompaction() throws Exception { Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, @@ -201,7 +205,9 @@ public void testAlignedTsFileWithModificationCompaction() throws Exception { Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, @@ -263,7 +269,9 @@ public void testAlignedTsFileWithNullValueCompaction() throws Exception { Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, @@ -328,7 +336,9 @@ public void testAlignedTsFileWithDifferentSchemaInDifferentTsFileCompaction() th Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, @@ -391,7 +401,9 @@ public void testAlignedTsFileWithDifferentDataTypeCompaction() throws Exception Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, @@ -456,7 +468,9 @@ public void testAlignedTsFileWithDifferentDataTypeInDifferentTsFileCompaction() Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, @@ -522,7 +536,9 @@ public void testAlignedTsFileWithBadSchemaCompaction() throws Exception { Map> originData = CompactionCheckerUtils.getDataByQuery( fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); - new ReadChunkCompactionPerformer(resources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map> compactedData = CompactionCheckerUtils.getDataByQuery( fullPaths, diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java index fe9f9ef987174..5fbbb6d401233 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java @@ -25,7 +25,9 @@ import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; @@ -201,7 +203,10 @@ public void testDirectlyFlushChunk() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); long[] points = new long[fileNum]; for (int i = 1; i <= fileNum; i++) { @@ -291,7 +296,10 @@ public void testLargeChunkMergeWithCacheChunkAndFlush() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page for (String path : fullPathSet) { @@ -383,7 +391,10 @@ public void testLargeChunkDeserializeIntoPoint() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page for (String path : fullPathSet) { @@ -462,7 +473,10 @@ public void testMergeChunk() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page List> chunkPointsArray = new ArrayList<>(); @@ -570,7 +584,10 @@ public void testMiddleChunkDeserialize() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page for (String path : fullPathSet) { @@ -649,7 +666,10 @@ public void testDeserializePage() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page List> chunkPointsArray = new ArrayList<>(); @@ -731,7 +751,10 @@ public void testDeserializeCachedChunk() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page List> chunkPointsArray = new ArrayList<>(); @@ -807,7 +830,10 @@ public void testMixCompact1() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page for (String path : fullPathSet) { @@ -885,7 +911,10 @@ public void testMixCompact2() throws Exception { tsFileName.getVersion(), tsFileName.getInnerCompactionCnt() + 1, tsFileName.getCrossCompactionCnt()))); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); Map>> chunkPagePointsNumMerged = new HashMap<>(); // outer list is a chunk, inner list is point num in each page for (String path : fullPathSet) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java index 6ab9ef3b0c76d..59b0ab3d3d6f8 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; @@ -109,7 +111,10 @@ public void testCompact() new CompactionLogger( new File(targetTsFileResource.getTsFilePath().concat(".compaction.log"))); sizeTieredCompactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES); - new ReadChunkCompactionPerformer(seqResources, targetTsFileResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(seqResources, targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); sizeTieredCompactionLogger.close(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java index dbd4f72cab7e5..68d12a73dbeb1 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java @@ -27,8 +27,10 @@ import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; @@ -144,9 +146,11 @@ public void testCompactionRecoverWithUncompletedTargetFileAndLog() throws Except compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); @@ -266,9 +270,11 @@ public void testRecoverWithAllSourceFilesExisted() throws Exception { compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, true) .doCompaction(); @@ -377,9 +383,11 @@ public void testRecoverWithAllSourceFilesExistedAndTargetFileNotExist() throws E compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); // target file may not exist targetTsFileResource.remove(); compactionLogger.close(); @@ -490,9 +498,11 @@ public void testRecoverWithoutAllSourceFilesExisted() throws Exception { compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); // delete one source file @@ -565,7 +575,9 @@ public void testRecoverWithAllSourcesFileAndCompactonModFileExist() throws Excep CompactionLogger compactionLogger = new CompactionLogger(logFile); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); for (int i = 0; i < seqResources.size(); i++) { @@ -634,7 +646,9 @@ public void testRecoverWithAllSourcesFileAndCompactonModFileExistAndTargetFileNo CompactionLogger compactionLogger = new CompactionLogger(logFile); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); // target file may not exist targetResource.remove(); for (int i = 0; i < seqResources.size(); i++) { @@ -705,7 +719,9 @@ public void testRecoverWithoutAllSourceFilesExistAndModFiles() throws Exception CompactionLogger compactionLogger = new CompactionLogger(logFile); compactionLogger.logFiles(seqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); for (int i = 0; i < seqResources.size(); i++) { @@ -817,9 +833,11 @@ public void testRecoverCompleteTargetFileAndCompactionLog() throws Exception { compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); @@ -916,9 +934,11 @@ public void testCompactionRecoverWithCompletedTargetFileAndLog() throws Exceptio compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); compactionLogger.close(); @@ -1018,9 +1038,11 @@ public void testCompactionRecoverWithCompletedTargetFile() throws Exception { compactionLogger.logFiles(tmpSeqResources, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetTsFileResource), STR_TARGET_FILES); deleteFileIfExists(targetTsFileResource.getTsFile()); - new ReadChunkCompactionPerformer( - new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource) - .perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer( + new ArrayList<>(seqResources.subList(0, 3)), targetTsFileResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); compactionLogger.close(); CompactionUtils.moveTargetFile( Collections.singletonList(targetTsFileResource), true, COMPACTION_TEST_SG); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java index 692c51b7027b1..e4a438dd5ca2d 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverCompatibleTest.java @@ -23,8 +23,10 @@ import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; @@ -69,7 +71,9 @@ public void testCompatibleWithAllSourceFilesExistWithFilePath() throws Exception registerTimeseriesInMManger(2, 3, false); TsFileResource targetResource = TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); RandomAccessFile targetFile = new RandomAccessFile(targetResource.getTsFile(), "rw"); long fileLength = targetFile.length(); targetFile.getChannel().truncate(fileLength - 20); @@ -126,7 +130,9 @@ public void testCompatibleWithSomeSourceFilesLostWithFilePath() throws Exception registerTimeseriesInMManger(2, 3, false); TsFileResource targetResource = TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, "root.compactionTest"); @@ -183,7 +189,9 @@ public void testCompatibleWithAllSourceFilesExistWithFileInfo() throws Exception registerTimeseriesInMManger(2, 3, false); TsFileResource targetResource = TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true); - new ReadChunkCompactionPerformer(seqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); RandomAccessFile targetFile = new RandomAccessFile(targetResource.getTsFile(), "rw"); long fileLength = targetFile.length(); targetFile.getChannel().truncate(fileLength - 20); @@ -220,7 +228,9 @@ public void testCompatibleWithSomeSourceFilesLostWithFileInfo() throws Exception registerTimeseriesInMManger(2, 3, false); TsFileResource targetResource = TsFileNameGenerator.getInnerCompactionTargetFileResource(unseqResources, true); - new ReadChunkCompactionPerformer(unseqResources, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(seqResources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, "root.compactionTest"); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java index 29f15a8687c93..39e138eeb2c5b 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java @@ -26,8 +26,10 @@ import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; +import org.apache.iotdb.db.engine.compaction.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverTask; +import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer; import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; @@ -235,7 +237,9 @@ public void testRecoverWithCompleteTargetFileUsingFileInfo() throws Exception { compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); CompactionRecoverTask recoverTask = @@ -286,7 +290,9 @@ public void testRecoverWithIncompleteTargetFileUsingFileInfo() throws Exception compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true); @@ -342,7 +348,9 @@ public void testRecoverWithCompleteTargetFileUsingFilePath() throws Exception { logger.logFiles(sourceFiles, CompactionLogger.STR_SOURCE_FILES); logger.logFiles(Collections.singletonList(targetResource), CompactionLogger.STR_TARGET_FILES); logger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); CompactionRecoverTask recoverTask = @@ -393,7 +401,9 @@ public void testRecoverWithIncompleteTargetFileUsingFilePath() throws Exception compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true); @@ -452,7 +462,10 @@ public void testRecoverWithCompleteTargetFileUsingFileInfoAndChangingDataDirs() compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); long sizeOfTargetFile = targetResource.getTsFileSize(); @@ -549,7 +562,10 @@ public void testRecoverWithIncompleteTargetFileUsingFileInfoAndChangingDataDirs( compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true); @@ -643,7 +659,10 @@ public void testRecoverWithCompleteTargetFileUsingFilePathAndChangingDataDirs() compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); long sizeOfTargetFile = targetResource.getTsFileSize(); @@ -740,7 +759,10 @@ public void testRecoverWithIncompleteTargetFileUsingFilePathAndChangingDataDirs( compactionLogger.logFiles(sourceFiles, STR_SOURCE_FILES); compactionLogger.logFiles(Collections.singletonList(targetResource), STR_TARGET_FILES); compactionLogger.close(); - new ReadChunkCompactionPerformer(sourceFiles, targetResource).perform(); + ICompactionPerformer performer = + new ReadChunkCompactionPerformer(sourceFiles, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); CompactionUtils.moveTargetFile( Collections.singletonList(targetResource), true, COMPACTION_TEST_SG); FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java index f1bd004ab8016..9a490a7bfbfcc 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java @@ -725,7 +725,7 @@ public void testMerge() dataRegion.syncCloseAllWorkingTsFileProcessors(); dataRegion.compact(); long totalWaitingTime = 0; - while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) { + do { // wait try { Thread.sleep(100); @@ -740,7 +740,7 @@ public void testMerge() Assert.fail(); break; } - } + } while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0); QueryDataSource queryDataSource = dataRegion.query( @@ -787,7 +787,7 @@ public void testDeleteStorageGroupWhenCompacting() throws Exception { new ReadChunkCompactionPerformer(dataRegion.getSequenceFileList()), new AtomicInteger(0), 0); - CompactionTaskManager.getInstance().submitTask(task); + CompactionTaskManager.getInstance().addTaskToWaitingQueue(task); Thread.sleep(20); StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup)); Thread.sleep(500); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 88088dc826d67..08d9b63dbff24 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -669,7 +669,7 @@ public void testMerge() processor.syncCloseAllWorkingTsFileProcessors(); processor.compact(); long totalWaitingTime = 0; - while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0) { + do { // wait try { Thread.sleep(100); @@ -684,7 +684,7 @@ public void testMerge() Assert.fail(); break; } - } + } while (CompactionTaskManager.getInstance().getExecutingTaskCount() > 0); QueryDataSource queryDataSource = processor.query( @@ -731,7 +731,7 @@ public void testDeleteStorageGroupWhenCompacting() throws Exception { new ReadChunkCompactionPerformer(processor.getSequenceFileList()), new AtomicInteger(0), 0); - CompactionTaskManager.getInstance().submitTask(task); + CompactionTaskManager.getInstance().addTaskToWaitingQueue(task); Thread.sleep(20); StorageEngine.getInstance().deleteStorageGroup(new PartialPath(storageGroup)); Thread.sleep(500);