From 232c5d70c7ffa9b808951991f4a500ee874ae15a Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Mon, 4 May 2026 22:57:30 -0400 Subject: [PATCH 1/8] feat: add GrpcConnectionLimitFilter --- .../server/GrpcConnectionLimitFilter.java | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java new file mode 100644 index 000000000000..6a545111262e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ratis.thirdparty.io.grpc.Attributes; +import org.apache.ratis.thirdparty.io.grpc.ServerTransportFilter; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A gRPC ServerTransportFilter that limits the number of concurrent + * connections to the Datanode gRPC server. + * + *

When the connection limit is reached, new connections are rejected + * with a RESOURCE_EXHAUSTED status, preventing file descriptor exhaustion. + */ +public class GrpcConnectionLimitFilter extends ServerTransportFilter { + + private static final Logger LOG = + LoggerFactory.getLogger(GrpcConnectionLimitFilter.class); + + private final int maxConnections; + private final AtomicInteger activeConnections = new AtomicInteger(0); + private final AtomicLong totalAcceptedConnections = new AtomicLong(0); + private final AtomicLong totalRejectedConnections = new AtomicLong(0); + + /** + * Creates a new connection limit filter. + * + * @param maxConnections the maximum number of concurrent connections allowed. + * If <= 0, no limit is enforced. + */ + public GrpcConnectionLimitFilter(int maxConnections) { + this.maxConnections = maxConnections; + } + + @Override + public Attributes transportReady(Attributes transportAttrs) { + if (maxConnections <= 0) { + return super.transportReady(transportAttrs); + } + + int current = activeConnections.incrementAndGet(); + if (current > maxConnections) { + activeConnections.decrementAndGet(); + totalRejectedConnections.incrementAndGet(); + LOG.warn("Connection rejected: limit {} reached, current active: {}", + maxConnections, current - 1); + throw new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription( + "Datanode connection limit exceeded: " + maxConnections)); + } + + totalAcceptedConnections.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection accepted: active connections: {}/{}", + current, maxConnections); + } + return super.transportReady(transportAttrs); + } + + @Override + public void transportTerminated(Attributes transportAttrs) { + if (maxConnections > 0) { + int current = activeConnections.decrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection terminated: active connections: {}/{}", + current, maxConnections); + } + } + super.transportTerminated(transportAttrs); + } + + + public int getActiveConnections() { + return activeConnections.get(); + } + + public long getTotalAcceptedConnections() { + return totalAcceptedConnections.get(); + } + + public long getTotalRejectedConnections() { + return totalRejectedConnections.get(); + } + + public int getMaxConnections() { + return maxConnections; + } +} From d2493f8c9bd186fe737d04aa532708fb42c9c9db Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Mon, 4 May 2026 23:11:34 -0400 Subject: [PATCH 2/8] feat: add max connection in data node configuration --- .../statemachine/DatanodeConfiguration.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 919777f35191..0f297ec8ef13 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -139,6 +139,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig { static final int CONTAINER_CLOSE_THREADS_DEFAULT = 3; static final int BLOCK_DELETE_THREADS_DEFAULT = 5; + public static final String GRPC_MAX_CONNECTIONS_KEY = "hdds.datanode.grpc.max.connections"; + public static final int GRPC_MAX_CONNECTIONS_DEFAULT = 5000; + public static final String BLOCK_DELETE_COMMAND_WORKER_INTERVAL = "hdds.datanode.block.delete.command.worker.interval"; public static final Duration BLOCK_DELETE_COMMAND_WORKER_INTERVAL_DEFAULT = Duration.ofSeconds(2); @@ -154,6 +157,19 @@ public class DatanodeConfiguration extends ReconfigurableConfig { ) private int numReadThreadPerVolume = 10; + /** + * Maximum number of concurrent gRPC connections to the Datanode server. + */ + @Config(key = "hdds.datanode.grpc.max.connections", + type = ConfigType.INT, + defaultValue = "5000", + tags = {DATANODE}, + description = "Maximum number of concurrent gRPC connections to the " + + "Datanode server. This helps prevent file descriptor exhaustion. " + + "Set to 0 to disable the limit." + ) + private int grpcMaxConnections = GRPC_MAX_CONNECTIONS_DEFAULT; + /** * The maximum number of threads used to delete containers on a datanode * simultaneously. @@ -1212,4 +1228,12 @@ static long getDefaultFreeSpace() { final StorageSize measure = StorageSize.parse(HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_DEFAULT); return Math.round(measure.getUnit().toBytes(measure.getValue())); } + + public int getGrpcMaxConnections() { + return grpcMaxConnections; + } + + public void setGrpcMaxConnections(int grpcMaxConnections) { + this.grpcMaxConnections = grpcMaxConnections; + } } From e055be5176a8b853c8175476776d26c26d7ae5d9 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Mon, 4 May 2026 23:19:40 -0400 Subject: [PATCH 3/8] feat: add connection filter in XceiverServerGrpc --- .../common/transport/server/XceiverServerGrpc.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 7521b460467c..f2e5b51c3077 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -76,6 +76,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { private DatanodeDetails datanodeDetails; private ThreadPoolExecutor readExecutors; private EventLoopGroup eventLoopGroup; + private GrpcConnectionLimitFilter connectionLimitFilter; /** * Constructs a Grpc server class. @@ -97,11 +98,15 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, this.port = 0; } - final int threadCountPerDisk = - conf.getObject(DatanodeConfiguration.class).getNumReadThreadPerVolume(); + DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class); + final int threadCountPerDisk = dnConf.getNumReadThreadPerVolume(); final int numberOfDisks = HddsServerUtil.getDatanodeStorageDirs(conf).size(); final int poolSize = threadCountPerDisk * numberOfDisks; + final int maxConnections = dnConf.getGrpcMaxConnections(); + connectionLimitFilter = new GrpcConnectionLimitFilter(maxConnections); + LOG.info("Datanode gRPC server max connections: {}", + maxConnections > 0 ? maxConnections : "unlimited"); readExecutors = new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, @@ -136,7 +141,8 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, .executor(readExecutors) .addService(ServerInterceptors.intercept( xceiverService.bindServiceWithZeroCopy(), - new GrpcServerInterceptor())); + new GrpcServerInterceptor())) + .addTransportFilter(connectionLimitFilter); SecurityConfig secConf = new SecurityConfig(conf); if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) { From 5c58adb0e0c5a02152a640255f25febfd42c494c Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Mon, 4 May 2026 23:28:02 -0400 Subject: [PATCH 4/8] test: data node configuration and connection limit filter --- .../server/GrpcConnectionLimitFilter.java | 1 - .../TestDatanodeConfiguration.java | 42 ++++ .../server/TestGrpcConnectionLimitFilter.java | 192 ++++++++++++++++++ 3 files changed, 234 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java index 6a545111262e..8775de30181a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java @@ -90,7 +90,6 @@ public void transportTerminated(Attributes transportAttrs) { super.transportTerminated(transportAttrs); } - public int getActiveConnections() { return activeConnections.get(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java index 5012526782aa..fe113bcc652d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java @@ -29,6 +29,8 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DB_VOLUMES_TOLERATED_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_DEFAULT; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_MAX_CONNECTIONS_DEFAULT; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_MAX_CONNECTIONS_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY; @@ -283,4 +285,44 @@ static void assertWaitTimeMin(TimeDuration expected, assertEquals(expected, t, RaftServerConfigKeys.Log.Appender.WAIT_TIME_MIN_KEY); } + + @Test + void testGrpcMaxConnectionsDefault() { + OzoneConfiguration conf = new OzoneConfiguration(); + + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + assertEquals(GRPC_MAX_CONNECTIONS_DEFAULT, subject.getGrpcMaxConnections()); + } + + @Test + void testGrpcMaxConnectionsCustomValue() { + OzoneConfiguration conf = new OzoneConfiguration(); + int customMaxConnections = 10000; + conf.setInt(GRPC_MAX_CONNECTIONS_KEY, customMaxConnections); + + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + assertEquals(customMaxConnections, subject.getGrpcMaxConnections()); + } + + @Test + void testGrpcMaxConnectionsZeroDisablesLimit() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(GRPC_MAX_CONNECTIONS_KEY, 0); + + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + assertEquals(0, subject.getGrpcMaxConnections()); + } + + @Test + void testGrpcMaxConnectionsSetter() { + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + subject.setGrpcMaxConnections(8000); + + assertEquals(8000, subject.getGrpcMaxConnections()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java new file mode 100644 index 000000000000..e5900d76a992 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ratis.thirdparty.io.grpc.Attributes; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link GrpcConnectionLimitFilter}. + */ +class TestGrpcConnectionLimitFilter { + + private static final Attributes EMPTY_ATTRS = Attributes.EMPTY; + + @Test + void testConnectionAcceptedUnderLimit() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(5); + + // Accept 5 connections (at limit) + for (int i = 0; i < 5; i++) { + Attributes result = filter.transportReady(EMPTY_ATTRS); + assertNotNull(result); + } + + assertEquals(5, filter.getActiveConnections()); + assertEquals(5, filter.getTotalAcceptedConnections()); + assertEquals(0, filter.getTotalRejectedConnections()); + } + + @Test + void testConnectionRejectedOverLimit() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(3); + + for (int i = 0; i < 3; i++) { + filter.transportReady(EMPTY_ATTRS); + } + + StatusRuntimeException exception = assertThrows( + StatusRuntimeException.class, + () -> filter.transportReady(EMPTY_ATTRS) + ); + + assertEquals(Status.Code.RESOURCE_EXHAUSTED, exception.getStatus().getCode()); + assertEquals(3, filter.getActiveConnections()); + assertEquals(3, filter.getTotalAcceptedConnections()); + assertEquals(1, filter.getTotalRejectedConnections()); + } + + @Test + void testConnectionTerminationDecrementsCounter() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(5); + + for (int i = 0; i < 3; i++) { + filter.transportReady(EMPTY_ATTRS); + } + assertEquals(3, filter.getActiveConnections()); + + filter.transportTerminated(EMPTY_ATTRS); + filter.transportTerminated(EMPTY_ATTRS); + + assertEquals(1, filter.getActiveConnections()); + assertEquals(3, filter.getTotalAcceptedConnections()); + } + + @Test + void testNewConnectionAllowedAfterTermination() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(2); + + filter.transportReady(EMPTY_ATTRS); + filter.transportReady(EMPTY_ATTRS); + assertEquals(2, filter.getActiveConnections()); + + assertThrows(StatusRuntimeException.class, + () -> filter.transportReady(EMPTY_ATTRS)); + assertEquals(1, filter.getTotalRejectedConnections()); + + filter.transportTerminated(EMPTY_ATTRS); + assertEquals(1, filter.getActiveConnections()); + + filter.transportReady(EMPTY_ATTRS); + assertEquals(2, filter.getActiveConnections()); + assertEquals(3, filter.getTotalAcceptedConnections()); + } + + @Test + void testZeroLimitDisablesLimiting() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(0); + + for (int i = 0; i < 1000; i++) { + Attributes result = filter.transportReady(EMPTY_ATTRS); + assertNotNull(result); + } + + assertEquals(0, filter.getActiveConnections()); + assertEquals(0, filter.getTotalAcceptedConnections()); + assertEquals(0, filter.getTotalRejectedConnections()); + } + + @Test + void testNegativeLimitDisablesLimiting() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(-1); + + for (int i = 0; i < 100; i++) { + Attributes result = filter.transportReady(EMPTY_ATTRS); + assertNotNull(result); + } + + assertEquals(0, filter.getActiveConnections()); + } + + @Test + void testGetMaxConnections() { + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(5000); + assertEquals(5000, filter.getMaxConnections()); + } + + @Test + void testConcurrentConnections() throws InterruptedException { + final int maxConnections = 100; + final int numThreads = 200; + GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(maxConnections); + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + AtomicInteger acceptedCount = new AtomicInteger(0); + AtomicInteger rejectedCount = new AtomicInteger(0); + List connectionResults = new ArrayList<>(); + + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + startLatch.await(); + filter.transportReady(EMPTY_ATTRS); + acceptedCount.incrementAndGet(); + synchronized (connectionResults) { + connectionResults.add(true); + } + } catch (StatusRuntimeException e) { + rejectedCount.incrementAndGet(); + synchronized (connectionResults) { + connectionResults.add(false); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + doneLatch.await(10, TimeUnit.SECONDS); + executor.shutdown(); + + assertEquals(maxConnections, acceptedCount.get(), + "Should accept exactly " + maxConnections + " connections"); + assertEquals(numThreads - maxConnections, rejectedCount.get(), + "Should reject " + (numThreads - maxConnections) + " connections"); + assertEquals(maxConnections, filter.getActiveConnections()); + assertEquals(maxConnections, filter.getTotalAcceptedConnections()); + assertEquals(numThreads - maxConnections, filter.getTotalRejectedConnections()); + } +} From 10ff1eef2f9ee0c273ba240d7d28cef447bbaa71 Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Mon, 4 May 2026 23:54:31 -0400 Subject: [PATCH 5/8] fix: test --- .../container/common/transport/server/XceiverServerGrpc.java | 4 ++-- .../transport/server/TestGrpcConnectionLimitFilter.java | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index f2e5b51c3077..3d990f220c64 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -76,7 +76,6 @@ public final class XceiverServerGrpc implements XceiverServerSpi { private DatanodeDetails datanodeDetails; private ThreadPoolExecutor readExecutors; private EventLoopGroup eventLoopGroup; - private GrpcConnectionLimitFilter connectionLimitFilter; /** * Constructs a Grpc server class. @@ -104,7 +103,8 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, HddsServerUtil.getDatanodeStorageDirs(conf).size(); final int poolSize = threadCountPerDisk * numberOfDisks; final int maxConnections = dnConf.getGrpcMaxConnections(); - connectionLimitFilter = new GrpcConnectionLimitFilter(maxConnections); + GrpcConnectionLimitFilter connectionLimitFilter = + new GrpcConnectionLimitFilter(maxConnections); LOG.info("Datanode gRPC server max connections: {}", maxConnections > 0 ? maxConnections : "unlimited"); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java index e5900d76a992..080278661657 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.List; @@ -178,7 +179,8 @@ void testConcurrentConnections() throws InterruptedException { } startLatch.countDown(); - doneLatch.await(10, TimeUnit.SECONDS); + assertTrue(doneLatch.await(10, TimeUnit.SECONDS), + "All threads should complete within timeout"); executor.shutdown(); assertEquals(maxConnections, acceptedCount.get(), From 63e6c632b5db944d31529db24aee9da32a791dad Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Tue, 5 May 2026 00:49:47 -0400 Subject: [PATCH 6/8] doc: java doc --- .../server/GrpcConnectionLimitFilter.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java index 8775de30181a..d6513cc746b6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java @@ -53,6 +53,9 @@ public GrpcConnectionLimitFilter(int maxConnections) { this.maxConnections = maxConnections; } + /** + * {@inheritDoc} + */ @Override public Attributes transportReady(Attributes transportAttrs) { if (maxConnections <= 0) { @@ -78,6 +81,9 @@ public Attributes transportReady(Attributes transportAttrs) { return super.transportReady(transportAttrs); } + /** + * {@inheritDoc} + */ @Override public void transportTerminated(Attributes transportAttrs) { if (maxConnections > 0) { @@ -90,18 +96,34 @@ public void transportTerminated(Attributes transportAttrs) { super.transportTerminated(transportAttrs); } + /** + * Returns the current number of active connections. + * @return the number of active connections + */ public int getActiveConnections() { return activeConnections.get(); } + /** + * Returns the total number of connections accepted since server start. + * @return the total accepted connections count + */ public long getTotalAcceptedConnections() { return totalAcceptedConnections.get(); } + /** + * Returns the total number of connections rejected since server start. + * @return the total rejected connections count + */ public long getTotalRejectedConnections() { return totalRejectedConnections.get(); } + /** + * Returns the configured maximum number of connections. + * @return the maximum connections limit + */ public int getMaxConnections() { return maxConnections; } From c0e3d8937772dd4e0a487da32794d257b4ea155c Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Tue, 5 May 2026 11:33:44 -0400 Subject: [PATCH 7/8] feat: add so_backlog to prevent burst connections --- .../statemachine/DatanodeConfiguration.java | 25 +++++++++++++++++++ .../server/GrpcConnectionLimitFilter.java | 2 +- .../transport/server/XceiverServerGrpc.java | 7 ++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 0f297ec8ef13..db31f24594f7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -142,6 +142,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig { public static final String GRPC_MAX_CONNECTIONS_KEY = "hdds.datanode.grpc.max.connections"; public static final int GRPC_MAX_CONNECTIONS_DEFAULT = 5000; + public static final String GRPC_SO_BACKLOG_KEY = "hdds.datanode.grpc.so.backlog"; + public static final int GRPC_SO_BACKLOG_DEFAULT = 4096; + public static final String BLOCK_DELETE_COMMAND_WORKER_INTERVAL = "hdds.datanode.block.delete.command.worker.interval"; public static final Duration BLOCK_DELETE_COMMAND_WORKER_INTERVAL_DEFAULT = Duration.ofSeconds(2); @@ -170,6 +173,20 @@ public class DatanodeConfiguration extends ReconfigurableConfig { ) private int grpcMaxConnections = GRPC_MAX_CONNECTIONS_DEFAULT; + /** + * SO_BACKLOG value for the gRPC server socket. + */ + @Config(key = "hdds.datanode.grpc.so.backlog", + type = ConfigType.INT, + defaultValue = "4096", + tags = {DATANODE}, + description = "The SO_BACKLOG value for the Datanode gRPC server socket. " + + "This limits the number of pending connections in the kernel's " + + "accept queue. When this limit is reached, the kernel will reject " + + "new connection attempts with SYN drops." + ) + private int grpcSoBacklog = GRPC_SO_BACKLOG_DEFAULT; + /** * The maximum number of threads used to delete containers on a datanode * simultaneously. @@ -1236,4 +1253,12 @@ public int getGrpcMaxConnections() { public void setGrpcMaxConnections(int grpcMaxConnections) { this.grpcMaxConnections = grpcMaxConnections; } + + public int getGrpcSoBacklog() { + return grpcSoBacklog; + } + + public void setGrpcSoBacklog(int grpcSoBacklog) { + this.grpcSoBacklog = grpcSoBacklog; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java index d6513cc746b6..d8b7ac06b1e1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java @@ -47,7 +47,7 @@ public class GrpcConnectionLimitFilter extends ServerTransportFilter { * Creates a new connection limit filter. * * @param maxConnections the maximum number of concurrent connections allowed. - * If <= 0, no limit is enforced. + * If zero or negative, no limit is enforced. */ public GrpcConnectionLimitFilter(int maxConnections) { this.maxConnections = maxConnections; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 3d990f220c64..22ee5e471f41 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -50,6 +50,7 @@ import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; +import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel; import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll; @@ -103,10 +104,11 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, HddsServerUtil.getDatanodeStorageDirs(conf).size(); final int poolSize = threadCountPerDisk * numberOfDisks; final int maxConnections = dnConf.getGrpcMaxConnections(); + final int soBacklog = dnConf.getGrpcSoBacklog(); GrpcConnectionLimitFilter connectionLimitFilter = new GrpcConnectionLimitFilter(maxConnections); - LOG.info("Datanode gRPC server max connections: {}", - maxConnections > 0 ? maxConnections : "unlimited"); + LOG.info("Datanode gRPC server max connections: {}, SO_BACKLOG: {}", + maxConnections > 0 ? maxConnections : "unlimited", soBacklog); readExecutors = new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, @@ -138,6 +140,7 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, .bossEventLoopGroup(eventLoopGroup) .workerEventLoopGroup(eventLoopGroup) .channelType(channelType) + .withOption(ChannelOption.SO_BACKLOG, soBacklog) .executor(readExecutors) .addService(ServerInterceptors.intercept( xceiverService.bindServiceWithZeroCopy(), From 9d8f62ad0caa29fb84a6c7acec0c3e4ef8295c1c Mon Sep 17 00:00:00 2001 From: Bolin Lin Date: Tue, 5 May 2026 11:35:59 -0400 Subject: [PATCH 8/8] test: add so_backlog set test --- .../TestDatanodeConfiguration.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java index fe113bcc652d..4f3ac4ae5ba1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java @@ -31,6 +31,8 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_MAX_CONNECTIONS_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_MAX_CONNECTIONS_KEY; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_SO_BACKLOG_DEFAULT; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_SO_BACKLOG_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY; @@ -325,4 +327,34 @@ void testGrpcMaxConnectionsSetter() { assertEquals(8000, subject.getGrpcMaxConnections()); } + + @Test + void testGrpcSoBacklogDefault() { + OzoneConfiguration conf = new OzoneConfiguration(); + + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + assertEquals(GRPC_SO_BACKLOG_DEFAULT, subject.getGrpcSoBacklog()); + } + + @Test + void testGrpcSoBacklogCustomValue() { + OzoneConfiguration conf = new OzoneConfiguration(); + int customSoBacklog = 256; + conf.setInt(GRPC_SO_BACKLOG_KEY, customSoBacklog); + + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + assertEquals(customSoBacklog, subject.getGrpcSoBacklog()); + } + + @Test + void testGrpcSoBacklogSetter() { + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeConfiguration subject = conf.getObject(DatanodeConfiguration.class); + + subject.setGrpcSoBacklog(512); + + assertEquals(512, subject.getGrpcSoBacklog()); + } }