From a382407a89cc91c021bb3aebbd42d2eda29047ae Mon Sep 17 00:00:00 2001 From: contra Date: Thu, 28 May 2026 15:33:15 +0300 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20proxyclient=20lifecycle=20=E2=80=94?= =?UTF-8?q?=20immediate=20close,=20sync=20bind,=20nil=20guards?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Close(): close device immediately instead of 2-minute deferred goroutine. Orphaned devices accumulated, sending rogue keepalives. - ConfigureDevice(): close old device/proxy before creating new ones. ReConfigureDevice leaked old netstack, device, and proxy server. - Proxy(): synchronous net.Listen + server.Serve so port conflicts fail ConfigureDevice immediately instead of silently in background. - PeerStats(): nil/lock guard on Device to prevent panic after Close. - Suppress http.ErrServerClosed log noise on normal shutdown. --- .../wireguard/endpoint/proxyclient/client.go | 57 ++++-- .../endpoint/proxyclient/client_test.go | 193 +++++++++++++++++- 2 files changed, 234 insertions(+), 16 deletions(-) diff --git a/services/wireguard/endpoint/proxyclient/client.go b/services/wireguard/endpoint/proxyclient/client.go index 96a0d8441c..8029566bd6 100644 --- a/services/wireguard/endpoint/proxyclient/client.go +++ b/services/wireguard/endpoint/proxyclient/client.go @@ -20,7 +20,9 @@ package proxyclient import ( "bufio" "context" + "errors" "fmt" + "net" "net/http" "net/netip" "strings" @@ -54,6 +56,23 @@ func (c *client) ReConfigureDevice(config wgcfg.DeviceConfig) error { } func (c *client) ConfigureDevice(cfg wgcfg.DeviceConfig) error { + // Close old resources before creating new ones. + // Without this, ReConfigureDevice leaks the old device, netstack, and + // proxy server — the old proxy keeps holding the port so the new one + // silently fails to bind. 
+ c.mu.Lock() + oldProxyClose := c.proxyClose + c.proxyClose = nil + oldDevice := c.Device + c.Device = nil + c.mu.Unlock() + if oldProxyClose != nil { + oldProxyClose() + } + if oldDevice != nil { + oldDevice.Close() + } + localAddr, err := netip.ParseAddr(cfg.Subnet.IP.String()) if err != nil { return fmt.Errorf("could not parse local addr: %w", err) @@ -99,20 +118,23 @@ func (c *client) DestroyDevice(iface string) error { } func (c *client) PeerStats(iface string) (wgcfg.Stats, error) { - deviceState, err := userspace.ParseUserspaceDevice(c.Device.IpcGetOperation) + c.mu.Lock() + dev := c.Device + c.mu.Unlock() + if dev == nil { + return wgcfg.Stats{}, fmt.Errorf("device is closed") + } + + deviceState, err := userspace.ParseUserspaceDevice(dev.IpcGetOperation) if err != nil { return wgcfg.Stats{}, fmt.Errorf("could not parse device state: %w", err) } stats, statErr := userspace.ParseDevicePeerStats(deviceState) if statErr != nil { - err = statErr - log.Warn().Err(err).Msg("Failed to parse device stats, will try again") - } else { - return stats, nil + return wgcfg.Stats{}, fmt.Errorf("could not parse device state: %w", statErr) } - - return wgcfg.Stats{}, fmt.Errorf("could not parse device state: %w", err) + return stats, nil } func (c *client) Close() (err error) { @@ -121,13 +143,12 @@ func (c *client) Close() (err error) { if c.proxyClose != nil { c.proxyClose() + c.proxyClose = nil } if c.Device != nil { - go func() { - time.Sleep(2 * time.Minute) - c.Device.Close() - }() + c.Device.Close() + c.Device = nil } return nil } @@ -141,8 +162,14 @@ func (c *client) Proxy(tnet *netstack.Net, proxyPort int) error { bind = addr + bind } + // Bind synchronously so port conflicts fail ConfigureDevice immediately + // instead of silently failing in a background goroutine. 
+ ln, err := net.Listen("tcp", bind) + if err != nil { + return fmt.Errorf("proxy listen on %s failed: %w", bind, err) + } + server := http.Server{ - Addr: bind, Handler: newProxyHandler(60*time.Second, tnet), ReadTimeout: 0, ReadHeaderTimeout: 0, @@ -155,13 +182,13 @@ func (c *client) Proxy(tnet *netstack.Net, proxyPort int) error { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() server.Shutdown(ctx) - return server.Close() } go func() { - err := server.ListenAndServe() - log.Error().Err(err).Msg("Shutting down proxy server...") + if err := server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error().Err(err).Msg("Proxy server error") + } }() return nil diff --git a/services/wireguard/endpoint/proxyclient/client_test.go b/services/wireguard/endpoint/proxyclient/client_test.go index 6e15ef4516..2c115537b7 100644 --- a/services/wireguard/endpoint/proxyclient/client_test.go +++ b/services/wireguard/endpoint/proxyclient/client_test.go @@ -18,11 +18,20 @@ package proxyclient import ( + "fmt" "net" + "net/netip" + "sync/atomic" "testing" - "github.com/mysteriumnetwork/node/services/wireguard/wgcfg" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/device" + + "github.com/mysteriumnetwork/node/services/wireguard/endpoint/netstack" + "github.com/mysteriumnetwork/node/services/wireguard/key" + "github.com/mysteriumnetwork/node/services/wireguard/wgcfg" ) func Test_ConfigureDevice_ConfigureErrors(t *testing.T) { @@ -63,3 +72,185 @@ func Test_ConfigureDevice_ConfigureErrors(t *testing.T) { }) } } + +// newTestDevice creates a minimal WireGuard device for testing. 
+func newTestDevice(t *testing.T) *device.Device { + t.Helper() + localAddr := netip.MustParseAddr("10.0.0.1") + dnsAddr := netip.MustParseAddr("8.8.8.8") + tunnel, _, err := netstack.CreateNetTUN([]netip.Addr{localAddr}, []netip.Addr{dnsAddr}, 1280) + require.NoError(t, err) + logger := device.NewLogger(device.LogLevelSilent, "(test) ") + return device.NewDevice(tunnel, conn.NewDefaultBind(), logger) +} + +// newTestConfig builds a DeviceConfig with valid keys for ConfigureDevice. +func newTestConfig(t *testing.T, proxyPort int) wgcfg.DeviceConfig { + t.Helper() + privKey, err := key.GeneratePrivateKey() + require.NoError(t, err) + peerPrivKey, err := key.GeneratePrivateKey() + require.NoError(t, err) + peerPubKey, err := key.PrivateKeyToPublicKey(peerPrivKey) + require.NoError(t, err) + + return wgcfg.DeviceConfig{ + IfaceName: "test0", + Subnet: net.IPNet{IP: net.ParseIP("10.0.0.2"), Mask: net.IPv4Mask(255, 255, 255, 0)}, + PrivateKey: privKey, + DNS: []string{"8.8.8.8"}, + Peer: wgcfg.Peer{ + PublicKey: peerPubKey, + AllowedIPs: []string{"0.0.0.0/0"}, + KeepAlivePeriodSeconds: 18, + }, + ProxyPort: proxyPort, + } +} + +func Test_Close_ReleasesDeviceImmediately(t *testing.T) { + c, err := New() + require.NoError(t, err) + + dev := newTestDevice(t) + c.mu.Lock() + c.Device = dev + c.mu.Unlock() + + err = c.Close() + require.NoError(t, err) + + c.mu.Lock() + assert.Nil(t, c.Device, "Device must be nil immediately after Close") + assert.Nil(t, c.proxyClose, "proxyClose must be nil after Close") + c.mu.Unlock() +} + +func Test_Close_ProxyCloseCalledAndNilled(t *testing.T) { + c, err := New() + require.NoError(t, err) + + var called atomic.Bool + c.mu.Lock() + c.proxyClose = func() error { called.Store(true); return nil } + c.mu.Unlock() + + err = c.Close() + require.NoError(t, err) + assert.True(t, called.Load(), "proxyClose must be called during Close") + + c.mu.Lock() + assert.Nil(t, c.proxyClose, "proxyClose must be nil after Close") + c.mu.Unlock() +} + +func 
Test_Close_Idempotent(t *testing.T) { + c, err := New() + require.NoError(t, err) + + dev := newTestDevice(t) + c.mu.Lock() + c.Device = dev + c.mu.Unlock() + + require.NoError(t, c.Close()) + require.NoError(t, c.Close(), "second Close must not panic or error") +} + +func Test_ConfigureDevice_CleansOldResources(t *testing.T) { + c, err := New() + require.NoError(t, err) + + // Set up an old device and proxy closure to simulate a previous configure. + oldDev := newTestDevice(t) + var oldProxyClosed atomic.Bool + c.mu.Lock() + c.Device = oldDev + c.proxyClose = func() error { oldProxyClosed.Store(true); return nil } + c.mu.Unlock() + + // ConfigureDevice with a valid config — should clean old resources first. + cfg := newTestConfig(t, 0) // port 0 = OS assigns + err = c.ConfigureDevice(cfg) + require.NoError(t, err) + + assert.True(t, oldProxyClosed.Load(), "old proxyClose must be called during reconfigure") + + c.mu.Lock() + assert.NotNil(t, c.Device, "new Device must be set after ConfigureDevice") + assert.NotNil(t, c.proxyClose, "new proxyClose must be set after ConfigureDevice") + c.mu.Unlock() + + // Cleanup + c.Close() +} + +func Test_ConfigureDevice_DoubleConfigureSamePort(t *testing.T) { + c, err := New() + require.NoError(t, err) + + // First configure — picks a free port. + cfg1 := newTestConfig(t, 0) + err = c.ConfigureDevice(cfg1) + require.NoError(t, err) + + c.mu.Lock() + firstDevice := c.Device + c.mu.Unlock() + require.NotNil(t, firstDevice) + + // Second configure — old device/proxy must be cleaned first. 
+ cfg2 := newTestConfig(t, 0) + err = c.ConfigureDevice(cfg2) + require.NoError(t, err) + + c.mu.Lock() + secondDevice := c.Device + c.mu.Unlock() + require.NotNil(t, secondDevice) + assert.True(t, firstDevice != secondDevice, "second ConfigureDevice must create a new device") + + c.Close() +} + +func Test_PeerStats_NilDeviceReturnsError(t *testing.T) { + c, err := New() + require.NoError(t, err) + + // Device is nil (never configured or already closed) + _, err = c.PeerStats("any") + assert.ErrorContains(t, err, "device is closed") +} + +func Test_PeerStats_NilAfterClose(t *testing.T) { + c, err := New() + require.NoError(t, err) + + dev := newTestDevice(t) + c.mu.Lock() + c.Device = dev + c.mu.Unlock() + + require.NoError(t, c.Close()) + + _, err = c.PeerStats("any") + assert.ErrorContains(t, err, "device is closed") +} + +func Test_Proxy_SyncBindFailsOnPortConflict(t *testing.T) { + c, err := New() + require.NoError(t, err) + + // Occupy a port on all interfaces (same as Proxy does) + ln, err := net.Listen("tcp", ":0") + require.NoError(t, err) + defer ln.Close() + _, portStr, _ := net.SplitHostPort(ln.Addr().String()) + var port int + fmt.Sscanf(portStr, "%d", &port) + + // Proxy on the same port should fail synchronously + err = c.Proxy(nil, port) + assert.Error(t, err, "Proxy should fail when port is already bound") + assert.Contains(t, err.Error(), "proxy listen") +} From 6ace6e85f68a8f0e811ab5b117789f95386cf60a Mon Sep 17 00:00:00 2001 From: contra Date: Thu, 28 May 2026 15:33:23 +0300 Subject: [PATCH 2/3] fix: skip interface release and cleanup in proxymode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Stop(): skip ReleaseInterface when ProxyPort > 0. Proxymode derives the interface name from ProxyPort (e.g. "myst13276") without calling AllocateInterface, so release always errored with "allocated interface not found". - StartConsumerMode(): same skip on configure failure path. 
- cleanAbandonedInterfaces(): skip in proxymode (same reason as dVPN — multiple concurrent connections should not destroy each other). --- services/wireguard/endpoint/endpoint.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/services/wireguard/endpoint/endpoint.go b/services/wireguard/endpoint/endpoint.go index 7b77489300..512b354fb0 100644 --- a/services/wireguard/endpoint/endpoint.go +++ b/services/wireguard/endpoint/endpoint.go @@ -75,8 +75,11 @@ func (ce *connectionEndpoint) StartConsumerMode(cfg wgcfg.DeviceConfig) error { ce.cfg = cfg if err := ce.wgClient.ConfigureDevice(cfg); err != nil { - if err1 := ce.resourceAllocator.ReleaseInterface(iface); err1 != nil { - log.Error().Err(err1).Msg("Can't release allocated interface " + iface) + // Only release interface if it was allocated (not in proxymode) + if cfg.ProxyPort == 0 { + if err1 := ce.resourceAllocator.ReleaseInterface(iface); err1 != nil { + log.Error().Err(err1).Msg("Can't release allocated interface " + iface) + } } return errors.Wrap(err, "could not configure device") } @@ -156,12 +159,18 @@ func (ce *connectionEndpoint) Stop() error { return err } + // In proxymode, interface name is derived from ProxyPort (e.g. "myst13276") + // and was never allocated through the resourceAllocator, so skip release. + if ce.cfg.ProxyPort > 0 { + return nil + } + return ce.resourceAllocator.ReleaseInterface(ce.cfg.IfaceName) } func (ce *connectionEndpoint) cleanAbandonedInterfaces() error { - if config.GetBool(config.FlagDVPNMode) { - // Do not clean up unknown interfaces in dVPN mode. + if config.GetBool(config.FlagDVPNMode) || config.GetBool(config.FlagProxyMode) { + // Do not clean up unknown interfaces in dVPN or proxy mode. // There could be several connections at the same time and we should not kill them. 
return nil } From b170d3018f54ac48127692441b378c4c80d89e80 Mon Sep 17 00:00:00 2001 From: contra Date: Thu, 28 May 2026 15:33:30 +0300 Subject: [PATCH 3/3] fix: skip keepalive auto-disconnect in proxymode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In proxymode the gateway manages tunnel health via its own sentinel and probe mechanisms. The P2P keepalive failure (3 × 5s) was killing perfectly healthy WireGuard tunnels because the P2P channel (NATS/UDP signaling) is less stable than the tunnel itself in Docker. --- core/connection/manager.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/connection/manager.go b/core/connection/manager.go index dde63f212a..d1bd0bdb5e 100644 --- a/core/connection/manager.go +++ b/core/connection/manager.go @@ -986,6 +986,15 @@ func (m *connectionManager) keepAliveLoop(channel p2p.Channel, sessionID session log.Err(err).Msgf("Failed to send p2p keepalive ping. SessionID=%s", sessionID) errCount++ if errCount == m.config.KeepAlive.MaxSendErrCount { + // In proxymode, the gateway manages connection health via its own + // sentinel and probe mechanisms. Don't kill the WireGuard tunnel — + // the P2P channel failure might be transient while the tunnel is fine. + if config.GetBool(config.FlagProxyMode) { + log.Warn().Msgf("P2P keepalive failed %d times for SessionID=%s (proxymode: keeping alive)", errCount, sessionID) + errCount = 0 + cancel() + continue + } log.Error().Msgf("Max p2p keepalive err count reached, disconnecting. SessionID=%s", sessionID) if config.GetBool(config.FlagKeepConnectedOnFail) { m.statusOnHold()