Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion uni/.js/src/main/scala/wvlet/uni/http/NodeHttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class NodeHttpServer(config: NodeServerConfig) extends HttpServer with LogSuppor
socket,
head,
config.webSocketRoutes,
config.webSocketMaxFrameSize
config.webSocketMaxFrameSize,
config.webSocketPingIntervalMillis
)
server.applyDynamic("on")("upgrade", onUpgrade)

Expand Down
9 changes: 8 additions & 1 deletion uni/.js/src/main/scala/wvlet/uni/http/NodeServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ case class NodeServerConfig(
// WebSocket routes, matched by path during the HTTP upgrade handshake
override val webSocketRoutes: Seq[WebSocketRoute] = Nil,
// Maximum size (bytes) of an inbound WebSocket message
webSocketMaxFrameSize: Int = 1024 * 1024
webSocketMaxFrameSize: Int = 1024 * 1024,
// Ping/pong heartbeat interval for WebSocket connections (0 disables it): the server pings an
// idle connection and closes it if the peer stops responding to pings.
webSocketPingIntervalMillis: Int = 0
) extends HttpServerConfig:

def withName(name: String): NodeServerConfig = copy(name = name)
Expand Down Expand Up @@ -91,6 +94,10 @@ case class NodeServerConfig(
require(sizeInBytes > 0, "webSocketMaxFrameSize must be positive")
copy(webSocketMaxFrameSize = sizeInBytes)

def withWebSocketPingIntervalMillis(millis: Int): NodeServerConfig =
require(millis >= 0, "webSocketPingIntervalMillis must be >= 0")
copy(webSocketPingIntervalMillis = millis)

/**
* Start the server and return the running server instance. Note that on Node.js the socket bind
* is asynchronous: use [[NodeHttpServer.whenReady]] (or [[startAndAwait]]) before reading
Expand Down
66 changes: 56 additions & 10 deletions uni/.js/src/main/scala/wvlet/uni/http/NodeWebSocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ private[http] object NodeWebSocket extends LogSupport:
socket: js.Dynamic,
head: js.Dynamic,
routes: Seq[WebSocketRoute],
maxFrameSize: Int
maxFrameSize: Int,
pingIntervalMillis: Int
): Unit =
try
val request = buildRequest(req)
Expand All @@ -52,7 +53,7 @@ private[http] object NodeWebSocket extends LogSupport:
// Malformed handshake: missing key (400), or unsupported Sec-WebSocket-Version (426).
writeHttpClose(socket, rejection)
case Right(key) =>
gateAndAccept(socket, head, key, route, request, maxFrameSize)
gateAndAccept(socket, head, key, route, request, maxFrameSize, pingIntervalMillis)
catch
case NonFatal(e) =>
warn(s"WebSocket upgrade error: ${e.getMessage}")
Expand All @@ -64,7 +65,8 @@ private[http] object NodeWebSocket extends LogSupport:
key: String,
route: WebSocketRoute,
request: Request,
maxFrameSize: Int
maxFrameSize: Int,
pingIntervalMillis: Int
): Unit =
// Capture the request as threaded through the filter, so attributes a filter adds during the
// handshake reach the WebSocketContext (matching the Netty/Native backends).
Expand All @@ -77,7 +79,7 @@ private[http] object NodeWebSocket extends LogSupport:
case OnNext(response) =>
val resp = response.asInstanceOf[Response]
if resp.isSuccessful then
accept(socket, head, key, route, upgradeRequest, maxFrameSize)
accept(socket, head, key, route, upgradeRequest, maxFrameSize, pingIntervalMillis)
else
writeHttpClose(socket, resp)
case OnError(e) =>
Expand All @@ -88,27 +90,42 @@ private[http] object NodeWebSocket extends LogSupport:
destroyQuietly(socket)
}

end gateAndAccept

private def accept(
socket: js.Dynamic,
head: js.Dynamic,
key: String,
route: WebSocketRoute,
request: Request,
maxFrameSize: Int
maxFrameSize: Int,
pingIntervalMillis: Int
): Unit =
// The route filter may be asynchronous; if the client disconnected while it ran, the socket is
// already gone — skip the upgrade entirely (no onOpen, so no dangling onClose).
if isDestroyed(socket) then
return

val handler = route.handlerFactory(request)
val ctx = NodeWebSocketContext(socket, request)
val decoder = WebSocketFrameDecoder(maxFrameSize)
var closed = false
val handler = route.handlerFactory(request)
val ctx = NodeWebSocketContext(socket, request)
val decoder = WebSocketFrameDecoder(maxFrameSize)
var closed = false
val heartbeat =
if pingIntervalMillis > 0 then
WebSocketHeartbeat()
else
null
var intervalHandle: js.Dynamic = null

def notifyClose(): Unit =
if !closed then
closed = true
if intervalHandle != null then
try
js.Dynamic.global.clearInterval(intervalHandle)
catch
case NonFatal(_) =>
()
try
handler.onClose(ctx)
catch
Expand All @@ -119,7 +136,16 @@ private[http] object NodeWebSocket extends LogSupport:
if !closed then
try
if !decoder.feed(bytes)(ev =>
WebSocketDispatcher.dispatch(handler, ctx, ctx.sendPong, () => notifyClose(), ev)
WebSocketDispatcher.dispatch(
handler,
ctx,
ctx.sendPong,
() => notifyClose(),
ev,
() =>
if heartbeat != null then
heartbeat.onActivity()
)
)
then
// A terminal event (peer close / protocol failure) was emitted. Fire onClose now (the
Expand Down Expand Up @@ -173,6 +199,22 @@ private[http] object NodeWebSocket extends LogSupport:
socket.applyDynamic("pause")()
if !js.isUndefined(head) && head != null && head.length.asInstanceOf[Int] > 0 then
drive(NodeBytes.toBytes(head))

// Ping/pong heartbeat: ping an idle connection and close it if the peer stops responding.
// Guard on `!closed` so we don't schedule an interval after onOpen/drive(head) already closed
// (notifyClose ran before intervalHandle was set, so it couldn't have cleared it).
if heartbeat != null && !closed then
val tick: js.Function0[Unit] =
() =>
if !closed then
heartbeat.onTick() match
case WebSocketHeartbeat.Decision.SendPing =>
ctx.sendPing(Array.emptyByteArray)
case WebSocketHeartbeat.Decision.Close =>
ctx.close(1011, "ping timeout")
case WebSocketHeartbeat.Decision.Idle =>
()
intervalHandle = js.Dynamic.global.setInterval(tick, pingIntervalMillis)
catch
case NonFatal(e) =>
warn(s"WebSocket accept error: ${e.getMessage}")
Expand Down Expand Up @@ -324,6 +366,10 @@ private[http] class NodeWebSocketContext(socket: js.Dynamic, override val reques
if !closed then
writeFrame(WebSocketFrame.OpPong, payload)

private[http] def sendPing(payload: Array[Byte]): Unit =
if !closed then
writeFrame(WebSocketFrame.OpPing, payload)

private def writeIfOpen(opcode: Int, payload: Array[Byte]): Unit =
if !closed then
writeFrame(opcode, payload)
Expand Down
32 changes: 32 additions & 0 deletions uni/.js/src/test/scala/wvlet/uni/http/JSWebSocketClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import wvlet.uni.rx.{OnError, OnNext, Rx, RxRunner}
import wvlet.uni.test.UniTest

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.scalajs.js

/**
* Verifies the JS WebSocket client (global `WebSocket`, Node.js >= 22) against the in-process Node
Expand All @@ -38,6 +39,12 @@ class JSWebSocketClientTest extends UniTest:
}
opened.future

/** A Future that completes after `millis` (via the JS event loop). */
private def delay(millis: Int): Future[Unit] =
val p = Promise[Unit]()
js.Dynamic.global.setTimeout((() => p.trySuccess(())): js.Function0[Unit], millis)
p.future

test("JS WebSocket client echoes text messages") {
NodeServer
.withPort(0)
Expand Down Expand Up @@ -89,6 +96,31 @@ class JSWebSocketClientTest extends UniTest:
}
}

test("server heartbeat keeps an idle connection alive via ping/pong") {
NodeServer
.withPort(0)
.withWebSocketPingIntervalMillis(150)
.withWebSocketRoute("/ws/idle") { _ =>
new WebSocketHandler {}
}
.startAndAwait { server =>
val closed = Promise[Unit]()
val handler =
new WebSocketHandler:
override def onClose(ctx: WebSocketContext): Unit = closed.trySuccess(())
val result =
for
ctx <- connect(s"ws://127.0.0.1:${server.localPort}/ws/idle", handler)
// The server pings every 150ms; the Node runtime auto-pongs, so over ~5 intervals the
// heartbeat must NOT reap this live-but-idle connection.
_ <- delay(800)
yield
ctx.close()
closed.isCompleted shouldBe false
Rx.future(result)
}
}

test("JS WebSocket client fires onClose when the connection ends") {
NodeServer
.withPort(0)
Expand Down