diff --git a/nitronode/metrics/exporter.go b/nitronode/metrics/exporter.go index ca0742cd9..d2a7262d9 100644 --- a/nitronode/metrics/exporter.go +++ b/nitronode/metrics/exporter.go @@ -289,11 +289,15 @@ func NewRuntimeMetricExporter(reg prometheus.Registerer) (RuntimeMetricExporter, rpcConnectionsTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricNamespace, Name: "rpc_connections_active", - Help: "Active RPC (WebSocket) connections. Labels: region, origin " + - "(both sourced from request headers / client metadata at connect " + - "time). Operator-decided values — bounded only by the set of " + - "connecting clients.", - }, []string{"region", "origin"}), + Help: "Active RPC (WebSocket) connections, labeled by application_id " + + "sourced from the app_id query parameter at connect time. " + + "Connections without an app_id are bucketed under _DEFAULT. " + + "Series for an application_id are deleted once its count drops to 0. " + + "NOTE: cardinality is bounded by the app_id format check " + + "(^[a-z0-9_-]{1,66}$) and by series shedding on disconnect — long-lived " + + "connections from many distinct but format-valid app_ids are not gated " + + "by a registry or per-app connection cap.", + }, []string{"application_id"}), rpcInflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: MetricNamespace, Name: "rpc_inflight", @@ -412,8 +416,15 @@ func (m *runtimeMetricExporter) ObserveRPCDuration(method, path string, success m.rpcRequestDurationSeconds.WithLabelValues(method, path, result.String()).Observe(duration.Seconds()) } -func (m *runtimeMetricExporter) SetRPCConnections(region, origin string, count uint32) { - m.rpcConnectionsTotal.WithLabelValues(region, origin).Set(float64(count)) +func (m *runtimeMetricExporter) SetRPCConnections(applicationID string, count uint32) { + label := getApplicationIDLabelValue(applicationID) + if count == 0 { + // Shed the series when the bucket empties so unique application_id values + // from clients cannot accumulate unbounded gauge labels over time. + m.rpcConnectionsTotal.DeleteLabelValues(label) + return + } + m.rpcConnectionsTotal.WithLabelValues(label).Set(float64(count)) } func (m *runtimeMetricExporter) IncRPCInflight(method string) { diff --git a/nitronode/metrics/interface.go b/nitronode/metrics/interface.go index c7392d4b6..1e519a2d4 100644 --- a/nitronode/metrics/interface.go +++ b/nitronode/metrics/interface.go @@ -37,7 +37,7 @@ type RuntimeMetricExporter interface { IncRPCMessage(msgType rpc.MsgType, method string) IncRPCRequest(method, path string, success bool) ObserveRPCDuration(method, path string, success bool, duration time.Duration) - SetRPCConnections(region, origin string, count uint32) + SetRPCConnections(applicationID string, count uint32) IncRPCInflight(method string) DecRPCInflight(method string) @@ -68,7 +68,7 @@ func (noopRuntimeMetricExporter) IncAppSessionKeys() func (noopRuntimeMetricExporter) IncRPCMessage(rpc.MsgType, string) {} func (noopRuntimeMetricExporter) IncRPCRequest(string, string, bool) {} func (noopRuntimeMetricExporter) ObserveRPCDuration(string, string, bool, time.Duration) {} -func (noopRuntimeMetricExporter) SetRPCConnections(string, string, uint32) {} +func (noopRuntimeMetricExporter) SetRPCConnections(string, uint32) {} func (noopRuntimeMetricExporter) IncAppStateUpdate(string) {} func (noopRuntimeMetricExporter) IncAppSessionUpdateSigValidation(string, app.AppSessionSignerTypeV1, bool) { } diff --git a/pkg/rpc/connection.go b/pkg/rpc/connection.go index 1a86e249a..e213da262 100644 --- a/pkg/rpc/connection.go +++ b/pkg/rpc/connection.go @@ -44,6 +44,10 @@ type Connection interface { // Origin returns the origin of the connection, such as the client's IP address or other identifying information. Origin() string + // ApplicationID returns the application identifier supplied at connection time (via the + // app_id query parameter). Returns an empty string if no application_id was provided. + ApplicationID() string + // RawRequests returns a read-only channel for receiving incoming raw request messages. // Messages received on this channel are raw bytes that need to be unmarshaled // into Request objects for processing. The channel is closed when the @@ -104,6 +108,8 @@ type WebsocketConnection struct { connectionID string // origin is the origin of the connection, such as the client's IP address origin string + // applicationID is the app_id query parameter supplied at WebSocket upgrade (may be empty) + applicationID string // websocketConn is the underlying WebSocket connection websocketConn GorillaWsConnectionAdapter // writeTimeout is the maximum duration to wait for a write to complete @@ -140,6 +146,9 @@ type WebsocketConnectionConfig struct { ConnectionID string // Origin is the origin of the connection, such as the client's IP address (optional) Origin string + // ApplicationID is the app_id query parameter supplied at WebSocket upgrade (optional). + // Caller is responsible for validation; the connection stores it as-is for metrics labeling. + ApplicationID string // WebsocketConn is the underlying WebSocket connection (required) WebsocketConn GorillaWsConnectionAdapter @@ -212,6 +221,7 @@ func NewWebsocketConnection(config WebsocketConnectionConfig) (*WebsocketConnect return &WebsocketConnection{ connectionID: config.ConnectionID, origin: config.Origin, + applicationID: config.ApplicationID, websocketConn: config.WebsocketConn, writeTimeout: config.WriteTimeout, pingInterval: config.PingInterval, @@ -321,6 +331,12 @@ func (conn *WebsocketConnection) Origin() string { return conn.origin } +// ApplicationID returns the app_id query parameter supplied at WebSocket upgrade, +// or an empty string if none was provided. +func (conn *WebsocketConnection) ApplicationID() string { + return conn.applicationID +} + // RawRequests returns the channel for processing incoming requests. func (conn *WebsocketConnection) RawRequests() <-chan []byte { return conn.processSink diff --git a/pkg/rpc/connection_hub.go b/pkg/rpc/connection_hub.go index b92c5491b..83587761f 100644 --- a/pkg/rpc/connection_hub.go +++ b/pkg/rpc/connection_hub.go @@ -5,9 +5,10 @@ import ( "sync" ) -const defaultConnectionRegion = "default" - -type ObserveConnectionsFn func(region, origin string, count uint32) +// ObserveConnectionsFn is invoked on connect and disconnect with the current per-application +// connection count. A count of 0 signals that the bucket is empty and the observer should +// shed any per-label state (e.g., delete the Prometheus gauge series) to bound cardinality. +type ObserveConnectionsFn func(applicationID string, count uint32) // ConnectionHub provides centralized management of all active RPC connections. // It maintains thread-safe mappings between connection IDs and Connection instances, @@ -28,9 +29,9 @@ type ConnectionHub struct { // mu protects concurrent access to the maps mu sync.RWMutex - // sourceMap is an optional mapping of connection sources (e.g., IP addresses or regions) - sourceMap map[string]uint32 - // observeConnections is a callback function to monitor connection counts by region + // appConnCount tracks active connection counts keyed by application_id (may be empty string) + appConnCount map[string]uint32 + // observeConnections is a callback function to report per-application connection counts observeConnections ObserveConnectionsFn } @@ -41,7 +42,7 @@ func NewConnectionHub(observeConnections ObserveConnectionsFn) *ConnectionHub { return &ConnectionHub{ connections: make(map[string]Connection), authMapping: make(map[string]map[string]bool), - sourceMap: make(map[string]uint32), + appConnCount: make(map[string]uint32), observeConnections: observeConnections, } } @@ -60,18 +61,23 @@ func (hub *ConnectionHub) Add(conn Connection) error { connID := conn.ConnectionID() hub.mu.Lock() - defer hub.mu.Unlock() // If the connection already exists, return an error if _, exists := hub.connections[connID]; exists { + hub.mu.Unlock() return fmt.Errorf("connection with ID %s already exists", connID) } hub.connections[connID] = conn - sourceID := getSourceID(conn.Origin()) - hub.sourceMap[sourceID]++ - hub.observeConnections(defaultConnectionRegion, conn.Origin(), uint32(hub.sourceMap[sourceID])) + appID := conn.ApplicationID() + hub.appConnCount[appID]++ + count := hub.appConnCount[appID] + hub.mu.Unlock() + + // Invoke the observer outside the lock: SetRPCConnections takes Prometheus-internal + // mutexes, and holding hub.mu across that would serialize readers (including Publish). + hub.observeConnections(appID, count) return nil } @@ -103,22 +109,33 @@ func (hub *ConnectionHub) Get(connID string) Connection { // This method is safe for concurrent access. func (hub *ConnectionHub) Remove(connID string) { hub.mu.Lock() - defer hub.mu.Unlock() conn, exists := hub.connections[connID] if !exists { + hub.mu.Unlock() return // No connection to remove } delete(hub.connections, connID) - sourceID := getSourceID(conn.Origin()) - if count, exists := hub.sourceMap[sourceID]; exists && count > 0 { - hub.sourceMap[sourceID]-- - if hub.sourceMap[sourceID] == 0 { - delete(hub.sourceMap, sourceID) + appID := conn.ApplicationID() + count, tracked := hub.appConnCount[appID] + changed := false + if tracked && count > 0 { + hub.appConnCount[appID]-- + count = hub.appConnCount[appID] + if count == 0 { + delete(hub.appConnCount, appID) } + changed = true + } + hub.mu.Unlock() + + // Only notify the observer when the bucket actually changed; otherwise we would + // emit DeleteLabelValues for an app_id the gauge never tracked. Invoke outside + // hub.mu to avoid serializing readers behind Prometheus-internal locks. + if changed { + hub.observeConnections(appID, count) } - hub.observeConnections(defaultConnectionRegion, conn.Origin(), uint32(hub.sourceMap[sourceID])) } // Publish broadcasts a message to all active connections for a specific user. @@ -153,6 +170,3 @@ func (hub *ConnectionHub) Publish(userID string, response []byte) { } } -func getSourceID(origin string) string { - return origin -} diff --git a/pkg/rpc/node.go b/pkg/rpc/node.go index 3964a1b27..55c8f6a32 100644 --- a/pkg/rpc/node.go +++ b/pkg/rpc/node.go @@ -157,7 +157,7 @@ func NewWebsocketNode(config WebsocketNodeConfig) (*WebsocketNode, error) { if config.ObserveConnections == nil { // Default implementation does nothing, but can be overridden for monitoring - config.ObserveConnections = func(region, origin string, count uint32) {} + config.ObserveConnections = func(applicationID string, count uint32) {} } if config.WsUpgraderReadBufferSize <= 0 { // It's the optimal default value as recommended @@ -236,6 +236,7 @@ func (wn *WebsocketNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { connConfig := WebsocketConnectionConfig{ ConnectionID: connectionID, Origin: r.Header.Get("Origin"), + ApplicationID: applicationID, WebsocketConn: wsConnection, Logger: wn.cfg.Logger, ProcessBufferSize: wn.cfg.WsConnProcessBufferSize,