Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
de3923a
temp
THUMarkLau Aug 31, 2022
dca1b4c
finish the metadata flush and read process in memory control writer a…
THUMarkLau Sep 1, 2022
adecc61
temp for external sort
THUMarkLau Sep 5, 2022
eda85f5
temp
THUMarkLau Sep 6, 2022
1afdabe
finish memory control tsfile io writer
THUMarkLau Sep 6, 2022
c99d54a
refactor some codes
THUMarkLau Sep 6, 2022
0a0a420
fix some bugs, and add some test
THUMarkLau Sep 7, 2022
96e3749
recover TsFileSequenceRead
THUMarkLau Sep 7, 2022
baa9b2a
finish the verify utils for vector data
THUMarkLau Sep 8, 2022
c3a470d
finish UT for aligned series
THUMarkLau Sep 8, 2022
a25aad0
refactor some codes
THUMarkLau Sep 9, 2022
db30604
add some comment
THUMarkLau Sep 9, 2022
f6372f8
recover aligned chunk metadata
THUMarkLau Sep 9, 2022
432f717
refactor some code
THUMarkLau Sep 9, 2022
2c16259
Use MemoryControlTsFileIOWriter in ReadChunkCompactionPerformer
THUMarkLau Sep 9, 2022
9bb5456
Use MemoryControlTsFileIOWriter in ReadPointCompactionPerformer
THUMarkLau Sep 9, 2022
0bd5a4c
control chunk metadata size in ReadChunkCompactionPerformer
THUMarkLau Sep 12, 2022
c1430b8
add log for debug
THUMarkLau Sep 13, 2022
97c34e9
fix npe bug
THUMarkLau Sep 13, 2022
8859ff2
temp for refactor
THUMarkLau Sep 15, 2022
2612b3a
fix deserialize bug
THUMarkLau Sep 15, 2022
4b4a198
fix bug and add some test
THUMarkLau Sep 16, 2022
5088420
remove MemoryControlTsFileIOWriter
THUMarkLau Sep 16, 2022
cbd9061
add ci (#7353)
choubenson Sep 16, 2022
941ee35
adapt chunk metadata size control for writing
THUMarkLau Sep 16, 2022
d2d21ff
Merge branch 'IOTDB-4251' of github.com:apache/iotdb into IOTDB-4251
THUMarkLau Sep 16, 2022
77e156c
merge with master
THUMarkLau Sep 19, 2022
88e67bb
check memory path count and add some comments
THUMarkLau Sep 19, 2022
7ceeb2c
fix order bug in TSMIterator
THUMarkLau Sep 19, 2022
8be3011
fix flush order bug
THUMarkLau Sep 19, 2022
23ed0b9
fix ci
THUMarkLau Sep 19, 2022
f004858
fix ci
THUMarkLau Sep 20, 2022
5cccdab
adjust according to review
THUMarkLau Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/src/assembly/resources/conf/iotdb-datanode.properties
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ timestamp_precision=ms
# Datatype: int
# primitive_array_size=32

# The proportion of write memory reserved for chunk metadata in a single file writer when flushing a memtable
# Datatype: double
# chunk_metadata_size_proportion_in_write=0.1

# Ratio of write memory for invoking flush disk, 0.4 by default
# If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2
# Datatype: double
Expand Down Expand Up @@ -576,6 +580,10 @@ timestamp_precision=ms
# BALANCE: alternate two compaction types
# compaction_priority=BALANCE

# The proportion of compaction memory reserved for chunk metadata kept in memory while compacting
# Datatype: double
# chunk_metadata_size_proportion_in_compaction=0.05

# The target tsfile size in compaction
# Datatype: long, Unit: byte
# target_compaction_file_size=1073741824
Expand Down
21 changes: 21 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public class IoTDBConfig {
/** The proportion of write memory for memtable */
private double writeProportion = 0.8;

private double chunkMetadataSizeProportionInWrite = 0.1;

/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;

Expand Down Expand Up @@ -434,6 +436,8 @@ public class IoTDBConfig {
*/
private CompactionPriority compactionPriority = CompactionPriority.BALANCE;

private double chunkMetadataSizeProportionInCompaction = 0.05;

/** The target tsfile size in compaction, 1 GB by default */
private long targetCompactionFileSize = 1073741824L;

Expand Down Expand Up @@ -3163,6 +3167,23 @@ public void setThrottleThreshold(long throttleThreshold) {
this.throttleThreshold = throttleThreshold;
}

/**
 * Returns the proportion of the write memory budget reserved for chunk metadata in a single file
 * writer when flushing a memtable. Default is 0.1; presumably expected to lie in (0, 1] — confirm
 * against the config loader.
 */
public double getChunkMetadataSizeProportionInWrite() {
return chunkMetadataSizeProportionInWrite;
}

/**
 * Sets the proportion of the write memory budget reserved for chunk metadata in a single file
 * writer when flushing a memtable.
 *
 * @param proportion the new proportion value (default 0.1; presumably in (0, 1] — confirm)
 */
public void setChunkMetadataSizeProportionInWrite(double proportion) {
this.chunkMetadataSizeProportionInWrite = proportion;
}

/**
 * Returns the proportion of the per-compaction-task memory budget reserved for chunk metadata kept
 * in memory while compacting. Default is 0.05; presumably expected to lie in (0, 1] — confirm
 * against the config loader.
 */
public double getChunkMetadataSizeProportionInCompaction() {
return chunkMetadataSizeProportionInCompaction;
}

/**
 * Sets the proportion of the per-compaction-task memory budget reserved for chunk metadata kept in
 * memory while compacting.
 *
 * @param proportion the new proportion value (default 0.05; presumably in (0, 1] — confirm)
 */
public void setChunkMetadataSizeProportionInCompaction(double proportion) {
this.chunkMetadataSizeProportionInCompaction = proportion;
}

/** Returns the cache window time in milliseconds. */
public long getCacheWindowTimeInMs() {
return cacheWindowTimeInMs;
}
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ public void loadProperties(Properties properties) {
properties.getProperty(
"concurrent_compaction_thread",
Integer.toString(conf.getConcurrentCompactionThread()))));
conf.setChunkMetadataSizeProportionInCompaction(
Double.parseDouble(
properties.getProperty(
"chunk_metadata_size_proportion_in_compaction",
Double.toString(conf.getChunkMetadataSizeProportionInCompaction()))));
conf.setTargetCompactionFileSize(
Long.parseLong(
properties.getProperty(
Expand Down Expand Up @@ -1445,6 +1450,12 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
// update tsfile-format config
loadTsFileProps(properties);

conf.setChunkMetadataSizeProportionInWrite(
Double.parseDouble(
properties.getProperty(
"chunk_metadata_size_proportion_in_write",
Double.toString(conf.getChunkMetadataSizeProportionInWrite()))));

// update max_deduplicated_path_num
conf.setMaxQueryDeduplicatedPathNum(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

/**
Expand All @@ -45,7 +44,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private final String device;
private final Set<String> measurementList;
private final List<String> measurementList;
private final FragmentInstanceContext fragmentInstanceContext;
private final QueryDataSource queryDataSource;
private final AbstractCompactionWriter compactionWriter;
Expand All @@ -54,7 +53,7 @@ public class ReadPointPerformerSubTask implements Callable<Void> {

public ReadPointPerformerSubTask(
String device,
Set<String> measurementList,
List<String> measurementList,
FragmentInstanceContext fragmentInstanceContext,
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
Expand Down Expand Up @@ -87,7 +86,7 @@ public Void call() throws Exception {
if (dataBlockReader.hasNextBatch()) {
compactionWriter.startMeasurement(measurementSchemas, taskId);
ReadPointCompactionPerformer.writeWithReader(
compactionWriter, dataBlockReader, taskId, false);
compactionWriter, dataBlockReader, device, taskId, false);
compactionWriter.endMeasurement(taskId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void execute() throws IOException {
chunkWriter.estimateMaxSeriesMemSize());
chunkWriter.writeToFileWriter(writer);
}
writer.checkMetadataSizeAndMayFlush();
}

private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void execute() throws IOException {
} else if (pointCountInChunkWriter != 0L) {
flushChunkWriter();
}
fileWriter.checkMetadataSizeAndMayFlush();
targetResource.updateStartTime(device, minStartTimestamp);
targetResource.updateEndTime(device, maxEndTimestamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
* CompactionPerformer is used to compact multiple files into one or multiple files. Different
Expand All @@ -35,7 +36,8 @@
public interface ICompactionPerformer {

void perform()
throws IOException, MetadataException, StorageEngineException, InterruptedException;
throws IOException, MetadataException, StorageEngineException, InterruptedException,
ExecutionException;

void setTargetFiles(List<TsFileResource> targetFiles);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
import org.apache.iotdb.db.engine.compaction.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.slf4j.Logger;
Expand Down Expand Up @@ -63,8 +64,17 @@ public ReadChunkCompactionPerformer() {}
@Override
public void perform()
throws IOException, MetadataException, InterruptedException, StorageEngineException {
// size for file writer is 5% of per compaction task memory budget
long sizeForFileWriter =
(long)
(SystemInfo.getInstance().getMemorySizeForCompaction()
/ IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
* IoTDBDescriptor.getInstance()
.getConfig()
.getChunkMetadataSizeProportionInCompaction());
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles);
TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) {
TsFileIOWriter writer =
new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) {
while (deviceIterator.hasNextDevice()) {
Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice();
String device = deviceInfo.left;
Expand Down Expand Up @@ -138,7 +148,6 @@ private void compactNotAlignedSeries(
checkThreadInterrupted();
// TODO: we can provide a configuration item to enable concurrent between each series
PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
IMeasurementSchema measurementSchema;
// TODO: seriesIterator needs to be refactor.
// This statement must be called before next hasNextSeries() called, or it may be trapped in a
// dead-loop.
Expand Down
Loading