diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 1a4d7cf3863c9..d19639310cf4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -668,7 +668,6 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent) addFailureEventToRetryQueue(tsFileInsertionEvent, null); } } catch (final Exception e) { - tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false); addFailureEventToRetryQueue(tsFileInsertionEvent, e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 6726c9a67011a..3eaaa94c4160c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -153,10 +153,7 @@ public void transfer( } if (reader == null) { - reader = - Objects.nonNull(modFile) - ? new RandomAccessFile(modFile, "r") - : new RandomAccessFile(tsFile, "r"); + reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); } this.clientManager = clientManager; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 1e47a48e500da..718529243d9d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -427,6 +427,13 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) { } protected void extractProgressReportEvent(final PipeRealtimeEvent event) { + // Remove any heartbeat events in front of this event to avoid OOM + // Since the batch and retry queue no longer need the heartbeat event to trigger + // And the progress report event can trigger the processor calculation because it's not reported + // yet + while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof PipeHeartbeatEvent) { + pendingQueue.pollLast(); + } if (pendingQueue.peekLast() instanceof ProgressReportEvent) { final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast(); oldEvent.bindProgressIndex( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java index 785e89cfb9a81..43fa64c158ea4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java @@ -37,4 +37,8 @@ public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) { public E peekLast() { return pendingDeque.peekLast(); } + + public E pollLast() { + return pendingDeque.pollLast(); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index ff8c8dbd293d8..499b53ba07eae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -787,7 +787,7 @@ private TPipeTransferResp checkNonFinalFileSeal( String.format( "Failed to seal file %s, because the length of file is not correct. " + "The original file has length %s, but receiver file has length %s.", - fileName, fileLength, writingFileWriter.length())); + fileName, fileLength, file.length())); PipeLogger.log( LOGGER::warn, "Receiver id = %s: Failed to seal file %s, because the length of file is not correct. " @@ -795,7 +795,7 @@ private TPipeTransferResp checkNonFinalFileSeal( receiverId.get(), fileName, fileLength, - writingFileWriter.length()); + file.length()); return new TPipeTransferResp(status); }