From 766d20aa69620347e497fa8a67e0081bf9c1d4e4 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Tue, 29 Nov 2022 23:21:23 +0800 Subject: [PATCH 1/2] take snapshot before every compaction --- .../iotdb/commons/conf/IoTDBConstant.java | 1 + .../compaction/CompactionTaskManager.java | 2 + .../db/engine/compaction/CompactionUtils.java | 82 +++++++++++++++++++ .../cross/CrossSpaceCompactionTask.java | 18 +++- .../inner/InnerSpaceCompactionTask.java | 40 ++++++--- .../task/AbstractCompactionTask.java | 2 + 6 files changed, 129 insertions(+), 16 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 00d8a48eaaf36..87bf0eb8bb0a5 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -201,6 +201,7 @@ private IoTDBConstant() {} public static final String DATA_FOLDER_NAME = "data"; public static final String SEQUENCE_FLODER_NAME = "sequence"; public static final String UNSEQUENCE_FLODER_NAME = "unsequence"; + public static final String COMPACTION_BACKUP_FOLDER_NAME = "compaction_back"; public static final String FILE_NAME_SEPARATOR = "-"; public static final String UPGRADE_FOLDER_NAME = "upgrade"; public static final String CONSENSUS_FOLDER_NAME = "consensus"; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index edd79a608c2e8..e8a9d034e6527 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -49,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** CompactionMergeTaskPoolManager provides a ThreadPool tPro queue and run all compaction tasks. */ public class CompactionTaskManager implements IService { @@ -79,6 +80,7 @@ public class CompactionTaskManager implements IService { private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private volatile boolean init = false; + public static final AtomicLong compactionId = new AtomicLong(0); public static CompactionTaskManager getInstance() { return INSTANCE; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java index bb114710f5111..21129eed1b75a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.engine.compaction; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.storagegroup.TsFileManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -34,6 +36,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -256,4 +259,83 @@ public static void updatePlanIndexes( } } } + + public static void takeSnapshot( + TsFileManager tsFileManager, String storageGroupName, long timePartition, long compactionId) + throws IOException { + File targetBaseDir = + new File( + IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0] + + File.separator + + IoTDBConstant.COMPACTION_BACKUP_FOLDER_NAME + + File.separator + + compactionId + + File.separator + + storageGroupName + + File.separator + + timePartition); + if (!targetBaseDir.exists() && !targetBaseDir.mkdirs()) { + throw new IOException("Failed to create " + targetBaseDir.getAbsolutePath()); + } + tsFileManager.readLock(); + try { + List sequenceFiles = + tsFileManager.getSequenceListByTimePartition(timePartition); + for (TsFileResource resource : sequenceFiles) { + File targetFile = + new File( + targetBaseDir + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + resource.getTsFile().getName()); + File targetResource = + new File( + targetBaseDir + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + resource.getTsFile().getName() + + TsFileResource.RESOURCE_SUFFIX); + if (!targetFile.getParentFile().exists()) { + targetFile.getParentFile().mkdirs(); + } + Files.createLink(targetFile.toPath(), resource.getTsFile().toPath()); + Files.createLink( + targetResource.toPath(), + new File(resource.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX) + .toPath()); + } + + List unsequenceFiles = + tsFileManager.getUnsequenceListByTimePartition(timePartition); + for (TsFileResource resource : unsequenceFiles) { + File targetFile = + new File( + targetBaseDir + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + resource.getTsFile().getName()); + File targetResource = + new File( + targetBaseDir + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + resource.getTsFile().getName() + + TsFileResource.RESOURCE_SUFFIX); + if (!targetFile.getParentFile().exists()) { + targetFile.getParentFile().mkdirs(); + } + Files.createLink(targetFile.toPath(), resource.getTsFile().toPath()); + Files.createLink( + targetResource.toPath(), + new File(resource.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX) + .toPath()); + } + } finally { + tsFileManager.readUnlock(); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java index 960073aea31a9..a8dcc65cc42db 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java @@ -124,9 +124,10 @@ protected void doCompaction() { } LOGGER.info( - "{}-{} [Compaction] CrossSpaceCompaction task starts with {} seq files and {} unsequence files. Sequence files : {}, unsequence files : {} . Sequence files size is {} MB, unsequence file size is {} MB, total size is {} MB", + "{}-{} [Compaction] [Task-id: {}] CrossSpaceCompaction task starts with {} seq files and {} unsequence files. Sequence files : {}, unsequence files : {} . Sequence files size is {} MB, unsequence file size is {} MB, total size is {} MB", storageGroupName, dataRegionId, + compactionId, selectedSequenceFiles.size(), selectedUnsequenceFiles.size(), selectedSequenceFiles, @@ -151,6 +152,9 @@ protected void doCompaction() { // restart recovery compactionLogger.close(); + CompactionUtils.takeSnapshot( + tsFileManager, storageGroupName + "-" + dataRegionId, timePartition, compactionId); + performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles); performer.setTargetFiles(targetTsfileResourceList); performer.setSummary(summary); @@ -205,9 +209,10 @@ protected void doCompaction() { } long costTime = (System.currentTimeMillis() - startTime) / 1000; LOGGER.info( - "{}-{} [Compaction] CrossSpaceCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s", + "{}-{} [Compaction] [Task-id: {}] CrossSpaceCompaction task finishes successfully, time cost is {} s, compaction speed is {} MB/s", storageGroupName, dataRegionId, + compactionId, costTime, (selectedSeqFileSize + selectedUnseqFileSize) / 1024 / 1024 / costTime); } @@ -215,12 +220,17 @@ protected void doCompaction() { // catch throwable to handle OOM errors if (!(throwable instanceof InterruptedException)) { LOGGER.error( - "{}-{} [Compaction] Meet errors in cross space compaction.", + "{}-{} [Compaction] [Task-id: {}] Meet errors in cross space compaction.", storageGroupName, dataRegionId, + compactionId, throwable); } else { - LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); + LOGGER.warn( + "{}-{} [Compaction] [Task-id: {}] Compaction interrupted", + storageGroupName, + dataRegionId, + compactionId); // clean the interrupted flag Thread.interrupted(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java index 51d8554940f8b..4f215a367c159 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java @@ -106,9 +106,10 @@ protected void doCompaction() { // get resource of target file String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent(); LOGGER.info( - "{}-{} [Compaction] InnerSpaceCompaction task starts with {} files", + "{}-{} [Compaction] [Task-id: {}] InnerSpaceCompaction task starts with {} files", storageGroupName, dataRegionId, + compactionId, selectedTsFileResourceList.size()); try { targetTsFileResource = @@ -133,11 +134,15 @@ protected void doCompaction() { "{}-{} [InnerSpaceCompactionTask] Close the logger", storageGroupName, dataRegionId); compactionLogger.close(); LOGGER.info( - "{}-{} [Compaction] compaction with {}", + "{}-{} [Compaction] [Task-id: {}] compaction with {}", storageGroupName, dataRegionId, + compactionId, selectedTsFileResourceList); + // take a snapshot for all tsfile in this time partition + CompactionUtils.takeSnapshot( + tsFileManager, storageGroupName + "-" + dataRegionId, timePartition, compactionId); // carry out the compaction performer.setSourceFiles(selectedTsFileResourceList); // As elements in targetFiles may be removed in ReadPointCompactionPerformer, we should use a @@ -149,15 +154,18 @@ protected void doCompaction() { CompactionUtils.moveTargetFile(targetTsFileList, true, storageGroupName + "-" + dataRegionId); LOGGER.info( - "{}-{} [InnerSpaceCompactionTask] start to rename mods file", + "{}-{} [InnerSpaceCompactionTask] [Task-id: {}] start to rename mods file", storageGroupName, - dataRegionId); + dataRegionId, + compactionId); CompactionUtils.combineModsInInnerCompaction( selectedTsFileResourceList, targetTsFileResource); if (Thread.currentThread().isInterrupted() || summary.isCancel()) { throw new InterruptedException( - String.format("%s-%s [Compaction] abort", storageGroupName, dataRegionId)); + String.format( + "%s-%s [Compaction] [Task-id: %d] abort", + storageGroupName, dataRegionId, compactionId)); } // replace the old files with new file, the new is in same position as the old @@ -178,9 +186,10 @@ protected void doCompaction() { } LOGGER.info( - "{}-{} [Compaction] Compacted target files, try to get the write lock of source files", + "{}-{} [Compaction] [Task-id: {}] Compacted target files, try to get the write lock of source files", storageGroupName, - dataRegionId); + dataRegionId, + compactionId); // release the read lock of all source files, and get the write lock of them to delete them for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { @@ -201,9 +210,10 @@ protected void doCompaction() { } LOGGER.info( - "{}-{} [Compaction] compaction finish, start to delete old files", + "{}-{} [Compaction] [Task-id: {}] compaction finish, start to delete old files", storageGroupName, - dataRegionId); + dataRegionId, + compactionId); // delete the old files long totalSizeOfDeletedFile = 0L; for (TsFileResource resource : selectedTsFileResourceList) { @@ -237,10 +247,11 @@ protected void doCompaction() { double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; LOGGER.info( - "{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {}," + "{}-{} [Compaction] [Task-id: {}] InnerSpaceCompaction task finishes successfully, target file is {}," + "time cost is {} s, compaction speed is {} MB/s", storageGroupName, dataRegionId, + compactionId, targetTsFileResource.getTsFile().getName(), costTime, ((double) selectedFileSize) / 1024.0d / 1024.0d / costTime); @@ -252,13 +263,18 @@ protected void doCompaction() { // catch throwable to handle OOM errors if (!(throwable instanceof InterruptedException)) { LOGGER.error( - "{}-{} [Compaction] Meet errors in inner space compaction.", + "{}-{} [Compaction] [Task-id: {}] Meet errors in inner space compaction.", storageGroupName, dataRegionId, + compactionId, throwable); } else { // clean the interrupt flag - LOGGER.warn("{}-{} [Compaction] Compaction interrupted", storageGroupName, dataRegionId); + LOGGER.warn( + "{}-{} [Compaction] [Task-id: {}] Compaction interrupted", + storageGroupName, + dataRegionId, + compactionId); Thread.interrupted(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java index 5b6597b4c6b1a..be5932ed343e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java @@ -48,6 +48,7 @@ public abstract class AbstractCompactionTask { protected int hashCode = -1; protected CompactionTaskSummary summary = new CompactionTaskSummary(); protected long serialId; + protected long compactionId; public AbstractCompactionTask( String storageGroupName, @@ -62,6 +63,7 @@ public AbstractCompactionTask( this.tsFileManager = tsFileManager; this.currentTaskNum = currentTaskNum; this.serialId = serialId; + this.compactionId = CompactionTaskManager.compactionId.getAndIncrement(); } public abstract void setSourceFilesToCompactionCandidate(); From 1d48be9008b45fae3f95c3b7217884e51b3781d4 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Wed, 30 Nov 2022 10:01:13 +0800 Subject: [PATCH 2/2] add check for resource after compaction --- .../db/engine/compaction/CompactionUtils.java | 40 +++++++++++++++++++ .../cross/CrossSpaceCompactionTask.java | 14 +++++++ .../inner/InnerSpaceCompactionTask.java | 13 ++++++ .../engine/storagegroup/TsFileResource.java | 2 +- 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java index 21129eed1b75a..f5126d1c5131c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java @@ -38,8 +38,10 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -338,4 +340,42 @@ public static void takeSnapshot( tsFileManager.readUnlock(); } } + + public static boolean validateTsFileResources( + TsFileManager manager, String storageGroupName, long timePartition, long compactionId) { + List resources = + manager.getSequenceListByTimePartition(timePartition).getArrayList(); + resources.sort( + (f1, f2) -> + Long.compareUnsigned( + Long.parseLong(f1.getTsFile().getName().split("-")[0]), + Long.parseLong(f2.getTsFile().getName().split("-")[0]))); + Map lastEndTimeMap = new HashMap<>(); + TsFileResource prevTsFileResource = null; + for (TsFileResource resource : resources) { + Set devices = resource.getDevices(); + for (String device : devices) { + long currentStartTime = resource.getStartTime(device); + long currentEndTime = resource.getEndTime(device); + long lastEndTime = lastEndTimeMap.computeIfAbsent(device, x -> Long.MIN_VALUE); + if (lastEndTime >= currentStartTime) { + logger.error( + "{} [TaskId: {}] Device {} is overlapped between {} and {}, end time in {} is {}, start time in {} is {}", + storageGroupName, + compactionId, + device, + prevTsFileResource, + resource, + prevTsFileResource, + lastEndTime, + resource, + currentStartTime); + return false; + } + lastEndTimeMap.put(device, currentEndTime); + } + prevTsFileResource = resource; + } + return true; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java index a8dcc65cc42db..50c65a288e12e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java @@ -173,6 +173,20 @@ protected void doCompaction() { timePartition, true); + if (!CompactionUtils.validateTsFileResources( + tsFileManager, storageGroupName, timePartition, compactionId)) { + LOGGER.error( + "{}-{} [Compaction] [Task-id: {}] Failed to pass the check of resources. " + + "Source sequence files is {}, unsequence files is {}, target files is {}. Terminate the system.", + storageGroupName, + dataRegionId, + compactionId, + selectedSequenceFiles, + selectedUnsequenceFiles, + targetTsfileResourceList); + System.exit(-1); + } + releaseReadAndLockWrite(selectedSequenceFiles); releaseReadAndLockWrite(selectedUnsequenceFiles); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java index 4f215a367c159..33e59587b6607 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java @@ -185,6 +185,19 @@ protected void doCompaction() { false); } + if (!CompactionUtils.validateTsFileResources( + tsFileManager, storageGroupName, timePartition, compactionId)) { + LOGGER.error( + "{}-{} [Compaction] [Task-id: {}] Failed to pass the check of resources. " + + "Source files is {}, target file is {}. Terminate the system.", + storageGroupName, + dataRegionId, + compactionId, + selectedTsFileResourceList, + targetTsFileList); + System.exit(-1); + } + LOGGER.info( "{}-{} [Compaction] [Task-id: {}] Compacted target files, try to get the write lock of source files", storageGroupName, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index a73ada0e331d8..19f0f86ce1d48 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -557,7 +557,7 @@ void moveTo(File targetDir) { @Override public String toString() { - return String.format("file is %s, status: %s", file.toString(), status); + return String.format("%s ", file.toString()); } @Override