diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index b2655e350a2c6..989dc75746400 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; @@ -3388,6 +3389,15 @@ public void setPartitionCacheSize(int partitionCacheSize) { this.partitionCacheSize = partitionCacheSize; } + public int getPipeDataStructureTabletSizeInBytes() { + int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(); + if (size > thriftMaxFrameSize) { + size = (int) (thriftMaxFrameSize * 0.8); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size); + } + return size; + } + public int getAuthorCacheSize() { return authorCacheSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java index cbbfea0a5b2a5..279996690ff92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java @@ -21,9 +21,9 @@ import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -92,7 +92,7 @@ protected TsFileInsertionDataContainer( this.allocatedMemoryBlockForTablet = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index ce2c2b8e467d6..271bc317d0f96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; @@ -124,7 +125,7 @@ public TsFileInsertionScanDataContainer( this.allocatedMemoryBlockForBatchData = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); this.allocatedMemoryBlockForChunk = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index c9b21d780ee98..a707f554c5184 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.resource.memory; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.utils.MemUtils; @@ -33,10 +34,10 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.MeasurementSchema; import java.util.List; import java.util.Map; +import java.util.Objects; public class PipeMemoryWeightUtil { @@ -107,7 +108,10 @@ public static Pair calculateTabletRowCountAndMemory(RowRecord } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, + schemaCount, + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } /** @@ -152,7 +156,8 @@ public static Pair calculateTabletRowCountAndMemory(BatchData } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, schemaCount, batchData.length()); } /** @@ -162,22 +167,28 @@ public static Pair calculateTabletRowCountAndMemory(BatchData * @return left is the row count of tablet, right is the memory cost of tablet in bytes */ public static Pair calculateTabletRowCountAndMemory(PipeRow row) { - return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size()); + return calculateTabletRowCountAndMemoryBySize( + row.getCurrentRowSize(), + row.size(), + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } private static Pair calculateTabletRowCountAndMemoryBySize( - int rowSize, int schemaCount) { - if (rowSize <= 0) { + int rowBytesUsed, int schemaCount, int inputNum) { + if (rowBytesUsed <= 0) { return new Pair<>(1, 0); } // Calculate row number according to the max size of a pipe tablet. // "-100" is the estimated size of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. - int rowNumber = - 8 - * (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100) - / (8 * rowSize + schemaCount); + // Here we estimate the max use of + int sizeLimit = + Math.min( + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), + (int) (inputNum * rowBytesUsed * 1.2)); + + int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); if ( // This means the row number is larger than the max row count of a pipe tablet @@ -185,67 +196,14 @@ private static Pair calculateTabletRowCountAndMemoryBySize( // Bound the row number, the memory cost is rowSize * rowNumber return new Pair<>( PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } else { - return new Pair<>( - rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + return new Pair<>(rowNumber, sizeLimit); } } - public static long calculateTabletSizeInBytes(Tablet tablet) { - long totalSizeInBytes = 0; - - if (tablet == null) { - return totalSizeInBytes; - } - - // timestamps - if (tablet.timestamps != null) { - totalSizeInBytes += tablet.timestamps.length * 8L; - } - - // values - final List timeseries = tablet.getSchemas(); - if (timeseries != null) { - for (int column = 0; column < timeseries.size(); column++) { - final MeasurementSchema measurementSchema = timeseries.get(column); - if (measurementSchema == null) { - continue; - } - - final TSDataType tsDataType = measurementSchema.getType(); - if (tsDataType == null) { - continue; - } - - if (tsDataType.isBinary()) { - if (tablet.values == null || tablet.values.length <= column) { - continue; - } - final Binary[] values = ((Binary[]) tablet.values[column]); - if (values == null) { - continue; - } - for (Binary value : values) { - totalSizeInBytes += value == null ? 8 : value.ramBytesUsed(); - } - } else { - totalSizeInBytes += (long) tablet.getMaxRowNumber() * tsDataType.getDataTypeSize(); - } - } - } - - // bitMaps - if (tablet.bitMaps != null) { - for (int i = 0; i < tablet.bitMaps.length; i++) { - totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize(); - } - } - - // estimate other dataStructures size - totalSizeInBytes += 100; - - return totalSizeInBytes; + public static long calculateTabletSizeInBytes(final Tablet tablet) { + return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L; } public static int calculateBatchDataRamBytesUsed(BatchData batchData) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 2674e8b89552c..bd4e3923815f4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.event; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; @@ -47,6 +49,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,9 +81,19 @@ public class TsFileInsertionDataContainerTest { private File alignedTsFile; private File nonalignedTsFile; private TsFileResource resource; + private boolean isPipeMemoryManagementEnabled; + + @Before + public void setUp() throws Exception { + isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled(); + CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false); + } @After public void tearDown() throws Exception { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); if (alignedTsFile != null) { alignedTsFile.delete(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 44fd17c3cfe65..f8e845a7af25c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -214,7 +214,9 @@ public class CommonConfig { private boolean pipeRetryLocallyForParallelOrUserConflict = true; private int pipeDataStructureTabletRowSize = 2048; - private int pipeDataStructureTabletSizeInBytes = 2097152; + + // 128MB + private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3; private volatile double pipeTotalFloatingMemoryProportion = 0.5; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 65283916939e9..82b85e3f8284e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -141,11 +141,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak } catch (Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to delete original receiver file dir %s, because %s.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath(), - e.getMessage(), - e); + e.getMessage()); } } else { if (LOGGER.isDebugEnabled()) { @@ -487,11 +487,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to close current writing file writer %s, because %s.", receiverId.get(), writingFile == null ? "null" : writingFile.getPath(), - e.getMessage(), - e); + e.getMessage()); } writingFileWriter = null; } else { @@ -526,11 +526,11 @@ private void deleteFile(final File file) { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to delete original writing file %s, because %s.", receiverId.get(), file.getPath(), - e.getMessage(), - e); + e.getMessage()); } } else { if (LOGGER.isDebugEnabled()) { @@ -611,11 +611,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), writingFile, - req, - e); + req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, @@ -698,11 +698,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), files, - req, - e); + req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR,