diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 0ff1834e8cd0f..8658d12b6a89f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -222,7 +222,7 @@ private boolean checkSum(byte[] bytes) { } } - private byte[] readData(final InputStream inputStream) throws IOException { + byte[] readData(final InputStream inputStream) throws IOException { final int length = readLength(inputStream); if (length <= 0) { @@ -230,6 +230,14 @@ private byte[] readData(final InputStream inputStream) throws IOException { return new byte[0]; } + final int maxLength = PipeConfig.getInstance().getPipeAirGapReceiverMaxPayloadSizeInBytes(); + if (length > maxLength) { + throw new IOException( + String.format( + "AirGap payload length (%d) exceeds maximum allowed (%d). Closing connection from %s", + length, maxLength, socket.getRemoteSocketAddress())); + } + final byte[] resultBuffer = new byte[length]; readTillFull(inputStream, resultBuffer); if (isELanguagePayload) { @@ -238,11 +246,16 @@ private byte[] readData(final InputStream inputStream) throws IOException { return resultBuffer; } + private int readLength(final InputStream inputStream) throws IOException { + return readLength(inputStream, false); + } + /** * Read the length of the following data. The thread may typically block here when there is no * data to read. */ - private int readLength(final InputStream inputStream) throws IOException { + private int readLength(final InputStream inputStream, final boolean isELanguage) + throws IOException { final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN]; readTillFull(inputStream, doubleIntLengthBytes); @@ -251,10 +264,16 @@ private int readLength(final InputStream inputStream) throws IOException { if (Arrays.equals( doubleIntLengthBytes, BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * INT_LEN))) { + if (isELanguage) { + throw new IOException( + String.format( + "Detected suspicious nested E-Language prefix. Closing connection from %s", + socket.getRemoteSocketAddress())); + } isELanguagePayload = true; skipTillEnough( inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - 2 * INT_LEN); - return readLength(inputStream); + return readLength(inputStream, true); } final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 0, INT_LEN); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java new file mode 100644 index 0000000000000..19dea8140a190 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.protocol.airgap; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant; + +import org.apache.tsfile.utils.BytesUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +public class IoTDBAirGapReceiverTest { + + @Test + public void testRejectOversizedAirGapPayload() throws Exception { + final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + final int originalMaxPayload = commonConfig.getPipeAirGapReceiverMaxPayloadSizeInBytes(); + + try { + commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(16); + final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 1L); + + final byte[] oversizedLength = BytesUtils.intToBytes(32); + final InputStream inputStream = + new ByteArrayInputStream(BytesUtils.concatByteArray(oversizedLength, oversizedLength)); + + final IOException exception = + Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream)); + Assert.assertTrue(exception.getMessage().contains("payload length")); + } finally { + commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(originalMaxPayload); + } + } + + @Test + public void testRejectNestedELanguagePrefix() throws Exception { + final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 2L); + + final InputStream inputStream = + new ByteArrayInputStream( + BytesUtils.concatByteArray( + AirGapELanguageConstant.E_LANGUAGE_PREFIX, + AirGapELanguageConstant.E_LANGUAGE_PREFIX)); + + final IOException exception = + Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream)); + Assert.assertTrue(exception.getMessage().contains("nested E-Language prefix")); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ff4a47b6f84ab..4b754e0b135f9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -311,6 +311,11 @@ public class CommonConfig { private double pipeReceiverActualToEstimatedMemoryRatio = 3; private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB + // Align with default thrift frame size calculation. + private int pipeAirGapReceiverMaxPayloadSizeInBytes = + Math.min( + 64 * 1024 * 1024, + (int) Math.min(Runtime.getRuntime().maxMemory() / 64, Integer.MAX_VALUE)); private boolean pipeReceiverLoadConversionEnabled = false; private volatile long pipePeriodicalLogMinIntervalSeconds = 60; private volatile long pipeLoggerCacheMaxSizeInBytes = 16 * MB; @@ -1702,6 +1707,23 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes( pipeReceiverReqDecompressedMaxLengthInBytes); } + public void setPipeAirGapReceiverMaxPayloadSizeInBytes( + int pipeAirGapReceiverMaxPayloadSizeInBytes) { + if (pipeAirGapReceiverMaxPayloadSizeInBytes <= 0) { + logger.info( + "Ignore invalid pipeAirGapReceiverMaxPayloadSizeInBytes {}, because it must be greater than 0.", + pipeAirGapReceiverMaxPayloadSizeInBytes); + return; + } + if (this.pipeAirGapReceiverMaxPayloadSizeInBytes == pipeAirGapReceiverMaxPayloadSizeInBytes) { + return; + } + this.pipeAirGapReceiverMaxPayloadSizeInBytes = pipeAirGapReceiverMaxPayloadSizeInBytes; + logger.info( + "pipeAirGapReceiverMaxPayloadSizeInBytes is set to {}.", + pipeAirGapReceiverMaxPayloadSizeInBytes); + } + public boolean isPipeReceiverLoadConversionEnabled() { return pipeReceiverLoadConversionEnabled; } @@ -1743,6 +1765,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() { return pipeReceiverReqDecompressedMaxLengthInBytes; } + public int getPipeAirGapReceiverMaxPayloadSizeInBytes() { + return pipeAirGapReceiverMaxPayloadSizeInBytes; + } + public double getPipeMetaReportMaxLogNumPerRound() { return pipeMetaReportMaxLogNumPerRound; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index ebe953767acce..e2c94ac3cd162 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -361,6 +361,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() { return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes(); } + public int getPipeAirGapReceiverMaxPayloadSizeInBytes() { + return COMMON_CONFIG.getPipeAirGapReceiverMaxPayloadSizeInBytes(); + } + public boolean isPipeReceiverLoadConversionEnabled() { return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled(); } @@ -627,6 +631,9 @@ public void printAllConfigs() { LOGGER.info( "PipeReceiverReqDecompressedMaxLengthInBytes: {}", getPipeReceiverReqDecompressedMaxLengthInBytes()); + LOGGER.info( + "PipeAirGapReceiverMaxPayloadSizeInBytes: {}", + getPipeAirGapReceiverMaxPayloadSizeInBytes()); LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled()); LOGGER.info( "PipePeriodicalLogMinIntervalSeconds: {}", getPipePeriodicalLogMinIntervalSeconds()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 2135bac00127c..8dcbce35094e8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -464,6 +464,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr properties.getProperty( "pipe_receiver_req_decompressed_max_length_in_bytes", String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes())))); + config.setPipeAirGapReceiverMaxPayloadSizeInBytes( + Integer.parseInt( + properties.getProperty( + "pipe_air_gap_receiver_max_payload_size_in_bytes", + String.valueOf(config.getPipeAirGapReceiverMaxPayloadSizeInBytes())))); config.setPipeReceiverLoadConversionEnabled( Boolean.parseBoolean( properties.getProperty(