From cd71e92273e3883705aefdd94a88204c3238d5d2 Mon Sep 17 00:00:00 2001
From: jianghao65536
Date: Fri, 10 Oct 2025 01:01:08 +0800
Subject: [PATCH] systemd: retry when the dbus connection returns EAGAIN

Signed-off-by: jianghao65536
[cyphar: gofumpt systemd/dbus_test.go]
[cyphar: simplify retry loop to return from inside loop]
[cyphar: improve exponential backoff to be less aggressive]
[cyphar: improve parallel test]
Signed-off-by: Aleksa Sarai
---
 systemd/dbus.go      | 26 ++++++++++++++++++---
 systemd/dbus_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 3 deletions(-)
 create mode 100644 systemd/dbus_test.go

diff --git a/systemd/dbus.go b/systemd/dbus.go
index bb87ae8..c492372 100644
--- a/systemd/dbus.go
+++ b/systemd/dbus.go
@@ -4,10 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand/v2"
 	"sync"
+	"time"
 
 	systemdDbus "github.com/coreos/go-systemd/v22/dbus"
 	dbus "github.com/godbus/dbus/v5"
+	"golang.org/x/sys/unix"
 )
 
 var (
@@ -64,10 +67,27 @@ func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) {
 }
 
 func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) {
-	if dbusRootless {
-		return newUserSystemdDbus()
+	newDbusConn := func() (*systemdDbus.Conn, error) {
+		if dbusRootless {
+			return newUserSystemdDbus()
+		}
+		return systemdDbus.NewWithContext(context.TODO())
+	}
+
+	var err error
+	for retry := range 7 {
+		var conn *systemdDbus.Conn
+		conn, err = newDbusConn()
+		if !errors.Is(err, unix.EAGAIN) {
+			return conn, err
+		}
+		// Exponential backoff: 100ms * 2^attempt plus up to ~12.5% jitter
+		// (about 15 seconds of delay in total, at most, over 7 attempts).
+		delay := 100 * time.Millisecond << retry
+		delay += rand.N(delay/8 + 1)
+		time.Sleep(delay)
 	}
-	return systemdDbus.NewWithContext(context.TODO())
+	return nil, fmt.Errorf("dbus connection failed after several retries: %w", err)
 }
 
 // resetConnection resets the connection to its initial state
diff --git a/systemd/dbus_test.go b/systemd/dbus_test.go
new file mode 100644
index 0000000..50bde38
--- /dev/null
+++ b/systemd/dbus_test.go
@@ -0,0 +1,55 @@
+package systemd
+
+import (
+	"context"
+	"os"
+	"sync"
+	"testing"
+)
+
+func TestParallelConnection(t *testing.T) {
+	if !IsRunningSystemd() {
+		t.Skip("Test requires systemd.")
+	}
+	var dms []*dbusConnManager
+	for range 600 {
+		dms = append(dms, newDbusConnManager(os.Geteuid() != 0))
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var (
+		doneWg  sync.WaitGroup
+		startCh = make(chan struct{})
+		errCh   = make(chan error, 1)
+	)
+	for _, dm := range dms {
+		doneWg.Add(1)
+		go func(dm *dbusConnManager) {
+			defer doneWg.Done()
+			select {
+			case <-ctx.Done():
+				return
+			case <-startCh:
+				_, err := dm.newConnection()
+				if err != nil {
+					// Only bother trying to send the first error.
+					select {
+					case errCh <- err:
+					default:
+					}
+					cancel()
+				}
+			}
+		}(dm)
+	}
+	close(startCh) // trigger all connection attempts
+	doneWg.Wait()
+
+	select {
+	case err := <-errCh:
+		t.Fatal(err)
+	default:
+	}
+}