Skip to content

Commit 6ef35e4

Browse files
authored
Cleaned multiple potential problems in pipe module (#17396)
* fix * fix * fix * fix * gras-shop * fix * spls * fix * pipe-dn * logger-bug * fix
1 parent 77fe3e7 commit 6ef35e4

File tree

29 files changed

+99
-192
lines changed

29 files changed

+99
-192
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public PipeHeartbeat(
5555
// the final results and namely these dataNodes are omitted in calculation.
5656
remainingEventCountMap.put(
5757
pipeMeta.getStaticMeta(),
58-
Objects.nonNull(pipeCompletedListFromAgent)
58+
Objects.nonNull(pipeRemainingEventCountListFromAgent)
5959
? pipeRemainingEventCountListFromAgent.get(i)
6060
: 0L);
6161
remainingTimeMap.put(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,10 @@ protected void triggerSnapshot() {
153153
@Override
154154
public synchronized EnrichedEvent supply() throws Exception {
155155
final EnrichedEvent event = super.supply();
156-
PipeEventCommitManager.getInstance()
157-
.enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
156+
if (Objects.nonNull(event)) {
157+
PipeEventCommitManager.getInstance()
158+
.enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
159+
}
158160
return event;
159161
}
160162

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static synchronized void launchPipePluginAgent(
7878
curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset));
7979
offset++;
8080
}
81-
index += (offset + 1);
81+
index += offset;
8282
fetchAndSavePipePluginJars(curList);
8383
}
8484

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,7 @@ protected void calculateMemoryUsage(
706706
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
707707
needMemory,
708708
freeMemorySizeInBytes,
709-
freeMemorySizeInBytes,
709+
reservedMemorySizeInBytes,
710710
PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes());
711711
LOGGER.warn(message);
712712
throw new PipeException(message);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ private void collectEvent(final Event event) {
238238
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
239239
}
240240

241-
pendingQueue.directOffer(event);
241+
pendingQueue.offer(event);
242242
collectInvocationCount.incrementAndGet();
243243
}
244244

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public PipeRealtimePriorityBlockingQueue() {
7272
}
7373

7474
@Override
75-
public boolean directOffer(final Event event) {
75+
public boolean offer(final Event event) {
7676
checkBeforeOffer(event);
7777

7878
if (event instanceof TsFileInsertionEvent) {
@@ -85,18 +85,13 @@ public boolean directOffer(final Event event) {
8585
((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
8686
return false;
8787
} else {
88-
return super.directOffer(event);
88+
return super.offer(event);
8989
}
9090
}
9191

92-
@Override
93-
public boolean waitedOffer(final Event event) {
94-
return directOffer(event);
95-
}
96-
9792
@Override
9893
public boolean put(final Event event) {
99-
directOffer(event);
94+
offer(event);
10095
return true;
10196
}
10297

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
3232
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
3333
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
34-
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
3534
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
3635
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
3736
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
@@ -193,7 +192,7 @@ public ByteBuffer serializeToByteBuffer() {
193192
@Override
194193
public void deserializeFromByteBuffer(final ByteBuffer buffer) {
195194
isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
196-
deleteDataNode = (DeleteDataNode) PlanNodeType.deserialize(buffer);
195+
deleteDataNode = (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer);
197196
progressIndex = deleteDataNode.getProgressIndex();
198197
}
199198

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void unbindFrom(final AbstractMetricService metricService) {
138138
// phantom reference count
139139
metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
140140

141-
metricService.remove(MetricType.RATE, Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
141+
metricService.remove(MetricType.COUNTER, Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
142142
}
143143

144144
public void recordDiskIO(final long bytes) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,14 +338,14 @@ private void unbind(final AbstractMetricService metricService) {
338338
Tag.NAME.toString(),
339339
RECEIVER,
340340
Tag.TYPE.toString(),
341-
"handshakeDatanodeV1");
341+
"handshakeDataNodeV1");
342342
metricService.remove(
343343
MetricType.TIMER,
344344
Metric.PIPE_DATANODE_RECEIVER.toString(),
345345
Tag.NAME.toString(),
346346
RECEIVER,
347347
Tag.TYPE.toString(),
348-
"handshakeDatanodeV2");
348+
"handshakeDataNodeV2");
349349
metricService.remove(
350350
MetricType.TIMER,
351351
Metric.PIPE_DATANODE_RECEIVER.toString(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,7 @@ public void collectWindowOutputs(
744744
throw new UnsupportedOperationException(
745745
String.format(
746746
"The output tablet does not support column type %s",
747-
valueColumnTypes[rowIndex]));
747+
valueColumnTypes[columnIndex]));
748748
}
749749
} else {
750750
bitMaps[columnIndex].mark(rowIndex);
@@ -758,7 +758,7 @@ public void collectWindowOutputs(
758758
int filteredCount = 0;
759759
for (int i = 0; i < columnNameStringList.length; ++i) {
760760
if (!bitMaps[i].isAllMarked()) {
761-
originColumnIndex2FilteredColumnIndexMapperList[i] = ++filteredCount;
761+
originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
762762
}
763763
}
764764

0 commit comments

Comments
 (0)