Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,6 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent)
addFailureEventToRetryQueue(tsFileInsertionEvent, null);
}
} catch (final Exception e) {
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
addFailureEventToRetryQueue(tsFileInsertionEvent, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
return tsFile;
}

public void transfer(

Check warning on line 140 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 89 to 64, Complexity from 16 to 14, Nesting Level from 3 to 2, Number of Variables from 13 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1v-_xmVoFEiAkUHCpd&open=AZ1v-_xmVoFEiAkUHCpd&pullRequest=17450
final IoTDBDataNodeAsyncClientManager clientManager,
final AsyncPipeDataTransferServiceClient client)
throws TException, IOException {
Expand All @@ -153,10 +153,7 @@
}

if (reader == null) {
reader =
Objects.nonNull(modFile)
? new RandomAccessFile(modFile, "r")
: new RandomAccessFile(tsFile, "r");
reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r");
}

this.clientManager = clientManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,13 @@ protected void extractHeartbeat(final PipeRealtimeEvent event) {
}

protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
// Remove any heartbeat events in front of this event to avoid OOM
// Since the batch and retry queue no longer need the heartbeat event to trigger
// And the progress report event can trigger the processor calculation because it's not reported
// yet
while (((PipeRealtimeEvent) pendingQueue.peekLast()).getEvent() instanceof PipeHeartbeatEvent) {
pendingQueue.pollLast();
}
if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
oldEvent.bindProgressIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ public UnboundedBlockingPendingQueue(final PipeEventCounter eventCounter) {
public E peekLast() {
return pendingDeque.peekLast();
}

public E pollLast() {
return pendingDeque.pollLast();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@
}

private TPipeTransferResp checkNonFinalFileSeal(
final File file, final String fileName, final long fileLength) throws IOException {

Check warning on line 769 in iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'java.io.IOException', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1v-_iJVoFEiAkUHCpc&open=AZ1v-_iJVoFEiAkUHCpc&pullRequest=17450
if (!file.exists()) {
final TSStatus status =
RpcUtils.getStatus(
Expand All @@ -787,15 +787,15 @@
String.format(
"Failed to seal file %s, because the length of file is not correct. "
+ "The original file has length %s, but receiver file has length %s.",
fileName, fileLength, writingFileWriter.length()));
fileName, fileLength, file.length()));
PipeLogger.log(
LOGGER::warn,
"Receiver id = %s: Failed to seal file %s, because the length of file is not correct. "
+ "The original file has length %s, but receiver file has length %s.",
receiverId.get(),
fileName,
fileLength,
writingFileWriter.length());
file.length());
return new TPipeTransferResp(status);
}

Expand Down
Loading