Skip to content
This repository was archived by the owner on Apr 29, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/ds/daemon_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ func (ds *daemonSet) WatchDesires(
// so that the timer would be stopped after
err = nil
case <-ctx.Done():
ds.logger.Warnln("goroutine exiting due to canceled context")
return
}
} else {
Expand All @@ -380,6 +381,7 @@ func (ds *daemonSet) WatchDesires(
case newDS, ok := <-updatedCh:
if !ok {
// channel closed
ds.logger.Warnln("goroutine exiting because updatedCh has closed")
return
}
if ds.ID() != newDS.ID {
Expand Down Expand Up @@ -462,6 +464,7 @@ func (ds *daemonSet) WatchDesires(

case deleteDS, ok := <-deletedCh:
if !ok {
ds.logger.Warnln("goroutine exiting because deletedCh has closed")
return
}
// Deleting a daemon set has no effect
Expand All @@ -471,6 +474,7 @@ func (ds *daemonSet) WatchDesires(
case _, ok := <-nodesChangedCh:
if !ok {
// channel closed
ds.logger.Warnln("goroutine exiting because nodesChangedCh has closed")
return
}
if reportErr := ds.reportEligible(); reportErr != nil {
Expand Down Expand Up @@ -530,6 +534,7 @@ func (ds *daemonSet) WatchDesires(
nodesToAdd <- addedNodes

case <-ctx.Done():
ds.logger.Warnln("goroutine exiting because context was canceled")
return
}
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/ds/farm.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Farm struct {
type childDS struct {
ds DaemonSet
cancel context.CancelFunc
updatedCh chan<- ds_fields.DaemonSet
updatedCh chan ds_fields.DaemonSet
deletedCh chan<- ds_fields.DaemonSet
errCh <-chan error
unlocker consul.TxnUnlocker
Expand Down Expand Up @@ -310,6 +310,10 @@ func (dsf *Farm) closeChild(dsID fields.ID) {
if child, ok := dsf.children[dsID]; ok {
dsf.logger.WithField("ds", dsID).Infoln("Releasing daemon set")
child.cancel()

// drain the updatedCh (it's buffered)
for range child.updatedCh {
}
close(child.updatedCh)
close(child.deletedCh)

Expand Down Expand Up @@ -560,7 +564,9 @@ func (dsf *Farm) spawnDaemonSet(
dsf.statusWritingInterval,
)

updatedCh := make(chan ds_fields.DaemonSet)
// updatedCh is buffered by 1 to protect the control loop from slow (or
// dead) readers
updatedCh := make(chan ds_fields.DaemonSet, 1)
deletedCh := make(chan ds_fields.DaemonSet)
ctx, cancel := context.WithCancel(ctx)

Expand Down Expand Up @@ -733,6 +739,14 @@ func (dsf *Farm) lockAndSpawn(ctx context.Context, dsFields ds_fields.DaemonSet,

// If we already are running the daemon set, just pass the update. Otherwise spawn one
if ok {
// try to drain the buffered value off the updatedCh if there is one (which
// indicates the worker goroutine was slow to read it or is dead)
select {
case <-child.updatedCh:
dsf.logger.Warnln("daemon set worker missed an update, sending a newer one")
default:
}

child.updatedCh <- dsFields
} else {
dsf.children[dsFields.ID] = dsf.spawnDaemonSet(
Expand Down