Skip to content

Commit 3e45137

Browse files
committed
replace compact in InnerSpaceCompactionUtils with
ReadChunkCompactionPerformer
1 parent 7424fd6 commit 3e45137

12 files changed

Lines changed: 194 additions & 198 deletions

server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTask;
2525
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
2626
import org.apache.iotdb.db.engine.compaction.log.CompactionLogger;
27+
import org.apache.iotdb.db.engine.compaction.performer.AbstractCompactionPerformer;
28+
import org.apache.iotdb.db.engine.compaction.performer.ReadChunkCompactionPerformer;
2729
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
2830
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
2931
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
@@ -51,6 +53,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
5153
protected TsFileResourceList tsFileResourceList;
5254
protected boolean[] isHoldingReadLock;
5355
protected boolean[] isHoldingWriteLock;
56+
protected AbstractCompactionPerformer performer;
5457

5558
public SizeTieredCompactionTask(
5659
String logicalStorageGroupName,
@@ -116,7 +119,9 @@ protected void doCompaction() throws Exception {
116119

117120
// carry out the compaction
118121
if (sequence) {
119-
InnerSpaceCompactionUtils.compact(targetTsFileResource, selectedTsFileResourceList);
122+
performer =
123+
new ReadChunkCompactionPerformer(selectedTsFileResourceList, targetTsFileResource);
124+
performer.perform();
120125
} else {
121126
CompactionUtils.compact(
122127
Collections.emptyList(),

server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java

Lines changed: 0 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -27,34 +27,19 @@
2727
import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector;
2828
import org.apache.iotdb.db.engine.modification.Modification;
2929
import org.apache.iotdb.db.engine.modification.ModificationFile;
30-
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
3130
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
3231
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
33-
import org.apache.iotdb.db.exception.metadata.MetadataException;
34-
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
35-
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
36-
import org.apache.iotdb.db.metadata.path.PartialPath;
3732
import org.apache.iotdb.db.query.control.FileReaderManager;
38-
import org.apache.iotdb.db.service.IoTDB;
3933
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
40-
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
41-
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
4234
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
43-
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
44-
import org.apache.iotdb.tsfile.utils.Pair;
45-
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
46-
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
4735

48-
import org.apache.commons.io.FileUtils;
4936
import org.slf4j.Logger;
5037
import org.slf4j.LoggerFactory;
5138

5239
import java.io.File;
5340
import java.io.IOException;
5441
import java.util.ArrayList;
5542
import java.util.Collection;
56-
import java.util.Comparator;
57-
import java.util.LinkedList;
5843
import java.util.List;
5944

6045
public class InnerSpaceCompactionUtils {
@@ -66,93 +51,6 @@ private InnerSpaceCompactionUtils() {
6651
throw new IllegalStateException("Utility class");
6752
}
6853

69-
public static void compact(TsFileResource targetResource, List<TsFileResource> tsFileResources)
70-
throws IOException, MetadataException, InterruptedException {
71-
72-
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(tsFileResources);
73-
TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
74-
while (deviceIterator.hasNextDevice()) {
75-
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
76-
String device = deviceInfo.left;
77-
boolean aligned = deviceInfo.right;
78-
79-
writer.startChunkGroup(device);
80-
if (aligned) {
81-
compactAlignedSeries(device, targetResource, writer, deviceIterator);
82-
} else {
83-
compactNotAlignedSeries(device, targetResource, writer, deviceIterator);
84-
}
85-
writer.endChunkGroup();
86-
}
87-
88-
for (TsFileResource tsFileResource : tsFileResources) {
89-
targetResource.updatePlanIndexes(tsFileResource);
90-
}
91-
writer.endFile();
92-
targetResource.close();
93-
}
94-
}
95-
96-
private static void checkThreadInterrupted(TsFileResource tsFileResource)
97-
throws InterruptedException {
98-
if (Thread.currentThread().isInterrupted()) {
99-
throw new InterruptedException(
100-
String.format(
101-
"[Compaction] compaction for target file %s abort", tsFileResource.toString()));
102-
}
103-
}
104-
105-
private static void compactNotAlignedSeries(
106-
String device,
107-
TsFileResource targetResource,
108-
TsFileIOWriter writer,
109-
MultiTsFileDeviceIterator deviceIterator)
110-
throws IOException, MetadataException, InterruptedException {
111-
MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
112-
deviceIterator.iterateNotAlignedSeries(device, true);
113-
while (seriesIterator.hasNextSeries()) {
114-
checkThreadInterrupted(targetResource);
115-
// TODO: we can provide a configuration item to enable concurrent between each series
116-
PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
117-
IMeasurementSchema measurementSchema;
118-
// TODO: seriesIterator needs to be refactor.
119-
// This statement must be called before next hasNextSeries() called, or it may be trapped in a
120-
// dead-loop.
121-
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
122-
seriesIterator.getMetadataListForCurrentSeries();
123-
try {
124-
if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
125-
measurementSchema =
126-
IDTableManager.getInstance().getSeriesSchema(device, p.getMeasurement());
127-
} else {
128-
measurementSchema = IoTDB.schemaProcessor.getSeriesSchema(p);
129-
}
130-
} catch (PathNotExistException e) {
131-
logger.info("A deleted path is skipped: {}", e.getMessage());
132-
continue;
133-
}
134-
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
135-
new SingleSeriesCompactionExecutor(
136-
p, measurementSchema, readerAndChunkMetadataList, writer, targetResource);
137-
compactionExecutorOfCurrentTimeSeries.execute();
138-
}
139-
}
140-
141-
private static void compactAlignedSeries(
142-
String device,
143-
TsFileResource targetResource,
144-
TsFileIOWriter writer,
145-
MultiTsFileDeviceIterator deviceIterator)
146-
throws IOException, InterruptedException {
147-
checkThreadInterrupted(targetResource);
148-
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
149-
deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
150-
AlignedSeriesCompactionExecutor compactionExecutor =
151-
new AlignedSeriesCompactionExecutor(
152-
device, targetResource, readerAndChunkMetadataList, writer);
153-
compactionExecutor.execute();
154-
}
155-
15654
public static boolean deleteTsFilesInDisk(
15755
Collection<TsFileResource> mergeTsFiles, String storageGroupName) {
15856
logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName);
@@ -185,34 +83,6 @@ public static void deleteModificationForSourceFile(
18583
}
18684
}
18785

188-
/**
189-
* This method is called to recover modifications while an exception occurs during compaction. It
190-
* append new modifications of each selected tsfile to its corresponding old mods file and delete
191-
* the compaction mods file.
192-
*
193-
* @param selectedTsFileResources
194-
* @throws IOException
195-
*/
196-
public static void appendNewModificationsToOldModsFile(
197-
List<TsFileResource> selectedTsFileResources) throws IOException {
198-
for (TsFileResource sourceFile : selectedTsFileResources) {
199-
// if there are modifications to this seqFile during compaction
200-
if (sourceFile.getCompactionModFile().exists()) {
201-
ModificationFile compactionModificationFile =
202-
ModificationFile.getCompactionMods(sourceFile);
203-
Collection<Modification> newModification = compactionModificationFile.getModifications();
204-
compactionModificationFile.close();
205-
// write the new modifications to its old modification file
206-
try (ModificationFile oldModificationFile = sourceFile.getModFile()) {
207-
for (Modification modification : newModification) {
208-
oldModificationFile.write(modification);
209-
}
210-
}
211-
FileUtils.delete(new File(ModificationFile.getCompactionMods(sourceFile).getFilePath()));
212-
}
213-
}
214-
}
215-
21686
/**
21787
* Collect all the compaction modification files of source files, and combines them as the
21888
* modification file of target file.
@@ -262,13 +132,6 @@ public static ICrossSpaceMergeFileSelector getCrossSpaceFileSelector(
262132
}
263133
}
264134

265-
public static class TsFileNameComparator implements Comparator<TsFileSequenceReader> {
266-
267-
@Override
268-
public int compare(TsFileSequenceReader o1, TsFileSequenceReader o2) {
269-
return TsFileManager.compareFileName(new File(o1.getFileName()), new File(o2.getFileName()));
270-
}
271-
}
272135
/**
273136
* Update the targetResource. Move xxx.target to xxx.tsfile and serialize xxx.tsfile.resource .
274137
*

server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/AbstractCompactionPerformer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iotdb.db.engine.compaction.performer;
2020

2121
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
22+
import org.apache.iotdb.db.exception.metadata.MetadataException;
2223

2324
import java.io.IOException;
2425
import java.util.List;
@@ -33,5 +34,5 @@ public abstract class AbstractCompactionPerformer {
3334
protected List<TsFileResource> seqFiles;
3435
protected List<TsFileResource> unseqFiles;
3536

36-
public abstract void perform() throws IOException;
37+
public abstract void perform() throws IOException, MetadataException, InterruptedException;
3738
}

server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ReadChunkCompactionPerformer.java

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,123 @@
1818
*/
1919
package org.apache.iotdb.db.engine.compaction.performer;
2020

21+
import org.apache.iotdb.commons.conf.IoTDBConstant;
22+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
23+
import org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
24+
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
25+
import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
26+
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
27+
import org.apache.iotdb.db.exception.metadata.MetadataException;
28+
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
29+
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
30+
import org.apache.iotdb.db.metadata.path.PartialPath;
31+
import org.apache.iotdb.db.service.IoTDB;
32+
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
33+
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
34+
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
35+
import org.apache.iotdb.tsfile.utils.Pair;
36+
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
37+
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
38+
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
2142
import java.io.IOException;
43+
import java.util.LinkedList;
44+
import java.util.List;
2245

2346
public class ReadChunkCompactionPerformer extends AbstractCompactionPerformer {
47+
private static final Logger LOGGER =
48+
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
49+
private TsFileResource targetResource;
50+
51+
public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles, TsFileResource targetFile) {
52+
this.seqFiles = sourceFiles;
53+
this.targetResource = targetFile;
54+
}
55+
2456
@Override
25-
public void perform() throws IOException {}
57+
public void perform() throws IOException, MetadataException, InterruptedException {
58+
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles);
59+
TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
60+
while (deviceIterator.hasNextDevice()) {
61+
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
62+
String device = deviceInfo.left;
63+
boolean aligned = deviceInfo.right;
64+
65+
writer.startChunkGroup(device);
66+
if (aligned) {
67+
compactAlignedSeries(device, targetResource, writer, deviceIterator);
68+
} else {
69+
compactNotAlignedSeries(device, targetResource, writer, deviceIterator);
70+
}
71+
writer.endChunkGroup();
72+
}
73+
74+
for (TsFileResource tsFileResource : seqFiles) {
75+
targetResource.updatePlanIndexes(tsFileResource);
76+
}
77+
writer.endFile();
78+
targetResource.close();
79+
}
80+
}
81+
82+
private void compactAlignedSeries(
83+
String device,
84+
TsFileResource targetResource,
85+
TsFileIOWriter writer,
86+
MultiTsFileDeviceIterator deviceIterator)
87+
throws IOException, InterruptedException {
88+
checkThreadInterrupted();
89+
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
90+
deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
91+
AlignedSeriesCompactionExecutor compactionExecutor =
92+
new AlignedSeriesCompactionExecutor(
93+
device, targetResource, readerAndChunkMetadataList, writer);
94+
compactionExecutor.execute();
95+
}
96+
97+
private void checkThreadInterrupted() throws InterruptedException {
98+
if (Thread.currentThread().isInterrupted()) {
99+
throw new InterruptedException(
100+
String.format(
101+
"[Compaction] compaction for target file %s abort", targetResource.toString()));
102+
}
103+
}
104+
105+
private void compactNotAlignedSeries(
106+
String device,
107+
TsFileResource targetResource,
108+
TsFileIOWriter writer,
109+
MultiTsFileDeviceIterator deviceIterator)
110+
throws IOException, MetadataException, InterruptedException {
111+
MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
112+
deviceIterator.iterateNotAlignedSeries(device, true);
113+
while (seriesIterator.hasNextSeries()) {
114+
checkThreadInterrupted();
115+
// TODO: we can provide a configuration item to enable concurrent between each series
116+
PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
117+
IMeasurementSchema measurementSchema;
118+
// TODO: seriesIterator needs to be refactor.
119+
// This statement must be called before next hasNextSeries() called, or it may be trapped in a
120+
// dead-loop.
121+
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
122+
seriesIterator.getMetadataListForCurrentSeries();
123+
try {
124+
if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
125+
measurementSchema =
126+
IDTableManager.getInstance().getSeriesSchema(device, p.getMeasurement());
127+
} else {
128+
measurementSchema = IoTDB.schemaProcessor.getSeriesSchema(p);
129+
}
130+
} catch (PathNotExistException e) {
131+
LOGGER.info("A deleted path is skipped: {}", e.getMessage());
132+
continue;
133+
}
134+
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
135+
new SingleSeriesCompactionExecutor(
136+
p, measurementSchema, readerAndChunkMetadataList, writer, targetResource);
137+
compactionExecutorOfCurrentTimeSeries.execute();
138+
}
139+
}
26140
}

server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.db.engine.cache.ChunkCache;
2424
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
2525
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
26+
import org.apache.iotdb.db.engine.compaction.performer.ReadChunkCompactionPerformer;
2627
import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils;
2728
import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
2829
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
@@ -227,7 +228,7 @@ public void testDeserializePage() throws MetadataException, IOException {
227228
timeValuePair.getTimestamp() >= 250L
228229
&& timeValuePair.getTimestamp() <= 300L);
229230
}
230-
InnerSpaceCompactionUtils.compact(targetTsFileResource, sourceResources);
231+
new ReadChunkCompactionPerformer(sourceResources, targetTsFileResource).perform();
231232
InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
232233
InnerSpaceCompactionUtils.combineModsInCompaction(
233234
sourceResources, targetTsFileResource);
@@ -452,7 +453,7 @@ public void testAppendPage() throws IOException, MetadataException, InterruptedE
452453
timeValuePair ->
453454
timeValuePair.getTimestamp() >= 250L && timeValuePair.getTimestamp() <= 300L);
454455
}
455-
InnerSpaceCompactionUtils.compact(targetTsFileResource, toMergeResources);
456+
new ReadChunkCompactionPerformer(toMergeResources, targetTsFileResource).perform();
456457
InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
457458
InnerSpaceCompactionUtils.combineModsInCompaction(
458459
toMergeResources, targetTsFileResource);
@@ -727,7 +728,7 @@ public void testAppendChunk() throws IOException, IllegalPathException, Metadata
727728
timeValuePair.getTimestamp() >= 250L
728729
&& timeValuePair.getTimestamp() <= 300L);
729730
}
730-
InnerSpaceCompactionUtils.compact(targetTsFileResource, toMergeResources);
731+
new ReadChunkCompactionPerformer(toMergeResources, targetTsFileResource).perform();
731732
InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
732733
InnerSpaceCompactionUtils.combineModsInCompaction(
733734
toMergeResources, targetTsFileResource);

0 commit comments

Comments
 (0)