From 8e4bb49de10bb04e6a1429579107b87508bc6bcf Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 8 Apr 2026 19:02:19 +0800 Subject: [PATCH 1/3] fix --- .../dataregion/realtime/PipeRealtimeDataRegionSource.java | 7 +++++++ .../task/connection/UnboundedBlockingPendingQueue.java | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 1e47a48e500da..718529243d9d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -427,6 +427,13 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) { } protected void extractProgressReportEvent(final PipeRealtimeEvent event) { + // Remove any heartbeat events in front of this event to avoid OOM + // Since the batch and retry queue no longer need the heartbeat event to trigger + // And the progress report event can trigger the processor calculation because it's not reported + // yet + while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof PipeHeartbeatEvent) { + pendingQueue.pollLast(); + } if (pendingQueue.peekLast() instanceof ProgressReportEvent) { final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast(); oldEvent.bindProgressIndex( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java index 785e89cfb9a81..43fa64c158ea4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java @@ -37,4 +37,8 @@ public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) { public E peekLast() { return pendingDeque.peekLast(); } + + public E pollLast() { + return pendingDeque.pollLast(); + } } From 02bedb233d4f5304bf4adef86fdf05980d082c23 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 09:53:36 +0800 Subject: [PATCH 2/3] fix --- .../sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java | 1 - .../thrift/async/handler/PipeTransferTsFileHandler.java | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 1a4d7cf3863c9..d19639310cf4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -668,7 +668,6 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent) addFailureEventToRetryQueue(tsFileInsertionEvent, null); } } catch (final Exception e) { - tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false); addFailureEventToRetryQueue(tsFileInsertionEvent, e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 6726c9a67011a..3eaaa94c4160c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -153,10 +153,7 @@ public void transfer( } if (reader == null) { - reader = - Objects.nonNull(modFile) - ? new RandomAccessFile(modFile, "r") - : new RandomAccessFile(tsFile, "r"); + reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); } this.clientManager = clientManager; From 00920714d7bf8ad14e692487db71669fba231409 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Apr 2026 09:57:10 +0800 Subject: [PATCH 3/3] fix --- .../apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index ff8c8dbd293d8..499b53ba07eae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -787,7 +787,7 @@ private TPipeTransferResp checkNonFinalFileSeal( String.format( "Failed to seal file %s, because the length of file is not correct. " + "The original file has length %s, but receiver file has length %s.", - fileName, fileLength, writingFileWriter.length())); + fileName, fileLength, file.length())); PipeLogger.log( LOGGER::warn, "Receiver id = %s: Failed to seal file %s, because the length of file is not correct. " @@ -795,7 +795,7 @@ private TPipeTransferResp checkNonFinalFileSeal( receiverId.get(), fileName, fileLength, - writingFileWriter.length()); + file.length()); return new TPipeTransferResp(status); }