Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions cli/command/container/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,36 +144,21 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
}

eh := newEventHandler()
addEvents := []events.Action{events.ActionStart}
if options.All {
eh.setHandler(events.ActionCreate, func(e events.Message) {
if s := NewStats(e.Actor.ID); cStats.add(s) {
waitFirst.Add(1)
log.G(ctx).WithFields(log.Fields{
"event": e.Action,
"container": e.Actor.ID,
}).Debug("collecting stats for container")
go collect(ctx, s, apiClient, !options.NoStream, waitFirst)
}
})
addEvents = append(addEvents, events.ActionCreate)
}

eh.setHandler(events.ActionStart, func(e events.Message) {
eh.setHandler(addEvents, func(ctx context.Context, e events.Message) {
if s := NewStats(e.Actor.ID); cStats.add(s) {
waitFirst.Add(1)
log.G(ctx).WithFields(log.Fields{
"event": e.Action,
"container": e.Actor.ID,
}).Debug("collecting stats for container")
log.G(ctx).Debug("collecting stats for container")
go collect(ctx, s, apiClient, !options.NoStream, waitFirst)
}
})

if !options.All {
eh.setHandler(events.ActionDie, func(e events.Message) {
log.G(ctx).WithFields(log.Fields{
"event": e.Action,
"container": e.Actor.ID,
}).Debug("stop collecting stats for container")
eh.setHandler([]events.Action{events.ActionDie}, func(ctx context.Context, e events.Message) {
log.G(ctx).Debug("stop collecting stats for container")
cStats.remove(e.Actor.ID)
})
}
Expand Down Expand Up @@ -216,7 +201,7 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)
}

eventChan := make(chan events.Message)
go eh.watch(eventChan)
go eh.watch(ctx, eventChan)
stopped := make(chan struct{})
go monitorContainerEvents(started, eventChan, stopped)
defer close(stopped)
Expand Down Expand Up @@ -369,33 +354,38 @@ func RunStats(ctx context.Context, dockerCLI command.Cli, options *StatsOptions)

// newEventHandler initializes and returns an eventHandler
func newEventHandler() *eventHandler {
return &eventHandler{handlers: make(map[events.Action]func(events.Message))}
return &eventHandler{handlers: make(map[events.Action]func(context.Context, events.Message))}
}

// eventHandler allows for registering specific events to setHandler.
type eventHandler struct {
handlers map[events.Action]func(events.Message)
handlers map[events.Action]func(context.Context, events.Message)
}

func (eh *eventHandler) setHandler(action events.Action, handler func(events.Message)) {
eh.handlers[action] = handler
func (eh *eventHandler) setHandler(actions []events.Action, handler func(context.Context, events.Message)) {
for _, action := range actions {
eh.handlers[action] = handler
}
}

// watch ranges over the passed in event chan and processes the events based on the
// handlers created for a given action.
// To stop watching, close the event chan.
func (eh *eventHandler) watch(c <-chan events.Message) {
func (eh *eventHandler) watch(ctx context.Context, c <-chan events.Message) {
for e := range c {
h, exists := eh.handlers[e.Action]
if !exists {
continue
}
if e.Actor.ID == "" {
log.G(context.TODO()).WithField("event", e).Errorf("event handler: received %s event with empty ID", e.Action)
log.G(ctx).WithField("event", e).Errorf("event handler: received %s event with empty ID", e.Action)
continue
}
logger := log.G(ctx).WithFields(log.Fields{
"event": e.Action,
"container": e.Actor.ID,
})

log.G(context.TODO()).WithField("event", e).Debugf("event handler: received %s event for: %s", e.Action, e.Actor.ID)
go h(e)
go h(log.WithLogger(ctx, logger), e)
}
}
Loading