Skip to content

Commit d34bd8d

Browse files
authored
[To dev/1.3] Pipe: Optimized the memory occupation of pipe realtime source (#17450)(#17474) (#17486)
* Pipe: Optimized the memory occupation of pipe realtime source (#17450) * fix * fix * fix * Pipe: Fixed the NPE of progress report event (#17474)
1 parent 8ccd7c6 commit d34bd8d

File tree

5 files changed

+18
-7
lines changed

5 files changed

+18
-7
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,6 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent)
647647
addFailureEventToRetryQueue(tsFileInsertionEvent, null);
648648
}
649649
} catch (final Exception e) {
650-
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
651650
addFailureEventToRetryQueue(tsFileInsertionEvent, e);
652651
}
653652
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,7 @@ public void transfer(
150150
}
151151

152152
if (reader == null) {
153-
reader =
154-
Objects.nonNull(modFile)
155-
? new RandomAccessFile(modFile, "r")
156-
: new RandomAccessFile(tsFile, "r");
153+
reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r");
157154
}
158155

159156
this.clientManager = clientManager;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,17 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) {
371371
}
372372

373373
protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
374+
// Remove any heartbeat events in front of this event to avoid OOM
375+
// Since the batch and retry queue no longer need the heartbeat event to trigger
376+
// And the progress report event can trigger the processor calculation because it's not reported
377+
// yet
378+
while (true) {
379+
final PipeRealtimeEvent lastEvent = ((PipeRealtimeEvent) pendingQueue.peekLast());
380+
if (lastEvent == null || !(lastEvent.getEvent() instanceof PipeHeartbeatEvent)) {
381+
break;
382+
}
383+
pendingQueue.pollLast();
384+
}
374385
if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
375386
final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
376387
oldEvent.bindProgressIndex(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/UnboundedBlockingPendingQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) {
3737
public E peekLast() {
3838
return pendingDeque.peekLast();
3939
}
40+
41+
public E pollLast() {
42+
return pendingDeque.pollLast();
43+
}
4044
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -740,15 +740,15 @@ private TPipeTransferResp checkNonFinalFileSeal(
740740
String.format(
741741
"Failed to seal file %s, because the length of file is not correct. "
742742
+ "The original file has length %s, but receiver file has length %s.",
743-
fileName, fileLength, writingFileWriter.length()));
743+
fileName, fileLength, file.length()));
744744
PipeLogger.log(
745745
LOGGER::warn,
746746
"Receiver id = %s: Failed to seal file %s, because the length of file is not correct. "
747747
+ "The original file has length %s, but receiver file has length %s.",
748748
receiverId.get(),
749749
fileName,
750750
fileLength,
751-
writingFileWriter.length());
751+
file.length());
752752
return new TPipeTransferResp(status);
753753
}
754754

0 commit comments

Comments
 (0)