diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties index 6f9173bc6333d..7c32433b09208 100644 --- a/server/src/assembly/resources/conf/iotdb-datanode.properties +++ b/server/src/assembly/resources/conf/iotdb-datanode.properties @@ -460,6 +460,10 @@ timestamp_precision=ms # Datatype: int # primitive_array_size=32 +# The proportion of write memory reserved for the chunk metadata held in a single file writer when flushing a memtable +# Datatype: double +# chunk_metadata_size_proportion_in_write=0.1 + # Ratio of write memory for invoking flush disk, 0.4 by default # If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2 # Datatype: double @@ -576,6 +580,10 @@ timestamp_precision=ms # BALANCE: alternate two compaction types # compaction_priority=BALANCE +# The proportion of compaction memory reserved for the chunk metadata kept in memory during compaction +# Datatype: double +# chunk_metadata_size_proportion_in_compaction=0.05 + # The target tsfile size in compaction # Datatype: long, Unit: byte # target_compaction_file_size=1073741824 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 9e14dc37ed48d..d4c7b117f2060 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -150,6 +150,8 @@ public class IoTDBConfig { /** The proportion of write memory for memtable */ private double writeProportion = 0.8; + private double chunkMetadataSizeProportionInWrite = 0.1; + /** The proportion of write memory for compaction */ private double compactionProportion = 0.2; @@ -434,6 +436,8 @@ public class IoTDBConfig { */ private CompactionPriority compactionPriority = CompactionPriority.BALANCE; + private double chunkMetadataSizeProportionInCompaction = 0.05; + /** The target tsfile size in compaction, 1 GB by default */ private long targetCompactionFileSize = 1073741824L; @@ -3163,6 +3167,23 @@ public void setThrottleThreshold(long throttleThreshold) { this.throttleThreshold = throttleThreshold; } + public double getChunkMetadataSizeProportionInWrite() { + return chunkMetadataSizeProportionInWrite; + } + + public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) { + this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite; + } + + public double getChunkMetadataSizeProportionInCompaction() { + return chunkMetadataSizeProportionInCompaction; + } + + public void setChunkMetadataSizeProportionInCompaction( + double chunkMetadataSizeProportionInCompaction) { + this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction; + } + public long getCacheWindowTimeInMs() { return cacheWindowTimeInMs; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 7171b70191d69..8cccd57e5af29 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -616,6 +616,11 @@ public void loadProperties(Properties properties) { properties.getProperty( "concurrent_compaction_thread", Integer.toString(conf.getConcurrentCompactionThread())))); + conf.setChunkMetadataSizeProportionInCompaction( + Double.parseDouble( + properties.getProperty( + "chunk_metadata_size_proportion_in_compaction", + 
Double.toString(conf.getChunkMetadataSizeProportionInCompaction())))); conf.setTargetCompactionFileSize( Long.parseLong( properties.getProperty( @@ -1445,6 +1450,12 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep // update tsfile-format config loadTsFileProps(properties); + conf.setChunkMetadataSizeProportionInWrite( + Double.parseDouble( + properties.getProperty( + "chunk_metadata_size_proportion_in_write", + Double.toString(conf.getChunkMetadataSizeProportionInWrite())))); + // update max_deduplicated_path_num conf.setMaxQueryDeduplicatedPathNum( Integer.parseInt( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java index 4280d2a6f54a2..5dd91bdd44a69 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java @@ -33,7 +33,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; /** @@ -45,7 +44,7 @@ public class ReadPointPerformerSubTask implements Callable { private static final Logger logger = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private final String device; - private final Set measurementList; + private final List measurementList; private final FragmentInstanceContext fragmentInstanceContext; private final QueryDataSource queryDataSource; private final AbstractCompactionWriter compactionWriter; @@ -54,7 +53,7 @@ public class ReadPointPerformerSubTask implements Callable { public ReadPointPerformerSubTask( String device, - Set measurementList, + List measurementList, FragmentInstanceContext fragmentInstanceContext, QueryDataSource queryDataSource, AbstractCompactionWriter compactionWriter, @@ -87,7 +86,7 @@ public Void call() throws Exception { if (dataBlockReader.hasNextBatch()) { compactionWriter.startMeasurement(measurementSchemas, taskId); ReadPointCompactionPerformer.writeWithReader( - compactionWriter, dataBlockReader, taskId, false); + compactionWriter, dataBlockReader, device, taskId, false); compactionWriter.endMeasurement(taskId); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java index c9c36378f506f..3de4c64a36e36 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java @@ -151,6 +151,7 @@ public void execute() throws IOException { chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(writer); } + writer.checkMetadataSizeAndMayFlush(); } private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java index 40f2632d0e2cf..d1d4a366e7f2e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java @@ -151,6 +151,7 @@ public void execute() throws IOException { } else if (pointCountInChunkWriter != 0L) { flushChunkWriter(); } + fileWriter.checkMetadataSizeAndMayFlush(); targetResource.updateStartTime(device, minStartTimestamp); targetResource.updateEndTime(device, maxEndTimestamp); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java index 172eb50ee7bfb..2799c3236b9b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/ICompactionPerformer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; /** * CompactionPerformer is used to compact multiple files into one or multiple files. Different @@ -35,7 +36,8 @@ public interface ICompactionPerformer { void perform() - throws IOException, MetadataException, StorageEngineException, InterruptedException; + throws IOException, MetadataException, StorageEngineException, InterruptedException, + ExecutionException; void setTargetFiles(List<TsFileResource> targetFiles); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java index 1a94214848e54..ac0fa1ddc46f8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor; import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor; @@ -28,11 +29,11 @@ import org.apache.iotdb.db.engine.compaction.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; @@ -63,8 +64,17 @@ public ReadChunkCompactionPerformer() {} @Override public void perform() throws IOException, MetadataException, InterruptedException, StorageEngineException { + // the file writer's metadata budget is chunk_metadata_size_proportion_in_compaction (5% by default) of each task's share of the compaction memory + long sizeForFileWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInCompaction()); try 
(MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles); - TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) { + TsFileIOWriter writer = + new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) { while (deviceIterator.hasNextDevice()) { Pair deviceInfo = deviceIterator.nextDevice(); String device = deviceInfo.left; @@ -138,7 +148,6 @@ private void compactNotAlignedSeries( checkThreadInterrupted(); // TODO: we can provide a configuration item to enable concurrent between each series PartialPath p = new PartialPath(device, seriesIterator.nextSeries()); - IMeasurementSchema measurementSchema; // TODO: seriesIterator needs to be refactor. // This statement must be called before next hasNextSeries() called, or it may be trapped in a // dead-loop. diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java index 0b8df320acefb..dcc81f1e3b765 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java @@ -47,7 +47,6 @@ import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -57,7 +56,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +68,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -106,7 +103,8 @@ public ReadPointCompactionPerformer() {} @Override public void perform() - throws IOException, MetadataException, StorageEngineException, InterruptedException { + throws IOException, MetadataException, StorageEngineException, InterruptedException, + ExecutionException { long queryId = QueryResourceManager.getInstance().assignCompactionQueryId(); FragmentInstanceContext fragmentInstanceContext = FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId); @@ -137,7 +135,6 @@ public void perform() } compactionWriter.endFile(); - updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter); updatePlanIndexes(targetFiles, seqFiles, unseqFiles); } finally { clearReaderCache(); @@ -164,7 +161,8 @@ private void compactAlignedSeries( throws IOException, MetadataException { MultiTsFileDeviceIterator.AlignedMeasurementIterator alignedMeasurementIterator = deviceIterator.iterateAlignedSeries(device); - Set allMeasurements = alignedMeasurementIterator.getAllMeasurements(); + List allMeasurements = + new LinkedList<>(alignedMeasurementIterator.getAllMeasurements()); Map schemaMap = getMeasurementSchema(device, allMeasurements); List measurementSchemas = new ArrayList<>(schemaMap.values()); if (measurementSchemas.isEmpty()) { @@ 
-188,9 +186,10 @@ private void compactAlignedSeries( // chunkgroup is serialized only when at least one timeseries under this device has data compactionWriter.startChunkGroup(device, true); compactionWriter.startMeasurement(measurementSchemas, 0); - writeWithReader(compactionWriter, dataBlockReader, 0, true); + writeWithReader(compactionWriter, dataBlockReader, device, 0, true); compactionWriter.endMeasurement(0); compactionWriter.endChunkGroup(); + compactionWriter.checkAndMayFlushChunkMetadata(); } } @@ -200,55 +199,43 @@ private void compactNonAlignedSeries( AbstractCompactionWriter compactionWriter, FragmentInstanceContext fragmentInstanceContext, QueryDataSource queryDataSource) - throws IOException, InterruptedException, IllegalPathException { + throws IOException, InterruptedException, IllegalPathException, ExecutionException { MultiTsFileDeviceIterator.MeasurementIterator measurementIterator = deviceIterator.iterateNotAlignedSeries(device, false); - Set<String> allMeasurements = measurementIterator.getAllMeasurements(); + List<String> allMeasurements = new ArrayList<>(measurementIterator.getAllMeasurements()); + allMeasurements.sort((String::compareTo)); int subTaskNums = Math.min(allMeasurements.size(), subTaskNum); Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device, allMeasurements); - // assign all measurements to different sub tasks - Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums]; - int idx = 0; - for (String measurement : allMeasurements) { - if (measurementsForEachSubTask[idx % subTaskNums] == null) { - measurementsForEachSubTask[idx % subTaskNums] = new HashSet<>(); - } - measurementsForEachSubTask[idx++ % subTaskNums].add(measurement); - } - // construct sub tasks and start compacting measurements in parallel - List<Future<Void>> futures = new ArrayList<>(); compactionWriter.startChunkGroup(device, false); - for (int i = 0; i < subTaskNums; i++) { - futures.add( - CompactionTaskManager.getInstance() - .submitSubTask( - new ReadPointPerformerSubTask( - device, - measurementsForEachSubTask[i], - fragmentInstanceContext, - queryDataSource, - compactionWriter, - schemaMap, - i))); - } - - // wait for all sub tasks finish - for (int i = 0; i < subTaskNums; i++) { - try { - futures.get(i).get(); - } catch (ExecutionException e) { - LOGGER.error("[Compaction] SubCompactionTask meet errors ", e); - throw new IOException(e); + for (int taskCount = 0; taskCount < allMeasurements.size(); ) { + List<Future<Void>> futures = new ArrayList<>(); + for (int i = 0; i < subTaskNums && taskCount < allMeasurements.size(); i++) { + futures.add( + CompactionTaskManager.getInstance() + .submitSubTask( + new ReadPointPerformerSubTask( + device, + Collections.singletonList(allMeasurements.get(taskCount++)), + fragmentInstanceContext, + queryDataSource, + compactionWriter, + schemaMap, + i))); + } + for (Future<Void> future : futures) { + future.get(); } + // all subtasks in this batch have finished, so check the writer's chunk metadata size + compactionWriter.checkAndMayFlushChunkMetadata(); } compactionWriter.endChunkGroup(); } private Map<String, MeasurementSchema> getMeasurementSchema( - String device, Set<String> measurements) throws IllegalPathException, IOException { + String device, List<String> measurements) throws IllegalPathException, IOException { HashMap<String, MeasurementSchema> schemaMap = new HashMap<>(); List<TsFileResource> allResources = new LinkedList<>(seqFiles); allResources.addAll(unseqFiles); @@ -317,29 +304,6 @@ private void clearReaderCache() throws IOException { } } - private static void updateDeviceStartTimeAndEndTime( - List<TsFileResource> targetResources, AbstractCompactionWriter compactionWriter) { - List<TsFileIOWriter> targetFileWriters = 
compactionWriter.getFileIOWriter(); - for (int i = 0; i < targetFileWriters.size(); i++) { - TsFileIOWriter fileIOWriter = targetFileWriters.get(i); - TsFileResource fileResource = targetResources.get(i); - // The tmp target file may does not have any data points written due to the existence of the - // mods file, and it will be deleted after compaction. So skip the target file that has been - // deleted. - if (!fileResource.getTsFile().exists()) { - continue; - } - for (Map.Entry> entry : - fileIOWriter.getDeviceTimeseriesMetadataMap().entrySet()) { - String device = entry.getKey(); - for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { - fileResource.updateStartTime(device, timeseriesMetadata.getStatistics().getStartTime()); - fileResource.updateEndTime(device, timeseriesMetadata.getStatistics().getEndTime()); - } - } - } - } - /** * @param measurementIds if device is aligned, then measurementIds contain all measurements. If * device is not aligned, then measurementIds only contain one measurement. @@ -348,7 +312,7 @@ public static IDataBlockReader constructReader( String deviceId, List measurementIds, List measurementSchemas, - Set allSensors, + List allSensors, FragmentInstanceContext fragmentInstanceContext, QueryDataSource queryDataSource, boolean isAlign) @@ -363,11 +327,20 @@ public static IDataBlockReader constructReader( tsDataType = measurementSchemas.get(0).getType(); } return new SeriesDataBlockReader( - seriesPath, allSensors, tsDataType, fragmentInstanceContext, queryDataSource, true); + seriesPath, + new HashSet<>(allSensors), + tsDataType, + fragmentInstanceContext, + queryDataSource, + true); } public static void writeWithReader( - AbstractCompactionWriter writer, IDataBlockReader reader, int subTaskId, boolean isAligned) + AbstractCompactionWriter writer, + IDataBlockReader reader, + String device, + int subTaskId, + boolean isAligned) throws IOException { while (reader.hasNextBatch()) { TsBlock tsBlock = reader.nextBatch(); @@ -375,6 +348,7 @@ public static void writeWithReader( writer.write( tsBlock.getTimeColumn(), tsBlock.getValueColumns(), + device, subTaskId, tsBlock.getPositionCount()); } else { @@ -383,6 +357,7 @@ public static void writeWithReader( TimeValuePair timeValuePair = pointReader.nextTimeValuePair(); writer.write( timeValuePair.getTimestamp(), timeValuePair.getValue().getValue(), subTaskId); + writer.updateStartTimeAndEndTime(device, timeValuePair.getTimestamp(), subTaskId); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java index 542c44c4f0c90..ae01567f9bd3c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java @@ -72,7 +72,8 @@ public void startMeasurement(List measurementSchemaList, int public abstract void write(long timestamp, Object value, int subTaskId) throws IOException; - public abstract void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + public abstract void write( + TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize) throws IOException; public abstract void endFile() throws IOException; @@ -140,6 +141,8 @@ protected void writeDataPoint(Long timestamp, Object value, int subTaskId) { measurementPointCountArray[subTaskId] += 1; } + public abstract void 
updateStartTimeAndEndTime(String device, long time, int subTaskId); + protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId) throws IOException { writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize()); @@ -177,4 +180,11 @@ protected void writeRateLimit(long bytesLength) { } public abstract List getFileIOWriter(); + + public void checkAndMayFlushChunkMetadata() throws IOException { + List writers = this.getFileIOWriter(); + for (TsFileIOWriter writer : writers) { + writer.checkMetadataSizeAndMayFlush(); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java index 80902dd1d946f..d192c0f6d71df 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java @@ -18,8 +18,10 @@ */ package org.apache.iotdb.db.engine.compaction.writer; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; @@ -37,6 +39,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { // source tsfiles private List seqTsFileResources; + private List targetTsFileResources; // Each sub task has its corresponding seq file index. // The index of the array corresponds to subTaskId. 
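Note: the writer constructors in this patch all derive a chunk-metadata memory budget from the two new settings, so the arithmetic is worth seeing in isolation. The standalone sketch below mirrors the formula used in the CrossSpaceCompactionWriter constructor just below; the total memory, thread count, and target-file count are hypothetical sample values, not figures taken from this patch.

public class ChunkMetadataBudgetSketch {
  public static void main(String[] args) {
    // hypothetical sample values: 2 GiB of compaction memory, 10 compaction
    // threads, the default proportion of 0.05, and 4 target files
    long memorySizeForCompaction = 2L * 1024 * 1024 * 1024;
    int concurrentCompactionThread = 10;
    double chunkMetadataSizeProportionInCompaction = 0.05;
    int targetFileNum = 4;
    // each concurrent task gets an equal share of the compaction memory, the
    // configured proportion of that share is reserved for chunk metadata, and a
    // cross-space task splits the reservation across its target file writers
    long memorySizeForEachWriter =
        (long)
            (memorySizeForCompaction
                / concurrentCompactionThread
                * chunkMetadataSizeProportionInCompaction
                / targetFileNum);
    System.out.println(memorySizeForEachWriter); // 2684354, about 2.6 MB per writer
  }
}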
@@ -57,11 +60,21 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { public CrossSpaceCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) throws IOException { + this.targetTsFileResources = targetResources; currentDeviceEndTime = new long[seqFileResources.size()]; isEmptyFile = new boolean[seqFileResources.size()]; isDeviceExistedInTargetFiles = new boolean[targetResources.size()]; + long memorySizeForEachWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInCompaction() + / targetResources.size()); for (int i = 0; i < targetResources.size(); i++) { - this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile())); + this.fileWriterList.add( + new TsFileIOWriter(targetResources.get(i).getTsFile(), true, memorySizeForEachWriter)); isEmptyFile[i] = true; } this.seqTsFileResources = seqFileResources; @@ -111,12 +124,19 @@ public void write(long timestamp, Object value, int subTaskId) throws IOExceptio } @Override - public void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + public void write( + TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize) throws IOException { // todo control time range of target tsfile checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId); AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); + synchronized (this) { + // we need to synchronize here to avoid multi-thread contention among sub-tasks + TsFileResource resource = targetTsFileResources.get(seqFileIndexArray[subTaskId]); + resource.updateStartTime(device, timestamps.getStartTime()); + resource.updateEndTime(device, timestamps.getEndTime()); + } checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId); isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true; isEmptyFile[seqFileIndexArray[subTaskId]] = false; @@ -192,4 +212,15 @@ private void checkIsDeviceExistAndGetDeviceEndTime() throws IOException { fileIndex++; } } + + @Override + public void updateStartTimeAndEndTime(String device, long time, int subTaskId) { + synchronized (this) { + int fileIndex = seqFileIndexArray[subTaskId]; + TsFileResource resource = targetTsFileResources.get(fileIndex); + // we need to synchronize here to avoid multi-thread contention among sub-tasks + resource.updateStartTime(device, time); + resource.updateEndTime(device, time); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java index a73c6c29074fe..2c3c2e58ad07b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java @@ -18,7 +18,9 @@ */ package org.apache.iotdb.db.engine.compaction.writer; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import 
org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; @@ -32,10 +34,19 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter { private TsFileIOWriter fileWriter; private boolean isEmptyFile; + private TsFileResource resource; public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException { - this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile()); + long sizeForFileWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInCompaction()); + this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter); isEmptyFile = true; + resource = targetFileResource; } @Override @@ -65,11 +76,17 @@ public void write(long timestamp, Object value, int subTaskId) throws IOExceptio } @Override - public void write(TimeColumn timestamps, Column[] columns, int subTaskId, int batchSize) + public void write( + TimeColumn timestamps, Column[] columns, String device, int subTaskId, int batchSize) throws IOException { AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId); + synchronized (this) { + // we need to synchronize here to avoid multi-thread contention among sub-tasks + resource.updateStartTime(device, timestamps.getStartTime()); + resource.updateEndTime(device, timestamps.getEndTime()); + } isEmptyFile = false; } @@ -89,6 +106,15 @@ public void close() throws IOException { fileWriter = null; } + @Override + public void updateStartTimeAndEndTime(String device, long time, int subTaskId) { + // we need to synchronize here to avoid multi-thread contention among sub-tasks + synchronized (this) { + resource.updateStartTime(device, time); + resource.updateEndTime(device, time); + } + } + @Override public List<TsFileIOWriter> getFileIOWriter() { return Collections.singletonList(fileWriter); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index a3ca350fea248..6c6ab965b6fd1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -38,6 +38,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -114,14 +117,19 @@ public void syncFlushMemTable() throws ExecutionException, InterruptedException long sortTime = 0; // for map do not use get(key) to iterate - for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry : - memTable.getMemTableMap().entrySet()) { - encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID())); - - final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap(); - for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { + Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap(); + List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet()); + // sort the IDeviceIDs in lexicographical order + deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID)); + for (IDeviceID deviceID : deviceIDList) { + encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID())); + + final Map<String, IWritableMemChunk> 
value = memTableMap.get(deviceID).getMemChunkMap(); + List seriesInOrder = new ArrayList<>(value.keySet()); + seriesInOrder.sort((String::compareTo)); + for (String seriesId : seriesInOrder) { long startTime = System.currentTimeMillis(); - IWritableMemChunk series = iWritableMemChunkEntry.getValue(); + IWritableMemChunk series = value.get(seriesId); /* * sort task (first task of flush pipeline) */ @@ -274,6 +282,7 @@ public void run() { this.writer.endChunkGroup(); } else { ((IChunkWriter) ioMessage).writeToFileWriter(this.writer); + writer.checkMetadataSizeAndMayFlush(); } } catch (IOException e) { LOGGER.error( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 570b97421aced..336181281bd46 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -178,7 +178,14 @@ public class TsFileProcessor { this.storageGroupName = storageGroupName; this.tsFileResource = new TsFileResource(tsfile, this); this.storageGroupInfo = storageGroupInfo; - this.writer = new RestorableTsFileIOWriter(tsfile); + this.writer = + new RestorableTsFileIOWriter( + tsfile, + (long) + (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInWrite())); this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback; this.sequence = sequence; this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 79f7b597c6b9f..95d35d115d958 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.FilePathUtils; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -505,10 +506,15 @@ public void removeModFile() throws IOException { modFile = null; } - /** Remove the data file, its resource file, and its modification file physically. */ + /** + * Remove the data file, its resource file, its chunk metadata temp file, and its modification + * file physically. 
+ */ public boolean remove() { try { fsFactory.deleteIfExists(file); + fsFactory.deleteIfExists( + new File(file.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)); } catch (IOException e) { LOGGER.error("TsFile {} cannot be deleted: {}", file, e.getMessage()); return false; diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java index b573ed354d271..ddfa8789b7d5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java @@ -492,10 +492,10 @@ protected boolean fileCheck() throws IOException { protected TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter) throws IOException { - tsFileIOWriter.endFile(); - TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile()); Map> deviceTimeseriesMetadataMap = tsFileIOWriter.getDeviceTimeseriesMetadataMap(); + tsFileIOWriter.endFile(); + TsFileResource tsFileResource = new TsFileResource(tsFileIOWriter.getFile()); for (Entry> entry : deviceTimeseriesMetadataMap.entrySet()) { String device = entry.getKey(); for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) { diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java index f2c3934ccf1f5..e506d66c3a50f 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java @@ -18,13 +18,16 @@ */ package org.apache.iotdb.db.wal.recover.file; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +58,12 @@ public AbstractTsFileRecoverPerformer(TsFileResource tsFileResource) { */ protected void recoverWithWriter() throws DataRegionException, IOException { File tsFile = tsFileResource.getTsFile(); + File chunkMetadataTempFile = + new File(tsFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX); + if (chunkMetadataTempFile.exists()) { + // delete chunk metadata temp file + FileUtils.delete(chunkMetadataTempFile); + } if (!tsFile.exists()) { logger.error("TsFile {} is missing, will skip its recovery.", tsFile); return; @@ -68,7 +77,14 @@ protected void recoverWithWriter() throws DataRegionException, IOException { // try to remove corrupted part of the TsFile try { - writer = new RestorableTsFileIOWriter(tsFile); + writer = + new RestorableTsFileIOWriter( + tsFile, + (long) + (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInWrite())); } catch (NotCompatibleTsFileException e) { boolean result = tsFile.delete(); logger.warn( diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java index f0ee6155a1789..f12e7499f7e4c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java @@ -62,6 +62,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; import static org.junit.Assert.assertEquals; @@ -92,7 +93,7 @@ public void tearDown() throws IOException, StorageEngineException { @Test public void testSeqInnerSpaceCompactionWithSameTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(2, 3, false); createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, true); @@ -167,7 +168,7 @@ public void testSeqInnerSpaceCompactionWithSameTimeseries() @Test public void testSeqInnerSpaceCompactionWithDifferentTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(5, 5, false); createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true); createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true); @@ -286,7 +287,7 @@ public void testSeqInnerSpaceCompactionWithDifferentTimeseries() @Test public void testUnSeqInnerSpaceCompactionWithSameTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(2, 3, false); createFiles(5, 2, 3, 100, 0, 0, 50, 50, false, false); @@ -372,7 +373,7 @@ public void testUnSeqInnerSpaceCompactionWithSameTimeseries() @Test public void testUnSeqInnerSpaceCompactionWithDifferentTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(9, 9, false); createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, false); createFiles(2, 3, 5, 50, 150, 150, 50, 50, false, false); @@ -498,7 +499,7 @@ public void testUnSeqInnerSpaceCompactionWithDifferentTimeseries() @Test public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(5, 7, false); createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false); @@ -633,7 +634,7 @@ public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries() @Test public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(5, 7, false); createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false); @@ -761,7 +762,7 @@ public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice() @Test public void 
testUnSeqInnerSpaceCompactionWithAllDataDeletedInTargetFile() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(5, 7, false); createFiles(2, 2, 3, 300, 0, 0, 0, 0, false, false); @@ -853,7 +854,7 @@ public void testUnSeqInnerSpaceCompactionWithAllDataDeletedInTargetFile() @Test public void testAlignedSeqInnerSpaceCompactionWithSameTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(2, 3, true); createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, true); @@ -950,7 +951,7 @@ public void testAlignedSeqInnerSpaceCompactionWithSameTimeseries() @Test public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPage() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(50); registerTimeseriesInMManger(5, 7, true); createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true); @@ -1072,7 +1073,7 @@ public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyPag @Test public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyChunk() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(5, 7, true); createFiles(2, 2, 3, 100, 0, 0, 50, 50, true, true); createFiles(2, 3, 5, 50, 250, 250, 50, 50, true, true); @@ -1193,7 +1194,7 @@ public void testAlignedSeqInnerSpaceCompactionWithDifferentTimeseriesAndEmptyChu @Test public void testAlignedUnSeqInnerSpaceCompactionWithEmptyChunkAndEmptyPage() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(5, 7, true); createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false); @@ -1326,7 +1327,7 @@ public void testAlignedUnSeqInnerSpaceCompactionWithEmptyChunkAndEmptyPage() @Test public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(5, 7, true); createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false); @@ -1507,7 +1508,7 @@ public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInTimeseries() @Test public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(5, 7, true); createFiles(2, 2, 3, 300, 0, 0, 0, 0, true, false); @@ -1655,7 +1656,7 @@ public void testAlignedUnSeqInnerSpaceCompactionWithAllDataDeletedInDevice() @Test public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries() throws IOException, WriteProcessException, 
MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(2, 3, true); createFiles(5, 2, 3, 100, 0, 0, 50, 50, true, false); @@ -1754,7 +1755,7 @@ public void testAlignedUnSeqInnerSpaceCompactionWithSameTimeseries() @Test public void testCrossSpaceCompactionWithSameTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(2, 3, false); createFiles(5, 2, 3, 100, 0, 0, 0, 0, false, true); createFiles(5, 2, 3, 50, 0, 10000, 50, 50, false, false); @@ -1845,7 +1846,7 @@ public void testCrossSpaceCompactionWithSameTimeseries() @Test public void testCrossSpaceCompactionWithDifferentTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, false); createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true); @@ -2031,7 +2032,7 @@ public void testCrossSpaceCompactionWithDifferentTimeseries() @Test public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, false); createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true); @@ -2229,7 +2230,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries() @Test public void testCrossSpaceCompactionWithAllDataDeletedInDevice() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, false); createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true); @@ -2418,7 +2419,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDevice() @Test public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, false); createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true); @@ -2584,7 +2585,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() @Test public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, false); createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true); @@ -2789,7 +2790,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles() @Test public void testAlignedCrossSpaceCompactionWithSameTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { registerTimeseriesInMManger(2, 3, true); createFiles(5, 2, 3, 100, 0, 0, 0, 0, true, true); createFiles(5, 2, 3, 50, 
0, 10000, 50, 50, true, false); @@ -2889,7 +2890,7 @@ public void testAlignedCrossSpaceCompactionWithSameTimeseries() @Test public void testAlignedCrossSpaceCompactionWithDifferentTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, true); createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true); @@ -3054,7 +3055,7 @@ public void testAlignedCrossSpaceCompactionWithDifferentTimeseries() @Test public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, true); createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true); @@ -3291,7 +3292,7 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() @Test public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, true); createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true); @@ -3500,7 +3501,7 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() @Test public void testAlignedCrossSpaceCompactionWithFileTimeIndexResource() throws IOException, WriteProcessException, MetadataException, StorageEngineException, - InterruptedException { + InterruptedException, ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); registerTimeseriesInMManger(4, 5, true); createFiles(2, 2, 3, 300, 0, 0, 50, 50, true, true); @@ -3694,7 +3695,7 @@ public void testAlignedCrossSpaceCompactionWithFileTimeIndexResource() } @Test - public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() { + public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() throws ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); try { registerTimeseriesInMManger(6, 6, false); @@ -3769,7 +3770,8 @@ public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() { } @Test - public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile() { + public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile() + throws ExecutionException { TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30); try { registerTimeseriesInMManger(6, 6, false); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java index 09e17a763fdb7..095a6d260fc4a 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putChunk; @@ -350,7 +351,7 @@ public 
void testDeserializePage() throws MetadataException, IOException, WritePr } } } - } catch (InterruptedException | StorageEngineException e) { + } catch (InterruptedException | StorageEngineException | ExecutionException e) { e.printStackTrace(); } finally { IoTDBDescriptor.getInstance() @@ -365,7 +366,7 @@ public void testDeserializePage() throws MetadataException, IOException, WritePr @Test public void testAppendPage() throws IOException, MetadataException, InterruptedException, StorageEngineException, - WriteProcessException { + WriteProcessException, ExecutionException { for (int toMergeFileNum : toMergeFileNums) { for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) { @@ -632,7 +633,7 @@ public void testAppendPage() @Test public void testAppendChunk() throws IOException, IllegalPathException, MetadataException, StorageEngineException, - WriteProcessException { + WriteProcessException, ExecutionException { long prevChunkPointNumLowerBoundInCompaction = IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction(); IoTDBDescriptor.getInstance().getConfig().setChunkPointNumLowerBoundInCompaction(1); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java index 588b1af97e056..1fd50b21febbf 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java @@ -47,6 +47,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -57,10 +59,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putOnePageChunk; public class InnerUnseqCompactionTest { + private static final Logger LOG = LoggerFactory.getLogger(InnerUnseqCompactionTest.class); static final String COMPACTION_TEST_SG = "root.compactionTest"; static final String[] fullPaths = new String[] { @@ -132,7 +136,7 @@ public void tearDown() throws IOException, StorageEngineException { @Test public void test() throws MetadataException, IOException, StorageEngineException, WriteProcessException, - InterruptedException { + InterruptedException, ExecutionException { for (int toMergeFileNum : toMergeFileNums) { for (CompactionTimeseriesType compactionTimeseriesType : compactionTimeseriesTypes) { for (boolean compactionBeforeHasMod : compactionBeforeHasMods) { @@ -351,6 +355,13 @@ public void test() toDeleteTimeseriesAndTime, tsFileResource, false); } } + LOG.error( + "{} {} {} {} {}", + toMergeFileNum, + compactionTimeseriesType, + compactionBeforeHasMod, + compactionHasMod, + compactionOverlapType); TsFileResource targetTsFileResource = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources( toMergeResources, false) diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java index 59b0ab3d3d6f8..b48d494c0beaa 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java +++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerOldTest.java @@ -45,6 +45,7 @@ import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -78,7 +79,7 @@ public void tearDown() throws IOException, StorageEngineException { @Test public void testCompact() throws IOException, MetadataException, InterruptedException, StorageEngineException, - WriteProcessException { + WriteProcessException, ExecutionException { TsFileResource targetTsFileResource = new TsFileResource( new File( diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index 831f8cd120fde..9ee1f7f566869 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -186,6 +186,14 @@ public static ChunkMetadata deserializeFrom( return chunkMetaData; } + public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) { + ChunkMetadata chunkMetadata = new ChunkMetadata(); + chunkMetadata.tsDataType = dataType; + chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer); + chunkMetadata.statistics = Statistics.deserialize(buffer, dataType); + return chunkMetadata; + } + @Override public long getVersion() { return version; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java index 062ffd6183ae1..44cdc8b0bf4b6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java @@ -81,6 +81,11 @@ public static MetadataIndexNode constructMetadataIndex( measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); } + return checkAndBuildLevelIndex(deviceMetadataIndexMap, out); + } + + public static MetadataIndexNode checkAndBuildLevelIndex( + Map deviceMetadataIndexMap, TsFileOutput out) throws IOException { // if not exceed the max child nodes num, ignore the device index and directly point to the // measurement if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) { @@ -123,7 +128,7 @@ public static MetadataIndexNode constructMetadataIndex( * @param out tsfile output * @param type MetadataIndexNode type */ - private static MetadataIndexNode generateRootNode( + public static MetadataIndexNode generateRootNode( Queue metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type) throws IOException { int queueSize = metadataIndexNodeQueue.size(); @@ -148,7 +153,7 @@ private static MetadataIndexNode generateRootNode( return metadataIndexNodeQueue.poll(); } - private static void addCurrentIndexNodeToQueue( + public static void addCurrentIndexNodeToQueue( MetadataIndexNode currentIndexNode, Queue metadataIndexNodeQueue, TsFileOutput out) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java index 3f6f6336b30ad..1d3972cafe4d5 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java +++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java @@ -74,7 +74,7 @@ public void addEntry(MetadataIndexEntry metadataIndexEntry) { this.children.add(metadataIndexEntry); } - boolean isFull() { + public boolean isFull() { return children.size() >= config.getMaxDegreeOfIndexNode(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java index 95e01e2da11fd..f6f974fc1ad13 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java @@ -105,10 +105,15 @@ public int serializeTo(OutputStream outputStream) throws IOException { * @param outputStream -output stream to determine byte length * @return -byte length */ - public int serializeBloomFilter(OutputStream outputStream, Set paths) throws IOException { - int byteLen = 0; + public int buildAndSerializeBloomFilter(OutputStream outputStream, Set paths) + throws IOException { BloomFilter filter = buildBloomFilter(paths); + return serializeBloomFilter(outputStream, filter); + } + public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter) + throws IOException { + int byteLen = 0; byte[] bytes = filter.serialize(); byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream); outputStream.write(bytes); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 78253124b89a9..391426cc34fe1 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -78,6 +78,18 @@ public RestorableTsFileIOWriter(File file) throws IOException { this(file, true); } + /** + * @param file the TsFile to write or to continue writing + * @param maxMetadataSize the in-memory chunk metadata size (in bytes) above which metadata is flushed to the temp file + * @throws IOException if the file cannot be opened for writing, or the existing file is broken and cannot be truncated to a consistent position.
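+ * <p>A minimal usage sketch (the file name and the 10 MB budget are illustrative, not part of
+ * this change):
+ * <pre>{@code
+ * RestorableTsFileIOWriter writer =
+ *     new RestorableTsFileIOWriter(new File("1-1-0-0.tsfile"), 10 * 1024 * 1024L);
+ * }</pre>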
+ */ + public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException { + this(file, true); + this.maxMetadataSize = maxMetadataSize; + this.enableMemoryControl = true; + this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); + this.checkMetadataSizeAndMayFlush(); + } + public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException { if (logger.isDebugEnabled()) { logger.debug("{} is opened.", file.getName()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 22a68bdfd7612..851f03c192536 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -26,35 +26,46 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.BloomFilter; import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.TreeMap; +import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue; +import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex; +import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode; + /** * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream. 
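 * <p>With memory control enabled (the constructor and check method below are added in this
 * change; the device name, the 1 MB budget, and chunkWriter are illustrative), a write loop
 * looks roughly like this sketch:
 * <pre>{@code
 * TsFileIOWriter writer = new TsFileIOWriter(file, true, 1024 * 1024L);
 * writer.startChunkGroup("root.sg.d1");
 * chunkWriter.writeToFileWriter(writer);
 * writer.checkMetadataSizeAndMayFlush(); // may spill chunk metadata to the .cmt temp file
 * writer.endChunkGroup();
 * writer.endFile(); // merges in-memory and spilled metadata into one index tree
 * }</pre>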
*/ @@ -76,7 +87,7 @@ public class TsFileIOWriter implements AutoCloseable { protected File file; // current flushed Chunk - private ChunkMetadata currentChunkMetadata; + protected ChunkMetadata currentChunkMetadata; // current flushed ChunkGroup protected List chunkMetadataList = new ArrayList<>(); // all flushed ChunkGroups @@ -93,6 +104,20 @@ public class TsFileIOWriter implements AutoCloseable { private long minPlanIndex; private long maxPlanIndex; + // the following variable is used for memory control + protected long maxMetadataSize; + protected long currentChunkMetadataSize = 0L; + protected File chunkMetadataTempFile; + protected LocalTsFileOutput tempOutput; + protected volatile boolean hasChunkMetadataInDisk = false; + protected String currentSeries = null; + // record the total num of path in order to make bloom filter + protected int pathCount = 0; + protected boolean enableMemoryControl = false; + private Path lastSerializePath = null; + protected LinkedList endPosInCMTForDevice = new LinkedList<>(); + public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt"; + /** empty construct function. */ protected TsFileIOWriter() {} @@ -126,6 +151,15 @@ public TsFileIOWriter(TsFileOutput output, boolean test) { this.out = output; } + /** for write with memory control */ + public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize) + throws IOException { + this(file); + this.enableMemoryControl = enableMemoryControl; + this.maxMetadataSize = maxMetadataSize; + chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); + } + /** * Writes given bytes to output stream. This method is called when total memory size exceeds the * chunk group size threshold. @@ -249,6 +283,9 @@ public void writeChunk(Chunk chunk) throws IOException { /** end chunk and write some log. 
*/ public void endCurrentChunk() { + if (enableMemoryControl) { + this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize(); + } chunkMetadataList.add(currentChunkMetadata); currentChunkMetadata = null; } @@ -260,47 +297,14 @@ public void endCurrentChunk() { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void endFile() throws IOException { - long metaOffset = out.getPosition(); - - // serialize the SEPARATOR of MetaData - ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream()); - - // group ChunkMetadata by series - Map> chunkMetadataListMap = new TreeMap<>(); - - for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { - List chunkMetadatas = chunkGroupMetadata.getChunkMetadataList(); - for (IChunkMetadata chunkMetadata : chunkMetadatas) { - Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()); - chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata); - } - } - - MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap); - TsFileMetadata tsFileMetaData = new TsFileMetadata(); - tsFileMetaData.setMetadataIndex(metadataIndex); - tsFileMetaData.setMetaOffset(metaOffset); + checkInMemoryPathCount(); + readChunkMetadataAndConstructIndexTree(); long footerIndex = out.getPosition(); if (logger.isDebugEnabled()) { logger.debug("start to flush the footer,file pos:{}", footerIndex); } - // write TsFileMetaData - int size = tsFileMetaData.serializeTo(out.wrapAsStream()); - if (logger.isDebugEnabled()) { - logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition()); - } - - // write bloom filter - size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet()); - if (logger.isDebugEnabled()) { - logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition()); - } - - // write TsFileMetaData size - ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata. 
- // write magic string out.write(MAGIC_STRING_BYTES); @@ -312,63 +316,112 @@ public void endFile() throws IOException { canWrite = false; } - /** - * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData - * - * @param chunkMetadataListMap chunkMetadata that Path.mask == 0 - * @return MetadataIndexEntry list in TsFileMetadata - */ - private MetadataIndexNode flushMetadataIndex(Map> chunkMetadataListMap) - throws IOException { + private void checkInMemoryPathCount() { + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + pathCount += chunkGroupMetadata.getChunkMetadataList().size(); + } + } - // convert ChunkMetadataList to this field - deviceTimeseriesMetadataMap = new LinkedHashMap<>(); - // create device -> TimeseriesMetaDataList Map - for (Map.Entry> entry : chunkMetadataListMap.entrySet()) { - // for ordinary path - flushOneChunkMetadata(entry.getKey(), entry.getValue()); + private void readChunkMetadataAndConstructIndexTree() throws IOException { + if (tempOutput != null) { + tempOutput.close(); } + long metaOffset = out.getPosition(); - // construct TsFileMetadata and return - return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out); - } + // serialize the SEPARATOR of MetaData + ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream()); - /** - * Flush one chunkMetadata - * - * @param path Path of chunk - * @param chunkMetadataList List of chunkMetadata about path(previous param) - */ - private void flushOneChunkMetadata(Path path, List chunkMetadataList) - throws IOException { - // create TimeseriesMetaData - PublicBAOS publicBAOS = new PublicBAOS(); - TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType(); - Statistics seriesStatistics = Statistics.getStatsByType(dataType); - - int chunkMetadataListLength = 0; - boolean serializeStatistic = (chunkMetadataList.size() > 1); - // flush chunkMetadataList one by one - for (IChunkMetadata chunkMetadata : chunkMetadataList) { - if (!chunkMetadata.getDataType().equals(dataType)) { - continue; + TSMIterator tsmIterator = + hasChunkMetadataInDisk + ? TSMIterator.getTSMIteratorInDisk( + chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice) + : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList); + Map deviceMetadataIndexMap = new TreeMap<>(); + Queue measurementMetadataIndexQueue = new ArrayDeque<>(); + String currentDevice = null; + String prevDevice = null; + MetadataIndexNode currentIndexNode = + new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); + int seriesIdxForCurrDevice = 0; + BloomFilter filter = + BloomFilter.getEmptyBloomFilter( + TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount); + + int indexCount = 0; + while (tsmIterator.hasNext()) { + // read in all chunk metadata of one series + // construct the timeseries metadata for this series + Pair timeseriesMetadataPair = tsmIterator.next(); + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + currentSeries = timeseriesMetadataPair.left; + + indexCount++; + // build bloom filter + filter.add(currentSeries); + // construct the index tree node for the series + Path currentPath = null; + if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { + // this series is the time column of the aligned device + // the full series path will be like "root.sg.d." + // we remove the last . 
in the series id here + currentPath = new Path(currentSeries); + currentDevice = currentSeries.substring(0, currentSeries.length() - 1); + } else { + currentPath = new Path(currentSeries, true); + currentDevice = currentPath.getDevice(); } - chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic); - seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); + if (!currentDevice.equals(prevDevice)) { + if (prevDevice != null) { + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + deviceMetadataIndexMap.put( + prevDevice, + generateRootNode( + measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + } + measurementMetadataIndexQueue = new ArrayDeque<>(); + seriesIdxForCurrDevice = 0; + } + + if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) { + if (currentIndexNode.isFull()) { + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + } + if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) { + currentIndexNode.addEntry( + new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition())); + } else { + currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition())); + } + } + + prevDevice = currentDevice; + seriesIdxForCurrDevice++; + // serialize the timeseries metadata to file + timeseriesMetadata.serializeTo(out.wrapAsStream()); } - TimeseriesMetadata timeseriesMetadata = - new TimeseriesMetadata( - (byte) - ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()), - chunkMetadataListLength, - path.getMeasurement(), - dataType, - seriesStatistics, - publicBAOS); - deviceTimeseriesMetadataMap - .computeIfAbsent(path.getDevice(), k -> new ArrayList<>()) - .add(timeseriesMetadata); + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + if (prevDevice != null) { + deviceMetadataIndexMap.put( + prevDevice, + generateRootNode( + measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); + } + + MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out); + + TsFileMetadata tsFileMetadata = new TsFileMetadata(); + tsFileMetadata.setMetadataIndex(metadataIndex); + tsFileMetadata.setMetaOffset(metaOffset); + + int size = tsFileMetadata.serializeTo(out.wrapAsStream()); + size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter); + + // write TsFileMetaData size + ReadWriteIOUtils.write(size, out.wrapAsStream()); } /** @@ -412,6 +465,9 @@ public void reset() throws IOException { public void close() throws IOException { canWrite = false; out.close(); + if (tempOutput != null) { + this.tempOutput.close(); + } } void writeSeparatorMaskForTest() throws IOException { @@ -490,6 +546,30 @@ public TsFileOutput getIOWriterOut() { * @return DeviceTimeseriesMetadataMap */ public Map> getDeviceTimeseriesMetadataMap() { + Map> deviceTimeseriesMetadataMap = new TreeMap<>(); + Map>> chunkMetadataMap = new TreeMap<>(); + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { + chunkMetadataMap + .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>()) + .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>()) + .add(chunkMetadata); + } + } + 
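// with the chunk metadata now grouped by device and series, build one TimeseriesMetadata per series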
for (String device : chunkMetadataMap.keySet()) { + Map> seriesToChunkMetadataMap = chunkMetadataMap.get(device); + for (Map.Entry> entry : seriesToChunkMetadataMap.entrySet()) { + try { + deviceTimeseriesMetadataMap + .computeIfAbsent(device, x -> new ArrayList<>()) + .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue())); + } catch (IOException e) { + logger.error("Failed to get device timeseries metadata map", e); + return null; + } + } + } + return deviceTimeseriesMetadataMap; } @@ -508,4 +588,85 @@ public long getMaxPlanIndex() { public void setMaxPlanIndex(long maxPlanIndex) { this.maxPlanIndex = maxPlanIndex; } + + /** + * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the + * chunk metadata will be written to a temp file. Notice: if you are writing an aligned device + * row by row, you should make sure that all data of the current device has been written before + * this method is called. When writing non-aligned series, or aligned series column by column, + * you should make sure that all data of one series has been written before you call this + * function. + * + * @throws IOException if flushing the chunk metadata to the temp file fails + */ + public void checkMetadataSizeAndMayFlush() throws IOException { + // This function should be called after all data of an aligned device has been written + if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) { + try { + sortAndFlushChunkMetadata(); + } catch (IOException e) { + logger.error("Meets exception when flushing metadata to temp file for {}", file, e); + throw e; + } + } + } + + /** + * Sort the chunk metadata in lexicographical order of series path, and by chunk start time + * within a series, then flush them to a temp file. + * + * @throws IOException if writing the temp file fails + */ + protected void sortAndFlushChunkMetadata() throws IOException { + // group by series + List>> sortedChunkMetadataList = + TSMIterator.sortChunkMetadata( + chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList); + if (tempOutput == null) { + tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile)); + } + hasChunkMetadataInDisk = true; + for (Pair> pair : sortedChunkMetadataList) { + Path seriesPath = pair.left; + boolean isNewPath = !seriesPath.equals(lastSerializePath); + if (isNewPath) { + // record the count of paths to construct the bloom filter later + pathCount++; + } + List iChunkMetadataList = pair.right; + writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath); + lastSerializePath = seriesPath; + logger.debug("Flushing {}", seriesPath); + } + // clear the cached metadata to release the memory + chunkGroupMetadataList.clear(); + if (chunkMetadataList != null) { + chunkMetadataList.clear(); + } + } + + private void writeChunkMetadataToTempFile( + List iChunkMetadataList, Path seriesPath, boolean isNewPath) + throws IOException { + // record layout: [deviceId (once per device)] measurementId dataType size chunkMetadataBuffer + if (lastSerializePath == null + || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) { + // mark the end position of the last device + endPosInCMTForDevice.add(tempOutput.getPosition()); + // serialize the device; each device is serialized only once, in order to save IO + ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream()); + } + if (isNewPath && iChunkMetadataList.size() > 0) { + // serialize the public info of this measurement + ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream()); + ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), 
tempOutput.wrapAsStream()); + } + PublicBAOS buffer = new PublicBAOS(); + int totalSize = 0; + for (IChunkMetadata chunkMetadata : iChunkMetadataList) { + totalSize += chunkMetadata.serializeTo(buffer, true); + } + ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream()); + buffer.writeTo(tempOutput); + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java new file mode 100644 index 0000000000000..fd02f1438a68a --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.write.writer.tsmiterator; + +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +/** + * This class reads ChunkMetadata iteratively from disk (the .cmt file) and from memory (a list of + * ChunkGroupMetadata), and constructs them as TimeseriesMetadata. It reads the ChunkMetadata on + * disk first; after all ChunkMetadata on disk have been read, it reads the ChunkMetadata in memory. 
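+ * <p>Each record in the .cmt file is laid out as: deviceId (written once per device),
+ * measurementId, data type, the serialized size of the chunk metadata list, then the chunk
+ * metadata themselves. A sketch of the intended use (the arguments come from TsFileIOWriter):
+ * <pre>{@code
+ * TSMIterator it =
+ *     TSMIterator.getTSMIteratorInDisk(cmtFile, chunkGroupMetadataList, endPosInCMTForDevice);
+ * while (it.hasNext()) {
+ *   Pair<String, TimeseriesMetadata> entry = it.next(); // full series path -> metadata
+ * }
+ * }</pre>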
+ */ +public class DiskTSMIterator extends TSMIterator { + + private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class); + + private LinkedList endPosForEachDevice; + private File cmtFile; + private LocalTsFileInput input; + private long fileLength = 0; + private long currentPos = 0; + private long nextEndPosForDevice = 0; + private String currentDevice; + private boolean remainsInFile = true; + + protected DiskTSMIterator( + File cmtFile, + List chunkGroupMetadataList, + LinkedList endPosForEachDevice) + throws IOException { + super(chunkGroupMetadataList); + this.cmtFile = cmtFile; + this.endPosForEachDevice = endPosForEachDevice; + this.input = new LocalTsFileInput(cmtFile.toPath()); + this.fileLength = cmtFile.length(); + this.nextEndPosForDevice = endPosForEachDevice.removeFirst(); + } + + @Override + public boolean hasNext() { + return remainsInFile || iterator.hasNext(); + } + + @Override + public Pair next() { + try { + if (remainsInFile) { + // deserialize from file + return getTimeseriesMetadataFromFile(); + } else { + // get from memory iterator + return super.next(); + } + } catch (IOException e) { + LOG.error("Meets IOException when reading timeseries metadata from disk", e); + return null; + } + } + + private Pair getTimeseriesMetadataFromFile() throws IOException { + if (currentPos == nextEndPosForDevice) { + // deserialize the current device name + currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream()); + nextEndPosForDevice = + endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength; + } + // deserialize public info for measurement + String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream()); + byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream()); + TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte); + int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream()); + ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize); + int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer); + if (readSize < chunkBufferSize) { + throw new IOException( + String.format( + "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize)); + } + chunkBuffer.flip(); + + // deserialize chunk metadata from chunk buffer + List chunkMetadataList = new ArrayList<>(); + while (chunkBuffer.hasRemaining()) { + chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType)); + } + updateCurrentPos(); + return new Pair<>( + currentDevice + "." + measurementUid, + constructOneTimeseriesMetadata(measurementUid, chunkMetadataList)); + } + + private void updateCurrentPos() throws IOException { + currentPos = input.position(); + if (currentPos >= fileLength) { + remainsInFile = false; + input.close(); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java new file mode 100644 index 0000000000000..f11242f296240 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.writer.tsmiterator; + +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.PublicBAOS; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * TSMIterator returns full path of series and its TimeseriesMetadata iteratively. It accepts data + * source from memory or disk. Static method getTSMIteratorInMemory returns a TSMIterator that reads + * from memory, and static method getTSMIteratorInDisk returns a TSMIterator that reads from disk. + */ +public class TSMIterator { + private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class); + protected List>> sortedChunkMetadataList; + protected Iterator>> iterator; + + protected TSMIterator(List chunkGroupMetadataList) { + this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null); + this.iterator = sortedChunkMetadataList.iterator(); + } + + public static TSMIterator getTSMIteratorInMemory( + List chunkGroupMetadataList) { + return new TSMIterator(chunkGroupMetadataList); + } + + public static TSMIterator getTSMIteratorInDisk( + File cmtFile, List chunkGroupMetadataList, LinkedList serializePos) + throws IOException { + return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos); + } + + public boolean hasNext() { + return iterator.hasNext(); + } + + public Pair next() throws IOException { + Pair> nextPair = iterator.next(); + return new Pair<>( + nextPair.left.getFullPath(), + constructOneTimeseriesMetadata(nextPair.left.getMeasurement(), nextPair.right)); + } + + public static TimeseriesMetadata constructOneTimeseriesMetadata( + String measurementId, List chunkMetadataList) throws IOException { + // create TimeseriesMetaData + PublicBAOS publicBAOS = new PublicBAOS(); + TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType(); + Statistics seriesStatistics = Statistics.getStatsByType(dataType); + + int chunkMetadataListLength = 0; + boolean serializeStatistic = (chunkMetadataList.size() > 1); + // flush chunkMetadataList one by one + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + if (!chunkMetadata.getDataType().equals(dataType)) { + continue; + } + chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic); + 
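// accumulate this chunk's statistics into the series-level statistics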
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); + } + + TimeseriesMetadata timeseriesMetadata = + new TimeseriesMetadata( + (byte) + ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()), + chunkMetadataListLength, + measurementId, + dataType, + seriesStatistics, + publicBAOS); + return timeseriesMetadata; + } + + public static List>> sortChunkMetadata( + List chunkGroupMetadataList, + String currentDevice, + List chunkMetadataList) { + Map>> chunkMetadataMap = new TreeMap<>(); + List>> sortedChunkMetadataList = new LinkedList<>(); + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + chunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>()); + for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { + chunkMetadataMap + .get(chunkGroupMetadata.getDevice()) + .computeIfAbsent( + new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()), + x -> new ArrayList<>()) + .add(chunkMetadata); + } + } + if (currentDevice != null) { + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + chunkMetadataMap + .computeIfAbsent(currentDevice, x -> new TreeMap<>()) + .computeIfAbsent( + new Path(currentDevice, chunkMetadata.getMeasurementUid()), x -> new ArrayList<>()) + .add(chunkMetadata); + } + } + + for (Map.Entry>> entry : chunkMetadataMap.entrySet()) { + Map> seriesChunkMetadataMap = entry.getValue(); + for (Map.Entry> seriesChunkMetadataEntry : + seriesChunkMetadataMap.entrySet()) { + sortedChunkMetadataList.add( + new Pair<>(seriesChunkMetadataEntry.getKey(), seriesChunkMetadataEntry.getValue())); + } + } + return sortedChunkMetadataList; + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java new file mode 100644 index 0000000000000..c97a9a07742a7 --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.tsfile.write; + +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.encoding.decoder.Decoder; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; +import org.apache.iotdb.tsfile.read.reader.IChunkReader; +import org.apache.iotdb.tsfile.read.reader.IPointReader; +import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; +import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +/** This class provides some static methods to check the integrity of a TsFile. */ +public class TsFileIntegrityCheckingTool { + private static Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class); + + /** + * This method checks the integrity of the file by reading it from the start to the end. It mainly + * checks the integrity of the chunks. 
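+   * <p>Typical call from a test (the path is illustrative):
+   * <pre>{@code
+   * TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead("target/1-1-0-0.tsfile");
+   * }</pre>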
+ * + * @param filename the TsFile to be checked + */ + public static void checkIntegrityBySequenceRead(String filename) { + try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) { + String headMagicString = reader.readHeadMagic(); + Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString); + String tailMagicString = reader.readTailMagic(); + Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString); + reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); + List timeBatch = new ArrayList<>(); + int pageIndex = 0; + byte marker; + while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + case MetaMarker.TIME_CHUNK_HEADER: + case MetaMarker.VALUE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: + ChunkHeader header = reader.readChunkHeader(marker); + if (header.getDataSize() == 0) { + // empty value chunk + break; + } + Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + Decoder valueDecoder = + Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); + int dataSize = header.getDataSize(); + pageIndex = 0; + if (header.getDataType() == TSDataType.VECTOR) { + timeBatch.clear(); + } + while (dataSize > 0) { + valueDecoder.reset(); + PageHeader pageHeader = + reader.readPageHeader( + header.getDataType(), + (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); + ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); + if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK) + == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk + TimePageReader timePageReader = + new TimePageReader(pageHeader, pageData, defaultTimeDecoder); + timeBatch.add(timePageReader.getNextTimeBatch()); + } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK) + == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk + ValuePageReader valuePageReader = + new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder); + TsPrimitiveType[] valueBatch = + valuePageReader.nextValueBatch(timeBatch.get(pageIndex)); + } else { // NonAligned Chunk + PageReader pageReader = + new PageReader( + pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null); + BatchData batchData = pageReader.getAllSatisfiedPageData(); + } + pageIndex++; + dataSize -= pageHeader.getSerializedPageSize(); + } + break; + case MetaMarker.CHUNK_GROUP_HEADER: + ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); + break; + case MetaMarker.OPERATION_INDEX_RANGE: + reader.readPlanIndex(); + break; + default: + MetaMarker.handleUnexpectedMarker(marker); + } + } + } catch (IOException e) { + LOG.error("Meet exception when checking integrity of tsfile", e); + Assert.fail(); + } + } + + /** + * This method checks the integrity of the file by mimicking the process of the query, which reads + * the metadata index tree first, and gets the timeseries metadata list and chunk metadata list. + * After that, this method acquires each chunk according to its chunk metadata, then it deserializes + * the chunk, and verifies the correctness of the data. + * + * @param filename File to be checked + * @param originData The origin data in a map format, Device -> SeriesId -> List>, + * each inner list stands for a chunk. 
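+   * <p>A sketch of how the tests assemble originData (device, series and pointsOfOneChunk are
+   * illustrative):
+   * <pre>{@code
+   * Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+   * originData.computeIfAbsent("root.sg.d0", x -> new HashMap<>())
+   *     .computeIfAbsent("s0", x -> new ArrayList<>())
+   *     .add(pointsOfOneChunk); // one inner list per chunk
+   * }</pre>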
+ */ + public static void checkIntegrityByQuery( + String filename, + Map>>>> originData) { + try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) { + Map> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(true); + Assert.assertEquals(originData.size(), allTimeseriesMetadata.size()); + // check each series + for (Map.Entry> entry : allTimeseriesMetadata.entrySet()) { + String deviceId = entry.getKey(); + List timeseriesMetadataList = entry.getValue(); + boolean vectorMode = false; + if (timeseriesMetadataList.size() > 0 + && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) { + Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size()); + } else { + vectorMode = true; + Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1); + } + + if (!vectorMode) { + // check integrity of not aligned series + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { + // get its chunk metadata list, and read the chunk + String measurementId = timeseriesMetadata.getMeasurementId(); + List>> originChunks = + originData.get(deviceId).get(measurementId); + List chunkMetadataList = timeseriesMetadata.getChunkMetadataList(); + Assert.assertEquals(originChunks.size(), chunkMetadataList.size()); + chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime)); + for (int i = 0; i < chunkMetadataList.size(); ++i) { + Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i)); + ChunkReader chunkReader = new ChunkReader(chunk, null); + List> originValue = originChunks.get(i); + // deserialize the chunk and verify it with origin data + for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) { + IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair pair = pointReader.nextTimeValuePair(); + Assert.assertEquals( + originValue.get(valIdx).left.longValue(), pair.getTimestamp()); + Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue()); + } + } + } + } + } else { + // check integrity of vector type + // get the timeseries metadata of the time column + TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0); + List timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList(); + timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime)); + + for (int i = 1; i < timeseriesMetadataList.size(); ++i) { + // traverse each value column + List valueChunkMetadataList = + timeseriesMetadataList.get(i).getChunkMetadataList(); + Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size()); + List>> originDataChunks = + originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId()); + for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) { + Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx)); + Chunk valueChunk = + reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx)); + // construct an aligned chunk reader using time chunk and value chunk + IChunkReader chunkReader = + new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null); + // verify the values + List> originValue = originDataChunks.get(chunkIdx); + for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) { + IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator(); + while (pointReader.hasNext()) { + long time = 
pointReader.currentTime(); + Assert.assertEquals(originValue.get(valIdx).left.longValue(), time); + Assert.assertEquals( + originValue.get(valIdx++).right.getValue(), + ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue()); + pointReader.next(); + } + } + } + } + } + } + + } catch (IOException e) { + LOG.error("Meet exception when checking integrity of tsfile", e); + Assert.fail(); + } + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java new file mode 100644 index 0000000000000..44e4af3678819 --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java @@ -0,0 +1,1261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.write.writer; + +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool; +import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter; +import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +public class TsFileIOWriterMemoryControlTest { + private static File testFile = new File("target", "1-1-0-0.tsfile"); + private static File emptyFile = new File("target", "temp"); + private long TEST_CHUNK_SIZE = 1000; + private List sortedSeriesId = new ArrayList<>(); + private List sortedDeviceId = new 
ArrayList<>(); + private boolean init = false; + + @Before + public void setUp() throws IOException { + if (!init) { + init = true; + for (int i = 0; i < 2048; ++i) { + sortedSeriesId.add("s" + i); + sortedDeviceId.add("root.sg.d" + i); + } + sortedSeriesId.sort((String::compareTo)); + sortedDeviceId.sort((String::compareTo)); + } + TEST_CHUNK_SIZE = 1000; + } + + @After + public void tearDown() throws IOException { + if (testFile.exists()) { + FileUtils.delete(testFile); + } + if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) { + FileUtils.delete( + new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)); + } + if (emptyFile.exists()) { + FileUtils.delete(emptyFile); + } + } + + /** The following tests is for ChunkMetadata serialization and deserialization. */ + @Test + public void testSerializeAndDeserializeChunkMetadata() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + List originChunkMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + chunkWriter = generateIntData(j, 0L, new ArrayList<>()); + break; + case 1: + chunkWriter = generateBooleanData(j, 0, new ArrayList<>()); + break; + case 2: + chunkWriter = generateFloatData(j, 0L, new ArrayList<>()); + break; + case 3: + chunkWriter = generateDoubleData(j, 0L, new ArrayList<>()); + break; + case 4: + default: + chunkWriter = generateTextData(j, 0L, new ArrayList<>()); + break; + } + chunkWriter.writeToFileWriter(writer); + } + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, + writer.chunkGroupMetadataList, + writer.endPosInCMTForDevice); + for (int i = 0; iterator.hasNext(); ++i) { + Pair timeseriesMetadataPair = iterator.next(); + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId()); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics()); + } + } + } + + @Test + public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + List originChunkMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6); + chunkWriter.writeToFileWriter(writer); + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + List measurementIds = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + measurementIds.add(sortedDeviceId.get(i) + "."); + for (int j = 1; j <= 6; ++j) { + measurementIds.add(sortedDeviceId.get(i) + ".s" + j); + } + } + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice); + for (int i = 0; iterator.hasNext(); ++i) { 
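+ // read the flushed entries back in series order and check them against what was written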
+ Pair timeseriesMetadataPair = iterator.next(); + String fullPath = timeseriesMetadataPair.left; + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + Assert.assertEquals(measurementIds.get(i), fullPath); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics()); + } + } + } + + @Test + public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + List originChunkMetadataList = new ArrayList<>(); + List seriesIds = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + if (i % 2 == 0) { + // write normal series + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + chunkWriter = generateIntData(j, 0L, new ArrayList<>()); + break; + case 1: + chunkWriter = generateBooleanData(j, 0L, new ArrayList<>()); + break; + case 2: + chunkWriter = generateFloatData(j, 0L, new ArrayList<>()); + break; + case 3: + chunkWriter = generateDoubleData(j, 0L, new ArrayList<>()); + break; + case 4: + default: + chunkWriter = generateTextData(j, 0L, new ArrayList<>()); + break; + } + chunkWriter.writeToFileWriter(writer); + seriesIds.add(deviceId + "." + sortedSeriesId.get(j)); + } + } else { + // write vector + AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6); + chunkWriter.writeToFileWriter(writer); + seriesIds.add(deviceId + "."); + for (int l = 1; l <= 6; ++l) { + seriesIds.add(deviceId + ".s" + l); + } + } + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice); + for (int i = 0; i < originChunkMetadataList.size(); ++i) { + Pair timeseriesMetadataPair = iterator.next(); + Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), + timeseriesMetadataPair.right.getTSDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), + timeseriesMetadataPair.right.getStatistics()); + } + } + } + + /** The following tests is for writing normal series in different nums. */ + + /** + * Write a file with 10 devices and 5 series in each device. For each series, we write one chunk + * for it. 
This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithNormalChunk() throws IOException { + Map>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + List> valList = new ArrayList<>(); + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + chunkWriter = generateIntData(j, 0L, valList); + break; + case 1: + chunkWriter = generateBooleanData(j, 0L, valList); + break; + case 2: + chunkWriter = generateFloatData(j, 0L, valList); + break; + case 3: + chunkWriter = generateDoubleData(j, 0L, valList); + break; + case 4: + default: + chunkWriter = generateTextData(j, 0L, valList); + break; + } + chunkWriter.writeToFileWriter(writer); + writer.checkMetadataSizeAndMayFlush(); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 10 devices and 5 series in each device. For each series, we write 100 chunks + * for it. This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException { + Map>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + 
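// flush this chunk through the writer and keep its points for verification after endFile()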
chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 10 devices and 5 series in each device. For each series, we write 100 chunks + * for it. We maintain some chunk metadata in memory when calling endFile(). + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException { + Map>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 10; ++k) { + List> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + if (i < 9) { + writer.checkMetadataSizeAndMayFlush(); + } + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty()); + writer.endFile(); + } + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks + * for it. 
+   * This test makes sure that the chunk metadata is flushed to disk once its size exceeds the
+   * given threshold.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 2; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 1024; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Write a file with 2 devices and 1024 series in each device. For each series, we write 50
+   * chunks for it.
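+   * (TEST_CHUNK_SIZE is shrunk to 1 here, so chunk metadata rather than chunk data dominates the
+   * writer's memory footprint.)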
+   * This test makes sure that the chunk metadata is flushed to disk once its size exceeds the
+   * given threshold.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 1;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 2; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 1024; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j % 5) {
+            case 0:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 50; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /**
+   * Write a file with 1024 devices and 5 series in each device. For each series, we write 10
+   * chunks for it.
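+   * (The large device count stresses flushing metadata for a large number of chunk groups.)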
+   * This test makes sure that the chunk metadata is flushed to disk once its size exceeds the
+   * given threshold.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
+    long originTestChunkSize = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 1024; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j % 5) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originTimes
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      writer.endFile();
+    } finally {
+      TEST_CHUNK_SIZE = originTestChunkSize;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes);
+  }
+
+  /** The following tests are for writing aligned series. */
+
+  /**
+   * Test writing aligned series in 10 devices, 6 series in each group.
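+   * Each device gets a single aligned chunk in which the six components share one time column.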
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+        AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6);
+        for (int j = 1; j <= 6; ++j) {
+          originData
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent("s" + j, x -> new ArrayList<>())
+              .add(valList.get(j - 1));
+        }
+
+        chunkWriter.writeToFileWriter(writer);
+        writer.endChunkGroup();
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Test writing one aligned device with 6 components; we write 512 chunks for it.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 512, seriesNum = 6;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 1; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        for (int k = 0; k < chunkNum; ++k) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter =
+              generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+          for (int j = 1; j <= seriesNum; ++j) {
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                .add(valList.get(j - 1));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  /**
+   * Test writing aligned chunk metadata: each aligned chunk has 1024 components.
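+   * The metadata of one 1024-component aligned chunk should alone exceed the 1024-byte threshold
+   * used by these tests, so the per-device size check is expected to spill metadata to disk.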
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 5, seriesNum = 1024;
+    long originTestPointNum = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    try {
+      try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+        for (int i = 0; i < 10; ++i) {
+          String deviceId = sortedDeviceId.get(i);
+          for (int k = 0; k < chunkNum; ++k) {
+            writer.startChunkGroup(deviceId);
+            List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+            AlignedChunkWriterImpl chunkWriter =
+                generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+            for (int j = 1; j <= seriesNum; ++j) {
+              originData
+                  .computeIfAbsent(deviceId, x -> new HashMap<>())
+                  .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                  .add(valList.get(j - 1));
+            }
+
+            chunkWriter.writeToFileWriter(writer);
+            writer.endChunkGroup();
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endFile();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      }
+    } finally {
+      TEST_CHUNK_SIZE = originTestPointNum;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  @Test
+  public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    int chunkNum = 5, seriesNum = 12;
+    long originTestPointNum = TEST_CHUNK_SIZE;
+    TEST_CHUNK_SIZE = 10;
+    int deviceNum = 1024;
+    try {
+      try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+        for (int i = 0; i < deviceNum; ++i) {
+          String deviceId = sortedDeviceId.get(i);
+          for (int k = 0; k < chunkNum; ++k) {
+            writer.startChunkGroup(deviceId);
+            List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+            AlignedChunkWriterImpl chunkWriter =
+                generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum);
+            for (int j = 1; j <= seriesNum; ++j) {
+              originData
+                  .computeIfAbsent(deviceId, x -> new HashMap<>())
+                  .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                  .add(valList.get(j - 1));
+            }
+
+            chunkWriter.writeToFileWriter(writer);
+            writer.endChunkGroup();
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endFile();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+      }
+    } finally {
+      TEST_CHUNK_SIZE = originTestPointNum;
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    TEST_CHUNK_SIZE = 10;
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
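+        // The column-wise tests drive flushing explicitly through sortAndFlushChunkMetadata()
+        // (after the time column and after each value column) instead of relying on
+        // checkMetadataSizeAndMayFlush().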
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 1024; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>(j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  @Test
+  public void testWritingCompleteMixedFiles() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        for (int k = 0; k < 10; ++k) {
+          writer.startChunkGroup(deviceId);
+          List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>();
+          AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6);
+          for (int j = 1; j <= 6; ++j) {
+            originData
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent("s" + j, x -> new ArrayList<>())
+                .add(valList.get(j - 1));
+          }
+
+          chunkWriter.writeToFileWriter(writer);
+          writer.endChunkGroup();
+        }
+        writer.checkMetadataSizeAndMayFlush();
+      }
+      for (int i = 5; i < 10; ++i) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        for (int j = 0; j < 5; ++j) {
+          ChunkWriterImpl chunkWriter;
+          switch (j) {
+            case 0:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 1:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 2:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 3:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+            case 4:
+            default:
+              for (int k = 0; k < 10; ++k) {
+                List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>();
+                chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList);
+                chunkWriter.writeToFileWriter(writer);
+                originData
+                    .computeIfAbsent(deviceId, x -> new HashMap<>())
+                    .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>())
+                    .add(valList);
+              }
+              break;
+          }
+          writer.checkMetadataSizeAndMayFlush();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+      Assert.assertTrue(writer.hasChunkMetadataInDisk);
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData);
+  }
+
+  @Test
+  public void testWritingAlignedSeriesByColumn() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        TimeChunkWriter timeChunkWriter =
+            new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+        for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+          timeChunkWriter.write(j);
+        }
+        timeChunkWriter.writeToFileWriter(writer);
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          ValueChunkWriter chunkWriter =
+              new ValueChunkWriter(
+                  sortedSeriesId.get(k),
+                  CompressionType.SNAPPY,
+                  TSDataType.DOUBLE,
+                  TSEncoding.PLAIN,
+                  builder.getEncoder(TSDataType.DOUBLE));
+          Random random = new Random();
+          List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+          for (int j = 0; j < TEST_CHUNK_SIZE; ++j) {
+            double val = random.nextDouble();
+            chunkWriter.write(j, val, false);
+            valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+          }
+          chunkWriter.writeToFileWriter(writer);
+          originValue
+              .computeIfAbsent(deviceId, x -> new HashMap<>())
+              .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+              .add(valueList);
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  @Test
+  public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+    Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+      for (int i = 0; i < 5; i++) {
+        String deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        TSEncoding timeEncoding =
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+        TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+        Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+        for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+          TimeChunkWriter timeChunkWriter =
+              new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+          for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+            timeChunkWriter.write(j);
+          }
+          timeChunkWriter.writeToFileWriter(writer);
+        }
+        writer.sortAndFlushChunkMetadata();
+        Assert.assertTrue(writer.hasChunkMetadataInDisk);
+        for (int k = 0; k < 5; ++k) {
+          TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+          builder.initFromProps(null);
+          for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+            ValueChunkWriter chunkWriter =
+                new ValueChunkWriter(
+                    sortedSeriesId.get(k),
+                    CompressionType.SNAPPY,
+                    TSDataType.DOUBLE,
+                    TSEncoding.PLAIN,
+                    builder.getEncoder(TSDataType.DOUBLE));
+            Random random = new Random();
+            List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+            for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+              double val = random.nextDouble();
+              chunkWriter.write(j, val, false);
+              valueList.add(new Pair<>(j, new TsPrimitiveType.TsDouble(val)));
+            }
+            chunkWriter.writeToFileWriter(writer);
+            originValue
+                .computeIfAbsent(deviceId, x -> new HashMap<>())
+                .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+                .add(valueList);
+          }
+          writer.sortAndFlushChunkMetadata();
+        }
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+    TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+  }
+
+  /** The following private methods generate data of various types for the tests above. */
+  private ChunkWriterImpl generateIntData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      long val = random.nextLong();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsLong(val)));
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateFloatData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      float val = random.nextFloat();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsFloat(val)));
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateDoubleData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      double val = random.nextDouble();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsDouble(val)));
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateBooleanData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      boolean val = random.nextBoolean();
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsBoolean(val)));
+    }
+    return chunkWriter;
+  }
+
+  private AlignedChunkWriterImpl generateVectorData(
+      long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) {
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    TSDataType[] dataTypes =
+        new TSDataType[] {
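+          // these six types are assigned to the aligned components round-robin (dataTypes[i % 6])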
+          TSDataType.INT32,
+          TSDataType.INT64,
+          TSDataType.FLOAT,
+          TSDataType.DOUBLE,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+    for (int i = 0; i < seriesNum; ++i) {
+      measurementSchemas.add(new MeasurementSchema("s" + (i + 1), dataTypes[i % 6]));
+    }
+    AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+    Random random = new Random();
+    for (int i = 0; i < seriesNum; ++i) {
+      record.add(new ArrayList<>());
+    }
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      TsPrimitiveType[] points = new TsPrimitiveType[seriesNum];
+      for (int j = 0; j < seriesNum; ++j) {
+        switch (j % 6) {
+          case 0:
+            points[j] = new TsPrimitiveType.TsInt(random.nextInt());
+            break;
+          case 1:
+            points[j] = new TsPrimitiveType.TsLong(random.nextLong());
+            break;
+          case 2:
+            points[j] = new TsPrimitiveType.TsFloat(random.nextFloat());
+            break;
+          case 3:
+            points[j] = new TsPrimitiveType.TsDouble(random.nextDouble());
+            break;
+          case 4:
+            points[j] = new TsPrimitiveType.TsBoolean(random.nextBoolean());
+            break;
+          case 5:
+            points[j] =
+                new TsPrimitiveType.TsBinary(new Binary(String.valueOf(random.nextDouble())));
+            break;
+        }
+      }
+      for (int j = 0; j < seriesNum; ++j) {
+        record.get(j).add(new Pair<>(i, points[j]));
+      }
+      chunkWriter.write(i, points);
+    }
+    return chunkWriter;
+  }
+
+  private ChunkWriterImpl generateTextData(
+      int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+    ChunkWriterImpl chunkWriter =
+        new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT));
+    Random random = new Random();
+    for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+      Binary val = new Binary(String.valueOf(random.nextDouble()));
+      chunkWriter.write(i, val);
+      record.add(new Pair<>(i, new TsPrimitiveType.TsBinary(val)));
+    }
+    return chunkWriter;
+  }
+}
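
For reference, the write-and-flush pattern these tests exercise reduces to the sketch below. It uses only API calls that appear in the diff above; deviceId and the single data point are placeholders, and the trailing 1024 is the metadata-size threshold in bytes passed alongside the memory-control flag, mirroring the constructor usage in these tests:

    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
      writer.startChunkGroup(deviceId);
      ChunkWriterImpl chunkWriter =
          new ChunkWriterImpl(new MeasurementSchema("s1", TSDataType.INT64));
      chunkWriter.write(1L, 42L);            // buffer one point in the chunk
      chunkWriter.writeToFileWriter(writer); // seal the chunk; its metadata stays with the writer
      writer.checkMetadataSizeAndMayFlush(); // spill chunk metadata to disk past the threshold
      writer.endChunkGroup();
      writer.endFile();                      // must also pick up any spilled metadata,
    }                                        // which is what the integrity checks verify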