From 955103e3526846dd71037bc62cafde5c325426a6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Apr 2026 09:37:39 +0800 Subject: [PATCH] Load: Fixed multiple bugs (#17413) * fix * load * Update ClusterConfigTaskExecutor.java * fix --- .../execution/config/executor/ClusterConfigTaskExecutor.java | 2 ++ .../queryengine/plan/scheduler/load/LoadTsFileScheduler.java | 5 ++++- .../iotdb/db/storageengine/load/LoadTsFileManager.java | 4 ++-- .../db/storageengine/load/active/ActiveLoadPendingQueue.java | 4 ++-- .../db/storageengine/load/config/LoadTsFileConfigurator.java | 4 ++-- .../storageengine/load/memory/LoadTsFileMemoryManager.java | 1 - .../storageengine/load/metrics/LoadTsFileCostMetricsSet.java | 2 +- 7 files changed, 13 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 24c64c240308b..023f7e3379368 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2871,6 +2871,7 @@ public SettableFuture removeDataNode( future.setException( new IOException( "The DataNode to be removed is not in the cluster, or the input format is incorrect.")); + return future; } LOGGER.info("Starting to remove DataNode with nodeIds: {}", nodeIds); @@ -2940,6 +2941,7 @@ public SettableFuture removeConfigNode( future.setException( new IOException( "The ConfigNode to be removed is not in the cluster, or the input format is incorrect.")); + return future; } TConfigNodeLocation configNodeLocation = removeConfigNodeLocations.get(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index e9cd679250bff..4a85b6d0ebc0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -275,7 +275,10 @@ public void start() { final StringBuilder failedTsFiles = new StringBuilder( !tsFileNodeList.isEmpty() - ? tsFileNodeList.get(0).getTsFileResource().getTsFilePath() + ? tsFileNodeList + .get(failedTsFileNodeIndexes.get(0)) + .getTsFileResource() + .getTsFilePath() : ""); final ListIterator iterator = failedTsFileNodeIndexes.listIterator(1); while (iterator.hasNext()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 1e94188f01bb9..ce1bb680f21e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -220,7 +220,7 @@ public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNo } } - final Optional cleanupTask = Optional.of(uuid2CleanupTask.get(uuid)); + final Optional cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid)); cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning); try { final AtomicReference exception = new AtomicReference<>(); @@ -293,7 +293,7 @@ public boolean loadAll( return false; } - final Optional cleanupTask = Optional.of(uuid2CleanupTask.get(uuid)); + final Optional cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid)); cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning); try { uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, timePartitionProgressIndexMap); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java index bb611b842be7c..3ca8395646219 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java @@ -66,11 +66,11 @@ public synchronized boolean isFilePendingOrLoading(final String file) { return loadingFileSet.contains(file) || pendingFileSet.contains(file); } - public int size() { + public synchronized int size() { return pendingFileQueue.size() + loadingFileSet.size(); } - public boolean isEmpty() { + public synchronized boolean isEmpty() { return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index 65d5b9d2941e5..510d47b0b23df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -66,14 +66,14 @@ public static void validateParameters(final String key, final String value) { public static void validateDatabaseLevelParam(final String databaseLevel) { try { - int level = Integer.parseInt(databaseLevel); + final int level = Integer.parseInt(databaseLevel); if (level < DATABASE_LEVEL_MIN_VALUE) { throw new SemanticException( String.format( "Given database level %d is less than the minimum value %d, please input a valid database level.", level, DATABASE_LEVEL_MIN_VALUE)); } - } catch (Exception e) { + } catch (final NumberFormatException e) { throw new SemanticException( String.format( "Given database level %s is not a valid integer, please input a valid database level.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java index 4b08064bb9c0d..6c85d2280fd0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java @@ -127,7 +127,6 @@ public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock( final long actuallyAllocateMemoryInBytes = tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2); dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes); - usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes); LOGGER.info( "Create Data Cache Memory Block {}, allocate memory {}", dataCacheMemoryBlock, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java index 28bd40c2d29f8..1ce119131aeff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java @@ -159,7 +159,7 @@ public void unbindFrom(AbstractMetricService metricService) { stage)); metricService.remove( - MetricType.RATE, + MetricType.COUNTER, Metric.LOAD_DISK_IO.toString(), Tag.NAME.toString(), String.valueOf(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));