Skip to content
This repository was archived by the owner on Apr 29, 2020. It is now read-only.

Commit 0918e7e

Browse files
committed
DS farm: buffer per-DS update channel
Occasionally the daemon set farm locks up with the farm goroutine blocking forever attempting to send an update to a daemon set worker goroutine. This can happen due to a race where the worker thread might exit for a number of reasons after the farm goroutine checks the child map to determine a worker already exists but before sending an update. This commit sidesteps the problem by buffering the per-daemon set update channel so that the farm goroutine will never block sending to a worker. If a worker dies, an existing routine grabs a mutex protecting the child map and clears out the child entry and drains the buffered channel. The next time an update is seen for the daemon set, the farm loop should know that it needs to spawn another worker.
1 parent 33207df commit 0918e7e

1 file changed

Lines changed: 16 additions & 2 deletions

File tree

pkg/ds/farm.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ type Farm struct {
8787
type childDS struct {
8888
ds DaemonSet
8989
cancel context.CancelFunc
90-
updatedCh chan<- ds_fields.DaemonSet
90+
updatedCh chan ds_fields.DaemonSet
9191
deletedCh chan<- ds_fields.DaemonSet
9292
errCh <-chan error
9393
unlocker consul.TxnUnlocker
@@ -310,6 +310,10 @@ func (dsf *Farm) closeChild(dsID fields.ID) {
310310
if child, ok := dsf.children[dsID]; ok {
311311
dsf.logger.WithField("ds", dsID).Infoln("Releasing daemon set")
312312
child.cancel()
313+
314+
// drain the updatedCh (it's buffered)
315+
for range child.updatedCh {
316+
}
313317
close(child.updatedCh)
314318
close(child.deletedCh)
315319

@@ -560,7 +564,9 @@ func (dsf *Farm) spawnDaemonSet(
560564
dsf.statusWritingInterval,
561565
)
562566

563-
updatedCh := make(chan ds_fields.DaemonSet)
567+
// updatedCh is buffered by 1 to protect the control loop by slow (or
568+
// dead) readers
569+
updatedCh := make(chan ds_fields.DaemonSet, 1)
564570
deletedCh := make(chan ds_fields.DaemonSet)
565571
ctx, cancel := context.WithCancel(ctx)
566572

@@ -733,6 +739,14 @@ func (dsf *Farm) lockAndSpawn(ctx context.Context, dsFields ds_fields.DaemonSet,
733739

734740
// If we already are running the daemon set, just pass the update. Otherwise spawn one
735741
if ok {
742+
// try to drain the buffered value off the updatedCh if there is one (which
743+
// indicates the worker goroutine was slow to read it or is dead)
744+
select {
745+
case <-child.updatedCh:
746+
dsf.logger.Warnln("daemon set worker missed an update, sending a newer one")
747+
default:
748+
}
749+
736750
child.updatedCh <- dsFields
737751
} else {
738752
dsf.children[dsFields.ID] = dsf.spawnDaemonSet(

0 commit comments

Comments
 (0)