Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
return true;
}

@Override
public boolean isInit() {
return init;
}

/**
* Init seq file list and unseq file list in {@link
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private boolean isFinishedInternal() {
finished =
state.get() != State.ALIVE
|| driverContext.isDone()
|| root.isFinished()
|| (isInit() && root.isFinished())
|| sink.isClosed();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -224,6 +224,8 @@ private boolean isFinishedInternal() {
return finished;
}

abstract boolean isInit();

@SuppressWarnings({"squid:S1181", "squid:S112"})
private ListenableFuture<?> processInternal() {
long startTimeNanos = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
return true;
}

@Override
boolean isInit() {
return true;
}

@Override
protected void releaseResource() {
driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
Expand All @@ -35,7 +37,9 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
Expand All @@ -51,7 +55,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.IntColumn;
import org.apache.tsfile.write.schema.MeasurementSchema;
Expand Down Expand Up @@ -91,7 +94,11 @@
TimeUnit.MILLISECONDS);

@Before
public void setUp() throws MetadataException, IOException, WriteProcessException {
public void setUp()
throws MetadataException,
IOException,
WriteProcessException,

Check warning on line 100 in iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'org.apache.iotdb.db.exception.WriteProcessException', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1rFC3TNX_uvab99j0s&open=AZ1rFC3TNX_uvab99j0s&pullRequest=17441
org.apache.tsfile.exception.write.WriteProcessException {
IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000);
SeriesReaderTestUtil.setUp(
measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG);
Expand Down Expand Up @@ -248,4 +255,120 @@
instanceNotificationExecutor.shutdown();
}
}

@Test
public void testCallIsFinishedBeforeDataSourcePrepared() throws IllegalPathException {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {

Check failure on line 263 in iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change this "try" to a try-with-resources.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1rFC3TNX_uvab99j0t&open=AZ1rFC3TNX_uvab99j0t&pullRequest=17441
MeasurementPath measurementPath1 =

Check warning on line 264 in iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Distance between variable 'measurementPath1' declaration and its first usage is 10, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1rFC3TNX_uvab99j0u&open=AZ1rFC3TNX_uvab99j0u&pullRequest=17441
new MeasurementPath(
new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
new MeasurementSchema("sensor0", TSDataType.INT32));
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
DataRegion dataRegion = Mockito.mock(DataRegion.class);
Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
fragmentInstanceContext.setDataRegion(dataRegion);
DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId1 = new PlanNodeId("1");
driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
PlanNodeId planNodeId2 = new PlanNodeId("2");
driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
driverContext.addOperatorContext(
3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName());
driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());

SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(allSensors);
SeriesScanOperator seriesScanOperator1 =
new SeriesScanOperator(
driverContext.getOperatorContexts().get(0),
planNodeId1,
measurementPath1,
Ordering.ASC,
scanOptionsBuilder.build());
driverContext.addSourceOperator(seriesScanOperator1);
driverContext.addPath(measurementPath1);
seriesScanOperator1
.getOperatorContext()
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));

MeasurementPath measurementPath2 =
new MeasurementPath(
new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
new MeasurementSchema("sensor1", TSDataType.INT32));
SeriesScanOperator seriesScanOperator2 =
new SeriesScanOperator(
driverContext.getOperatorContexts().get(1),
planNodeId2,
measurementPath2,
Ordering.ASC,
scanOptionsBuilder.build());
driverContext.addSourceOperator(seriesScanOperator2);
driverContext.addPath(measurementPath2);

seriesScanOperator2
.getOperatorContext()
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));

LeftOuterTimeJoinOperator timeJoinOperator =
new LeftOuterTimeJoinOperator(
driverContext.getOperatorContexts().get(2),
seriesScanOperator1,
1,
seriesScanOperator2,
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
new AscTimeComparator());
SingleDeviceViewOperator fakeOperator =
new SingleDeviceViewOperator(
driverContext.getOperatorContexts().get(3),
"d1",
timeJoinOperator,
Arrays.asList(0),
Arrays.asList(TSDataType.INT32, TSDataType.INT32));
fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));

fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
String deviceId = DATA_DRIVER_TEST_SG + ".device0";
Mockito.when(
dataRegion.query(
eq(driverContext.getPaths()),
eq(deviceId),
eq(fragmentInstanceContext),
Mockito.isNull(),
Mockito.isNull(),
Mockito.anyLong()))
.thenReturn(null);
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
fragmentInstanceContext.initializeNumOfDrivers(1);

StubSink stubSink = new StubSink(fragmentInstanceContext);
driverContext.setSink(stubSink);
IDriver dataDriver = null;
try {
dataDriver = new DataDriver(fakeOperator, driverContext, 0);
assertEquals(
fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId());
assertFalse(dataDriver.isFinished());
} finally {
if (dataDriver != null) {
dataDriver.close();
}
}
} catch (QueryProcessException e) {
e.printStackTrace();
fail();
} finally {
instanceNotificationExecutor.shutdown();
}
}
}
Loading