Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (c *Certificate) UnmarshalEnvironmentValue(data string) error {
type Config struct {
GrpcListenAddress string `env:"GRPC_LISTEN_ADDRESS,default=0.0.0.0:8080"`
GrpcWebListenAddress string `env:"GRPC_WEB_LISTEN_ADDRESS,default=0.0.0.0:8081"`
MetricsListenAddress string `env:"METRICS_LISTEN_ADDRESS,default=0.0.0.0:9091"`
SQLiteDirPath string `env:"SQLITE_DIR_PATH,default=db"`
PgDatabaseUrl string `env:"DATABASE_URL"`
CACert *Certificate `env:"CA_CERT"`
Expand Down
6 changes: 5 additions & 1 deletion fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ soft_limit = 500
[[services.ports]]
handlers = ["tls", "http"]
port = 442

[services.concurrency]
type = "connections"
hard_limit = 3000
soft_limit = 1500

[metrics]
port = 9091
path = "/metrics"
35 changes: 23 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,32 +1,38 @@
module github.com/breez/data-sync

go 1.22
go 1.23.0

toolchain go1.23.1
toolchain go1.24.5

require (
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d
github.com/btcsuite/btcd/btcec/v2 v2.3.3
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0
github.com/improbable-eng/grpc-web v0.15.0
github.com/jackc/pgx/v5 v5.5.5
github.com/mattn/go-sqlite3 v1.14.22
github.com/stretchr/testify v1.9.0
github.com/prometheus/client_golang v1.23.2
github.com/rs/cors v1.11.1
github.com/stretchr/testify v1.11.1
github.com/tv42/zbase32 v0.0.0-20220222190657-f76a9fc892fa
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.1
google.golang.org/protobuf v1.36.8
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/gobuffalo/here v0.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/improbable-eng/grpc-web v0.15.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect
Expand All @@ -37,18 +43,23 @@ require (
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.2 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/markbates/pkger v0.17.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/rs/cors v1.11.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.6 // indirect
Expand Down
96 changes: 70 additions & 26 deletions go.sum

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (

"github.com/breez/data-sync/config"
"github.com/breez/data-sync/proto"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -32,12 +35,24 @@ func main() {
syncServer.Start(quitChan)
s := CreateServer(config, grpcListener, syncServer)
RunWebProxy(s, config)
RunMetricsServer(config)
log.Printf("Server listening at %s", config.GrpcListenAddress)
if err := s.Serve(grpcListener); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

func RunMetricsServer(config *config.Config) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
log.Printf("Starting metrics server on %s", config.MetricsListenAddress)
go func() {
if err := http.ListenAndServe(config.MetricsListenAddress, mux); err != nil {
log.Fatalf("Failed to start metrics server: %v", err)
}
}()
}

func RunWebProxy(grpcServer *grpc.Server, config *config.Config) {
wrappedGrpc := grpcweb.WrapServer(grpcServer,
grpcweb.WithOriginFunc(func(origin string) bool {
Expand Down Expand Up @@ -79,6 +94,14 @@ func RunWebProxy(grpcServer *grpc.Server, config *config.Config) {
}

func CreateServer(config *config.Config, listener net.Listener, syncServer proto.SyncerServer) *grpc.Server {
srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(),
)
// Intentionally ignoring error: concurrent tests re-register metrics on the
// default registry, causing MustRegister to panic. Safe to skip in production
// (single registration) and in tests (duplicate is harmless).
_ = prometheus.Register(srvMetrics)

s := grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: time.Second * 5,
Expand All @@ -88,7 +111,10 @@ func CreateServer(config *config.Config, listener net.Listener, syncServer proto
Time: time.Second * 10,
Timeout: time.Second * 5,
}),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor()),
grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor()),
)
proto.RegisterSyncerServer(s, syncServer)
srvMetrics.InitializeMetrics(s)
return s
}
34 changes: 34 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
ActiveSubscriptions = promauto.NewGauge(prometheus.GaugeOpts{
Name: "sync_active_subscriptions",
Help: "Number of open ListenChanges streams.",
})

DistinctSubscribedUsers = promauto.NewGauge(prometheus.GaugeOpts{
Name: "sync_distinct_subscribed_users",
Help: "Number of unique users with at least one open stream.",
})

NotificationsSent = promauto.NewCounter(prometheus.CounterOpts{
Name: "sync_notifications_sent_total",
Help: "Total notifications successfully delivered to subscriber channels.",
})

NotificationsDropped = promauto.NewCounter(prometheus.CounterOpts{
Name: "sync_notifications_dropped_total",
Help: "Total notifications dropped due to full subscriber buffer.",
})

MsgChanDelay = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "sync_msgchan_delay_seconds",
Help: "Time messages wait in the eventsManager msgChan before processing.",
Buckets: []float64{0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0},
})
)
133 changes: 133 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package main

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/breez/data-sync/metrics"
"github.com/breez/data-sync/middleware"
"github.com/breez/data-sync/proto"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)

func TestMetrics(t *testing.T) {
cfg, privateKey := testParameters(t)
privateKey2, err := btcec.NewPrivateKey()
require.NoError(t, err)

client, closer := server(context.Background(), cfg)
defer closer()
defer os.RemoveAll(cfg.SQLiteDirPath)

// Baseline
requireMetricValue(t, metrics.ActiveSubscriptions, 0)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 0)
requireMetricValue(t, metrics.NotificationsSent, 0)

// --- Single subscriber ---
ctx1, cancel1 := context.WithCancel(context.Background())
stream1 := openListenChangesCtx(t, ctx1, privateKey, client)
_, err = stream1.Recv() // initial empty notification
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.ActiveSubscriptions, 1)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 1)

// --- Second subscriber, same user ---
ctx2, cancel2 := context.WithCancel(context.Background())
stream2 := openListenChangesCtx(t, ctx2, privateKey, client)
_, err = stream2.Recv()
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.ActiveSubscriptions, 2)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 1) // still 1 distinct user

// --- Third subscriber, different user ---
ctx3, cancel3 := context.WithCancel(context.Background())
stream3 := openListenChangesCtx(t, ctx3, privateKey2, client)
_, err = stream3.Recv()
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.ActiveSubscriptions, 3)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 2) // now 2 distinct users

// --- SetRecord for user1 → notification sent to both of user1's streams ---
_, err = client.SetRecord(context.Background(), createSetRecordRequest(t, privateKey, &proto.Record{
Id: "metrics-test-1",
Revision: 0,
Data: []byte("hello"),
}))
require.NoError(t, err)
_, err = stream1.Recv()
require.NoError(t, err)
_, err = stream2.Recv()
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.NotificationsSent, 2) // 2 streams for user1

// --- Unsubscribe one of user1's streams ---
cancel1()
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.ActiveSubscriptions, 2)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 2) // user1 still has stream2

// --- Unsubscribe user1's last stream ---
cancel2()
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.ActiveSubscriptions, 1)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 1) // only user2 remains

// --- Notifications for user2 (still has stream3) ---
_, err = client.SetRecord(context.Background(), createSetRecordRequest(t, privateKey2, &proto.Record{
Id: "metrics-test-2",
Revision: 0,
Data: []byte("user2-data"),
}))
require.NoError(t, err)
_, err = stream3.Recv()
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.NotificationsSent, 3) // 2 earlier + 1 now

// --- Unsubscribe last stream ---
cancel3()
time.Sleep(100 * time.Millisecond)

requireMetricValue(t, metrics.ActiveSubscriptions, 0)
requireMetricValue(t, metrics.DistinctSubscribedUsers, 0)

// --- msgChan delay histogram should have observations ---
delayCount := testutil.CollectAndCount(metrics.MsgChanDelay)
require.Greater(t, delayCount, 0, "msgchan_delay histogram should have observations")
}

func openListenChangesCtx(t *testing.T, ctx context.Context, privateKey *btcec.PrivateKey, client proto.SyncerClient) proto.Syncer_ListenChangesClient {
requestTime := uint32(time.Now().Unix())
toSign := fmt.Sprintf("%v", requestTime)
signature, err := middleware.SignMessage(privateKey, []byte(toSign))
require.NoError(t, err)
stream, err := client.ListenChanges(ctx, &proto.ListenChangesRequest{
RequestTime: requestTime,
Signature: signature,
})
require.NoError(t, err)
return stream
}

func requireMetricValue(t *testing.T, c prometheus.Collector, expected float64) {
t.Helper()
require.Equal(t, expected, testutil.ToFloat64(c))
}
6 changes: 3 additions & 3 deletions store/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func (s *StoreTest) TestAddRecords(t *testing.T, storage SyncStorage) {
records, err := storage.ListChanges(context.Background(), testStoreID, 0)
require.NoError(t, err, "failed to call list changes")
require.Equal(t, records, []StoredRecord{
{Id: "a1", Data: []byte("data1"), Revision: 1},
{Id: "a2", Data: []byte("data2"), Revision: 2},
{Id: "a1", Data: []byte("data1"), Revision: 1, SchemaVersion: "0.0.1"},
{Id: "a2", Data: []byte("data2"), Revision: 2, SchemaVersion: "0.0.1"},
})

// Test different store with same id
Expand All @@ -49,7 +49,7 @@ func (s *StoreTest) TestUpdateRecords(t *testing.T, storage SyncStorage) {
records, err := storage.ListChanges(context.Background(), testStoreID, 0)
require.NoError(t, err, "failed to call list changes")
require.Equal(t, records, []StoredRecord{
{Id: "a1", Data: []byte("data2"), Revision: 2},
{Id: "a1", Data: []byte("data2"), Revision: 2, SchemaVersion: "0.0.1"},
})
}

Expand Down
Loading