From f46a33b5f81d4ce867c228c7fc810801c0e80b09 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Fri, 19 Jun 2026 23:15:45 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feature:=20WebSocket=20ping/pong=20heartbea?= =?UTF-8?q?t=20=E2=80=94=20Node=20server=20(PR=202)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the heartbeat to the Node WebSocket server, building on the shared WebSocketHeartbeat + dispatch onActivity hook (#584). - NodeWebSocket.accept: when pingIntervalMillis > 0, start a setInterval that drives the heartbeat (sendPing on idle / ctx.close(1011) on unanswered); reset on any inbound frame via the dispatch onActivity hook; clearInterval in notifyClose. sendPing added to NodeWebSocketContext. - NodeServerConfig.webSocketPingIntervalMillis (0 = off) + builder, threaded through handleUpgrade/gateAndAccept/accept. Test: a Node integration test — server pings every 150ms, the global WebSocket runtime auto-pongs, and the live-but-idle connection is not reaped. uniJS/test: 1406 tests, 0 failed. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../scala/wvlet/uni/http/NodeHttpServer.scala | 3 +- .../scala/wvlet/uni/http/NodeServer.scala | 9 ++- .../scala/wvlet/uni/http/NodeWebSocket.scala | 64 ++++++++++++++++--- .../uni/http/JSWebSocketClientTest.scala | 32 ++++++++++ 4 files changed, 96 insertions(+), 12 deletions(-) diff --git a/uni/.js/src/main/scala/wvlet/uni/http/NodeHttpServer.scala b/uni/.js/src/main/scala/wvlet/uni/http/NodeHttpServer.scala index 719462e8..4d4bb13f 100644 --- a/uni/.js/src/main/scala/wvlet/uni/http/NodeHttpServer.scala +++ b/uni/.js/src/main/scala/wvlet/uni/http/NodeHttpServer.scala @@ -93,7 +93,8 @@ class NodeHttpServer(config: NodeServerConfig) extends HttpServer with LogSuppor socket, head, config.webSocketRoutes, - config.webSocketMaxFrameSize + config.webSocketMaxFrameSize, + config.webSocketPingIntervalMillis ) server.applyDynamic("on")("upgrade", onUpgrade) diff --git a/uni/.js/src/main/scala/wvlet/uni/http/NodeServer.scala b/uni/.js/src/main/scala/wvlet/uni/http/NodeServer.scala index 80039bfa..b78e84dc 100644 --- a/uni/.js/src/main/scala/wvlet/uni/http/NodeServer.scala +++ b/uni/.js/src/main/scala/wvlet/uni/http/NodeServer.scala @@ -50,7 +50,10 @@ case class NodeServerConfig( // WebSocket routes, matched by path during the HTTP upgrade handshake override val webSocketRoutes: Seq[WebSocketRoute] = Nil, // Maximum size (bytes) of an inbound WebSocket message - webSocketMaxFrameSize: Int = 1024 * 1024 + webSocketMaxFrameSize: Int = 1024 * 1024, + // Ping/pong heartbeat interval for WebSocket connections (0 disables it): the server pings an + // idle connection and closes it if the peer stops responding to pings. + webSocketPingIntervalMillis: Int = 0 ) extends HttpServerConfig: def withName(name: String): NodeServerConfig = copy(name = name) @@ -91,6 +94,10 @@ case class NodeServerConfig( require(sizeInBytes > 0, "webSocketMaxFrameSize must be positive") copy(webSocketMaxFrameSize = sizeInBytes) + def withWebSocketPingIntervalMillis(millis: Int): NodeServerConfig = + require(millis >= 0, "webSocketPingIntervalMillis must be >= 0") + copy(webSocketPingIntervalMillis = millis) + /** * Start the server and return the running server instance. Note that on Node.js the socket bind * is asynchronous: use [[NodeHttpServer.whenReady]] (or [[startAndAwait]]) before reading diff --git a/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala b/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala index 48b142df..efb48c8a 100644 --- a/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala +++ b/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala @@ -39,7 +39,8 @@ private[http] object NodeWebSocket extends LogSupport: socket: js.Dynamic, head: js.Dynamic, routes: Seq[WebSocketRoute], - maxFrameSize: Int + maxFrameSize: Int, + pingIntervalMillis: Int ): Unit = try val request = buildRequest(req) @@ -52,7 +53,7 @@ private[http] object NodeWebSocket extends LogSupport: // Malformed handshake: missing key (400), or unsupported Sec-WebSocket-Version (426). writeHttpClose(socket, rejection) case Right(key) => - gateAndAccept(socket, head, key, route, request, maxFrameSize) + gateAndAccept(socket, head, key, route, request, maxFrameSize, pingIntervalMillis) catch case NonFatal(e) => warn(s"WebSocket upgrade error: ${e.getMessage}") @@ -64,7 +65,8 @@ private[http] object NodeWebSocket extends LogSupport: key: String, route: WebSocketRoute, request: Request, - maxFrameSize: Int + maxFrameSize: Int, + pingIntervalMillis: Int ): Unit = // Capture the request as threaded through the filter, so attributes a filter adds during the // handshake reach the WebSocketContext (matching the Netty/Native backends). @@ -77,7 +79,7 @@ private[http] object NodeWebSocket extends LogSupport: case OnNext(response) => val resp = response.asInstanceOf[Response] if resp.isSuccessful then - accept(socket, head, key, route, upgradeRequest, maxFrameSize) + accept(socket, head, key, route, upgradeRequest, maxFrameSize, pingIntervalMillis) else writeHttpClose(socket, resp) case OnError(e) => @@ -88,27 +90,42 @@ private[http] object NodeWebSocket extends LogSupport: destroyQuietly(socket) } + end gateAndAccept + private def accept( socket: js.Dynamic, head: js.Dynamic, key: String, route: WebSocketRoute, request: Request, - maxFrameSize: Int + maxFrameSize: Int, + pingIntervalMillis: Int ): Unit = // The route filter may be asynchronous; if the client disconnected while it ran, the socket is // already gone — skip the upgrade entirely (no onOpen, so no dangling onClose). if isDestroyed(socket) then return - val handler = route.handlerFactory(request) - val ctx = NodeWebSocketContext(socket, request) - val decoder = WebSocketFrameDecoder(maxFrameSize) - var closed = false + val handler = route.handlerFactory(request) + val ctx = NodeWebSocketContext(socket, request) + val decoder = WebSocketFrameDecoder(maxFrameSize) + var closed = false + val heartbeat = + if pingIntervalMillis > 0 then + WebSocketHeartbeat() + else + null + var intervalHandle: js.Dynamic = null def notifyClose(): Unit = if !closed then closed = true + if intervalHandle != null then + try + js.Dynamic.global.clearInterval(intervalHandle) + catch + case NonFatal(_) => + () try handler.onClose(ctx) catch @@ -119,7 +136,16 @@ private[http] object NodeWebSocket extends LogSupport: if !closed then try if !decoder.feed(bytes)(ev => - WebSocketDispatcher.dispatch(handler, ctx, ctx.sendPong, () => notifyClose(), ev) + WebSocketDispatcher.dispatch( + handler, + ctx, + ctx.sendPong, + () => notifyClose(), + ev, + () => + if heartbeat != null then + heartbeat.onActivity() + ) ) then // A terminal event (peer close / protocol failure) was emitted. Fire onClose now (the @@ -173,6 +199,20 @@ private[http] object NodeWebSocket extends LogSupport: socket.applyDynamic("pause")() if !js.isUndefined(head) && head != null && head.length.asInstanceOf[Int] > 0 then drive(NodeBytes.toBytes(head)) + + // Ping/pong heartbeat: ping an idle connection and close it if the peer stops responding. + if heartbeat != null then + val tick: js.Function0[Unit] = + () => + if !closed then + heartbeat.onTick() match + case WebSocketHeartbeat.Decision.SendPing => + ctx.sendPing(Array.emptyByteArray) + case WebSocketHeartbeat.Decision.Close => + ctx.close(1011, "ping timeout") + case WebSocketHeartbeat.Decision.Idle => + () + intervalHandle = js.Dynamic.global.setInterval(tick, pingIntervalMillis) catch case NonFatal(e) => warn(s"WebSocket accept error: ${e.getMessage}") @@ -324,6 +364,10 @@ private[http] class NodeWebSocketContext(socket: js.Dynamic, override val reques if !closed then writeFrame(WebSocketFrame.OpPong, payload) + private[http] def sendPing(payload: Array[Byte]): Unit = + if !closed then + writeFrame(WebSocketFrame.OpPing, payload) + private def writeIfOpen(opcode: Int, payload: Array[Byte]): Unit = if !closed then writeFrame(opcode, payload) diff --git a/uni/.js/src/test/scala/wvlet/uni/http/JSWebSocketClientTest.scala b/uni/.js/src/test/scala/wvlet/uni/http/JSWebSocketClientTest.scala index 285c5a95..b5c7aee2 100644 --- a/uni/.js/src/test/scala/wvlet/uni/http/JSWebSocketClientTest.scala +++ b/uni/.js/src/test/scala/wvlet/uni/http/JSWebSocketClientTest.scala @@ -17,6 +17,7 @@ import wvlet.uni.rx.{OnError, OnNext, Rx, RxRunner} import wvlet.uni.test.UniTest import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.scalajs.js /** * Verifies the JS WebSocket client (global `WebSocket`, Node.js >= 22) against the in-process Node @@ -38,6 +39,12 @@ class JSWebSocketClientTest extends UniTest: } opened.future + /** A Future that completes after `millis` (via the JS event loop). */ + private def delay(millis: Int): Future[Unit] = + val p = Promise[Unit]() + js.Dynamic.global.setTimeout((() => p.trySuccess(())): js.Function0[Unit], millis) + p.future + test("JS WebSocket client echoes text messages") { NodeServer .withPort(0) @@ -89,6 +96,31 @@ class JSWebSocketClientTest extends UniTest: } } + test("server heartbeat keeps an idle connection alive via ping/pong") { + NodeServer + .withPort(0) + .withWebSocketPingIntervalMillis(150) + .withWebSocketRoute("/ws/idle") { _ => + new WebSocketHandler {} + } + .startAndAwait { server => + val closed = Promise[Unit]() + val handler = + new WebSocketHandler: + override def onClose(ctx: WebSocketContext): Unit = closed.trySuccess(()) + val result = + for + ctx <- connect(s"ws://127.0.0.1:${server.localPort}/ws/idle", handler) + // The server pings every 150ms; the Node runtime auto-pongs, so over ~5 intervals the + // heartbeat must NOT reap this live-but-idle connection. + _ <- delay(800) + yield + ctx.close() + closed.isCompleted shouldBe false + Rx.future(result) + } + } + test("JS WebSocket client fires onClose when the connection ends") { NodeServer .withPort(0) From 1d1a0ae42790a752b034e9c8188d469e65800a04 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Fri, 19 Jun 2026 23:20:32 +0900 Subject: [PATCH 2/2] fix: Don't schedule the Node WS heartbeat after an early close (review) If onOpen/drive(head) already triggered notifyClose before the interval was scheduled, the interval would leak (notifyClose ran before intervalHandle was set, so it couldn't clear it). Guard the setInterval on `!closed`. Co-Authored-By: Claude Opus 4.8 (1M context) --- uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala b/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala index efb48c8a..cfba64b3 100644 --- a/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala +++ b/uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala @@ -201,7 +201,9 @@ private[http] object NodeWebSocket extends LogSupport: drive(NodeBytes.toBytes(head)) // Ping/pong heartbeat: ping an idle connection and close it if the peer stops responding. - if heartbeat != null then + // Guard on `!closed` so we don't schedule an interval after onOpen/drive(head) already closed + // (notifyClose ran before intervalHandle was set, so it couldn't have cleared it). + if heartbeat != null && !closed then val tick: js.Function0[Unit] = () => if !closed then