diff --git a/.gitignore b/.gitignore index 5fe0570a..00197fb4 100644 --- a/.gitignore +++ b/.gitignore @@ -29,4 +29,5 @@ tests/system/1 .env /data /release -AGENTS.md \ No newline at end of file +AGENTS.md +analysis \ No newline at end of file diff --git a/README.md b/README.md index 4b4da332..be37835b 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,11 @@ service SupernodeService { // Optional request flags message StatusRequest { // When true, the response includes detailed P2P metrics and - // network peer information. This is gated to avoid heavy work - // unless explicitly requested. + // network peer information. + // + // This is gated to avoid heavy work unless explicitly requested: + // - peers_count is kept fast + // - heavier diagnostics are served from a cached last-known-good snapshot bool include_p2p_metrics = 1; } diff --git a/docs/gateway.md b/docs/gateway.md index 58814f5e..5325edcb 100644 --- a/docs/gateway.md +++ b/docs/gateway.md @@ -9,6 +9,9 @@ Returns supernode status: system resources (CPU, memory, storage), service info, - Query `include_p2p_metrics=true` enables detailed P2P metrics and peer info. - When omitted or false, peer count, peer addresses, and `p2p_metrics` are not included. +- When `include_p2p_metrics=true`, the response is designed to be latency-predictable: + - `network.peers_count` is computed on every request via a fast DHT path. + - Heavier diagnostics (peer list, DB stats, disk usage) are served from a single cached “last-known-good” snapshot and refreshed asynchronously at most once per a short TTL (see `p2p/p2p_stats.go` constants). Examples: diff --git a/p2p/client.go b/p2p/client.go index 9ed9b9d9..b68a38dc 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -31,8 +31,8 @@ type Client interface { // Delete a key, value Delete(ctx context.Context, key string) error - // Stats return status of p2p - Stats(ctx context.Context) (map[string]interface{}, error) + // Stats returns a typed snapshot of P2P state for internal/monitoring use. + Stats(ctx context.Context) (*StatsSnapshot, error) // NClosestNodes return n closest supernodes to a given key string (NB full node string formatting) NClosestNodes(ctx context.Context, n int, key string, ignores ...string) []string diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 36b89c3c..70af36fb 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -218,8 +218,11 @@ func NewDHT(ctx context.Context, store Store, metaStore MetaStore, options *Opti return s, nil } -func (s *DHT) NodesLen() int { - return len(s.ht.nodes()) +func (s *DHT) PeersCount() int { + if s == nil || s.ht == nil { + return 0 + } + return s.ht.peersCount() } func (s *DHT) getSupernodeAddress(ctx context.Context) (string, error) { @@ -456,26 +459,44 @@ func (s *DHT) Delete(ctx context.Context, key string) error { return nil } -// Stats returns stats of DHT -func (s *DHT) Stats(ctx context.Context) (map[string]interface{}, error) { - if s.store == nil { - return nil, fmt.Errorf("store is nil") +func (s *DHT) PeersSnapshot() []*Node { + if s == nil || s.ht == nil { + return nil + } + in := s.ht.nodes() + if len(in) == 0 { + return nil } - dbStats, err := s.store.Stats(ctx) - if err != nil { - return nil, err + out := make([]*Node, 0, len(in)) + for _, n := range in { + if n == nil { + continue + } + cp := *n + if n.ID != nil { + cp.ID = append([]byte(nil), n.ID...) + } + if n.HashedID != nil { + cp.HashedID = append([]byte(nil), n.HashedID...) 
+ } + out = append(out, &cp) } + return out +} - dhtStats := map[string]any{} - dhtStats["self"] = s.ht.self - dhtStats["peers_count"] = len(s.ht.nodes()) - dhtStats["peers"] = s.ht.nodes() - dhtStats["network"] = s.network.HandleMetricsSnapshot() - // Removed: recent per-request snapshots (logs provide visibility) - dhtStats["database"] = dbStats +func (s *DHT) NetworkHandleMetricsSnapshot() map[string]HandleCounters { + if s == nil || s.network == nil { + return nil + } + return s.network.HandleMetricsSnapshot() +} - return dhtStats, nil +func (s *DHT) DatabaseStats(ctx context.Context) (DatabaseStats, error) { + if s == nil || s.store == nil { + return DatabaseStats{}, fmt.Errorf("store is nil") + } + return s.store.Stats(ctx) } // newMessage creates a new message diff --git a/p2p/kademlia/hashtable.go b/p2p/kademlia/hashtable.go index dc22dc07..2fa5eaa5 100644 --- a/p2p/kademlia/hashtable.go +++ b/p2p/kademlia/hashtable.go @@ -368,6 +368,16 @@ func (ht *HashTable) nodes() []*Node { return out } +func (ht *HashTable) peersCount() int { + ht.mutex.RLock() + defer ht.mutex.RUnlock() + total := 0 + for _, b := range ht.routeTable { + total += len(b) + } + return total +} + // newRandomID: match B=256 (32 bytes) func newRandomID() ([]byte, error) { id := make([]byte, B/8) diff --git a/p2p/kademlia/store.go b/p2p/kademlia/store.go index b27362d2..5d8cb82b 100644 --- a/p2p/kademlia/store.go +++ b/p2p/kademlia/store.go @@ -7,6 +7,11 @@ import ( "github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain" ) +type DatabaseStats struct { + P2PDbSizeMb float64 + P2PDbRecordsCount int64 +} + // Store is the interface for implementing the storage mechanism for the DHT type Store interface { // Store a key/value pair for the queries node with the replication @@ -22,7 +27,7 @@ type Store interface { GetKeysForReplication(ctx context.Context, from time.Time, to time.Time) domain.KeysWithTimestamp // Stats returns stats of store - Stats(ctx context.Context) (map[string]interface{}, error) + Stats(ctx context.Context) (DatabaseStats, error) // Close the store Close(ctx context.Context) diff --git a/p2p/kademlia/store/mem/mem.go b/p2p/kademlia/store/mem/mem.go index 642bbae7..124f7a19 100644 --- a/p2p/kademlia/store/mem/mem.go +++ b/p2p/kademlia/store/mem/mem.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" "github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain" ) @@ -68,9 +69,8 @@ func (s *Store) Close(_ context.Context) { } // Stats returns stats of store -func (s *Store) Stats(_ context.Context) (map[string]interface{}, error) { - stats := map[string]interface{}{} - return stats, nil +func (s *Store) Stats(_ context.Context) (kademlia.DatabaseStats, error) { + return kademlia.DatabaseStats{}, nil } // Count the records in store diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go index fc665156..841636c7 100644 --- a/p2p/kademlia/store/sqlite/sqlite.go +++ b/p2p/kademlia/store/sqlite/sqlite.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" "github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/utils" @@ -693,17 +694,17 @@ func sqliteOnDiskSizeBytes(ctx context.Context, dbPath string) (bytes uint64, ok } // Stats returns stats of store -func (s *Store) Stats(ctx context.Context) (map[string]interface{}, error) { - stats := map[string]interface{}{} +func (s 
*Store) Stats(ctx context.Context) (kademlia.DatabaseStats, error) { + stats := kademlia.DatabaseStats{} if bytes, ok := sqliteOnDiskSizeBytes(ctx, s.dbFilePath); !ok { logtrace.Error(ctx, "failed to get p2p db size", logtrace.Fields{logtrace.FieldError: "p2p db file not found"}) } else { - stats["p2p_db_size"] = utils.BytesToMB(bytes) + stats.P2PDbSizeMb = utils.BytesToMB(bytes) } if count, err := s.Count(ctx); err == nil { - stats["p2p_db_records_count"] = count + stats.P2PDbRecordsCount = int64(count) } else { logtrace.Error(ctx, "failed to get p2p records count", logtrace.Fields{logtrace.FieldError: err.Error()}) } diff --git a/p2p/mocks/Client.go b/p2p/mocks/Client.go index 67991025..763eace8 100644 --- a/p2p/mocks/Client.go +++ b/p2p/mocks/Client.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + p2p "github.com/LumeraProtocol/supernode/v2/p2p" mock "github.com/stretchr/testify/mock" time "time" @@ -201,15 +202,15 @@ func (_m *Client) Retrieve(ctx context.Context, key string, localOnly ...bool) ( } // Stats provides a mock function with given fields: ctx -func (_m *Client) Stats(ctx context.Context) (map[string]interface{}, error) { +func (_m *Client) Stats(ctx context.Context) (*p2p.StatsSnapshot, error) { ret := _m.Called(ctx) - var r0 map[string]interface{} - if rf, ok := ret.Get(0).(func(context.Context) map[string]interface{}); ok { + var r0 *p2p.StatsSnapshot + if rf, ok := ret.Get(0).(func(context.Context) *p2p.StatsSnapshot); ok { r0 = rf(ctx) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]interface{}) + r0 = ret.Get(0).(*p2p.StatsSnapshot) } } diff --git a/p2p/p2p.go b/p2p/p2p.go index 6a8a35ff..584fd787 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + "sync" "time" "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" @@ -48,6 +49,8 @@ type p2p struct { lumeraClient lumera.Client keyring keyring.Keyring // Add the keyring field rqstore rqstore.Store + stats *p2pStatsManager + statsOnce sync.Once } // Run the kademlia network @@ -166,56 +169,20 @@ func (s *p2p) Delete(ctx context.Context, key string) error { return s.dht.Delete(ctx, key) } -// Stats return status of p2p -func (s *p2p) Stats(ctx context.Context) (map[string]interface{}, error) { +// Stats returns a typed snapshot of the P2P subsystem state. +func (s *p2p) Stats(ctx context.Context) (*StatsSnapshot, error) { if err := ctx.Err(); err != nil { return nil, err } - - retStats := map[string]interface{}{} - dhtStats := map[string]any{} - - if s.dht != nil { - if stats, err := s.dht.Stats(ctx); err != nil { - logtrace.Error(ctx, "failed to get dht stats", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()}) - } else { - dhtStats = stats - } - - retStats["ban-list"] = s.dht.BanListSnapshot() - retStats["conn-pool"] = s.dht.ConnPoolSnapshot() - - // Expose DHT rolling metrics snapshot both under the top-level key (as expected by - // the status service) and also within the DHT map for backward compatibility. 
- snapshot := s.dht.MetricsSnapshot() - retStats["dht_metrics"] = snapshot - dhtStats["dht_metrics"] = snapshot - } else { - retStats["ban-list"] = []kademlia.BanSnapshot{} - retStats["conn-pool"] = map[string]int64{} - } - - if s.store != nil { - if dbStats, err := s.store.Stats(ctx); err != nil { - logtrace.Error(ctx, "failed to get store stats", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()}) - } else if dbStats != nil { - dhtStats["database"] = dbStats + s.statsOnce.Do(func() { + if s.stats == nil { + s.stats = newP2PStatsManager() } + }) + if s.stats == nil { + return nil, errors.New("p2p stats manager is nil") } - - retStats["dht"] = dhtStats - retStats["config"] = s.config - - // get free space of current kademlia folder - if s.config != nil { - if diskUse, err := utils.DiskUsage(s.config.DataDir); err != nil { - logtrace.Error(ctx, "get disk info failed", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()}) - } else { - retStats["disk-info"] = &diskUse - } - } - - return retStats, nil + return s.stats.Stats(ctx, s) } // NClosestNodes returns a list of n closest masternode to a given string @@ -301,6 +268,7 @@ func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr key lumeraClient: lumeraClient, keyring: kr, // Store the keyring rqstore: rqstore, + stats: newP2PStatsManager(), }, nil } diff --git a/p2p/p2p_stats.go b/p2p/p2p_stats.go new file mode 100644 index 00000000..49e1bcaa --- /dev/null +++ b/p2p/p2p_stats.go @@ -0,0 +1,266 @@ +package p2p + +import ( + "context" + "time" + + "sync/atomic" + + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + "github.com/LumeraProtocol/supernode/v2/pkg/utils" + ristretto "github.com/dgraph-io/ristretto/v2" +) + +const ( + // Cache layout: + // - p2pStatsLKGKey is the long-lived last-known-good snapshot. Status requests serve from this + // snapshot immediately (low latency) and refresh it asynchronously when stale. + // - p2pStatsFreshKey is a short-lived freshness marker. Its presence means “recently refreshed”, + // so we do not start another heavy refresh yet. + p2pStatsLKGKey = "p2p_stats/snapshot" + p2pStatsFreshKey = "p2p_stats/fresh" + + // Knobs (tune latency vs freshness): + // + // p2pStatsFreshTTL: + // Minimum time between starting heavy diagnostic refreshes (DB stats, disk usage, peer list). + // On an incoming Stats() call (e.g. /api/v1/status?include_p2p_metrics=true), if the freshness + // marker is missing/expired, we trigger ONE background refresh and immediately return the + // current snapshot. + // + // p2pStatsCacheKeepAlive: + // How long we keep the last-known-good snapshot around for fallback when refreshes fail. + // + // p2pStatsRefreshTimeout: + // Per-refresh time budget for the heavy refresh goroutine. 
+ p2pStatsFreshTTL = 30 * time.Second + p2pStatsCacheKeepAlive = 10 * time.Minute + p2pStatsRefreshTimeout = 6 * time.Second + + p2pStatsSlowRefreshThreshold = 750 * time.Millisecond +) + +type p2pStatsManager struct { + cache *ristretto.Cache[string, any] + + refreshInFlight atomic.Bool +} + +func newP2PStatsManager() *p2pStatsManager { + c, err := ristretto.NewCache(&ristretto.Config[string, any]{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + }) + if err != nil { + logtrace.Error(context.Background(), "failed to create p2p stats cache (continuing without caching)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) + } + return &p2pStatsManager{cache: c} +} + +func (m *p2pStatsManager) getSnapshot() *StatsSnapshot { + if m == nil || m.cache == nil { + return nil + } + v, ok := m.cache.Get(p2pStatsLKGKey) + if !ok { + return nil + } + snap, _ := v.(*StatsSnapshot) + return snap +} + +func (m *p2pStatsManager) setSnapshot(snap *StatsSnapshot) { + if m == nil || m.cache == nil || snap == nil { + return + } + m.cache.SetWithTTL(p2pStatsLKGKey, snap, 1, p2pStatsCacheKeepAlive) +} + +func (m *p2pStatsManager) isFresh() bool { + if m == nil || m.cache == nil { + return false + } + _, ok := m.cache.Get(p2pStatsFreshKey) + return ok +} + +func (m *p2pStatsManager) markFresh() { + if m == nil || m.cache == nil { + return + } + m.cache.SetWithTTL(p2pStatsFreshKey, true, 1, p2pStatsFreshTTL) +} + +// Stats returns a typed snapshot compatible with the p2p.Client interface. +// +// Incoming call semantics: +// - The call stays latency-predictable: it returns immediately from the cached snapshot. +// - PeersCount is always refreshed via a fast DHT path on every call (no peer list allocation). +// - Heavy diagnostics are refreshed in the background at most once per p2pStatsFreshTTL, deduped +// across concurrent callers (refreshInFlight). +func (m *p2pStatsManager) Stats(ctx context.Context, p *p2p) (*StatsSnapshot, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + peersCount := int32(0) + if p != nil && p.dht != nil { + peersCount = int32(p.dht.PeersCount()) + } + + prev := m.getSnapshot() + snap := cloneSnapshot(prev) + snap.PeersCount = peersCount + // Store a separate struct instance in the cache to avoid aliasing with the returned snapshot, + // while avoiding an extra deep clone on every Stats() call. 
+ cachedSnap := *snap + m.setSnapshot(&cachedSnap) + + if !m.isFresh() { + m.maybeRefreshDiagnostics(ctx, p) + } + + return snap, nil +} + +func (m *p2pStatsManager) maybeRefreshDiagnostics(ctx context.Context, p *p2p) { + if m == nil || p == nil { + return + } + if !m.refreshInFlight.CompareAndSwap(false, true) { + return + } + + logCtx := ctx + go func() { + defer m.refreshInFlight.Store(false) + + start := time.Now() + refreshCtx, cancel := context.WithTimeout(context.Background(), p2pStatsRefreshTimeout) + err := m.refreshDiagnostics(refreshCtx, p) + cancel() + dur := time.Since(start) + + if err != nil { + logtrace.Warn(logCtx, "p2p stats diagnostics refresh failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "refresh": "diagnostics", + "ms": dur.Milliseconds(), + logtrace.FieldError: err.Error(), + }) + } + if dur > p2pStatsSlowRefreshThreshold { + logtrace.Warn(logCtx, "p2p stats diagnostics refresh slow", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "refresh": "diagnostics", + "ms": dur.Milliseconds(), + }) + } + }() +} + +func (m *p2pStatsManager) refreshDiagnostics(ctx context.Context, p *p2p) error { + if err := ctx.Err(); err != nil { + return err + } + + prev := m.getSnapshot() + next := cloneSnapshot(prev) + + var refreshErr error + + if p != nil && p.dht != nil { + peers := p.dht.PeersSnapshot() + next.Peers = peers + next.PeersCount = int32(len(peers)) + next.NetworkHandleMetrics = p.dht.NetworkHandleMetricsSnapshot() + dbStats, err := p.dht.DatabaseStats(ctx) + if err != nil { + refreshErr = err + } else { + next.Database = dbStats + } + next.BanList = p.dht.BanListSnapshot() + next.ConnPool = p.dht.ConnPoolSnapshot() + + metricsSnap := p.dht.MetricsSnapshot() + next.DHTMetrics = metricsSnap + } + + if p != nil && p.config != nil { + diskUse, err := utils.DiskUsage(p.config.DataDir) + if err != nil { + if refreshErr == nil { + refreshErr = err + } + } else { + next.DiskInfo = &diskUse + } + } + + m.setSnapshot(next) + m.markFresh() + return refreshErr +} + +func cloneSnapshot(in *StatsSnapshot) *StatsSnapshot { + if in == nil { + return &StatsSnapshot{ + BanList: []kademlia.BanSnapshot{}, + ConnPool: map[string]int64{}, + } + } + + out := *in + + if in.Peers != nil { + out.Peers = make([]*kademlia.Node, len(in.Peers)) + for i, peer := range in.Peers { + if peer == nil { + continue + } + cp := *peer + if peer.ID != nil { + cp.ID = append([]byte(nil), peer.ID...) + } + if peer.HashedID != nil { + cp.HashedID = append([]byte(nil), peer.HashedID...) + } + out.Peers[i] = &cp + } + } + + if in.BanList != nil { + out.BanList = append([]kademlia.BanSnapshot(nil), in.BanList...) 
+ } else { + out.BanList = []kademlia.BanSnapshot{} + } + + if in.ConnPool != nil { + out.ConnPool = make(map[string]int64, len(in.ConnPool)) + for k, v := range in.ConnPool { + out.ConnPool[k] = v + } + } else { + out.ConnPool = map[string]int64{} + } + + if in.NetworkHandleMetrics != nil { + out.NetworkHandleMetrics = make(map[string]kademlia.HandleCounters, len(in.NetworkHandleMetrics)) + for k, v := range in.NetworkHandleMetrics { + out.NetworkHandleMetrics[k] = v + } + } + + if in.DiskInfo != nil { + du := *in.DiskInfo + out.DiskInfo = &du + } + + return &out +} diff --git a/p2p/stats_snapshot.go b/p2p/stats_snapshot.go new file mode 100644 index 00000000..72599869 --- /dev/null +++ b/p2p/stats_snapshot.go @@ -0,0 +1,24 @@ +package p2p + +import ( + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" + "github.com/LumeraProtocol/supernode/v2/pkg/utils" +) + +// StatsSnapshot is a typed alternative to Client.Stats' map[string]any payload. +// It is intended for internal consumers that want compile-time safety. +type StatsSnapshot struct { + PeersCount int32 + Peers []*kademlia.Node + + BanList []kademlia.BanSnapshot + ConnPool map[string]int64 + + NetworkHandleMetrics map[string]kademlia.HandleCounters + DHTMetrics kademlia.DHTMetricsSnapshot + + Database DatabaseStats + DiskInfo *utils.DiskStatus +} + +type DatabaseStats = kademlia.DatabaseStats diff --git a/supernode/status/service.go b/supernode/status/service.go index 0645385f..33faef1d 100644 --- a/supernode/status/service.go +++ b/supernode/status/service.go @@ -7,11 +7,9 @@ import ( pb "github.com/LumeraProtocol/supernode/v2/gen/supernode" "github.com/LumeraProtocol/supernode/v2/p2p" - "github.com/LumeraProtocol/supernode/v2/p2p/kademlia" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/lumera" "github.com/LumeraProtocol/supernode/v2/pkg/task" - "github.com/LumeraProtocol/supernode/v2/pkg/utils" "github.com/LumeraProtocol/supernode/v2/supernode/config" ) @@ -96,12 +94,6 @@ func (s *SupernodeStatusService) GetStatus(ctx context.Context, includeP2PMetric }) } - if resp.Network == nil { - resp.Network = &pb.StatusResponse_Network{} - } - resp.Network.PeersCount = 0 - resp.Network.PeerAddresses = []string{} - // Populate running tasks from injected tracker if s.tracker != nil { snap := s.tracker.Snapshot() @@ -116,100 +108,72 @@ func (s *SupernodeStatusService) GetStatus(ctx context.Context, includeP2PMetric } } - // Prepare optional P2P metrics container - pm := &pb.StatusResponse_P2PMetrics{ - DhtMetrics: &pb.StatusResponse_P2PMetrics_DhtMetrics{}, - NetworkHandleMetrics: map[string]*pb.StatusResponse_P2PMetrics_HandleCounters{}, - ConnPoolMetrics: map[string]int64{}, - BanList: []*pb.StatusResponse_P2PMetrics_BanEntry{}, - Database: &pb.StatusResponse_P2PMetrics_DatabaseStats{}, - Disk: &pb.StatusResponse_P2PMetrics_DiskStatus{}, - } - if includeP2PMetrics && s.p2pService != nil { + // Prepare optional P2P metrics container (only when requested). 
+ pm := &pb.StatusResponse_P2PMetrics{ + DhtMetrics: &pb.StatusResponse_P2PMetrics_DhtMetrics{}, + NetworkHandleMetrics: map[string]*pb.StatusResponse_P2PMetrics_HandleCounters{}, + ConnPoolMetrics: map[string]int64{}, + BanList: []*pb.StatusResponse_P2PMetrics_BanEntry{}, + Database: &pb.StatusResponse_P2PMetrics_DatabaseStats{}, + Disk: &pb.StatusResponse_P2PMetrics_DiskStatus{}, + } + // Bound P2P metrics collection so status can't hang if P2P is slow p2pCtx, cancel := context.WithTimeout(ctx, statusSubsystemTimeout) defer cancel() - p2pStats, err := s.p2pService.Stats(p2pCtx) + snap, err := s.p2pService.Stats(p2pCtx) if err != nil { - logtrace.Error(ctx, "failed to get p2p stats", logtrace.Fields{logtrace.FieldError: err.Error()}) - } else { - if dhtStats, ok := p2pStats["dht"].(map[string]interface{}); ok { - if peersCount, ok := dhtStats["peers_count"].(int); ok { - resp.Network.PeersCount = int32(peersCount) - } - if peers, ok := dhtStats["peers"].([]*kademlia.Node); ok { - resp.Network.PeerAddresses = make([]string, 0, len(peers)) - for _, peer := range peers { - resp.Network.PeerAddresses = append(resp.Network.PeerAddresses, fmt.Sprintf("%s@%s:%d", string(peer.ID), peer.IP, peer.Port)) - } - } else { - resp.Network.PeerAddresses = []string{} - } - } - if du, ok := p2pStats["disk-info"].(utils.DiskStatus); ok { - pm.Disk.AllMb = du.All - pm.Disk.UsedMb = du.Used - pm.Disk.FreeMb = du.Free - } else if duPtr, ok := p2pStats["disk-info"].(*utils.DiskStatus); ok && duPtr != nil { - pm.Disk.AllMb = duPtr.All - pm.Disk.UsedMb = duPtr.Used - pm.Disk.FreeMb = duPtr.Free - } - if bans, ok := p2pStats["ban-list"].([]kademlia.BanSnapshot); ok { - for _, b := range bans { - pm.BanList = append(pm.BanList, &pb.StatusResponse_P2PMetrics_BanEntry{Id: b.ID, Ip: b.IP, Port: uint32(b.Port), Count: int32(b.Count), CreatedAtUnix: b.CreatedAt.Unix(), AgeSeconds: int64(b.Age.Seconds())}) - } - } - if pool, ok := p2pStats["conn-pool"].(map[string]int64); ok { - for k, v := range pool { - pm.ConnPoolMetrics[k] = v - } - } - if dhtStats, ok := p2pStats["dht"].(map[string]interface{}); ok { - if db, ok := dhtStats["database"].(map[string]interface{}); ok { - var sizeMB float64 - if v, ok := db["p2p_db_size"].(float64); ok { - sizeMB = v - } - var recs int64 - switch v := db["p2p_db_records_count"].(type) { - case int: - recs = int64(v) - case int64: - recs = v - case float64: - recs = int64(v) - } - pm.Database.P2PDbSizeMb = sizeMB - pm.Database.P2PDbRecordsCount = recs - } - if nhm, ok := dhtStats["network"].(map[string]kademlia.HandleCounters); ok { - for k, c := range nhm { - pm.NetworkHandleMetrics[k] = &pb.StatusResponse_P2PMetrics_HandleCounters{Total: c.Total, Success: c.Success, Failure: c.Failure, Timeout: c.Timeout} - } - } else if nhmI, ok := dhtStats["network"].(map[string]interface{}); ok { - for k, vi := range nhmI { - if c, ok := vi.(kademlia.HandleCounters); ok { - pm.NetworkHandleMetrics[k] = &pb.StatusResponse_P2PMetrics_HandleCounters{Total: c.Total, Success: c.Success, Failure: c.Failure, Timeout: c.Timeout} - } - } - } - } - if snap, ok := p2pStats["dht_metrics"].(kademlia.DHTMetricsSnapshot); ok { - for _, sp := range snap.StoreSuccessRecent { - pm.DhtMetrics.StoreSuccessRecent = append(pm.DhtMetrics.StoreSuccessRecent, &pb.StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint{TimeUnix: sp.Time.Unix(), Requests: int32(sp.Requests), Successful: int32(sp.Successful), SuccessRate: sp.SuccessRate}) - } - for _, bp := range snap.BatchRetrieveRecent { - pm.DhtMetrics.BatchRetrieveRecent = 
append(pm.DhtMetrics.BatchRetrieveRecent, &pb.StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint{TimeUnix: bp.Time.Unix(), Keys: int32(bp.Keys), Required: int32(bp.Required), FoundLocal: int32(bp.FoundLocal), FoundNetwork: int32(bp.FoundNet), DurationMs: bp.Duration.Milliseconds()}) + logtrace.Error(ctx, "failed to get p2p stats snapshot", logtrace.Fields{logtrace.FieldError: err.Error()}) + return resp, err + } + + if resp.Network == nil { + resp.Network = &pb.StatusResponse_Network{} + } + resp.Network.PeersCount = snap.PeersCount + resp.Network.PeerAddresses = []string{} + if peers := snap.Peers; len(peers) > 0 { + resp.Network.PeerAddresses = make([]string, 0, len(peers)) + for _, peer := range peers { + if peer == nil { + continue } - pm.DhtMetrics.HotPathBannedSkips = snap.HotPathBannedSkips - pm.DhtMetrics.HotPathBanIncrements = snap.HotPathBanIncrements + resp.Network.PeerAddresses = append(resp.Network.PeerAddresses, fmt.Sprintf("%s@%s:%d", string(peer.ID), peer.IP, peer.Port)) } } - } - if includeP2PMetrics { + + if snap.DiskInfo != nil { + pm.Disk.AllMb = snap.DiskInfo.All + pm.Disk.UsedMb = snap.DiskInfo.Used + pm.Disk.FreeMb = snap.DiskInfo.Free + } + for _, b := range snap.BanList { + pm.BanList = append(pm.BanList, &pb.StatusResponse_P2PMetrics_BanEntry{Id: b.ID, Ip: b.IP, Port: uint32(b.Port), Count: int32(b.Count), CreatedAtUnix: b.CreatedAt.Unix(), AgeSeconds: int64(b.Age.Seconds())}) + } + for k, v := range snap.ConnPool { + pm.ConnPoolMetrics[k] = v + } + + pm.Database.P2PDbSizeMb = snap.Database.P2PDbSizeMb + pm.Database.P2PDbRecordsCount = snap.Database.P2PDbRecordsCount + + for k, c := range snap.NetworkHandleMetrics { + pm.NetworkHandleMetrics[k] = &pb.StatusResponse_P2PMetrics_HandleCounters{Total: c.Total, Success: c.Success, Failure: c.Failure, Timeout: c.Timeout} + } + + for _, sp := range snap.DHTMetrics.StoreSuccessRecent { + pm.DhtMetrics.StoreSuccessRecent = append(pm.DhtMetrics.StoreSuccessRecent, &pb.StatusResponse_P2PMetrics_DhtMetrics_StoreSuccessPoint{TimeUnix: sp.Time.Unix(), Requests: int32(sp.Requests), Successful: int32(sp.Successful), SuccessRate: sp.SuccessRate}) + } + for _, bp := range snap.DHTMetrics.BatchRetrieveRecent { + pm.DhtMetrics.BatchRetrieveRecent = append(pm.DhtMetrics.BatchRetrieveRecent, &pb.StatusResponse_P2PMetrics_DhtMetrics_BatchRetrievePoint{TimeUnix: bp.Time.Unix(), Keys: int32(bp.Keys), Required: int32(bp.Required), FoundLocal: int32(bp.FoundLocal), FoundNetwork: int32(bp.FoundNet), DurationMs: bp.Duration.Milliseconds()}) + } + pm.DhtMetrics.HotPathBannedSkips = snap.DHTMetrics.HotPathBannedSkips + pm.DhtMetrics.HotPathBanIncrements = snap.DHTMetrics.HotPathBanIncrements + resp.P2PMetrics = pm + } else if includeP2PMetrics { + return resp, fmt.Errorf("p2p service is nil") } if s.config != nil && s.lumeraClient != nil { diff --git a/supernode/supernode_metrics/constants.go b/supernode/supernode_metrics/constants.go index 5dba5d78..0b8c1e41 100644 --- a/supernode/supernode_metrics/constants.go +++ b/supernode/supernode_metrics/constants.go @@ -11,7 +11,7 @@ const ( // metrics reporting and active probing begin. This allows the node to fully // initialize (establish P2P connections, sync state, etc.) before participating // in the reachability protocol. - DefaultStartupDelaySeconds = 30 + DefaultStartupDelaySeconds = 300 // FallbackMetricsUpdateIntervalBlocks is used when chain params are unavailable // or return an invalid/zero value. This should be conservative and stable. 
diff --git a/supernode/supernode_metrics/monitor_service.go b/supernode/supernode_metrics/monitor_service.go index 0ddaa0dc..4fd616af 100644 --- a/supernode/supernode_metrics/monitor_service.go +++ b/supernode/supernode_metrics/monitor_service.go @@ -448,10 +448,10 @@ func (hm *Collector) reportHealth(ctx context.Context) { break } } - if allPortsOpen { + if allPortsOpen && metrics.PeersCount > 0 { logtrace.Info(ctx, "Reporting supernode metrics", fields) } else { - logtrace.Warn(ctx, "Reporting supernode metrics (one or more ports not OPEN)", fields) + logtrace.Warn(ctx, "Reporting supernode metrics (one or more ports not OPEN or peers_count=0)", fields) } // Report the metrics snapshot to the blockchain using the supernode
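
Reviewer note (illustration only, not part of the patch): the caching behaviour introduced in `p2p/p2p_stats.go` — serve the last-known-good snapshot immediately, overlay the cheap `PeersCount`, and refresh heavy diagnostics in the background at most once per TTL with a single in-flight refresh — is sketched below using only the standard library. The names `statsCache`, `Snapshot`, and `collect` are illustrative, not from the codebase; the real implementation keeps the snapshot and a freshness marker in a ristretto cache and dedupes refreshes with an `atomic.Bool`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Snapshot stands in for p2p.StatsSnapshot: one cheap field plus heavy diagnostics.
type Snapshot struct {
	PeersCount  int32
	CollectedAt time.Time
}

type statsCache struct {
	mu          sync.RWMutex
	lkg         Snapshot    // last-known-good heavy diagnostics
	refreshedAt time.Time   // when the heavy diagnostics were last collected
	inFlight    atomic.Bool // dedupes concurrent background refreshes

	freshTTL       time.Duration // minimum gap between heavy refreshes
	refreshTimeout time.Duration // time budget for one background refresh
}

// Get returns immediately from the cached snapshot, overlays the cheap value,
// and starts at most one background refresh when the snapshot is stale.
func (c *statsCache) Get(peersCount int32, collect func(context.Context) Snapshot) Snapshot {
	c.mu.RLock()
	snap := c.lkg
	stale := time.Since(c.refreshedAt) > c.freshTTL
	c.mu.RUnlock()

	snap.PeersCount = peersCount // fast path: always current

	if stale && c.inFlight.CompareAndSwap(false, true) {
		go func() {
			defer c.inFlight.Store(false)
			ctx, cancel := context.WithTimeout(context.Background(), c.refreshTimeout)
			defer cancel()
			next := collect(ctx) // heavy work: DB stats, disk usage, peer list
			c.mu.Lock()
			c.lkg, c.refreshedAt = next, time.Now()
			c.mu.Unlock()
		}()
	}
	return snap
}

func main() {
	c := &statsCache{freshTTL: 30 * time.Second, refreshTimeout: 6 * time.Second}
	// The first call serves the zero-value snapshot and kicks off a background
	// refresh; this demo exits without waiting for that refresh to finish.
	snap := c.Get(7, func(ctx context.Context) Snapshot {
		return Snapshot{CollectedAt: time.Now()} // stand-in for the expensive collection
	})
	fmt.Printf("peers=%d collected_at=%v\n", snap.PeersCount, snap.CollectedAt)
}
```

Consumers of the new typed API read fields directly instead of type-asserting out of a map, e.g. `snap, err := p2pClient.Stats(ctx)` followed by `snap.PeersCount` and `snap.Database.P2PDbRecordsCount`, which is what the rewritten `supernode/status/service.go` now does.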