diff --git a/avaje-jex-websockets/pom.xml b/avaje-jex-websockets/pom.xml new file mode 100644 index 00000000..b23e18ad --- /dev/null +++ b/avaje-jex-websockets/pom.xml @@ -0,0 +1,25 @@ + + 4.0.0 + + io.avaje + avaje-jex-parent + 3.3 + + avaje-jex-websocket + + + io.avaje + avaje-jex + + + io.avaje + avaje-jex-test + test + + + io.github.robaho + httpserver + test + + + diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java new file mode 100644 index 00000000..f66b3f1d --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocket.java @@ -0,0 +1,53 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.internal.AbstractWebSocket; + +class DWebSocket extends AbstractWebSocket { + + private final WebSocketListener listener; + private final Context ctx; + + DWebSocket(Context ctx, WebSocketListener listener) { + super(ctx.exchange()); + this.listener = listener; + this.ctx = ctx; + } + + @Override + protected void onClose(CloseCode code, String reason, boolean initiatedByRemote) { + listener.onClose(new WsClose(ctx, this, code, reason, initiatedByRemote)); + } + + @Override + protected void onMessage(WebSocketFrame frame) { + switch (frame.opCode()) { + case TEXT -> listener.onMessage(new WsMessage(ctx, this, frame, frame.textPayload())); + case BINARY -> + listener.onBinaryMessage(new WsBinaryMessage(ctx, this, frame, frame.binaryPayload())); + default -> throw new IllegalArgumentException("Unexpected value: "); + } + } + + @Override + protected void onPong(WebSocketFrame pong) { + listener.onPong(new WsPong(ctx, this, pong)); + } + + @Override + protected void onOpen() { + listener.onOpen(new WsOpen(ctx, this)); + } + + @Override + protected void onError(Exception exception) { + listener.onError(new WsError(ctx, this, exception)); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java new file mode 100644 index 00000000..4a05476a --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/DWebSocketHandler.java @@ -0,0 +1,19 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.internal.WebSocketHandler; + +class DWebSocketHandler extends WebSocketHandler { + + private final WebSocketListener listener; + + DWebSocketHandler(WebSocketListener listener) { + this.listener = listener; + } + + @Override + protected DWebSocket openWebSocket(Context exchange) { + + return new DWebSocket(exchange, listener); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java new file mode 100644 index 00000000..e8948c39 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocket.java @@ -0,0 +1,47 @@ +package io.avaje.jex.websocket; + +import io.avaje.jex.websocket.exception.CloseCode; + +/** + * Represents a WebSocket connection, providing methods to interact with the connection. + * Allows sending and receiving messages, pinging, and closing the connection. + */ +public interface WebSocket { + + /** + * Checks if the WebSocket connection is open. + * + * @return true if the connection is open, false otherwise + */ + boolean isOpen(); + + /** + * Closes the WebSocket connection with the specified close code and reason. + * + * @param code the close code indicating the reason for closure + * @param reason the reason for closing the connection + * @param initiatedByRemote true if the close was initiated by the remote endpoint, false otherwise + */ + void close(CloseCode code, String reason, boolean initiatedByRemote); + + /** + * Sends a ping frame with the specified payload to the remote endpoint. + * + * @param payload the ping payload as a byte array + */ + void ping(byte[] payload); + + /** + * Sends a binary message to the remote endpoint. + * + * @param payload the binary payload as a byte array + */ + void send(byte[] payload); + + /** + * Sends a text message to the remote endpoint. + * + * @param payload the text payload as a string + */ + void send(String payload); +} \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java new file mode 100644 index 00000000..84bed8e3 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketFrame.java @@ -0,0 +1,100 @@ +package io.avaje.jex.websocket; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Represents a WebSocket frame as defined by RFC 6455. Provides access to frame payload, masking, + * opcode, and frame control information. + */ +public interface WebSocketFrame { + + /** + * Returns the binary payload of the frame. + * + * @return the binary payload as a byte array + */ + byte[] binaryPayload(); + + /** + * Returns the masking key used for the frame, if present. + * + * @return the masking key as a byte array, or null if not masked + */ + byte[] maskingKey(); + + /** + * Returns the opcode of the frame. + * + * @return the opcode + */ + OpCode opCode(); + + /** + * Returns the text payload of the frame, if applicable. + * + * @return the text payload, or null if not a text frame + */ + String textPayload(); + + /** + * Indicates if this frame is the final fragment in a message. + * + * @return true if final fragment, false otherwise + */ + boolean isFin(); + + /** + * Indicates if the frame is masked. + * + * @return true if masked, false otherwise + */ + boolean isMasked(); + + /** WebSocket opcodes */ + public enum OpCode { + CONTINUATION(0), + TEXT(1), + BINARY(2), + CLOSE(8), + PING(9), + PONG(10); + + private final byte code; + private static final Map VALUES = + Arrays.stream(values()).collect(Collectors.toMap(OpCode::value, e -> e)); + + OpCode(int code) { + this.code = (byte) code; + } + + /** + * Finds the OpCode corresponding to the given byte value. + * + * @param value the opcode value + * @return the matching OpCode, or null if not found + */ + public static OpCode find(byte value) { + return VALUES.get(value); + } + + /** + * Returns the byte value of this opcode. + * + * @return the opcode value + */ + public byte value() { + return this.code; + } + + /** + * Indicates if this opcode is a control frame (close, ping, pong). + * + * @return true if control frame, false otherwise + */ + public boolean isControlFrame() { + return this == CLOSE || this == PING || this == PONG; + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java new file mode 100644 index 00000000..cca01a27 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketListener.java @@ -0,0 +1,186 @@ +package io.avaje.jex.websocket; + +import java.util.function.Consumer; + +import io.avaje.jex.websocket.WsContext.WsBinaryMessage; +import io.avaje.jex.websocket.WsContext.WsClose; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; +import io.avaje.jex.websocket.WsContext.WsOpen; +import io.avaje.jex.websocket.WsContext.WsPong; + +/** + * Holds the different WebSocket handlers for a specific {@link WsHandlerEntry} or the WebSocket + * log. + */ +public interface WebSocketListener { + + /** + * Create a builder for a WebSocketListener. + * + * @return the builder + */ + static Builder builder() { + return new Builder(); + } + + /** + * Called when a binary message is received. + * + * @param binaryPayload the binary payload + */ + default void onBinaryMessage(WsBinaryMessage binaryPayload) {} + + /** + * Called when the websocket is closed. + * + * @param wsClose the close context + */ + default void onClose(WsClose wsClose) {} + + /** + * Called when a text message is received. + * + * @param message the text message + */ + default void onMessage(WsMessage message) {} + + /** + * Called when the websocket is opened. + * + * @param wsOpenContext the open context + */ + default void onOpen(WsOpen wsOpen) {} + + /** + * Called when a pong is received. + * + * @param pong the pong + */ + default void onPong(WsPong wsPong) {} + + /** + * Called when an error occurs. + * + * @param wsError the error + */ + default void onError(WsError wsError) {} + + /** A builder for creating a {@link WebSocketListener} with specific event handlers. */ + final class Builder { + private Consumer onOpen; + private Consumer onMessage; + private Consumer onBinaryMessage; + private Consumer onClose; + private Consumer onPong; + private Consumer onError; + + private Builder() {} + + /** + * Set the handler for the WebSocket open event. + * + * @param handler Consumer for {@link WsOpen} + * @return this builder + */ + public Builder onOpen(Consumer handler) { + this.onOpen = handler; + return this; + } + + /** + * Set the handler for the WebSocket text message event. + * + * @param handler Consumer for {@link WsMessage} + * @return this builder + */ + public Builder onMessage(Consumer handler) { + this.onMessage = handler; + return this; + } + + /** + * Set the handler for the WebSocket binary message event. + * + * @param handler Consumer for {@link WsBinaryMessage} + * @return this builder + */ + public Builder onBinaryMessage(Consumer handler) { + this.onBinaryMessage = handler; + return this; + } + + /** + * Set the handler for the WebSocket close event. + * + * @param handler Consumer for {@link WsClose} + * @return this builder + */ + public Builder onClose(Consumer handler) { + this.onClose = handler; + return this; + } + + /** + * Set the handler for the WebSocket pong event. + * + * @param handler Consumer for {@link WsPong} + * @return this builder + */ + public Builder onPong(Consumer handler) { + this.onPong = handler; + return this; + } + + /** + * Set the handler for the WebSocket error event. + * + * @param handler Consumer for {@link WsError} + * @return this builder + */ + public Builder onError(Consumer handler) { + this.onError = handler; + return this; + } + + /** + * Build a {@link WebSocketListener} implementation using the configured handlers. + * + * @return a new {@link WebSocketListener} instance + */ + public WebSocketListener build() { + return new WebSocketListener() { + + @Override + public void onOpen(WsOpen wsOpen) { + if (onOpen != null) onOpen.accept(wsOpen); + } + + @Override + public void onMessage(WsMessage message) { + if (onMessage != null) onMessage.accept(message); + } + + @Override + public void onBinaryMessage(WsBinaryMessage binaryPayload) { + if (onBinaryMessage != null) onBinaryMessage.accept(binaryPayload); + } + + @Override + public void onClose(WsClose wsClose) { + if (onClose != null) onClose.accept(wsClose); + } + + @Override + public void onPong(WsPong wsPong) { + if (onPong != null) onPong.accept(wsPong); + } + + @Override + public void onError(WsError wsError) { + if (onError != null) onError.accept(wsError); + } + }; + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java new file mode 100644 index 00000000..c7c8e272 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WebSocketPlugin.java @@ -0,0 +1,87 @@ +package io.avaje.jex.websocket; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import io.avaje.jex.Jex; +import io.avaje.jex.Routing.HttpService; +import io.avaje.jex.security.Role; +import io.avaje.jex.spi.JexPlugin; + +/** + * A plugin for the Jex web framework to simplify the registration of WebSocket handlers. + * + *

This class provides a fluent API for mapping specific URL paths to {@link WebSocketListener} + * implementations and integrates them into the Jex application's routing. + * + *

**Note on Server Compatibility:** WebSocket support may be limited or unavailable with older versions of the default + * JDK server provider or certain third-party providers like Jetty, requiring a newer version or an + * alternative server implementation to function correctly. + */ +public class WebSocketPlugin implements JexPlugin { + + private final List handlers = new ArrayList<>(); + + /** + * Registers a WebSocket listener for a given path using a fluent builder approach. + * + * @param path The URL path to which the WebSocket endpoint will be mapped (e.g., "/ws/chat"). + * @param consumer A {@code Consumer} that configures the {@link WebSocketListener.Builder} to + * create the listener. + * @param roles Optional roles required to access this WebSocket endpoint. + * @return This {@code WebSocketPlugin} instance for method chaining. + */ + public WebSocketPlugin ws( + String path, Consumer consumer, Role... roles) { + var builder = WebSocketListener.builder(); + consumer.accept(builder); + return ws(path, builder.build(), roles); + } + + /** + * Registers a pre-built {@link WebSocketListener} for a given path. + * + * @param path The URL path to which the WebSocket endpoint will be mapped (e.g., "/ws/chat"). + * @param listener The {@link WebSocketListener} instance that handles WebSocket events. + * @param roles Optional roles required to access this WebSocket endpoint. + * @return This {@code WebSocketPlugin} instance for method chaining. + */ + public WebSocketPlugin ws(String path, WebSocketListener listener, Role... roles) { + handlers.add(r -> r.get(path, new DWebSocketHandler(listener), roles)); + return this; + } + + /** + * Applies the plugin to the Jex application. + * + *

This method adds all registered WebSocket handlers to the Jex router and checks for server + * provider compatibility before application startup. + * + * @param jex The {@link Jex} instance to which the plugin is being applied. + * @throws UnsupportedOperationException if the current server provider is detected as + * incompatible with WebSocket functionality. + */ + @Override + public void apply(Jex jex) { + jex.routing().addAll(handlers); + + var provider = jex.config().serverProvider().getClass().getPackageName(); + + if (Runtime.version().feature() < 27 && provider.indexOf("sun.") != -1 + || provider.indexOf("jetty.") != -1) { + throw new UnsupportedOperationException( + "WebSocket not supported for this version of %s, use a newer/different server provider" + .formatted(jex.config().serverProvider().getClass())); + } + } + + /** + * Creates a new instance of the {@code WebSocketPlugin}. + * + * @return A new {@code WebSocketPlugin}. + */ + public static WebSocketPlugin create() { + return new WebSocketPlugin(); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java new file mode 100644 index 00000000..b41ee266 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/WsContext.java @@ -0,0 +1,291 @@ +package io.avaje.jex.websocket; + +import java.lang.reflect.Type; + +import io.avaje.jex.http.Context; +import io.avaje.jex.websocket.exception.CloseCode; + +/** + * The abstract sealed base class that provides the context for a specific WebSocket event. + * + *

This class encapsulates the underlying {@link Context} (request context) and the {@link + * WebSocket} connection, offering methods for sending messages and controlling the session. + * Subclasses represent specific WebSocket events (e.g., Open, Message, Close, Error). + */ +public abstract sealed class WsContext { + + protected final Context ctx; + private final WebSocket ws; + + protected WsContext(Context ctx, WebSocket ws) { + this.ctx = ctx; + this.ws = ws; + } + + /** + * Serializes an object to a JSON string using the registered {@code JsonService} and sends it as + * a text frame over the socket. + * + * @param message The object to be serialized and sent. + */ + public void send(Object message) { + send(ctx.jsonService().toJsonString(message)); + } + + /** + * Sends a {@code String} message (Text Frame) over the socket. + * + * @param message The string message to send. + */ + public void send(String message) { + ws.send(message); + } + + /** + * Sends a {@code byte[]} message (Binary Frame) over the socket. + * + * @param message The binary data to send. + */ + public void send(byte[] message) { + ws.send(message); + } + + /** Sends a Ping control frame over the socket */ + public void sendPing() { + sendPing(null); + } + + /** + * Sends a Ping control frame over the socket. + * + * @param applicationData Optional application data to include in the Ping frame. + */ + public void sendPing(byte[] applicationData) { + ws.ping(applicationData != null ? applicationData : new byte[0]); + } + + /** + * Returns the underlying HTTP request {@code Context}. This provides access to request headers, + * path parameters, and attributes. + * + * @return The request {@code Context}. + */ + public Context ctx() { + return ctx; + } + + /** + * Returns the underlying {@code WebSocket} session object. + * + * @return The {@code WebSocket} session object. + */ + public WebSocket ws() { + return ws; + } + + /** Closes the WebSocket session gracefully with a default reason. */ + public void closeSession() { + ws.close(CloseCode.NORMAL_CLOSURE, "Normally closed", false); + } + + /** + * Closes the WebSocket session with a specified {@link CloseCode} and an empty reason string. + * + * @param code The {@link CloseCode} to send. + */ + public void closeSession(CloseCode code) { + ws.close(code, "", false); + } + + /** + * Closes the WebSocket session with a specified {@link CloseCode} and a descriptive reason. + * + * @param code The {@link CloseCode} to send. + * @param reason A descriptive string explaining why the session is being closed. + */ + public void closeSession(CloseCode code, String reason) { + ws.close(code, reason, false); + } + + /** + * Represents the context for an open event. This event occurs when a new connection is + * established and the handshake is complete. + */ + public static final class WsOpen extends WsContext { + WsOpen(Context ctx, WebSocket ws) { + super(ctx, ws); + } + } + + /** + * Represents the context for a Pong control frame received from the remote endpoint. Pongs are + * typically received in response to a Ping sent by this endpoint. + */ + public static final class WsPong extends WsMessageCtx { + WsPong(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { + super(ctx, ws, wsFrame); + } + } + + /** + * Represents the context for an error event. This is triggered when an unhandled exception occurs + * during the lifecycle of the connection. + */ + public static final class WsError extends WsContext { + private final Exception error; + + WsError(Context ctx, WebSocket ws, Exception error) { + super(ctx, ws); + this.error = error; + } + + /** + * Gets the {@code Exception} that caused the error event. + * + * @return The underlying {@code Exception}. + */ + public Exception error() { + return error; + } + } + + /** + * Represents the context for a close event. This event is triggered when the connection is + * closed, either locally or by the remote endpoint. + */ + public static final class WsClose extends WsContext { + private final CloseCode closeCode; + private final String reason; + private final boolean initiatedByRemote; + + WsClose( + Context ctx, WebSocket ws, CloseCode closeCode, String reason, boolean initiatedByRemote) { + super(ctx, ws); + this.closeCode = closeCode; + this.reason = reason; + this.initiatedByRemote = initiatedByRemote; + } + + /** + * Gets the {@link CloseCode} provided when the connection was closed. + * + * @return The {@link CloseCode} indicating the reason for closure. + */ + public CloseCode closeCode() { + return closeCode; + } + + /** + * Gets the descriptive reason string for the close event, as provided by the closing endpoint. + * + * @return The reason string. + */ + public String reason() { + return reason; + } + + /** + * Indicates whether the close handshake was initiated by the remote endpoint (true) or by the + * local endpoint (false). + * + * @return {@code true} if closed by the remote; {@code false} if closed locally. + */ + public boolean initiatedByRemote() { + return initiatedByRemote; + } + } + + /** + * The abstract sealed base class for WebSocket contexts that involve receiving a data frame + * (e.g., Text, Binary, Pong). + */ + public abstract static sealed class WsMessageCtx extends WsContext { + private final WebSocketFrame wsFrame; + + WsMessageCtx(Context ctx, WebSocket ws, WebSocketFrame wsFrame) { + super(ctx, ws); + this.wsFrame = wsFrame; + } + + /** + * Gets the underlying raw {@code WebSocketFrame}. This is useful for inspecting frame metadata + * like opcode, RSV bits, etc. + * + * @return The raw {@code WebSocketFrame}. + */ + public WebSocketFrame wsFrame() { + return wsFrame; + } + + /** + * Indicates if this frame is the final fragment of a fragmented message. + * + * @return {@code true} if this is the final fragment (FIN bit is set), {@code false} otherwise. + */ + public boolean isFin() { + return wsFrame.isFin(); + } + } + + /** Represents the context for a binary message received from the remote endpoint. */ + public static final class WsBinaryMessage extends WsMessageCtx { + private final byte[] data; + + WsBinaryMessage(Context ctx, WebSocket ws, WebSocketFrame wsFrame, byte[] data) { + super(ctx, ws, wsFrame); + this.data = data; + } + + /** + * Gets the raw binary data (payload) of the message. + * + * @return The message content as a byte array. + */ + public byte[] data() { + return data; + } + } + + /** Represents the context for a text message received from the remote endpoint. */ + public static final class WsMessage extends WsMessageCtx { + private final String message; + + WsMessage(Context ctx, WebSocket ws, WebSocketFrame frame, String message) { + super(ctx, ws, frame); + this.message = message; + } + + /** + * Gets the text message received from the client. + * + * @return The message content as a {@code String}. + */ + public String message() { + return message; + } + + /** + * Deserializes the received JSON string message into an object of the specified {@code Type}. + * This uses the application's registered {@code JsonService}. + * + * @param The target type. + * @param type The {@code Type} (e.g., a generic type) to deserialize the message into. + * @return The deserialized object. + */ + public T messageAsClass(Type type) { + return ctx.jsonService().fromJson(type, message); + } + + /** + * Deserializes the received JSON string message into an object of the specified {@code Class}. + * + * @param The target class type. + * @param clazz The {@code Class} to deserialize the message into. + * @return The deserialized object. + * @see #messageAsClass(Type) + */ + public T messageAsClass(Class clazz) { + return messageAsClass((Type) clazz); + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java new file mode 100644 index 00000000..db922c51 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/CloseCode.java @@ -0,0 +1,57 @@ +package io.avaje.jex.websocket.exception; + +import java.util.HashMap; +import java.util.Map; + +/** + * Websocket Close Codes. These codes are used to indicate the reason why a WebSocket connection has + * been closed. + */ +public enum CloseCode { + NORMAL_CLOSURE(1000), + GOING_AWAY(1001), + PROTOCOL_ERROR(1002), + UNSUPPORTED_DATA(1003), + NO_STATUS_RCVD(1005), + ABNORMAL_CLOSURE(1006), + INVALID_FRAME_PAYLOAD_DATA(1007), + POLICY_VIOLATION(1008), + MESSAGE_TOO_BIG(1009), + MANDATORY_EXT(1010), + INTERNAL_SERVER_ERROR(1011), + TLS_HANDSHAKE(1015); + + private final int code; + + private static final Map CODES_MAP = HashMap.newHashMap(values().length); + + static { + for (CloseCode code : values()) { + CODES_MAP.put(code.code(), code); + } + } + + CloseCode(int code) { + this.code = code; + } + + /** + * Returns the integer value of this close code. + * + * @return The integer close code. + */ + public int code() { + return this.code; + } + + /** + * Finds the {@code CloseCode} enum constant corresponding to the given integer value. + * + * @param value The integer value of the close code to find. + * @return The corresponding {@code CloseCode} enum constant, or {@code null} if no match is + * found. + */ + public static CloseCode find(int value) { + return CODES_MAP.get(value); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java new file mode 100644 index 00000000..ec05c149 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/exception/WebSocketException.java @@ -0,0 +1,69 @@ +package io.avaje.jex.websocket.exception; + +/** + * An unchecked exception specifically for signaling errors that occur during WebSocket + * communication. This exception wraps a standard {@link CloseCode} and a descriptive reason, making + * it easy to communicate the cause of a connection failure or protocol violation in a way that + * aligns with the WebSocket protocol. + */ +public class WebSocketException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private final CloseCode code; + + private final String reason; + + /** + * Constructs a new {@code WebSocketException} with a specific close code and reason. The + * exception will have no cause. + * + * @param code The {@link CloseCode} that indicates the nature of the error. + * @param reason A descriptive message explaining the error. + */ + public WebSocketException(CloseCode code, String reason) { + this(code, reason, null); + } + + /** + * Constructs a new {@code WebSocketException} with a specific close code, reason, and a cause. + * + * @param code The {@link CloseCode} that indicates the nature of the error. + * @param reason A descriptive message explaining the error. + * @param cause The underlying exception that caused this {@code WebSocketException}. + */ + public WebSocketException(CloseCode code, String reason, Exception cause) { + super(code + ": " + reason, cause); + this.code = code; + this.reason = reason; + } + + /** + * Constructs a new {@code WebSocketException} from an existing exception. It defaults to {@link + * CloseCode#INTERNAL_SERVER_ERROR} for the close code, using the cause's {@code toString()} + * method for the reason. + * + * @param cause The underlying exception that caused this {@code WebSocketException}. + */ + public WebSocketException(Exception cause) { + this(CloseCode.INTERNAL_SERVER_ERROR, cause.toString(), cause); + } + + /** + * Returns the WebSocket close code associated with this exception. + * + * @return The {@link CloseCode} enum value. + */ + public CloseCode code() { + return this.code; + } + + /** + * Returns the descriptive reason associated with this exception. + * + * @return The descriptive reason string. + */ + public String reason() { + return this.reason; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java new file mode 100644 index 00000000..8f6453ea --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/AbstractWebSocket.java @@ -0,0 +1,262 @@ +package io.avaje.jex.websocket.internal; + +import static java.lang.System.Logger.Level.DEBUG; +import static java.lang.System.Logger.Level.INFO; +import static java.lang.System.Logger.Level.TRACE; + +import java.io.EOFException; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.sun.net.httpserver.HttpExchange; + +import io.avaje.jex.websocket.WebSocket; +import io.avaje.jex.websocket.WebSocketFrame; +import io.avaje.jex.websocket.WebSocketFrame.OpCode; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.exception.WebSocketException; + +public abstract class AbstractWebSocket implements WebSocket { + + private final List continuousFrames = new LinkedList<>(); + + private OpCode continuousOpCode = null; + private final InputStream in; + private Lock lock = new ReentrantLock(); + protected final System.Logger log = System.getLogger("io.avaje.jex.websocket"); + private final OutputStream out; + private volatile State state = State.UNCONNECTED; + private final URI uri; + + protected AbstractWebSocket(HttpExchange exchange) { + this.uri = exchange.getRequestURI(); + log.log(INFO, "connecting websocket {0}", uri); + this.state = State.CONNECTING; + this.in = exchange.getRequestBody(); + this.out = exchange.getResponseBody(); + } + + @Override + public void close(CloseCode code, String reason, boolean initiatedByRemote) { + log.log(INFO, "closing websocket {0}", uri); + + var oldState = this.state; + this.state = State.CLOSING; + if (oldState == State.OPEN) { + sendFrame(new CloseFrame(code, reason)); + } else { + doClose(code, reason, initiatedByRemote); + } + } + + void doClose(CloseCode code, String reason, boolean initiatedByRemote) { + if (this.state == State.CLOSED) { + return; + } + try (in; out) { + // just close the streams + } catch (IOException expected) { + // Expected + } + this.state = State.CLOSED; + onClose(code, reason, initiatedByRemote); + } + + private void handleCloseFrame(WebSocketFrame frame) { + var code = CloseCode.NORMAL_CLOSURE; + var reason = ""; + if (frame instanceof CloseFrame cf) { + code = cf.getCloseCode(); + reason = cf.getCloseReason(); + } + log.log( + TRACE, + "handleCloseFrame: {0}, code={1}, reason={2}, state {3}", + uri, + code, + reason, + this.state); + if (this.state == State.CLOSING) { + // Answer for my requested close + doClose(code, reason, false); + } else { + close(code, reason, true); + } + } + + private void handleFrameFragment(WebSocketFrame frame) { + if (frame.opCode() != OpCode.CONTINUATION) { + // First + if (this.continuousOpCode != null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Previous continuous frame sequence not completed."); + } + this.continuousOpCode = frame.opCode(); + this.continuousFrames.clear(); + this.continuousFrames.add(frame); + } else if (frame.isFin()) { + // Last + if (this.continuousOpCode == null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence was not started."); + } + this.continuousFrames.add(frame); + onMessage(new WSFrame(this.continuousOpCode, this.continuousFrames)); + this.continuousOpCode = null; + this.continuousFrames.clear(); + } else if (this.continuousOpCode == null) { + // Unexpected + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence was not started."); + } else { + // Intermediate + this.continuousFrames.add(frame); + } + } + + private void handleWebsocketFrame(WebSocketFrame frame) { + onFrameReceived(frame); + if (frame.opCode() == OpCode.CLOSE) { + handleCloseFrame(frame); + } else if (frame.opCode() == OpCode.PING) { + sendFrame(new WSFrame(OpCode.PONG, true, frame.binaryPayload())); + } else if (frame.opCode() == OpCode.PONG) { + onPong(frame); + } else if (!frame.isFin() || frame.opCode() == OpCode.CONTINUATION) { + handleFrameFragment(frame); + } else if (this.continuousOpCode != null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Continuous frame sequence not completed."); + } else if (frame.opCode() == OpCode.TEXT || frame.opCode() == OpCode.BINARY) { + onMessage(frame); + } else { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Non control or continuous frame expected."); + } + } + + @Override + public boolean isOpen() { + return state == State.OPEN; + } + + protected void onFrameReceived(WebSocketFrame frame) { + log.log(TRACE, "frame received: {0}", frame); + } + + /** + * Debug method. Do not Override unless for debug purposes!
+ * This method is called before actually sending the frame. + * + * @param frame The sent WebSocket Frame. + */ + protected void onFrameSent(WebSocketFrame frame) { + log.log(TRACE, "frame sent: {0}", frame); + } + + protected abstract void onClose(CloseCode code, String reason, boolean initiatedByRemote); + + protected abstract void onError(Exception exception); + + protected abstract void onMessage(WebSocketFrame message) throws WebSocketException; + + protected abstract void onOpen() throws WebSocketException; + + protected abstract void onPong(WebSocketFrame pong) throws WebSocketException; + + @Override + public void ping(byte[] payload) { + sendFrame(new WSFrame(OpCode.PING, true, payload)); + } + + void readWebsocket() { + try { + state = State.OPEN; + log.log(DEBUG, "websocket open {0}", uri); + onOpen(); + while (this.state == State.OPEN) { + handleWebsocketFrame(WSFrame.read(in)); + } + } catch (EOFException e) { + log.log(TRACE, "exception on websocket", e); + onError(e); + doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); + } catch (Exception e) { + onError(e); + if (e instanceof WebSocketException wse) { + doClose(wse.code(), wse.reason(), false); + } else { + doClose(CloseCode.ABNORMAL_CLOSURE, e.toString(), false); + } + } finally { + doClose( + CloseCode.INTERNAL_SERVER_ERROR, + "Handler terminated without closing the connection.", + false); + log.log(TRACE, "readWebsocket() exiting {0}", uri); + } + } + + @Override + public void send(byte[] payload) { + sendFrame(new WSFrame(OpCode.BINARY, true, payload)); + } + + @Override + public void send(String payload) { + sendFrame(new WSFrame(OpCode.TEXT, true, payload)); + } + + public void sendFrame(WSFrame frame) { + lock.lock(); + try { + onFrameSent(frame); + frame.write(this.out); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + lock.unlock(); + } + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java new file mode 100644 index 00000000..8c621cf1 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/CloseFrame.java @@ -0,0 +1,44 @@ +package io.avaje.jex.websocket.internal; + +import io.avaje.jex.websocket.exception.CloseCode; + +public final class CloseFrame extends WSFrame { + + private static byte[] generatePayload(CloseCode code, String closeReason) { + if (code != null) { + var reasonBytes = text2Binary(closeReason); + var payload = new byte[reasonBytes.length + 2]; + payload[0] = (byte) (code.code() >> 8 & 0xFF); + payload[1] = (byte) (code.code() & 0xFF); + System.arraycopy(reasonBytes, 0, payload, 2, reasonBytes.length); + return payload; + } + return new byte[0]; + } + + private CloseCode closeCode; + + private String closeReason; + + public CloseFrame(CloseCode code, String closeReason) { + super(OpCode.CLOSE, true, generatePayload(code, closeReason)); + } + + CloseFrame(WSFrame wrap) { + super(wrap); + assert wrap.opCode() == OpCode.CLOSE; + if (wrap.binaryPayload().length >= 2) { + this.closeCode = + CloseCode.find((wrap.binaryPayload()[0] & 0xFF) << 8 | wrap.binaryPayload()[1] & 0xFF); + this.closeReason = binary2Text(binaryPayload(), 2, binaryPayload().length - 2); + } + } + + public CloseCode getCloseCode() { + return this.closeCode; + } + + public String getCloseReason() { + return this.closeReason; + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md new file mode 100644 index 00000000..4b21b270 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/LICENSE.md @@ -0,0 +1,14 @@ +Most of the code in the websockets package is covered under the following license: + +Copyright (c) 2012-2013 by Paul S. Hawke, 2001,2005-2013 by Jarno Elonen, 2010 by Konstantinos Togias +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +* Neither the name of the NanoHttpd organization nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java new file mode 100644 index 00000000..8b9d7160 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/State.java @@ -0,0 +1,42 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +public enum State { + UNCONNECTED, + CONNECTING, + OPEN, + CLOSING, + CLOSED +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java new file mode 100644 index 00000000..a5a168d2 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/Util.java @@ -0,0 +1,59 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +class Util { + + public static final String HEADER_UPGRADE = "Upgrade"; + public static final String HEADER_UPGRADE_VALUE = "websocket"; + public static final String HEADER_CONNECTION = "Connection"; + public static final String HEADER_WEBSOCKET_VERSION = "sec-websocket-version"; + public static final String HEADER_WEBSOCKET_VERSION_VALUE = "13"; + public static final String HEADER_WEBSOCKET_KEY = "sec-websocket-key"; + public static final String HEADER_WEBSOCKET_ACCEPT = "sec-websocket-accept"; + public static final String HEADER_WEBSOCKET_PROTOCOL = "sec-websocket-protocol"; + private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + public static String makeAcceptKey(String key) throws NoSuchAlgorithmException { + var md = MessageDigest.getInstance("SHA-1"); + var text = key + Util.WEBSOCKET_KEY_MAGIC; + md.update(text.getBytes(), 0, text.length()); + var sha1hash = md.digest(); + return Base64.getEncoder().encodeToString(sha1hash); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java new file mode 100644 index 00000000..f41d4134 --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WSFrame.java @@ -0,0 +1,388 @@ +package io.avaje.jex.websocket.internal; + +/* + * #%L + * NanoHttpd-Websocket + * %% + * Copyright (C) 2012 - 2016 nanohttpd + * %% + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the nanohttpd nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import io.avaje.jex.websocket.WebSocketFrame; +import io.avaje.jex.websocket.exception.CloseCode; +import io.avaje.jex.websocket.exception.WebSocketException; + +public sealed class WSFrame implements WebSocketFrame permits CloseFrame { + + static final Charset TEXT_CHARSET = StandardCharsets.UTF_8; + + static String binary2Text(byte[] payload) { + return new String(payload, WSFrame.TEXT_CHARSET); + } + + static String binary2Text(byte[] payload, int offset, int length) { + return new String(payload, offset, length, WSFrame.TEXT_CHARSET); + } + + private static int checkedRead(int read) throws IOException { + if (read < 0) { + throw new EOFException(); + } + return read; + } + + static WSFrame read(InputStream in) throws IOException { + var head = (byte) checkedRead(in.read()); + var fin = (head & 0x80) != 0; + var opCode = OpCode.find((byte) (head & 0x0F)); + if ((head & 0x70) != 0) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "The reserved bits (" + Integer.toBinaryString(head & 0x70) + ") must be 0."); + } + if (opCode == null) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Received frame with reserved/unknown opcode " + (head & 0x0F) + "."); + } + if (opCode.isControlFrame() && !fin) { + throw new WebSocketException(CloseCode.PROTOCOL_ERROR, "Fragmented control frame."); + } + + var frame = new WSFrame(opCode, fin); + frame.readPayloadInfo(in); + frame.readPayload(in); + if (frame.opCode() == OpCode.CLOSE) { + return new CloseFrame(frame); + } + return frame; + } + + static byte[] text2Binary(String payload) { + return payload.getBytes(WSFrame.TEXT_CHARSET); + } + + private OpCode opCode; + + private boolean fin; + + private byte[] maskingKey; + + private byte[] payload; + + // --------------------------------GETTERS--------------------------------- + + private int payloadLength; + + private String payloadString; + + private WSFrame(OpCode opCode, boolean fin) { + setOpCode(opCode); + setFin(fin); + } + + public WSFrame(OpCode opCode, boolean fin, byte[] payload) { + this(opCode, fin, payload, null); + } + + public WSFrame(OpCode opCode, boolean fin, byte[] payload, byte[] maskingKey) { + this(opCode, fin); + setMaskingKey(maskingKey); + setBinaryPayload(payload); + } + + public WSFrame(OpCode opCode, boolean fin, String payload) { + this(opCode, fin, payload, null); + } + + public WSFrame(OpCode opCode, boolean fin, String payload, byte[] maskingKey) { + this(opCode, fin); + setMaskingKey(maskingKey); + setTextPayload(payload); + } + + public WSFrame(OpCode opCode, List fragments) throws WebSocketException { + setOpCode(opCode); + setFin(true); + + var length = 0L; + for (var inter : fragments) { + length += inter.binaryPayload().length; + } + if (length < 0 || length > Integer.MAX_VALUE) { + throw new WebSocketException( + CloseCode.MESSAGE_TOO_BIG, "Max frame length has been exceeded."); + } + this.payloadLength = (int) length; + var payload = new byte[this.payloadLength]; + var offset = 0; + for (var inter : fragments) { + System.arraycopy(inter.binaryPayload(), 0, payload, offset, inter.binaryPayload().length); + offset += inter.binaryPayload().length; + } + setBinaryPayload(payload); + } + + public WSFrame(WSFrame clone) { + setOpCode(clone.opCode()); + setFin(clone.isFin()); + setBinaryPayload(clone.binaryPayload()); + setMaskingKey(clone.maskingKey()); + } + + @Override + public byte[] binaryPayload() { + return this.payload; + } + + @Override + public byte[] maskingKey() { + return this.maskingKey; + } + + @Override + public OpCode opCode() { + return this.opCode; + } + + // --------------------------------SERIALIZATION--------------------------- + + @Override + public String textPayload() { + if (this.payloadString == null) { + this.payloadString = binary2Text(binaryPayload()); + } + return this.payloadString; + } + + @Override + public boolean isFin() { + return this.fin; + } + + @Override + public boolean isMasked() { + return this.maskingKey != null && this.maskingKey.length == 4; + } + + private String payloadToString() { + if (this.payload == null) { + return "null"; + } + final var sb = new StringBuilder(); + sb.append('[').append(this.payload.length).append("b] "); + if (opCode() == OpCode.TEXT) { + var text = textPayload(); + if (text.length() > 100) { + sb.append(text.substring(0, 100)).append("..."); + } else { + sb.append(text); + } + } else { + sb.append("0x"); + for (var i = 0; i < Math.min(this.payload.length, 50); ++i) { + sb.append(Integer.toHexString(this.payload[i] & 0xFF)); + } + if (this.payload.length > 50) { + sb.append("..."); + } + } + return sb.toString(); + } + + private void readPayload(InputStream in) throws IOException { + this.payload = new byte[this.payloadLength]; + var read = 0; + while (read < this.payloadLength) { + read += checkedRead(in.read(this.payload, read, this.payloadLength - read)); + } + + if (isMasked()) { + for (var i = 0; i < this.payload.length; i++) { + this.payload[i] ^= this.maskingKey[i % 4]; + } + } + + // Test for Unicode errors + if (opCode() == OpCode.TEXT) { + this.payloadString = binary2Text(binaryPayload()); + } + } + + // --------------------------------ENCODING-------------------------------- + + private void readPayloadInfo(InputStream in) throws IOException { + var b = (byte) checkedRead(in.read()); + var masked = (b & 0x80) != 0; + + this.payloadLength = (byte) (0x7F & b); + if (this.payloadLength == 126) { + // checkedRead must return int for this to work + this.payloadLength = (checkedRead(in.read()) << 8 | checkedRead(in.read())) & 0xFFFF; + if (this.payloadLength < 126) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Invalid data frame 2byte length. (not using minimal length encoding)"); + } + } else if (this.payloadLength == 127) { + var length = + (long) checkedRead(in.read()) << 56 + | (long) checkedRead(in.read()) << 48 + | (long) checkedRead(in.read()) << 40 + | (long) checkedRead(in.read()) << 32 + | checkedRead(in.read()) << 24 + | checkedRead(in.read()) << 16 + | checkedRead(in.read()) << 8 + | checkedRead(in.read()); + if (length < 65536) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, + "Invalid data frame 4byte length. (not using minimal length encoding)"); + } + if (length < 0 || length > Integer.MAX_VALUE) { + throw new WebSocketException( + CloseCode.MESSAGE_TOO_BIG, "Max frame length has been exceeded."); + } + this.payloadLength = (int) length; + } + + if (this.opCode.isControlFrame()) { + if (this.payloadLength > 125) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Control frame with payload length > 125 bytes."); + } + if (this.opCode == OpCode.CLOSE && this.payloadLength == 1) { + throw new WebSocketException( + CloseCode.PROTOCOL_ERROR, "Received close frame with payload len 1."); + } + } + + if (masked) { + this.maskingKey = new byte[4]; + var read = 0; + while (read < this.maskingKey.length) { + read += checkedRead(in.read(this.maskingKey, read, this.maskingKey.length - read)); + } + } + } + + void setBinaryPayload(byte[] payload) { + this.payload = payload; + this.payloadLength = payload.length; + this.payloadString = null; + } + + void setFin(boolean fin) { + this.fin = fin; + } + + void setMaskingKey(byte[] maskingKey) { + if (maskingKey != null && maskingKey.length != 4) { + throw new IllegalArgumentException( + "MaskingKey " + Arrays.toString(maskingKey) + " hasn't length 4"); + } + this.maskingKey = maskingKey; + } + + void setOpCode(OpCode opcode) { + this.opCode = opcode; + } + + void setTextPayload(String payload) { + this.payload = text2Binary(payload); + this.payloadLength = payload.length(); + this.payloadString = payload; + } + + // --------------------------------CONSTANTS------------------------------- + + void setUnmasked() { + setMaskingKey(null); + } + + @Override + public String toString() { + final var sb = new StringBuilder("WS["); + sb.append(opCode()); + sb.append(", ").append(isFin() ? "fin" : "inter"); + sb.append(", ").append(isMasked() ? "masked" : "unmasked"); + sb.append(", ").append(payloadToString()); + sb.append(']'); + return sb.toString(); + } + + // ------------------------------------------------------------------------ + void write(OutputStream out) throws IOException { + byte header = 0; + if (this.fin) { + header |= 0x80; + } + header |= this.opCode.value() & 0x0F; + out.write(header); + + this.payloadLength = binaryPayload().length; + if (this.payloadLength <= 125) { + out.write(isMasked() ? 0x80 | (byte) this.payloadLength : (byte) this.payloadLength); + } else { + if (this.payloadLength <= 0xFFFF) { + out.write(isMasked() ? 0xFE : 126); + } else { + out.write(isMasked() ? 0xFF : 127); + out.write(this.payloadLength >>> 56 & 0); // integer only + // contains + // 31 bit + out.write(this.payloadLength >>> 48 & 0); + out.write(this.payloadLength >>> 40 & 0); + out.write(this.payloadLength >>> 32 & 0); + out.write(this.payloadLength >>> 24); + out.write(this.payloadLength >>> 16); + } + out.write(this.payloadLength >>> 8); + out.write(this.payloadLength); + } + + if (isMasked()) { + out.write(this.maskingKey); + for (var i = 0; i < this.payloadLength; i++) { + out.write(binaryPayload()[i] ^ this.maskingKey[i % 4]); + } + } else { + out.write(binaryPayload()); + } + out.flush(); + } +} diff --git a/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java new file mode 100644 index 00000000..de4ca60b --- /dev/null +++ b/avaje-jex-websockets/src/main/java/io/avaje/jex/websocket/internal/WebSocketHandler.java @@ -0,0 +1,70 @@ +package io.avaje.jex.websocket.internal; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; + +import com.sun.net.httpserver.Headers; + +import io.avaje.jex.http.BadRequestException; +import io.avaje.jex.http.Context; +import io.avaje.jex.http.ExchangeHandler; +import io.avaje.jex.http.HttpResponseException; +import io.avaje.jex.http.HttpStatus; +import io.avaje.jex.http.InternalServerErrorException; + +public abstract class WebSocketHandler implements ExchangeHandler { + + @Override + public void handle(Context ctx) throws IOException { + var headers = ctx.requestHeaders(); + + if (!isWebsocketRequested(headers)) { + throw new HttpResponseException(HttpStatus.UPGRADE_REQUIRED_426, "Not a websocket request"); + } + + if (!Util.HEADER_WEBSOCKET_VERSION_VALUE.equalsIgnoreCase( + headers.getFirst(Util.HEADER_WEBSOCKET_VERSION)) + || !headers.containsKey(Util.HEADER_WEBSOCKET_KEY)) { + throw new BadRequestException( + "Invalid Websocket-Version " + headers.getFirst(Util.HEADER_WEBSOCKET_VERSION)); + } + + var webSocket = openWebSocket(ctx); + + try { + ctx.header( + Util.HEADER_WEBSOCKET_ACCEPT, + Util.makeAcceptKey(headers.getFirst(Util.HEADER_WEBSOCKET_KEY))); + } catch (NoSuchAlgorithmException e) { + throw new InternalServerErrorException( + "The SHA-1 Algorithm required for websockets is not available on the server."); + } + + if (headers.containsKey(Util.HEADER_WEBSOCKET_PROTOCOL)) { + ctx.header( + Util.HEADER_WEBSOCKET_PROTOCOL, + headers.getFirst(Util.HEADER_WEBSOCKET_PROTOCOL).split(",")[0]); + } + + ctx.header(Util.HEADER_UPGRADE, Util.HEADER_UPGRADE_VALUE); + ctx.header(Util.HEADER_CONNECTION, Util.HEADER_UPGRADE); + ctx.writeEmpty(101); + + // this won't return until websocket is closed + webSocket.readWebsocket(); + } + + private static boolean isWebsocketRequested(Headers headers) { + // check if Upgrade connection + var values = headers.get(Util.HEADER_CONNECTION); + if (values == null + || values.stream().filter(Util.HEADER_UPGRADE::equalsIgnoreCase).findAny().isEmpty()) { + return false; + } + // check for proper upgrade type + var upgrade = headers.getFirst(Util.HEADER_UPGRADE); + return Util.HEADER_UPGRADE_VALUE.equalsIgnoreCase(upgrade); + } + + protected abstract AbstractWebSocket openWebSocket(Context exchange); +} diff --git a/avaje-jex-websockets/src/main/java/module-info.java b/avaje-jex-websockets/src/main/java/module-info.java new file mode 100644 index 00000000..598e7a5a --- /dev/null +++ b/avaje-jex-websockets/src/main/java/module-info.java @@ -0,0 +1,23 @@ +/** + * Defines the Static Content API for serving static resources with Jex - see {@link io.avaje.jex.staticcontent.StaticContent}. + * + *

{@code
+ * var staticContent = StaticContentService.createCP("/public").httpPath("/").directoryIndex("index.html");
+ * final Jex.Server app = Jex.create()
+ *   .plugin(staticContent)
+ *   .port(8080)
+ *   .start();
+ *
+ * app.shutdown();
+ *
+ * }
+ */ +module io.avaje.jex.websocket { + + exports io.avaje.jex.websocket; + exports io.avaje.jex.websocket.exception; + + requires transitive io.avaje.jex; + requires static java.logging; + +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java new file mode 100644 index 00000000..4679ab25 --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/EchoWebSocketHandler.java @@ -0,0 +1,25 @@ +package io.avaje.jex.websocket.internal; + +import io.avaje.jex.websocket.WebSocketListener; +import io.avaje.jex.websocket.WsContext.WsError; +import io.avaje.jex.websocket.WsContext.WsMessage; + +public class EchoWebSocketHandler implements WebSocketListener { + + private StringBuilder sb = new StringBuilder(); + + @Override + public void onMessage(WsMessage message) { + sb.append(message.message()); + if (message.isFin()) { + String msg = sb.toString(); + sb = new StringBuilder(); + message.send(msg); + } + } + + @Override + public void onError(WsError wsError) { + wsError.error().printStackTrace(); + } +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java new file mode 100644 index 00000000..94e7ee4c --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketClientUtil.java @@ -0,0 +1,75 @@ +package io.avaje.jex.websocket.internal; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import io.avaje.jex.websocket.exception.CloseCode; + +public class WebSocketClientUtil { + + public static WebSocket createWSC( + int port, String path, final Consumer onTextCallback, Runnable onCloseCallback) + throws InterruptedException { + + HttpClient client = HttpClient.newHttpClient(); + + CountDownLatch waitForOpen = new CountDownLatch(1); + + CompletableFuture future = + client + .newWebSocketBuilder() + .buildAsync( + URI.create("ws://localhost:" + port + path), + new WebSocket.Listener() { + StringBuilder text = new StringBuilder(); + + @Override + public CompletionStage onText( + WebSocket webSocket, CharSequence data, boolean last) { + text.append(data); + if (last) { + onTextCallback.accept(text.toString()); + text = new StringBuilder(); + } + webSocket.request(1); + webSocket.sendClose(CloseCode.NORMAL_CLOSURE.code(), "cya"); + return null; + } + + @Override + public CompletionStage onClose( + WebSocket webSocket, int statusCode, String reason) { + if (onCloseCallback != null) { + onCloseCallback.run(); + } + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + if (onCloseCallback != null) { + onCloseCallback.run(); + } + error.printStackTrace(); + } + + @Override + public void onOpen(WebSocket webSocket) { + waitForOpen.countDown(); + webSocket.request(1); + } + }); + + WebSocket ws = future.join(); + if (!waitForOpen.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("websocket did not open"); + } + return ws; + } +} diff --git a/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java new file mode 100644 index 00000000..bdaa034f --- /dev/null +++ b/avaje-jex-websockets/src/test/java/io/avaje/jex/websocket/internal/WebSocketTest.java @@ -0,0 +1,75 @@ +package io.avaje.jex.websocket.internal; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.ConsoleHandler; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.avaje.jex.Jex; +import io.avaje.jex.test.TestPair; +import io.avaje.jex.websocket.WebSocketPlugin; + +public class WebSocketTest { + + static { + // System.setProperty("jdk.httpclient.HttpClient.log", "all"); + // System.setProperty("jdk.internal.httpclient.websocket.debug", "true"); + } + + private static final String path = "/ws"; + + TestPair server; + + @BeforeEach + void setUp() { + + var jex = Jex.create(); + + WebSocketPlugin p = WebSocketPlugin.create(); + p.ws(path, new EchoWebSocketHandler()); + jex.plugin(p); + server = TestPair.create(jex); + + Logger logger = Logger.getLogger(WebSocketTest.class.getName()); + ConsoleHandler ch = new ConsoleHandler(); + logger.setLevel(Level.ALL); + ch.setLevel(Level.ALL); + logger.addHandler(ch); + } + + @AfterEach + public void tearDown() { + server.shutdown(); + } + + @Test + public void testEcho() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + var client = + WebSocketClientUtil.createWSC( + server.port(), + path, + s -> { + if ("a_message".equals(s)) { + latch.countDown(); + } else { + fail("received wrong message"); + } + }, + null); + + client.sendText("a_message", true); + + if (!latch.await(5, TimeUnit.SECONDS)) { + fail("did not receive message"); + } + System.err.println("closing client"); + } +} diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java b/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java index 0aeae922..32375b0d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/JdkContext.java @@ -541,6 +541,15 @@ public void write(String content) { write(content.getBytes(StandardCharsets.UTF_8)); } + @Override + public void writeEmpty(int statusCode) { + try { + exchange.sendResponseHeaders(statusCode, -1); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public JsonService jsonService() { return mgr.jsonService(); diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java b/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java index ac8ee4b5..dac3e48e 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/json/JacksonJsonService.java @@ -24,7 +24,8 @@ public final class JacksonJsonService implements JsonService { /** Create with defaults for Jackson */ public JacksonJsonService() { - this.mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + this.mapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } /** Create with a Jackson instance that might have custom configuration. */ @@ -53,7 +54,8 @@ public T fromJson(Type type, byte[] data) { } private JavaType javaType(Type type) { - return javaTypes.computeIfAbsent(type.getTypeName(), k -> mapper.getTypeFactory().constructType(type)); + return javaTypes.computeIfAbsent( + type.getTypeName(), k -> mapper.getTypeFactory().constructType(type)); } @Override @@ -101,4 +103,14 @@ private void write(Iterator iterator, final JsonGenerator generator) { throw new UncheckedIOException(e); } } + + @Override + public T fromJson(Type type, String data) { + try { + final var javaType = javaType(type); + return mapper.readValue(data, javaType); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java b/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java index e07f4bc3..ac6dab76 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/core/json/JsonbJsonService.java @@ -61,4 +61,9 @@ public void toJsonStream(Iterator iterator, OutputStream os) { } } } + + @Override + public T fromJson(Type type, String data) { + return jsonb.type(type).fromJson(data); + } } diff --git a/avaje-jex/src/main/java/io/avaje/jex/http/Context.java b/avaje-jex/src/main/java/io/avaje/jex/http/Context.java index 9d01a098..66257626 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/http/Context.java +++ b/avaje-jex/src/main/java/io/avaje/jex/http/Context.java @@ -556,6 +556,14 @@ default String userAgent() { return header(Constants.USER_AGENT); } + /** Writes Nothing. */ + void writeEmpty(int statusCode); + + /** Writes Nothing. */ + default void writeEmpty(HttpStatus statusCode) { + writeEmpty(statusCode.status()); + } + /** * Writes the given bytes directly to the response. * diff --git a/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java b/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java index bab6d9f0..2325bd2d 100644 --- a/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java +++ b/avaje-jex/src/main/java/io/avaje/jex/spi/JsonService.java @@ -35,10 +35,19 @@ public non-sealed interface JsonService extends JexExtension { String toJsonString(Object bean); /** - * Deserializes a json input stream and deserializes it into a Java object of the specified type. + * **Read a Java Object from JSON string** + * + *

Deserializes a Java object from a JSON string * * @param type the Type object of the desired type * @param is the input stream containing the JSON data + * @return the serialized JSON string + */ + T fromJson(Type type, String string); + + /** + * Deserializes a json input stream and deserializes it into a Java object of the specified type. + * * @return the deserialized object */ T fromJson(Type type, InputStream is); diff --git a/pom.xml b/pom.xml index 516687d8..372d43f9 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,7 @@ avaje-jex-static-content avaje-jex-test avaje-jex-ssl + avaje-jex-websockets