diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 0e8ff890b82fb..03835e7743bf3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -589,7 +589,8 @@ public void testTransferMods() { Collections.singleton("5,")); TestUtils.executeNonQueries( - senderEnv, Arrays.asList("drop pipe test_history", "drop pipe test_realtime")); + senderEnv, + Arrays.asList("drop pipe if exists test_history", "drop pipe if exists test_realtime")); TestUtils.executeNonQuery(receiverEnv, "drop database root.**"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 69eab2d79cf28..31f446df27031 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.enums.ReadConsistencyLevel; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.exception.LoadConfigurationException; @@ -3287,6 +3288,15 @@ public void setPartitionCacheSize(int partitionCacheSize) { this.partitionCacheSize = partitionCacheSize; } + public int getPipeDataStructureTabletSizeInBytes() { + int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(); + if (size > thriftMaxFrameSize) { + size = (int) (thriftMaxFrameSize * 0.8); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size); + } + return size; + } + public int getAuthorCacheSize() { return authorCacheSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 82f3e0ea2b841..678e7a4a62a5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -22,9 +22,9 @@ import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -106,7 +106,7 @@ protected TsFileInsertionEventParser( this.allocatedMemoryBlockForTablet = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 2bd22c8b0bc07..da9d7d00477b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; @@ -135,7 +136,7 @@ public TsFileInsertionEventScanParser( this.allocatedMemoryBlockForBatchData = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); this.allocatedMemoryBlockForChunk = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index e353163e726ab..af2fd214e4a30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -93,7 +93,7 @@ public TsFileInsertionEventTableParser( .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); long tableSize = Math.min( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(), + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()); this.allocatedMemoryBlockForChunk = @@ -107,7 +107,9 @@ public TsFileInsertionEventTableParser( this.allocatedMemoryBlockForTableSchemas = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance() + .getConfig() + .getPipeDataStructureTabletSizeInBytes()); this.startTime = startTime; this.endTime = endTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index 458434d84942b..833bd3577eb68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.resource.memory; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.utils.MemUtils; @@ -118,7 +119,10 @@ public static Pair calculateTabletRowCountAndMemory(RowRecord } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, + schemaCount, + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } /** @@ -163,7 +167,8 @@ public static Pair calculateTabletRowCountAndMemory(BatchData } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, schemaCount, batchData.length()); } /** @@ -173,22 +178,28 @@ public static Pair calculateTabletRowCountAndMemory(BatchData * @return left is the row count of tablet, right is the memory cost of tablet in bytes */ public static Pair calculateTabletRowCountAndMemory(PipeRow row) { - return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size()); + return calculateTabletRowCountAndMemoryBySize( + row.getCurrentRowSize(), + row.size(), + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } private static Pair calculateTabletRowCountAndMemoryBySize( - int rowSize, int schemaCount) { - if (rowSize <= 0) { + int rowBytesUsed, int schemaCount, int inputNum) { + if (rowBytesUsed <= 0) { return new Pair<>(1, 0); } // Calculate row number according to the max size of a pipe tablet. // "-100" is the estimated size of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. - int rowNumber = - 8 - * (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100) - / (8 * rowSize + schemaCount); + // Here we estimate the max use of + int sizeLimit = + Math.min( + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), + (int) (inputNum * rowBytesUsed * 1.2)); + + int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); if ( // This means the row number is larger than the max row count of a pipe tablet @@ -196,10 +207,9 @@ private static Pair calculateTabletRowCountAndMemoryBySize( // Bound the row number, the memory cost is rowSize * rowNumber return new Pair<>( PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } else { - return new Pair<>( - rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + return new Pair<>(rowNumber, sizeLimit); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 8814190755e27..a2e7c558ea0b5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.pipe.event; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; @@ -48,6 +50,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,9 +82,19 @@ public class TsFileInsertionEventParserTest { private File alignedTsFile; private File nonalignedTsFile; private TsFileResource resource; + private boolean isPipeMemoryManagementEnabled; + + @Before + public void setUp() throws Exception { + isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled(); + CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false); + } @After public void tearDown() throws Exception { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); if (alignedTsFile != null) { alignedTsFile.delete(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ff4a47b6f84ab..754ea976a5825 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -218,7 +218,9 @@ public class CommonConfig { private boolean pipeRetryLocallyForParallelOrUserConflict = true; private int pipeDataStructureTabletRowSize = 2048; - private int pipeDataStructureTabletSizeInBytes = 2097152; + + // 60MB + private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3; private volatile double pipeTotalFloatingMemoryProportion = 0.5; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index ff8c8dbd293d8..d094c40dea580 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -145,11 +145,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak } catch (Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to delete original receiver file dir %s, because %s.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath(), - e.getMessage(), - e); + e.getMessage()); } } else { if (LOGGER.isDebugEnabled()) { @@ -183,9 +183,9 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak } catch (Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to create pipe receiver file folder because all disks of folders are full.", - receiverId.get(), - e); + receiverId.get()); return new TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT)); } @@ -524,11 +524,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to close current writing file writer %s, because %s.", receiverId.get(), writingFile == null ? "null" : writingFile.getPath(), - e.getMessage(), - e); + e.getMessage()); } writingFileWriter = null; } else { @@ -563,11 +563,11 @@ private void deleteFile(final File file) { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to delete original writing file %s, because %s.", receiverId.get(), file.getPath(), - e.getMessage(), - e); + e.getMessage()); } } else { if (LOGGER.isDebugEnabled()) { @@ -648,11 +648,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), writingFile, - req, - e); + req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, @@ -745,11 +745,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), files, - req, - e); + req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR,