Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions vertx-core/src/main/java/io/vertx/core/http/HttpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -304,4 +305,14 @@ default List<Certificate> peerCertificates() throws SSLPeerUnverifiedException {
*/
String indicatedServerName();

/**
* @return the type-length-values present in the TCP header as an iterable of map entries
* where the key contains the TLV type and the value contains the TLV value.
* This is mainly used for HA Proxy Protocol v2
*/
@GenIgnore()
default Iterable<Map.Entry<Buffer, Buffer>> tlvs() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name should be more clear about what is provided, those are coming from HA proxy protocol, e.g. proxyProtocolV2HeaderTLVs seems more descriptive.

Copy link
Author

@damianorenfer damianorenfer Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, should we rename it everywhere ? also in NetSocket etc ?

return List.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SSLOptions;
Expand All @@ -26,6 +25,7 @@

import javax.net.ssl.SSLSession;
import java.time.Duration;
import java.util.Map;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
Expand Down Expand Up @@ -229,6 +229,11 @@ public SocketAddress localAddress(boolean real) {
return stream.connection().localAddress(real);
}

@Override
public Iterable<Map.Entry<Buffer, Buffer>> tlvs() {
return stream.connection().tlvs();
}

@Override
public Future<Void> close() {
return end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.Map;

/**
* An un-pooled HTTP client connection that maintains a queue for pending requests that cannot be served
Expand Down Expand Up @@ -207,6 +207,11 @@ public String indicatedServerName() {
return actual.indicatedServerName();
}

@Override
public Iterable<Map.Entry<Buffer, Buffer>> tlvs() {
return actual.tlvs();
}

/**
* Create an HTTP stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;


import javax.net.ssl.SSLSession;
import java.time.Duration;
import java.util.Map;

/**
* A connection that attempts to perform a protocol upgrade to H2C. The connection might use HTTP/1 or H2C
Expand Down Expand Up @@ -901,6 +902,11 @@ public String indicatedServerName() {
return current.indicatedServerName();
}

@Override
public Iterable<Map.Entry<Buffer, Buffer>> tlvs() {
return current.tlvs();
}

@Override
public String toString() {
return getClass().getSimpleName() + "[current=" + current.getClass().getSimpleName() + "]";
Expand Down
10 changes: 9 additions & 1 deletion vertx-core/src/main/java/io/vertx/core/net/NetSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
Expand All @@ -29,6 +28,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
* Represents a socket-like interface to a TCP connection on either the
Expand Down Expand Up @@ -314,5 +314,13 @@ default List<Certificate> peerCertificates() throws SSLPeerUnverifiedException {
*/
String applicationLayerProtocol();

/**
* @return the type-length-values present in the TCP header as an iterable of map entries
* where the key contains the TLV type and the value contains the TLV value.
* This is mainly used for HA Proxy Protocol v2
*/
@GenIgnore()
Iterable<Map.Entry<Buffer, Buffer>> tlvs();

}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
Expand All @@ -32,6 +33,8 @@

import javax.net.ssl.SSLSession;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

/**
* Abstract base class for connections managed by a vertx instance. This base implementation does not handle
Expand All @@ -53,6 +56,7 @@ public abstract class ConnectionBase {
public static final VertxException CLOSED_EXCEPTION = NetSocketInternal.CLOSED_EXCEPTION;
public static final AttributeKey<SocketAddress> REMOTE_ADDRESS_OVERRIDE = AttributeKey.valueOf("RemoteAddressOverride");
public static final AttributeKey<SocketAddress> LOCAL_ADDRESS_OVERRIDE = AttributeKey.valueOf("LocalAddressOverride");
public static final AttributeKey<Iterable<Map.Entry<Buffer, Buffer>>> TLVS = AttributeKey.valueOf("TLVS");
private static final Logger log = LoggerFactory.getLogger(ConnectionBase.class);

protected final VertxInternal vertx;
Expand Down Expand Up @@ -426,4 +430,8 @@ public SocketAddress localAddress(boolean real) {
}
}

public Iterable<Map.Entry<Buffer, Buffer>> tlvs() {
return channel.hasAttr(TLVS) ? channel.attr(TLVS).getAndSet(List.of()) : List.of();
}

}
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package io.vertx.core.net.impl;

import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.netty.handler.codec.haproxy.HAProxyTLV;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class HAProxyMessageCompletionHandler extends MessageToMessageDecoder<HAProxyMessage> {
//Public because its used in tests
Expand Down Expand Up @@ -71,6 +78,11 @@ protected void decode(ChannelHandlerContext ctx, HAProxyMessage msg, List<Object
ctx.channel().attr(ConnectionBase.LOCAL_ADDRESS_OVERRIDE)
.set(createAddress(protocol, msg.destinationAddress(), msg.destinationPort()));
}

if (msg.tlvs() != null) {
ctx.channel().attr(ConnectionBase.TLVS)
.set(createTLVs(msg.tlvs()));
}
}
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
Expand Down Expand Up @@ -103,4 +115,12 @@ private SocketAddress createAddress(HAProxyProxiedProtocol protocol, String sour
throw new IllegalStateException("Should never happen");
}
}

private Iterable<Map.Entry<Buffer, Buffer>> createTLVs(List<HAProxyTLV> haProxyTLVs) {
return haProxyTLVs.stream()
.filter(Objects::nonNull)
.map(tlv -> new AbstractMap.SimpleEntry<>(Buffer.buffer().appendByte(tlv.typeByteValue()),
Buffer.buffer(ByteBufUtil.getBytes(tlv.content()))))
.collect(Collectors.toList());
}
}
15 changes: 11 additions & 4 deletions vertx-core/src/test/java/io/vertx/test/proxy/HAProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
Expand Down Expand Up @@ -172,7 +173,7 @@ private static Buffer createIPv4IPv6ProtocolHeader(byte protocolByte, SocketAddr


private static Buffer createVersion2ProtocolHeader(byte protocolByte, Buffer address) {
byte[] header = new byte[15];
byte[] header = new byte[14];
header[0] = 0x0D; // Binary Prefix
header[1] = 0x0A; // -----
header[2] = 0x0D; // -----
Expand All @@ -189,10 +190,16 @@ private static Buffer createVersion2ProtocolHeader(byte protocolByte, Buffer add
header[12] = 0x21; // v2, cmd=PROXY
header[13] = protocolByte;

header[14] = 0x00; // Remaining Bytes

UUID uuid = UUID.fromString("1f29a3b5-7cc4-4592-a8f1-879ff1f47124");
Buffer uniqueIdTLVBuffer = Buffer.buffer().appendByte((byte) 0x05); // PP2_TYPE_UNIQUE_ID
uniqueIdTLVBuffer.appendUnsignedShort(Long.BYTES*2); // UUID length
uniqueIdTLVBuffer.appendLong(uuid.getMostSignificantBits()); // UUID value
uniqueIdTLVBuffer.appendLong(uuid.getLeastSignificantBits());

return Buffer.buffer(header)
.appendByte((byte) address.length())
.appendBuffer(address);
.appendUnsignedShort(address.length() + uniqueIdTLVBuffer.length())
.appendBuffer(address)
.appendBuffer(uniqueIdTLVBuffer);
}
}
26 changes: 26 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/http/HAProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
import io.vertx.test.proxy.HAProxy;
import org.junit.Test;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assume.assumeTrue;

public abstract class HAProxyTest extends HttpTestBase {
Expand Down Expand Up @@ -158,6 +165,7 @@ private void testHAProxyProtocolAccepted(Buffer header, SocketAddress remote, So
proxy.getConnectionRemoteAddress() :
local,
req.localAddress());
assertUniqueIdTLV(header, req.connection().tlvs());
req.response().end();
complete();
});
Expand Down Expand Up @@ -277,4 +285,22 @@ private void assertAddresses(SocketAddress address1, SocketAddress address2) {
assertEquals(address1.port(), address2.port());
}
}

private void assertUniqueIdTLV(Buffer header, Iterable<Map.Entry<Buffer, Buffer>> tlvsIterable) {
// only supported for v2 header and protocol family != unknown
if (header.length() < 12 || header.getByte(12) != 0x21 || header.getByte(13) == 0x00) {
return;
}

List<Map.Entry<Buffer, Buffer>> tlvs = StreamSupport.stream(tlvsIterable.spliterator(), false)
.collect(Collectors.toList());

assertThat(tlvs, hasSize(1));

Map.Entry<Buffer, Buffer> uniqueIdTLV = tlvs.stream().findFirst().orElse(null);
assertThat(uniqueIdTLV.getKey().getByte(0), equalTo((byte)0x05));

UUID uuid = new UUID(uniqueIdTLV.getValue().getLong(0), uniqueIdTLV.getValue().getLong(Long.BYTES));
assertThat(uuid, equalTo(UUID.fromString("1f29a3b5-7cc4-4592-a8f1-879ff1f47124")));
}
}
Loading