Skip to content

Commit eec5ec2

Browse files
committed
Improve eventsManager observability and resilience
- Instrument subscriptions, notifications, and msgChan delay with Prometheus metrics
- Use non-blocking sends to avoid stalling on full subscriber buffers
- Add periodic stats logging (1-minute ticker)
- Improve log messages with subscription counts
- Add metrics integration tests
1 parent db7d2e5 commit eec5ec2

2 files changed

Lines changed: 179 additions & 12 deletions

File tree

metrics_test.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"testing"
8+
"time"
9+
10+
"github.com/breez/data-sync/metrics"
11+
"github.com/breez/data-sync/middleware"
12+
"github.com/breez/data-sync/proto"
13+
"github.com/btcsuite/btcd/btcec/v2"
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/testutil"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestMetrics(t *testing.T) {
20+
cfg, privateKey := testParameters(t)
21+
privateKey2, err := btcec.NewPrivateKey()
22+
require.NoError(t, err)
23+
24+
client, closer := server(context.Background(), cfg)
25+
defer closer()
26+
defer os.RemoveAll(cfg.SQLiteDirPath)
27+
28+
// Baseline
29+
requireMetricValue(t, metrics.ActiveSubscriptions, 0)
30+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 0)
31+
requireMetricValue(t, metrics.NotificationsSent, 0)
32+
33+
// --- Single subscriber ---
34+
ctx1, cancel1 := context.WithCancel(context.Background())
35+
stream1 := openListenChangesCtx(t, ctx1, privateKey, client)
36+
_, err = stream1.Recv() // initial empty notification
37+
require.NoError(t, err)
38+
time.Sleep(100 * time.Millisecond)
39+
40+
requireMetricValue(t, metrics.ActiveSubscriptions, 1)
41+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 1)
42+
43+
// --- Second subscriber, same user ---
44+
ctx2, cancel2 := context.WithCancel(context.Background())
45+
stream2 := openListenChangesCtx(t, ctx2, privateKey, client)
46+
_, err = stream2.Recv()
47+
require.NoError(t, err)
48+
time.Sleep(100 * time.Millisecond)
49+
50+
requireMetricValue(t, metrics.ActiveSubscriptions, 2)
51+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 1) // still 1 distinct user
52+
53+
// --- Third subscriber, different user ---
54+
ctx3, cancel3 := context.WithCancel(context.Background())
55+
stream3 := openListenChangesCtx(t, ctx3, privateKey2, client)
56+
_, err = stream3.Recv()
57+
require.NoError(t, err)
58+
time.Sleep(100 * time.Millisecond)
59+
60+
requireMetricValue(t, metrics.ActiveSubscriptions, 3)
61+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 2) // now 2 distinct users
62+
63+
// --- SetRecord for user1 → notification sent to both of user1's streams ---
64+
_, err = client.SetRecord(context.Background(), createSetRecordRequest(t, privateKey, &proto.Record{
65+
Id: "metrics-test-1",
66+
Revision: 0,
67+
Data: []byte("hello"),
68+
}))
69+
require.NoError(t, err)
70+
_, err = stream1.Recv()
71+
require.NoError(t, err)
72+
_, err = stream2.Recv()
73+
require.NoError(t, err)
74+
time.Sleep(100 * time.Millisecond)
75+
76+
requireMetricValue(t, metrics.NotificationsSent, 2) // 2 streams for user1
77+
78+
// --- Unsubscribe one of user1's streams ---
79+
cancel1()
80+
time.Sleep(100 * time.Millisecond)
81+
82+
requireMetricValue(t, metrics.ActiveSubscriptions, 2)
83+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 2) // user1 still has stream2
84+
85+
// --- Unsubscribe user1's last stream ---
86+
cancel2()
87+
time.Sleep(100 * time.Millisecond)
88+
89+
requireMetricValue(t, metrics.ActiveSubscriptions, 1)
90+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 1) // only user2 remains
91+
92+
// --- Notifications for user2 (still has stream3) ---
93+
_, err = client.SetRecord(context.Background(), createSetRecordRequest(t, privateKey2, &proto.Record{
94+
Id: "metrics-test-2",
95+
Revision: 0,
96+
Data: []byte("user2-data"),
97+
}))
98+
require.NoError(t, err)
99+
_, err = stream3.Recv()
100+
require.NoError(t, err)
101+
time.Sleep(100 * time.Millisecond)
102+
103+
requireMetricValue(t, metrics.NotificationsSent, 3) // 2 earlier + 1 now
104+
105+
// --- Unsubscribe last stream ---
106+
cancel3()
107+
time.Sleep(100 * time.Millisecond)
108+
109+
requireMetricValue(t, metrics.ActiveSubscriptions, 0)
110+
requireMetricValue(t, metrics.DistinctSubscribedUsers, 0)
111+
112+
// --- msgChan delay histogram should have observations ---
113+
delayCount := testutil.CollectAndCount(metrics.MsgChanDelay)
114+
require.Greater(t, delayCount, 0, "msgchan_delay histogram should have observations")
115+
}
116+
117+
func openListenChangesCtx(t *testing.T, ctx context.Context, privateKey *btcec.PrivateKey, client proto.SyncerClient) proto.Syncer_ListenChangesClient {
118+
requestTime := uint32(time.Now().Unix())
119+
toSign := fmt.Sprintf("%v", requestTime)
120+
signature, err := middleware.SignMessage(privateKey, []byte(toSign))
121+
require.NoError(t, err)
122+
stream, err := client.ListenChanges(ctx, &proto.ListenChangesRequest{
123+
RequestTime: requestTime,
124+
Signature: signature,
125+
})
126+
require.NoError(t, err)
127+
return stream
128+
}
129+
130+
func requireMetricValue(t *testing.T, c prometheus.Collector, expected float64) {
131+
t.Helper()
132+
require.Equal(t, expected, testutil.ToFloat64(c))
133+
}

syncer_server.go

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ import (
99
"time"
1010

1111
"github.com/breez/data-sync/config"
12-
"github.com/google/uuid"
12+
"github.com/breez/data-sync/metrics"
1313
"github.com/breez/data-sync/middleware"
1414
"github.com/breez/data-sync/proto"
1515
"github.com/breez/data-sync/store"
1616
"github.com/breez/data-sync/store/postgres"
1717
"github.com/breez/data-sync/store/sqlite"
18+
"github.com/google/uuid"
1819
"google.golang.org/grpc/codes"
1920
"google.golang.org/grpc/status"
2021
)
@@ -276,34 +277,47 @@ type subscription struct {
276277
eventsChan chan *proto.Notification
277278
}
278279

280+
// timestampedMsg is the envelope sent over eventsManager.msgChan. It pairs a
// message with the time the sender created it, so the manager loop can report
// time.Since(enqueuedAt) to metrics.MsgChanDelay when it receives the message.
type timestampedMsg struct {
281+
// enqueuedAt is set by the sender with time.Now() when it builds the
// envelope, i.e. before the channel send is attempted.
enqueuedAt time.Time
282+
// payload is the wrapped message; the manager loop type-asserts it to
// *subscription, *unsubscribe or *notifyChange.
payload any
283+
}
284+
279285
type eventsManager struct {
280286
sync.Mutex
281287
globalIDs int64
282288
streams map[string][]*subscription
283-
msgChan chan interface{}
289+
msgChan chan timestampedMsg
284290
}
285291

286292
func newEventsManager() *eventsManager {
287293
return &eventsManager{
288294
globalIDs: 0,
289295
streams: make(map[string][]*subscription),
290-
msgChan: make(chan interface{}),
296+
msgChan: make(chan timestampedMsg),
291297
}
292298

293299
}
294300

295301
func (c *eventsManager) start(quitChan chan struct{}) {
296302
go func() {
303+
statsTicker := time.NewTicker(1 * time.Minute)
304+
defer statsTicker.Stop()
297305
for {
298-
log.Printf("eventsManager select started\n")
299306
select {
300-
case msg := <-c.msgChan:
307+
case tmsg := <-c.msgChan:
308+
metrics.MsgChanDelay.Observe(time.Since(tmsg.enqueuedAt).Seconds())
309+
msg := tmsg.payload
310+
301311
if s, ok := msg.(*subscription); ok {
312+
wasAbsent := len(c.streams[s.pubkey]) == 0
302313
c.streams[s.pubkey] = append(c.streams[s.pubkey], s)
303-
log.Printf("eventsManager: new subscription for user %s: id - %d\n", s.pubkey, s.id)
314+
metrics.ActiveSubscriptions.Inc()
315+
if wasAbsent {
316+
metrics.DistinctSubscribedUsers.Inc()
317+
}
318+
log.Printf("eventsManager: new subscription for user %s: id - %d (user now has %d subscriptions)\n", s.pubkey, s.id, len(c.streams[s.pubkey]))
304319
}
305320
if s, ok := msg.(*unsubscribe); ok {
306-
log.Printf("eventsManager: unsubscribing user %s: id - %d\n", s.pubkey, s.id)
307321
var newSubs []*subscription
308322
for _, sub := range c.streams[s.pubkey] {
309323
if sub.id != s.id {
@@ -316,24 +330,44 @@ func (c *eventsManager) start(quitChan chan struct{}) {
316330
if len(newSubs) > 0 {
317331
c.streams[s.pubkey] = newSubs
318332
}
333+
metrics.ActiveSubscriptions.Dec()
334+
if len(newSubs) == 0 {
335+
metrics.DistinctSubscribedUsers.Dec()
336+
}
337+
log.Printf("eventsManager: unsubscribed user %s: id - %d (user now has %d subscriptions)\n", s.pubkey, s.id, len(newSubs))
319338
}
320339
if s, ok := msg.(*notifyChange); ok {
321340
log.Printf("eventsManager: notifying change for user %v\n", s.pubkey)
322341
for _, sub := range c.streams[s.pubkey] {
323-
sub.eventsChan <- &proto.Notification{ClientId: s.clientId}
342+
select {
343+
case sub.eventsChan <- &proto.Notification{ClientId: s.clientId}:
344+
metrics.NotificationsSent.Inc()
345+
default:
346+
metrics.NotificationsDropped.Inc()
347+
log.Printf("eventsManager: notification buffer full for user %s: id - %d, dropping notification\n", s.pubkey, sub.id)
348+
}
349+
}
350+
}
351+
case <-statsTicker.C:
352+
totalSubs := 0
353+
multiSubUsers := 0
354+
for _, subs := range c.streams {
355+
totalSubs += len(subs)
356+
if len(subs) > 1 {
357+
multiSubUsers++
324358
}
325359
}
360+
log.Printf("eventsManager stats: total_subscriptions=%d, distinct_users=%d, users_with_multiple_subs=%d\n", totalSubs, len(c.streams), multiSubUsers)
326361
case <-quitChan:
327362
log.Printf("eventsManager: quitChan received\n")
328363
return
329364
}
330-
log.Printf("eventsManager select finished. number of subscriptions = %v\n", len(c.streams))
331365
}
332366
}()
333367
}
334368

335369
func (c *eventsManager) notifyChange(pubkey string, clientId *string) {
336-
c.msgChan <- &notifyChange{pubkey: pubkey, clientId: clientId}
370+
c.msgChan <- timestampedMsg{enqueuedAt: time.Now(), payload: &notifyChange{pubkey: pubkey, clientId: clientId}}
337371
}
338372

339373
func (c *eventsManager) subscribe(pubkey string) *subscription {
@@ -343,12 +377,12 @@ func (c *eventsManager) subscribe(pubkey string) *subscription {
343377
s := &subscription{pubkey: pubkey, eventsChan: eventsChan, id: c.globalIDs}
344378
c.Unlock()
345379

346-
c.msgChan <- s
380+
c.msgChan <- timestampedMsg{enqueuedAt: time.Now(), payload: s}
347381
log.Printf("New connection for user %s: id - %d\n", pubkey, s.id)
348382
return s
349383
}
350384

351385
func (c *eventsManager) unsubscribe(pubkey string, id int64) {
352-
c.msgChan <- &unsubscribe{pubkey: pubkey, id: id}
386+
c.msgChan <- timestampedMsg{enqueuedAt: time.Now(), payload: &unsubscribe{pubkey: pubkey, id: id}}
353387
log.Printf("Removing connection for user %s - id %d\n", pubkey, id)
354388
}

0 commit comments

Comments
 (0)