diff --git a/core/router/src/main/java/org/wisdom/router/WebSocketRouter.java b/core/router/src/main/java/org/wisdom/router/WebSocketRouter.java index bc5001c9d..4db4886a8 100644 --- a/core/router/src/main/java/org/wisdom/router/WebSocketRouter.java +++ b/core/router/src/main/java/org/wisdom/router/WebSocketRouter.java @@ -31,6 +31,7 @@ import org.wisdom.api.concurrent.ManagedExecutorService; import org.wisdom.api.content.ContentEngine; import org.wisdom.api.content.ParameterFactories; +import org.wisdom.api.http.Context; import org.wisdom.api.http.websockets.Publisher; import org.wisdom.api.http.websockets.WebSocketDispatcher; import org.wisdom.api.http.websockets.WebSocketListener; @@ -202,12 +203,13 @@ public synchronized void unbindController(Controller controller) { * @param content the received content */ @Override - public void received(final String uri, final String from, final byte[] content) { + public void received(final String uri, final String from, final byte[] content, final Context context) { for (final OnMessageWebSocketCallback listener : listeners) { if (listener.matches(uri)) { executor.submit(new Callable() { @Override public Void call() throws Exception { + Context.CONTEXT.set(context); try { listener.invoke(uri, from, content); } catch (InvocationTargetException e) { //NOSONAR @@ -218,6 +220,8 @@ public Void call() throws Exception { } catch (Exception e) { LOGGER.error("An error occurred in the @OnMessage callback {}#{} : {}", listener.getController().getClass().getName(), listener.getMethod().getName(), e.getMessage(), e); + } finally { + Context.CONTEXT.remove(); } return null; } diff --git a/core/router/src/test/java/org/wisdom/router/WebSocketRouterTest.java b/core/router/src/test/java/org/wisdom/router/WebSocketRouterTest.java index 47986cb18..d87870d2b 100644 --- a/core/router/src/test/java/org/wisdom/router/WebSocketRouterTest.java +++ b/core/router/src/test/java/org/wisdom/router/WebSocketRouterTest.java @@ -124,7 +124,7 @@ public void foo(@Body String message) { assertThat(router.listeners.iterator().next().check()).isTrue(); assertThat(router.listeners.iterator().next().getController()).isEqualTo(controller); - router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset())); + router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset()), null); assertThat(message).isEqualTo("hello"); @@ -167,7 +167,7 @@ public void foo(@Parameter("name") String name, @Body String message, @Parameter assertThat(router.listeners.iterator().next().check()).isTrue(); assertThat(router.listeners.iterator().next().getController()).isEqualTo(controller); - router.received("/ws/foo", "client", "hello".getBytes(Charset.defaultCharset())); + router.received("/ws/foo", "client", "hello".getBytes(Charset.defaultCharset()), null); assertThat(results) .contains(entry("message", "hello")) @@ -176,7 +176,7 @@ public void foo(@Parameter("name") String name, @Body String message, @Parameter results.clear(); - router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset())); + router.received("/ws", "client", "hello".getBytes(Charset.defaultCharset()), null); // Should not have been received. assertThat(results).isEmpty(); diff --git a/core/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java b/core/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java index ce21b2a74..2bff360e4 100644 --- a/core/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java +++ b/core/wisdom-api/src/main/java/org/wisdom/api/http/websockets/WebSocketListener.java @@ -19,6 +19,8 @@ */ package org.wisdom.api.http.websockets; +import org.wisdom.api.http.Context; + /** * Classes implementing this interface should register themselves on {@link WebSocketDispatcher} to receive * notification when client are opening, closing web sockets or sending data. @@ -32,7 +34,7 @@ public interface WebSocketListener { * @param client the client id * @param content the received content */ - public void received(String uri, String client, byte[] content); + public void received(String uri, String client, byte[] content, Context context); /** * Callback invoked when a new client connects on a web socket identified by its url. diff --git a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/ContextFromVertx.java b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/ContextFromVertx.java index 4804e7d95..3fcb63ce6 100644 --- a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/ContextFromVertx.java +++ b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/ContextFromVertx.java @@ -92,6 +92,30 @@ public ContextFromVertx(Vertx vertx, io.vertx.core.Context vertxContext, Service } } + /** + * Creates a new context with Http headers only, without HttpRequest + * This is used to partially initialize the context on WebSocket + * + * @param accessor a structure containing the used services. + * @param headers the incoming HTTP Headers. + */ + public ContextFromVertx(Vertx vertx, io.vertx.core.Context vertxContext, ServiceAccessor accessor, MultiMap headers) { + id = ids.getAndIncrement(); + services = accessor; + request = new RequestFromVertx(headers); + this.vertx = vertx; + flash = new FlashCookieImpl(accessor.getConfiguration()); + session = new SessionCookieImpl(accessor.getCrypto(), accessor.getConfiguration()); + flash.init(this); + session.init(this); + + if (vertxContext == null) { + throw new IllegalArgumentException("Creating a context from vert.x outside of an event loop"); + } else { + this.vertxContext = vertxContext; + } + } + /** * The context id (unique). diff --git a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/RequestFromVertx.java b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/RequestFromVertx.java index 459925089..238764eda 100644 --- a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/RequestFromVertx.java +++ b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/RequestFromVertx.java @@ -23,9 +23,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.net.MediaType; +import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.*; +import io.vertx.core.http.impl.HttpServerRequestImpl; +import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; import org.wisdom.api.cookies.Cookie; import org.wisdom.api.cookies.Cookies; @@ -35,6 +38,8 @@ import org.wisdom.framework.vertx.cookies.CookiesImpl; import org.wisdom.framework.vertx.file.VertxFileUpload; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.security.cert.X509Certificate; import java.net.URI; import java.net.URISyntaxException; import java.util.*; @@ -73,7 +78,19 @@ public class RequestFromVertx extends Request { */ public RequestFromVertx(final HttpServerRequest request) { this.request = request; - this.cookies = new CookiesImpl(request); + this.cookies = new CookiesImpl(request.headers()); + this.data = new HashMap<>(); + } + + /** + * Creates a mock {@link org.wisdom.framework.vertx.RequestFromVertx} object from headers + * This is used to retrieve cookies information from WebSocket connection + * + * @param headers Headers from a WebSocket connection + */ + public RequestFromVertx(final MultiMap headers) { + this.request = this.mock; + this.cookies = new CookiesImpl(headers); this.data = new HashMap<>(); } @@ -594,4 +611,151 @@ public boolean ready() { protected void setRawBody(Buffer raw) { this.raw = raw; } + + private HttpServerRequest mock = new HttpServerRequest() { + @Override + public HttpServerRequest exceptionHandler(Handler handler) { + return null; + } + + @Override + public HttpServerRequest handler(Handler handler) { + return null; + } + + @Override + public HttpServerRequest pause() { + return null; + } + + @Override + public HttpServerRequest resume() { + return null; + } + + @Override + public HttpServerRequest endHandler(Handler handler) { + return null; + } + + @Override + public HttpVersion version() { + return null; + } + + @Override + public HttpMethod method() { + return null; + } + + @Override + public String uri() { + return null; + } + + @Override + public String path() { + return null; + } + + @Override + public String query() { + return null; + } + + @Override + public HttpServerResponse response() { + return null; + } + + @Override + public MultiMap headers() { + return null; + } + + @Override + public String getHeader(String s) { + return null; + } + + @Override + public String getHeader(CharSequence charSequence) { + return null; + } + + @Override + public MultiMap params() { + return null; + } + + @Override + public String getParam(String s) { + return null; + } + + @Override + public SocketAddress remoteAddress() { + return null; + } + + @Override + public SocketAddress localAddress() { + return null; + } + + @Override + public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException { + return new X509Certificate[0]; + } + + @Override + public String absoluteURI() { + return null; + } + + @Override + public HttpServerRequest bodyHandler(Handler handler) { + return null; + } + + @Override + public NetSocket netSocket() { + return null; + } + + @Override + public HttpServerRequest setExpectMultipart(boolean b) { + return null; + } + + @Override + public boolean isExpectMultipart() { + return false; + } + + @Override + public HttpServerRequest uploadHandler(Handler handler) { + return null; + } + + @Override + public MultiMap formAttributes() { + return null; + } + + @Override + public String getFormAttribute(String s) { + return null; + } + + @Override + public ServerWebSocket upgrade() { + return null; + } + + @Override + public boolean isEnded() { + return false; + } + }; } diff --git a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/Server.java b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/Server.java index 47d638362..077bb88b9 100644 --- a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/Server.java +++ b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/Server.java @@ -288,7 +288,7 @@ private void bind(int p, Handler> completion) { http = vertx.createHttpServer(options) .requestHandler(new HttpHandler(vertx, accessor, this)) - .websocketHandler(new WebSocketHandler(accessor, this)); + .websocketHandler(new WebSocketHandler(vertx, accessor, this)); http.listen(thePort, host, event -> { if (event.succeeded()) { diff --git a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WebSocketHandler.java b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WebSocketHandler.java index dcf26a6f5..94437eeb0 100644 --- a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WebSocketHandler.java +++ b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WebSocketHandler.java @@ -20,9 +20,11 @@ package org.wisdom.framework.vertx; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.http.ServerWebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.wisdom.api.http.Context; /** * Handles web socket frames. @@ -38,6 +40,7 @@ public class WebSocketHandler implements Handler { * The structure used to access services. */ private final ServiceAccessor accessor; + private final Vertx vertx; /** * The server configuration. @@ -51,9 +54,10 @@ public class WebSocketHandler implements Handler { * @param server the server configuration - used to check whether or not the message should be * allowed or denied */ - public WebSocketHandler(ServiceAccessor accessor, Server server) { + public WebSocketHandler(Vertx vertx, ServiceAccessor accessor, Server server) { this.accessor = accessor; this.configuration = server; + this.vertx = vertx; } /** @@ -65,20 +69,31 @@ public WebSocketHandler(ServiceAccessor accessor, Server server) { public void handle(final ServerWebSocket socket) { LOGGER.info("New web socket connection {}, {}", socket, socket.uri()); + final ContextFromVertx context = new ContextFromVertx(vertx, vertx.getOrCreateContext(), accessor, socket.headers()); + if (! configuration.accept(socket.uri())) { LOGGER.warn("Web Socket connection denied on {} by {}", socket.uri(), configuration.name()); return; } final Socket sock = new Socket(socket); + Context.CONTEXT.set(context); + //TODO Propagate context in case listeners use executor on socket opening ? accessor.getDispatcher().addSocket(socket.path(), sock); + Context.CONTEXT.remove(); socket.closeHandler(event -> { LOGGER.info("Web Socket closed {}, {}", socket, socket.uri()); + Context.CONTEXT.set(context); + //TODO Propagate context in case listeners use executor on socket closing ? accessor.getDispatcher().removeSocket(socket.path(), sock); + Context.CONTEXT.remove(); }); - socket.handler(event -> accessor.getDispatcher().received(socket.path(), event.getBytes(), sock)); + socket.handler(event -> { + //Context has to be propagated as WebSocketRouter (default WebSocketListener) execute registered routes in an executor + accessor.getDispatcher().received(socket.path(), event.getBytes(), sock, context); + }); } } diff --git a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WisdomVertxServer.java b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WisdomVertxServer.java index d5a7e6e8e..e666938a6 100644 --- a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WisdomVertxServer.java +++ b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/WisdomVertxServer.java @@ -439,15 +439,17 @@ public void send(String uri, String client, byte[] message) { * @param uri the web socket url * @param content the data * @param socket the client channel + * @param context the context in which to handle the message */ - public void received(String uri, byte[] content, Socket socket) { + public void received(String uri, byte[] content, Socket socket, ContextFromVertx context) { List localListeners; synchronized (this) { localListeners = new ArrayList<>(this.listeners); } for (WebSocketListener listener : localListeners) { - listener.received(uri, id(socket), content); + // + listener.received(uri, id(socket), content, context); } } } diff --git a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/cookies/CookiesImpl.java b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/cookies/CookiesImpl.java index 703309920..cd41337fd 100644 --- a/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/cookies/CookiesImpl.java +++ b/core/wisdom-vertx-engine/src/main/java/org/wisdom/framework/vertx/cookies/CookiesImpl.java @@ -23,7 +23,10 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.DefaultCookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; +import io.vertx.core.MultiMap; import io.vertx.core.http.HttpServerRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.wisdom.api.cookies.Cookie; import org.wisdom.api.cookies.Cookies; @@ -37,9 +40,13 @@ public class CookiesImpl implements Cookies { private Map cookies = Maps.newTreeMap(); - public CookiesImpl(HttpServerRequest request) { + /** + * Parse cookies from request headers (HttpRequest or WebSocketFrame) + * @param headers + */ + public CookiesImpl(MultiMap headers) { Set localCookies; - String value = request.headers().get(HttpHeaders.Names.COOKIE); + String value = headers.get(HttpHeaders.Names.COOKIE); if (value != null) { localCookies = ServerCookieDecoder.LAX.decode(value); for (io.netty.handler.codec.http.cookie.Cookie cookie : localCookies) { diff --git a/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/VertxDispatcherTest.java b/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/VertxDispatcherTest.java index 6f2019e63..3df3a8e66 100644 --- a/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/VertxDispatcherTest.java +++ b/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/VertxDispatcherTest.java @@ -28,9 +28,9 @@ import org.wisdom.api.configuration.ApplicationConfiguration; import org.wisdom.api.content.ContentEngine; import org.wisdom.api.exceptions.ExceptionMapper; +import org.wisdom.api.http.Context; import org.wisdom.api.http.websockets.WebSocketListener; import org.wisdom.api.router.Router; -import org.wisdom.framework.vertx.ssl.SSLServerContext; import javax.net.ssl.*; import java.io.File; @@ -189,14 +189,14 @@ public void testWebSocketDispatching() throws InterruptedException { // The listener should have been notified. assertThat(listener.opened).isNotNull(); - server.received("/hello", "message".getBytes(Charsets.UTF_8), sock); + server.received("/hello", "message".getBytes(Charsets.UTF_8), sock, null); // The listener should have received the message. assertThat(listener.lastMessage).isEqualTo("message"); assertThat(listener.lastClient).isNotNull(); assertThat(listener.closed).isNull(); - server.received("/hello", "message2".getBytes(Charsets.UTF_8), sock); + server.received("/hello", "message2".getBytes(Charsets.UTF_8), sock, null); assertThat(listener.lastMessage).isEqualTo("message2"); assertThat(listener.lastClient).isNotNull(); assertThat(listener.closed).isNull(); @@ -223,14 +223,14 @@ public void testWebSocketWithMultiClients() throws InterruptedException, IOExcep server.addSocket("/hello", sock1); // The listener should have been notified. assertThat(listener.opened).isNotNull(); - server.received("/hello", "message".getBytes(Charsets.UTF_8), sock1); + server.received("/hello", "message".getBytes(Charsets.UTF_8), sock1, null); // The listener should have received the message. assertThat(listener.lastMessage).isEqualTo("message"); assertThat(listener.lastClient).isEqualTo(Integer.toOctalString(socket1.hashCode())); server.addSocket("/hello", sock2); - server.received("/hello", "message2".getBytes(Charsets.UTF_8), sock2); + server.received("/hello", "message2".getBytes(Charsets.UTF_8), sock2, null); assertThat(listener.lastMessage).isEqualTo("message2"); assertThat(listener.lastClient).isEqualTo(Integer.toOctalString(socket2.hashCode())); @@ -255,7 +255,7 @@ public void testWebSocketSending() throws InterruptedException, IOException { server.addSocket("/hello", sock); // The listener should have been notified. assertThat(listener.opened).isNotNull(); - server.received("/hello", "message".getBytes(Charsets.UTF_8), sock); + server.received("/hello", "message".getBytes(Charsets.UTF_8), sock, null); // The listener should have received the message. assertThat(listener.lastMessage).isEqualTo("message"); @@ -310,7 +310,7 @@ private class MyWebSocketListener implements WebSocketListener { String closed; @Override - public void received(String uri, String client, byte[] content) { + public void received(String uri, String client, byte[] content, Context context) { this.lastMessage = new String(content, Charsets.UTF_8); this.lastClient = client; } diff --git a/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/WebSocketTest.java b/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/WebSocketTest.java index 90c41ad8c..400d05ef6 100644 --- a/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/WebSocketTest.java +++ b/core/wisdom-vertx-engine/src/test/java/org/wisdom/framework/vertx/WebSocketTest.java @@ -26,6 +26,7 @@ import org.junit.Test; import org.wisdom.api.configuration.ApplicationConfiguration; import org.wisdom.api.exceptions.ExceptionMapper; +import org.wisdom.api.http.Context; import org.wisdom.api.http.websockets.WebSocketListener; import org.wisdom.api.router.Router; import org.wisdom.framework.vertx.file.DiskFileUpload; @@ -219,7 +220,7 @@ public Spy(WisdomVertxServer server) { } @Override - public void received(String uri, String client, byte[] content) { + public void received(String uri, String client, byte[] content, Context context) { if (new String(content).equalsIgnoreCase("ping")) { server.send(uri, client, "pong"); } else {