Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public PipeHeartbeat(
// the final results and namely these dataNodes are omitted in calculation.
remainingEventCountMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeCompletedListFromAgent)
Objects.nonNull(pipeRemainingEventCountListFromAgent)
? pipeRemainingEventCountListFromAgent.get(i)
: 0L);
remainingTimeMap.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ protected void triggerSnapshot() {
@Override
public synchronized EnrichedEvent supply() throws Exception {
final EnrichedEvent event = super.supply();
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
if (Objects.nonNull(event)) {
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
}
return event;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void collectEvent(final Event event) {
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}

pendingQueue.directOffer(event);
pendingQueue.offer(event);
collectInvocationCount.incrementAndGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
}

@Override
public boolean directOffer(final Event event) {
public boolean offer(final Event event) {
checkBeforeOffer(event);

if (event instanceof TsFileInsertionEvent) {
Expand All @@ -85,18 +85,13 @@
((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
return false;
} else {
return super.directOffer(event);
return super.offer(event);
}
}

@Override
public boolean waitedOffer(final Event event) {
return directOffer(event);
}

@Override
public boolean put(final Event event) {
directOffer(event);
offer(event);
return true;
}

Expand Down Expand Up @@ -201,7 +196,7 @@
return tsfileInsertEventDeque.peek();
}

public synchronized void replace(

Check warning on line 199 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 118 to 64, Complexity from 15 to 14, Nesting Level from 5 to 2, Number of Variables from 24 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1Bjn4vb4NJCcvIOeAr&open=AZ1Bjn4vb4NJCcvIOeAr&pullRequest=17396
String dataRegionId, Set<TsFileResource> sourceFiles, List<TsFileResource> targetFiles) {

final int regionId = Integer.parseInt(dataRegionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
Expand Down Expand Up @@ -193,7 +192,7 @@ public ByteBuffer serializeToByteBuffer() {
@Override
public void deserializeFromByteBuffer(final ByteBuffer buffer) {
isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
deleteDataNode = (DeleteDataNode) PlanNodeType.deserialize(buffer);
deleteDataNode = (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer);
progressIndex = deleteDataNode.getProgressIndex();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,15 @@
}
}

if (req.getFileNames().size() < 2) {
return new TIoTConsensusV2TransferResp(
RpcUtils.getStatus(
TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file %s, because the number of files is less than 2.",
req.getFileNames())));
}

// Sync here is necessary to ensure that the data is written to the disk. Or data region may
// load the file before the data is written to the disk and cause unexpected behavior after
// system restart. (e.g., empty file in data region's data directory)
Expand Down Expand Up @@ -952,6 +961,7 @@

private class IoTConsensusV2TsFileWriterPool {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final List<IoTConsensusV2TsFileWriter> iotConsensusV2TsFileWriterPool =
new ArrayList<>();
private final ConsensusPipeName consensusPipeName;
Expand Down Expand Up @@ -1014,15 +1024,18 @@
while (!tsFileWriter.isPresent()) {
tsFileWriter =
iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
Thread.sleep(RETRY_WAIT_TIME);
condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);

Check warning on line 1027 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do something with the "boolean" value returned by "await".

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0-X4c6s65uEkFo92iL&open=AZ0-X4c6s65uEkFo92iL&pullRequest=17396
}
tsFileWriter.get().setUsed(true);
tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
"IoTConsensusV2{}: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
consensusPipeName);
final String errorStr =
String.format(
"IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",

Check warning on line 1035 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 111).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0-X4c6s65uEkFo92iM&open=AZ0-X4c6s65uEkFo92iM&pullRequest=17396
consensusPipeName);
LOGGER.warn(errorStr);
throw new RuntimeException(errorStr);

Check warning on line 1038 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0-X4c6s65uEkFo92iK&open=AZ0-X4c6s65uEkFo92iK&pullRequest=17396
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -1057,7 +1070,7 @@
&& tsFileWriter.isUsed()) {
try {
Thread.sleep(RETRY_WAIT_TIME);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(
"IoTConsensusV2-PipeName-{}: receiver thread get interrupted when exiting.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public final void releaseReceiverResource(DataRegionId dataRegionId) {
ConsensusPipeName consensusPipeName = receiverEntry.getKey();
AtomicReference<IoTConsensusV2Receiver> receiverReference =
receiverEntry.getValue();
if (receiverReference != null) {
if (receiverReference != null && receiverReference.get() != null) {
receiverReference.get().handleExit();
receiverReference.set(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {

while (true) {
final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize));
if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
Expand All @@ -498,7 +498,7 @@ public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
long n = 0;
while (true) {
for (final TEndPoint targetNodeUrl : endPointList) {
if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void transfer(final Event event) throws Exception {
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile event %s, because %s.",
((PipeDeleteDataNodeEvent) event).coreReportMessage(), e.getMessage()),
((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -90,14 +92,15 @@ public synchronized void unregister(WebSocketSink connector) {
final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
eventsWaitingForTransfer.remove(pipeName);
while (!eventTransferQueue.isEmpty()) {
eventTransferQueue.forEach(
final List<EventWaitingForTransfer> eventWrappers = new ArrayList<>(eventTransferQueue);
eventTransferQueue.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will an event be missed if it is put into the queue after copying and before clearing?

eventWrappers.forEach(
(eventWrapper) -> {
if (eventWrapper.event instanceof EnrichedEvent) {
((EnrichedEvent) eventWrapper.event)
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
}
});
eventTransferQueue.clear();
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,19 @@ List<Pair<String, File>> writeTableModelTabletsToTsFiles(
e.getMessage(),
e);

final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}",
currentBatchId.get(),
fileWriter.getIOWriter().getFile().getPath(),
file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
sealedFiles.add(new Pair<>(dataBase, fileWriter.getIOWriter().getFile()));
sealedFiles.add(new Pair<>(dataBase, file));
}

for (final Pair<String, File> sealedFile : sealedFiles) {
Expand All @@ -181,7 +182,7 @@ List<Pair<String, File>> writeTableModelTabletsToTsFiles(
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.right.getPath(),
fileWriter.getIOWriter().getFile().getPath(),
file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually.");
}
sealedFiles.clear();
Expand All @@ -191,8 +192,8 @@ List<Pair<String, File>> writeTableModelTabletsToTsFiles(
throw e;
}

fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,19 @@ private List<Pair<String, File>> writeTabletsToTsFiles()
e.getMessage(),
e);

final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}",
currentBatchId.get(),
fileWriter.getIOWriter().getFile().getPath(),
file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
sealedFiles.add(new Pair<>(null, fileWriter.getIOWriter().getFile()));
sealedFiles.add(new Pair<>(null, file));
}

for (final Pair<String, File> sealedFile : sealedFiles) {
Expand All @@ -176,7 +177,7 @@ private List<Pair<String, File>> writeTabletsToTsFiles()
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.right.getPath(),
fileWriter.getIOWriter().getFile().getPath(),
file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually.");
}
sealedFiles.clear();
Expand All @@ -186,8 +187,8 @@ private List<Pair<String, File>> writeTabletsToTsFiles()
throw e;
}

fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ private File getNextBaseDir() throws DiskSpaceInsufficientException {
return baseDir;
}
throw new PipeException(
String.format(
"Failed to create batch file dir %s. (Batch id = %s)",
baseDir.getPath(), currentBatchId.get()));
String.format("Failed to create batch file dir. (Batch id = %s)", currentBatchId.get()));
Comment thread
jt2594838 marked this conversation as resolved.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public void customize(
EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
watermarkIntervalInMs =
parameters.getLongOrDefault(
Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY),
Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY),
EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
} else if (parameters.hasAnyAttributes(
_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void doExtract(final PipeRealtimeEvent event) {
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
extractDirectly(event);
pendingQueue.offer(event);
} else {
throw new UnsupportedOperationException(
String.format(
Expand Down Expand Up @@ -116,21 +116,7 @@ private void extractTabletInsertion(final PipeRealtimeEvent event) {
if (state == TsFileEpoch.State.USING_BOTH) {
event.skipReportOnCommit();
}
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
final String errorMessage =
String.format(
"extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s "
+ "has reached capacity, discard tablet event %s, current state %s",
this, event, event.getTsFileEpoch().getState(this));
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));

// Ignore the tablet event.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false);
}
pendingQueue.offer(event);
break;
default:
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -176,21 +162,7 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
case EMPTY:
case USING_TSFILE:
case USING_BOTH:
if (!pendingQueue.waitedOffer(event)) {
// This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
final String errorMessage =
String.format(
"extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s "
+ "has reached capacity, discard TsFile event %s, current state %s",
this, event, event.getTsFileEpoch().getState(this));
LOGGER.error(errorMessage);
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));

// Ignore the tsfile event.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false);
}
pendingQueue.offer(event);
break;
default:
throw new UnsupportedOperationException(
Expand Down
Loading
Loading