diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java index 743a05d37775a..c9c41f5b993ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java @@ -74,6 +74,11 @@ protected boolean init(SettableFuture blockedFuture) { return true; } + @Override + public boolean isInit() { + return init; + } + /** * Init seq file list and unseq file list in {@link * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java index ac353e3221311..35485dbbd4ec4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java @@ -213,7 +213,7 @@ private boolean isFinishedInternal() { finished = state.get() != State.ALIVE || driverContext.isDone() - || root.isFinished() + || (isInit() && root.isFinished()) || sink.isClosed(); } catch (Exception e) { throw new RuntimeException(e); @@ -224,6 +224,8 @@ private boolean isFinishedInternal() { return finished; } + abstract boolean isInit(); + @SuppressWarnings({"squid:S1181", "squid:S112"}) private ListenableFuture processInternal() { long startTimeNanos = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java index b507aa1c7487d..e3230b7ff7f33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java @@ -42,6 +42,11 @@ protected boolean init(SettableFuture blockedFuture) { return true; } + @Override + boolean isInit() { + return true; + } + @Override protected void releaseResource() { driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java index 1f092e4c97c1e..5176923f8c20e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java @@ -22,7 +22,9 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; @@ -35,7 +37,9 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; @@ -51,7 +55,6 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.Duration; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.IntColumn; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -91,7 +94,11 @@ public class DataDriverTest { TimeUnit.MILLISECONDS); @Before - public void setUp() throws MetadataException, IOException, WriteProcessException { + public void setUp() + throws MetadataException, + IOException, + WriteProcessException, + org.apache.tsfile.exception.write.WriteProcessException { IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000); SeriesReaderTestUtil.setUp( measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG); @@ -248,4 +255,120 @@ public void batchTest() { instanceNotificationExecutor.shutdown(); } } + + @Test + public void testCallIsFinishedBeforeDataSourcePrepared() throws IllegalPathException { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + MeasurementPath measurementPath1 = + new MeasurementPath( + new PartialPath(DATA_DRIVER_TEST_SG + ".device0"), + new MeasurementSchema("sensor0", TSDataType.INT32)); + Set allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + fragmentInstanceContext.setDataRegion(dataRegion); + DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName()); + driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName()); + + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(allSensors); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(0), + planNodeId1, + measurementPath1, + Ordering.ASC, + scanOptionsBuilder.build()); + driverContext.addSourceOperator(seriesScanOperator1); + driverContext.addPath(measurementPath1); + seriesScanOperator1 + .getOperatorContext() + .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + MeasurementPath measurementPath2 = + new MeasurementPath( + new PartialPath(DATA_DRIVER_TEST_SG + ".device0"), + new MeasurementSchema("sensor1", TSDataType.INT32)); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(1), + planNodeId2, + measurementPath2, + Ordering.ASC, + scanOptionsBuilder.build()); + driverContext.addSourceOperator(seriesScanOperator2); + driverContext.addPath(measurementPath2); + + seriesScanOperator2 + .getOperatorContext() + .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + LeftOuterTimeJoinOperator timeJoinOperator = + new LeftOuterTimeJoinOperator( + driverContext.getOperatorContexts().get(2), + seriesScanOperator1, + 1, + seriesScanOperator2, + Arrays.asList(TSDataType.INT32, TSDataType.INT32), + new AscTimeComparator()); + SingleDeviceViewOperator fakeOperator = + new SingleDeviceViewOperator( + driverContext.getOperatorContexts().get(3), + "d1", + timeJoinOperator, + Arrays.asList(0), + Arrays.asList(TSDataType.INT32, TSDataType.INT32)); + fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + fragmentInstanceContext.setSourcePaths(driverContext.getPaths()); + String deviceId = DATA_DRIVER_TEST_SG + ".device0"; + Mockito.when( + dataRegion.query( + eq(driverContext.getPaths()), + eq(deviceId), + eq(fragmentInstanceContext), + Mockito.isNull(), + Mockito.isNull(), + Mockito.anyLong())) + .thenReturn(null); + fragmentInstanceContext.initQueryDataSource(driverContext.getPaths()); + fragmentInstanceContext.initializeNumOfDrivers(1); + + StubSink stubSink = new StubSink(fragmentInstanceContext); + driverContext.setSink(stubSink); + IDriver dataDriver = null; + try { + dataDriver = new DataDriver(fakeOperator, driverContext, 0); + assertEquals( + fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId()); + assertFalse(dataDriver.isFinished()); + } finally { + if (dataDriver != null) { + dataDriver.close(); + } + } + } catch (QueryProcessException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } }