From 37bb322a0ff18a9d4ab047f75e38bbf56050e400 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Tue, 3 May 2022 09:32:35 +0800 Subject: [PATCH 01/18] temp --- .../db/engine/snapshot/SnapshotTaker.java | 56 +++++++++++++++++++ .../exception/DirectoryNotLegalException.java | 28 ++++++++++ .../db/engine/storagegroup/DataRegion.java | 6 ++ .../org/apache/iotdb/rpc/TSStatusCode.java | 1 + 4 files changed, 91 insertions(+) create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java new file mode 100644 index 0000000000000..1aa9f253cf0d9 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.snapshot; + +import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +/** + * SnapshotTaker takes data snapshot for a DataRegion in one time. It does so by creating + * hard link for files or copying them. SnapshotTaker supports two different ways of + * snapshot: Full Snapshot and Incremental Snapshot. The former takes a snapshot for all + * files in an empty directory, and the latter takes a snapshot based on the snapshot that took before. + */ +public class SnapshotTaker { + private final static Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class); + private final DataRegion dataRegion; + + public SnapshotTaker(DataRegion dataRegion) { + this.dataRegion = dataRegion; + } + + public boolean takeFullSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDirPath) throws DirectoryNotLegalException { + File snapshotDir = new File(snapshotDirPath); + if (snapshotDir.exists() && snapshotDir.listFiles() != null) { + // the directory should be empty or not exists + throw new DirectoryNotLegalException(String.format("%s already exists and is not empty", snapshotDirPath)); + } + + return false; + } + + public boolean takeIncrementalSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDirPath) { + return false; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java new file mode 100644 index 0000000000000..bd4742d9e5e33 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/exception/DirectoryNotLegalException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.snapshot.exception; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.rpc.TSStatusCode; + +public class DirectoryNotLegalException extends IoTDBException { + public DirectoryNotLegalException(String message) { + super(message, TSStatusCode.SNAPSHOT_DIR_NOT_LEGAL.getStatusCode()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 1b2fbd1bfe27c..fa6fc4ee246ec 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -1398,6 +1398,12 @@ private TsFileProcessor getOrCreateTsFileProcessorIntern( return res; } + public List getAllWorkingTsFileProcessors() { + List processors = new LinkedList<>(workSequenceTsFileProcessors.values()); + processors.addAll(workUnsequenceTsFileProcessors.values()); + return processors; + } + private TsFileProcessor newTsFileProcessor(boolean sequence, long timePartitionId) throws IOException, DiskSpaceInsufficientException { diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 2dfb511398904..902c5d7c7ed41 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -83,6 +83,7 @@ public enum TSStatusCode { WRITE_PROCESS_ERROR(412), WRITE_PROCESS_REJECT(413), QUERY_ID_NOT_EXIST(414), + SNAPSHOT_DIR_NOT_LEGAL(415), UNSUPPORTED_INDEX_FUNC_ERROR(421), UNSUPPORTED_INDEX_TYPE_ERROR(422), From fd56932bef387f9b9daa6cdda791e6b429b5784d Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Tue, 3 May 2022 15:13:20 +0800 Subject: [PATCH 02/18] temp --- .../db/engine/snapshot/SnapshotTaker.java | 65 +++++++++++++++++-- .../db/engine/storagegroup/DataRegion.java | 10 ++- .../storagegroup/TsFileNameGenerator.java | 2 +- 3 files changed, 63 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index 1aa9f253cf0d9..d4893d5bc3995 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.engine.snapshot; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; import org.apache.iotdb.db.engine.storagegroup.DataRegion; @@ -25,26 +27,53 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.util.LinkedList; +import java.util.List; /** - * SnapshotTaker takes data snapshot for a DataRegion in one time. It does so by creating - * hard link for files or copying them. SnapshotTaker supports two different ways of - * snapshot: Full Snapshot and Incremental Snapshot. The former takes a snapshot for all - * files in an empty directory, and the latter takes a snapshot based on the snapshot that took before. + * SnapshotTaker takes data snapshot for a DataRegion in one time. It does so by creating hard link + * for files or copying them. SnapshotTaker supports two different ways of snapshot: Full Snapshot + * and Incremental Snapshot. The former takes a snapshot for all files in an empty directory, and + * the latter takes a snapshot based on the snapshot that took before. */ public class SnapshotTaker { - private final static Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class); private final DataRegion dataRegion; public SnapshotTaker(DataRegion dataRegion) { this.dataRegion = dataRegion; } - public boolean takeFullSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDirPath) throws DirectoryNotLegalException { + public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) + throws DirectoryNotLegalException { File snapshotDir = new File(snapshotDirPath); if (snapshotDir.exists() && snapshotDir.listFiles() != null) { // the directory should be empty or not exists - throw new DirectoryNotLegalException(String.format("%s already exists and is not empty", snapshotDirPath)); + throw new DirectoryNotLegalException( + String.format("%s already exists and is not empty", snapshotDirPath)); + } + + if (flushBeforeSnapshot) { + dataRegion.syncCloseAllWorkingTsFileProcessors(); + } + + List timePartitions = dataRegion.getTimePartitions(); + for (Long timePartition : timePartitions) { + List seqDataDirs = getAllDataDirOfOnePartition(true, timePartition); + File seqTargetDir = + new File( + snapshotDirPath + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + dataRegion.getLogicalStorageGroupName() + + File.separator + + dataRegion.getDataRegionId() + + File.separator + + timePartition); + if (!seqTargetDir.mkdirs()) { + LOGGER.error("Failed to create target directory {}", seqTargetDir); + } } return false; @@ -53,4 +82,26 @@ public boolean takeFullSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDi public boolean takeIncrementalSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDirPath) { return false; } + + private List getAllDataDirOfOnePartition(boolean sequence, long timePartition) { + String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + List resultDirs = new LinkedList<>(); + + for (String dataDir : dataDirs) { + resultDirs.add( + dataDir + + File.separator + + (sequence + ? IoTDBConstant.SEQUENCE_FLODER_NAME + : IoTDBConstant.UNSEQUENCE_FLODER_NAME) + + File.separator + + dataRegion.getLogicalStorageGroupName() + + File.separator + + dataRegion.getDataRegionId() + + File.separator + + timePartition + + File.separator); + } + return resultDirs; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index fa6fc4ee246ec..dd9ca5af4076e 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -1398,12 +1398,6 @@ private TsFileProcessor getOrCreateTsFileProcessorIntern( return res; } - public List getAllWorkingTsFileProcessors() { - List processors = new LinkedList<>(workSequenceTsFileProcessors.values()); - processors.addAll(workUnsequenceTsFileProcessors.values()); - return processors; - } - private TsFileProcessor newTsFileProcessor(boolean sequence, long timePartitionId) throws IOException, DiskSpaceInsufficientException { @@ -3478,6 +3472,10 @@ void call(TsFileResource oldTsFileResource, List newTsFileResour throws WriteProcessException; } + public List getTimePartitions() { + return new ArrayList<>(timePartitionIdVersionControllerMap.keySet()); + } + public String getInsertWriteLockHolder() { return insertWriteLockHolder; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java index b185838b320ae..946d8274610a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java @@ -71,7 +71,7 @@ public static String generateNewTsFilePathWithMkdir( time, version, innerSpaceCompactionCount, crossSpaceCompactionCount); } - private static String generateTsFileDir( + public static String generateTsFileDir( boolean sequence, String logicalStorageGroup, String virtualStorageGroup, From f115725e6ec1f3c7b02a2537da474437316321e3 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Thu, 5 May 2022 16:11:55 +0800 Subject: [PATCH 03/18] temp --- .../db/engine/snapshot/SnapshotTaker.java | 63 ++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index d4893d5bc3995..52f3d9c72ebea 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -20,13 +20,17 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.util.LinkedList; import java.util.List; @@ -73,10 +77,42 @@ public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnaps + timePartition); if (!seqTargetDir.mkdirs()) { LOGGER.error("Failed to create target directory {}", seqTargetDir); + return false; + } + + try { + createFileSnapshot(seqDataDirs, seqTargetDir); + } catch (IOException e) { + LOGGER.error("Fail to create snapshot", e); + return false; + } + + List unseqDataDirs = getAllDataDirOfOnePartition(false, timePartition); + File unseqTargetDir = + new File( + snapshotDirPath + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + dataRegion.getLogicalStorageGroupName() + + File.separator + + dataRegion.getDataRegionId() + + File.separator + + timePartition); + if (!unseqTargetDir.mkdirs()) { + LOGGER.error("Failed to create target directory {}", seqTargetDir); + return false; + } + + try { + createFileSnapshot(unseqDataDirs, unseqTargetDir); + } catch (IOException e) { + LOGGER.error("Fail to create snapshot", e); + return false; } } - return false; + return true; } public boolean takeIncrementalSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDirPath) { @@ -104,4 +140,29 @@ private List getAllDataDirOfOnePartition(boolean sequence, long timePart } return resultDirs; } + + private void createFileSnapshot(List sourceDirPaths, File targetDir) throws IOException { + for (String sourceDirPath : sourceDirPaths) { + File sourceDir = new File(sourceDirPath); + if (!sourceDir.exists()) { + continue; + } + // Collect TsFile, TsFileResource, Mods, CompactionMods + File[] files = + sourceDir.listFiles( + (dir, name) -> + name.endsWith(".tsfile") + || name.endsWith(TsFileResource.RESOURCE_SUFFIX) + || name.endsWith(ModificationFile.FILE_SUFFIX) + || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX)); + if (files == null || files.length == 0) { + continue; + } + + for (File file : files) { + File linkFile = new File(targetDir, file.getName()); + Files.createLink(linkFile.toPath(), file.toPath()); + } + } + } } From db37febb1aec61ee11b3f34d5f6e293b503a8a57 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Thu, 5 May 2022 16:15:14 +0800 Subject: [PATCH 04/18] test snapshot --- .../iotdb/db/engine/storagegroup/DataRegion.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 50f60468e7570..b1da1f3fe16cc 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine; import org.apache.iotdb.db.engine.trigger.executor.TriggerEvent; import org.apache.iotdb.db.engine.upgrade.UpgradeCheckStatus; @@ -2384,11 +2385,16 @@ private void loadUpgradedResources(List resources, boolean isseq /** merge file under this storage group processor */ public void compact() { - writeLock("merge"); + // writeLock("merge"); + // try { + // executeCompaction(); + // } finally { + // writeUnlock(); + // } try { - executeCompaction(); - } finally { - writeUnlock(); + new SnapshotTaker(this).takeFullSnapshot("../snapshot", true); + } catch (Exception e) { + logger.error("exception occurs", e); } } From 2307d48881b3a34701b59d3b7f3c65cdccefc19b Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Wed, 11 May 2022 12:30:40 +0800 Subject: [PATCH 05/18] recover success --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../apache/iotdb/db/engine/StorageEngine.java | 4 + .../db/engine/snapshot/SnapshotLoader.java | 75 ++++++++++++ .../db/engine/storagegroup/DataRegion.java | 107 ++++++++++++++++-- 4 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 2baa72707abba..d186d20960a6e 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -352,7 +352,7 @@ public class IoTDBConfig { private boolean enableUnseqSpaceCompaction = true; /** Compact the unsequence files into the overlapped sequence files */ - private boolean enableCrossSpaceCompaction = true; + private boolean enableCrossSpaceCompaction = false; /** * The strategy of inner space compaction task. There are just one inner space compaction strategy diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 1b31f4ea25023..aef264c59891b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -1055,6 +1055,10 @@ protected void getSeriesSchemas(InsertPlan insertPlan, DataRegion processor) } } + public String getSystemDir() { + return systemDir; + } + static class InstanceHolder { private static final StorageEngine INSTANCE = new StorageEngine(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java new file mode 100644 index 0000000000000..a18bb7a8c10ad --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.snapshot; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class SnapshotLoader { + private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class); + private String storageGroupName; + private String dataDirPath; + private String dataRegionId; + + public SnapshotLoader(String dataDirPath, String storageGroupName, String dataRegionId) { + this.dataDirPath = dataDirPath; + this.storageGroupName = storageGroupName; + this.dataRegionId = dataRegionId; + } + + public DataRegion loadSnapshot() { + File seqDataDir = + new File( + dataDirPath + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + File unseqDataDir = + new File( + dataDirPath + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + if (!seqDataDir.exists() && !unseqDataDir.exists()) { + return null; + } + try { + return DataRegion.recoverFromSnapshot( + storageGroupName, + dataRegionId, + dataDirPath, + StorageEngine.getInstance().getSystemDir() + File.separator + storageGroupName); + } catch (Exception e) { + LOGGER.error("Exception occurs while load snapshot from {}", seqDataDir, e); + return null; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index b1da1f3fe16cc..a95f2144a2981 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -41,7 +41,7 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; +import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine; import org.apache.iotdb.db.engine.trigger.executor.TriggerEvent; import org.apache.iotdb.db.engine.upgrade.UpgradeCheckStatus; @@ -263,6 +263,29 @@ public class DataRegion { /** used to collect TsFiles in this virtual storage group */ private TsFileSyncManager tsFileSyncManager = TsFileSyncManager.getInstance(); + private DataRegion(String systemDir, String dataRegionId, String logicalStorageGroupName) { + storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); + this.dataRegionId = dataRegionId; + this.logicalStorageGroupName = logicalStorageGroupName; + this.tsFileManager = + new TsFileManager(logicalStorageGroupName, dataRegionId, storageGroupSysDir.getPath()); + if (storageGroupSysDir.mkdirs()) { + logger.info( + "Storage Group system Directory {} doesn't exist, create it", + storageGroupSysDir.getPath()); + } else if (!storageGroupSysDir.exists()) { + logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath()); + } + + // if use id table, we use id table flush time manager + if (config.isEnableIDTable()) { + idTable = IDTableManager.getInstance().getIDTableDirectly(logicalStorageGroupName); + lastFlushTimeManager = new IDTableFlushTimeManager(idTable); + } else { + lastFlushTimeManager = new LastFlushTimeManager(); + } + } + /** * constrcut a storage group processor * @@ -2391,11 +2414,12 @@ public void compact() { // } finally { // writeUnlock(); // } - try { - new SnapshotTaker(this).takeFullSnapshot("../snapshot", true); - } catch (Exception e) { - logger.error("exception occurs", e); - } + // try { + // new SnapshotTaker(this).takeFullSnapshot("../snapshot", true); + // } catch (Exception e) { + // logger.error("exception occurs", e); + // } + new SnapshotLoader("../snapshot", "root.test", "0").loadSnapshot(); } private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource) @@ -3479,7 +3503,7 @@ void call(TsFileResource oldTsFileResource, List newTsFileResour } public List getTimePartitions() { - return new ArrayList<>(timePartitionIdVersionControllerMap.keySet()); + return new ArrayList<>(partitionMaxFileVersions.keySet()); } public String getInsertWriteLockHolder() { @@ -3503,4 +3527,73 @@ public ILastFlushTimeManager getLastFlushTimeManager() { public TsFileManager getTsFileManager() { return tsFileManager; } + + public static DataRegion recoverFromSnapshot( + String logicalStorageGroupName, String dataRegionId, String dataDir, String systemDir) + throws Exception { + DataRegion dataRegion = new DataRegion(systemDir, dataRegionId, logicalStorageGroupName); + dataRegion.recoverCompaction(); + Pair, List> seqTsFilePairs = + dataRegion.getAllFiles( + Collections.singletonList( + dataDir + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME)); + Pair, List> unseqTsFilesPair = + dataRegion.getAllFiles( + Collections.singletonList( + dataDir + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME)); + List tmpSeqTsFiles = seqTsFilePairs.left; + List tmpUnseqTsFiles = unseqTsFilesPair.left; + DataRegionRecoveryContext DataRegionRecoveryContext = + dataRegion.new DataRegionRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); + Map> partitionTmpSeqTsFiles = + dataRegion.splitResourcesByPartition(tmpSeqTsFiles); + Map> partitionTmpUnseqTsFiles = + dataRegion.splitResourcesByPartition(tmpUnseqTsFiles); + for (List value : partitionTmpSeqTsFiles.values()) { + for (TsFileResource tsFileResource : value) { + dataRegion.recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, true); + } + } + for (List value : partitionTmpUnseqTsFiles.values()) { + for (TsFileResource tsFileResource : value) { + dataRegion.recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, false); + } + } + for (TsFileResource resource : dataRegion.tsFileManager.getTsFileList(true)) { + long partitionNum = resource.getTimePartition(); + dataRegion.updatePartitionFileVersion(partitionNum, resource.getVersion()); + } + for (TsFileResource resource : dataRegion.tsFileManager.getTsFileList(false)) { + long partitionNum = resource.getTimePartition(); + dataRegion.updatePartitionFileVersion(partitionNum, resource.getVersion()); + } + dataRegion.updateLatestFlushedTime(); + List seqTsFileResources = dataRegion.tsFileManager.getTsFileList(true); + for (TsFileResource resource : seqTsFileResources) { + long timePartitionId = resource.getTimePartition(); + Map endTimeMap = new HashMap<>(); + for (String deviceId : resource.getDevices()) { + long endTime = resource.getEndTime(deviceId); + endTimeMap.put(deviceId.intern(), endTime); + } + dataRegion.lastFlushTimeManager.setMultiDeviceLastTime(timePartitionId, endTimeMap); + dataRegion.lastFlushTimeManager.setMultiDeviceFlushedTime(timePartitionId, endTimeMap); + dataRegion.lastFlushTimeManager.setMultiDeviceGlobalFlushedTime(endTimeMap); + } + + // recover and start timed compaction thread + dataRegion.initCompaction(); + if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) { + MetricsService.getInstance() + .getMetricManager() + .getOrCreateAutoGauge( + Metric.MEM.toString(), + MetricLevel.IMPORTANT, + dataRegion.storageGroupInfo, + StorageGroupInfo::getMemCost, + Tag.NAME.toString(), + "storageGroup_" + dataRegion.getLogicalStorageGroupName()); + } + return null; + } } From 7443cb248c18e5d736cf38ae9f2965023e5735b2 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Wed, 11 May 2022 17:52:49 +0800 Subject: [PATCH 06/18] adapt DataRegionStateMachine --- .../statemachine/DataRegionStateMachine.java | 43 ++++- .../apache/iotdb/db/engine/StorageEngine.java | 13 ++ .../db/engine/snapshot/SnapshotLoader.java | 157 ++++++++++++++++++ .../db/engine/snapshot/SnapshotTaker.java | 22 ++- .../db/engine/storagegroup/DataRegion.java | 14 +- .../dataregion/StorageGroupManager.java | 4 + 6 files changed, 246 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index a5aebf39e18cb..1c9760d8b4fed 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -20,8 +20,12 @@ package org.apache.iotdb.db.consensus.statemachine; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; +import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; @@ -47,7 +51,7 @@ public class DataRegionStateMachine extends BaseStateMachine { private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER = FragmentInstanceManager.getInstance(); - private final DataRegion region; + private DataRegion region; public DataRegionStateMachine(DataRegion region) { this.region = region; @@ -59,13 +63,46 @@ public void start() {} @Override public void stop() {} + // snapshotDir -> ../snapshot + // ../snapshot/seq/root.test/0/ xxx / kkk.tsfile @Override public boolean takeSnapshot(File snapshotDir) { - return false; + try { + return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + } catch (Exception e) { + logger.error( + "Exception occurs when taking snapshot for {}-{} in {}", + region.getLogicalStorageGroupName(), + region.getDataRegionId(), + snapshotDir, + e); + return false; + } } + // ../snapshot @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public void loadSnapshot(File latestSnapshotRootDir) { + // clear data + // -> data + // load + // replace + this.region = + new SnapshotLoader( + latestSnapshotRootDir.getAbsolutePath(), + region.getLogicalStorageGroupName(), + region.getDataRegionId()) + .loadSnapshotForStateMachine(); + try { + StorageEngine.getInstance() + .setDataRegion( + new PartialPath(region.getLogicalStorageGroupName()), + region.getDataRegionId(), + region); + } catch (Exception e) { + logger.error("Exception occurs when replacing data region in storage engine.", e); + } + } @Override protected TSStatus write(FragmentInstance fragmentInstance) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index ef1b160232c3e..c1a6e5973315b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -1059,6 +1059,19 @@ public String getSystemDir() { return systemDir; } + public TsFileFlushPolicy getFileFlushPolicy() { + return fileFlushPolicy; + } + + public void setDataRegion( + PartialPath logicalStorageGroup, String dataRegionId, DataRegion dataRegion) + throws MetadataException { + StorageGroupManager manager = + getStorageGroupManager( + IoTDB.schemaProcessor.getStorageGroupNodeByPath(logicalStorageGroup)); + manager.setDataRegion(Integer.parseInt(dataRegionId), dataRegion); + } + static class InstanceHolder { private static final StorageEngine INSTANCE = new StorageEngine(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index a18bb7a8c10ad..ea5c5136f3454 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -19,13 +19,20 @@ package org.apache.iotdb.db.engine.snapshot; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; public class SnapshotLoader { private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class); @@ -72,4 +79,154 @@ public DataRegion loadSnapshot() { return null; } } + + /** + * 1. Clear origin data 2. Move snapshot data to data dir 3. Load data region + * + * @return + */ + public DataRegion loadSnapshotForStateMachine() { + String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + + // delete + List timePartitions = new ArrayList<>(); + for (String dataDirPath : dataDirPaths) { + File seqDataDirForThisRegion = + new File( + dataDirPath + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + if (seqDataDirForThisRegion.exists()) { + File[] files = seqDataDirForThisRegion.listFiles(); + if (files != null) { + timePartitions.addAll(Arrays.asList(files)); + } + } + + File unseqDataDirForThisRegion = + new File( + dataDirPath + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + + if (unseqDataDirForThisRegion.exists()) { + File[] files = unseqDataDirForThisRegion.listFiles(); + if (files != null) { + timePartitions.addAll(Arrays.asList(files)); + } + } + } + + try { + for (File timePartition : timePartitions) { + FileUtils.forceDelete(timePartition); + } + } catch (IOException e) { + LOGGER.error( + "Exception occurs when deleting time partition directory for {}-{}", + storageGroupName, + dataRegionId, + e); + } + + // move the snapshot data to data dir + String targetDataDir = dataDirPaths[0]; + File seqBaseDir = + new File( + targetDataDir + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + File unseqBaseDir = + new File( + targetDataDir + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + File seqSourceDataDir = + new File( + dataDirPath + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + File unseqSourceDataDir = + new File( + dataDirPath + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId); + File[] seqTimePartitionDirs = seqSourceDataDir.listFiles(); + if (seqTimePartitionDirs != null) { + try { + createLinksFromSnapshotDirToDataDir(seqTimePartitionDirs, seqBaseDir); + } catch (IOException e) { + LOGGER.error( + "Exception occurs when creating links from snapshot directory to data directory", e); + return null; + } + } + + File[] unseqTimePartitionDirs = unseqSourceDataDir.listFiles(); + if (unseqTimePartitionDirs != null) { + try { + createLinksFromSnapshotDirToDataDir(unseqTimePartitionDirs, unseqBaseDir); + } catch (IOException e) { + LOGGER.error( + "Exception occurs when creating links from snapshot directory to data directory", e); + return null; + } + } + + this.dataDirPath = targetDataDir; + return loadSnapshot(); + } + + private void createLinksFromSnapshotDirToDataDir(File[] timePartitionDirs, File baseDir) + throws IOException { + for (File seqTimePartitionDir : timePartitionDirs) { + String[] splittedPathInfo = + seqTimePartitionDir + .getAbsolutePath() + .split(File.separator.equals("\\") ? "\\\\" : File.separator); + String timePartition = splittedPathInfo[splittedPathInfo.length - 1]; + File targetDirForThisTimePartition = new File(baseDir, timePartition); + if (!targetDirForThisTimePartition.mkdirs()) { + throw new IOException( + String.format("Failed to make directory %s", targetDirForThisTimePartition)); + } + File[] sourceTsFiles = seqTimePartitionDir.listFiles(); + if (sourceTsFiles != null) { + for (File sourceFile : sourceTsFiles) { + File targetFile = new File(targetDirForThisTimePartition, sourceFile.getName()); + try { + Files.createLink(targetFile.toPath(), sourceFile.toPath()); + } catch (IOException e) { + throw new IOException( + String.format("Failed to create hard link from %s to %s", sourceFile, targetFile), + e); + } + } + } + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index 52f3d9c72ebea..ae9d6b1e8f67e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -50,8 +50,26 @@ public SnapshotTaker(DataRegion dataRegion) { public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) throws DirectoryNotLegalException { - File snapshotDir = new File(snapshotDirPath); - if (snapshotDir.exists() && snapshotDir.listFiles() != null) { + File seqSnapshotDir = + new File( + snapshotDirPath + + File.separator + + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + dataRegion.getLogicalStorageGroupName() + + File.separator + + dataRegion.getDataRegionId()); + File unseqSnapshotDir = + new File( + snapshotDirPath + + File.separator + + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + dataRegion.getLogicalStorageGroupName() + + File.separator + + dataRegion.getDataRegionId()); + if ((seqSnapshotDir.exists() && seqSnapshotDir.listFiles() != null) + || (unseqSnapshotDir.exists() && unseqSnapshotDir.listFiles() != null)) { // the directory should be empty or not exists throw new DirectoryNotLegalException( String.format("%s already exists and is not empty", snapshotDirPath)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 0fae74376cb90..2b9186c8ca6e2 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -263,10 +263,15 @@ public class DataRegion { /** used to collect TsFiles in this virtual storage group */ private TsFileSyncManager tsFileSyncManager = TsFileSyncManager.getInstance(); - private DataRegion(String systemDir, String dataRegionId, String logicalStorageGroupName) { + private DataRegion( + String systemDir, + String dataRegionId, + String logicalStorageGroupName, + TsFileFlushPolicy tsFileFlushPolicy) { storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); this.dataRegionId = dataRegionId; this.logicalStorageGroupName = logicalStorageGroupName; + this.fileFlushPolicy = tsFileFlushPolicy; this.tsFileManager = new TsFileManager(logicalStorageGroupName, dataRegionId, storageGroupSysDir.getPath()); if (storageGroupSysDir.mkdirs()) { @@ -3545,7 +3550,12 @@ public TsFileManager getTsFileManager() { public static DataRegion recoverFromSnapshot( String logicalStorageGroupName, String dataRegionId, String dataDir, String systemDir) throws Exception { - DataRegion dataRegion = new DataRegion(systemDir, dataRegionId, logicalStorageGroupName); + DataRegion dataRegion = + new DataRegion( + systemDir, + dataRegionId, + logicalStorageGroupName, + StorageEngine.getInstance().getFileFlushPolicy()); dataRegion.recoverCompaction(); Pair, List> seqTsFilePairs = dataRegion.getAllFiles( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java index aa7f8c2c260ea..558a5345bfcbf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java @@ -496,4 +496,8 @@ public void abortCompaction() { public AtomicBoolean getIsSettling() { return isSettling; } + + public void setDataRegion(int dataRegionId, DataRegion dataRegion) { + this.dataRegion[dataRegionId] = dataRegion; + } } From 6fdbc79a03d067ecc0d727375813e456245245f6 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Fri, 13 May 2022 16:45:20 +0800 Subject: [PATCH 07/18] flat the file when taking snapshot --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../cross/CrossSpaceCompactionTask.java | 1 + .../db/engine/snapshot/SnapshotLoader.java | 106 +++++------------- .../db/engine/snapshot/SnapshotTaker.java | 86 +++++--------- .../db/engine/storagegroup/DataRegion.java | 2 +- 5 files changed, 64 insertions(+), 133 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 51a28d64d7193..b93929e35f12c 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -353,7 +353,7 @@ public class IoTDBConfig { private boolean enableUnseqSpaceCompaction = true; /** Compact the unsequence files into the overlapped sequence files */ - private boolean enableCrossSpaceCompaction = false; + private boolean enableCrossSpaceCompaction = true; /** * The strategy of inner space compaction task. There are just one inner space compaction strategy diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java index 155c9795fa805..b7373bf3266b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java @@ -124,6 +124,7 @@ protected void doCompaction() throws Exception { // indicates that the cross compaction is complete and the result can be reused during a // restart recovery compactionLogger.close(); + Thread.sleep(1000000); performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles); performer.setTargetFiles(targetTsfileResourceList); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index ea5c5136f3454..d73e35e2e3a48 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -47,26 +47,10 @@ public SnapshotLoader(String dataDirPath, String storageGroupName, String dataRe } public DataRegion loadSnapshot() { - File seqDataDir = - new File( - dataDirPath - + File.separator - + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator - + storageGroupName - + File.separator - + dataRegionId); - File unseqDataDir = - new File( - dataDirPath - + File.separator - + IoTDBConstant.UNSEQUENCE_FLODER_NAME - + File.separator - + storageGroupName - + File.separator - + dataRegionId); - if (!seqDataDir.exists() && !unseqDataDir.exists()) { - return null; + File dataDir = new File(dataDirPath); + if (!dataDir.exists()) { + throw new RuntimeException( + String.format("Failed to load snapshot from %s because it does not exist", dataDir)); } try { return DataRegion.recoverFromSnapshot( @@ -75,7 +59,7 @@ public DataRegion loadSnapshot() { dataDirPath, StorageEngine.getInstance().getSystemDir() + File.separator + storageGroupName); } catch (Exception e) { - LOGGER.error("Exception occurs while load snapshot from {}", seqDataDir, e); + LOGGER.error("Exception occurs while load snapshot from {}", dataDir, e); return null; } } @@ -157,39 +141,10 @@ public DataRegion loadSnapshotForStateMachine() { + storageGroupName + File.separator + dataRegionId); - File seqSourceDataDir = - new File( - dataDirPath - + File.separator - + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator - + storageGroupName - + File.separator - + dataRegionId); - File unseqSourceDataDir = - new File( - dataDirPath - + File.separator - + IoTDBConstant.UNSEQUENCE_FLODER_NAME - + File.separator - + storageGroupName - + File.separator - + dataRegionId); - File[] seqTimePartitionDirs = seqSourceDataDir.listFiles(); - if (seqTimePartitionDirs != null) { - try { - createLinksFromSnapshotDirToDataDir(seqTimePartitionDirs, seqBaseDir); - } catch (IOException e) { - LOGGER.error( - "Exception occurs when creating links from snapshot directory to data directory", e); - return null; - } - } - - File[] unseqTimePartitionDirs = unseqSourceDataDir.listFiles(); - if (unseqTimePartitionDirs != null) { + File sourceDataDir = new File(dataDirPath); + if (sourceDataDir.exists()) { try { - createLinksFromSnapshotDirToDataDir(unseqTimePartitionDirs, unseqBaseDir); + createLinksFromSnapshotDirToDataDir(sourceDataDir, seqBaseDir, unseqBaseDir); } catch (IOException e) { LOGGER.error( "Exception occurs when creating links from snapshot directory to data directory", e); @@ -201,31 +156,32 @@ public DataRegion loadSnapshotForStateMachine() { return loadSnapshot(); } - private void createLinksFromSnapshotDirToDataDir(File[] timePartitionDirs, File baseDir) - throws IOException { - for (File seqTimePartitionDir : timePartitionDirs) { - String[] splittedPathInfo = - seqTimePartitionDir - .getAbsolutePath() - .split(File.separator.equals("\\") ? "\\\\" : File.separator); - String timePartition = splittedPathInfo[splittedPathInfo.length - 1]; - File targetDirForThisTimePartition = new File(baseDir, timePartition); - if (!targetDirForThisTimePartition.mkdirs()) { + private void createLinksFromSnapshotDirToDataDir( + File sourceDir, File seqBaseDir, File unseqBaseDir) throws IOException { + File[] files = sourceDir.listFiles(); + if (files == null) { + return; + } + for (File sourceFile : files) { + String[] fileInfo = sourceFile.getName().split(SnapshotTaker.SNAPSHOT_FILE_INFO_SEP_STR); + if (fileInfo.length != 5) { + continue; + } + boolean seq = fileInfo[0].equals("seq"); + String timePartition = fileInfo[3]; + String fileName = fileInfo[4]; + File targetDirForThisTimePartition = new File(seq ? seqBaseDir : unseqBaseDir, timePartition); + if (!targetDirForThisTimePartition.exists() && !targetDirForThisTimePartition.mkdirs()) { throw new IOException( String.format("Failed to make directory %s", targetDirForThisTimePartition)); } - File[] sourceTsFiles = seqTimePartitionDir.listFiles(); - if (sourceTsFiles != null) { - for (File sourceFile : sourceTsFiles) { - File targetFile = new File(targetDirForThisTimePartition, sourceFile.getName()); - try { - Files.createLink(targetFile.toPath(), sourceFile.toPath()); - } catch (IOException e) { - throw new IOException( - String.format("Failed to create hard link from %s to %s", sourceFile, targetFile), - e); - } - } + + File targetFile = new File(targetDirForThisTimePartition, fileName); + try { + Files.createLink(targetFile.toPath(), sourceFile.toPath()); + } catch (IOException e) { + throw new IOException( + String.format("Failed to create hard link from %s to %s", sourceFile, targetFile), e); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index ae9d6b1e8f67e..13d1f96647060 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -20,6 +20,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.log.CompactionLogger; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; import org.apache.iotdb.db.engine.storagegroup.DataRegion; @@ -43,38 +44,25 @@ public class SnapshotTaker { private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotTaker.class); private final DataRegion dataRegion; + public static String SNAPSHOT_FILE_INFO_SEP_STR = "_"; public SnapshotTaker(DataRegion dataRegion) { this.dataRegion = dataRegion; } public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) - throws DirectoryNotLegalException { - File seqSnapshotDir = - new File( - snapshotDirPath - + File.separator - + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator - + dataRegion.getLogicalStorageGroupName() - + File.separator - + dataRegion.getDataRegionId()); - File unseqSnapshotDir = - new File( - snapshotDirPath - + File.separator - + IoTDBConstant.UNSEQUENCE_FLODER_NAME - + File.separator - + dataRegion.getLogicalStorageGroupName() - + File.separator - + dataRegion.getDataRegionId()); - if ((seqSnapshotDir.exists() && seqSnapshotDir.listFiles() != null) - || (unseqSnapshotDir.exists() && unseqSnapshotDir.listFiles() != null)) { + throws DirectoryNotLegalException, IOException { + File snapshotDir = new File(snapshotDirPath); + if (snapshotDir.exists() && snapshotDir.listFiles() != null) { // the directory should be empty or not exists throw new DirectoryNotLegalException( String.format("%s already exists and is not empty", snapshotDirPath)); } + if (!snapshotDir.exists() && !snapshotDir.mkdirs()) { + throw new IOException(String.format("Failed to create directory %s", snapshotDir)); + } + if (flushBeforeSnapshot) { dataRegion.syncCloseAllWorkingTsFileProcessors(); } @@ -82,48 +70,18 @@ public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnaps List timePartitions = dataRegion.getTimePartitions(); for (Long timePartition : timePartitions) { List seqDataDirs = getAllDataDirOfOnePartition(true, timePartition); - File seqTargetDir = - new File( - snapshotDirPath - + File.separator - + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator - + dataRegion.getLogicalStorageGroupName() - + File.separator - + dataRegion.getDataRegionId() - + File.separator - + timePartition); - if (!seqTargetDir.mkdirs()) { - LOGGER.error("Failed to create target directory {}", seqTargetDir); - return false; - } try { - createFileSnapshot(seqDataDirs, seqTargetDir); + createFileSnapshot(seqDataDirs, snapshotDir, true, timePartition); } catch (IOException e) { LOGGER.error("Fail to create snapshot", e); return false; } List unseqDataDirs = getAllDataDirOfOnePartition(false, timePartition); - File unseqTargetDir = - new File( - snapshotDirPath - + File.separator - + IoTDBConstant.UNSEQUENCE_FLODER_NAME - + File.separator - + dataRegion.getLogicalStorageGroupName() - + File.separator - + dataRegion.getDataRegionId() - + File.separator - + timePartition); - if (!unseqTargetDir.mkdirs()) { - LOGGER.error("Failed to create target directory {}", seqTargetDir); - return false; - } try { - createFileSnapshot(unseqDataDirs, unseqTargetDir); + createFileSnapshot(unseqDataDirs, snapshotDir, false, timePartition); } catch (IOException e) { LOGGER.error("Fail to create snapshot", e); return false; @@ -159,7 +117,9 @@ private List getAllDataDirOfOnePartition(boolean sequence, long timePart return resultDirs; } - private void createFileSnapshot(List sourceDirPaths, File targetDir) throws IOException { + private void createFileSnapshot( + List sourceDirPaths, File targetDir, boolean sequence, long timePartition) + throws IOException { for (String sourceDirPath : sourceDirPaths) { File sourceDir = new File(sourceDirPath); if (!sourceDir.exists()) { @@ -172,13 +132,27 @@ private void createFileSnapshot(List sourceDirPaths, File targetDir) thr name.endsWith(".tsfile") || name.endsWith(TsFileResource.RESOURCE_SUFFIX) || name.endsWith(ModificationFile.FILE_SUFFIX) - || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX)); + || name.endsWith(ModificationFile.COMPACTION_FILE_SUFFIX) + || name.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX) + || name.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX) + || name.endsWith(IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX) + || name.endsWith(IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX)); if (files == null || files.length == 0) { continue; } for (File file : files) { - File linkFile = new File(targetDir, file.getName()); + String newFileName = + (sequence ? "seq" : "unseq") + + SNAPSHOT_FILE_INFO_SEP_STR + + dataRegion.getLogicalStorageGroupName() + + SNAPSHOT_FILE_INFO_SEP_STR + + dataRegion.getDataRegionId() + + SNAPSHOT_FILE_INFO_SEP_STR + + timePartition + + SNAPSHOT_FILE_INFO_SEP_STR + + file.getName(); + File linkFile = new File(targetDir, newFileName); Files.createLink(linkFile.toPath(), file.toPath()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 2b9186c8ca6e2..cca102121f497 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -2438,7 +2438,7 @@ public void compact() { // } catch (Exception e) { // logger.error("exception occurs", e); // } - new SnapshotLoader("../snapshot", "root.test", "0").loadSnapshot(); + new SnapshotLoader("../snapshot", "root.test", "0").loadSnapshotForStateMachine(); } private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource) From 84640a1aa04ebaf7622b985cf77dfe52fa848b96 Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Mon, 16 May 2022 16:14:32 +0800 Subject: [PATCH 08/18] test config --- .../iotdb/confignode/conf/ConfigNodeConf.java | 4 +- .../iotdb/consensus/ratis/RatisConsensus.java | 3 ++ .../org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../db/engine/storagegroup/DataRegion.java | 2 +- testcontainer/pom.xml | 52 +++++++++---------- 5 files changed, 33 insertions(+), 30 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java index a0257461cf613..ce2613b3aabe7 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java @@ -115,10 +115,10 @@ public class ConfigNodeConf { private long timePartitionInterval = 604800; /** Default number of SchemaRegion replicas */ - private int schemaReplicationFactor = 3; + private int schemaReplicationFactor = 1; /** Default number of DataRegion replicas */ - private int dataReplicationFactor = 3; + private int dataReplicationFactor = 1; /** The initial number of SchemaRegions of each StorageGroup */ private int initialSchemaRegionCount = 1; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 8d644bbc81fa6..29cf6913e39ad 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -114,6 +114,9 @@ public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Re RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir)); RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true); + RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 5); + RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true); + RaftServerConfigKeys.Log.setPurgeGap(properties, 1); // set the port which server listen to in RaftProperty object final int port = NetUtils.createSocketAddr(address).getPort(); diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 51a28d64d7193..9528cdf925618 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -156,7 +156,7 @@ public class IoTDBConfig { // region Write Ahead Log Configuration /** Write mode of wal */ - private WALMode walMode = WALMode.ASYNC; + private WALMode walMode = WALMode.DISABLE; /** WAL directories */ private String[] walDirs = {DEFAULT_BASE_DIR + File.separator + IoTDBConstant.WAL_FOLDER_NAME}; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 2b9186c8ca6e2..85050fa338e29 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -3618,6 +3618,6 @@ public static DataRegion recoverFromSnapshot( Tag.NAME.toString(), "storageGroup_" + dataRegion.getLogicalStorageGroupName()); } - return null; + return dataRegion; } } diff --git a/testcontainer/pom.xml b/testcontainer/pom.xml index e3067d56abdf1..debb00e7dbbb9 100644 --- a/testcontainer/pom.xml +++ b/testcontainer/pom.xml @@ -33,8 +33,8 @@ docker build -t apache/iotdb:maven-development -f ${basedir}/../docker/src/main/Dockerfile-single ${basedir}/../. image rm apache/iotdb:maven-development - - + + build -t apache/iotdb:sync-maven-development -f ${basedir}/../docker/src/main/Dockerfile-single-tc ${basedir}/../. image rm apache/iotdb:sync-maven-development @@ -90,18 +90,18 @@ ${docker.build.single.argument} - - - - - - - - - - - - + + + + + + + + + + + + build-sync-docker-image pre-integration-test @@ -126,18 +126,18 @@ ${docker.clean.single.argument} - - - - - - - - - - - - + + + + + + + + + + + + clean-sync-docker-image post-integration-test From 4759431c7af77d9b541d760e530685466ef7e24f Mon Sep 17 00:00:00 2001 From: Liu Xuxin Date: Mon, 16 May 2022 18:04:26 +0800 Subject: [PATCH 09/18] add IT --- .../iotdb/db/integration/IoTDBSnapshotIT.java | 194 ++++++++++++++++++ .../db/engine/snapshot/SnapshotLoader.java | 2 +- .../db/engine/snapshot/SnapshotTaker.java | 4 +- 3 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java new file mode 100644 index 0000000000000..e68e1055f2741 --- /dev/null +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.integration; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; +import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; +import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.integration.env.EnvFactory; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +public class IoTDBSnapshotIT { + final String SG_NAME = "root.snapshotTest"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initBeforeTest(); + IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanAfterTest(); + IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); + } + + @Test + public void testTakeSnapshot() + throws SQLException, IllegalPathException, StorageEngineException, IOException, + DirectoryNotLegalException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + + DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (snapshotDir.exists()) { + FileUtils.forceDelete(snapshotDir); + } + + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + + Assert.assertTrue(snapshotDir.exists()); + Assert.assertTrue(snapshotDir.isDirectory()); + File[] seqTsfiles = + snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile") && name.startsWith("seq")); + File[] unseqTsfiles = + snapshotDir.listFiles( + (dir, name) -> name.endsWith(".tsfile") && name.startsWith("unseq")); + File[] tsfileResources = + snapshotDir.listFiles((dir, name) -> name.endsWith(".tsfile.resource")); + Assert.assertNotNull(seqTsfiles); + Assert.assertNotNull(unseqTsfiles); + Assert.assertNotNull(tsfileResources); + Assert.assertEquals(10, seqTsfiles.length); + Assert.assertEquals(10, unseqTsfiles.length); + Assert.assertEquals(20, tsfileResources.length); + } + } + + @Test(expected = DirectoryNotLegalException.class) + public void testTakeSnapshotInNotEmptyDir() + throws SQLException, IOException, IllegalPathException, StorageEngineException, + DirectoryNotLegalException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + + DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (!snapshotDir.exists()) { + snapshotDir.mkdirs(); + } + + File tmpFile = new File(snapshotDir, "test"); + tmpFile.createNewFile(); + + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + } + } + + @Test + public void testLoadSnapshot() throws SQLException, MetadataException, StorageEngineException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map resultMap = new HashMap<>(); + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + ResultSet resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + resultMap.put(time + measurment, res); + } + } + + DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (!snapshotDir.exists()) { + snapshotDir.mkdirs(); + } + StorageEngine.getInstance() + .setDataRegion( + new PartialPath(SG_NAME), + "0", + new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") + .loadSnapshotForStateMachine()); + + resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + Assert.assertEquals(resultMap.get(time + measurment).intValue(), res); + } + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index d73e35e2e3a48..882ffad5b0ebe 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -46,7 +46,7 @@ public SnapshotLoader(String dataDirPath, String storageGroupName, String dataRe this.dataRegionId = dataRegionId; } - public DataRegion loadSnapshot() { + private DataRegion loadSnapshot() { File dataDir = new File(dataDirPath); if (!dataDir.exists()) { throw new RuntimeException( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index 13d1f96647060..10514a4c18efc 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -53,7 +53,9 @@ public SnapshotTaker(DataRegion dataRegion) { public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) throws DirectoryNotLegalException, IOException { File snapshotDir = new File(snapshotDirPath); - if (snapshotDir.exists() && snapshotDir.listFiles() != null) { + if (snapshotDir.exists() + && snapshotDir.listFiles() != null + && snapshotDir.listFiles().length > 0) { // the directory should be empty or not exists throw new DirectoryNotLegalException( String.format("%s already exists and is not empty", snapshotDirPath)); From a8455396c72c0c1dbbd49429a83f887287666c28 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Tue, 17 May 2022 10:29:43 +0800 Subject: [PATCH 10/18] reset config --- server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index e83f05ade3431..8cf84cf10c33b 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -156,7 +156,7 @@ public class IoTDBConfig { // region Write Ahead Log Configuration /** Write mode of wal */ - private WALMode walMode = WALMode.DISABLE; + private WALMode walMode = WALMode.ASYNC; /** WAL directories */ private String[] walDirs = {DEFAULT_BASE_DIR + File.separator + IoTDBConstant.WAL_FOLDER_NAME}; From 834d14558ad154e510e68f8eec3ee010f7f68ffe Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Tue, 17 May 2022 10:33:09 +0800 Subject: [PATCH 11/18] reset some changes --- .../iotdb/confignode/conf/ConfigNodeConf.java | 4 ++-- .../statemachine/DataRegionStateMachine.java | 7 ------- .../cross/CrossSpaceCompactionTask.java | 1 - .../db/engine/storagegroup/DataRegion.java | 19 ++++++------------- .../org/apache/iotdb/rpc/TSStatusCode.java | 1 - 5 files changed, 8 insertions(+), 24 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java index b911cafb69475..48e830d3e510b 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java @@ -107,10 +107,10 @@ public class ConfigNodeConf { private long timePartitionInterval = 604800; /** Default number of SchemaRegion replicas */ - private int schemaReplicationFactor = 1; + private int schemaReplicationFactor = 3; /** Default number of DataRegion replicas */ - private int dataReplicationFactor = 1; + private int dataReplicationFactor = 3; /** The maximum number of SchemaRegions of each StorageGroup */ private int maximumSchemaRegionCount = 4; diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index a6df9d6611388..a9207ecbeb78c 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -66,8 +66,6 @@ public void start() {} @Override public void stop() {} - // snapshotDir -> ../snapshot - // ../snapshot/seq/root.test/0/ xxx / kkk.tsfile @Override public boolean takeSnapshot(File snapshotDir) { try { @@ -83,13 +81,8 @@ public boolean takeSnapshot(File snapshotDir) { } } - // ../snapshot @Override public void loadSnapshot(File latestSnapshotRootDir) { - // clear data - // -> data - // load - // replace this.region = new SnapshotLoader( latestSnapshotRootDir.getAbsolutePath(), diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java index 70cebcf42437a..58077be480196 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java @@ -124,7 +124,6 @@ protected void doCompaction() throws Exception { // indicates that the cross compaction is complete and the result can be reused during a // restart recovery compactionLogger.close(); - Thread.sleep(1000000); performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles); performer.setTargetFiles(targetTsfileResourceList); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 64456dc4c309d..cf74dd0470036 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -41,7 +41,6 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; -import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine; import org.apache.iotdb.db.engine.trigger.executor.TriggerEvent; import org.apache.iotdb.db.engine.upgrade.UpgradeCheckStatus; @@ -2427,18 +2426,12 @@ private void loadUpgradedResources(List resources, boolean isseq /** merge file under this storage group processor */ public void compact() { - // writeLock("merge"); - // try { - // executeCompaction(); - // } finally { - // writeUnlock(); - // } - // try { - // new SnapshotTaker(this).takeFullSnapshot("../snapshot", true); - // } catch (Exception e) { - // logger.error("exception occurs", e); - // } - new SnapshotLoader("../snapshot", "root.test", "0").loadSnapshotForStateMachine(); + writeLock("merge"); + try { + executeCompaction(); + } finally { + writeUnlock(); + } } private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource) diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index bb037da449bef..f612f6732b074 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -83,7 +83,6 @@ public enum TSStatusCode { WRITE_PROCESS_ERROR(412), WRITE_PROCESS_REJECT(413), QUERY_ID_NOT_EXIST(414), - SNAPSHOT_DIR_NOT_LEGAL(415), UNSUPPORTED_INDEX_FUNC_ERROR(421), UNSUPPORTED_INDEX_TYPE_ERROR(422), From 2634daa43e07e0e3bd5f95d5d78d2e711c3c16bb Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Tue, 17 May 2022 11:01:54 +0800 Subject: [PATCH 12/18] add test --- .../iotdb/db/integration/IoTDBSnapshotIT.java | 69 ++++++++++++++++++- .../dataregion/StorageGroupManager.java | 4 ++ .../org/apache/iotdb/rpc/TSStatusCode.java | 1 + 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java index e68e1055f2741..322f48a6f7443 100644 --- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException; @@ -141,7 +143,9 @@ public void testTakeSnapshotInNotEmptyDir() } @Test - public void testLoadSnapshot() throws SQLException, MetadataException, StorageEngineException { + public void testLoadSnapshot() + throws SQLException, MetadataException, StorageEngineException, DirectoryNotLegalException, + IOException { try (Connection connection = EnvFactory.getEnv().getConnection(); Statement statement = connection.createStatement()) { Map resultMap = new HashMap<>(); @@ -173,6 +177,7 @@ public void testLoadSnapshot() throws SQLException, MetadataException, StorageEn if (!snapshotDir.exists()) { snapshotDir.mkdirs(); } + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); StorageEngine.getInstance() .setDataRegion( new PartialPath(SG_NAME), @@ -180,6 +185,68 @@ public void testLoadSnapshot() throws SQLException, MetadataException, StorageEn new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") .loadSnapshotForStateMachine()); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + Assert.assertEquals(resultMap.get(time + measurment).intValue(), res); + } + } + } + } + + @Test + public void testTakeAndLoadSnapshotWhenCompaction() + throws SQLException, MetadataException, StorageEngineException, InterruptedException, + DirectoryNotLegalException, IOException { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + Map resultMap = new HashMap<>(); + statement.execute("set storage group to " + SG_NAME); + for (int i = 0; i < 10; ++i) { + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j)); + } + statement.execute("flush"); + for (int j = 0; j < 10; ++j) { + statement.execute( + String.format("insert into %s.d%d(time, s) values (%d, %d)", SG_NAME, i, j, j + 1)); + } + statement.execute("flush"); + } + + ResultSet resultSet = statement.executeQuery("select ** from root"); + while (resultSet.next()) { + long time = resultSet.getLong("Time"); + for (int i = 0; i < 10; ++i) { + String measurment = SG_NAME + ".d" + i + ".s"; + int res = resultSet.getInt(SG_NAME + ".d" + i + ".s"); + resultMap.put(time + measurment, res); + } + } + + File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); + if (!snapshotDir.exists()) { + snapshotDir.mkdirs(); + } + IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); + statement.execute("merge"); + DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + region.abortCompaction(); + StorageEngine.getInstance() + .setDataRegion( + new PartialPath(SG_NAME), + "0", + new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") + .loadSnapshotForStateMachine()); + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); resultSet = statement.executeQuery("select ** from root"); while (resultSet.next()) { long time = resultSet.getLong("Time"); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java index 558a5345bfcbf..79dcc03ac7cc8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java @@ -498,6 +498,10 @@ public AtomicBoolean getIsSettling() { } public void setDataRegion(int dataRegionId, DataRegion dataRegion) { + if (this.dataRegion[dataRegionId] != null) { + this.dataRegion[dataRegionId].abortCompaction(); + this.dataRegion[dataRegionId].asyncCloseAllWorkingTsFileProcessors(); + } this.dataRegion[dataRegionId] = dataRegion; } } diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index f612f6732b074..bb037da449bef 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -83,6 +83,7 @@ public enum TSStatusCode { WRITE_PROCESS_ERROR(412), WRITE_PROCESS_REJECT(413), QUERY_ID_NOT_EXIST(414), + SNAPSHOT_DIR_NOT_LEGAL(415), UNSUPPORTED_INDEX_FUNC_ERROR(421), UNSUPPORTED_INDEX_TYPE_ERROR(422), From 5a31ae361d6f3155dfb5b65ae5165c4a9a32b814 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Tue, 17 May 2022 15:28:34 +0800 Subject: [PATCH 13/18] load by new data region --- .../iotdb/db/integration/IoTDBSnapshotIT.java | 8 +- .../statemachine/DataRegionStateMachine.java | 2 +- .../db/engine/snapshot/SnapshotLoader.java | 8 +- .../db/engine/snapshot/SnapshotTaker.java | 8 +- .../db/engine/storagegroup/DataRegion.java | 102 ------------------ 5 files changed, 10 insertions(+), 118 deletions(-) diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java index 322f48a6f7443..5397f4fa1e401 100644 --- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java @@ -89,7 +89,7 @@ public void testTakeSnapshot() FileUtils.forceDelete(snapshotDir); } - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); Assert.assertTrue(snapshotDir.exists()); Assert.assertTrue(snapshotDir.isDirectory()); @@ -138,7 +138,7 @@ public void testTakeSnapshotInNotEmptyDir() File tmpFile = new File(snapshotDir, "test"); tmpFile.createNewFile(); - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); } } @@ -177,7 +177,7 @@ public void testLoadSnapshot() if (!snapshotDir.exists()) { snapshotDir.mkdirs(); } - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); StorageEngine.getInstance() .setDataRegion( new PartialPath(SG_NAME), @@ -237,7 +237,7 @@ public void testTakeAndLoadSnapshotWhenCompaction() IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); statement.execute("merge"); DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); region.abortCompaction(); StorageEngine.getInstance() .setDataRegion( diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index a9207ecbeb78c..a1e04e0943fd0 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -69,7 +69,7 @@ public void stop() {} @Override public boolean takeSnapshot(File snapshotDir) { try { - return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); + return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); } catch (Exception e) { logger.error( "Exception occurs when taking snapshot for {}-{} in {}", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index 882ffad5b0ebe..4ea90265f55ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -53,11 +53,11 @@ private DataRegion loadSnapshot() { String.format("Failed to load snapshot from %s because it does not exist", dataDir)); } try { - return DataRegion.recoverFromSnapshot( - storageGroupName, + return new DataRegion( + StorageEngine.getInstance().getSystemDir() + File.separator + storageGroupName, dataRegionId, - dataDirPath, - StorageEngine.getInstance().getSystemDir() + File.separator + storageGroupName); + StorageEngine.getInstance().getFileFlushPolicy(), + storageGroupName); } catch (Exception e) { LOGGER.error("Exception occurs while load snapshot from {}", dataDir, e); return null; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index 10514a4c18efc..ce431e69c2609 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -50,7 +50,7 @@ public SnapshotTaker(DataRegion dataRegion) { this.dataRegion = dataRegion; } - public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) + public boolean takeFullSnapshot(String snapshotDirPath) throws DirectoryNotLegalException, IOException { File snapshotDir = new File(snapshotDirPath); if (snapshotDir.exists() @@ -65,9 +65,7 @@ public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnaps throw new IOException(String.format("Failed to create directory %s", snapshotDir)); } - if (flushBeforeSnapshot) { dataRegion.syncCloseAllWorkingTsFileProcessors(); - } List timePartitions = dataRegion.getTimePartitions(); for (Long timePartition : timePartitions) { @@ -93,10 +91,6 @@ public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnaps return true; } - public boolean takeIncrementalSnapshot(long maxWalSizeBeforeSnapshot, String snapshotDirPath) { - return false; - } - private List getAllDataDirOfOnePartition(boolean sequence, long timePartition) { String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); List resultDirs = new LinkedList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index cf74dd0470036..9fbc91165539f 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -262,34 +262,6 @@ public class DataRegion { /** used to collect TsFiles in this virtual storage group */ private TsFileSyncManager tsFileSyncManager = TsFileSyncManager.getInstance(); - private DataRegion( - String systemDir, - String dataRegionId, - String logicalStorageGroupName, - TsFileFlushPolicy tsFileFlushPolicy) { - storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); - this.dataRegionId = dataRegionId; - this.logicalStorageGroupName = logicalStorageGroupName; - this.fileFlushPolicy = tsFileFlushPolicy; - this.tsFileManager = - new TsFileManager(logicalStorageGroupName, dataRegionId, storageGroupSysDir.getPath()); - if (storageGroupSysDir.mkdirs()) { - logger.info( - "Storage Group system Directory {} doesn't exist, create it", - storageGroupSysDir.getPath()); - } else if (!storageGroupSysDir.exists()) { - logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath()); - } - - // if use id table, we use id table flush time manager - if (config.isEnableIDTable()) { - idTable = IDTableManager.getInstance().getIDTableDirectly(logicalStorageGroupName); - lastFlushTimeManager = new IDTableFlushTimeManager(idTable); - } else { - lastFlushTimeManager = new LastFlushTimeManager(); - } - } - /** * constrcut a storage group processor * @@ -3539,78 +3511,4 @@ public ILastFlushTimeManager getLastFlushTimeManager() { public TsFileManager getTsFileManager() { return tsFileManager; } - - public static DataRegion recoverFromSnapshot( - String logicalStorageGroupName, String dataRegionId, String dataDir, String systemDir) - throws Exception { - DataRegion dataRegion = - new DataRegion( - systemDir, - dataRegionId, - logicalStorageGroupName, - StorageEngine.getInstance().getFileFlushPolicy()); - dataRegion.recoverCompaction(); - Pair, List> seqTsFilePairs = - dataRegion.getAllFiles( - Collections.singletonList( - dataDir + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME)); - Pair, List> unseqTsFilesPair = - dataRegion.getAllFiles( - Collections.singletonList( - dataDir + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME)); - List tmpSeqTsFiles = seqTsFilePairs.left; - List tmpUnseqTsFiles = unseqTsFilesPair.left; - DataRegionRecoveryContext DataRegionRecoveryContext = - dataRegion.new DataRegionRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); - Map> partitionTmpSeqTsFiles = - dataRegion.splitResourcesByPartition(tmpSeqTsFiles); - Map> partitionTmpUnseqTsFiles = - dataRegion.splitResourcesByPartition(tmpUnseqTsFiles); - for (List value : partitionTmpSeqTsFiles.values()) { - for (TsFileResource tsFileResource : value) { - dataRegion.recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, true); - } - } - for (List value : partitionTmpUnseqTsFiles.values()) { - for (TsFileResource tsFileResource : value) { - dataRegion.recoverSealedTsFiles(tsFileResource, DataRegionRecoveryContext, false); - } - } - for (TsFileResource resource : dataRegion.tsFileManager.getTsFileList(true)) { - long partitionNum = resource.getTimePartition(); - dataRegion.updatePartitionFileVersion(partitionNum, resource.getVersion()); - } - for (TsFileResource resource : dataRegion.tsFileManager.getTsFileList(false)) { - long partitionNum = resource.getTimePartition(); - dataRegion.updatePartitionFileVersion(partitionNum, resource.getVersion()); - } - dataRegion.updateLatestFlushedTime(); - List seqTsFileResources = dataRegion.tsFileManager.getTsFileList(true); - for (TsFileResource resource : seqTsFileResources) { - long timePartitionId = resource.getTimePartition(); - Map endTimeMap = new HashMap<>(); - for (String deviceId : resource.getDevices()) { - long endTime = resource.getEndTime(deviceId); - endTimeMap.put(deviceId.intern(), endTime); - } - dataRegion.lastFlushTimeManager.setMultiDeviceLastTime(timePartitionId, endTimeMap); - dataRegion.lastFlushTimeManager.setMultiDeviceFlushedTime(timePartitionId, endTimeMap); - dataRegion.lastFlushTimeManager.setMultiDeviceGlobalFlushedTime(endTimeMap); - } - - // recover and start timed compaction thread - dataRegion.initCompaction(); - if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) { - MetricsService.getInstance() - .getMetricManager() - .getOrCreateAutoGauge( - Metric.MEM.toString(), - MetricLevel.IMPORTANT, - dataRegion.storageGroupInfo, - StorageGroupInfo::getMemCost, - Tag.NAME.toString(), - "storageGroup_" + dataRegion.getLogicalStorageGroupName()); - } - return dataRegion; - } } From ffbee2f6d7fdedf872c54298566e8b7c29e43f48 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Wed, 18 May 2022 17:32:22 +0800 Subject: [PATCH 14/18] edit according to reviews --- .../iotdb/db/integration/IoTDBSnapshotIT.java | 8 +- .../statemachine/DataRegionStateMachine.java | 2 +- .../apache/iotdb/db/engine/StorageEngine.java | 4 - .../db/engine/snapshot/SnapshotLoader.java | 100 ++++++++++-------- .../db/engine/snapshot/SnapshotTaker.java | 12 ++- 5 files changed, 70 insertions(+), 56 deletions(-) diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java index 5397f4fa1e401..322f48a6f7443 100644 --- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java @@ -89,7 +89,7 @@ public void testTakeSnapshot() FileUtils.forceDelete(snapshotDir); } - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); Assert.assertTrue(snapshotDir.exists()); Assert.assertTrue(snapshotDir.isDirectory()); @@ -138,7 +138,7 @@ public void testTakeSnapshotInNotEmptyDir() File tmpFile = new File(snapshotDir, "test"); tmpFile.createNewFile(); - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); } } @@ -177,7 +177,7 @@ public void testLoadSnapshot() if (!snapshotDir.exists()) { snapshotDir.mkdirs(); } - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); StorageEngine.getInstance() .setDataRegion( new PartialPath(SG_NAME), @@ -237,7 +237,7 @@ public void testTakeAndLoadSnapshotWhenCompaction() IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); statement.execute("merge"); DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); - new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); region.abortCompaction(); StorageEngine.getInstance() .setDataRegion( diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index a1e04e0943fd0..a9207ecbeb78c 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -69,7 +69,7 @@ public void stop() {} @Override public boolean takeSnapshot(File snapshotDir) { try { - return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath()); + return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); } catch (Exception e) { logger.error( "Exception occurs when taking snapshot for {}-{} in {}", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index c1a6e5973315b..a2101997f9fd0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -1055,10 +1055,6 @@ protected void getSeriesSchemas(InsertPlan insertPlan, DataRegion processor) } } - public String getSystemDir() { - return systemDir; - } - public TsFileFlushPolicy getFileFlushPolicy() { return fileFlushPolicy; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index 4ea90265f55ff..d487a2b99d040 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -20,8 +20,10 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -37,29 +39,28 @@ public class SnapshotLoader { private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class); private String storageGroupName; - private String dataDirPath; + private String snapshotPath; private String dataRegionId; - public SnapshotLoader(String dataDirPath, String storageGroupName, String dataRegionId) { - this.dataDirPath = dataDirPath; + public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) { + this.snapshotPath = snapshotPath; this.storageGroupName = storageGroupName; this.dataRegionId = dataRegionId; } private DataRegion loadSnapshot() { - File dataDir = new File(dataDirPath); - if (!dataDir.exists()) { - throw new RuntimeException( - String.format("Failed to load snapshot from %s because it does not exist", dataDir)); - } try { return new DataRegion( - StorageEngine.getInstance().getSystemDir() + File.separator + storageGroupName, + IoTDBDescriptor.getInstance().getConfig().getSystemDir() + + File.separator + + "storage_groups" + + File.separator + + storageGroupName, dataRegionId, StorageEngine.getInstance().getFileFlushPolicy(), storageGroupName); } catch (Exception e) { - LOGGER.error("Exception occurs while load snapshot from {}", dataDir, e); + LOGGER.error("Exception occurs while load snapshot from {}", snapshotPath, e); return null; } } @@ -70,6 +71,40 @@ private DataRegion loadSnapshot() { * @return */ public DataRegion loadSnapshotForStateMachine() { + try { + deleteAllFilesInDataDirs(); + } catch (IOException e) { + return null; + } + + // move the snapshot data to data dir + String seqBaseDir = + IoTDBConstant.SEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId; + String unseqBaseDir = + IoTDBConstant.UNSEQUENCE_FLODER_NAME + + File.separator + + storageGroupName + + File.separator + + dataRegionId; + File sourceDataDir = new File(snapshotPath); + if (sourceDataDir.exists()) { + try { + createLinksFromSnapshotDirToDataDir(sourceDataDir, seqBaseDir, unseqBaseDir); + } catch (IOException | DiskSpaceInsufficientException e) { + LOGGER.error( + "Exception occurs when creating links from snapshot directory to data directory", e); + return null; + } + } + + return loadSnapshot(); + } + + private void deleteAllFilesInDataDirs() throws IOException { String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); // delete @@ -119,45 +154,13 @@ public DataRegion loadSnapshotForStateMachine() { storageGroupName, dataRegionId, e); + throw e; } - - // move the snapshot data to data dir - String targetDataDir = dataDirPaths[0]; - File seqBaseDir = - new File( - targetDataDir - + File.separator - + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator - + storageGroupName - + File.separator - + dataRegionId); - File unseqBaseDir = - new File( - targetDataDir - + File.separator - + IoTDBConstant.UNSEQUENCE_FLODER_NAME - + File.separator - + storageGroupName - + File.separator - + dataRegionId); - File sourceDataDir = new File(dataDirPath); - if (sourceDataDir.exists()) { - try { - createLinksFromSnapshotDirToDataDir(sourceDataDir, seqBaseDir, unseqBaseDir); - } catch (IOException e) { - LOGGER.error( - "Exception occurs when creating links from snapshot directory to data directory", e); - return null; - } - } - - this.dataDirPath = targetDataDir; - return loadSnapshot(); } private void createLinksFromSnapshotDirToDataDir( - File sourceDir, File seqBaseDir, File unseqBaseDir) throws IOException { + File sourceDir, String seqBaseDir, String unseqBaseDir) + throws IOException, DiskSpaceInsufficientException { File[] files = sourceDir.listFiles(); if (files == null) { return; @@ -170,7 +173,12 @@ private void createLinksFromSnapshotDirToDataDir( boolean seq = fileInfo[0].equals("seq"); String timePartition = fileInfo[3]; String fileName = fileInfo[4]; - File targetDirForThisTimePartition = new File(seq ? seqBaseDir : unseqBaseDir, timePartition); + String nextDataDir = + seq + ? DirectoryManager.getInstance().getNextFolderForSequenceFile() + : DirectoryManager.getInstance().getNextFolderForUnSequenceFile(); + File baseDir = new File(nextDataDir, seq ? seqBaseDir : unseqBaseDir); + File targetDirForThisTimePartition = new File(baseDir, timePartition); if (!targetDirForThisTimePartition.exists() && !targetDirForThisTimePartition.mkdirs()) { throw new IOException( String.format("Failed to make directory %s", targetDirForThisTimePartition)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java index ce431e69c2609..1842b1b83068d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java @@ -50,7 +50,7 @@ public SnapshotTaker(DataRegion dataRegion) { this.dataRegion = dataRegion; } - public boolean takeFullSnapshot(String snapshotDirPath) + public boolean takeFullSnapshot(String snapshotDirPath, boolean flushBeforeSnapshot) throws DirectoryNotLegalException, IOException { File snapshotDir = new File(snapshotDirPath); if (snapshotDir.exists() @@ -65,7 +65,9 @@ public boolean takeFullSnapshot(String snapshotDirPath) throw new IOException(String.format("Failed to create directory %s", snapshotDir)); } + if (flushBeforeSnapshot) { dataRegion.syncCloseAllWorkingTsFileProcessors(); + } List timePartitions = dataRegion.getTimePartitions(); for (Long timePartition : timePartitions) { @@ -75,6 +77,14 @@ public boolean takeFullSnapshot(String snapshotDirPath) createFileSnapshot(seqDataDirs, snapshotDir, true, timePartition); } catch (IOException e) { LOGGER.error("Fail to create snapshot", e); + File[] files = snapshotDir.listFiles(); + if (files != null) { + for (File file : files) { + if (!file.delete()) { + LOGGER.error("Failed to delete link file {} after failing to create snapshot", file); + } + } + } return false; } From 1fb507f1ddc1cad7ca802c983985374c28c45b83 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Wed, 18 May 2022 19:37:50 +0800 Subject: [PATCH 15/18] use StorageEngineV2 instead of StorageEngine --- .../iotdb/db/integration/IoTDBSnapshotIT.java | 22 +++++++++---------- .../statemachine/DataRegionStateMachine.java | 11 ++++------ .../apache/iotdb/db/engine/StorageEngine.java | 13 ----------- .../iotdb/db/engine/StorageEngineV2.java | 14 ++++++++++++ .../db/engine/snapshot/SnapshotLoader.java | 4 ++-- 5 files changed, 30 insertions(+), 34 deletions(-) diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java index 322f48a6f7443..9b72f49c5f05c 100644 --- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java +++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSnapshotIT.java @@ -18,12 +18,12 @@ */ package org.apache.iotdb.db.integration; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; -import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.StorageEngineV2; import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; @@ -83,7 +83,7 @@ public void testTakeSnapshot() statement.execute("flush"); } - DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); if (snapshotDir.exists()) { FileUtils.forceDelete(snapshotDir); @@ -129,7 +129,7 @@ public void testTakeSnapshotInNotEmptyDir() statement.execute("flush"); } - DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); if (!snapshotDir.exists()) { snapshotDir.mkdirs(); @@ -172,16 +172,15 @@ public void testLoadSnapshot() } } - DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); File snapshotDir = new File(TestConstant.OUTPUT_DATA_DIR, "snapshot"); if (!snapshotDir.exists()) { snapshotDir.mkdirs(); } new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); - StorageEngine.getInstance() + StorageEngineV2.getInstance() .setDataRegion( - new PartialPath(SG_NAME), - "0", + new DataRegionId(0), new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") .loadSnapshotForStateMachine()); @@ -236,13 +235,12 @@ public void testTakeAndLoadSnapshotWhenCompaction() } IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); statement.execute("merge"); - DataRegion region = StorageEngine.getInstance().getProcessor(new PartialPath(SG_NAME)); + DataRegion region = StorageEngineV2.getInstance().getDataRegion(new DataRegionId(0)); new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); region.abortCompaction(); - StorageEngine.getInstance() + StorageEngineV2.getInstance() .setDataRegion( - new PartialPath(SG_NAME), - "0", + new DataRegionId(0), new SnapshotLoader(snapshotDir.getAbsolutePath(), SG_NAME, "0") .loadSnapshotForStateMachine()); ChunkCache.getInstance().clear(); diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index bec0c363974cd..907d97f74433e 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -20,10 +20,10 @@ package org.apache.iotdb.db.consensus.statemachine; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor; -import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.StorageEngineV2; import org.apache.iotdb.db.engine.snapshot.SnapshotLoader; import org.apache.iotdb.db.engine.snapshot.SnapshotTaker; import org.apache.iotdb.db.engine.storagegroup.DataRegion; @@ -79,11 +79,8 @@ public void loadSnapshot(File latestSnapshotRootDir) { region.getDataRegionId()) .loadSnapshotForStateMachine(); try { - StorageEngine.getInstance() - .setDataRegion( - new PartialPath(region.getLogicalStorageGroupName()), - region.getDataRegionId(), - region); + StorageEngineV2.getInstance() + .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region); } catch (Exception e) { logger.error("Exception occurs when replacing data region in storage engine.", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index a2101997f9fd0..a91ef781ffb55 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -1055,19 +1055,6 @@ protected void getSeriesSchemas(InsertPlan insertPlan, DataRegion processor) } } - public TsFileFlushPolicy getFileFlushPolicy() { - return fileFlushPolicy; - } - - public void setDataRegion( - PartialPath logicalStorageGroup, String dataRegionId, DataRegion dataRegion) - throws MetadataException { - StorageGroupManager manager = - getStorageGroupManager( - IoTDB.schemaProcessor.getStorageGroupNodeByPath(logicalStorageGroup)); - manager.setDataRegion(Integer.parseInt(dataRegionId), dataRegion); - } - static class InstanceHolder { private static final StorageEngine INSTANCE = new StorageEngine(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java index 30a716512f316..a494675ce0e07 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java @@ -601,6 +601,20 @@ public DataRegion getDataRegion(DataRegionId regionId) { return dataRegionMap.get(regionId); } + public void setDataRegion(DataRegionId regionId, DataRegion newRegion) { + if (dataRegionMap.containsKey(regionId)) { + DataRegion oldRegion = dataRegionMap.get(regionId); + oldRegion.syncCloseAllWorkingTsFileProcessors(); + ; + oldRegion.abortCompaction(); + } + dataRegionMap.put(regionId, newRegion); + } + + public TsFileFlushPolicy getFileFlushPolicy() { + return fileFlushPolicy; + } + static class InstanceHolder { private static final StorageEngineV2 INSTANCE = new StorageEngineV2(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java index d487a2b99d040..476a6455335ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; -import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.StorageEngineV2; import org.apache.iotdb.db.engine.storagegroup.DataRegion; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; @@ -57,7 +57,7 @@ private DataRegion loadSnapshot() { + File.separator + storageGroupName, dataRegionId, - StorageEngine.getInstance().getFileFlushPolicy(), + StorageEngineV2.getInstance().getFileFlushPolicy(), storageGroupName); } catch (Exception e) { LOGGER.error("Exception occurs while load snapshot from {}", snapshotPath, e); From 1e74ceac3d7ecdd4fbe3fe25969912a13896f852 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Wed, 18 May 2022 21:23:00 +0800 Subject: [PATCH 16/18] remove setDataRegion in SGM --- .../storagegroup/dataregion/StorageGroupManager.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java index 79dcc03ac7cc8..393ba12e84d55 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java @@ -497,11 +497,4 @@ public AtomicBoolean getIsSettling() { return isSettling; } - public void setDataRegion(int dataRegionId, DataRegion dataRegion) { - if (this.dataRegion[dataRegionId] != null) { - this.dataRegion[dataRegionId].abortCompaction(); - this.dataRegion[dataRegionId].asyncCloseAllWorkingTsFileProcessors(); - } - this.dataRegion[dataRegionId] = dataRegion; - } } From ed82eef607013c6ed654364229de2429b00104f4 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Wed, 18 May 2022 21:24:20 +0800 Subject: [PATCH 17/18] format the code --- .../db/engine/storagegroup/dataregion/StorageGroupManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java index 393ba12e84d55..aa7f8c2c260ea 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/dataregion/StorageGroupManager.java @@ -496,5 +496,4 @@ public void abortCompaction() { public AtomicBoolean getIsSettling() { return isSettling; } - } From 1a1aa8da109e2f489c43fb7807db7372d2925036 Mon Sep 17 00:00:00 2001 From: LiuXuxin Date: Wed, 18 May 2022 22:24:32 +0800 Subject: [PATCH 18/18] check if new region is null when loading snapshot --- .../db/consensus/statemachine/DataRegionStateMachine.java | 7 ++++++- .../java/org/apache/iotdb/db/engine/StorageEngineV2.java | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index 907d97f74433e..1decc0b84d379 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -72,12 +72,17 @@ public boolean takeSnapshot(File snapshotDir) { @Override public void loadSnapshot(File latestSnapshotRootDir) { - this.region = + DataRegion newRegion = new SnapshotLoader( latestSnapshotRootDir.getAbsolutePath(), region.getLogicalStorageGroupName(), region.getDataRegionId()) .loadSnapshotForStateMachine(); + if (newRegion == null) { + logger.error("Fail to load snapshot from {}", latestSnapshotRootDir); + return; + } + this.region = newRegion; try { StorageEngineV2.getInstance() .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java index a494675ce0e07..94ea120d20376 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java @@ -605,7 +605,6 @@ public void setDataRegion(DataRegionId regionId, DataRegion newRegion) { if (dataRegionMap.containsKey(regionId)) { DataRegion oldRegion = dataRegionMap.get(regionId); oldRegion.syncCloseAllWorkingTsFileProcessors(); - ; oldRegion.abortCompaction(); } dataRegionMap.put(regionId, newRegion);