10 changes: 10 additions & 0 deletions .github/workflows/cluster.yml
@@ -44,6 +44,16 @@ jobs:
- name: Run cross-node smoke
run: bash scripts/tests/10-test-cluster-api.sh

- name: Run resilience test (kill + restart a node)
# The smoke phase above already validated propagation;
# this phase validates that the cluster keeps serving
# writes when a node is down and that the resurrected
# node converges back. Catches regressions in the
# hint-replay / anti-entropy paths under the actual
# docker network — a class of bugs in-process tests
# cannot reach.
run: bash scripts/tests/20-test-cluster-resilience.sh

- name: Dump container logs (on failure)
if: failure()
run: |
42 changes: 42 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,48 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### Added

- **SWIM self-refutation + cross-process gossip dissemination.**
Closes the last `experimental` marker on the heartbeat path.
Three pieces:
- **`acceptGossip` self-refute** — incoming entries that
reference the local node as Suspect or Dead at incarnation
≥ ours now bump the local incarnation and re-mark Alive.
Higher-incarnation-wins propagation in the same function
disseminates the refutation cluster-wide, so a falsely-
suspected node can clear suspicion through gossip alone
(pre-fix the only path was a fresh probe).
- **HTTP gossip wire** — new `Gossip(ctx, targetID, members)`
method on `DistTransport`, new
`POST /internal/gossip` server endpoint (auth-wrapped),
new `GossipMember` wire DTO. `runGossipTick` now falls
through to the HTTP path when the transport isn't an
`InProcessTransport`, so cross-process clusters disseminate
membership state — pre-Phase-E this was an in-process-only
no-op.
- The `experimental` qualifier is removed from
`heartbeatLoop`'s comment + the heartbeat-section field
doc; SWIM-style indirect probes (Phase B.1) and
self-refutation (this round) together provide the SWIM
properties the marker was tracking.
Regression coverage at
[tests/integration/dist_swim_refute_test.go](tests/integration/dist_swim_refute_test.go):
`TestDistSWIM_HTTPGossipExchange` exercises the wire (A pushes
membership to B over HTTP; B's view converges),
`TestDistSWIM_SelfRefute` drives a forged "you are suspect"
gossip into a node's `/internal/gossip` and asserts the local
incarnation bumps + state returns to Alive.
- **End-to-end resilience test** at
[scripts/tests/20-test-cluster-resilience.sh](scripts/tests/20-test-cluster-resilience.sh)
— kills a docker container mid-run, asserts the surviving 4
nodes still serve every previously-written key AND every key
written during the outage, then restarts the killed node and
asserts it converges on the full state within 60 s. Validates
Phase B.2 (hint-replay) and the post-restart anti-entropy
paths against the *actual* docker network — a class of bugs
in-process tests can't reach. 24 assertions across 6 phases.
Wired into both `make test-cluster` (runs after the smoke,
exit-code-propagated through the same teardown trap) and the
`cluster` CI workflow as a follow-up step.
- **Cross-process cluster smoke in CI** —
[.github/workflows/cluster.yml](.github/workflows/cluster.yml) boots
the 5-node `docker-compose.cluster.yml` stack on every PR/push,
6 changes: 5 additions & 1 deletion Makefile
@@ -46,11 +46,15 @@ stop-dev-cluster:
 # initial review (factory dropped options, seeds without IDs,
 # json.RawMessage on non-owner GET).
 test-cluster: stop-dev-cluster
-	@echo "spinning up cluster + running cross-node smoke"
+	@echo "spinning up cluster + running cross-node smoke + resilience"
 	@echo
 	docker compose -f docker-compose.cluster.yml up --build -d
 	@bash scripts/tests/wait-for-cluster.sh
 	@rc=0; bash scripts/tests/10-test-cluster-api.sh || rc=$$?; \
+	if [ $$rc -eq 0 ]; then \
+		echo ""; echo "smoke ok — running resilience phase"; echo ""; \
+		bash scripts/tests/20-test-cluster-resilience.sh || rc=$$?; \
+	fi; \
 	echo ""; echo "tearing down cluster (rc=$$rc)"; \
 	docker compose -f docker-compose.cluster.yml down -v --rmi local --remove-orphans >/dev/null 2>&1 || true; \
 	exit $$rc
2 changes: 1 addition & 1 deletion cmd/hypercache-server/main.go
@@ -19,7 +19,6 @@ package main
 import (
 	"context"
 	"encoding/base64"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"log/slog"
@@ -31,6 +30,7 @@ import (
 	"syscall"
 	"time"

+	"github.com/goccy/go-json"
 	fiber "github.com/gofiber/fiber/v3"

 	"github.com/hyp3rd/hypercache"
2 changes: 1 addition & 1 deletion cmd/hypercache-server/main_test.go
@@ -1,13 +1,13 @@
 package main

 import (
-	"encoding/json"
 	"io"
 	"net/http"
 	"net/http/httptest"
 	"strings"
 	"testing"

+	"github.com/goccy/go-json"
 	fiber "github.com/gofiber/fiber/v3"
 )

Binary file modified hypercache-server
Binary file not shown.
25 changes: 25 additions & 0 deletions pkg/backend/dist_http_server.go
@@ -354,6 +354,7 @@ func (s *distHTTPServer) start(bindCtx context.Context, dm *DistMemory) error {
s.registerHealth(dm)
s.registerDrain(dm)
s.registerProbe(dm)
s.registerGossip(dm)
s.registerMerkle(dm)

return s.listen(bindCtx)
@@ -516,6 +517,30 @@ func (s *distHTTPServer) registerProbe(dm *DistMemory) {
}))
}

// registerGossip wires `POST /internal/gossip` — the SWIM
// membership-dissemination endpoint. The body is a JSON array of
// GossipMember snapshots; the receiver's acceptGossip merges them
// via higher-incarnation-wins and self-refutes if any entry
// claims this node is suspect or dead.
//
// Auth-wrapped like the rest of `/internal/*` because gossip can
// inject membership state — an unauthenticated peer could mark
// real nodes as dead by spoofing a high-incarnation snapshot.
func (s *distHTTPServer) registerGossip(dm *DistMemory) {
s.app.Post("/internal/gossip", s.wrapAuth(func(fctx fiber.Ctx) error {
var members []GossipMember

err := json.Unmarshal(fctx.Body(), &members)
if err != nil {
return fctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{constants.ErrorLabel: err.Error()})
}

dm.acceptGossip(gossipMembersToNodes(members))

return fctx.SendStatus(fiber.StatusOK)
}))
}
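
The decode-then-merge flow of the gossip endpoint can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not the handler from this PR: it uses `net/http` instead of Fiber, omits the auth wrapper, and `wireMember` with its fields is a hypothetical stand-in for `GossipMember`, whose definition is not shown in this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// wireMember is a hypothetical wire DTO. The real GossipMember
// fields are not visible in this diff.
type wireMember struct {
	ID          string `json:"id"`
	State       string `json:"state"`
	Incarnation uint64 `json:"incarnation"`
}

// gossipHandler decodes a JSON array of members and hands it to
// accept. Malformed bodies are rejected with 400 before accept runs.
func gossipHandler(accept func([]wireMember)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}

		var members []wireMember
		if err := json.NewDecoder(r.Body).Decode(&members); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		accept(members)
		w.WriteHeader(http.StatusOK)
	}
}

// postGossip drives the handler in memory and returns the status code.
func postGossip(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/internal/gossip", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	var got []wireMember
	h := gossipHandler(func(m []wireMember) { got = m })

	fmt.Println(postGossip(h, `[{"id":"node-b","state":"suspect","incarnation":4}]`), len(got))
	fmt.Println(postGossip(h, `{not json`))
}
```

Keeping the wire type separate from the internal node type, as the PR does, means a rejected body never reaches the membership table.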

func (s *distHTTPServer) registerMerkle(dm *DistMemory) {
s.app.Get("/internal/merkle", s.wrapAuth(func(fctx fiber.Ctx) error {
tree := dm.BuildMerkleTree()
41 changes: 41 additions & 0 deletions pkg/backend/dist_http_transport.go
@@ -380,6 +380,47 @@ func (t *DistHTTPTransport) IndirectHealth(ctx context.Context, relayNodeID, tar
return nil
}

// Gossip pushes a member-list snapshot to the target's
// `/internal/gossip` endpoint. The receiver merges via
// higher-incarnation-wins and may self-refute if the snapshot
// claims it's suspect — see acceptGossip + refuteIfSuspected.
//
// The body is a JSON array of GossipMember; the wire shape is
// stable (separate type from cluster.Node) so the cluster
// package can add internal fields without breaking peers running
// older binaries.
func (t *DistHTTPTransport) Gossip(ctx context.Context, targetNodeID string, members []GossipMember) error {
payload, err := json.Marshal(members)
if err != nil {
return ewrap.Wrap(err, "marshal gossip payload")
}

hreq, err := t.newNodeRequest(ctx, http.MethodPost, targetNodeID, "/internal/gossip",
nil, bytes.NewReader(payload))
if err != nil {
return ewrap.Wrap(err, errMsgNewRequest)
}

hreq.Header.Set("Content-Type", "application/json")

resp, err := t.doTrusted(hreq)
if err != nil {
return err
}

defer drainBody(t.limitedBody(resp))

if resp.StatusCode == http.StatusNotFound {
return sentinel.ErrBackendNotFound
}

if resp.StatusCode >= statusThreshold {
return ewrap.Newf("gossip status %d", resp.StatusCode)
}

return nil
}

// FetchMerkle retrieves a Merkle tree snapshot from a remote node.
func (t *DistHTTPTransport) FetchMerkle(ctx context.Context, nodeID string) (*MerkleTree, error) {
if t == nil {
78 changes: 68 additions & 10 deletions pkg/backend/dist_memory.go
@@ -75,7 +75,12 @@ type DistMemory struct {
 	nodeID string
 	seeds  []string // static seed node addresses

-	// heartbeat / failure detection (experimental)
+	// heartbeat / failure detection. Phase E added SWIM
+	// self-refutation (refuteIfSuspected) and HTTP gossip
+	// dissemination, retiring the prior "experimental" marker —
+	// the path now disseminates suspect/dead transitions across
+	// the cluster and lets a falsely-accused node bump its
+	// incarnation to clear suspicion.
 	hbInterval     time.Duration
 	hbSuspectAfter time.Duration
 	hbDeadAfter    time.Duration
@@ -3023,19 +3028,32 @@ func (dm *DistMemory) runGossipTick() {
 	}

 	target := candidates[idxBig.Int64()]
+	transport := dm.loadTransport()
+	snapshot := dm.membership.List()

-	ip, ok := dm.loadTransport().(*InProcessTransport)
-	if !ok {
-		return
-	}
+	// In-process fast path: skip the wire and call acceptGossip
+	// directly. Pre-Phase-E this was the ONLY path; the function
+	// bailed for any other transport type, so cross-process
+	// clusters never disseminated membership / never refuted
+	// suspect claims. The fall-through below now uses the
+	// transport's Gossip method, which routes via HTTP for the
+	// auto-created DistHTTPTransport.
+	if ip, ok := transport.(*InProcessTransport); ok {
+		if remote, ok2 := ip.backends[string(target.ID)]; ok2 {
+			remote.acceptGossip(snapshot)
+		}

-	remote, ok2 := ip.backends[string(target.ID)]
-	if !ok2 {
 		return
 	}

-	snapshot := dm.membership.List()
-	remote.acceptGossip(snapshot)
+	gossipErr := transport.Gossip(dm.lifeCtx, string(target.ID), nodesToGossipMembers(snapshot))
+	if gossipErr != nil {
+		dm.logger.Debug(
+			"gossip push failed",
+			slog.String("peer_id", string(target.ID)),
+			slog.Any("err", gossipErr),
+		)
+	}
 }

func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
@@ -3045,6 +3063,8 @@ func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {

for _, node := range nodes {
if node.ID == dm.localNode.ID {
dm.refuteIfSuspected(node)

continue
}

@@ -3079,6 +3099,41 @@ func (dm *DistMemory) acceptGossip(nodes []*cluster.Node) {
}
}

// refuteIfSuspected handles the SWIM self-refute path: when a peer
// gossips that THIS node is Suspect or Dead at incarnation N, bump
// our local incarnation to N+1 and re-upsert ourselves as Alive.
// Higher-incarnation-wins propagation in `acceptGossip` ensures the
// next gossip tick disseminates the refutation cluster-wide.
//
// Pre-fix this path was a no-op (`continue` on local-ID match) — a
// node that fell briefly behind heartbeat would be marked Suspect by
// peers and could not undo it through gossip; only a fresh probe
// would clear suspicion. Self-refute closes the loop required for
// the heartbeat marker to drop its `experimental` qualifier.
func (dm *DistMemory) refuteIfSuspected(claim *cluster.Node) {
if claim == nil || dm.localNode == nil {
return
}

if claim.State == cluster.NodeAlive {
return // peer agrees we're alive — nothing to refute
}

// Only refute when the peer's claim is at >= our incarnation;
// older claims are stale and ignored.
if claim.Incarnation < dm.localNode.Incarnation {
return
}

dm.membership.Mark(dm.localNode.ID, cluster.NodeAlive)

dm.logger.Info(
"self-refuted suspect/dead claim from peer",
slog.Uint64("claim_incarnation", claim.Incarnation),
slog.String("claim_state", claim.State.String()),
)
}
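
The rule described in the comment above (refute a Suspect/Dead claim at incarnation N by moving to N+1, then let higher-incarnation-wins carry it to peers) can be written out as a self-contained sketch. `Member`, `refute`, and `merge` are hypothetical names, not the `cluster` package API. The sketch sets the incarnation explicitly; whether `membership.Mark` performs an equivalent bump is not visible in this diff.

```go
package main

import "fmt"

// NodeState and Member are hypothetical stand-ins for the cluster
// package's types.
type NodeState int

const (
	Alive NodeState = iota
	Suspect
	Dead
)

type Member struct {
	ID          string
	State       NodeState
	Incarnation uint64
}

// refute applies the self-refutation rule to the local member and
// reports whether a refutation happened.
func refute(local *Member, claim Member) bool {
	if claim.ID != local.ID || claim.State == Alive {
		return false // not about us, or the peer already agrees we are alive
	}

	if claim.Incarnation < local.Incarnation {
		return false // stale claim, already superseded
	}

	// Jump past the claim so the refutation wins every later merge.
	local.Incarnation = claim.Incarnation + 1
	local.State = Alive

	return true
}

// merge is the higher-incarnation-wins rule a peer applies on receipt.
func merge(known, incoming Member) Member {
	if incoming.Incarnation > known.Incarnation {
		return incoming
	}
	return known
}

func main() {
	local := &Member{ID: "node-a", State: Alive, Incarnation: 3}

	// A peer claims node-a is Suspect at incarnation 5.
	refuted := refute(local, Member{ID: "node-a", State: Suspect, Incarnation: 5})
	fmt.Println(refuted, local.Incarnation, local.State == Alive)

	// The peer's stale view is overwritten once the refutation arrives.
	peerView := merge(Member{ID: "node-a", State: Suspect, Incarnation: 5}, *local)
	fmt.Println(peerView.State == Alive, peerView.Incarnation)
}
```

Setting the incarnation to claim + 1, rather than local + 1, matters when the claim is ahead of the local counter: a single local increment could still lose the merge on the peer.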

// chooseNewer picks the item with higher version; on version tie uses lexicographically smaller Origin as winner.
func (dm *DistMemory) chooseNewer(itemA, itemB *cache.Item) *cache.Item {
if itemA == nil {
@@ -3265,7 +3320,10 @@ func parseSeedSpec(raw string) seedSpec {
 	return seedSpec{id: id, addr: addr}
 }

-// heartbeatLoop probes peers and updates membership (best-effort experimental).
+// heartbeatLoop probes peers and updates membership. SWIM-style
+// indirect probes (Phase B.1) and self-refutation via gossip
+// (Phase E) are wired into the surrounding helpers — this loop
+// only schedules the per-tick work.
 func (dm *DistMemory) heartbeatLoop(ctx context.Context, stopCh <-chan struct{}) { // reduced cognitive complexity via helpers
 	ticker := time.NewTicker(dm.hbInterval)
 	defer ticker.Stop()