Skip to content

Commit b2b2ade

Browse files
committed
Topology metrics
1 parent dc93b57 commit b2b2ade

4 files changed

Lines changed: 501 additions & 18 deletions

File tree

topology/actors.go

Lines changed: 109 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ package topology
44
import (
55
"context"
66
"fmt"
7+
"time"
8+
79
libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto"
810
"github.com/libp2p/go-libp2p/core/peer"
911
"github.com/multiformats/go-multiaddr"
@@ -66,18 +68,19 @@ type pendingPeer struct {
6668

6769
// Actors manages actor information within the topology.
6870
type Actors struct {
69-
peers map[peer.ID]*Actor
70-
pendingPeers map[peer.ID]*pendingPeer
71-
roleIndex map[types.Role]peerSet // Indexed by roles
72-
account share.Account
73-
mutex deadlock.RWMutex
74-
logger logger.Logger
75-
network *networking.Network
76-
rbac *rbac.Manager
77-
consensusSet *share.ActorSet
78-
metrics share.Collector
79-
self *Actor
80-
onPeerEvent func() // Notifier function to signal peer events
71+
peers map[peer.ID]*Actor
72+
pendingPeers map[peer.ID]*pendingPeer
73+
roleIndex map[types.Role]peerSet // Indexed by roles
74+
account share.Account
75+
mutex deadlock.RWMutex
76+
logger logger.Logger
77+
network *networking.Network
78+
rbac *rbac.Manager
79+
consensusSet *share.ActorSet
80+
metrics share.Collector // General metrics collector from metrics package
81+
topologyMetrics *TopologyMetrics // Topology-specific metrics
82+
self *Actor
83+
onPeerEvent func() // Notifier function to signal peer events
8184

8285
// Callback slices
8386
actorAddedCallbacks []ActorAddedCallback
@@ -110,6 +113,21 @@ func NewActors(logger logger.Logger, account share.Account, network *networking.
110113
onPeerEvent: onPeerEvent,
111114
}
112115

116+
// Initialize topology metrics if we have access to a meter via network observability
117+
if network != nil && network.Observability != nil && network.Observability.Meter != nil {
118+
topologyMetrics, err := InitializeMetrics(network.Observability.Meter)
119+
if err != nil {
120+
logger.Error("Failed to initialize topology metrics", zap.Error(err))
121+
} else {
122+
a.topologyMetrics = topologyMetrics
123+
// Record self actor metrics
124+
ctx := context.Background()
125+
for _, role := range account.Roles() {
126+
topologyMetrics.RecordActorAdded(ctx, 1, role.String())
127+
}
128+
}
129+
}
130+
113131
a.logger.Info("Initialized self actor in topology", "peer_id", a.self.ID.String())
114132

115133
// Register event handlers for peer connections and disconnections.
@@ -294,6 +312,24 @@ func (a *Actors) AddPeer(peerID peer.ID, address types.Address, addresses []mult
294312
a.roleIndex[role].add(peerID)
295313
}
296314

315+
// Record metrics for added peer
316+
if a.topologyMetrics != nil {
317+
// Use background context for metrics
318+
ctx := context.Background()
319+
320+
// Record actor addition for each role
321+
for _, role := range roles {
322+
a.topologyMetrics.RecordActorAdded(ctx, 1, role.String())
323+
}
324+
// Record actor connection
325+
a.topologyMetrics.RecordActorConnection(ctx, 1)
326+
327+
// If this is a consensus actor, record that too
328+
if actor.ConsensusActor != nil {
329+
a.topologyMetrics.RecordConsensusActor(ctx, 1)
330+
}
331+
}
332+
297333
// Notify peer availability
298334
if a.onPeerEvent != nil {
299335
a.onPeerEvent()
@@ -346,6 +382,25 @@ func (a *Actors) RemovePeer(ctx context.Context, peerID peer.ID, force bool) err
346382
}
347383
}
348384

385+
// Record metrics for removed peer
386+
if a.topologyMetrics != nil {
387+
// Use the provided context for metrics
388+
// Record actor removal for each role
389+
for _, role := range actor.Roles {
390+
a.topologyMetrics.RecordActorRemoved(ctx, 1, role.String())
391+
}
392+
// Record actor disconnection
393+
a.topologyMetrics.RecordActorDisconnection(ctx, 1)
394+
395+
// If this was a consensus actor, record that too
396+
if actor.ConsensusActor != nil {
397+
a.topologyMetrics.RecordConsensusActor(ctx, -1)
398+
}
399+
400+
// Simplified topology change latency - just record current operation time
401+
a.topologyMetrics.RecordTopologyChangeLatency(ctx, time.Millisecond*10) // Use a nominal value
402+
}
403+
349404
// Notify peer availability
350405
if a.onPeerEvent != nil {
351406
a.onPeerEvent()
@@ -439,6 +494,11 @@ func (a *Actors) markPeerAsPending(peerID peer.ID, addresses []multiaddr.Multiad
439494
addresses: addresses,
440495
}
441496

497+
// Record pending peer metrics
498+
if a.topologyMetrics != nil {
499+
a.topologyMetrics.RecordPendingPeer(context.Background(), 1)
500+
}
501+
442502
a.logger.Info("Marked peer as pending", "peer_id", peerID.String())
443503
}
444504

@@ -448,15 +508,31 @@ func (a *Actors) removePendingPeer(peerID peer.ID) error {
448508
defer a.mutex.Unlock()
449509

450510
if a.pendingPeers != nil {
511+
// Check if peer was actually pending before removal
512+
_, wasPending := a.pendingPeers[peerID]
451513
delete(a.pendingPeers, peerID)
452-
a.logger.Info("Removed peer from pendingPeers", "peer_id", peerID.String())
514+
515+
// If the peer was pending and is now being removed, it's likely a timeout
516+
if wasPending {
517+
a.logger.Info("Removed peer from pendingPeers", "peer_id", peerID.String())
518+
519+
if a.topologyMetrics != nil {
520+
a.topologyMetrics.RecordPendingPeerTimeout(context.Background(), 1)
521+
}
522+
}
453523
}
454524

455525
return nil
456526
}
457527

458528
// verifyAndAddPeer moves a peer from pendingPeers to peers after verification.
459529
func (a *Actors) verifyAndAddPeer(peerID peer.ID, actorInfo *packets.ActorPacket) error {
530+
startTime := time.Now() // Start timing the verification process
531+
532+
// Record verification attempt
533+
if a.topologyMetrics != nil {
534+
a.topologyMetrics.RecordActorVerification(context.Background(), 1)
535+
}
460536
a.mutex.Lock()
461537
defer a.mutex.Unlock()
462538

@@ -569,6 +645,26 @@ func (a *Actors) verifyAndAddPeer(peerID peer.ID, actorInfo *packets.ActorPacket
569645

570646
a.logger.Info("Peer verified and added to topology", "peer_id", peerID.String())
571647

648+
// Record successful verification metrics
649+
if a.topologyMetrics != nil {
650+
ctx := context.Background()
651+
// Record verification latency
652+
a.topologyMetrics.RecordActorVerificationLatency(ctx, time.Since(startTime))
653+
654+
// Record actor addition for each role
655+
for _, role := range actorInfo.Roles {
656+
a.topologyMetrics.RecordActorAdded(ctx, 1, role.String())
657+
}
658+
659+
// Record connection
660+
a.topologyMetrics.RecordActorConnection(ctx, 1)
661+
662+
// If this is a consensus actor, record that too
663+
if actor.ConsensusActor != nil {
664+
a.topologyMetrics.RecordConsensusActor(ctx, 1)
665+
}
666+
}
667+
572668
// Invoke ActorAddedCallbacks outside the lock
573669
a.mutex.Unlock()
574670
a.invokeActorAddedCallbacks(actor)

0 commit comments

Comments
 (0)