Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions nitronode/metrics/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,15 @@ func NewRuntimeMetricExporter(reg prometheus.Registerer) (RuntimeMetricExporter,
rpcConnectionsTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricNamespace,
Name: "rpc_connections_active",
Help: "Active RPC (WebSocket) connections. Labels: region, origin " +
"(both sourced from request headers / client metadata at connect " +
"time). Operator-decided values — bounded only by the set of " +
"connecting clients.",
}, []string{"region", "origin"}),
Help: "Active RPC (WebSocket) connections, labeled by application_id " +
"sourced from the app_id query parameter at connect time. " +
"Connections without an app_id are bucketed under _DEFAULT. " +
"Series for an application_id are deleted once its count drops to 0. " +
"NOTE: cardinality is bounded by the app_id format check " +
"(^[a-z0-9_-]{1,66}$) and by series shedding on disconnect — long-lived " +
"connections from many distinct but format-valid app_ids are not gated " +
"by a registry or per-app connection cap.",
}, []string{"application_id"}),
rpcInflight: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricNamespace,
Name: "rpc_inflight",
Expand Down Expand Up @@ -412,8 +416,15 @@ func (m *runtimeMetricExporter) ObserveRPCDuration(method, path string, success
m.rpcRequestDurationSeconds.WithLabelValues(method, path, result.String()).Observe(duration.Seconds())
}

func (m *runtimeMetricExporter) SetRPCConnections(region, origin string, count uint32) {
m.rpcConnectionsTotal.WithLabelValues(region, origin).Set(float64(count))
func (m *runtimeMetricExporter) SetRPCConnections(applicationID string, count uint32) {
label := getApplicationIDLabelValue(applicationID)
if count == 0 {
// Shed the series when the bucket empties so unique application_id values
// from clients cannot accumulate unbounded gauge labels over time.
m.rpcConnectionsTotal.DeleteLabelValues(label)
return
}
m.rpcConnectionsTotal.WithLabelValues(label).Set(float64(count))
Comment thread
nksazonov marked this conversation as resolved.
}

func (m *runtimeMetricExporter) IncRPCInflight(method string) {
Expand Down
4 changes: 2 additions & 2 deletions nitronode/metrics/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type RuntimeMetricExporter interface {
IncRPCMessage(msgType rpc.MsgType, method string)
IncRPCRequest(method, path string, success bool)
ObserveRPCDuration(method, path string, success bool, duration time.Duration)
SetRPCConnections(region, origin string, count uint32)
SetRPCConnections(applicationID string, count uint32)
IncRPCInflight(method string)
DecRPCInflight(method string)

Expand Down Expand Up @@ -68,7 +68,7 @@ func (noopRuntimeMetricExporter) IncAppSessionKeys()
func (noopRuntimeMetricExporter) IncRPCMessage(rpc.MsgType, string) {}
func (noopRuntimeMetricExporter) IncRPCRequest(string, string, bool) {}
func (noopRuntimeMetricExporter) ObserveRPCDuration(string, string, bool, time.Duration) {}
func (noopRuntimeMetricExporter) SetRPCConnections(string, string, uint32) {}
func (noopRuntimeMetricExporter) SetRPCConnections(string, uint32) {}
func (noopRuntimeMetricExporter) IncAppStateUpdate(string) {}
func (noopRuntimeMetricExporter) IncAppSessionUpdateSigValidation(string, app.AppSessionSignerTypeV1, bool) {
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/rpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type Connection interface {
// Origin returns the origin recorded for the connection at connect time, such as the value of the HTTP Origin request header or other client-identifying information.
Origin() string

// ApplicationID returns the application identifier supplied at connection time (via the
// app_id query parameter). Returns an empty string if no application_id was provided.
ApplicationID() string

// RawRequests returns a read-only channel for receiving incoming raw request messages.
// Messages received on this channel are raw bytes that need to be unmarshaled
// into Request objects for processing. The channel is closed when the
Expand Down Expand Up @@ -104,6 +108,8 @@ type WebsocketConnection struct {
connectionID string
// origin is the origin of the connection, such as the HTTP Origin request header value supplied at upgrade (may be empty)
origin string
// applicationID is the app_id query parameter supplied at WebSocket upgrade (may be empty)
applicationID string
// websocketConn is the underlying WebSocket connection
websocketConn GorillaWsConnectionAdapter
// writeTimeout is the maximum duration to wait for a write to complete
Expand Down Expand Up @@ -140,6 +146,9 @@ type WebsocketConnectionConfig struct {
ConnectionID string
// Origin is the origin of the connection, such as the HTTP Origin request header value (optional)
Origin string
// ApplicationID is the app_id query parameter supplied at WebSocket upgrade (optional).
// Caller is responsible for validation; the connection stores it as-is for metrics labeling.
ApplicationID string
// WebsocketConn is the underlying WebSocket connection (required)
WebsocketConn GorillaWsConnectionAdapter

Expand Down Expand Up @@ -212,6 +221,7 @@ func NewWebsocketConnection(config WebsocketConnectionConfig) (*WebsocketConnect
return &WebsocketConnection{
connectionID: config.ConnectionID,
origin: config.Origin,
applicationID: config.ApplicationID,
websocketConn: config.WebsocketConn,
writeTimeout: config.WriteTimeout,
pingInterval: config.PingInterval,
Expand Down Expand Up @@ -321,6 +331,12 @@ func (conn *WebsocketConnection) Origin() string {
return conn.origin
}

// ApplicationID returns the application identifier stored on the connection when it
// was constructed (WebsocketConnectionConfig.ApplicationID, i.e. the app_id query
// parameter supplied at WebSocket upgrade), or an empty string if none was provided.
// The value is returned exactly as stored; the connection performs no validation of it.
func (conn *WebsocketConnection) ApplicationID() string {
return conn.applicationID
}

// RawRequests returns the channel for processing incoming requests.
func (conn *WebsocketConnection) RawRequests() <-chan []byte {
return conn.processSink
Expand Down
56 changes: 35 additions & 21 deletions pkg/rpc/connection_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"sync"
)

// defaultConnectionRegion is the fixed region value the hub passes to the
// three-argument connection observer for every connection it reports.
const defaultConnectionRegion = "default"

type ObserveConnectionsFn func(region, origin string, count uint32)
// ObserveConnectionsFn reports the current number of active connections for one
// application_id bucket. The hub invokes it after a connection is added and after a
// tracked connection is removed.
//
//   - applicationID: the bucket key, as returned by Connection.ApplicationID (the empty
//     string when the client supplied no app_id).
//   - count: the bucket's connection count after the change. A count of 0 signals that
//     the bucket is empty and the observer should shed any per-label state (e.g., delete
//     the Prometheus gauge series) to bound cardinality.
//
// NOTE(review): the hub calls the observer after releasing its mutex, so implementations
// must tolerate concurrent calls, and two updates for the same applicationID could be
// delivered out of order (e.g. a stale non-zero count arriving after a 0) — confirm this
// is acceptable for the consumer.
type ObserveConnectionsFn func(applicationID string, count uint32)

// ConnectionHub provides centralized management of all active RPC connections.
// It maintains thread-safe mappings between connection IDs and Connection instances,
Expand All @@ -28,9 +29,9 @@ type ConnectionHub struct {
// mu protects concurrent access to the maps
mu sync.RWMutex

// sourceMap is an optional mapping of connection sources (e.g., IP addresses or regions)
sourceMap map[string]uint32
// observeConnections is a callback function to monitor connection counts by region
// appConnCount tracks active connection counts keyed by application_id (may be empty string)
appConnCount map[string]uint32
// observeConnections is a callback function to report per-application connection counts
observeConnections ObserveConnectionsFn
}

Expand All @@ -41,7 +42,7 @@ func NewConnectionHub(observeConnections ObserveConnectionsFn) *ConnectionHub {
return &ConnectionHub{
connections: make(map[string]Connection),
authMapping: make(map[string]map[string]bool),
sourceMap: make(map[string]uint32),
appConnCount: make(map[string]uint32),
observeConnections: observeConnections,
}
}
Expand All @@ -60,18 +61,23 @@ func (hub *ConnectionHub) Add(conn Connection) error {
connID := conn.ConnectionID()

hub.mu.Lock()
defer hub.mu.Unlock()

// If the connection already exists, return an error
if _, exists := hub.connections[connID]; exists {
hub.mu.Unlock()
return fmt.Errorf("connection with ID %s already exists", connID)
}

hub.connections[connID] = conn

sourceID := getSourceID(conn.Origin())
hub.sourceMap[sourceID]++
hub.observeConnections(defaultConnectionRegion, conn.Origin(), uint32(hub.sourceMap[sourceID]))
appID := conn.ApplicationID()
hub.appConnCount[appID]++
count := hub.appConnCount[appID]
hub.mu.Unlock()

// Invoke the observer outside the lock: SetRPCConnections takes Prometheus-internal
// mutexes, and holding hub.mu across that would serialize readers (including Publish).
hub.observeConnections(appID, count)

return nil
}
Expand Down Expand Up @@ -103,22 +109,33 @@ func (hub *ConnectionHub) Get(connID string) Connection {
// This method is safe for concurrent access.
func (hub *ConnectionHub) Remove(connID string) {
hub.mu.Lock()
defer hub.mu.Unlock()

conn, exists := hub.connections[connID]
if !exists {
hub.mu.Unlock()
return // No connection to remove
}
delete(hub.connections, connID)

sourceID := getSourceID(conn.Origin())
if count, exists := hub.sourceMap[sourceID]; exists && count > 0 {
hub.sourceMap[sourceID]--
if hub.sourceMap[sourceID] == 0 {
delete(hub.sourceMap, sourceID)
appID := conn.ApplicationID()
count, tracked := hub.appConnCount[appID]
changed := false
if tracked && count > 0 {
hub.appConnCount[appID]--
count = hub.appConnCount[appID]
if count == 0 {
delete(hub.appConnCount, appID)
}
changed = true
}
hub.mu.Unlock()

// Only notify the observer when the bucket actually changed; otherwise we would
// emit DeleteLabelValues for an app_id the gauge never tracked. Invoke outside
// hub.mu to avoid serializing readers behind Prometheus-internal locks.
if changed {
hub.observeConnections(appID, count)
}
hub.observeConnections(defaultConnectionRegion, conn.Origin(), uint32(hub.sourceMap[sourceID]))
}

// Publish broadcasts a message to all active connections for a specific user.
Expand Down Expand Up @@ -153,6 +170,3 @@ func (hub *ConnectionHub) Publish(userID string, response []byte) {
}
}

// getSourceID derives the key under which the hub counts connections per source.
// It currently returns the origin unchanged, so each distinct origin string is its
// own bucket.
func getSourceID(origin string) string {
return origin
}
3 changes: 2 additions & 1 deletion pkg/rpc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func NewWebsocketNode(config WebsocketNodeConfig) (*WebsocketNode, error) {

if config.ObserveConnections == nil {
// Default implementation does nothing, but can be overridden for monitoring
config.ObserveConnections = func(region, origin string, count uint32) {}
config.ObserveConnections = func(applicationID string, count uint32) {}
}
if config.WsUpgraderReadBufferSize <= 0 {
// It's the optimal default value as recommended
Expand Down Expand Up @@ -236,6 +236,7 @@ func (wn *WebsocketNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
connConfig := WebsocketConnectionConfig{
ConnectionID: connectionID,
Origin: r.Header.Get("Origin"),
ApplicationID: applicationID,
WebsocketConn: wsConnection,
Logger: wn.cfg.Logger,
ProcessBufferSize: wn.cfg.WsConnProcessBufferSize,
Expand Down