diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index 1348e2d960c..24b86535394 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.regex.Pattern; @@ -100,7 +101,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal this.tcpTransport = tcpTransport; this.quicTransport = quicTransport; - this.originEndpoints = new OriginResolver<>(vertx, resolveAll); + this.originEndpoints = new OriginResolver<>(vertx, resolveAll, this); this.resolver = (EndpointResolverInternal) resolver; this.originResolver = new EndpointResolverImpl<>(vertx, originEndpoints, resolveAll ? loadBalancer : LoadBalancer.FIRST, resolverKeepAlive.toMillis()); this.poolOptions = poolOptions; @@ -187,9 +188,10 @@ private Function httpEndpointProvi ClientMetrics clientMetrics = HttpClientImpl.this.httpMetrics != null ? HttpClientImpl.this.httpMetrics.createEndpointMetrics(address, maxPoolSize) : null; PoolMetrics poolMetrics = HttpClientImpl.this.httpMetrics != null ? vertx.metrics().createPoolMetrics("http", key.authority.toString(), maxPoolSize) : null; ProxyOptions proxyOptions = key.proxyOptions; + ClientSSLOptions sslOptions = key.sslOptions; if (proxyOptions != null && !key.ssl && proxyOptions.getType() == ProxyType.HTTP) { SocketAddress server = SocketAddress.inetSocketAddress(proxyOptions.getPort(), proxyOptions.getHost()); - key = new EndpointKey(key.ssl, key.protocol, key.sslOptions, proxyOptions, server, key.authority); + key = new EndpointKey(key.ssl, key.protocol, sslOptions, proxyOptions, server, key.authority); proxyOptions = null; } HttpVersion protocol = key.protocol; @@ -199,7 +201,7 @@ private Function httpEndpointProvi } else { protocols = List.of(protocol); } - HttpConnectParams params = new HttpConnectParams(protocols, key.sslOptions, proxyOptions, key.ssl); + HttpConnectParams params = new HttpConnectParams(protocols, sslOptions, proxyOptions, key.ssl); Function p = group -> { int queueMaxSize = poolOptions.getMaxWaitQueueSize(); int http1MaxSize = poolOptions.getHttp1MaxSize(); @@ -220,7 +222,7 @@ private Function httpEndpointProvi if (altSvc instanceof AltSvc.Clear) { originEndpoints.clearAlternatives(evt.origin); } else if (altSvc instanceof AltSvc.ListOfValue) { - originEndpoints.updateAlternatives(evt.origin, (AltSvc.ListOfValue)altSvc); + originEndpoints.updateAlternatives(sslOptions, evt.origin, (AltSvc.ListOfValue)altSvc); } }); } @@ -593,17 +595,49 @@ private Future doRequest( // For HTTPS we must handle SNI to consider an alternative HostAndPort altUsed; if (followAlternativeServices && server instanceof Origin && ("https".equals((originServer = (Origin)server).scheme) && originServer.host.indexOf('.') > 0)) { - lookup = endpoint.selectServer(s -> { - OriginServer unwrap = (OriginServer) s.unwrap(); - return protocol_ == unwrap.protocol; - }); - protocol = protocol_; + if (protocol_ != null) { + ProtocolFilter filter; + switch (protocol_) { + case H3: + filter = ProtocolFilter.H3; + break; + case H2: + filter = ProtocolFilter.H2; + break; + case HTTP_1_1: + filter = ProtocolFilter.HTTP_1_1; + break; + case HTTP_1_0: + filter = ProtocolFilter.HTTP_1_0; + break; + default: + throw new AssertionError(); + } + lookup = endpoint.selectServer(filter); + protocol = protocol_; + } else { + Set protocols = endpoint.protocols(); + if (!protocols.isEmpty()) { + List list = List.of(ProtocolFilter.H3, ProtocolFilter.H2, ProtocolFilter.HTTP_1_1, ProtocolFilter.HTTP_1_0); + lookup = null; + protocol = null; + for (ProtocolFilter candidate : list) { + if (protocols.contains(candidate.protocol.id())) { + lookup = endpoint.selectServer(candidate); + protocol = candidate.protocol; + } + } + } else { + lookup = null; + protocol = null; + } + } if (lookup == null) { altUsed = null; lookup = endpoint.selectServer(); } else { OriginServer unwrap = (OriginServer) lookup.unwrap(); - altUsed = unwrap.authority; + altUsed = unwrap.primary ? null : unwrap.authority; } } else { protocol = protocol_; @@ -615,23 +649,32 @@ private Future doRequest( throw new IllegalStateException("No results for " + server); } SocketAddress address = lookup2.address(); - EndpointKey key = new EndpointKey(useSSL, protocol != null ? protocol.version() : null, sslOptions, null, address, authority != null ? authority : HostAndPort.create(address.host(), address.port())); - return resourceManager.withResourceAsync(key, httpEndpointProvider(followAlternativeServices && useSSL, transport), (e, created) -> { - Future> fut2 = e.requestConnection(streamCtx, connectTimeout); - ServerInteraction endpointRequest = lookup2.newInteraction(); - return fut2.andThen(ar -> { - if (ar.failed()) { - endpointRequest.reportFailure(ar.cause()); + return getPool( + followAlternativeServices && useSSL && altUsed == null, + useSSL, + protocol, + sslOptions, + address, + authority != null ? authority : HostAndPort.create(address.host(), address.port()), + new Function>() { + @Override + public Future apply(SharedHttpClientConnectionGroup pool) { + Future> fut2 = pool.requestConnection(streamCtx, connectTimeout); + ServerInteraction endpointRequest = lookup2.newInteraction(); + return fut2.andThen(ar -> { + if (ar.failed()) { + endpointRequest.reportFailure(ar.cause()); + } + }).compose(lease -> { + HttpClientConnection conn = lease.get(); + return conn.createStream(streamCtx).map(stream -> { + HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest); + wrapped.closeHandler(v -> lease.recycle()); + return new ConnectionObtainedResult(wrapped, lease, altUsed); + }); + }); } - }).compose(lease -> { - HttpClientConnection conn = lease.get(); - return conn.createStream(streamCtx).map(stream -> { - HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest); - wrapped.closeHandler(v -> lease.recycle()); - return new ConnectionObtainedResult(wrapped, lease, altUsed); - }); }); - }); }); if (future == null) { // I think this is not possible - so remove it @@ -641,6 +684,42 @@ private Future doRequest( } } + Future checkConnect(OriginServer primary, OriginAlternative alternative, OriginServer server, ClientSSLOptions sslOptions) { + return getPool(false, true, alternative.protocol, sslOptions, server.address, primary.authority, new Function>() { + @Override + public Future apply(SharedHttpClientConnectionGroup group) { + if (group.size() > 0) { + return Future.succeededFuture(); + } else { + // Get something better + Future> f = group.requestConnection(vertx.getOrCreateContext(), 10_000); + return f.map(lease -> { + lease.recycle(); + return null; + }); + } + } + }); + } + + Future getPool(boolean resolveOrigin, + boolean useSSL, + HttpProtocol protocol, + ClientSSLOptions sslOptions, + SocketAddress server, + HostAndPort authority, + Function> function) { + EndpointKey key = new EndpointKey(useSSL, protocol != null ? protocol.version() : null, sslOptions, null, server, authority); + HttpClientTransport transport; + if (protocol != null && protocol.version() == HttpVersion.HTTP_3) { + transport = quicTransport; + } else { + transport = tcpTransport; + } + Function provider = httpEndpointProvider(resolveOrigin, transport); + return resourceManager.withResourceAsync(key, provider, (group, created) -> function.apply(group)); + } + private Future wrap(HttpMethod method, String requestURI, MultiMap headers, diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/OriginEndpoint.java b/vertx-core/src/main/java/io/vertx/core/http/impl/OriginEndpoint.java index 7d3ddf1a55b..f0d2b71f6a9 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/OriginEndpoint.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/OriginEndpoint.java @@ -31,8 +31,8 @@ public class OriginEndpoint { final L list; private final Map alternatives; - Map update; - private volatile boolean valid; + Map updates; + volatile boolean valid; OriginEndpoint(Origin origin, OriginServer primary, EndpointBuilder builder, Map alternatives) { this(origin, List.of(primary), builder, alternatives); @@ -40,7 +40,7 @@ public class OriginEndpoint { OriginEndpoint(Origin origin, List primaries, EndpointBuilder builder, Map alternatives) { - L list = refresh(builder, primaries, alternatives); + L list = buildListOfServers(builder, primaries, alternatives); this.timestamp = System.currentTimeMillis(); this.primary = primaries.get(0); @@ -49,15 +49,20 @@ public class OriginEndpoint { this.builder = builder; this.alternatives = alternatives; this.list = list; + this.updates = Collections.emptyMap(); this.valid = true; } - private L refresh(EndpointBuilder builder, List primaries, Map alternatives) { + private L buildListOfServers(EndpointBuilder builder, List primaries, Map alternatives) { for (OriginServer primary : primaries) { - builder = builder.addServer(primary); + if (primary.available) { + builder = builder.addServer(primary); + } } for (OriginServer alternativeServer : alternatives.values()) { - builder.addServer(alternativeServer); + if (alternativeServer.available) { + builder.addServer(alternativeServer); + } } return builder.build(); } @@ -75,14 +80,14 @@ boolean validate() { return valid; } - void clearAlternatives() { - update = Collections.emptyMap(); - valid = alternatives.isEmpty(); + void updateAlternatives(Map updates) { + this.updates = updates; + this.valid = false; } - void updateAlternatives(AltSvc.ListOfValue altSvc) { + Map shouldRefresh(AltSvc.ListOfValue altSvc) { long now = System.currentTimeMillis(); - Map list = new LinkedHashMap<>(); + Map updates = new LinkedHashMap<>(); boolean valid = true; for (AltSvc.Value altSvcValue : altSvc) { HttpProtocol protocol = HttpProtocol.fromId(altSvcValue.protocolId()); @@ -113,18 +118,15 @@ void updateAlternatives(AltSvc.ListOfValue altSvc) { long value = now + maxAge * 1000 / 2; valid = (value < alternativeCachedExpiration); } - list.put(alternative, maxAge); + updates.put(alternative, maxAge); } } if (valid) { // 1. check now we don't have extra unwanted keys for (OriginAlternative alternative : alternatives.keySet()) { - valid &= list.containsKey(alternative); + valid &= updates.containsKey(alternative); } } - if (!valid) { - this.update = list; - this.valid = false; - } + return valid ? null : updates; } } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/OriginResolver.java b/vertx-core/src/main/java/io/vertx/core/http/impl/OriginResolver.java index 7c91b221d40..4afdb78fa8e 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/OriginResolver.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/OriginResolver.java @@ -11,11 +11,14 @@ package io.vertx.core.http.impl; import io.vertx.core.Completable; +import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.http.HttpProtocol; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.resolver.NameResolver; import io.vertx.core.net.Address; +import io.vertx.core.net.ClientSSLOptions; import io.vertx.core.net.HostAndPort; import io.vertx.core.net.SocketAddress; import io.vertx.core.net.impl.SocketAddressImpl; @@ -55,29 +58,160 @@ public class OriginResolver implements EndpointResolver, L> { private final VertxInternal vertx; + private final HttpClientImpl client; private final ConcurrentMap> endpoints; private final boolean resolveAll; - public OriginResolver(VertxInternal vertx, boolean resolveAll) { + public OriginResolver(VertxInternal vertx, boolean resolveAll, HttpClientImpl client) { this.vertx = vertx; this.endpoints = new ConcurrentHashMap<>(); this.resolveAll = resolveAll; + this.client = client; } public void clearAlternatives(Origin origin) { OriginEndpoint endpoint = endpoints.get(origin); if (endpoint != null) { - endpoint.clearAlternatives(); + endpoint.updateAlternatives(Collections.emptyMap()); } } - public void updateAlternatives(Origin origin, AltSvc.ListOfValue altSvc) { + public void updateAlternatives(ClientSSLOptions sslOptions, Origin origin, AltSvc.ListOfValue altSvc) { OriginEndpoint endpoint = endpoints.get(origin); if (endpoint != null) { - endpoint.updateAlternatives(altSvc); + Map updates = endpoint.shouldRefresh(altSvc); + if (updates != null) { + Future> result = compute(sslOptions, origin, endpoint.primary, updates); + result.onComplete(ar -> { + if (ar.succeeded()) { + endpoint.updateAlternatives(ar.result()); + } + }); + } } } + private Future> compute( + ClientSSLOptions sslOptions, + Origin address, + OriginServer primary, + Map updates) { + + class Resolution extends LinkedHashMap { + final String host; + public Resolution(String host) { + this.host = host; + } + } + + // Keep order + Map alternatives = new LinkedHashMap<>(); + Map hosts = new HashMap<>(); + for (Map.Entry entry : updates.entrySet()) { + // Already resolved + OriginAlternative alternative = entry.getKey(); + long maxAge = entry.getValue(); + if (alternative.authority.host().equals(address.host)) { + alternative = new OriginAlternative( + alternative.protocol, + HostAndPort.authority(address.host, alternative.authority.port())); + alternatives.put(alternative, new OriginServer( + false, + alternative.protocol, + alternative.authority, + SocketAddress.inetSocketAddress(new InetSocketAddress(((SocketAddressImpl)primary.address).ipAddress(), alternative.authority.port())), + maxAge)); + } else { + Resolution resolution = hosts.get(alternative.authority.host()); + if (resolution == null) { + resolution = new Resolution(alternative.authority.host()); + hosts.put(alternative.authority.host(), resolution); + } + resolution.put(alternative, maxAge); + } + } + + List resolutions = new ArrayList<>(hosts.values()); + int size = hosts.size(); + List> list = new ArrayList<>(size); + for (Resolution resolution : resolutions) { + Future fut = vertx + .nameResolver() + .resolve(resolution.host); + list.add(fut); + } + + Promise promise = Promise.promise(); + + if (size > 0) { + AtomicInteger count = new AtomicInteger(); + Completable joiner = (result, failure) -> { + if (count.incrementAndGet() == size) { + for (int i = 0; i < size; i++) { + Resolution r = resolutions.get(i); + Future f = list.get(i); + if (f.succeeded()) { + for (Map.Entry entry : r.entrySet()) { + OriginAlternative alternative = entry.getKey(); + long maxAge = entry.getValue(); + alternatives.put(alternative, + new OriginServer( + false, + alternative.protocol, + alternative.authority, + SocketAddress.inetSocketAddress(new InetSocketAddress(f.result(), alternative.authority.port())), + maxAge) + ); + } + } + } + promise.succeed(); + } + }; + for (Future f : list) { + f.onComplete(joiner); + } + } else { + promise.succeed(); + } + + Promise> p2 = Promise.promise(); + promise + .future() + .onComplete(ar -> { + if (ar.succeeded()) { + + + List> l = new ArrayList<>(); + for (Map.Entry e : alternatives.entrySet()) { + Future f = client.checkConnect(primary, e.getKey(), e.getValue(), sslOptions); + l.add(f); + } + + CompositeFuture cf = Future.join(l); + cf.onComplete(ar2 -> { + + Iterator> it = alternatives.entrySet().iterator(); + Map newOne = new LinkedHashMap<>(); + int idx = 0; + while (it.hasNext()) { + Map.Entry entry = it.next(); + newOne.put(entry.getKey(), entry.getValue()); + entry.getValue().available = cf.succeeded(idx++); + } + p2.complete(newOne); + }); + + } else { + p2.fail(ar.cause()); + } + }); + + // Now validate + + return p2.future(); + } + @Override public Origin tryCast(Address address) { return address instanceof Origin ? (Origin)address : null; @@ -88,6 +222,12 @@ public SocketAddress addressOf(OriginServer server) { return server.address != null ? server.address : null; } + @Override + public String protocolOf(OriginServer server) { + HttpProtocol protocol = server.protocol; + return protocol != null ? protocol.id() : null; + } + @Override public Future> resolve(Origin address, EndpointBuilder builder) { NameResolver resolver = vertx @@ -127,104 +267,11 @@ public boolean isValid(OriginEndpoint state) { return state.validate(); } - @Override - public boolean isAvailable(OriginServer endpoint) { - return endpoint.primary || endpoint.connectFailures.get() == 0; - } - - @Override - public void reportFailure(OriginServer endpoint, Throwable failure) { - endpoint.connectFailures.incrementAndGet(); - } - @Override public Future> refresh(Origin address, OriginEndpoint state) { - - Map update = state.update; - if (update == null || update.isEmpty()) { - endpoints.remove(address, state); - return null; - } - - class Resolution { - String host; - Map alternatives = new LinkedHashMap<>(); - } - - // Maintain order - Map alternatives = new LinkedHashMap<>(); - Map hosts = new HashMap<>(); - for (Map.Entry entry : update.entrySet()) { - // Already resolved - OriginAlternative alternative = entry.getKey(); - long maxAge = entry.getValue(); - if (alternative.authority.host().equals(address.host)) { - alternative = new OriginAlternative( - alternative.protocol, - HostAndPort.authority(address.host, alternative.authority.port())); - alternatives.put(alternative, new OriginServer( - false, - alternative.protocol, - alternative.authority, - SocketAddress.inetSocketAddress(new InetSocketAddress(((SocketAddressImpl)state.primary.address).ipAddress(), alternative.authority.port())), - maxAge)); - } else { - Resolution resolution = hosts.get(alternative.authority.host()); - if (resolution == null) { - resolution = new Resolution(); - resolution.host = alternative.authority.host(); - hosts.put(alternative.authority.host(), resolution); - } - resolution.alternatives.put(alternative, maxAge); - } - } - int size = hosts.size(); - if (size == 0) { - OriginEndpoint endpoint = new OriginEndpoint<>(address, state.primary, state.builder, alternatives); - endpoints.put(address, endpoint); - return Future.succeededFuture(endpoint); - } - - List resolutions = new ArrayList<>(hosts.values()); - List> list = new ArrayList<>(size); - for (Resolution resolution : resolutions) { - Future fut = vertx.nameResolver().resolve(resolution.host); - list.add(fut); - } - - Promise> promise = Promise.promise(); - - AtomicInteger count = new AtomicInteger(); - Completable joiner = (result, failure) -> { - if (count.incrementAndGet() == size) { - for (int i = 0;i < size;i++) { - Resolution r = resolutions.get(i); - Future f = list.get(i); - for (Map.Entry entry : r.alternatives.entrySet()) { - if (f.succeeded()) { - OriginAlternative alternative = entry.getKey(); - long maxAge = entry.getValue(); - alternatives.put(alternative, - new OriginServer( - false, - alternative.protocol, - alternative.authority, - SocketAddress.inetSocketAddress(new InetSocketAddress(f.result(), alternative.authority.port())), - maxAge) - ); - } - } - } - - OriginEndpoint endpoint = new OriginEndpoint<>(address, state.primary, state.builder, alternatives); - endpoints.put(address, endpoint); - promise.complete(endpoint); - } - }; - for (Future f : list) { - f.onComplete(joiner); - } - return promise.future(); + OriginEndpoint endpoint = new OriginEndpoint<>(address, state.primary, state.builder, state.updates); + endpoints.put(address, endpoint); + return Future.succeededFuture(endpoint); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/OriginServer.java b/vertx-core/src/main/java/io/vertx/core/http/impl/OriginServer.java index 95f4db91448..59d8dde51ac 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/OriginServer.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/OriginServer.java @@ -14,8 +14,6 @@ import io.vertx.core.net.HostAndPort; import io.vertx.core.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; - /** * An origin server. * @@ -28,7 +26,7 @@ public class OriginServer { public final HostAndPort authority; // the alt-authority field public final SocketAddress address; // the server socket address public final long maxAge; - public final AtomicInteger connectFailures; + public boolean available; public OriginServer(boolean primary, HttpProtocol protocol, HostAndPort authority, SocketAddress address, long maxAge) { this.primary = primary; @@ -36,11 +34,12 @@ public OriginServer(boolean primary, HttpProtocol protocol, HostAndPort authorit this.authority = authority; this.address = address; this.maxAge = maxAge; - this.connectFailures = new AtomicInteger(); + this.available = true; } @Override public String toString() { - return "OriginServer[protocol=" + protocol + ",authority=" + authority + ",address=" + address + ",maxAge=" + maxAge + "]"; + return "OriginServer[protocol=" + protocol + ",authority=" + authority + ",address=" + address + + ",maxAge=" + maxAge + ",available=" + available + "]"; } } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/ProtocolFilter.java b/vertx-core/src/main/java/io/vertx/core/http/impl/ProtocolFilter.java new file mode 100644 index 00000000000..2f225b4d723 --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/ProtocolFilter.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2011-2026 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http.impl; + +import io.vertx.core.http.HttpProtocol; +import io.vertx.core.net.endpoint.ServerEndpoint; + +import java.util.function.Predicate; + +/** + * Protocol filters, those can safely be used as endpoint view keys. + * + * @author Julien Viet + */ +enum ProtocolFilter implements Predicate { + + H3(HttpProtocol.H3), + H2(HttpProtocol.H2), + HTTP_1_1(HttpProtocol.HTTP_1_1), + HTTP_1_0(HttpProtocol.HTTP_1_0); + + final HttpProtocol protocol; + + ProtocolFilter(HttpProtocol protocol) { + this.protocol = protocol; + } + + @Override + public boolean test(ServerEndpoint serverEndpoint) { + return protocol.id().equals(serverEndpoint.protocolId()); + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/SharedHttpClientConnectionGroup.java b/vertx-core/src/main/java/io/vertx/core/http/impl/SharedHttpClientConnectionGroup.java index ac07f47f166..84e4b31f337 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/SharedHttpClientConnectionGroup.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/SharedHttpClientConnectionGroup.java @@ -86,6 +86,10 @@ public SharedHttpClientConnectionGroup(ClientMetrics clientMetrics, this.connectHandler = connectHandler; } + public int size() { + return pool.pool.size(); + } + protected void checkExpired() { pool.checkExpired(); } diff --git a/vertx-core/src/main/java/io/vertx/core/net/endpoint/Endpoint.java b/vertx-core/src/main/java/io/vertx/core/net/endpoint/Endpoint.java index 8d2406f93e0..71157ca375c 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/endpoint/Endpoint.java +++ b/vertx-core/src/main/java/io/vertx/core/net/endpoint/Endpoint.java @@ -15,6 +15,7 @@ import io.vertx.codegen.annotations.VertxGen; import java.util.List; +import java.util.Set; import java.util.function.Predicate; @Unstable @@ -32,6 +33,13 @@ public interface Endpoint { */ List servers(); + /** + * Provide the set of protocols the endpoint can accept. + * + * @return the set of protocol ids + */ + Set protocols(); + /** * Select a server. * diff --git a/vertx-core/src/main/java/io/vertx/core/net/endpoint/ServerEndpoint.java b/vertx-core/src/main/java/io/vertx/core/net/endpoint/ServerEndpoint.java index 1620b26f148..c1a8a38a0b8 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/endpoint/ServerEndpoint.java +++ b/vertx-core/src/main/java/io/vertx/core/net/endpoint/ServerEndpoint.java @@ -34,7 +34,10 @@ public interface ServerEndpoint { */ SocketAddress address(); - boolean isAvailable(); + /** + * @return the server protocol id, optional it can be null + */ + String protocolId(); /** * Initiate a request/response interaction with the endpoint represented by this node, the returned interaction gathers statistics. diff --git a/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java b/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java index dc442cf5eb0..cb21a1f2d40 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java @@ -22,11 +22,9 @@ import io.vertx.core.spi.endpoint.EndpointResolver; import io.vertx.core.spi.endpoint.EndpointBuilder; -import java.net.ConnectException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Function; @@ -102,6 +100,11 @@ public EndpointImpl(A address, AtomicLong lastAccessed, S state) { this.lastAccessed = lastAccessed; } + @Override + public Set protocols() { + return endpointResolver.endpoint(state).protocols; + } + @Override public List servers() { return endpointResolver.endpoint(state).servers; @@ -110,14 +113,7 @@ public List servers() { public ServerEndpoint selectServer(Predicate filter, String key) { ListOfServers listOfServers = endpointResolver.endpoint(state); EndpointResolverImpl.View view = listOfServers.viewOf(loadBalancer, filter); - ServerEndpoint selected = view.selectEndpoint(key); - if (selected != null && !selected.isAvailable()) { - // Rebuild views - listOfServers.views.clear(); - view = listOfServers.viewOf(loadBalancer, filter); - selected = view.selectEndpoint(key); - } - return selected; + return view.selectEndpoint(key); } private void close() { @@ -254,10 +250,24 @@ private static class ListOfServers implements Iterable { // Put stuff here I think ... final List servers; final Map, View> views; + final Set protocols; private ListOfServers(List servers) { + + Set protocols = Collections.emptySet(); + for (ServerEndpoint server : servers) { + String protocol = server.protocolId(); + if (protocol != null) { + if (protocols.isEmpty()) { + protocols = new HashSet<>(3); + } + protocols.add(protocol); + } + } + this.servers = servers; this.views = new ConcurrentHashMap<>(); + this.protocols = protocols; } @Override @@ -275,7 +285,7 @@ View viewOf(LoadBalancer loadBalancer, Predicate filter) { if (view == null) { List l = new ArrayList<>(servers.size()); for (ServerEndpoint s : servers) { - if (s.isAvailable() && filter.test(s)) { + if (filter.test(s)) { l.add(s); } } @@ -303,10 +313,6 @@ public String key() { return key; } @Override - public boolean isAvailable() { - return endpointResolver.isAvailable(endpoint); - } - @Override public Object unwrap() { return endpoint; } @@ -319,6 +325,10 @@ public SocketAddress address() { return endpointResolver.addressOf(endpoint); } @Override + public String protocolId() { + return endpointResolver.protocolOf(endpoint); + } + @Override public ServerInteraction newInteraction() { lastAccessed.set(System.currentTimeMillis()); InteractionMetrics metrics = this.metrics; @@ -344,9 +354,9 @@ public void reportResponseEnd() { } @Override public void reportFailure(Throwable failure) { - if (!connected && failure instanceof ConnectException) { - endpointResolver.reportFailure(endpoint, failure); - } +// if (!connected && failure instanceof ConnectException) { +// endpointResolver.reportFailure(endpoint, failure); +// } metrics.reportFailure(metric, failure); } }; diff --git a/vertx-core/src/main/java/io/vertx/core/spi/endpoint/EndpointResolver.java b/vertx-core/src/main/java/io/vertx/core/spi/endpoint/EndpointResolver.java index ecc59604c2d..d8514e87ef2 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/endpoint/EndpointResolver.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/endpoint/EndpointResolver.java @@ -36,13 +36,23 @@ public interface EndpointResolver { A tryCast(Address address); /** - * Returns the socket address of a given endpoint {@code server}. + * Returns the socket address of a given {@code server}. * - * @param server the endpoint server + * @param server the server * @return the server socket address */ SocketAddress addressOf(S server); + /** + * Returns the protocol id of a given {@code} server. + * + * @param server the server + * @return the protocol if of the server or {@code null} if unknown + */ + default String protocolOf(S server) { + return null; + } + /** * Resolve an address to the resolver state for this name. * @@ -68,13 +78,6 @@ public interface EndpointResolver { */ boolean isValid(D state); - default boolean isAvailable(S endpoint) { - return true; - } - - default void reportFailure(S endpoint, Throwable failure) { - } - /** * * @param address diff --git a/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeAddressResolver.java b/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeAddressResolver.java index f8e2be60d3e..b3c0ce1f78a 100644 --- a/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeAddressResolver.java +++ b/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeAddressResolver.java @@ -58,16 +58,6 @@ public B endpoint(FakeState state) { return state.endpoints; } - @Override - public boolean isAvailable(FakeServerEndpoint endpoint) { - return endpoint.available; - } - - @Override - public void reportFailure(FakeServerEndpoint endpoint, Throwable failure) { - endpoint.available = false; - } - @Override public boolean isValid(FakeState state) { FakeRegistration registration = map.get(state.name); diff --git a/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeServerEndpoint.java b/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeServerEndpoint.java index 6a27c16ed12..ff5541dc044 100644 --- a/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeServerEndpoint.java +++ b/vertx-core/src/test/java/io/vertx/test/fakeresolver/FakeServerEndpoint.java @@ -4,11 +4,9 @@ public class FakeServerEndpoint { - boolean available; final SocketAddress socketAddress; FakeServerEndpoint(SocketAddress socketAddress) { - this.available = true; this.socketAddress = socketAddress; } diff --git a/vertx-core/src/test/java/io/vertx/tests/endpoint/EndpointResolverTest.java b/vertx-core/src/test/java/io/vertx/tests/endpoint/EndpointResolverTest.java index 609c1e7c0cc..850cd165ab4 100644 --- a/vertx-core/src/test/java/io/vertx/tests/endpoint/EndpointResolverTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/endpoint/EndpointResolverTest.java @@ -87,18 +87,4 @@ public void testRebuild() { assertEquals(addr2, endpoint.selectServer(odd).address()); assertEquals(addr4, endpoint.selectServer(odd).address()); } - - @Test - public void testServerInvalidation() { - fakeResolver.registerAddress("example.com", List.of(addr1, addr2, addr3, addr4)); - EndpointResolverImpl resolver = new EndpointResolverImpl<>((VertxInternal) vertx, fakeResolver, LoadBalancer.FIRST, 5000); - Endpoint endpoint = resolver.resolveEndpoint(new FakeAddress("example.com")).await(); - ServerEndpoint server = endpoint.selectServer(); - assertEquals(addr1, server.address()); - ServerInteraction interaction = server.newInteraction(); - interaction.reportFailure(new ConnectException()); - System.out.println(endpoint.servers()); - server = endpoint.selectServer(); - assertEquals(addr2, server.address()); - } } diff --git a/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingCornerCasesTest.java b/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingCornerCasesTest.java index 175753baeac..3e6dfc94fcc 100644 --- a/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingCornerCasesTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingCornerCasesTest.java @@ -47,8 +47,8 @@ public Object unwrap() { return null; } @Override - public boolean isAvailable() { - return true; + public String protocolId() { + return null; } @Override public String key() { diff --git a/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingTest.java b/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingTest.java index 752841b95fe..b26694eeeed 100644 --- a/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/endpoint/LoadBalancingTest.java @@ -38,8 +38,8 @@ public String key() { return ""; } @Override - public boolean isAvailable() { - return true; + public String protocolId() { + return null; } @Override public Object unwrap() { diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpAlternativesTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpAlternativesTest.java index e0af5d3af36..dbcce36e07d 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpAlternativesTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpAlternativesTest.java @@ -15,9 +15,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; import io.vertx.core.http.impl.Origin; -import io.vertx.core.http.Http1ClientConfig; -import io.vertx.core.http.Http2ClientConfig; -import io.vertx.core.http.Http3ClientConfig; import io.vertx.core.http.HttpClientConfig; import io.vertx.core.internal.http.HttpClientInternal; import io.vertx.core.internal.net.endpoint.EndpointResolverInternal; @@ -28,18 +25,16 @@ import io.vertx.test.core.VertxTestBase; import io.vertx.test.proxy.Proxy; import io.vertx.test.tls.Cert; -import org.junit.Ignore; +import junit.framework.AssertionFailedError; import org.junit.Rule; import org.junit.Test; import javax.net.ssl.SSLHandshakeException; -import java.net.ConnectException; -import java.security.cert.CertificateException; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -63,10 +58,22 @@ protected VertxOptions getOptions() { return options; } - private Consumer> startServer(int port, Cert cert, HttpVersion... versions) { + private interface Server { + Server handler(Handler handler); + int connectionCount(); + } + + private Server startServer(int port, Cert cert, HttpVersion... versions) { AtomicReference> handler = new AtomicReference<>(); List tcpVersions = Stream.of(versions).filter(v -> v != HttpVersion.HTTP_3).collect(Collectors.toList()); List quicVersions = Stream.of(versions).filter(v -> v == HttpVersion.HTTP_3).collect(Collectors.toList()); + AtomicInteger connectionCount = new AtomicInteger(); + Handler connectionHandler = connection -> { + connectionCount.incrementAndGet(); + connection.closeHandler(v -> { + connectionCount.decrementAndGet(); + }); + }; if (!tcpVersions.isEmpty()) { HttpServer server = vertx.createHttpServer(new HttpServerOptions() .setSsl(true) @@ -74,6 +81,7 @@ private Consumer> startServer(int port, Cert { Handler h = handler.get(); if (h != null) { @@ -86,6 +94,7 @@ private Consumer> startServer(int port, Cert { Handler h = handler.get(); if (h != null) { @@ -94,7 +103,17 @@ private Consumer> startServer(int port, Cert httpServerRequestHandler) { + handler.set(httpServerRequestHandler); + return this; + } + @Override + public int connectionCount() { + return connectionCount.get(); + } + }; } @Override @@ -103,75 +122,76 @@ protected void tearDown() throws Exception { client = null; } - @Ignore @Test public void testHttp1ToHttp2Protocol() { testFollowProtocol(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2, "h2=\"localhost:4044\"", "localhost:4044"); } - @Ignore @Test public void testHttp1ToHttp2ProtocolSameHost() { testFollowProtocol(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2, "h2=\":4044\"", "host2.com:4044"); } - @Ignore @Test public void testHttp1ToHttp3Protocol() { testFollowProtocol(HttpVersion.HTTP_1_1, HttpVersion.HTTP_3, "h3=\"host2.com:4044\"", "host2.com:4044"); } - @Ignore @Test public void testHttp1ToHttp3ProtocolSameHost() { testFollowProtocol(HttpVersion.HTTP_1_1, HttpVersion.HTTP_3, "h3=\":4044\"", "host2.com:4044"); } - @Ignore @Test public void testHttp2ToHttp3Protocol() { testFollowProtocol(HttpVersion.HTTP_2, HttpVersion.HTTP_3, "h3=\"host2.com:4044\"", "host2.com:4044"); } - @Ignore @Test public void testHttp2ToHttp3ProtocolSameHost() { testFollowProtocol(HttpVersion.HTTP_2, HttpVersion.HTTP_3, "h3=\":4044\"", "host2.com:4044"); } - @Ignore @Test public void testExpiration() throws Exception { testFollowProtocol(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2, "h2=\"host2.com:4044\";ma=1", "host2.com:4044"); Thread.sleep(1500); - Buffer body = client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) - .compose(request -> request - .send() - .expecting(response -> request.version() == HttpVersion.HTTP_1_1) - .compose(HttpClientResponse::body) - ).await(); - assertEquals("host2.com:4043", body.toString()); + try { + client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) + .compose(request -> request + .send() + .expecting(response -> request.version() == HttpVersion.HTTP_1_1) + .compose(HttpClientResponse::body) + ).await(); + fail(); + } catch (Exception e) { + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); + } } - @Ignore @Test public void testOverwriteInvalidation() { - testInvalidation("h2=\"host2.com:4045\""); + Exception e = testInvalidation("h2=\"host2.com:4045\""); + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); } - @Ignore @Test public void testClearInvalidation() { - testInvalidation("clear"); + Exception e = testInvalidation("clear"); + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); } - @Ignore @Test public void testInvalidProtocolInvalidation() { - testInvalidation("h2c=\"host2.com:4044\""); + Exception e = testInvalidation("h2c=\"host2.com:4044\""); + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); } - private void testInvalidation(String altSvcInvalidator) { + private Exception testInvalidation(String altSvcInvalidator) { AtomicReference altSvc = new AtomicReference<>("h2=\"host2.com:4044\""); testFollowProtocol(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2, altSvc::get, "host2.com:4044"); altSvc.set(altSvcInvalidator); @@ -183,9 +203,16 @@ private void testInvalidation(String altSvcInvalidator) { ).await(); assertEquals("host2.com:4043", body.toString()); try { - client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")).await(); + int retries = 10; + for (int i = 0;i < retries;i++) { + client + .request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) + .await(); + Thread.sleep(100); + } + throw new AssertionFailedError(); } catch (Exception e) { - assertTrue(e instanceof ConnectException); + return e; } } @@ -195,14 +222,14 @@ private void testFollowProtocol(HttpVersion initialProtocol, HttpVersion upgrade private void testFollowProtocol(HttpVersion initialProtocol, HttpVersion upgradedProtocol, Supplier advertisement, String expectedAltUsed) { startServer(4043, Cert.SNI_JKS, initialProtocol) - .accept(request -> { + .handler(request -> { assertNull(request.getHeader(HttpHeaders.ALT_USED)); assertEquals("host2.com", request.connection().indicatedServerName()); writeAltSvc(request, advertisement.get()) .end(request.authority().toString(false)); }); - startServer(4044, Cert.SNI_JKS, upgradedProtocol) - .accept(request -> { + Server alternative = startServer(4044, Cert.SNI_JKS, upgradedProtocol) + .handler(request -> { assertEquals(expectedAltUsed, request.getHeader(HttpHeaders.ALT_USED)); assertEquals("host2.com", request.connection().indicatedServerName()); request @@ -222,6 +249,7 @@ private void testFollowProtocol(HttpVersion initialProtocol, HttpVersion upgrade .compose(HttpClientResponse::body) ).await(); assertEquals("host2.com:4043", body.toString()); + assertWaitUntil(() -> alternative.connectionCount() == 1); body = client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(upgradedProtocol).setPort(4043).setURI("/")) .compose(request -> request .send() @@ -229,73 +257,79 @@ private void testFollowProtocol(HttpVersion initialProtocol, HttpVersion upgrade .compose(HttpClientResponse::body) ).await(); assertEquals("host2.com:4043", body.toString()); + body = client.request(new RequestOptions().setHost("host2.com").setPort(4043).setURI("/")) + .compose(request -> request + .send() + .expecting(response -> request.version() == upgradedProtocol) + .compose(HttpClientResponse::body) + ).await(); + assertEquals("host2.com:4043", body.toString()); } - @Ignore @Test public void testEvictInvalidAlternative() { startServer(4043, Cert.SNI_JKS, HttpVersion.HTTP_1_1) - .accept(request -> { + .handler(request -> { assertNull(request.getHeader(HttpHeaders.ALT_USED)); request .response() .putHeader(HttpHeaders.ALT_SVC, "h2=\"host2.com:4044\"") .end(request.authority().toString(false)); }); - client = vertx.createHttpClient(new HttpClientOptions().setFollowAlternativeServices(true).setSsl(true).setTrustAll(true)); + client = vertx.createHttpClient(new HttpClientConfig().setFollowAlternativeServices(true).setSsl(true), new ClientSSLOptions().setTrustAll(true)); Buffer body = client.request(HttpMethod.GET, 4043, "host2.com", "/") .compose(request -> request .send() .compose(HttpClientResponse::body) ).await(); assertEquals("host2.com:4043", body.toString()); - try { - client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) - .await(); - fail(); - } catch (Exception e) { - assertEquals(ConnectException.class, e.getClass().getSuperclass()); + for (int i = 0;i < 2;i++) { + try { + client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) + .await(); + fail(); + } catch (Exception e) { + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); + } } - client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) - .compose(request -> request - .send() - .expecting(response -> request.version() == HttpVersion.HTTP_1_1) - .compose(HttpClientResponse::body) - ).await(); } - @Ignore @Test public void testIgnoreAlternativeWithoutSNI() { startServer(4043, Cert.SNI_JKS, HttpVersion.HTTP_1_1) - .accept(request -> { + .handler(request -> { assertEquals(null, request.connection().indicatedServerName()); request .response() .putHeader(HttpHeaders.ALT_SVC, "h2=\":4044\"") .end(request.authority().toString(false)); }); - client = vertx.createHttpClient(new HttpClientOptions().setFollowAlternativeServices(true).setSsl(true).setTrustAll(true)); + client = vertx.createHttpClient(new HttpClientConfig().setFollowAlternativeServices(true).setSsl(true), new ClientSSLOptions().setTrustAll(true)); Buffer body = client.request(HttpMethod.GET, 4043, "localhost", "/") .compose(request -> request .send() .compose(HttpClientResponse::body) ).await(); assertEquals("localhost:4043", body.toString()); - body = client.request(new RequestOptions().setHost("localhost").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) - .compose(request -> request - .send() - .expecting(response -> request.version() == HttpVersion.HTTP_1_1) - .compose(HttpClientResponse::body) - ).await(); - assertEquals("localhost:4043", body.toString()); + try { + body = client.request(new RequestOptions().setHost("localhost").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) + .compose(request -> request + .send() + .expecting(response -> request.version() == HttpVersion.HTTP_1_1) + .compose(HttpClientResponse::body) + ).await(); + fail(); + } catch (Exception e) { + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); + } } - @Ignore @Test public void testCertificateValidation() { startServer(4043, Cert.SNI_JKS, HttpVersion.HTTP_1_1) - .accept(request -> { + .handler(request -> { assertNull(request.getHeader(HttpHeaders.ALT_USED)); assertEquals("host2.com", request.connection().indicatedServerName()); request @@ -304,10 +338,10 @@ public void testCertificateValidation() { .end(request.authority().toString(false)); }); startServer(4044, Cert.SERVER_JKS, HttpVersion.HTTP_2) - .accept(request -> { + .handler(request -> { fail(); }); - client = vertx.createHttpClient(new HttpClientOptions().setFollowAlternativeServices(true).setSsl(true).setTrustAll(true)); + client = vertx.createHttpClient(new HttpClientConfig().setFollowAlternativeServices(true).setSsl(true), new ClientSSLOptions().setTrustAll(true)); Buffer body = client.request(HttpMethod.GET, 4043, "host2.com", "/") .compose(request -> request .send() @@ -324,43 +358,41 @@ public void testCertificateValidation() { fail(); } catch (Exception e) { assertEquals(SSLHandshakeException.class, e.getClass()); - Throwable root = e; - while (root.getCause() != null) { - root = root.getCause(); - } - assertEquals(CertificateException.class, root.getClass()); - assertEquals("No name matching host2.com found", root.getMessage()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); } } - @Ignore @Test public void testIgnoreAlternativeServicesAdvertisements() { startServer(4043, Cert.SNI_JKS, HttpVersion.HTTP_1_1) - .accept(request -> { + .handler(request -> { assertNull(request.getHeader(HttpHeaders.ALT_USED)); request .response() .putHeader(HttpHeaders.ALT_SVC, "h2=\"host2.com:4044\"") .end(request.authority().toString(false)); }); - client = vertx.createHttpClient(new HttpClientOptions().setFollowAlternativeServices(false).setSsl(true).setTrustAll(true)); + client = vertx.createHttpClient(new HttpClientConfig().setFollowAlternativeServices(false).setSsl(true), new ClientSSLOptions().setTrustAll(true)); Buffer body = client.request(HttpMethod.GET, 4043, "host2.com", "/") .compose(request -> request .send() .compose(HttpClientResponse::body) ).await(); assertEquals("host2.com:4043", body.toString()); - body = client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) - .compose(request -> request - .send() - .expecting(response -> request.version() == HttpVersion.HTTP_1_1) - .compose(HttpClientResponse::body) - ).await(); + try { + client.request(new RequestOptions().setHost("host2.com").setProtocolVersion(HttpVersion.HTTP_2).setPort(4043).setURI("/")) + .compose(request -> request + .send() + .expecting(response -> request.version() == HttpVersion.HTTP_1_1) + .compose(HttpClientResponse::body) + ).await(); + } catch (Exception e) { + assertEquals(SSLHandshakeException.class, e.getClass()); + assertTrue(e.getCause().getMessage().contains("no_application_protocol")); + } assertEquals("host2.com:4043", body.toString()); } - @Ignore @Test public void testIgnoreAlternativeServicesAdvertisements2() { HttpServer server = vertx.createHttpServer(new HttpServerOptions().setHttp2ClearTextEnabled(false)); @@ -388,7 +420,6 @@ public void testIgnoreAlternativeServicesAdvertisements2() { assertEquals("host2.com:8080", body.toString()); } - @Ignore @Test public void testAlternativeCaching1() throws Exception { // Test that we maintain the information although the server will an advertisement on each request @@ -396,7 +427,6 @@ public void testAlternativeCaching1() throws Exception { assertEquals(1, times); } - @Ignore @Test public void testAlternativeCaching2() throws Exception { // Test that we refresh information when expiration time is short and the server provides identical advertisements @@ -406,7 +436,7 @@ public void testAlternativeCaching2() throws Exception { private int testAlternativeCaching(String altSvc, int num, long sleepTime) throws Exception { startServer(4043, Cert.SNI_JKS, HttpVersion.HTTP_1_1) - .accept(request -> { + .handler(request -> { assertNull(request.getHeader(HttpHeaders.ALT_USED)); assertEquals("host2.com", request.connection().indicatedServerName()); request @@ -414,7 +444,11 @@ private int testAlternativeCaching(String altSvc, int num, long sleepTime) throw .putHeader(HttpHeaders.ALT_SVC, altSvc) .end(request.authority().toString(false)); }); - client = vertx.createHttpClient(new HttpClientOptions().setFollowAlternativeServices(true).setSsl(true).setTrustAll(true)); + startServer(4044, Cert.SNI_JKS, HttpVersion.HTTP_2) + .handler(request -> { + fail(); + }); + client = vertx.createHttpClient(new HttpClientConfig().setFollowAlternativeServices(true).setSsl(true), new ClientSSLOptions().setTrustAll(true)); Buffer body = client.request(HttpMethod.GET, 4043, "host2.com", "/") .compose(request -> request .send() @@ -423,7 +457,11 @@ private int testAlternativeCaching(String altSvc, int num, long sleepTime) throw EndpointResolverInternal resolver = ((HttpClientInternal) client).originResolver(); Map map = new IdentityHashMap<>(); for (int i = 0;i < num;i++) { - body = client.request(HttpMethod.GET, 4043, "host2.com", "/") + body = client.request(new RequestOptions() + .setProtocolVersion(HttpVersion.HTTP_1_1) + .setMethod(HttpMethod.GET) + .setHost("host2.com") + .setPort(4043)) .compose(request -> request .send() .compose(HttpClientResponse::body)