From 013a44b958e2180c4c4a72046f492191ecfabba4 Mon Sep 17 00:00:00 2001 From: Alexsander Date: Sun, 24 May 2026 19:33:06 +0300 Subject: [PATCH] Added configurable WebSocket heartbeat pings --- CHANGELOG.md | 1 + README.md | 2 + docs/production.md | 4 +- php_websocket.h | 4 + tests/001-contracts.phpt | 14 +- tests/017-server-heartbeat.phpt | 265 ++++++++++++++++++++++++++++++++ websocket.stub.php | 24 ++- websocket_arginfo.h | 16 +- websocket_connection.c | 6 + websocket_server.c | 16 +- websocket_server_runtime.c | 86 +++++++++++ 11 files changed, 431 insertions(+), 7 deletions(-) create mode 100644 tests/017-server-heartbeat.phpt diff --git a/CHANGELOG.md b/CHANGELOG.md index 1386a79..2809da5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ All notable changes to ext-websocket are documented here. ### Added +- Added opt-in server heartbeat pings with `pingIntervalMs` and `pongTimeoutMs` options. - Added `WebSocket\Server::onHandshake()` with `WebSocket\Request`, `WebSocket\HandshakeResponse`, and `WebSocket\HandshakeException` for pre-upgrade handshake validation. ## 1.2.1 - 2026-05-23 diff --git a/README.md b/README.md index 2607c89..cbb1999 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,8 @@ Options: | `maxConnections` | Maximum concurrently accepted TCP connections; defaults to 10000 | | `handshakeTimeoutMs` | Maximum idle time before HTTP Upgrade completes; defaults to 10000 ms | | `idleTimeoutMs` | Maximum idle time after HTTP Upgrade completes; defaults to 120000 ms | +| `pingIntervalMs` | Idle time before sending an automatic ping; `0` disables heartbeat pings by default | +| `pongTimeoutMs` | Maximum time to wait for a pong after an automatic ping; defaults to 10000 ms | Prefer `WebSocket\ServerOptions` for explicit configuration. Associative arrays remain supported for compatibility. diff --git a/docs/production.md b/docs/production.md index 85954a8..2781488 100644 --- a/docs/production.md +++ b/docs/production.md @@ -36,10 +36,12 @@ $server = new WebSocket\Server(new WebSocket\ServerOptions( maxConnections: 1000, handshakeTimeoutMs: 5000, idleTimeoutMs: 60000, + pingIntervalMs: 30000, + pongTimeoutMs: 10000, )); ``` -`maxMessageSize` protects incoming frames and fragmented messages. `maxQueuedBytes` protects memory when a client reads slowly and outgoing writes need to be queued. `maxConnections`, `handshakeTimeoutMs`, and `idleTimeoutMs` protect file descriptors and event-loop work from slowloris-style or idle-connection pressure. +`maxMessageSize` protects incoming frames and fragmented messages. `maxQueuedBytes` protects memory when a client reads slowly and outgoing writes need to be queued. `maxConnections`, `handshakeTimeoutMs`, and `idleTimeoutMs` protect file descriptors and event-loop work from slowloris-style or idle-connection pressure. `pingIntervalMs` and `pongTimeoutMs` enable heartbeat pings and close peers that stop answering pongs. ## Slow Clients diff --git a/php_websocket.h b/php_websocket.h index c671180..8e221d3 100644 --- a/php_websocket.h +++ b/php_websocket.h @@ -27,6 +27,8 @@ extern zend_module_entry websocket_module_entry; #define WEBSOCKET_DEFAULT_MAX_CONNECTIONS 10000 #define WEBSOCKET_DEFAULT_HANDSHAKE_TIMEOUT_MS 10000 #define WEBSOCKET_DEFAULT_IDLE_TIMEOUT_MS 120000 +#define WEBSOCKET_DEFAULT_PING_INTERVAL_MS 0 +#define WEBSOCKET_DEFAULT_PONG_TIMEOUT_MS 10000 #define WEBSOCKET_CLOSE_REASON_MAX_LEN 123 #define WEBSOCKET_OPCODE_CONTINUATION 0x0 @@ -138,6 +140,8 @@ typedef struct _websocket_connection_object { zend_string *fragmented_payload; uint64_t accepted_at_usec; uint64_t last_activity_usec; + uint64_t last_ping_usec; + bool awaiting_pong; zend_object std; } websocket_connection_object; diff --git a/tests/001-contracts.phpt b/tests/001-contracts.phpt index bf02b5d..dacffc3 100644 --- a/tests/001-contracts.phpt +++ b/tests/001-contracts.phpt @@ -25,17 +25,24 @@ var_dump((new ReflectionMethod(WebSocket\HandshakeException::class, '__construct var_dump((new ReflectionMethod(WebSocket\Connection::class, 'send'))->getNumberOfParameters()); var_dump((new ReflectionProperty(WebSocket\Connection::class, 'subprotocol'))->getType()->allowsNull()); var_dump((new ReflectionMethod(WebSocket\ServerOptions::class, '__construct'))->getNumberOfParameters()); -$options = new WebSocket\ServerOptions(maxMessageSize: 1024, maxQueuedBytes: 2048, maxConnections: 16, handshakeTimeoutMs: 250, idleTimeoutMs: 500); +$options = new WebSocket\ServerOptions(maxMessageSize: 1024, maxQueuedBytes: 2048, maxConnections: 16, handshakeTimeoutMs: 250, idleTimeoutMs: 500, pingIntervalMs: 0, pongTimeoutMs: 750); var_dump($options->maxMessageSize); var_dump($options->maxQueuedBytes); var_dump($options->maxConnections); var_dump($options->handshakeTimeoutMs); var_dump($options->idleTimeoutMs); +var_dump($options->pingIntervalMs); +var_dump($options->pongTimeoutMs); try { new WebSocket\ServerOptions(maxMessageSize: 0); } catch (ValueError $e) { echo $e->getMessage(), "\n"; } +try { + new WebSocket\ServerOptions(pingIntervalMs: -1); +} catch (ValueError $e) { + echo $e->getMessage(), "\n"; +} var_dump((new ReflectionMethod(WebSocket\Frame::class, '__construct'))->getNumberOfParameters()); var_dump((new ReflectionMethod(WebSocket\CloseFrame::class, '__construct'))->getNumberOfParameters()); @@ -81,13 +88,16 @@ int(3) int(1) int(2) bool(true) -int(5) +int(7) int(1024) int(2048) int(16) int(250) int(500) +int(0) +int(750) WebSocket\ServerOptions::__construct(): Argument #1 ($maxMessageSize) must be at least 1 +WebSocket\ServerOptions::__construct(): Argument #6 ($pingIntervalMs) must be at least 0 int(3) int(2) WebSocket\Server::subprotocols(): Argument #1 must be a valid WebSocket subprotocol token diff --git a/tests/017-server-heartbeat.phpt b/tests/017-server-heartbeat.phpt new file mode 100644 index 0000000..64d319d --- /dev/null +++ b/tests/017-server-heartbeat.phpt @@ -0,0 +1,265 @@ +--TEST-- +WebSocket\Server sends heartbeat pings and closes missing pongs +--EXTENSIONS-- +websocket +--SKIPIF-- + +--FILE-- +listen('127.0.0.1', PORT_PLACEHOLDER); + +$closed = 0; + +$server->onOpen(static function (Connection $connection): void { + file_put_contents(EVENTS_PLACEHOLDER, "open\n", FILE_APPEND); +}); + +$server->onClose(static function (Connection $connection) use ($server, &$closed): void { + $closed++; + file_put_contents(EVENTS_PLACEHOLDER, "close\n", FILE_APPEND); + + if ($closed >= 2) { + $server->stop(); + } +}); + +$server->run(); +file_put_contents(EVENTS_PLACEHOLDER, "returned\n", FILE_APPEND); +PHP; + +$serverCode = str_replace( + ['PORT_PLACEHOLDER', 'EVENTS_PLACEHOLDER'], + [(string) $port, var_export($eventsFile, true)], + $serverCode, +); +file_put_contents($serverFile, $serverCode); + +$process = proc_open( + [PHP_BINARY, '-n', '-d', 'extension=' . $extension, $serverFile], + [ + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ], + $pipes, +); + +if (!is_resource($process)) { + echo "cannot start server\n"; + exit; +} + +$connect = static function () use ($port, $process): mixed { + $client = false; + $deadline = microtime(true) + 5.0; + + do { + $client = @stream_socket_client('tcp://127.0.0.1:' . $port, $errno, $errstr, 0.1); + if ($client !== false) { + stream_set_timeout($client, 1); + return $client; + } + + $status = proc_get_status($process); + if (!$status['running']) { + break; + } + + usleep(10000); + } while (microtime(true) < $deadline); + + return false; +}; + +$readHeaders = static function ($client) use ($port): string { + fwrite($client, implode("\r\n", [ + 'GET / HTTP/1.1', + 'Host: 127.0.0.1:' . $port, + 'Upgrade: websocket', + 'Connection: Upgrade', + 'Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==', + 'Sec-WebSocket-Version: 13', + '', + '', + ])); + + $response = ''; + $deadline = microtime(true) + 5.0; + + do { + $chunk = fread($client, 4096); + if (is_string($chunk) && $chunk !== '') { + $response .= $chunk; + if (str_contains($response, "\r\n\r\n")) { + break; + } + } + + usleep(10000); + } while (microtime(true) < $deadline); + + return $response; +}; + +$readFrame = static function ($client, string &$buffer): Frame|CloseFrame|null { + $deadline = microtime(true) + 5.0; + + do { + $frame = Protocol::unpack($buffer); + if ($frame !== null) { + $buffer = substr($buffer, $frame->bytesConsumed); + return $frame; + } + + $chunk = fread($client, 4096); + if (is_string($chunk) && $chunk !== '') { + $buffer .= $chunk; + } + + usleep(10000); + } while (microtime(true) < $deadline); + + return null; +}; + +$client1 = $connect(); +$response1 = ''; +$buffer1 = ''; +$ping1 = null; +$close1 = null; + +if ($client1 !== false) { + $response1 = $readHeaders($client1); + $headerEnd = strpos($response1, "\r\n\r\n"); + $buffer1 = $headerEnd === false ? '' : substr($response1, $headerEnd + 4); + + $ping1 = $readFrame($client1, $buffer1); + + if ($ping1 instanceof Frame) { + fwrite($client1, Protocol::pack($ping1->payload, Protocol::OPCODE_PONG, Protocol::FLAG_FIN | Protocol::FLAG_MASK)); + } + + fwrite($client1, Protocol::pack(pack('n', Protocol::CLOSE_NORMAL), Protocol::OPCODE_CLOSE, Protocol::FLAG_FIN | Protocol::FLAG_MASK)); + $close1 = $readFrame($client1, $buffer1); + fclose($client1); +} + +$client2 = $connect(); +$response2 = ''; +$buffer2 = ''; +$ping2 = null; +$close2 = null; + +if ($client2 !== false) { + $response2 = $readHeaders($client2); + $headerEnd = strpos($response2, "\r\n\r\n"); + $buffer2 = $headerEnd === false ? '' : substr($response2, $headerEnd + 4); + + $ping2 = $readFrame($client2, $buffer2); + $close2 = $readFrame($client2, $buffer2); + + fclose($client2); +} + +$deadline = microtime(true) + 5.0; +do { + $status = proc_get_status($process); + if (!$status['running']) { + break; + } + + usleep(10000); +} while (microtime(true) < $deadline); + +$status = proc_get_status($process); +if ($status['running']) { + proc_terminate($process); +} + +$stdout = stream_get_contents($pipes[1]); +$stderr = stream_get_contents($pipes[2]); +fclose($pipes[1]); +fclose($pipes[2]); +proc_close($process); + +$events = file_exists($eventsFile) ? file($eventsFile, FILE_IGNORE_NEW_LINES) : []; + +var_dump($client1 !== false); +var_dump(str_contains($response1, "HTTP/1.1 101 Switching Protocols\r\n")); +var_dump($ping1 instanceof Frame && $ping1->type === MessageType::Ping && $ping1->payload === ''); +var_dump($close1 instanceof CloseFrame && $close1->code === Protocol::CLOSE_NORMAL); +var_dump($client2 !== false); +var_dump(str_contains($response2, "HTTP/1.1 101 Switching Protocols\r\n")); +var_dump($ping2 instanceof Frame && $ping2->type === MessageType::Ping && $ping2->payload === ''); +var_dump($close2 instanceof CloseFrame && $close2->code === Protocol::CLOSE_NORMAL && $close2->reason === 'pong timeout'); +var_dump(array_count_values($events)); +var_dump($stdout === ''); +var_dump($stderr === ''); + +@unlink($eventsFile); +@unlink($serverFile); +@rmdir($tmpDir); +?> +--EXPECT-- +bool(true) +bool(true) +bool(true) +bool(true) +bool(true) +bool(true) +bool(true) +bool(true) +array(3) { + ["open"]=> + int(2) + ["close"]=> + int(2) + ["returned"]=> + int(1) +} +bool(true) +bool(true) diff --git a/websocket.stub.php b/websocket.stub.php index b3d97f6..deca0f4 100644 --- a/websocket.stub.php +++ b/websocket.stub.php @@ -25,8 +25,10 @@ final class Server * - maxConnections: maximum concurrently accepted TCP connections. * - handshakeTimeoutMs: maximum idle time before HTTP Upgrade completes. * - idleTimeoutMs: maximum idle time after HTTP Upgrade completes. + * - pingIntervalMs: idle time before the server sends an automatic ping, or 0 to disable. + * - pongTimeoutMs: maximum time to wait for a pong after an automatic ping. * - * @param ServerOptions|array{maxMessageSize?: int, maxQueuedBytes?: int, maxConnections?: int, handshakeTimeoutMs?: int, idleTimeoutMs?: int} $options + * @param ServerOptions|array{maxMessageSize?: int, maxQueuedBytes?: int, maxConnections?: int, handshakeTimeoutMs?: int, idleTimeoutMs?: int, pingIntervalMs?: int, pongTimeoutMs?: int} $options */ public function __construct(ServerOptions|array $options = []) {} @@ -151,14 +153,30 @@ final class ServerOptions */ public readonly int $idleTimeoutMs; + /** + * Idle time before sending an automatic ping, in milliseconds. Zero disables heartbeat pings. + * + * @var int + */ + public readonly int $pingIntervalMs; + + /** + * Maximum time to wait for an automatic ping response, in milliseconds. + * + * @var int + */ + public readonly int $pongTimeoutMs; + /** * @param int $maxMessageSize * @param int $maxQueuedBytes * @param int $maxConnections * @param int $handshakeTimeoutMs * @param int $idleTimeoutMs + * @param int $pingIntervalMs + * @param int $pongTimeoutMs * - * @throws \ValueError If a limit is less than 1. + * @throws \ValueError If a positive limit is less than 1 or pingIntervalMs is negative. */ public function __construct( int $maxMessageSize = 16 * 1024 * 1024, @@ -166,6 +184,8 @@ public function __construct( int $maxConnections = 10000, int $handshakeTimeoutMs = 10000, int $idleTimeoutMs = 120000, + int $pingIntervalMs = 0, + int $pongTimeoutMs = 10000, ) {} } diff --git a/websocket_arginfo.h b/websocket_arginfo.h index ea2911b..eebb4b5 100644 --- a/websocket_arginfo.h +++ b/websocket_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 6adb6001500f3acf211a163aa05ad143a2628fbc */ + * Stub hash: 97cba842ad6ce0a92df68f7dc4ae6193c46f1251 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_WebSocket_Server___construct, 0, 0, 0) ZEND_ARG_OBJ_TYPE_MASK(0, options, WebSocket\\ServerOptions, MAY_BE_ARRAY, "[]") @@ -40,6 +40,8 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_WebSocket_ServerOptions___construct, 0, 0, ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, maxConnections, IS_LONG, 0, "10000") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, handshakeTimeoutMs, IS_LONG, 0, "10000") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, idleTimeoutMs, IS_LONG, 0, "120000") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, pingIntervalMs, IS_LONG, 0, "0") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, pongTimeoutMs, IS_LONG, 0, "10000") ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_WebSocket_Request_header, 0, 1, IS_STRING, 1) @@ -236,6 +238,18 @@ static zend_class_entry *register_class_WebSocket_ServerOptions(void) zend_declare_typed_property(class_entry, property_idleTimeoutMs_name, &property_idleTimeoutMs_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_LONG)); zend_string_release(property_idleTimeoutMs_name); + zval property_pingIntervalMs_default_value; + ZVAL_UNDEF(&property_pingIntervalMs_default_value); + zend_string *property_pingIntervalMs_name = zend_string_init("pingIntervalMs", sizeof("pingIntervalMs") - 1, 1); + zend_declare_typed_property(class_entry, property_pingIntervalMs_name, &property_pingIntervalMs_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_LONG)); + zend_string_release(property_pingIntervalMs_name); + + zval property_pongTimeoutMs_default_value; + ZVAL_UNDEF(&property_pongTimeoutMs_default_value); + zend_string *property_pongTimeoutMs_name = zend_string_init("pongTimeoutMs", sizeof("pongTimeoutMs") - 1, 1); + zend_declare_typed_property(class_entry, property_pongTimeoutMs_name, &property_pongTimeoutMs_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_LONG)); + zend_string_release(property_pongTimeoutMs_name); + return class_entry; } diff --git a/websocket_connection.c b/websocket_connection.c index d86c0ca..74b934b 100644 --- a/websocket_connection.c +++ b/websocket_connection.c @@ -59,6 +59,8 @@ static zend_object *websocket_connection_create_object(zend_class_entry *ce) intern->fragmented_payload = NULL; intern->accepted_at_usec = 0; intern->last_activity_usec = 0; + intern->last_ping_usec = 0; + intern->awaiting_pong = false; intern->std.handlers = &websocket_connection_handlers; @@ -167,6 +169,8 @@ void websocket_connection_close_socket(websocket_connection_object *intern) intern->fragmented_opcode = 0; intern->accepted_at_usec = 0; intern->last_activity_usec = 0; + intern->last_ping_usec = 0; + intern->awaiting_pong = false; } bool websocket_connection_has_pending_writes(websocket_connection_object *intern) @@ -415,6 +419,8 @@ void websocket_connection_open(websocket_connection_object *intern, uint64_t id, } intern->fragmented = false; intern->fragmented_opcode = 0; + intern->last_ping_usec = 0; + intern->awaiting_pong = false; } static void websocket_connection_free_object(zend_object *object) diff --git a/websocket_server.c b/websocket_server.c index 038f8d2..00b45da 100644 --- a/websocket_server.c +++ b/websocket_server.c @@ -179,14 +179,18 @@ PHP_METHOD(WebSocket_ServerOptions, __construct) zend_long max_connections = WEBSOCKET_DEFAULT_MAX_CONNECTIONS; zend_long handshake_timeout_ms = WEBSOCKET_DEFAULT_HANDSHAKE_TIMEOUT_MS; zend_long idle_timeout_ms = WEBSOCKET_DEFAULT_IDLE_TIMEOUT_MS; + zend_long ping_interval_ms = WEBSOCKET_DEFAULT_PING_INTERVAL_MS; + zend_long pong_timeout_ms = WEBSOCKET_DEFAULT_PONG_TIMEOUT_MS; - ZEND_PARSE_PARAMETERS_START(0, 5) + ZEND_PARSE_PARAMETERS_START(0, 7) Z_PARAM_OPTIONAL Z_PARAM_LONG(max_message_size) Z_PARAM_LONG(max_queued_bytes) Z_PARAM_LONG(max_connections) Z_PARAM_LONG(handshake_timeout_ms) Z_PARAM_LONG(idle_timeout_ms) + Z_PARAM_LONG(ping_interval_ms) + Z_PARAM_LONG(pong_timeout_ms) ZEND_PARSE_PARAMETERS_END(); if (max_message_size < 1) { @@ -209,12 +213,22 @@ PHP_METHOD(WebSocket_ServerOptions, __construct) zend_argument_value_error(5, "must be at least 1"); RETURN_THROWS(); } + if (ping_interval_ms < 0) { + zend_argument_value_error(6, "must be at least 0"); + RETURN_THROWS(); + } + if (pong_timeout_ms < 1) { + zend_argument_value_error(7, "must be at least 1"); + RETURN_THROWS(); + } zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "maxMessageSize", strlen("maxMessageSize"), max_message_size); zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "maxQueuedBytes", strlen("maxQueuedBytes"), max_queued_bytes); zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "maxConnections", strlen("maxConnections"), max_connections); zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "handshakeTimeoutMs", strlen("handshakeTimeoutMs"), handshake_timeout_ms); zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "idleTimeoutMs", strlen("idleTimeoutMs"), idle_timeout_ms); + zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "pingIntervalMs", strlen("pingIntervalMs"), ping_interval_ms); + zend_update_property_long(websocket_server_options_ce, Z_OBJ_P(ZEND_THIS), "pongTimeoutMs", strlen("pongTimeoutMs"), pong_timeout_ms); } PHP_METHOD(WebSocket_Server, listen) diff --git a/websocket_server_runtime.c b/websocket_server_runtime.c index 9009eb2..6b4fe5d 100644 --- a/websocket_server_runtime.c +++ b/websocket_server_runtime.c @@ -46,6 +46,8 @@ static bool websocket_server_close_with_code(websocket_connection_object *connec static bool websocket_server_send_bytes(int fd, const char *buffer, size_t len); static uint64_t websocket_server_handshake_timeout_usec(websocket_server_object *intern); static uint64_t websocket_server_idle_timeout_usec(websocket_server_object *intern); +static uint64_t websocket_server_ping_interval_usec(websocket_server_object *intern); +static uint64_t websocket_server_pong_timeout_usec(websocket_server_object *intern); static const char websocket_bad_request_response[] = "HTTP/1.1 400 Bad Request\r\n" @@ -736,6 +738,31 @@ static size_t websocket_server_positive_option(websocket_server_object *intern, return (size_t) option_value; } +static size_t websocket_server_nonnegative_option(websocket_server_object *intern, const char *name, const size_t name_len, const size_t fallback) +{ + zval *value; + zval rv; + zend_long option_value; + + if (Z_TYPE(intern->options) == IS_ARRAY) { + value = zend_hash_str_find(Z_ARRVAL(intern->options), name, name_len); + if (!value) { + return fallback; + } + } else if (Z_TYPE(intern->options) == IS_OBJECT && instanceof_function(Z_OBJCE(intern->options), websocket_server_options_ce)) { + value = zend_read_property(websocket_server_options_ce, Z_OBJ(intern->options), name, name_len, 0, &rv); + } else { + return fallback; + } + + option_value = zval_get_long(value); + if (option_value < 0) { + return fallback; + } + + return (size_t) option_value; +} + static size_t websocket_server_max_message_size(websocket_server_object *intern) { return websocket_server_positive_option(intern, "maxMessageSize", strlen("maxMessageSize"), WEBSOCKET_DEFAULT_MAX_MESSAGE_SIZE); @@ -761,6 +788,16 @@ static uint64_t websocket_server_idle_timeout_usec(websocket_server_object *inte return (uint64_t) websocket_server_positive_option(intern, "idleTimeoutMs", strlen("idleTimeoutMs"), WEBSOCKET_DEFAULT_IDLE_TIMEOUT_MS) * 1000u; } +static uint64_t websocket_server_ping_interval_usec(websocket_server_object *intern) +{ + return (uint64_t) websocket_server_nonnegative_option(intern, "pingIntervalMs", strlen("pingIntervalMs"), WEBSOCKET_DEFAULT_PING_INTERVAL_MS) * 1000u; +} + +static uint64_t websocket_server_pong_timeout_usec(websocket_server_object *intern) +{ + return (uint64_t) websocket_server_positive_option(intern, "pongTimeoutMs", strlen("pongTimeoutMs"), WEBSOCKET_DEFAULT_PONG_TIMEOUT_MS) * 1000u; +} + static bool websocket_server_close_with_code(websocket_connection_object *connection_obj, const zend_long code, const char *reason) { zend_string *reason_string; @@ -780,6 +817,45 @@ static bool websocket_server_close_with_code(websocket_connection_object *connec return ok; } +static bool websocket_server_process_heartbeat(websocket_server_object *intern, websocket_connection_object *connection_obj, const uint64_t now_usec) +{ + const uint64_t ping_interval_usec = websocket_server_ping_interval_usec(intern); + const uint64_t last_activity_usec = connection_obj->last_activity_usec ? connection_obj->last_activity_usec : connection_obj->accepted_at_usec; + zend_string *payload; + bool ok; + + if (ping_interval_usec == 0 || now_usec == 0 || !connection_obj->open || !connection_obj->upgraded || connection_obj->close_after_write) { + return true; + } + + if (connection_obj->awaiting_pong) { + const uint64_t pong_timeout_usec = websocket_server_pong_timeout_usec(intern); + + if (connection_obj->last_ping_usec > 0 && now_usec >= connection_obj->last_ping_usec && now_usec - connection_obj->last_ping_usec >= pong_timeout_usec) { + return websocket_server_close_with_code(connection_obj, WEBSOCKET_CLOSE_NORMAL, "pong timeout"); + } + + return true; + } + + if (last_activity_usec == 0 || now_usec < last_activity_usec || now_usec - last_activity_usec < ping_interval_usec) { + return true; + } + + payload = zend_string_init("", 0, false); + ok = websocket_connection_send_frame(connection_obj, payload, WEBSOCKET_OPCODE_PING); + zend_string_release(payload); + + if (!ok) { + return websocket_server_close_with_code(connection_obj, WEBSOCKET_CLOSE_NORMAL, "heartbeat failed"); + } + + connection_obj->awaiting_pong = true; + connection_obj->last_ping_usec = now_usec; + + return true; +} + static zend_always_inline void websocket_server_mask_payload(unsigned char *dst, const unsigned char *src, const size_t len, const uint8_t mask[4]) { size_t i; @@ -1042,6 +1118,8 @@ static bool websocket_server_handle_frame(websocket_server_object *intern, zval ok = websocket_connection_send_frame(connection_obj, frame->payload, WEBSOCKET_OPCODE_PONG); break; case WEBSOCKET_OPCODE_PONG: + connection_obj->awaiting_pong = false; + connection_obj->last_ping_usec = 0; break; case WEBSOCKET_OPCODE_CLOSE: if (connection_obj->open && connection_obj->upgraded) { @@ -1406,6 +1484,10 @@ static bool websocket_server_process_connection_fd(websocket_server_object *inte return true; } + if (!websocket_server_process_heartbeat(intern, connection_obj, now_usec)) { + return false; + } + if (websocket_connection_has_pending_writes(connection_obj) && !websocket_connection_flush(connection_obj)) { return true; } @@ -1441,6 +1523,10 @@ static bool websocket_server_process_connections(websocket_server_object *intern continue; } + if (!websocket_server_process_heartbeat(intern, connection_obj, now_usec)) { + return false; + } + if (websocket_connection_has_pending_writes(connection_obj) && !websocket_connection_flush(connection_obj)) { continue; }