3 changes: 2 additions & 1 deletion .gitignore
@@ -29,4 +29,5 @@ tests/system/1
.env
/data
/release
AGENTS.md
AGENTS.md
analysis
7 changes: 5 additions & 2 deletions README.md
@@ -18,8 +18,11 @@ service SupernodeService {
// Optional request flags
message StatusRequest {
// When true, the response includes detailed P2P metrics and
// network peer information. This is gated to avoid heavy work
// unless explicitly requested.
// network peer information.
//
// This is gated to avoid heavy work unless explicitly requested:
// - peers_count is kept fast
// - heavier diagnostics are served from a cached last-known-good snapshot
bool include_p2p_metrics = 1;
}

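As a rough sketch of toggling this flag from a Go gRPC client — the endpoint address, generated package path, client constructor, RPC name, and the exact generated field name are all assumptions here; check the generated stubs for the real identifiers:

```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/LumeraProtocol/supernode/v2/gen/supernode" // assumed path to the generated stubs
)

func main() {
	// Assumed address; insecure transport only for illustration.
	conn, err := grpc.Dial("localhost:4444", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewSupernodeServiceClient(conn)
	// Ask for the heavier P2P diagnostics explicitly. protoc-gen-go may render
	// the field as IncludeP2pMetrics rather than IncludeP2PMetrics.
	resp, err := client.Status(context.Background(), &pb.StatusRequest{IncludeP2PMetrics: true})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("status: %+v", resp)
}
```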
3 changes: 3 additions & 0 deletions docs/gateway.md
@@ -9,6 +9,9 @@ Returns supernode status: system resources (CPU, memory, storage), service info,

- Query `include_p2p_metrics=true` enables detailed P2P metrics and peer info.
- When omitted or false, peer count, peer addresses, and `p2p_metrics` are not included.
- When `include_p2p_metrics=true`, the response is designed to be latency-predictable:
- `network.peers_count` is computed on every request via a fast DHT path.
  - Heavier diagnostics (peer list, DB stats, disk usage) are served from a single cached “last-known-good” snapshot that is refreshed asynchronously, at most once per short TTL (see the constants in `p2p/p2p_stats.go`).

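A minimal Go sketch of requesting the detailed view over the gateway; the host, port, and path are assumptions — only the `include_p2p_metrics` query parameter comes from this endpoint's contract:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed host, port, and path; include_p2p_metrics is the documented flag.
	url := "http://localhost:8002/api/v1/status?include_p2p_metrics=true"
	req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```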
Examples:

4 changes: 2 additions & 2 deletions p2p/client.go
@@ -31,8 +31,8 @@ type Client interface {
// Delete a key, value
Delete(ctx context.Context, key string) error

// Stats return status of p2p
Stats(ctx context.Context) (map[string]interface{}, error)
// Stats returns a typed snapshot of P2P state for internal/monitoring use.
Stats(ctx context.Context) (*StatsSnapshot, error)

// NClosestNodes return n closest supernodes to a given key string (NB full node string formatting)
NClosestNodes(ctx context.Context, n int, key string, ignores ...string) []string
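For callers, this change mainly replaces ad-hoc map lookups with compile-time-checked field access. A hedged sketch of a monitoring consumer — the snapshot field names used below are illustrative assumptions, not the actual `StatsSnapshot` definition:

```go
package monitoring

import (
	"context"
	"log"

	"github.com/LumeraProtocol/supernode/v2/p2p"
)

// logP2PStats assumes StatsSnapshot exposes something like a peers count and
// the typed database stats; verify the real field names in the p2p package.
func logP2PStats(ctx context.Context, c p2p.Client) error {
	snap, err := c.Stats(ctx)
	if err != nil {
		return err
	}
	// Previously this required type assertions on nested maps, e.g.
	//   stats["dht"].(map[string]any)["peers_count"].(int)
	log.Printf("p2p peers=%d db_size_mb=%.1f", snap.Network.PeersCount, snap.Database.P2PDbSizeMb)
	return nil
}
```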
55 changes: 38 additions & 17 deletions p2p/kademlia/dht.go
@@ -218,8 +218,11 @@ func NewDHT(ctx context.Context, store Store, metaStore MetaStore, options *Opti
return s, nil
}

func (s *DHT) NodesLen() int {
return len(s.ht.nodes())
func (s *DHT) PeersCount() int {
if s == nil || s.ht == nil {
return 0
}
return s.ht.peersCount()
}

func (s *DHT) getSupernodeAddress(ctx context.Context) (string, error) {
@@ -456,26 +459,44 @@ func (s *DHT) Delete(ctx context.Context, key string) error {
return nil
}

// Stats returns stats of DHT
func (s *DHT) Stats(ctx context.Context) (map[string]interface{}, error) {
if s.store == nil {
return nil, fmt.Errorf("store is nil")
func (s *DHT) PeersSnapshot() []*Node {
if s == nil || s.ht == nil {
return nil
}
in := s.ht.nodes()
if len(in) == 0 {
return nil
}

dbStats, err := s.store.Stats(ctx)
if err != nil {
return nil, err
out := make([]*Node, 0, len(in))
for _, n := range in {
if n == nil {
continue
}
cp := *n
if n.ID != nil {
cp.ID = append([]byte(nil), n.ID...)
}
if n.HashedID != nil {
cp.HashedID = append([]byte(nil), n.HashedID...)
}
out = append(out, &cp)
}
return out
}

dhtStats := map[string]any{}
dhtStats["self"] = s.ht.self
dhtStats["peers_count"] = len(s.ht.nodes())
dhtStats["peers"] = s.ht.nodes()
dhtStats["network"] = s.network.HandleMetricsSnapshot()
// Removed: recent per-request snapshots (logs provide visibility)
dhtStats["database"] = dbStats
func (s *DHT) NetworkHandleMetricsSnapshot() map[string]HandleCounters {
if s == nil || s.network == nil {
return nil
}
return s.network.HandleMetricsSnapshot()
}

return dhtStats, nil
func (s *DHT) DatabaseStats(ctx context.Context) (DatabaseStats, error) {
if s == nil || s.store == nil {
return DatabaseStats{}, fmt.Errorf("store is nil")
}
return s.store.Stats(ctx)
}

// newMessage creates a new message
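With `Stats` removed from the DHT, an aggregator (in this PR, the stats manager in the `p2p` package) composes the focused accessors itself, keeping the cheap peer count separate from the heavier calls. A sketch of that composition; the map keys mirror the old output, everything else is an illustrative assumption:

```go
package example

import (
	"context"

	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia"
)

// collectDHTStats rebuilds roughly what DHT.Stats used to return, but the
// caller now decides which pieces to pay for on each request.
func collectDHTStats(ctx context.Context, dht *kademlia.DHT) (map[string]any, error) {
	db, err := dht.DatabaseStats(ctx)
	if err != nil {
		return nil, err
	}
	return map[string]any{
		"peers_count": dht.PeersCount(),                   // fast: bucket sizes only
		"peers":       dht.PeersSnapshot(),                // heavier: deep-copies every node
		"network":     dht.NetworkHandleMetricsSnapshot(), // rolling handler counters
		"database":    db,                                 // typed kademlia.DatabaseStats
	}, nil
}
```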
10 changes: 10 additions & 0 deletions p2p/kademlia/hashtable.go
@@ -368,6 +368,16 @@ func (ht *HashTable) nodes() []*Node {
return out
}

func (ht *HashTable) peersCount() int {
ht.mutex.RLock()
defer ht.mutex.RUnlock()
total := 0
for _, b := range ht.routeTable {
total += len(b)
}
return total
}

// newRandomID: match B=256 (32 bytes)
func newRandomID() ([]byte, error) {
id := make([]byte, B/8)
7 changes: 6 additions & 1 deletion p2p/kademlia/store.go
@@ -7,6 +7,11 @@ import (
"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain"
)

type DatabaseStats struct {
P2PDbSizeMb float64
P2PDbRecordsCount int64
}

// Store is the interface for implementing the storage mechanism for the DHT
type Store interface {
// Store a key/value pair for the queries node with the replication
@@ -22,7 +27,7 @@ type Store interface {
GetKeysForReplication(ctx context.Context, from time.Time, to time.Time) domain.KeysWithTimestamp

// Stats returns stats of store
Stats(ctx context.Context) (map[string]interface{}, error)
Stats(ctx context.Context) (DatabaseStats, error)

// Close the store
Close(ctx context.Context)
6 changes: 3 additions & 3 deletions p2p/kademlia/store/mem/mem.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/LumeraProtocol/supernode/v2/p2p/kademlia"
"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain"
)

@@ -68,9 +69,8 @@ func (s *Store) Close(_ context.Context) {
}

// Stats returns stats of store
func (s *Store) Stats(_ context.Context) (map[string]interface{}, error) {
stats := map[string]interface{}{}
return stats, nil
func (s *Store) Stats(_ context.Context) (kademlia.DatabaseStats, error) {
return kademlia.DatabaseStats{}, nil
}

// Count the records in store
9 changes: 5 additions & 4 deletions p2p/kademlia/store/sqlite/sqlite.go
@@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/LumeraProtocol/supernode/v2/p2p/kademlia"
"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud"
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"
@@ -693,17 +694,17 @@ func sqliteOnDiskSizeBytes(ctx context.Context, dbPath string) (bytes uint64, ok
}

// Stats returns stats of store
func (s *Store) Stats(ctx context.Context) (map[string]interface{}, error) {
stats := map[string]interface{}{}
func (s *Store) Stats(ctx context.Context) (kademlia.DatabaseStats, error) {
stats := kademlia.DatabaseStats{}

if bytes, ok := sqliteOnDiskSizeBytes(ctx, s.dbFilePath); !ok {
logtrace.Error(ctx, "failed to get p2p db size", logtrace.Fields{logtrace.FieldError: "p2p db file not found"})
} else {
stats["p2p_db_size"] = utils.BytesToMB(bytes)
stats.P2PDbSizeMb = utils.BytesToMB(bytes)
}

if count, err := s.Count(ctx); err == nil {
stats["p2p_db_records_count"] = count
stats.P2PDbRecordsCount = int64(count)
} else {
logtrace.Error(ctx, "failed to get p2p records count", logtrace.Fields{logtrace.FieldError: err.Error()})
}
9 changes: 5 additions & 4 deletions p2p/mocks/Client.go

Some generated files are not rendered by default.

58 changes: 13 additions & 45 deletions p2p/p2p.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/LumeraProtocol/supernode/v2/p2p/kademlia"
@@ -48,6 +49,8 @@ type p2p struct {
lumeraClient lumera.Client
keyring keyring.Keyring // Add the keyring field
rqstore rqstore.Store
stats *p2pStatsManager
statsOnce sync.Once
}

// Run the kademlia network
@@ -166,56 +169,20 @@ func (s *p2p) Delete(ctx context.Context, key string) error {
return s.dht.Delete(ctx, key)
}

// Stats return status of p2p
func (s *p2p) Stats(ctx context.Context) (map[string]interface{}, error) {
// Stats returns a typed snapshot of the P2P subsystem state.
func (s *p2p) Stats(ctx context.Context) (*StatsSnapshot, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

retStats := map[string]interface{}{}
dhtStats := map[string]any{}

if s.dht != nil {
if stats, err := s.dht.Stats(ctx); err != nil {
logtrace.Error(ctx, "failed to get dht stats", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()})
} else {
dhtStats = stats
}

retStats["ban-list"] = s.dht.BanListSnapshot()
retStats["conn-pool"] = s.dht.ConnPoolSnapshot()

// Expose DHT rolling metrics snapshot both under the top-level key (as expected by
// the status service) and also within the DHT map for backward compatibility.
snapshot := s.dht.MetricsSnapshot()
retStats["dht_metrics"] = snapshot
dhtStats["dht_metrics"] = snapshot
} else {
retStats["ban-list"] = []kademlia.BanSnapshot{}
retStats["conn-pool"] = map[string]int64{}
}

if s.store != nil {
if dbStats, err := s.store.Stats(ctx); err != nil {
logtrace.Error(ctx, "failed to get store stats", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()})
} else if dbStats != nil {
dhtStats["database"] = dbStats
s.statsOnce.Do(func() {
if s.stats == nil {
s.stats = newP2PStatsManager()
}
})
if s.stats == nil {
return nil, errors.New("p2p stats manager is nil")
}

retStats["dht"] = dhtStats
retStats["config"] = s.config

// get free space of current kademlia folder
if s.config != nil {
if diskUse, err := utils.DiskUsage(s.config.DataDir); err != nil {
logtrace.Error(ctx, "get disk info failed", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()})
} else {
retStats["disk-info"] = &diskUse
}
}

return retStats, nil
return s.stats.Stats(ctx, s)
}

// NClosestNodes returns a list of n closest masternode to a given string
@@ -301,6 +268,7 @@ func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr key
lumeraClient: lumeraClient,
keyring: kr, // Store the keyring
rqstore: rqstore,
stats: newP2PStatsManager(),
}, nil
}

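The `p2pStatsManager` referenced above lives in `p2p/p2p_stats.go` and is not part of this excerpt. Below is a minimal sketch of the pattern it implements per `docs/gateway.md` — cheap fields computed per request, heavier ones served from a cached last-known-good value refreshed asynchronously at most once per TTL. All names, fields, and the TTL handling here are illustrative assumptions:

```go
package example

import (
	"sync"
	"time"
)

// heavyStats stands in for the expensive part of the snapshot (peer list,
// DB stats, disk usage); the real fields live on p2p.StatsSnapshot.
type heavyStats struct {
	PeerAddrs []string
	DBSizeMB  float64
}

type statsCache struct {
	mu          sync.Mutex
	last        heavyStats // last-known-good snapshot served to callers
	lastRefresh time.Time
	refreshing  bool
	ttl         time.Duration     // e.g. a few seconds; the real constants are in p2p_stats.go
	collect     func() heavyStats // the expensive collection step
}

// get returns immediately with the cached value and, at most once per TTL,
// kicks off an asynchronous refresh so no request waits on heavy work.
func (c *statsCache) get() heavyStats {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.refreshing && time.Since(c.lastRefresh) > c.ttl {
		c.refreshing = true
		go func() {
			snap := c.collect() // heavy: DB stats, disk usage, peer copies
			c.mu.Lock()
			c.last, c.lastRefresh, c.refreshing = snap, time.Now(), false
			c.mu.Unlock()
		}()
	}
	return c.last
}
```

On this sketch, `network.peers_count` would still be filled in on every request via the fast `PeersCount` path shown earlier, outside the cached heavy snapshot.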