Skip to content

Commit 93458ea

Browse files
authored
[IOTDB-2543] Support all time filters in last query
1 parent 44b76ae commit 93458ea

7 files changed

Lines changed: 280 additions & 142 deletions

File tree

cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,13 +1051,13 @@ public ByteBuffer last(LastQueryRequest request)
10511051
expression = new GlobalTimeExpression(filter);
10521052
}
10531053

1054-
List<Pair<Boolean, TimeValuePair>> timeValuePairs =
1054+
List<TimeValuePair> timeValuePairs =
10551055
LastQueryExecutor.calculateLastPairForSeriesLocally(
10561056
seriesPaths, dataTypes, queryContext, expression, request.getDeviceMeasurements());
10571057
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
10581058
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
1059-
for (Pair<Boolean, TimeValuePair> timeValuePair : timeValuePairs) {
1060-
SerializeUtils.serializeTVPair(timeValuePair.right, dataOutputStream);
1059+
for (TimeValuePair timeValuePair : timeValuePairs) {
1060+
SerializeUtils.serializeTVPair(timeValuePair, dataOutputStream);
10611061
}
10621062
return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
10631063
}

cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.iotdb.tsfile.read.expression.IExpression;
4747
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
4848
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
49-
import org.apache.iotdb.tsfile.utils.Pair;
5049

5150
import org.apache.thrift.TException;
5251
import org.slf4j.Logger;
@@ -77,7 +76,7 @@ public ClusterLastQueryExecutor(LastQueryPlan lastQueryPlan, MetaGroupMember met
7776
}
7877

7978
@Override
80-
protected List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeries(
79+
protected List<TimeValuePair> calculateLastPairForSeries(
8180
List<PartialPath> seriesPaths,
8281
List<TSDataType> dataTypes,
8382
QueryContext context,
@@ -87,7 +86,7 @@ protected List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeries(
8786
return calculateLastPairsForSeries(seriesPaths, dataTypes, context, expression, lastQueryPlan);
8887
}
8988

90-
private List<Pair<Boolean, TimeValuePair>> calculateLastPairsForSeries(
89+
private List<TimeValuePair> calculateLastPairsForSeries(
9190
List<PartialPath> seriesPaths,
9291
List<TSDataType> dataTypes,
9392
QueryContext context,
@@ -100,14 +99,12 @@ private List<Pair<Boolean, TimeValuePair>> calculateLastPairsForSeries(
10099
} catch (CheckConsistencyException e) {
101100
throw new IOException(e);
102101
}
103-
List<Pair<Boolean, TimeValuePair>> results = new ArrayList<>(seriesPaths.size());
102+
List<TimeValuePair> results = new ArrayList<>(seriesPaths.size());
104103
for (int i = 0; i < seriesPaths.size(); i++) {
105-
results.add(new Pair<>(true, new TimeValuePair(Long.MIN_VALUE, null)));
104+
results.add(null);
106105
}
107-
108106
List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
109-
List<Future<List<Pair<Boolean, TimeValuePair>>>> groupFutures =
110-
new ArrayList<>(globalGroups.size());
107+
List<Future<List<TimeValuePair>>> groupFutures = new ArrayList<>(globalGroups.size());
111108
List<Integer> dataTypeOrdinals = new ArrayList<>(dataTypes.size());
112109
for (TSDataType dataType : dataTypes) {
113110
dataTypeOrdinals.add(dataType.ordinal());
@@ -124,15 +121,16 @@ private List<Pair<Boolean, TimeValuePair>> calculateLastPairsForSeries(
124121
dataTypeOrdinals);
125122
groupFutures.add(lastQueryPool.submit(task));
126123
}
127-
for (Future<List<Pair<Boolean, TimeValuePair>>> groupFuture : groupFutures) {
124+
for (Future<List<TimeValuePair>> groupFuture : groupFutures) {
128125
try {
129126
// merge results from each group
130-
List<Pair<Boolean, TimeValuePair>> timeValuePairs = groupFuture.get();
127+
List<TimeValuePair> timeValuePairs = groupFuture.get();
131128
for (int i = 0; i < timeValuePairs.size(); i++) {
132-
if (timeValuePairs.get(i) != null
133-
&& timeValuePairs.get(i).right != null
134-
&& timeValuePairs.get(i).right.getTimestamp() > results.get(i).right.getTimestamp()) {
135-
results.get(i).right = timeValuePairs.get(i).right;
129+
TimeValuePair currentResult = results.get(i);
130+
TimeValuePair newResult = timeValuePairs.get(i);
131+
if (currentResult == null
132+
|| newResult != null && newResult.getTimestamp() > currentResult.getTimestamp()) {
133+
results.add(i, newResult);
136134
}
137135
}
138136
} catch (InterruptedException e) {
@@ -145,7 +143,7 @@ private List<Pair<Boolean, TimeValuePair>> calculateLastPairsForSeries(
145143
return results;
146144
}
147145

148-
class GroupLastTask implements Callable<List<Pair<Boolean, TimeValuePair>>> {
146+
class GroupLastTask implements Callable<List<TimeValuePair>> {
149147

150148
private PartitionGroup group;
151149
private List<PartialPath> seriesPaths;
@@ -173,11 +171,11 @@ class GroupLastTask implements Callable<List<Pair<Boolean, TimeValuePair>>> {
173171
}
174172

175173
@Override
176-
public List<Pair<Boolean, TimeValuePair>> call() throws Exception {
174+
public List<TimeValuePair> call() throws Exception {
177175
return calculateSeriesLast(group, seriesPaths, queryContext);
178176
}
179177

180-
private List<Pair<Boolean, TimeValuePair>> calculateSeriesLast(
178+
private List<TimeValuePair> calculateSeriesLast(
181179
PartitionGroup group, List<PartialPath> seriesPaths, QueryContext context)
182180
throws QueryProcessException, StorageEngineException, IOException {
183181
if (group.contains(metaGroupMember.getThisNode())) {
@@ -187,7 +185,7 @@ private List<Pair<Boolean, TimeValuePair>> calculateSeriesLast(
187185
}
188186
}
189187

190-
private List<Pair<Boolean, TimeValuePair>> calculateSeriesLastLocally(
188+
private List<TimeValuePair> calculateSeriesLastLocally(
191189
PartitionGroup group, List<PartialPath> seriesPaths, QueryContext context)
192190
throws StorageEngineException, QueryProcessException, IOException {
193191
DataGroupMember localDataMember =
@@ -201,7 +199,7 @@ private List<Pair<Boolean, TimeValuePair>> calculateSeriesLastLocally(
201199
seriesPaths, dataTypes, context, expression, queryPlan.getDeviceToMeasurements());
202200
}
203201

204-
private List<Pair<Boolean, TimeValuePair>> calculateSeriesLastRemotely(
202+
private List<TimeValuePair> calculateSeriesLastRemotely(
205203
PartitionGroup group, List<PartialPath> seriesPaths, QueryContext context) {
206204
for (Node node : group) {
207205
try {
@@ -219,10 +217,10 @@ private List<Pair<Boolean, TimeValuePair>> calculateSeriesLastRemotely(
219217
for (int i = 0; i < seriesPaths.size(); i++) {
220218
timeValuePairs.add(SerializeUtils.deserializeTVPair(buffer));
221219
}
222-
List<Pair<Boolean, TimeValuePair>> results = new ArrayList<>();
220+
List<TimeValuePair> results = new ArrayList<>();
223221
for (int i = 0; i < seriesPaths.size(); i++) {
224222
TimeValuePair pair = timeValuePairs.get(i);
225-
results.add(new Pair<>(true, pair));
223+
results.add(pair);
226224
}
227225
return results;
228226
} catch (IOException | TException e) {

docs/UserGuide/Query-Data/Last-Query.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,27 @@
2121

2222
# Last Query
2323

24-
The last query is a special query provided in the time series database Apache IoTDB. The last query returns the data point with the largest timestamp in the time series, that is, the latest state of a sequence. Users can specify the last query through `select last`. It is especially important in IoT data analysis scenarios as the latest point data characterizes the current state. In order to provide a millisecond-level return speed, Apache IoTDB optimizes the cache for the last query to meet users' performance requirements for real-time monitoring of devices.
24+
The last query is a special type of query in Apache IoTDB. It returns the data point with the largest timestamp of the specified time series. In other word, it returns the latest state of a time series. This feature is especially important in IoT data analysis scenarios. To meet the performance requirement of real-time device monitoring systems, Apache IoTDB caches the latest values of all time series to achieve microsecond read latency.
2525

2626
The last query is to return the most recent data point of the given timeseries in a three column format.
2727

28-
The SQL statement is defined as:
28+
The SQL syntax is defined as:
2929

3030
```sql
3131
select last <Path> [COMMA <Path>]* from < PrefixPath > [COMMA < PrefixPath >]* <WhereClause>
3232
```
3333

3434
which means: Query and return the last data points of timeseries prefixPath.path.
3535

36-
Only time filter with '>' or '>=' is supported in \<WhereClause\>. Any other filters given in the \<WhereClause\> will give an exception.
36+
Only time filter is supported in \<WhereClause\>. Any other filters given in the \<WhereClause\> will give an exception. When the cached most recent data point does not satisfy the criterion specified by the filter, IoTDB will have to get the result from the external storage, which may cause a decrease in performance.
3737

3838
The result will be returned in a four column table format.
3939

4040
```
4141
| Time | timeseries | value | dataType |
4242
```
4343

44-
**Note:** The `value` colum will always return the value as `string` and thus also has `TSDataType.TEXT`. Therefore the colum `dataType` is returned also which contains the _real_ type how the value should be interpreted.
44+
**Note:** The `value` colum will always return the value as `string` and thus also has `TSDataType.TEXT`. Therefore, the column `dataType` is returned also which contains the _real_ type how the value should be interpreted.
4545

4646
**Example 1:** get the last point of root.ln.wf01.wt01.status:
4747

docs/zh/UserGuide/Query-Data/Last-Query.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
# 最新点查询
2323

24-
最新点查询是时序数据库 Apache IoTDB 中提供的一种特殊查询,最新点查询返回时间序列中时间戳最大的数据点,即一条序列的最新状态。用户可以通过 `select last` 指定查询最新点。由于最新点数据表征了当前状态,因此在物联网数据分析场景中尤为重要。为了提供毫秒级的返回速度,Apache IoTDB 对最新点查询进行了缓存优化****满足用户对设备实时监控的性能需求
24+
最新点查询是时序数据库 Apache IoTDB 中提供的一种特殊查询。它返回指定时间序列中时间戳最大的数据点,即一条序列的最新状态。在物联网数据分析场景中,此功能尤为重要。为了满足了用户对设备实时监控的需求,Apache IoTDB 对最新点查询进行了**缓存优化**,能够提供毫秒级的返回速度
2525

2626
SQL 语法:
2727

@@ -31,7 +31,7 @@ select last <Path> [COMMA <Path>]* from < PrefixPath > [COMMA < PrefixPath >]* <
3131

3232
其含义是:查询时间序列 prefixPath.path 中最近时间戳的数据。
3333

34-
`whereClause` 中当前只支持含有 `>``>=` 的时间过滤条件,任何其他过滤条件都将会返回异常。
34+
`whereClause` 中当前只支持时间过滤条件,任何其他过滤条件都将会返回异常。当缓存的最新点不满足过滤条件时,IoTDB 需要从存储中获取结果,此时性能将会有所下降
3535

3636
结果集为四列的结构:
3737

integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,139 @@ public void lastWithFilterTest() {
576576
}
577577
}
578578

579+
@Test
580+
public void lastWithLessThanFilterTest() {
581+
try (Connection connection = EnvFactory.getEnv().getConnection();
582+
Statement statement = connection.createStatement()) {
583+
584+
long lastTimestamp;
585+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01");
586+
try (ResultSet resultSet = statement.getResultSet()) {
587+
Assert.assertTrue(resultSet.next());
588+
lastTimestamp = resultSet.getTimestamp(1).getTime();
589+
}
590+
591+
statement.execute("select last temperature from root.ln.wf01.wt01 where time < 100");
592+
try (ResultSet resultSet = statement.getResultSet()) {
593+
Assert.assertFalse(resultSet.next());
594+
}
595+
596+
statement.execute("select last temperature from root.ln.wf01.wt01 where time < 200");
597+
try (ResultSet resultSet = statement.getResultSet()) {
598+
Assert.assertTrue(resultSet.next());
599+
Assert.assertEquals(100, resultSet.getTimestamp(1).getTime());
600+
Assert.assertFalse(resultSet.wasNull());
601+
Assert.assertEquals("root.ln.wf01.wt01.temperature", resultSet.getString(2));
602+
Assert.assertFalse(resultSet.wasNull());
603+
Assert.assertEquals("25.1", resultSet.getString(3));
604+
Assert.assertFalse(resultSet.wasNull());
605+
Assert.assertEquals("DOUBLE", resultSet.getString(4));
606+
Assert.assertFalse(resultSet.wasNull());
607+
Assert.assertFalse(resultSet.next());
608+
}
609+
610+
// Test if the last value cache is effected.
611+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01");
612+
try (ResultSet resultSet = statement.getResultSet()) {
613+
Assert.assertTrue(resultSet.next());
614+
Assert.assertEquals(lastTimestamp, resultSet.getTimestamp(1).getTime());
615+
}
616+
617+
} catch (Exception e) {
618+
e.printStackTrace();
619+
fail(e.getMessage());
620+
}
621+
}
622+
623+
@Test
624+
public void lastWithLessThanOrEqualToFilterTest() {
625+
try (Connection connection = EnvFactory.getEnv().getConnection();
626+
Statement statement = connection.createStatement()) {
627+
628+
long lastTimestamp;
629+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01");
630+
try (ResultSet resultSet = statement.getResultSet()) {
631+
Assert.assertTrue(resultSet.next());
632+
lastTimestamp = resultSet.getTimestamp(1).getTime();
633+
}
634+
635+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01 WHERE time <= 50");
636+
try (ResultSet resultSet = statement.getResultSet()) {
637+
Assert.assertFalse(resultSet.next());
638+
}
639+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01 WHERE time <= 100");
640+
try (ResultSet resultSet = statement.getResultSet()) {
641+
Assert.assertTrue(resultSet.next());
642+
Assert.assertEquals(100, resultSet.getTimestamp(1).getTime());
643+
Assert.assertFalse(resultSet.wasNull());
644+
Assert.assertEquals("root.ln.wf01.wt01.temperature", resultSet.getString(2));
645+
Assert.assertFalse(resultSet.wasNull());
646+
Assert.assertEquals("25.1", resultSet.getString(3));
647+
Assert.assertFalse(resultSet.wasNull());
648+
Assert.assertEquals("DOUBLE", resultSet.getString(4));
649+
Assert.assertFalse(resultSet.wasNull());
650+
Assert.assertFalse(resultSet.next());
651+
}
652+
653+
// Test if the last value cache is effected.
654+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01");
655+
try (ResultSet resultSet = statement.getResultSet()) {
656+
Assert.assertTrue(resultSet.next());
657+
Assert.assertEquals(lastTimestamp, resultSet.getTimestamp(1).getTime());
658+
}
659+
660+
} catch (Exception e) {
661+
e.printStackTrace();
662+
fail(e.getMessage());
663+
}
664+
}
665+
666+
@Test
667+
public void lastWithCompoundFilterTest() {
668+
try (Connection connection = EnvFactory.getEnv().getConnection();
669+
Statement statement = connection.createStatement()) {
670+
671+
long lastTimestamp;
672+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01");
673+
try (ResultSet resultSet = statement.getResultSet()) {
674+
Assert.assertTrue(resultSet.next());
675+
lastTimestamp = resultSet.getTimestamp(1).getTime();
676+
}
677+
678+
statement.execute(
679+
"SELECT LAST temperature FROM root.ln.wf01.wt01 WHERE time > 100 AND time < 200");
680+
try (ResultSet resultSet = statement.getResultSet()) {
681+
Assert.assertFalse(resultSet.next());
682+
}
683+
684+
statement.execute(
685+
"SELECT LAST temperature FROM root.ln.wf01.wt01 WHERE time > 100 AND time < 200 OR time > 250 AND time <= 300");
686+
try (ResultSet resultSet = statement.getResultSet()) {
687+
Assert.assertTrue(resultSet.next());
688+
Assert.assertEquals(300, resultSet.getTimestamp(1).getTime());
689+
Assert.assertFalse(resultSet.wasNull());
690+
Assert.assertEquals("root.ln.wf01.wt01.temperature", resultSet.getString(2));
691+
Assert.assertFalse(resultSet.wasNull());
692+
Assert.assertEquals("15.7", resultSet.getString(3));
693+
Assert.assertFalse(resultSet.wasNull());
694+
Assert.assertEquals("DOUBLE", resultSet.getString(4));
695+
Assert.assertFalse(resultSet.wasNull());
696+
Assert.assertFalse(resultSet.next());
697+
}
698+
699+
// Test if the last value cache is effected.
700+
statement.execute("SELECT LAST temperature FROM root.ln.wf01.wt01");
701+
try (ResultSet resultSet = statement.getResultSet()) {
702+
Assert.assertTrue(resultSet.next());
703+
Assert.assertEquals(lastTimestamp, resultSet.getTimestamp(1).getTime());
704+
}
705+
706+
} catch (Exception e) {
707+
e.printStackTrace();
708+
fail(e.getMessage());
709+
}
710+
}
711+
579712
private void prepareData() {
580713
try (Connection connection = EnvFactory.getEnv().getConnection();
581714
Statement statement = connection.createStatement()) {

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
3030
import org.apache.iotdb.tsfile.read.expression.IExpression;
3131
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
32-
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
33-
import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
34-
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
3532

3633
import org.apache.thrift.TException;
3734

@@ -80,16 +77,11 @@ public void setExpression(IExpression expression) throws QueryProcessException {
8077
if (isValidExpression(expression)) {
8178
super.setExpression(expression);
8279
} else {
83-
throw new QueryProcessException("Only '>' and '>=' are supported in LAST query");
80+
throw new QueryProcessException("Only time filters are supported in LAST query");
8481
}
8582
}
8683

87-
// Only > and >= are supported in time filter
8884
private boolean isValidExpression(IExpression expression) {
89-
if (expression instanceof GlobalTimeExpression) {
90-
Filter filter = ((GlobalTimeExpression) expression).getFilter();
91-
return filter instanceof TimeGtEq || filter instanceof TimeGt;
92-
}
93-
return false;
85+
return expression instanceof GlobalTimeExpression;
9486
}
9587
}

0 commit comments

Comments
 (0)