Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
Expand Down Expand Up @@ -182,7 +183,7 @@
/** The proportion of memtable memory for WAL queue */
private double walBufferQueueProportion = 0.1;

/** The proportion of memtable memory for device path cache */

Check warning on line 186 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

First sentence of Javadoc is missing an ending period.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ11LywS_TZDx_O-UxA5&open=AZ11LywS_TZDx_O-UxA5&pullRequest=17451
private double devicePathCacheProportion = 0.05;

/**
Expand Down Expand Up @@ -3388,6 +3389,15 @@
this.partitionCacheSize = partitionCacheSize;
}

public int getPipeDataStructureTabletSizeInBytes() {
int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
if (size > thriftMaxFrameSize) {
size = (int) (thriftMaxFrameSize * 0.8);
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
}
return size;
}

public int getAuthorCacheSize() {
return authorCacheSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
Expand Down Expand Up @@ -92,7 +92,7 @@ protected TsFileInsertionDataContainer(
this.allocatedMemoryBlockForTablet =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
Expand Down Expand Up @@ -124,7 +125,7 @@ public TsFileInsertionScanDataContainer(
this.allocatedMemoryBlockForBatchData =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
this.allocatedMemoryBlockForChunk =
PipeDataNodeResourceManager.memory()
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.resource.memory;

import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.utils.MemUtils;

Expand All @@ -33,10 +34,10 @@
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;

import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PipeMemoryWeightUtil {

Expand Down Expand Up @@ -107,7 +108,10 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(RowRecord
}
}

return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
return calculateTabletRowCountAndMemoryBySize(
totalSizeInBytes,
schemaCount,
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}

/**
Expand Down Expand Up @@ -152,7 +156,8 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
}
}

return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
return calculateTabletRowCountAndMemoryBySize(
totalSizeInBytes, schemaCount, batchData.length());
}

/**
Expand All @@ -162,90 +167,43 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
* @return left is the row count of tablet, right is the memory cost of tablet in bytes
*/
public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(PipeRow row) {
return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size());
return calculateTabletRowCountAndMemoryBySize(
row.getCurrentRowSize(),
row.size(),
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
}

private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
int rowSize, int schemaCount) {
if (rowSize <= 0) {
int rowBytesUsed, int schemaCount, int inputNum) {
if (rowBytesUsed <= 0) {
return new Pair<>(1, 0);
}

// Calculate row number according to the max size of a pipe tablet.
// "-100" is the estimated size of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per schema.
int rowNumber =
8
* (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
/ (8 * rowSize + schemaCount);
// Here we estimate the max use of
int sizeLimit =
Math.min(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
(int) (inputNum * rowBytesUsed * 1.2));

int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);

if ( // This means the row number is larger than the max row count of a pipe tablet
rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
// Bound the row number, the memory cost is rowSize * rowNumber
return new Pair<>(
PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
} else {
return new Pair<>(
rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
return new Pair<>(rowNumber, sizeLimit);
}
}

public static long calculateTabletSizeInBytes(Tablet tablet) {
long totalSizeInBytes = 0;

if (tablet == null) {
return totalSizeInBytes;
}

// timestamps
if (tablet.timestamps != null) {
totalSizeInBytes += tablet.timestamps.length * 8L;
}

// values
final List<MeasurementSchema> timeseries = tablet.getSchemas();
if (timeseries != null) {
for (int column = 0; column < timeseries.size(); column++) {
final MeasurementSchema measurementSchema = timeseries.get(column);
if (measurementSchema == null) {
continue;
}

final TSDataType tsDataType = measurementSchema.getType();
if (tsDataType == null) {
continue;
}

if (tsDataType.isBinary()) {
if (tablet.values == null || tablet.values.length <= column) {
continue;
}
final Binary[] values = ((Binary[]) tablet.values[column]);
if (values == null) {
continue;
}
for (Binary value : values) {
totalSizeInBytes += value == null ? 8 : value.ramBytesUsed();
}
} else {
totalSizeInBytes += (long) tablet.getMaxRowNumber() * tsDataType.getDataTypeSize();
}
}
}

// bitMaps
if (tablet.bitMaps != null) {
for (int i = 0; i < tablet.bitMaps.length; i++) {
totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize();
}
}

// estimate other dataStructures size
totalSizeInBytes += 100;

return totalSizeInBytes;
public static long calculateTabletSizeInBytes(final Tablet tablet) {
return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L;
}

public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.pipe.event;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
Expand Down Expand Up @@ -47,6 +49,7 @@
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,9 +81,19 @@
private File alignedTsFile;
private File nonalignedTsFile;
private TsFileResource resource;
private boolean isPipeMemoryManagementEnabled;

@Before
public void setUp() throws Exception {

Check warning on line 87 in iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'java.lang.Exception', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ1xFyOo-0BjvfN9P-wv&open=AZ1xFyOo-0BjvfN9P-wv&pullRequest=17451
isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled();
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
}

@After
public void tearDown() throws Exception {
CommonDescriptor.getInstance()
.getConfig()
.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
if (alignedTsFile != null) {
alignedTsFile.delete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ public class CommonConfig {
private boolean pipeRetryLocallyForParallelOrUserConflict = true;

private int pipeDataStructureTabletRowSize = 2048;
private int pipeDataStructureTabletSizeInBytes = 2097152;

// 128MB
private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
} catch (Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to delete original receiver file dir %s, because %s.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
e.getMessage(),
e);
e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -487,11 +487,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to close current writing file writer %s, because %s.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
e.getMessage(),
e);
e.getMessage());
}
writingFileWriter = null;
} else {
Expand Down Expand Up @@ -526,11 +526,11 @@ private void deleteFile(final File file) {
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to delete original writing file %s, because %s.",
receiverId.get(),
file.getPath(),
e.getMessage(),
e);
e.getMessage());
}
} else {
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -611,11 +611,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
writingFile,
req,
e);
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
Expand Down Expand Up @@ -698,11 +698,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn,
e,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
files,
req,
e);
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
Expand Down
Loading