diff --git a/pipeline/processor.go b/pipeline/processor.go index de4cd0eaa..66b5beadd 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -1,6 +1,8 @@ package pipeline import ( + "sync" + "github.com/ozontech/file.d/logger" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/atomic" @@ -397,7 +399,14 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { parent.SetChildParentKind() nextActionIdx := parent.action + 1 - for _, node := range nodes { + wg := &sync.WaitGroup{} + results := make([]*Event, len(nodes)) + resultsChan := make(chan struct { + index int + child *Event + }, len(nodes)) + + for i, node := range nodes { // we can't reuse parent event (using insaneJSON.Root{Node: child} // because of nil decoder child := &Event{ @@ -409,10 +418,33 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { child.SetChildKind() child.action = nextActionIdx - ok, _ := p.doActions(child) - if ok { + wg.Add(1) + go func(i int, child *Event) { + defer wg.Done() + ok, _ := p.doActions(child) + if ok { + resultsChan <- struct { + index int + child *Event + }{index: i, child: child} + } + }(i, child) + } + + go func() { + wg.Wait() + close(resultsChan) + }() + + for result := range resultsChan { + results[result.index] = result.child + } + + for _, child := range results { + if child != nil { child.stage = eventStageOutput p.output.Out(child) + child.Root.ReleasePoolMem() } }