From c381487a55950d0633c3ccff234ace7559bdb8f2 Mon Sep 17 00:00:00 2001 From: Anton Filonenko Date: Tue, 12 May 2026 12:11:19 +0300 Subject: [PATCH 1/2] fix(nitronode,rpc): bound rpc_connections_active label cardinality Replace the client-controlled Origin header label on the rpc_connections_active gauge with application_id, which is parsed and validated against IsValidApplicationID at WebSocket upgrade. The hub now tracks per-application counts and deletes the gauge series when an application's count drops to zero, so disconnected buckets do not accumulate as zero-valued time series. The connection-level Origin field is retained for logging. Co-Authored-By: Claude Opus 4.7 (1M context) --- nitronode/metrics/exporter.go | 21 +++++++++++++------- nitronode/metrics/interface.go | 4 ++-- pkg/rpc/connection.go | 16 +++++++++++++++ pkg/rpc/connection_hub.go | 36 ++++++++++++++++------------------ pkg/rpc/node.go | 3 ++- 5 files changed, 51 insertions(+), 29 deletions(-) diff --git a/nitronode/metrics/exporter.go b/nitronode/metrics/exporter.go index ca0742cd9..715965cc9 100644 --- a/nitronode/metrics/exporter.go +++ b/nitronode/metrics/exporter.go @@ -289,11 +289,11 @@ func NewRuntimeMetricExporter(reg prometheus.Registerer) (RuntimeMetricExporter, rpcConnectionsTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricNamespace, Name: "rpc_connections_active", - Help: "Active RPC (WebSocket) connections. Labels: region, origin " + - "(both sourced from request headers / client metadata at connect " + - "time). Operator-decided values — bounded only by the set of " + - "connecting clients.", - }, []string{"region", "origin"}), + Help: "Active RPC (WebSocket) connections, labeled by application_id " + + "sourced from the app_id query parameter at connect time. " + + "Connections without an app_id are bucketed under _DEFAULT. " + + "Series for an application_id are deleted once its count drops to 0.", + }, []string{"application_id"}), rpcInflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricNamespace, Name: "rpc_inflight", @@ -412,8 +412,15 @@ func (m *runtimeMetricExporter) ObserveRPCDuration(method, path string, success m.rpcRequestDurationSeconds.WithLabelValues(method, path, result.String()).Observe(duration.Seconds()) } -func (m *runtimeMetricExporter) SetRPCConnections(region, origin string, count uint32) { - m.rpcConnectionsTotal.WithLabelValues(region, origin).Set(float64(count)) +func (m *runtimeMetricExporter) SetRPCConnections(applicationID string, count uint32) { + label := getApplicationIDLabelValue(applicationID) + if count == 0 { + // Shed the series when the bucket empties so unique application_id values + // from clients cannot accumulate unbounded gauge labels over time. + m.rpcConnectionsTotal.DeleteLabelValues(label) + return + } + m.rpcConnectionsTotal.WithLabelValues(label).Set(float64(count)) } func (m *runtimeMetricExporter) IncRPCInflight(method string) { diff --git a/nitronode/metrics/interface.go b/nitronode/metrics/interface.go index c7392d4b6..1e519a2d4 100644 --- a/nitronode/metrics/interface.go +++ b/nitronode/metrics/interface.go @@ -37,7 +37,7 @@ type RuntimeMetricExporter interface { IncRPCMessage(msgType rpc.MsgType, method string) IncRPCRequest(method, path string, success bool) ObserveRPCDuration(method, path string, success bool, duration time.Duration) - SetRPCConnections(region, origin string, count uint32) + SetRPCConnections(applicationID string, count uint32) IncRPCInflight(method string) DecRPCInflight(method string) @@ -68,7 +68,7 @@ func (noopRuntimeMetricExporter) IncAppSessionKeys() func (noopRuntimeMetricExporter) IncRPCMessage(rpc.MsgType, string) {} func (noopRuntimeMetricExporter) IncRPCRequest(string, string, bool) {} func (noopRuntimeMetricExporter) ObserveRPCDuration(string, string, bool, time.Duration) {} -func (noopRuntimeMetricExporter) SetRPCConnections(string, string, uint32) {} +func (noopRuntimeMetricExporter) SetRPCConnections(string, uint32) {} func (noopRuntimeMetricExporter) IncAppStateUpdate(string) {} func (noopRuntimeMetricExporter) IncAppSessionUpdateSigValidation(string, app.AppSessionSignerTypeV1, bool) { } diff --git a/pkg/rpc/connection.go b/pkg/rpc/connection.go index 1a86e249a..e213da262 100644 --- a/pkg/rpc/connection.go +++ b/pkg/rpc/connection.go @@ -44,6 +44,10 @@ type Connection interface { // Origin returns the origin of the connection, such as the client's IP address or other identifying information. Origin() string + // ApplicationID returns the application identifier supplied at connection time (via the + // app_id query parameter). Returns an empty string if no application_id was provided. + ApplicationID() string + // RawRequests returns a read-only channel for receiving incoming raw request messages. // Messages received on this channel are raw bytes that need to be unmarshaled // into Request objects for processing. The channel is closed when the @@ -104,6 +108,8 @@ type WebsocketConnection struct { connectionID string // origin is the origin of the connection, such as the client's IP address origin string + // applicationID is the app_id query parameter supplied at WebSocket upgrade (may be empty) + applicationID string // websocketConn is the underlying WebSocket connection websocketConn GorillaWsConnectionAdapter // writeTimeout is the maximum duration to wait for a write to complete @@ -140,6 +146,9 @@ type WebsocketConnectionConfig struct { ConnectionID string // Origin is the origin of the connection, such as the client's IP address (optional) Origin string + // ApplicationID is the app_id query parameter supplied at WebSocket upgrade (optional). + // Caller is responsible for validation; the connection stores it as-is for metrics labeling. + ApplicationID string // WebsocketConn is the underlying WebSocket connection (required) WebsocketConn GorillaWsConnectionAdapter @@ -212,6 +221,7 @@ func NewWebsocketConnection(config WebsocketConnectionConfig) (*WebsocketConnect return &WebsocketConnection{ connectionID: config.ConnectionID, origin: config.Origin, + applicationID: config.ApplicationID, websocketConn: config.WebsocketConn, writeTimeout: config.WriteTimeout, pingInterval: config.PingInterval, @@ -321,6 +331,12 @@ func (conn *WebsocketConnection) Origin() string { return conn.origin } +// ApplicationID returns the app_id query parameter supplied at WebSocket upgrade, +// or an empty string if none was provided. +func (conn *WebsocketConnection) ApplicationID() string { + return conn.applicationID +} + // RawRequests returns the channel for processing incoming requests. func (conn *WebsocketConnection) RawRequests() <-chan []byte { return conn.processSink diff --git a/pkg/rpc/connection_hub.go b/pkg/rpc/connection_hub.go index b92c5491b..a71b9d48e 100644 --- a/pkg/rpc/connection_hub.go +++ b/pkg/rpc/connection_hub.go @@ -5,9 +5,10 @@ import ( "sync" ) -const defaultConnectionRegion = "default" - -type ObserveConnectionsFn func(region, origin string, count uint32) +// ObserveConnectionsFn is invoked on connect and disconnect with the current per-application +// connection count. A count of 0 signals that the bucket is empty and the observer should +// shed any per-label state (e.g., delete the Prometheus gauge series) to bound cardinality. +type ObserveConnectionsFn func(applicationID string, count uint32) // ConnectionHub provides centralized management of all active RPC connections. // It maintains thread-safe mappings between connection IDs and Connection instances, @@ -28,9 +29,9 @@ type ConnectionHub struct { // mu protects concurrent access to the maps mu sync.RWMutex - // sourceMap is an optional mapping of connection sources (e.g., IP addresses or regions) - sourceMap map[string]uint32 - // observeConnections is a callback function to monitor connection counts by region + // appConnCount tracks active connection counts keyed by application_id (may be empty string) + appConnCount map[string]uint32 + // observeConnections is a callback function to report per-application connection counts observeConnections ObserveConnectionsFn } @@ -41,7 +42,7 @@ func NewConnectionHub(observeConnections ObserveConnectionsFn) *ConnectionHub { return &ConnectionHub{ connections: make(map[string]Connection), authMapping: make(map[string]map[string]bool), - sourceMap: make(map[string]uint32), + appConnCount: make(map[string]uint32), observeConnections: observeConnections, } } @@ -69,9 +70,9 @@ func (hub *ConnectionHub) Add(conn Connection) error { hub.connections[connID] = conn - sourceID := getSourceID(conn.Origin()) - hub.sourceMap[sourceID]++ - hub.observeConnections(defaultConnectionRegion, conn.Origin(), uint32(hub.sourceMap[sourceID])) + appID := conn.ApplicationID() + hub.appConnCount[appID]++ + hub.observeConnections(appID, hub.appConnCount[appID]) return nil } @@ -111,14 +112,14 @@ func (hub *ConnectionHub) Remove(connID string) { } delete(hub.connections, connID) - sourceID := getSourceID(conn.Origin()) - if count, exists := hub.sourceMap[sourceID]; exists && count > 0 { - hub.sourceMap[sourceID]-- - if hub.sourceMap[sourceID] == 0 { - delete(hub.sourceMap, sourceID) + appID := conn.ApplicationID() + if count, exists := hub.appConnCount[appID]; exists && count > 0 { + hub.appConnCount[appID]-- + if hub.appConnCount[appID] == 0 { + delete(hub.appConnCount, appID) } } - hub.observeConnections(defaultConnectionRegion, conn.Origin(), uint32(hub.sourceMap[sourceID])) + hub.observeConnections(appID, hub.appConnCount[appID]) } // Publish broadcasts a message to all active connections for a specific user. @@ -153,6 +154,3 @@ func (hub *ConnectionHub) Publish(userID string, response []byte) { } } -func getSourceID(origin string) string { - return origin -} diff --git a/pkg/rpc/node.go b/pkg/rpc/node.go index 3964a1b27..55c8f6a32 100644 --- a/pkg/rpc/node.go +++ b/pkg/rpc/node.go @@ -157,7 +157,7 @@ func NewWebsocketNode(config WebsocketNodeConfig) (*WebsocketNode, error) { if config.ObserveConnections == nil { // Default implementation does nothing, but can be overridden for monitoring - config.ObserveConnections = func(region, origin string, count uint32) {} + config.ObserveConnections = func(applicationID string, count uint32) {} } if config.WsUpgraderReadBufferSize <= 0 { // It's the optimal default value as recommended @@ -236,6 +236,7 @@ func (wn *WebsocketNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { connConfig := WebsocketConnectionConfig{ ConnectionID: connectionID, Origin: r.Header.Get("Origin"), + ApplicationID: applicationID, WebsocketConn: wsConnection, Logger: wn.cfg.Logger, ProcessBufferSize: wn.cfg.WsConnProcessBufferSize, From 885bbfcbe360d0636f2fd7d8f7260af511b2666f Mon Sep 17 00:00:00 2001 From: Anton Filonenko Date: Wed, 13 May 2026 13:59:24 +0300 Subject: [PATCH 2/2] fix(rpc): release ConnectionHub lock before observer callback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConnectionHub.Add and Remove invoked observeConnections (-> Prometheus SetRPCConnections, which takes metric-internal mutexes) while still holding hub.mu, so all readers — including Publish — serialized behind Prometheus writes during connection churn. Snapshot the post-mutation count, release hub.mu, then call the observer outside the lock. Also gate Remove's observer call on whether the bucket actually changed, so map-miss / already-zero paths no longer emit a spurious DeleteLabelValues for an app_id the gauge never tracked. Annotate the rpc_connections_active Help with the remaining cardinality bound: the format regex on application_id plus disconnect-driven series shedding — no registry check and no per-app connection cap. Co-Authored-By: Claude Opus 4.7 (1M context) --- nitronode/metrics/exporter.go | 6 +++++- pkg/rpc/connection_hub.go | 28 ++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/nitronode/metrics/exporter.go b/nitronode/metrics/exporter.go index 715965cc9..d2a7262d9 100644 --- a/nitronode/metrics/exporter.go +++ b/nitronode/metrics/exporter.go @@ -292,7 +292,11 @@ func NewRuntimeMetricExporter(reg prometheus.Registerer) (RuntimeMetricExporter, Help: "Active RPC (WebSocket) connections, labeled by application_id " + "sourced from the app_id query parameter at connect time. " + "Connections without an app_id are bucketed under _DEFAULT. " + - "Series for an application_id are deleted once its count drops to 0.", + "Series for an application_id are deleted once its count drops to 0. " + + "NOTE: cardinality is bounded by the app_id format check " + + "(^[a-z0-9_-]{1,66}$) and by series shedding on disconnect — long-lived " + + "connections from many distinct but format-valid app_ids are not gated " + + "by a registry or per-app connection cap.", }, []string{"application_id"}), rpcInflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricNamespace, diff --git a/pkg/rpc/connection_hub.go b/pkg/rpc/connection_hub.go index a71b9d48e..83587761f 100644 --- a/pkg/rpc/connection_hub.go +++ b/pkg/rpc/connection_hub.go @@ -61,10 +61,10 @@ func (hub *ConnectionHub) Add(conn Connection) error { connID := conn.ConnectionID() hub.mu.Lock() - defer hub.mu.Unlock() // If the connection already exists, return an error if _, exists := hub.connections[connID]; exists { + hub.mu.Unlock() return fmt.Errorf("connection with ID %s already exists", connID) } @@ -72,7 +72,12 @@ func (hub *ConnectionHub) Add(conn Connection) error { appID := conn.ApplicationID() hub.appConnCount[appID]++ - hub.observeConnections(appID, hub.appConnCount[appID]) + count := hub.appConnCount[appID] + hub.mu.Unlock() + + // Invoke the observer outside the lock: SetRPCConnections takes Prometheus-internal + // mutexes, and holding hub.mu across that would serialize readers (including Publish). + hub.observeConnections(appID, count) return nil } @@ -104,22 +109,33 @@ func (hub *ConnectionHub) Get(connID string) Connection { // This method is safe for concurrent access. func (hub *ConnectionHub) Remove(connID string) { hub.mu.Lock() - defer hub.mu.Unlock() conn, exists := hub.connections[connID] if !exists { + hub.mu.Unlock() return // No connection to remove } delete(hub.connections, connID) appID := conn.ApplicationID() - if count, exists := hub.appConnCount[appID]; exists && count > 0 { + count, tracked := hub.appConnCount[appID] + changed := false + if tracked && count > 0 { hub.appConnCount[appID]-- - if hub.appConnCount[appID] == 0 { + count = hub.appConnCount[appID] + if count == 0 { delete(hub.appConnCount, appID) } + changed = true + } + hub.mu.Unlock() + + // Only notify the observer when the bucket actually changed; otherwise we would + // emit DeleteLabelValues for an app_id the gauge never tracked. Invoke outside + // hub.mu to avoid serializing readers behind Prometheus-internal locks. + if changed { + hub.observeConnections(appID, count) } - hub.observeConnections(appID, hub.appConnCount[appID]) } // Publish broadcasts a message to all active connections for a specific user.