diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index 919777f35191..db31f24594f7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -139,6 +139,12 @@ public class DatanodeConfiguration extends ReconfigurableConfig { static final int CONTAINER_CLOSE_THREADS_DEFAULT = 3; static final int BLOCK_DELETE_THREADS_DEFAULT = 5; + public static final String GRPC_MAX_CONNECTIONS_KEY = "hdds.datanode.grpc.max.connections"; + public static final int GRPC_MAX_CONNECTIONS_DEFAULT = 5000; + + public static final String GRPC_SO_BACKLOG_KEY = "hdds.datanode.grpc.so.backlog"; + public static final int GRPC_SO_BACKLOG_DEFAULT = 4096; + public static final String BLOCK_DELETE_COMMAND_WORKER_INTERVAL = "hdds.datanode.block.delete.command.worker.interval"; public static final Duration BLOCK_DELETE_COMMAND_WORKER_INTERVAL_DEFAULT = Duration.ofSeconds(2); @@ -154,6 +160,33 @@ public class DatanodeConfiguration extends ReconfigurableConfig { ) private int numReadThreadPerVolume = 10; + /** + * Maximum number of concurrent gRPC connections to the Datanode server; zero or less means unlimited. + */ + @Config(key = "hdds.datanode.grpc.max.connections", + type = ConfigType.INT, + defaultValue = "5000", + tags = {DATANODE}, + description = "Maximum number of concurrent gRPC connections to the " + + "Datanode server. This helps prevent file descriptor exhaustion. " + + "Set to 0 to disable the limit." + ) + private int grpcMaxConnections = GRPC_MAX_CONNECTIONS_DEFAULT; + + /** + * SO_BACKLOG value (length of the pending-connection accept queue) for the gRPC server socket.
+ */ + @Config(key = "hdds.datanode.grpc.so.backlog", + type = ConfigType.INT, + defaultValue = "4096", + tags = {DATANODE}, + description = "The SO_BACKLOG value for the Datanode gRPC server socket. " + + "This limits the number of pending connections in the kernel's " + + "accept queue. When this limit is reached, the kernel will reject " + + "new connection attempts with SYN drops." + ) + private int grpcSoBacklog = GRPC_SO_BACKLOG_DEFAULT; + /** * The maximum number of threads used to delete containers on a datanode * simultaneously. @@ -1212,4 +1245,20 @@ static long getDefaultFreeSpace() { final StorageSize measure = StorageSize.parse(HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_DEFAULT); return Math.round(measure.getUnit().toBytes(measure.getValue())); } + + public int getGrpcMaxConnections() { + return grpcMaxConnections; + } + + public void setGrpcMaxConnections(int grpcMaxConnections) { + this.grpcMaxConnections = grpcMaxConnections; + } + + public int getGrpcSoBacklog() { + return grpcSoBacklog; + } + + public void setGrpcSoBacklog(int grpcSoBacklog) { + this.grpcSoBacklog = grpcSoBacklog; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java new file mode 100644 index 000000000000..d8b7ac06b1e1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcConnectionLimitFilter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ratis.thirdparty.io.grpc.Attributes;
+import org.apache.ratis.thirdparty.io.grpc.ServerTransportFilter;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A gRPC {@link ServerTransportFilter} that limits the number of concurrent
+ * connections to the Datanode gRPC server.
+ *
+ * <p>When the connection limit is reached, new connections are rejected
+ * with a RESOURCE_EXHAUSTED status, preventing file descriptor exhaustion.
+ * A limit of zero or less disables the filter.
+ */
+public class GrpcConnectionLimitFilter extends ServerTransportFilter {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcConnectionLimitFilter.class);
+
+  private final int maxConnections;
+  private final AtomicInteger activeConnections = new AtomicInteger(0);
+  private final AtomicLong totalAcceptedConnections = new AtomicLong(0);
+  private final AtomicLong totalRejectedConnections = new AtomicLong(0);
+
+  /**
+   * Creates a new connection limit filter.
+   *
+   * @param maxConnections the maximum number of concurrent connections allowed.
+   *                       If zero or negative, no limit is enforced.
+   */
+  public GrpcConnectionLimitFilter(int maxConnections) {
+    this.maxConnections = maxConnections;
+  }
+
+  /**
+   * Reserves a connection slot for a transport that has become ready.
+   *
+   * @param transportAttrs the attributes of the new transport
+   * @return the attributes returned by the parent implementation
+   * @throws StatusRuntimeException RESOURCE_EXHAUSTED when the limit is reached
+   */
+  @Override
+  public Attributes transportReady(Attributes transportAttrs) {
+    if (maxConnections <= 0) {
+      return super.transportReady(transportAttrs);
+    }
+
+    int current = activeConnections.incrementAndGet();
+    if (current > maxConnections) {
+      // Hand the slot straight back: a rejected transport is never active.
+      activeConnections.decrementAndGet();
+      totalRejectedConnections.incrementAndGet();
+      LOG.warn("Connection rejected: limit {} reached, current active: {}",
+          maxConnections, current - 1);
+      throw new StatusRuntimeException(
+          Status.RESOURCE_EXHAUSTED.withDescription(
+              "Datanode connection limit exceeded: " + maxConnections));
+    }
+
+    totalAcceptedConnections.incrementAndGet();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connection accepted: active connections: {}/{}",
+          current, maxConnections);
+    }
+    return super.transportReady(transportAttrs);
+  }
+
+  /**
+   * Releases the slot of a terminated transport. Null attributes are skipped
+   * and the count is clamped at zero, so a termination that has no matching
+   * accepted {@link #transportReady} cannot free another connection's slot.
+   * NOTE(review): assumes gRPC passes null attributes for transports that were
+   * rejected or never completed the handshake - confirm against ServerImpl.
+   *
+   * @param transportAttrs the attributes of the terminated transport, or null
+   */
+  @Override
+  public void transportTerminated(Attributes transportAttrs) {
+    if (maxConnections > 0 && transportAttrs != null) {
+      int current = activeConnections.updateAndGet(n -> n > 0 ? n - 1 : 0);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connection terminated: active connections: {}/{}",
+            current, maxConnections);
+      }
+    }
+    super.transportTerminated(transportAttrs);
+  }
+
+  /** Returns the current number of active connections. */
+  public int getActiveConnections() {
+    return activeConnections.get();
+  }
+
+  /** Returns the total number of connections accepted by this filter. */
+  public long getTotalAcceptedConnections() {
+    return totalAcceptedConnections.get();
+  }
+
+  /** Returns the total number of connections rejected by this filter. */
+  public long getTotalRejectedConnections() {
+    return totalRejectedConnections.get();
+  }
+
+  /** Returns the configured maximum number of connections. */
+  public int getMaxConnections() {
+    return maxConnections;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 7521b460467c..22ee5e471f41 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -50,6 +50,7 @@ import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; +import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption; import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel; import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll; @@ -97,11 +98,17 @@ public
XceiverServerGrpc(DatanodeDetails datanodeDetails, this.port = 0; } - final int threadCountPerDisk = - conf.getObject(DatanodeConfiguration.class).getNumReadThreadPerVolume(); + DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class); + final int threadCountPerDisk = dnConf.getNumReadThreadPerVolume(); final int numberOfDisks = HddsServerUtil.getDatanodeStorageDirs(conf).size(); final int poolSize = threadCountPerDisk * numberOfDisks; + final int maxConnections = dnConf.getGrpcMaxConnections(); + final int soBacklog = dnConf.getGrpcSoBacklog(); + GrpcConnectionLimitFilter connectionLimitFilter = + new GrpcConnectionLimitFilter(maxConnections); + LOG.info("Datanode gRPC server max connections: {}, SO_BACKLOG: {}", + maxConnections > 0 ? maxConnections : "unlimited", soBacklog); readExecutors = new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, @@ -133,10 +140,12 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails, .bossEventLoopGroup(eventLoopGroup) .workerEventLoopGroup(eventLoopGroup) .channelType(channelType) + .withOption(ChannelOption.SO_BACKLOG, soBacklog) .executor(readExecutors) .addService(ServerInterceptors.intercept( xceiverService.bindServiceWithZeroCopy(), - new GrpcServerInterceptor())); + new GrpcServerInterceptor())) + .addTransportFilter(connectionLimitFilter); SecurityConfig secConf = new SecurityConfig(conf); if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java index 5012526782aa..4f3ac4ae5ba1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java +++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestDatanodeConfiguration.java @@ -29,6 +29,10 @@ import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_DB_VOLUMES_TOLERATED_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_METADATA_VOLUMES_TOLERATED_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.FAILED_VOLUMES_TOLERATED_DEFAULT; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_MAX_CONNECTIONS_DEFAULT; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_MAX_CONNECTIONS_KEY; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_SO_BACKLOG_DEFAULT; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.GRPC_SO_BACKLOG_KEY; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_DEFAULT; import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY; @@ -283,4 +287,74 @@ static void assertWaitTimeMin(TimeDuration expected, assertEquals(expected, t, RaftServerConfigKeys.Log.Appender.WAIT_TIME_MIN_KEY); }
+
+  @Test
+  void testGrpcMaxConnectionsDefault() {
+    final DatanodeConfiguration dnConf =
+        new OzoneConfiguration().getObject(DatanodeConfiguration.class);
+
+    // Nothing was overridden, so the built-in default must be reported.
+    assertEquals(GRPC_MAX_CONNECTIONS_DEFAULT, dnConf.getGrpcMaxConnections());
+  }
+
+  @Test
+  void testGrpcMaxConnectionsCustomValue() {
+    final int expected = 10000;
+    final OzoneConfiguration ozoneConf = new OzoneConfiguration();
+    ozoneConf.setInt(GRPC_MAX_CONNECTIONS_KEY, expected);
+
+    final int actual =
+        ozoneConf.getObject(DatanodeConfiguration.class).getGrpcMaxConnections();
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testGrpcMaxConnectionsZeroDisablesLimit() {
+    final OzoneConfiguration ozoneConf = new OzoneConfiguration();
+    ozoneConf.setInt(GRPC_MAX_CONNECTIONS_KEY, 0);
+
+    final DatanodeConfiguration dnConf = ozoneConf.getObject(DatanodeConfiguration.class);
+
+    assertEquals(0, dnConf.getGrpcMaxConnections());
+  }
+
+  @Test
+  void testGrpcMaxConnectionsSetter() {
+    final int updated = 8000;
+    final DatanodeConfiguration dnConf =
+        new OzoneConfiguration().getObject(DatanodeConfiguration.class);
+
+    dnConf.setGrpcMaxConnections(updated);
+    assertEquals(updated, dnConf.getGrpcMaxConnections());
+  }
+
+  @Test
+  void testGrpcSoBacklogDefault() {
+    final DatanodeConfiguration dnConf =
+        new OzoneConfiguration().getObject(DatanodeConfiguration.class);
+
+    // Nothing was overridden, so the built-in default must be reported.
+    assertEquals(GRPC_SO_BACKLOG_DEFAULT, dnConf.getGrpcSoBacklog());
+  }
+
+  @Test
+  void testGrpcSoBacklogCustomValue() {
+    final int expected = 256;
+    final OzoneConfiguration ozoneConf = new OzoneConfiguration();
+    ozoneConf.setInt(GRPC_SO_BACKLOG_KEY, expected);
+
+    final int actual =
+        ozoneConf.getObject(DatanodeConfiguration.class).getGrpcSoBacklog();
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testGrpcSoBacklogSetter() {
+    final int updated = 512;
+    final DatanodeConfiguration dnConf =
+        new OzoneConfiguration().getObject(DatanodeConfiguration.class);
+
+    dnConf.setGrpcSoBacklog(updated);
+    assertEquals(updated, dnConf.getGrpcSoBacklog());
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java new file mode 100644 index
000000000000..080278661657
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/TestGrpcConnectionLimitFilter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ratis.thirdparty.io.grpc.Attributes;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link GrpcConnectionLimitFilter}.
+ */
+class TestGrpcConnectionLimitFilter {
+
+  private static final Attributes EMPTY_ATTRS = Attributes.EMPTY;
+
+  /** Connections up to and including the limit are all accepted. */
+  @Test
+  void testConnectionAcceptedUnderLimit() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(5);
+
+    // Accept 5 connections (at limit)
+    for (int i = 0; i < 5; i++) {
+      Attributes result = filter.transportReady(EMPTY_ATTRS);
+      assertNotNull(result);
+    }
+
+    assertEquals(5, filter.getActiveConnections());
+    assertEquals(5, filter.getTotalAcceptedConnections());
+    assertEquals(0, filter.getTotalRejectedConnections());
+  }
+
+  /** The first connection beyond the limit fails with RESOURCE_EXHAUSTED. */
+  @Test
+  void testConnectionRejectedOverLimit() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(3);
+
+    for (int i = 0; i < 3; i++) {
+      filter.transportReady(EMPTY_ATTRS);
+    }
+
+    StatusRuntimeException exception = assertThrows(
+        StatusRuntimeException.class,
+        () -> filter.transportReady(EMPTY_ATTRS)
+    );
+
+    assertEquals(Status.Code.RESOURCE_EXHAUSTED, exception.getStatus().getCode());
+    assertEquals(3, filter.getActiveConnections());
+    assertEquals(3, filter.getTotalAcceptedConnections());
+    assertEquals(1, filter.getTotalRejectedConnections());
+  }
+
+  /** Each termination releases one active slot; totals are unaffected. */
+  @Test
+  void testConnectionTerminationDecrementsCounter() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(5);
+
+    for (int i = 0; i < 3; i++) {
+      filter.transportReady(EMPTY_ATTRS);
+    }
+    assertEquals(3, filter.getActiveConnections());
+
+    filter.transportTerminated(EMPTY_ATTRS);
+    filter.transportTerminated(EMPTY_ATTRS);
+
+    assertEquals(1, filter.getActiveConnections());
+    assertEquals(3, filter.getTotalAcceptedConnections());
+  }
+
+  /** A slot released by a termination can be reused by a new connection. */
+  @Test
+  void testNewConnectionAllowedAfterTermination() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(2);
+
+    filter.transportReady(EMPTY_ATTRS);
+    filter.transportReady(EMPTY_ATTRS);
+    assertEquals(2, filter.getActiveConnections());
+
+    assertThrows(StatusRuntimeException.class,
+        () -> filter.transportReady(EMPTY_ATTRS));
+    assertEquals(1, filter.getTotalRejectedConnections());
+
+    filter.transportTerminated(EMPTY_ATTRS);
+    assertEquals(1, filter.getActiveConnections());
+
+    filter.transportReady(EMPTY_ATTRS);
+    assertEquals(2, filter.getActiveConnections());
+    assertEquals(3, filter.getTotalAcceptedConnections());
+  }
+
+  /** A limit of zero disables limiting and all connection accounting. */
+  @Test
+  void testZeroLimitDisablesLimiting() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(0);
+
+    for (int i = 0; i < 1000; i++) {
+      Attributes result = filter.transportReady(EMPTY_ATTRS);
+      assertNotNull(result);
+    }
+
+    assertEquals(0, filter.getActiveConnections());
+    assertEquals(0, filter.getTotalAcceptedConnections());
+    assertEquals(0, filter.getTotalRejectedConnections());
+  }
+
+  /** A negative limit disables limiting, just like zero. */
+  @Test
+  void testNegativeLimitDisablesLimiting() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(-1);
+
+    for (int i = 0; i < 100; i++) {
+      Attributes result = filter.transportReady(EMPTY_ATTRS);
+      assertNotNull(result);
+    }
+
+    assertEquals(0, filter.getActiveConnections());
+  }
+
+  /** The configured limit is reported unchanged. */
+  @Test
+  void testGetMaxConnections() {
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(5000);
+    assertEquals(5000, filter.getMaxConnections());
+  }
+
+  /**
+   * Races twice as many connection attempts as there are slots and checks
+   * that exactly {@code maxConnections} of them are accepted.
+   */
+  @Test
+  void testConcurrentConnections() throws InterruptedException {
+    final int maxConnections = 100;
+    final int numThreads = 200;
+    GrpcConnectionLimitFilter filter = new GrpcConnectionLimitFilter(maxConnections);
+
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch doneLatch = new CountDownLatch(numThreads);
+    AtomicInteger acceptedCount = new AtomicInteger(0);
+    AtomicInteger rejectedCount = new AtomicInteger(0);
+
+    try {
+      for (int i = 0; i < numThreads; i++) {
+        executor.submit(() -> {
+          try {
+            startLatch.await();
+            filter.transportReady(EMPTY_ATTRS);
+            acceptedCount.incrementAndGet();
+          } catch (StatusRuntimeException e) {
+            rejectedCount.incrementAndGet();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } finally {
+            doneLatch.countDown();
+          }
+        });
+      }
+
+      // Release every worker at once to maximize contention on the counter.
+      startLatch.countDown();
+      assertTrue(doneLatch.await(10, TimeUnit.SECONDS),
+          "All threads should complete within timeout");
+    } finally {
+      // Stop the pool even when the wait above fails, so no threads leak.
+      executor.shutdownNow();
+    }
+
+    assertEquals(maxConnections, acceptedCount.get(),
+        "Should accept exactly " + maxConnections + " connections");
+    assertEquals(numThreads - maxConnections, rejectedCount.get(),
+        "Should reject " + (numThreads - maxConnections) + " connections");
+    assertEquals(maxConnections, filter.getActiveConnections());
+    assertEquals(maxConnections, filter.getTotalAcceptedConnections());
+    assertEquals(numThreads - maxConnections, filter.getTotalRejectedConnections());
+  }
+}