Skip to content

Commit d934c07

Browse files
committed
Add pool HTTP API, wire node-status endpoint, and fix metrics test
- Add owm-pool Rust crate (config, DB migrations, share/balance/payout schema) - Add internal/http pool_api with GET /internal/pool/nodes/{pubkey}/status handler and tests - Wire pool node-status route into coordinator HTTP mux (main.go) - Fix metrics_test: initialize sentinel label values so all 8 Vec metric families appear in Gather output
1 parent 55c7539 commit d934c07

19 files changed

Lines changed: 3487 additions & 0 deletions

owm-coordinator/cmd/coordinator/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"github.com/owmnetwork/owm-coordinator/internal/config"
3232
"github.com/owmnetwork/owm-coordinator/internal/fl"
33+
poolhttp "github.com/owmnetwork/owm-coordinator/internal/http"
3334
_ "github.com/owmnetwork/owm-coordinator/internal/metrics"
3435
"github.com/owmnetwork/owm-coordinator/internal/lightning"
3536
"github.com/owmnetwork/owm-coordinator/internal/lightning/mock"
@@ -283,6 +284,7 @@ func run() error {
283284
}
284285
mux := http.NewServeMux()
285286
mux.Handle("/metrics", promhttp.Handler())
287+
mux.HandleFunc("GET /internal/pool/nodes/{pubkey}/status", poolhttp.HandleGetNodeStatus(db, log))
286288
httpSrv := &http.Server{Addr: httpAddr, Handler: mux}
287289
go func() {
288290
if err := httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package poolhttp
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"net/http"
8+
9+
"github.com/jackc/pgx/v5"
10+
"github.com/jackc/pgx/v5/pgxpool"
11+
"go.uber.org/zap"
12+
)
13+
14+
// dbQuerier is the minimal DB interface used internally by the handler.
15+
// *pgxpool.Pool satisfies this interface in production; a hand-rolled mock
16+
// satisfies it in tests without leaking any exported surface.
17+
type dbQuerier interface {
18+
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
19+
}
20+
21+
type nodeStatusRow struct {
22+
NodeID string
23+
Tier string
24+
Status string
25+
}
26+
27+
type nodeStatusResponse struct {
28+
NodeID string `json:"node_id"`
29+
Tier string `json:"tier"`
30+
Status string `json:"status"`
31+
CoordinatorVerified bool `json:"coordinator_verified"`
32+
}
33+
34+
// handleGetNodeStatus is the unexported implementation that accepts the narrow
35+
// dbQuerier interface, keeping the logic testable without exposing the
36+
// interface publicly.
37+
func handleGetNodeStatus(db dbQuerier, log *zap.Logger) http.HandlerFunc {
38+
return func(w http.ResponseWriter, r *http.Request) {
39+
pubkey := r.PathValue("pubkey")
40+
41+
var row nodeStatusRow
42+
err := db.QueryRow(
43+
r.Context(),
44+
"SELECT node_id, tier, status FROM nodes WHERE public_key = $1",
45+
pubkey,
46+
).Scan(&row.NodeID, &row.Tier, &row.Status)
47+
48+
w.Header().Set("Content-Type", "application/json")
49+
50+
if errors.Is(err, pgx.ErrNoRows) {
51+
w.WriteHeader(http.StatusNotFound)
52+
json.NewEncoder(w).Encode(map[string]string{"error": "node not found"}) //nolint:errcheck
53+
return
54+
}
55+
if err != nil {
56+
log.Error("query node status", zap.Error(err))
57+
w.WriteHeader(http.StatusInternalServerError)
58+
json.NewEncoder(w).Encode(map[string]string{"error": "internal error"}) //nolint:errcheck
59+
return
60+
}
61+
62+
w.WriteHeader(http.StatusOK)
63+
json.NewEncoder(w).Encode(nodeStatusResponse{ //nolint:errcheck
64+
NodeID: row.NodeID,
65+
Tier: row.Tier,
66+
Status: row.Status,
67+
CoordinatorVerified: true,
68+
})
69+
}
70+
}
71+
72+
// HandleGetNodeStatus returns an http.HandlerFunc that looks up a node by its
73+
// public key and reports its current status.
74+
func HandleGetNodeStatus(db *pgxpool.Pool, log *zap.Logger) http.HandlerFunc {
75+
return handleGetNodeStatus(db, log)
76+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package poolhttp
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
9+
"testing"
10+
11+
"github.com/jackc/pgx/v5"
12+
"go.uber.org/zap"
13+
)
14+
15+
// mockRow implements pgx.Row for testing.
16+
type mockRow struct {
17+
nodeID string
18+
tier string
19+
status string
20+
err error
21+
}
22+
23+
func (m *mockRow) Scan(dest ...any) error {
24+
if m.err != nil {
25+
return m.err
26+
}
27+
*dest[0].(*string) = m.nodeID
28+
*dest[1].(*string) = m.tier
29+
*dest[2].(*string) = m.status
30+
return nil
31+
}
32+
33+
// mockDB implements dbQuerier for testing.
34+
type mockDB struct {
35+
nodeID string
36+
tier string
37+
status string
38+
err error
39+
}
40+
41+
func (m *mockDB) QueryRow(_ context.Context, _ string, _ ...any) pgx.Row {
42+
return &mockRow{
43+
nodeID: m.nodeID,
44+
tier: m.tier,
45+
status: m.status,
46+
err: m.err,
47+
}
48+
}
49+
50+
func serveWithMux(handler http.HandlerFunc, req *http.Request) *httptest.ResponseRecorder {
51+
rr := httptest.NewRecorder()
52+
mux := http.NewServeMux()
53+
mux.HandleFunc("GET /internal/pool/nodes/{pubkey}/status", handler)
54+
mux.ServeHTTP(rr, req)
55+
return rr
56+
}
57+
58+
func TestHandleGetNodeStatus_Active(t *testing.T) {
59+
db := &mockDB{nodeID: "uuid-1", tier: "t1", status: "active"}
60+
handler := handleGetNodeStatus(db, zap.NewNop())
61+
62+
req := httptest.NewRequest(http.MethodGet, "/internal/pool/nodes/somepubkey/status", nil)
63+
rr := serveWithMux(handler, req)
64+
65+
if rr.Code != http.StatusOK {
66+
t.Fatalf("expected 200, got %d", rr.Code)
67+
}
68+
body := rr.Body.String()
69+
if !strings.Contains(body, `"status":"active"`) {
70+
t.Errorf("body missing status:active, got: %s", body)
71+
}
72+
if !strings.Contains(body, `"coordinator_verified":true`) {
73+
t.Errorf("body missing coordinator_verified:true, got: %s", body)
74+
}
75+
}
76+
77+
func TestHandleGetNodeStatus_Pending(t *testing.T) {
78+
db := &mockDB{nodeID: "uuid-2", tier: "t1", status: "pending"}
79+
handler := handleGetNodeStatus(db, zap.NewNop())
80+
81+
req := httptest.NewRequest(http.MethodGet, "/internal/pool/nodes/somepubkey/status", nil)
82+
rr := serveWithMux(handler, req)
83+
84+
if rr.Code != http.StatusOK {
85+
t.Fatalf("expected 200, got %d", rr.Code)
86+
}
87+
if !strings.Contains(rr.Body.String(), `"status":"pending"`) {
88+
t.Errorf("body missing status:pending, got: %s", rr.Body.String())
89+
}
90+
}
91+
92+
func TestHandleGetNodeStatus_NotFound(t *testing.T) {
93+
db := &mockDB{err: pgx.ErrNoRows}
94+
handler := handleGetNodeStatus(db, zap.NewNop())
95+
96+
req := httptest.NewRequest(http.MethodGet, "/internal/pool/nodes/somepubkey/status", nil)
97+
rr := serveWithMux(handler, req)
98+
99+
if rr.Code != http.StatusNotFound {
100+
t.Fatalf("expected 404, got %d", rr.Code)
101+
}
102+
if !strings.Contains(rr.Body.String(), `"error":"node not found"`) {
103+
t.Errorf("body missing error message, got: %s", rr.Body.String())
104+
}
105+
}
106+
107+
func TestHandleGetNodeStatus_DBError(t *testing.T) {
108+
db := &mockDB{err: errors.New("connection reset by peer")}
109+
handler := handleGetNodeStatus(db, zap.NewNop())
110+
111+
req := httptest.NewRequest(http.MethodGet, "/internal/pool/nodes/somepubkey/status", nil)
112+
rr := serveWithMux(handler, req)
113+
114+
if rr.Code != http.StatusInternalServerError {
115+
t.Fatalf("expected 500, got %d", rr.Code)
116+
}
117+
if !strings.Contains(rr.Body.String(), `"error":"internal error"`) {
118+
t.Errorf("body missing internal error message, got: %s", rr.Body.String())
119+
}
120+
}

owm-coordinator/internal/metrics/metrics_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ import (
1111
)
1212

1313
func TestMetrics_AllEightPresent(t *testing.T) {
14+
// Vec metrics only appear in /metrics output once at least one label
15+
// combination has been observed. Initialize a sentinel value for each so
16+
// the Gather call below emits all eight metric families.
17+
OwmNodesTotal.WithLabelValues("_test", "_test")
18+
OwmTasksTotal.WithLabelValues("_test", "_test")
19+
OwmFLRoundsTotal.WithLabelValues("_test")
20+
OwmPaymentsTotal.WithLabelValues("_test")
21+
OwmStakeVerificationsTotal.WithLabelValues("_test")
22+
OwmSlashEventsTotal.WithLabelValues("_test")
23+
1424
ts := httptest.NewServer(promhttp.Handler())
1525
defer ts.Close()
1626

owm-pool/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/target
2+
*.env
3+
.env*

0 commit comments

Comments
 (0)