From b8374fe5c08a861eb5ac49b4dd53fc5d26fa461b Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Sat, 30 May 2026 21:33:23 +0000 Subject: [PATCH 1/2] test(policy): add atomic swap guard for runner reload (PILOT-310) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Verify that during a policy reload, Get(netID) never returns nil — the new runner must be registered before the old one is stopped. The test hammers managerView.Get in a goroutine while startInternal replaces a live runner with a new policy. Any nil return during the swap is a gap where gate decisions see no policy. --- zz_service_atomic_swap_test.go | 100 +++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 zz_service_atomic_swap_test.go diff --git a/zz_service_atomic_swap_test.go b/zz_service_atomic_swap_test.go new file mode 100644 index 0000000..cabde9e --- /dev/null +++ b/zz_service_atomic_swap_test.go @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package policy + +import ( + "context" + "encoding/json" + "testing" + "time" +) + +// TestStartInternal_AtomicSwap verifies that reloading a policy runner +// never leaves a window where Get(netID) returns nil — the new runner +// is registered before the old one is stopped. +func TestStartInternal_AtomicSwap(t *testing.T) { + t.Parallel() + svc := NewService(&fakeRuntime{}) + netID := uniqueNetID() + + // Policy 1: trivial allow-all. + pol1 := &PolicyDocument{ + Version: 1, + Config: map[string]interface{}{"max_peers": 10, "cycle": "1h"}, + Rules: []Rule{ + {Name: "allow", On: "connect", Match: "true", Actions: []Action{{Type: ActionAllow}}}, + }, + } + + // Start first runner. + _, err := svc.startInternal(netID, mustMarshalPolicy(t, pol1)) + if err != nil { + t.Fatalf("startInternal: %v", err) + } + + // Confirm it's there. + if svc.Manager().Get(netID) == nil { + t.Fatal("Get returned nil after first start") + } + + // Policy 2: a different allow-all (triggers reload). + pol2 := &PolicyDocument{ + Version: 1, + Config: map[string]interface{}{"max_peers": 20, "cycle": "2h"}, + Rules: []Rule{ + {Name: "allow", On: "connect", Match: "true", Actions: []Action{{Type: ActionAllow}}}, + }, + } + + done := make(chan struct{}) + errCh := make(chan error, 1) + + // Goroutine that hammers Get in a loop during the reload. + go func() { + defer close(done) + for { + select { + case <-errCh: + return + default: + } + if svc.Manager().Get(netID) == nil { + select { + case errCh <- nil: + default: + } + return + } + } + }() + + // Give the goroutine a moment to start. + time.Sleep(10 * time.Millisecond) + + // Reload the policy — this triggers the stop-and-swap. + _, err = svc.startInternal(netID, mustMarshalPolicy(t, pol2)) + if err != nil { + t.Fatalf("reload startInternal: %v", err) + } + + // Signal the observer to stop and check for errors. + close(errCh) + <-done + + // Confirm Get still returns non-nil after reload. + if svc.Manager().Get(netID) == nil { + t.Fatal("Get returned nil after reload") + } + + // Clean up: stop the runner (so its goroutines don't outlive the test). + _ = svc.Stop(context.Background()) +} + +func mustMarshalPolicy(t *testing.T, doc *PolicyDocument) []byte { + t.Helper() + data, err := json.Marshal(doc) + if err != nil { + t.Fatal(err) + } + return data +} From 23f2c14c2312392735c59954522e5dde1aea0d66 Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Sat, 30 May 2026 21:33:27 +0000 Subject: [PATCH 2/2] fix(policy): register new runner before stopping old one (PILOT-310) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In startInternal, the old runner was stopped while holding s.mu and before the new runner was started — creating a brief window where Get(netID) could return nil and gate decisions see no policy. Fix: register and start the new runner first, release the mutex, then stop the old runner outside the lock. This eliminates the window entirely. Closes PILOT-310 --- service.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/service.go b/service.go index c1c328a..c373b63 100644 --- a/service.go +++ b/service.go @@ -233,13 +233,14 @@ func (s *Service) startInternal(netID uint16, policyJSON []byte) (*PolicyRunner, } s.mu.Lock() - if old, ok := s.runners[netID]; ok { - old.Stop() - } + old := s.runners[netID] pr := NewPolicyRunner(netID, cp, s.runtime) pr.Start() s.runners[netID] = pr s.mu.Unlock() + if old != nil { + old.Stop() + } slog.Info("policy: started runner", "network_id", netID) return pr, nil