From 4f99a89a5bfe1bec458609dc4548ef01b3f62165 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 10:37:01 +0800 Subject: [PATCH 1/9] fix --- .../resource/memory/PipeMemoryWeightUtil.java | 58 +------------------ 1 file changed, 3 insertions(+), 55 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index c9b21d780ee98..fc072c4d1ce87 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -33,10 +33,10 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.MeasurementSchema; import java.util.List; import java.util.Map; +import java.util.Objects; public class PipeMemoryWeightUtil { @@ -192,60 +192,8 @@ private static Pair calculateTabletRowCountAndMemoryBySize( } } - public static long calculateTabletSizeInBytes(Tablet tablet) { - long totalSizeInBytes = 0; - - if (tablet == null) { - return totalSizeInBytes; - } - - // timestamps - if (tablet.timestamps != null) { - totalSizeInBytes += tablet.timestamps.length * 8L; - } - - // values - final List timeseries = tablet.getSchemas(); - if (timeseries != null) { - for (int column = 0; column < timeseries.size(); column++) { - final MeasurementSchema measurementSchema = timeseries.get(column); - if (measurementSchema == null) { - continue; - } - - final TSDataType tsDataType = measurementSchema.getType(); - if (tsDataType == null) { - continue; - } - - if (tsDataType.isBinary()) { - if (tablet.values == null || tablet.values.length <= column) { - continue; - } - final Binary[] values = ((Binary[]) tablet.values[column]); - if (values == null) { - continue; - } - for (Binary value : values) { - totalSizeInBytes += value == null ? 8 : value.ramBytesUsed(); - } - } else { - totalSizeInBytes += (long) tablet.getMaxRowNumber() * tsDataType.getDataTypeSize(); - } - } - } - - // bitMaps - if (tablet.bitMaps != null) { - for (int i = 0; i < tablet.bitMaps.length; i++) { - totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize(); - } - } - - // estimate other dataStructures size - totalSizeInBytes += 100; - - return totalSizeInBytes; + public static long calculateTabletSizeInBytes(final Tablet tablet) { + return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L; } public static int calculateBatchDataRamBytesUsed(BatchData batchData) { From ec62fa082039c6332edd318e73acc0d14db5e6c0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 11:22:59 +0800 Subject: [PATCH 2/9] fix --- .../resource/memory/PipeMemoryWeightUtil.java | 33 ++++++++++++------- .../iotdb/commons/conf/CommonConfig.java | 4 ++- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index fc072c4d1ce87..3b9bc800f84c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -107,7 +107,10 @@ public static Pair calculateTabletRowCountAndMemory(RowRecord } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, + schemaCount, + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } /** @@ -152,7 +155,8 @@ public static Pair calculateTabletRowCountAndMemory(BatchData } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, schemaCount, batchData.length()); } /** @@ -162,22 +166,28 @@ public static Pair calculateTabletRowCountAndMemory(BatchData * @return left is the row count of tablet, right is the memory cost of tablet in bytes */ public static Pair calculateTabletRowCountAndMemory(PipeRow row) { - return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size()); + return calculateTabletRowCountAndMemoryBySize( + row.getCurrentRowSize(), + row.size(), + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } private static Pair calculateTabletRowCountAndMemoryBySize( - int rowSize, int schemaCount) { - if (rowSize <= 0) { + int rowBytesUsed, int schemaCount, int inputNum) { + if (rowBytesUsed <= 0) { return new Pair<>(1, 0); } // Calculate row number according to the max size of a pipe tablet. // "-100" is the estimated size of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. - int rowNumber = - 8 - * (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100) - / (8 * rowSize + schemaCount); + // Here we estimate the max use of + int sizeLimit = + Math.min( + PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(), + (int) (inputNum * rowBytesUsed * 1.2)); + + int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); if ( // This means the row number is larger than the max row count of a pipe tablet @@ -185,10 +195,9 @@ private static Pair calculateTabletRowCountAndMemoryBySize( // Bound the row number, the memory cost is rowSize * rowNumber return new Pair<>( PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } else { - return new Pair<>( - rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + return new Pair<>(rowNumber, sizeLimit); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 44fd17c3cfe65..876f840b9135a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -214,7 +214,9 @@ public class CommonConfig { private boolean pipeRetryLocallyForParallelOrUserConflict = true; private int pipeDataStructureTabletRowSize = 2048; - private int pipeDataStructureTabletSizeInBytes = 2097152; + + // 128MB + private int pipeDataStructureTabletSizeInBytes = 128 * 1024 * 1024; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3; private volatile double pipeTotalFloatingMemoryProportion = 0.5; From d5dad06117225ed688e6c4f5f19e7223745ff8e4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 14:07:46 +0800 Subject: [PATCH 3/9] push --- .../event/TsFileInsertionDataContainerTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 2674e8b89552c..6020798a66a8c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -47,6 +49,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,9 +81,19 @@ public class TsFileInsertionDataContainerTest { private File alignedTsFile; private File nonalignedTsFile; private TsFileResource resource; + private boolean isPipeMemoryManagementEnabled; + + @Before + public void setUp() throws Exception { + isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled(); + CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false); + } @After public void tearDown() throws Exception { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); if (alignedTsFile != null) { alignedTsFile.delete(); } From 67eb3bc183909290cc7b73cdc1cf408b13b69121 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 14:12:43 +0800 Subject: [PATCH 4/9] ger-limit --- .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 876f840b9135a..f8e845a7af25c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -216,7 +216,7 @@ public class CommonConfig { private int pipeDataStructureTabletRowSize = 2048; // 128MB - private int pipeDataStructureTabletSizeInBytes = 128 * 1024 * 1024; + private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3; private volatile double pipeTotalFloatingMemoryProportion = 0.5; From 0ec45ae96e7f0659aecddec7e30631b1da575017 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:03:06 +0800 Subject: [PATCH 5/9] fix --- .../iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 6020798a66a8c..bd4e3923815f4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -19,14 +19,14 @@ package org.apache.iotdb.db.pipe.event; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; -import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; From facfc5ec23abbe784300d766dbbbeeabfe1e6009 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 10:37:01 +0800 Subject: [PATCH 6/9] [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation & Optimized the tablet size by memory estimation --- .../resource/memory/PipeMemoryWeightUtil.java | 91 +++++-------------- .../TsFileInsertionDataContainerTest.java | 13 +++ .../iotdb/commons/conf/CommonConfig.java | 4 +- 3 files changed, 40 insertions(+), 68 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index c9b21d780ee98..3b9bc800f84c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -33,10 +33,10 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.MeasurementSchema; import java.util.List; import java.util.Map; +import java.util.Objects; public class PipeMemoryWeightUtil { @@ -107,7 +107,10 @@ public static Pair calculateTabletRowCountAndMemory(RowRecord } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, + schemaCount, + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } /** @@ -152,7 +155,8 @@ public static Pair calculateTabletRowCountAndMemory(BatchData } } - return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount); + return calculateTabletRowCountAndMemoryBySize( + totalSizeInBytes, schemaCount, batchData.length()); } /** @@ -162,22 +166,28 @@ public static Pair calculateTabletRowCountAndMemory(BatchData * @return left is the row count of tablet, right is the memory cost of tablet in bytes */ public static Pair calculateTabletRowCountAndMemory(PipeRow row) { - return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size()); + return calculateTabletRowCountAndMemoryBySize( + row.getCurrentRowSize(), + row.size(), + PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } private static Pair calculateTabletRowCountAndMemoryBySize( - int rowSize, int schemaCount) { - if (rowSize <= 0) { + int rowBytesUsed, int schemaCount, int inputNum) { + if (rowBytesUsed <= 0) { return new Pair<>(1, 0); } // Calculate row number according to the max size of a pipe tablet. // "-100" is the estimated size of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. - int rowNumber = - 8 - * (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100) - / (8 * rowSize + schemaCount); + // Here we estimate the max use of + int sizeLimit = + Math.min( + PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(), + (int) (inputNum * rowBytesUsed * 1.2)); + + int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); if ( // This means the row number is larger than the max row count of a pipe tablet @@ -185,67 +195,14 @@ private static Pair calculateTabletRowCountAndMemoryBySize( // Bound the row number, the memory cost is rowSize * rowNumber return new Pair<>( PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); } else { - return new Pair<>( - rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + return new Pair<>(rowNumber, sizeLimit); } } - public static long calculateTabletSizeInBytes(Tablet tablet) { - long totalSizeInBytes = 0; - - if (tablet == null) { - return totalSizeInBytes; - } - - // timestamps - if (tablet.timestamps != null) { - totalSizeInBytes += tablet.timestamps.length * 8L; - } - - // values - final List timeseries = tablet.getSchemas(); - if (timeseries != null) { - for (int column = 0; column < timeseries.size(); column++) { - final MeasurementSchema measurementSchema = timeseries.get(column); - if (measurementSchema == null) { - continue; - } - - final TSDataType tsDataType = measurementSchema.getType(); - if (tsDataType == null) { - continue; - } - - if (tsDataType.isBinary()) { - if (tablet.values == null || tablet.values.length <= column) { - continue; - } - final Binary[] values = ((Binary[]) tablet.values[column]); - if (values == null) { - continue; - } - for (Binary value : values) { - totalSizeInBytes += value == null ? 8 : value.ramBytesUsed(); - } - } else { - totalSizeInBytes += (long) tablet.getMaxRowNumber() * tsDataType.getDataTypeSize(); - } - } - } - - // bitMaps - if (tablet.bitMaps != null) { - for (int i = 0; i < tablet.bitMaps.length; i++) { - totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize(); - } - } - - // estimate other dataStructures size - totalSizeInBytes += 100; - - return totalSizeInBytes; + public static long calculateTabletSizeInBytes(final Tablet tablet) { + return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L; } public static int calculateBatchDataRamBytesUsed(BatchData batchData) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 2674e8b89552c..bd4e3923815f4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.event; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; @@ -47,6 +49,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,9 +81,19 @@ public class TsFileInsertionDataContainerTest { private File alignedTsFile; private File nonalignedTsFile; private TsFileResource resource; + private boolean isPipeMemoryManagementEnabled; + + @Before + public void setUp() throws Exception { + isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled(); + CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false); + } @After public void tearDown() throws Exception { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); if (alignedTsFile != null) { alignedTsFile.delete(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 44fd17c3cfe65..f8e845a7af25c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -214,7 +214,9 @@ public class CommonConfig { private boolean pipeRetryLocallyForParallelOrUserConflict = true; private int pipeDataStructureTabletRowSize = 2048; - private int pipeDataStructureTabletSizeInBytes = 2097152; + + // 128MB + private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3; private volatile double pipeTotalFloatingMemoryProportion = 0.5; From b9d1d2657f99d93329d27d8b02892a1b1217adc9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 18:08:57 +0800 Subject: [PATCH 7/9] fix --- .../pipe/receiver/IoTDBFileReceiver.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 65283916939e9..82b85e3f8284e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -141,11 +141,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak } catch (Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to delete original receiver file dir %s, because %s.", receiverId.get(), receiverFileDirWithIdSuffix.get().getPath(), - e.getMessage(), - e); + e.getMessage()); } } else { if (LOGGER.isDebugEnabled()) { @@ -487,11 +487,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to close current writing file writer %s, because %s.", receiverId.get(), writingFile == null ? "null" : writingFile.getPath(), - e.getMessage(), - e); + e.getMessage()); } writingFileWriter = null; } else { @@ -526,11 +526,11 @@ private void deleteFile(final File file) { } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to delete original writing file %s, because %s.", receiverId.get(), file.getPath(), - e.getMessage(), - e); + e.getMessage()); } } else { if (LOGGER.isDebugEnabled()) { @@ -611,11 +611,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), writingFile, - req, - e); + req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, @@ -698,11 +698,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil } catch (final Exception e) { PipeLogger.log( LOGGER::warn, + e, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), files, - req, - e); + req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, From 696597fee719fd9f2b3d2758e9b39fe048070fad Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:23:11 +0800 Subject: [PATCH 8/9] fix --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +++++++++++ .../container/TsFileInsertionDataContainer.java | 4 ++-- .../scan/TsFileInsertionScanDataContainer.java | 3 ++- .../db/pipe/resource/memory/PipeMemoryWeightUtil.java | 3 ++- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index b2655e350a2c6..a516f827ef381 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.enums.ReadConsistencyLevel; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; @@ -3388,6 +3390,15 @@ public void setPartitionCacheSize(int partitionCacheSize) { this.partitionCacheSize = partitionCacheSize; } + public int getPipeDataStructureTabletSizeInBytes() { + int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(); + if (size > thriftMaxFrameSize) { + size = (int) (thriftMaxFrameSize * 0.8); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size); + } + return size; + } + public int getAuthorCacheSize() { return authorCacheSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java index cbbfea0a5b2a5..279996690ff92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java @@ -21,9 +21,9 @@ import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -92,7 +92,7 @@ protected TsFileInsertionDataContainer( this.allocatedMemoryBlockForTablet = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index ce2c2b8e467d6..271bc317d0f96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; @@ -124,7 +125,7 @@ public TsFileInsertionScanDataContainer( this.allocatedMemoryBlockForBatchData = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); this.allocatedMemoryBlockForChunk = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index 3b9bc800f84c8..a707f554c5184 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.resource.memory; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.utils.MemUtils; @@ -184,7 +185,7 @@ private static Pair calculateTabletRowCountAndMemoryBySize( // Here we estimate the max use of int sizeLimit = Math.min( - PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(), + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), (int) (inputNum * rowBytesUsed * 1.2)); int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); From aa07b93ab1fe00f8264d15954a99f2fbf3d7449e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 10 Apr 2026 10:07:07 +0800 Subject: [PATCH 9/9] fix --- .../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index a516f827ef381..989dc75746400 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.commons.enums.ReadConsistencyLevel; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.TestOnly;