From 44786ee4e215d74ea4a7d7fb14581fac278f3daa Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 8 Apr 2026 10:06:43 +0800 Subject: [PATCH 1/3] fix: prevent NPE when isFinished() is called before DataDriver init --- .../execution/driver/DataDriver.java | 5 + .../queryengine/execution/driver/Driver.java | 4 +- .../execution/driver/SchemaDriver.java | 5 + .../queryengine/execution/DataDriverTest.java | 113 ++++++++++++++++++ 4 files changed, 126 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java index 743a05d37775a..c9c41f5b993ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java @@ -74,6 +74,11 @@ protected boolean init(SettableFuture blockedFuture) { return true; } + @Override + public boolean isInit() { + return init; + } + /** * Init seq file list and unseq file list in {@link * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java index ac353e3221311..35485dbbd4ec4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java @@ -213,7 +213,7 @@ private boolean isFinishedInternal() { finished = state.get() != State.ALIVE || driverContext.isDone() - || root.isFinished() + || (isInit() && root.isFinished()) || sink.isClosed(); } catch (Exception e) { throw new RuntimeException(e); @@ -224,6 +224,8 @@ private boolean isFinishedInternal() { return finished; } + abstract boolean isInit(); + @SuppressWarnings({"squid:S1181", "squid:S112"}) private ListenableFuture processInternal() { long startTimeNanos = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java index b507aa1c7487d..e3230b7ff7f33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java @@ -42,6 +42,11 @@ protected boolean init(SettableFuture blockedFuture) { return true; } + @Override + boolean isInit() { + return true; + } + @Override protected void releaseResource() { driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java index 1f092e4c97c1e..48218814d65e8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java @@ -35,7 +35,9 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; @@ -44,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -248,4 +251,114 @@ public void batchTest() { instanceNotificationExecutor.shutdown(); } } + + @Test + public void testCallIsFinishedBeforeDataSourcePrepared() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + IFullPath measurementPath1 = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"), + new MeasurementSchema("sensor0", TSDataType.INT32)); + Set allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + fragmentInstanceContext.setDataRegion(dataRegion); + DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName()); + driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName()); + + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(allSensors); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(0), + planNodeId1, + measurementPath1, + Ordering.ASC, + scanOptionsBuilder.build()); + driverContext.addSourceOperator(seriesScanOperator1); + driverContext.addPath(measurementPath1); + seriesScanOperator1 + .getOperatorContext() + .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + IFullPath measurementPath2 = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"), + new MeasurementSchema("sensor1", TSDataType.INT32)); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(1), + planNodeId2, + measurementPath2, + Ordering.ASC, + scanOptionsBuilder.build()); + driverContext.addSourceOperator(seriesScanOperator2); + driverContext.addPath(measurementPath2); + + seriesScanOperator2 + .getOperatorContext() + .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + LeftOuterTimeJoinOperator timeJoinOperator = + new LeftOuterTimeJoinOperator( + driverContext.getOperatorContexts().get(2), + seriesScanOperator1, + 1, + seriesScanOperator2, + Arrays.asList(TSDataType.INT32, TSDataType.INT32), + new AscTimeComparator()); + SingleDeviceViewOperator fakeOperator = new SingleDeviceViewOperator(driverContext.getOperatorContexts().get(3), "d1", timeJoinOperator, Arrays.asList(0), Arrays.asList(TSDataType.INT32, TSDataType.INT32)); + fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + fragmentInstanceContext.setSourcePaths(driverContext.getPaths()); + String deviceId = DATA_DRIVER_TEST_SG + ".device0"; + Mockito.when( + dataRegion.query( + eq(driverContext.getPaths()), + eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)), + eq(fragmentInstanceContext), + Mockito.isNull(), + Mockito.isNull(), + Mockito.anyLong())) + .thenReturn(null); + fragmentInstanceContext.initQueryDataSource(driverContext.getPaths()); + fragmentInstanceContext.initializeNumOfDrivers(1); + + StubSink stubSink = new StubSink(fragmentInstanceContext); + driverContext.setSink(stubSink); + IDriver dataDriver = null; + try { + dataDriver = new DataDriver(fakeOperator, driverContext, 0); + assertEquals( + fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId()); + assertFalse(dataDriver.isFinished()); + } finally { + if (dataDriver != null) { + dataDriver.close(); + } + } + } catch (QueryProcessException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } } From f46aaae45dd99a72b625d270578df2aee2e0586b Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 8 Apr 2026 10:21:20 +0800 Subject: [PATCH 2/3] spotless --- .../iotdb/db/queryengine/execution/DataDriverTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java index 48218814d65e8..31c89e1cda5d5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java @@ -46,7 +46,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; -import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -324,7 +323,13 @@ public void testCallIsFinishedBeforeDataSourcePrepared() { seriesScanOperator2, Arrays.asList(TSDataType.INT32, TSDataType.INT32), new AscTimeComparator()); - SingleDeviceViewOperator fakeOperator = new SingleDeviceViewOperator(driverContext.getOperatorContexts().get(3), "d1", timeJoinOperator, Arrays.asList(0), Arrays.asList(TSDataType.INT32, TSDataType.INT32)); + SingleDeviceViewOperator fakeOperator = + new SingleDeviceViewOperator( + driverContext.getOperatorContexts().get(3), + "d1", + timeJoinOperator, + Arrays.asList(0), + Arrays.asList(TSDataType.INT32, TSDataType.INT32)); fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); fragmentInstanceContext.setSourcePaths(driverContext.getPaths()); From 1a0fe5a4a93e702b22a33e0ba49631ce89517e52 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Wed, 8 Apr 2026 11:03:02 +0800 Subject: [PATCH 3/3] fix compile --- .../queryengine/execution/DataDriverTest.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java index 31c89e1cda5d5..5176923f8c20e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java @@ -22,7 +22,9 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; @@ -53,7 +55,6 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.Duration; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.IntColumn; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -93,7 +94,11 @@ public class DataDriverTest { TimeUnit.MILLISECONDS); @Before - public void setUp() throws MetadataException, IOException, WriteProcessException { + public void setUp() + throws MetadataException, + IOException, + WriteProcessException, + org.apache.tsfile.exception.write.WriteProcessException { IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000); SeriesReaderTestUtil.setUp( measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG); @@ -252,13 +257,13 @@ public void batchTest() { } @Test - public void testCallIsFinishedBeforeDataSourcePrepared() { + public void testCallIsFinishedBeforeDataSourcePrepared() throws IllegalPathException { ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); try { - IFullPath measurementPath1 = - new NonAlignedFullPath( - IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"), + MeasurementPath measurementPath1 = + new MeasurementPath( + new PartialPath(DATA_DRIVER_TEST_SG + ".device0"), new MeasurementSchema("sensor0", TSDataType.INT32)); Set allSensors = new HashSet<>(); allSensors.add("sensor0"); @@ -297,9 +302,9 @@ public void testCallIsFinishedBeforeDataSourcePrepared() { .getOperatorContext() .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); - IFullPath measurementPath2 = - new NonAlignedFullPath( - IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"), + MeasurementPath measurementPath2 = + new MeasurementPath( + new PartialPath(DATA_DRIVER_TEST_SG + ".device0"), new MeasurementSchema("sensor1", TSDataType.INT32)); SeriesScanOperator seriesScanOperator2 = new SeriesScanOperator( @@ -337,7 +342,7 @@ public void testCallIsFinishedBeforeDataSourcePrepared() { Mockito.when( dataRegion.query( eq(driverContext.getPaths()), - eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)), + eq(deviceId), eq(fragmentInstanceContext), Mockito.isNull(), Mockito.isNull(),