Skip to content

Commit 939e778

Browse files
authored
Merge pull request #5992 from eclipse-vertx/alt-svc-improvements
HTTP alternative services improvements
2 parents 9c4935b + c7f29ab commit 939e778

16 files changed

Lines changed: 497 additions & 290 deletions

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java

Lines changed: 104 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.ArrayList;
3636
import java.util.List;
3737
import java.util.Objects;
38+
import java.util.Set;
3839
import java.util.concurrent.atomic.AtomicInteger;
3940
import java.util.function.Function;
4041
import java.util.regex.Pattern;
@@ -100,7 +101,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal
100101

101102
this.tcpTransport = tcpTransport;
102103
this.quicTransport = quicTransport;
103-
this.originEndpoints = new OriginResolver<>(vertx, resolveAll);
104+
this.originEndpoints = new OriginResolver<>(vertx, resolveAll, this);
104105
this.resolver = (EndpointResolverInternal) resolver;
105106
this.originResolver = new EndpointResolverImpl<>(vertx, originEndpoints, resolveAll ? loadBalancer : LoadBalancer.FIRST, resolverKeepAlive.toMillis());
106107
this.poolOptions = poolOptions;
@@ -187,9 +188,10 @@ private Function<EndpointKey, SharedHttpClientConnectionGroup> httpEndpointProvi
187188
ClientMetrics clientMetrics = HttpClientImpl.this.httpMetrics != null ? HttpClientImpl.this.httpMetrics.createEndpointMetrics(address, maxPoolSize) : null;
188189
PoolMetrics poolMetrics = HttpClientImpl.this.httpMetrics != null ? vertx.metrics().createPoolMetrics("http", key.authority.toString(), maxPoolSize) : null;
189190
ProxyOptions proxyOptions = key.proxyOptions;
191+
ClientSSLOptions sslOptions = key.sslOptions;
190192
if (proxyOptions != null && !key.ssl && proxyOptions.getType() == ProxyType.HTTP) {
191193
SocketAddress server = SocketAddress.inetSocketAddress(proxyOptions.getPort(), proxyOptions.getHost());
192-
key = new EndpointKey(key.ssl, key.protocol, key.sslOptions, proxyOptions, server, key.authority);
194+
key = new EndpointKey(key.ssl, key.protocol, sslOptions, proxyOptions, server, key.authority);
193195
proxyOptions = null;
194196
}
195197
HttpVersion protocol = key.protocol;
@@ -199,7 +201,7 @@ private Function<EndpointKey, SharedHttpClientConnectionGroup> httpEndpointProvi
199201
} else {
200202
protocols = List.of(protocol);
201203
}
202-
HttpConnectParams params = new HttpConnectParams(protocols, key.sslOptions, proxyOptions, key.ssl);
204+
HttpConnectParams params = new HttpConnectParams(protocols, sslOptions, proxyOptions, key.ssl);
203205
Function<SharedHttpClientConnectionGroup, SharedHttpClientConnectionGroup.Pool> p = group -> {
204206
int queueMaxSize = poolOptions.getMaxWaitQueueSize();
205207
int http1MaxSize = poolOptions.getHttp1MaxSize();
@@ -220,7 +222,7 @@ private Function<EndpointKey, SharedHttpClientConnectionGroup> httpEndpointProvi
220222
if (altSvc instanceof AltSvc.Clear) {
221223
originEndpoints.clearAlternatives(evt.origin);
222224
} else if (altSvc instanceof AltSvc.ListOfValue) {
223-
originEndpoints.updateAlternatives(evt.origin, (AltSvc.ListOfValue)altSvc);
225+
originEndpoints.updateAlternatives(sslOptions, evt.origin, (AltSvc.ListOfValue)altSvc);
224226
}
225227
});
226228
}
@@ -593,17 +595,49 @@ private Future<HttpClientRequest> doRequest(
593595
// For HTTPS we must handle SNI to consider an alternative
594596
HostAndPort altUsed;
595597
if (followAlternativeServices && server instanceof Origin && ("https".equals((originServer = (Origin)server).scheme) && originServer.host.indexOf('.') > 0)) {
596-
lookup = endpoint.selectServer(s -> {
597-
OriginServer unwrap = (OriginServer) s.unwrap();
598-
return protocol_ == unwrap.protocol;
599-
});
600-
protocol = protocol_;
598+
if (protocol_ != null) {
599+
ProtocolFilter filter;
600+
switch (protocol_) {
601+
case H3:
602+
filter = ProtocolFilter.H3;
603+
break;
604+
case H2:
605+
filter = ProtocolFilter.H2;
606+
break;
607+
case HTTP_1_1:
608+
filter = ProtocolFilter.HTTP_1_1;
609+
break;
610+
case HTTP_1_0:
611+
filter = ProtocolFilter.HTTP_1_0;
612+
break;
613+
default:
614+
throw new AssertionError();
615+
}
616+
lookup = endpoint.selectServer(filter);
617+
protocol = protocol_;
618+
} else {
619+
Set<String> protocols = endpoint.protocols();
620+
if (!protocols.isEmpty()) {
621+
List<ProtocolFilter> list = List.of(ProtocolFilter.H3, ProtocolFilter.H2, ProtocolFilter.HTTP_1_1, ProtocolFilter.HTTP_1_0);
622+
lookup = null;
623+
protocol = null;
624+
for (ProtocolFilter candidate : list) {
625+
if (protocols.contains(candidate.protocol.id())) {
626+
lookup = endpoint.selectServer(candidate);
627+
protocol = candidate.protocol;
628+
}
629+
}
630+
} else {
631+
lookup = null;
632+
protocol = null;
633+
}
634+
}
601635
if (lookup == null) {
602636
altUsed = null;
603637
lookup = endpoint.selectServer();
604638
} else {
605639
OriginServer unwrap = (OriginServer) lookup.unwrap();
606-
altUsed = unwrap.authority;
640+
altUsed = unwrap.primary ? null : unwrap.authority;
607641
}
608642
} else {
609643
protocol = protocol_;
@@ -615,23 +649,32 @@ private Future<HttpClientRequest> doRequest(
615649
throw new IllegalStateException("No results for " + server);
616650
}
617651
SocketAddress address = lookup2.address();
618-
EndpointKey key = new EndpointKey(useSSL, protocol != null ? protocol.version() : null, sslOptions, null, address, authority != null ? authority : HostAndPort.create(address.host(), address.port()));
619-
return resourceManager.withResourceAsync(key, httpEndpointProvider(followAlternativeServices && useSSL, transport), (e, created) -> {
620-
Future<Lease<HttpClientConnection>> fut2 = e.requestConnection(streamCtx, connectTimeout);
621-
ServerInteraction endpointRequest = lookup2.newInteraction();
622-
return fut2.andThen(ar -> {
623-
if (ar.failed()) {
624-
endpointRequest.reportFailure(ar.cause());
652+
return getPool(
653+
followAlternativeServices && useSSL && altUsed == null,
654+
useSSL,
655+
protocol,
656+
sslOptions,
657+
address,
658+
authority != null ? authority : HostAndPort.create(address.host(), address.port()),
659+
new Function<SharedHttpClientConnectionGroup, Future<ConnectionObtainedResult>>() {
660+
@Override
661+
public Future<ConnectionObtainedResult> apply(SharedHttpClientConnectionGroup pool) {
662+
Future<Lease<HttpClientConnection>> fut2 = pool.requestConnection(streamCtx, connectTimeout);
663+
ServerInteraction endpointRequest = lookup2.newInteraction();
664+
return fut2.andThen(ar -> {
665+
if (ar.failed()) {
666+
endpointRequest.reportFailure(ar.cause());
667+
}
668+
}).compose(lease -> {
669+
HttpClientConnection conn = lease.get();
670+
return conn.createStream(streamCtx).map(stream -> {
671+
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest);
672+
wrapped.closeHandler(v -> lease.recycle());
673+
return new ConnectionObtainedResult(wrapped, lease, altUsed);
674+
});
675+
});
625676
}
626-
}).compose(lease -> {
627-
HttpClientConnection conn = lease.get();
628-
return conn.createStream(streamCtx).map(stream -> {
629-
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest);
630-
wrapped.closeHandler(v -> lease.recycle());
631-
return new ConnectionObtainedResult(wrapped, lease, altUsed);
632-
});
633677
});
634-
});
635678
});
636679
if (future == null) {
637680
// I think this is not possible - so remove it
@@ -641,6 +684,42 @@ private Future<HttpClientRequest> doRequest(
641684
}
642685
}
643686

687+
Future<?> checkConnect(OriginServer primary, OriginAlternative alternative, OriginServer server, ClientSSLOptions sslOptions) {
688+
return getPool(false, true, alternative.protocol, sslOptions, server.address, primary.authority, new Function<SharedHttpClientConnectionGroup, Future<Boolean>>() {
689+
@Override
690+
public Future<Boolean> apply(SharedHttpClientConnectionGroup group) {
691+
if (group.size() > 0) {
692+
return Future.succeededFuture();
693+
} else {
694+
// Get something better
695+
Future<Lease<HttpClientConnection>> f = group.requestConnection(vertx.getOrCreateContext(), 10_000);
696+
return f.map(lease -> {
697+
lease.recycle();
698+
return null;
699+
});
700+
}
701+
}
702+
});
703+
}
704+
705+
<T> Future<T> getPool(boolean resolveOrigin,
706+
boolean useSSL,
707+
HttpProtocol protocol,
708+
ClientSSLOptions sslOptions,
709+
SocketAddress server,
710+
HostAndPort authority,
711+
Function<SharedHttpClientConnectionGroup, Future<T>> function) {
712+
EndpointKey key = new EndpointKey(useSSL, protocol != null ? protocol.version() : null, sslOptions, null, server, authority);
713+
HttpClientTransport transport;
714+
if (protocol != null && protocol.version() == HttpVersion.HTTP_3) {
715+
transport = quicTransport;
716+
} else {
717+
transport = tcpTransport;
718+
}
719+
Function<EndpointKey, SharedHttpClientConnectionGroup> provider = httpEndpointProvider(resolveOrigin, transport);
720+
return resourceManager.withResourceAsync(key, provider, (group, created) -> function.apply(group));
721+
}
722+
644723
private Future<HttpClientRequest> wrap(HttpMethod method,
645724
String requestURI,
646725
MultiMap headers,

vertx-core/src/main/java/io/vertx/core/http/impl/OriginEndpoint.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ public class OriginEndpoint<L> {
3131
final L list;
3232
private final Map<OriginAlternative, OriginServer> alternatives;
3333

34-
Map<OriginAlternative, Long> update;
35-
private volatile boolean valid;
34+
Map<OriginAlternative, OriginServer> updates;
35+
volatile boolean valid;
3636

3737
OriginEndpoint(Origin origin, OriginServer primary, EndpointBuilder<L, OriginServer> builder, Map<OriginAlternative, OriginServer> alternatives) {
3838
this(origin, List.of(primary), builder, alternatives);
3939
}
4040

4141
OriginEndpoint(Origin origin, List<OriginServer> primaries, EndpointBuilder<L, OriginServer> builder, Map<OriginAlternative, OriginServer> alternatives) {
4242

43-
L list = refresh(builder, primaries, alternatives);
43+
L list = buildListOfServers(builder, primaries, alternatives);
4444

4545
this.timestamp = System.currentTimeMillis();
4646
this.primary = primaries.get(0);
@@ -49,15 +49,20 @@ public class OriginEndpoint<L> {
4949
this.builder = builder;
5050
this.alternatives = alternatives;
5151
this.list = list;
52+
this.updates = Collections.emptyMap();
5253
this.valid = true;
5354
}
5455

55-
private L refresh(EndpointBuilder<L, OriginServer> builder, List<OriginServer> primaries, Map<OriginAlternative, OriginServer> alternatives) {
56+
private L buildListOfServers(EndpointBuilder<L, OriginServer> builder, List<OriginServer> primaries, Map<OriginAlternative, OriginServer> alternatives) {
5657
for (OriginServer primary : primaries) {
57-
builder = builder.addServer(primary);
58+
if (primary.available) {
59+
builder = builder.addServer(primary);
60+
}
5861
}
5962
for (OriginServer alternativeServer : alternatives.values()) {
60-
builder.addServer(alternativeServer);
63+
if (alternativeServer.available) {
64+
builder.addServer(alternativeServer);
65+
}
6166
}
6267
return builder.build();
6368
}
@@ -75,14 +80,14 @@ boolean validate() {
7580
return valid;
7681
}
7782

78-
void clearAlternatives() {
79-
update = Collections.emptyMap();
80-
valid = alternatives.isEmpty();
83+
void updateAlternatives(Map<OriginAlternative, OriginServer> updates) {
84+
this.updates = updates;
85+
this.valid = false;
8186
}
8287

83-
void updateAlternatives(AltSvc.ListOfValue altSvc) {
88+
Map<OriginAlternative, Long> shouldRefresh(AltSvc.ListOfValue altSvc) {
8489
long now = System.currentTimeMillis();
85-
Map<OriginAlternative, Long> list = new LinkedHashMap<>();
90+
Map<OriginAlternative, Long> updates = new LinkedHashMap<>();
8691
boolean valid = true;
8792
for (AltSvc.Value altSvcValue : altSvc) {
8893
HttpProtocol protocol = HttpProtocol.fromId(altSvcValue.protocolId());
@@ -113,18 +118,15 @@ void updateAlternatives(AltSvc.ListOfValue altSvc) {
113118
long value = now + maxAge * 1000 / 2;
114119
valid = (value < alternativeCachedExpiration);
115120
}
116-
list.put(alternative, maxAge);
121+
updates.put(alternative, maxAge);
117122
}
118123
}
119124
if (valid) {
120125
// 1. check now we don't have extra unwanted keys
121126
for (OriginAlternative alternative : alternatives.keySet()) {
122-
valid &= list.containsKey(alternative);
127+
valid &= updates.containsKey(alternative);
123128
}
124129
}
125-
if (!valid) {
126-
this.update = list;
127-
this.valid = false;
128-
}
130+
return valid ? null : updates;
129131
}
130132
}

0 commit comments

Comments
 (0)