diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/http1/VertxHttpResponseEncoder.java b/vertx-core/src/main/java/io/vertx/core/http/impl/http1/VertxHttpResponseEncoder.java index edda2b68823..2f04359f417 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/http1/VertxHttpResponseEncoder.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/http1/VertxHttpResponseEncoder.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.http.LastHttpContent; import io.vertx.core.http.impl.headers.Http1xHeaders; import io.vertx.core.impl.SysProps; +import io.vertx.core.net.impl.UncloseableFileRegion; /** * {@link io.netty.handler.codec.http.HttpResponseEncoder} which forces the usage of direct buffers for max performance. @@ -60,7 +61,7 @@ public boolean acceptOutboundMessage(Object msg) throws Exception { msgClazz == VertxAssembledHttpResponse.class || msgClazz == DefaultHttpContent.class || msgClazz == VertxLastHttpContent.class || - msgClazz == DefaultFileRegion.class) { + msgClazz == UncloseableFileRegion.class) { return true; } // Netty slow-path diff --git a/vertx-core/src/main/java/io/vertx/core/impl/SysProps.java b/vertx-core/src/main/java/io/vertx/core/impl/SysProps.java index 82d6f809e86..638f295587a 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/SysProps.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/SysProps.java @@ -116,6 +116,17 @@ public String get() { JACKSON_DEFAULT_READ_MAX_NAME_LEN("vertx.jackson.defaultReadMaxNameLength"), JACKSON_DEFAULT_READ_MAX_TOKEN_COUNT("vertx.jackson.defaultMaxTokenCount"), + /** + * Disable {@code sendfile} support for the io_uring transport. + *

+ * When this system property is set to {@code true}, Vert.x will avoid using the + * io_uring splice path. + *

+ * This is useful because io_uring splice can be slower than the + * epoll {@code sendfile} in some workloads. + * See Netty issue 15747. + */ + DISABLE_IO_URING_SENDFILE("vertx.disableIoUringSendfile"), ; public final String name; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java index b8291869f2e..919a73ba026 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/IoUringTransport.java @@ -18,6 +18,7 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.uring.*; import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.impl.SysProps; import io.vertx.core.net.TcpConfig; import io.vertx.core.net.impl.SocketAddressImpl; import io.vertx.core.spi.transport.Transport; @@ -29,6 +30,8 @@ */ public class IoUringTransport implements Transport { + private static final boolean DISABLE_SENDFILE = SysProps.DISABLE_IO_URING_SENDFILE.getBoolean(); + private static volatile int pendingFastOpenRequestsThreshold = 256; /** @@ -57,18 +60,18 @@ public IoUringTransport() { @Override public boolean supportsDomainSockets() { - return false; + return true; } @Override public boolean supportFileRegion() { - return false; + return IoUring.isSpliceSupported() && !DISABLE_SENDFILE; } @Override public SocketAddress convert(io.vertx.core.net.SocketAddress address) { if (address.isDomainSocket()) { - throw new IllegalArgumentException("Domain socket not supported by IOUring transport"); + return new DomainSocketAddress(address.path()); } return Transport.super.convert(address); } @@ -109,7 +112,7 @@ public ChannelFactory datagramChannelFactory() { @Override public ChannelFactory channelFactory(boolean domainSocket) { if (domainSocket) { - throw new IllegalArgumentException(); + return IoUringDomainSocketChannel::new; } return IoUringSocketChannel::new; } @@ -117,7 +120,7 @@ public ChannelFactory channelFactory(boolean domainSocket) { @Override public ChannelFactory serverChannelFactory(boolean domainSocket) { if (domainSocket) { - throw new IllegalArgumentException(); + return IoUringServerDomainSocketChannel::new; } return IoUringServerSocketChannel::new; } @@ -130,30 +133,26 @@ public void configure(DatagramChannel channel, DatagramSocketOptions options) { @Override public void configure(TcpConfig options, boolean domainSocket, ServerBootstrap bootstrap) { - if (domainSocket) { - throw new IllegalArgumentException(); - } - bootstrap.option(IoUringChannelOption.SO_REUSEPORT, options.isReusePort()); - if (options.isTcpFastOpen()) { - bootstrap.option(IoUringChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0); + if (!domainSocket) { + bootstrap.option(IoUringChannelOption.SO_REUSEPORT, options.isReusePort()); + if (options.isTcpFastOpen()) { + bootstrap.option(IoUringChannelOption.TCP_FASTOPEN, options.isTcpFastOpen() ? pendingFastOpenRequestsThreshold : 0); + } + bootstrap.childOption(IoUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); + bootstrap.childOption(IoUringChannelOption.TCP_CORK, options.isTcpCork()); } - bootstrap.childOption(IoUringChannelOption.TCP_USER_TIMEOUT, options.getTcpUserTimeout()); - bootstrap.childOption(IoUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); - bootstrap.childOption(IoUringChannelOption.TCP_CORK, options.isTcpCork()); - Transport.super.configure(options, false, bootstrap); + Transport.super.configure(options, domainSocket, bootstrap); } @Override public void configure(TcpConfig options, boolean domainSocket, Bootstrap bootstrap) { - if (domainSocket) { - throw new IllegalArgumentException(); - } - if (options.isTcpFastOpen()) { - bootstrap.option(IoUringChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen()); + if (!domainSocket) { + if (options.isTcpFastOpen()) { + bootstrap.option(IoUringChannelOption.TCP_FASTOPEN_CONNECT, options.isTcpFastOpen()); + } + bootstrap.option(IoUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); + bootstrap.option(IoUringChannelOption.TCP_CORK, options.isTcpCork()); } - bootstrap.option(IoUringChannelOption.TCP_USER_TIMEOUT, options.getTcpUserTimeout()); - bootstrap.option(IoUringChannelOption.TCP_QUICKACK, options.isTcpQuickAck()); - bootstrap.option(IoUringChannelOption.TCP_CORK, options.isTcpCork()); - Transport.super.configure(options, false, bootstrap); + Transport.super.configure(options, domainSocket, bootstrap); } } diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/UncloseableFileRegion.java b/vertx-core/src/main/java/io/vertx/core/net/impl/UncloseableFileRegion.java new file mode 100644 index 00000000000..3a1d06d05f3 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/UncloseableFileRegion.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2011-2026 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.net.impl; + +import io.netty.channel.DefaultFileRegion; + +import java.io.File; +import java.nio.channels.FileChannel; + +/** + * A file region that does close the underlying resource, letting the file user control the lifecycle of + * the file descriptor. + * + * @author MengYang Li + */ +public class UncloseableFileRegion extends DefaultFileRegion { + public UncloseableFileRegion(FileChannel fileChannel, long position, long count) { + super(fileChannel, position, count); + } + + public UncloseableFileRegion(File file, long position, long count) { + super(file, position, count); + } + + @Override + protected void deallocate() { + + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java index 46c274de736..34105b57c86 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java @@ -508,17 +508,11 @@ public boolean writeQueueFull() { */ private void sendFileRegion(FileChannel fc, long offset, long length, ChannelPromise writeFuture) { if (length < MAX_REGION_SIZE) { - FileRegion region = new DefaultFileRegion(fc, offset, length); - // Retain explicitly this file region so the underlying channel is not closed by the NIO channel when it - // as been sent as the caller can need it again - region.retain(); + FileRegion region = new UncloseableFileRegion(fc, offset, length); writeToChannel(region, writeFuture); } else { ChannelPromise promise = chctx.newPromise(); - FileRegion region = new DefaultFileRegion(fc, offset, MAX_REGION_SIZE); - // Retain explicitly this file region so the underlying channel is not closed by the NIO channel when it - // as been sent as we need it again - region.retain(); + FileRegion region = new UncloseableFileRegion(fc, offset, MAX_REGION_SIZE); writeToChannel(region, promise); promise.addListener(future -> { if (future.isSuccess()) {