Skip to content

Commit 9bf8544

Browse files
committed
Adjust to call the method that supports obtaining data region configurations at the database granularity
1 parent 6f16c8c commit 9bf8544

File tree

11 files changed

+65
-56
lines changed

11 files changed

+65
-56
lines changed

integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBTimePartitionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void testTimePartition() throws Exception {
142142
}
143143
timestatmps.forEach(
144144
t -> {
145-
long timePartitionId = TimePartitionUtils.getTimePartitionId(t);
145+
long timePartitionId = TimePartitionUtils.getTimePartitionId(t, "root.sg1");
146146
assertTrue(timePartitions.contains(timePartitionId));
147147
});
148148
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3428,7 +3428,7 @@ public SettableFuture<ConfigTaskResult> getTimeSlotList(
34283428
} catch (final Exception e) {
34293429
future.setException(e);
34303430
}
3431-
GetTimeSlotListTask.buildTSBlock(resp, future);
3431+
GetTimeSlotListTask.buildTSBlock(resp, future, getTimeSlotListStatement.getDatabase());
34323432
return future;
34333433
}
34343434

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/GetTimeSlotListTask.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,13 @@ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTask
5858
return configTaskExecutor.getTimeSlotList(getTimeSlotListStatement);
5959
}
6060

61-
public static void buildTSBlockRow(TsBlockBuilder builder, TTimePartitionSlot timePartitionSlot) {
61+
public static void buildTSBlockRow(
62+
TsBlockBuilder builder, TTimePartitionSlot timePartitionSlot, String database) {
6263
builder.getTimeColumnBuilder().writeLong(0L);
6364
builder
6465
.getColumnBuilder(0)
65-
.writeLong(TimePartitionUtils.getTimePartitionId(timePartitionSlot.getStartTime()));
66+
.writeLong(
67+
TimePartitionUtils.getTimePartitionId(timePartitionSlot.getStartTime(), database));
6668
builder
6769
.getColumnBuilder(1)
6870
.writeBinary(
@@ -73,14 +75,16 @@ public static void buildTSBlockRow(TsBlockBuilder builder, TTimePartitionSlot ti
7375
}
7476

7577
public static void buildTSBlock(
76-
TGetTimeSlotListResp getTimeSlotListResp, SettableFuture<ConfigTaskResult> future) {
78+
TGetTimeSlotListResp getTimeSlotListResp,
79+
SettableFuture<ConfigTaskResult> future,
80+
String database) {
7781
List<TSDataType> outputDataTypes =
7882
ColumnHeaderConstant.getTimeSlotListColumnHeaders.stream()
7983
.map(ColumnHeader::getColumnType)
8084
.collect(Collectors.toList());
8185
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
8286

83-
getTimeSlotListResp.getTimeSlotList().forEach(e -> buildTSBlockRow(builder, e));
87+
getTimeSlotListResp.getTimeSlotList().forEach(e -> buildTSBlockRow(builder, e, database));
8488

8589
DatasetHeader datasetHeader = DatasetHeaderFactory.getGetTimeSlotListHeader();
8690
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,7 +1236,8 @@ private List<PlanNode> splitInnerTimeJoinNode(
12361236
InnerTimeJoinNode innerTimeJoinNode = (InnerTimeJoinNode) node.clone();
12371237
innerTimeJoinNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
12381238

1239-
List<Long> timePartitionIds = convertToTimePartitionIds(oneRegion);
1239+
List<Long> timePartitionIds =
1240+
convertToTimePartitionIds(oneRegion, analysis.getDatabaseName());
12401241
innerTimeJoinNode.setTimePartitions(timePartitionIds);
12411242

12421243
// region group id -> parent InnerTimeJoinNode
@@ -1285,10 +1286,11 @@ private List<PlanNode> splitInnerTimeJoinNode(
12851286
return subInnerJoinNode;
12861287
}
12871288

1288-
private List<Long> convertToTimePartitionIds(List<TTimePartitionSlot> timePartitionSlotList) {
1289+
private List<Long> convertToTimePartitionIds(
1290+
List<TTimePartitionSlot> timePartitionSlotList, String database) {
12891291
List<Long> res = new ArrayList<>(timePartitionSlotList.size());
12901292
for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
1291-
res.add(TimePartitionUtils.getTimePartitionId(timePartitionSlot.startTime));
1293+
res.add(TimePartitionUtils.getTimePartitionId(timePartitionSlot.startTime, database));
12921294
}
12931295
return res;
12941296
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,11 @@ private long[] enLargeArray(long[] array, long defaultValue) {
268268
}
269269

270270
@Override
271-
public long getTimePartition(String tsFilePath) {
271+
public long getTimePartition(String tsFilePath, String database) {
272272
try {
273273
if (deviceToIndex != null && !deviceToIndex.isEmpty()) {
274274
return TimePartitionUtils.getTimePartitionId(
275-
startTimes[deviceToIndex.values().iterator().next()]);
275+
startTimes[deviceToIndex.values().iterator().next()], database);
276276
}
277277
String[] filePathSplits = FilePathUtils.splitTsFilePath(tsFilePath);
278278
return Long.parseLong(filePathSplits[filePathSplits.length - 2]);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,10 @@ ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deseriali
102102
* get time partition
103103
*
104104
* @param tsFilePath tsFile absolute path
105+
* @param database database name
105106
* @return partition
106107
*/
107-
long getTimePartition(String tsFilePath);
108+
long getTimePartition(String tsFilePath, String database);
108109

109110
/**
110111
* get time partition with check. If data of tsFile spans partitions, an exception will be thrown

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public static void rewriteTsFile(
119119
throws IOException, WriteProcessException, IllegalPathException {
120120
try (TsFileSplitByPartitionTool rewriteTool =
121121
new TsFileSplitByPartitionTool(resourceToBeRewritten)) {
122-
rewriteTool.parseAndRewriteFile(rewrittenResources);
122+
rewriteTool.parseAndRewriteFile(rewrittenResources, resourceToBeRewritten.getDatabaseName());
123123
}
124124
}
125125

@@ -134,7 +134,7 @@ public void close() throws IOException {
134134
* @throws IOException WriteProcessException
135135
*/
136136
@SuppressWarnings({"squid:S3776", "deprecation"}) // Suppress high Cognitive Complexity warning
137-
public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
137+
public void parseAndRewriteFile(List<TsFileResource> rewrittenResources, String database)
138138
throws IOException, WriteProcessException, IllegalPathException {
139139
// check if the TsFile has correct header
140140
if (!fileCheck()) {
@@ -181,7 +181,8 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
181181
// a new Page
182182
PageHeader pageHeader =
183183
reader.readPageHeader(dataType, header.getChunkType() == MetaMarker.CHUNK_HEADER);
184-
boolean needToDecode = checkIfNeedToDecode(measurementSchema, deviceId, pageHeader);
184+
boolean needToDecode =
185+
checkIfNeedToDecode(measurementSchema, deviceId, pageHeader, database);
185186
needToDecodeInfo.add(needToDecode);
186187
ByteBuffer pageData =
187188
!needToDecode
@@ -198,7 +199,8 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
198199
pageHeadersInChunk,
199200
dataInChunk,
200201
needToDecodeInfo,
201-
chunkHeaderOffset);
202+
chunkHeaderOffset,
203+
database);
202204
firstChunkInChunkGroup = false;
203205
break;
204206
case MetaMarker.OPERATION_INDEX_RANGE:
@@ -248,7 +250,7 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
248250
* false.
249251
*/
250252
protected boolean checkIfNeedToDecode(
251-
MeasurementSchema schema, IDeviceID deviceId, PageHeader pageHeader) {
253+
MeasurementSchema schema, IDeviceID deviceId, PageHeader pageHeader, String database) {
252254
if (pageHeader.getStatistics() == null) {
253255
return true;
254256
}
@@ -267,8 +269,8 @@ protected boolean checkIfNeedToDecode(
267269
}
268270
}
269271
}
270-
return TimePartitionUtils.getTimePartitionId(pageHeader.getStartTime())
271-
!= TimePartitionUtils.getTimePartitionId(pageHeader.getEndTime());
272+
return TimePartitionUtils.getTimePartitionId(pageHeader.getStartTime(), database)
273+
!= TimePartitionUtils.getTimePartitionId(pageHeader.getEndTime(), database);
272274
}
273275

274276
/**
@@ -283,17 +285,27 @@ protected void reWriteChunk(
283285
List<PageHeader> pageHeadersInChunk,
284286
List<ByteBuffer> pageDataInChunk,
285287
List<Boolean> needToDecodeInfoInChunk,
286-
long chunkHeaderOffset)
288+
long chunkHeaderOffset,
289+
String database)
287290
throws IOException, PageException, IllegalPathException {
288291
valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
289292
Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
290293
for (int i = 0; i < pageDataInChunk.size(); i++) {
291294
if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
292295
decodeAndWritePage(
293-
deviceId, schema, pageDataInChunk.get(i), partitionChunkWriterMap, chunkHeaderOffset);
296+
deviceId,
297+
schema,
298+
pageDataInChunk.get(i),
299+
partitionChunkWriterMap,
300+
chunkHeaderOffset,
301+
database);
294302
} else {
295303
writePage(
296-
schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
304+
schema,
305+
pageHeadersInChunk.get(i),
306+
pageDataInChunk.get(i),
307+
partitionChunkWriterMap,
308+
database);
297309
}
298310
}
299311
for (Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
@@ -352,9 +364,10 @@ protected void writePage(
352364
MeasurementSchema schema,
353365
PageHeader pageHeader,
354366
ByteBuffer pageData,
355-
Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
367+
Map<Long, ChunkWriterImpl> partitionChunkWriterMap,
368+
String database)
356369
throws PageException {
357-
long partitionId = TimePartitionUtils.getTimePartitionId(pageHeader.getStartTime());
370+
long partitionId = TimePartitionUtils.getTimePartitionId(pageHeader.getStartTime(), database);
358371
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
359372
ChunkWriterImpl chunkWriter =
360373
partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
@@ -366,7 +379,8 @@ protected void decodeAndWritePage(
366379
MeasurementSchema schema,
367380
ByteBuffer pageData,
368381
Map<Long, ChunkWriterImpl> partitionChunkWriterMap,
369-
long chunkHeaderOffset)
382+
long chunkHeaderOffset,
383+
String database)
370384
throws IOException, IllegalPathException {
371385
valueDecoder.reset();
372386
PageReader pageReader =
@@ -375,7 +389,7 @@ protected void decodeAndWritePage(
375389
List<TimeRange> deleteIntervalList = getOldSortedDeleteIntervals(deviceId, schema);
376390
pageReader.setDeleteIntervalList(deleteIntervalList);
377391
BatchData batchData = pageReader.getAllSatisfiedPageData();
378-
rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
392+
rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap, database);
379393
}
380394

381395
private List<TimeRange> getOldSortedDeleteIntervals(
@@ -400,11 +414,12 @@ private List<TimeRange> getOldSortedDeleteIntervals(
400414
protected void rewritePageIntoFiles(
401415
BatchData batchData,
402416
MeasurementSchema schema,
403-
Map<Long, ChunkWriterImpl> partitionChunkWriterMap) {
417+
Map<Long, ChunkWriterImpl> partitionChunkWriterMap,
418+
String database) {
404419
while (batchData.hasCurrent()) {
405420
long time = batchData.currentTime();
406421
Object value = batchData.currentValue();
407-
long partitionId = TimePartitionUtils.getTimePartitionId(time);
422+
long partitionId = TimePartitionUtils.getTimePartitionId(time, database);
408423

409424
ChunkWriterImpl chunkWriter =
410425
partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/settle/TsFileAndModSettleTool.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ public void settleOneTsFileAndMod(
214214
}
215215
try (TsFileSplitByPartitionTool tsFileRewriteTool =
216216
new TsFileSplitByPartitionTool(resourceToBeSettled)) {
217-
tsFileRewriteTool.parseAndRewriteFile(settledResources);
217+
tsFileRewriteTool.parseAndRewriteFile(
218+
settledResources, resourceToBeSettled.getDatabaseName());
218219
}
219220
}
220221

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ private void initDataPartitionMap() {
113113
for (int i = 0; i < seriesSlotPartitionNum; i++) {
114114
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionSlotMap = new HashMap<>();
115115
for (int t = -2; t < 5; t++) {
116-
@SuppressWarnings("deprecation")
117-
long startTime = t * TimePartitionUtils.getTimePartitionInterval() + 1;
116+
long startTime = t * TimePartitionUtils.getTimePartitionInterval("root.sg1") + 1;
118117
timePartitionSlotMap.put(
119118
TimePartitionUtils.getTimePartitionSlot(startTime, "root.sg1"),
120119
Collections.singletonList(
@@ -365,8 +364,7 @@ public void testInsertRowsNode() throws IllegalPathException {
365364
for (int i = 0; i < 7; i++) {
366365
InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId("plan node 3"));
367366
insertRowNode.setTargetPath(new PartialPath(String.format("root.sg1.d%d", i)));
368-
@SuppressWarnings("deprecation")
369-
long interval = TimePartitionUtils.getTimePartitionInterval();
367+
long interval = TimePartitionUtils.getTimePartitionInterval("root.sg1");
370368
insertRowNode.setTime((i - 2) * interval);
371369
insertRowsNode.addOneInsertRowNode(insertRowNode, 2 * i);
372370

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,16 @@ public void testGetAllDataRegionIds() throws Exception {
7474

7575
@Test
7676
public void testGetTimePartitionId() {
77-
@SuppressWarnings("deprecation")
78-
long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
79-
@SuppressWarnings("deprecation")
80-
long interval = TimePartitionUtils.getTimePartitionInterval();
81-
Assert.assertEquals(-2, TimePartitionUtils.getTimePartitionId(-interval - 1));
82-
Assert.assertEquals(-1, TimePartitionUtils.getTimePartitionId(-interval));
83-
Assert.assertEquals(-1, TimePartitionUtils.getTimePartitionId(-1));
84-
Assert.assertEquals(0, TimePartitionUtils.getTimePartitionId(0));
85-
Assert.assertEquals(0, TimePartitionUtils.getTimePartitionId(1));
86-
Assert.assertEquals(0, TimePartitionUtils.getTimePartitionId(interval / 2));
87-
Assert.assertEquals(1, TimePartitionUtils.getTimePartitionId(interval * 2 - 1));
88-
Assert.assertEquals(2, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 + 1));
77+
long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval("root.db");
78+
long interval = TimePartitionUtils.getTimePartitionInterval("root.db");
79+
Assert.assertEquals(-2, TimePartitionUtils.getTimePartitionId(-interval - 1, "root.db"));
80+
Assert.assertEquals(-1, TimePartitionUtils.getTimePartitionId(-interval, "root.db"));
81+
Assert.assertEquals(-1, TimePartitionUtils.getTimePartitionId(-1, "root.db"));
82+
Assert.assertEquals(0, TimePartitionUtils.getTimePartitionId(0, "root.db"));
83+
Assert.assertEquals(0, TimePartitionUtils.getTimePartitionId(1, "root.db"));
84+
Assert.assertEquals(0, TimePartitionUtils.getTimePartitionId(interval / 2, "root.db"));
85+
Assert.assertEquals(1, TimePartitionUtils.getTimePartitionId(interval * 2 - 1, "root.db"));
86+
Assert.assertEquals(
87+
2, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 + 1, "root.db"));
8988
}
9089
}

0 commit comments

Comments
 (0)