From 12b856252cd844fa097ae7e80e398e11d6ce3f7b Mon Sep 17 00:00:00 2001 From: Damiano Renfer Date: Thu, 19 Feb 2026 18:18:24 +0100 Subject: [PATCH] Add support for HA Proxy Protocol TLV's TLV's are supported in netty, but currently not accessible to vert.x users. Here the TLV's are exposed in `NetSocket#tlvs` --- .../io/vertx/core/http/HttpConnection.java | 11 ++++++++ .../vertx/core/http/impl/HttpNetSocket.java | 7 ++++- .../impl/UnpooledHttpClientConnection.java | 7 ++++- .../tcp/Http2UpgradeClientConnection.java | 8 +++++- .../java/io/vertx/core/net/NetSocket.java | 10 ++++++- .../vertx/core/net/impl/ConnectionBase.java | 8 ++++++ .../impl/HAProxyMessageCompletionHandler.java | 20 ++++++++++++++ .../java/io/vertx/test/proxy/HAProxy.java | 15 ++++++++--- .../java/io/vertx/tests/http/HAProxyTest.java | 26 +++++++++++++++++++ 9 files changed, 104 insertions(+), 8 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/http/HttpConnection.java b/vertx-core/src/main/java/io/vertx/core/http/HttpConnection.java index b740a66d761..b444d823781 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/HttpConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/HttpConnection.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -304,4 +305,14 @@ default List peerCertificates() throws SSLPeerUnverifiedException { */ String indicatedServerName(); + /** + * @return the type-length-values present in the TCP header as an iterable of map entries + * where the key contains the TLV type and the value contains the TLV value. + * This is mainly used for HA Proxy Protocol v2 + */ + @GenIgnore() + default Iterable> tlvs() { + return List.of(); + } + } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpNetSocket.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpNetSocket.java index d0f8226ffb8..a9fa348b013 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpNetSocket.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpNetSocket.java @@ -16,7 +16,6 @@ import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClosedException; -import io.vertx.core.http.StreamResetException; import io.vertx.core.internal.ContextInternal; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SSLOptions; @@ -26,6 +25,7 @@ import javax.net.ssl.SSLSession; import java.time.Duration; +import java.util.Map; /** * @author Julien Viet @@ -229,6 +229,11 @@ public SocketAddress localAddress(boolean real) { return stream.connection().localAddress(real); } + @Override + public Iterable> tlvs() { + return stream.connection().tlvs(); + } + @Override public Future close() { return end(); diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java index afdc2bcfcec..d7e636c8188 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java @@ -26,7 +26,7 @@ import java.time.Duration; import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.TimeUnit; +import java.util.Map; /** * An un-pooled HTTP client connection that maintains a queue for pending requests that cannot be served @@ -207,6 +207,11 @@ public String indicatedServerName() { return actual.indicatedServerName(); } + @Override + public Iterable> tlvs() { + return actual.tlvs(); + } + /** * Create an HTTP stream. * diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/tcp/Http2UpgradeClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/tcp/Http2UpgradeClientConnection.java index 3058d8c9adc..e462a290cc0 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/tcp/Http2UpgradeClientConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/tcp/Http2UpgradeClientConnection.java @@ -34,10 +34,11 @@ import io.vertx.core.net.HostAndPort; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; -import io.vertx.core.spi.metrics.HttpClientMetrics; + import javax.net.ssl.SSLSession; import java.time.Duration; +import java.util.Map; /** * A connection that attempts to perform a protocol upgrade to H2C. The connection might use HTTP/1 or H2C @@ -901,6 +902,11 @@ public String indicatedServerName() { return current.indicatedServerName(); } + @Override + public Iterable> tlvs() { + return current.tlvs(); + } + @Override public String toString() { return getClass().getSimpleName() + "[current=" + current.getClass().getSimpleName() + "]"; diff --git a/vertx-core/src/main/java/io/vertx/core/net/NetSocket.java b/vertx-core/src/main/java/io/vertx/core/net/NetSocket.java index 315f6ae91b7..35bb396d3f1 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/NetSocket.java +++ b/vertx-core/src/main/java/io/vertx/core/net/NetSocket.java @@ -18,7 +18,6 @@ import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.VertxException; import io.vertx.core.buffer.Buffer; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; @@ -29,6 +28,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Map; /** * Represents a socket-like interface to a TCP connection on either the @@ -314,5 +314,13 @@ default List peerCertificates() throws SSLPeerUnverifiedException { */ String applicationLayerProtocol(); + /** + * @return the type-length-values present in the TCP header as an iterable of map entries + * where the key contains the TLV type and the value contains the TLV value. + * This is mainly used for HA Proxy Protocol v2 + */ + @GenIgnore() + Iterable> tlvs(); + } diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java b/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java index 2fd760e54e6..2a3214aef0f 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/ConnectionBase.java @@ -19,6 +19,7 @@ import io.netty.util.AttributeKey; import io.netty.util.concurrent.FutureListener; import io.vertx.core.*; +import io.vertx.core.buffer.Buffer; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.VertxInternal; @@ -32,6 +33,8 @@ import javax.net.ssl.SSLSession; import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; /** * Abstract base class for connections managed by a vertx instance. This base implementation does not handle @@ -53,6 +56,7 @@ public abstract class ConnectionBase { public static final VertxException CLOSED_EXCEPTION = NetSocketInternal.CLOSED_EXCEPTION; public static final AttributeKey REMOTE_ADDRESS_OVERRIDE = AttributeKey.valueOf("RemoteAddressOverride"); public static final AttributeKey LOCAL_ADDRESS_OVERRIDE = AttributeKey.valueOf("LocalAddressOverride"); + public static final AttributeKey>> TLVS = AttributeKey.valueOf("TLVS"); private static final Logger log = LoggerFactory.getLogger(ConnectionBase.class); protected final VertxInternal vertx; @@ -426,4 +430,8 @@ public SocketAddress localAddress(boolean real) { } } + public Iterable> tlvs() { + return channel.hasAttr(TLVS) ? channel.attr(TLVS).getAndSet(List.of()) : List.of(); + } + } diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/HAProxyMessageCompletionHandler.java b/vertx-core/src/main/java/io/vertx/core/net/impl/HAProxyMessageCompletionHandler.java index b93d615daae..9dfa0904674 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/HAProxyMessageCompletionHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/HAProxyMessageCompletionHandler.java @@ -1,19 +1,26 @@ package io.vertx.core.net.impl; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; +import io.netty.handler.codec.haproxy.HAProxyTLV; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.concurrent.Promise; +import io.vertx.core.buffer.Buffer; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.net.SocketAddress; import java.io.IOException; +import java.util.AbstractMap; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; public class HAProxyMessageCompletionHandler extends MessageToMessageDecoder { //Public because its used in tests @@ -71,6 +78,11 @@ protected void decode(ChannelHandlerContext ctx, HAProxyMessage msg, List> createTLVs(List haProxyTLVs) { + return haProxyTLVs.stream() + .filter(Objects::nonNull) + .map(tlv -> new AbstractMap.SimpleEntry<>(Buffer.buffer().appendByte(tlv.typeByteValue()), + Buffer.buffer(ByteBufUtil.getBytes(tlv.content())))) + .collect(Collectors.toList()); + } } diff --git a/vertx-core/src/test/java/io/vertx/test/proxy/HAProxy.java b/vertx-core/src/test/java/io/vertx/test/proxy/HAProxy.java index 5af8dccfd6d..c8bd283903e 100644 --- a/vertx-core/src/test/java/io/vertx/test/proxy/HAProxy.java +++ b/vertx-core/src/test/java/io/vertx/test/proxy/HAProxy.java @@ -8,6 +8,7 @@ import java.net.InetAddress; import java.nio.charset.StandardCharsets; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -172,7 +173,7 @@ private static Buffer createIPv4IPv6ProtocolHeader(byte protocolByte, SocketAddr private static Buffer createVersion2ProtocolHeader(byte protocolByte, Buffer address) { - byte[] header = new byte[15]; + byte[] header = new byte[14]; header[0] = 0x0D; // Binary Prefix header[1] = 0x0A; // ----- header[2] = 0x0D; // ----- @@ -189,10 +190,16 @@ private static Buffer createVersion2ProtocolHeader(byte protocolByte, Buffer add header[12] = 0x21; // v2, cmd=PROXY header[13] = protocolByte; - header[14] = 0x00; // Remaining Bytes + + UUID uuid = UUID.fromString("1f29a3b5-7cc4-4592-a8f1-879ff1f47124"); + Buffer uniqueIdTLVBuffer = Buffer.buffer().appendByte((byte) 0x05); // PP2_TYPE_UNIQUE_ID + uniqueIdTLVBuffer.appendUnsignedShort(Long.BYTES*2); // UUID length + uniqueIdTLVBuffer.appendLong(uuid.getMostSignificantBits()); // UUID value + uniqueIdTLVBuffer.appendLong(uuid.getLeastSignificantBits()); return Buffer.buffer(header) - .appendByte((byte) address.length()) - .appendBuffer(address); + .appendUnsignedShort(address.length() + uniqueIdTLVBuffer.length()) + .appendBuffer(address) + .appendBuffer(uniqueIdTLVBuffer); } } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HAProxyTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HAProxyTest.java index e3fea3379b4..3a6ae3fab57 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HAProxyTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HAProxyTest.java @@ -20,8 +20,15 @@ import io.vertx.test.proxy.HAProxy; import org.junit.Test; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assume.assumeTrue; public abstract class HAProxyTest extends HttpTestBase { @@ -158,6 +165,7 @@ private void testHAProxyProtocolAccepted(Buffer header, SocketAddress remote, So proxy.getConnectionRemoteAddress() : local, req.localAddress()); + assertUniqueIdTLV(header, req.connection().tlvs()); req.response().end(); complete(); }); @@ -277,4 +285,22 @@ private void assertAddresses(SocketAddress address1, SocketAddress address2) { assertEquals(address1.port(), address2.port()); } } + + private void assertUniqueIdTLV(Buffer header, Iterable> tlvsIterable) { + // only supported for v2 header and protocol family != unknown + if (header.length() < 12 || header.getByte(12) != 0x21 || header.getByte(13) == 0x00) { + return; + } + + List> tlvs = StreamSupport.stream(tlvsIterable.spliterator(), false) + .collect(Collectors.toList()); + + assertThat(tlvs, hasSize(1)); + + Map.Entry uniqueIdTLV = tlvs.stream().findFirst().orElse(null); + assertThat(uniqueIdTLV.getKey().getByte(0), equalTo((byte)0x05)); + + UUID uuid = new UUID(uniqueIdTLV.getValue().getLong(0), uniqueIdTLV.getValue().getLong(Long.BYTES)); + assertThat(uuid, equalTo(UUID.fromString("1f29a3b5-7cc4-4592-a8f1-879ff1f47124"))); + } }