From 33d484a62b67dceb633639cf664c0b8238c138cf Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 15:25:37 +0800 Subject: [PATCH 1/7] Pipe: add hot-reloadable AirGap payload size guard to mitigate DoS risk. Introduce a dedicated AirGap receiver payload limit in the pipe config and enforce it before request buffer allocation, so oversized payloads are rejected early and memory pressure is bounded under malicious or malformed inputs. Made-with: Cursor --- .../protocol/airgap/IoTDBAirGapReceiver.java | 23 +++++++++++++++---- .../iotdb/commons/conf/CommonConfig.java | 16 +++++++++++++ .../iotdb/commons/pipe/config/PipeConfig.java | 7 ++++++ .../commons/pipe/config/PipeDescriptor.java | 5 ++++ 4 files changed, 46 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 0ff1834e8cd0f..5cc74c61ed9a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -86,17 +86,21 @@ public void runMayThrow() throws Throwable { "Pipe air gap receiver {} closed because socket is closed. Socket: {}", receiverId, socket); - } catch (final Exception e) { - LOGGER.warn( - "Pipe air gap receiver {} closed because of exception. Socket: {}", + } catch (final Throwable e) { + LOGGER.error( + "Pipe air gap receiver {} closed because of critical failure. Socket: {}", receiverId, socket, e); - throw e; + if (!socket.isClosed()) { + socket.close(); + } } finally { // session will be closed and removed here PipeDataNodeAgent.receiver().thrift().handleClientExit(); - socket.close(); + if (!socket.isClosed()) { + socket.close(); + } } } @@ -230,6 +234,15 @@ private byte[] readData(final InputStream inputStream) throws IOException { return new byte[0]; } + final int maxLength = PipeConfig.getInstance().getPipeAirGapReceiverMaxPayloadSizeInBytes(); + if (length > maxLength) { + throw new IOException( + String.format( + "Detected potential DoS attack: AirGap payload length (%d) exceeds maximum allowed (%d). " + + "Closing connection from %s", + length, maxLength, socket.getRemoteSocketAddress())); + } + final byte[] resultBuffer = new byte[length]; readTillFull(inputStream, resultBuffer); if (isELanguagePayload) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ff4a47b6f84ab..c0f599f6fd3f1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -311,6 +311,7 @@ public class CommonConfig { private double pipeReceiverActualToEstimatedMemoryRatio = 3; private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB + private int pipeAirGapReceiverMaxPayloadSizeInBytes = 10 * 1024 * 1024; // 10MB private boolean pipeReceiverLoadConversionEnabled = false; private volatile long pipePeriodicalLogMinIntervalSeconds = 60; private volatile long pipeLoggerCacheMaxSizeInBytes = 16 * MB; @@ -1702,6 +1703,17 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes( pipeReceiverReqDecompressedMaxLengthInBytes); } + public void setPipeAirGapReceiverMaxPayloadSizeInBytes( + int pipeAirGapReceiverMaxPayloadSizeInBytes) { + if (this.pipeAirGapReceiverMaxPayloadSizeInBytes == pipeAirGapReceiverMaxPayloadSizeInBytes) { + return; + } + this.pipeAirGapReceiverMaxPayloadSizeInBytes = pipeAirGapReceiverMaxPayloadSizeInBytes; + logger.info( + "pipeAirGapReceiverMaxPayloadSizeInBytes is set to {}.", + pipeAirGapReceiverMaxPayloadSizeInBytes); + } + public boolean isPipeReceiverLoadConversionEnabled() { return pipeReceiverLoadConversionEnabled; } @@ -1743,6 +1755,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() { return pipeReceiverReqDecompressedMaxLengthInBytes; } + public int getPipeAirGapReceiverMaxPayloadSizeInBytes() { + return pipeAirGapReceiverMaxPayloadSizeInBytes; + } + public double getPipeMetaReportMaxLogNumPerRound() { return pipeMetaReportMaxLogNumPerRound; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index ebe953767acce..e2c94ac3cd162 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -361,6 +361,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() { return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes(); } + public int getPipeAirGapReceiverMaxPayloadSizeInBytes() { + return COMMON_CONFIG.getPipeAirGapReceiverMaxPayloadSizeInBytes(); + } + public boolean isPipeReceiverLoadConversionEnabled() { return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled(); } @@ -627,6 +631,9 @@ public void printAllConfigs() { LOGGER.info( "PipeReceiverReqDecompressedMaxLengthInBytes: {}", getPipeReceiverReqDecompressedMaxLengthInBytes()); + LOGGER.info( + "PipeAirGapReceiverMaxPayloadSizeInBytes: {}", + getPipeAirGapReceiverMaxPayloadSizeInBytes()); LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled()); LOGGER.info( "PipePeriodicalLogMinIntervalSeconds: {}", getPipePeriodicalLogMinIntervalSeconds()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 2135bac00127c..8dcbce35094e8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -464,6 +464,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr properties.getProperty( "pipe_receiver_req_decompressed_max_length_in_bytes", String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes())))); + config.setPipeAirGapReceiverMaxPayloadSizeInBytes( + Integer.parseInt( + properties.getProperty( + "pipe_air_gap_receiver_max_payload_size_in_bytes", + String.valueOf(config.getPipeAirGapReceiverMaxPayloadSizeInBytes())))); config.setPipeReceiverLoadConversionEnabled( Boolean.parseBoolean( properties.getProperty( From 689cd78d24516ddb079320628ed6d13be212f362 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 15:29:50 +0800 Subject: [PATCH 2/7] update --- .../java/org/apache/iotdb/commons/conf/CommonConfig.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index c0f599f6fd3f1..55f19a576c3dc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -1705,6 +1705,12 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes( public void setPipeAirGapReceiverMaxPayloadSizeInBytes( int pipeAirGapReceiverMaxPayloadSizeInBytes) { + if (pipeAirGapReceiverMaxPayloadSizeInBytes <= 0) { + logger.info( + "Ignore invalid pipeAirGapReceiverMaxPayloadSizeInBytes {}, because it must be greater than 0.", + pipeAirGapReceiverMaxPayloadSizeInBytes); + return; + } if (this.pipeAirGapReceiverMaxPayloadSizeInBytes == pipeAirGapReceiverMaxPayloadSizeInBytes) { return; } From 280b679aa82451ef9d418adb7318fec055b54460 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 15:57:49 +0800 Subject: [PATCH 3/7] update --- .../protocol/airgap/IoTDBAirGapReceiver.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 5cc74c61ed9a0..10b1b293af832 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -251,11 +251,16 @@ private byte[] readData(final InputStream inputStream) throws IOException { return resultBuffer; } + private int readLength(final InputStream inputStream) throws IOException { + return readLength(inputStream, false); + } + /** * Read the length of the following data. The thread may typically block here when there is no * data to read. */ - private int readLength(final InputStream inputStream) throws IOException { + private int readLength(final InputStream inputStream, final boolean isELanguage) + throws IOException { final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN]; readTillFull(inputStream, doubleIntLengthBytes); @@ -264,10 +269,16 @@ private int readLength(final InputStream inputStream) throws IOException { if (Arrays.equals( doubleIntLengthBytes, BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * INT_LEN))) { + if (isELanguage) { + throw new IOException( + String.format( + "Detected suspicious nested E-Language prefix. Closing connection from %s", + socket.getRemoteSocketAddress())); + } isELanguagePayload = true; skipTillEnough( inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - 2 * INT_LEN); - return readLength(inputStream); + return readLength(inputStream, true); } final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 0, INT_LEN); From 50045c6e8a4f08cfc52631f01083ff416510fd7b Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 17:14:37 +0800 Subject: [PATCH 4/7] update --- .../java/org/apache/iotdb/commons/conf/CommonConfig.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 55f19a576c3dc..4b754e0b135f9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -311,7 +311,11 @@ public class CommonConfig { private double pipeReceiverActualToEstimatedMemoryRatio = 3; private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB - private int pipeAirGapReceiverMaxPayloadSizeInBytes = 10 * 1024 * 1024; // 10MB + // Align with default thrift frame size calculation. + private int pipeAirGapReceiverMaxPayloadSizeInBytes = + Math.min( + 64 * 1024 * 1024, + (int) Math.min(Runtime.getRuntime().maxMemory() / 64, Integer.MAX_VALUE)); private boolean pipeReceiverLoadConversionEnabled = false; private volatile long pipePeriodicalLogMinIntervalSeconds = 60; private volatile long pipeLoggerCacheMaxSizeInBytes = 16 * MB; From b94a1c8d46b41da681982d978c0c47d5a4df0960 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 17:24:08 +0800 Subject: [PATCH 5/7] update --- .../protocol/airgap/IoTDBAirGapReceiver.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 10b1b293af832..54844c136d38d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -86,21 +86,17 @@ public void runMayThrow() throws Throwable { "Pipe air gap receiver {} closed because socket is closed. Socket: {}", receiverId, socket); - } catch (final Throwable e) { - LOGGER.error( - "Pipe air gap receiver {} closed because of critical failure. Socket: {}", + } catch (final Exception e) { + LOGGER.warn( + "Pipe air gap receiver {} closed because of exception. Socket: {}", receiverId, socket, e); - if (!socket.isClosed()) { - socket.close(); - } + throw e; } finally { // session will be closed and removed here PipeDataNodeAgent.receiver().thrift().handleClientExit(); - if (!socket.isClosed()) { - socket.close(); - } + socket.close(); } } From 80501fa4937cac59c50e996a7735f9a428dea09e Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 17:44:43 +0800 Subject: [PATCH 6/7] update --- .../protocol/airgap/IoTDBAirGapReceiver.java | 5 +- .../airgap/IoTDBAirGapReceiverTest.java | 71 +++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 54844c136d38d..8658d12b6a89f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -222,7 +222,7 @@ private boolean checkSum(byte[] bytes) { } } - private byte[] readData(final InputStream inputStream) throws IOException { + byte[] readData(final InputStream inputStream) throws IOException { final int length = readLength(inputStream); if (length <= 0) { @@ -234,8 +234,7 @@ private byte[] readData(final InputStream inputStream) throws IOException { if (length > maxLength) { throw new IOException( String.format( - "Detected potential DoS attack: AirGap payload length (%d) exceeds maximum allowed (%d). " - + "Closing connection from %s", + "AirGap payload length (%d) exceeds maximum allowed (%d). Closing connection from %s", length, maxLength, socket.getRemoteSocketAddress())); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java new file mode 100644 index 0000000000000..c6ede12601d6c --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.protocol.airgap; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant; + +import org.apache.tsfile.utils.BytesUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +public class IoTDBAirGapReceiverTest { + + @Test + public void testRejectOversizedAirGapPayload() throws Exception { + final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + final int originalMaxPayload = commonConfig.getPipeAirGapReceiverMaxPayloadSizeInBytes(); + + try { + commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(16); + final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 1L); + + final byte[] oversizedLength = BytesUtils.intToBytes(32); + final InputStream inputStream = + new ByteArrayInputStream(BytesUtils.concatByteArray(oversizedLength, oversizedLength)); + + final IOException exception = + Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream)); + Assert.assertTrue(exception.getMessage().contains("payload length")); + } finally { + commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(originalMaxPayload); + } + } + + @Test + public void testRejectNestedELanguagePrefix() throws Exception { + final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 2L); + + final InputStream inputStream = + new ByteArrayInputStream( + BytesUtils.concatByteArray( + AirGapELanguageConstant.E_LANGUAGE_PREFIX, AirGapELanguageConstant.E_LANGUAGE_PREFIX)); + + final IOException exception = + Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream)); + Assert.assertTrue(exception.getMessage().contains("nested E-Language prefix")); + } +} From d78fed77ab209173988378aa76f2b7f1e2af357d Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 8 Apr 2026 18:39:16 +0800 Subject: [PATCH 7/7] spotless --- .../pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java index c6ede12601d6c..19dea8140a190 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java @@ -62,7 +62,8 @@ public void testRejectNestedELanguagePrefix() throws Exception { final InputStream inputStream = new ByteArrayInputStream( BytesUtils.concatByteArray( - AirGapELanguageConstant.E_LANGUAGE_PREFIX, AirGapELanguageConstant.E_LANGUAGE_PREFIX)); + AirGapELanguageConstant.E_LANGUAGE_PREFIX, + AirGapELanguageConstant.E_LANGUAGE_PREFIX)); final IOException exception = Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream));