@@ -20,7 +20,6 @@ package processors
2020
2121import (
2222 "expvar"
23- "github.com/gammazero/deque"
2423 "github.com/rabbitstack/fibratus/pkg/config"
2524 "github.com/rabbitstack/fibratus/pkg/fs"
2625 "github.com/rabbitstack/fibratus/pkg/handle"
@@ -60,9 +59,10 @@ type fsProcessor struct {
6059 devPathResolver fs.DevPathResolver
6160 config * config.Config
6261
63- deq * deque.Deque [* kevent.Kevent ]
64- mu sync.Mutex
65- purger * time.Ticker
62+ // buckets stores stack walk events per stack id
63+ buckets map [uint64 ][]* kevent.Kevent
64+ mu sync.Mutex
65+ purger * time.Ticker
6666
6767 quit chan struct {}
6868}
@@ -88,7 +88,7 @@ func newFsProcessor(
8888 devMapper : devMapper ,
8989 devPathResolver : devPathResolver ,
9090 config : config ,
91- deq : deque. New [ * kevent.Kevent ]( 100 ),
91+ buckets : make ( map [ uint64 ][] * kevent.Kevent ),
9292 purger : time .NewTicker (time .Second * 5 ),
9393 quit : make (chan struct {}, 1 ),
9494 }
@@ -159,7 +159,15 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
159159 if ! kevent .IsCurrentProcDropped (e .PID ) {
160160 f .mu .Lock ()
161161 defer f .mu .Unlock ()
162- f .deq .PushBack (e )
162+
163+ // append the event to the bucket indexed by stack id
164+ id := e .StackID ()
165+ q , ok := f .buckets [id ]
166+ if ! ok {
167+ f .buckets [id ] = []* kevent.Kevent {e }
168+ } else {
169+ f .buckets [id ] = append (q , e )
170+ }
163171 }
164172 case ktypes .FileOpEnd :
165173 // get the CreateFile pending event by IRP identifier
@@ -169,6 +177,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
169177 dispo = e .Kparams .MustGetUint64 (kparams .FileExtraInfo )
170178 status = e .Kparams .MustGetUint32 (kparams .NTStatus )
171179 )
180+
172181 if dispo > windows .FILE_MAXIMUM_DISPOSITION {
173182 return e , nil
174183 }
@@ -177,10 +186,12 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
177186 return e , nil
178187 }
179188 delete (f .irps , irp )
189+
180190 // reset the wait status to allow passage of this event to
181191 // the aggregator queue. Additionally, append params to it
182192 ev .WaitEnqueue = false
183193 fileObject := ev .Kparams .MustGetUint64 (kparams .FileObject )
194+
184195 // try to get extended file info. If the file object is already
185196 // present in the map, we'll reuse the existing file information
186197 fileinfo , ok := f .files [fileObject ]
@@ -191,9 +202,11 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
191202 fileinfo = f .getFileInfo (filepath , opts )
192203 f .files [fileObject ] = fileinfo
193204 }
205+
194206 if f .config .Kstream .EnableHandleKevents {
195207 f .devPathResolver .AddPath (ev .GetParamAsString (kparams .FilePath ))
196208 }
209+
197210 ev .AppendParam (kparams .NTStatus , kparams .Status , status )
198211 if fileinfo .Type != fs .Unknown {
199212 ev .AppendEnum (kparams .FileType , uint32 (fileinfo .Type ), fs .FileTypes )
@@ -205,17 +218,20 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
205218 // the events are delayed until the respective FileOpEnd
206219 // event arrives, we enable stack tracing for CreateFile
207220 // events. When the CreateFile event is generated, we store
208- // it pending IRP map. Subsequently, the stack walk event is
209- // generated which we put inside the queue. After FileOpEnd
210- // arrives, the previous stack walk for CreateFile is popped
211- // from the queue and the callstack parameter attached to the
221+ // it in pending IRP map. Subsequently, the stack walk event
222+ // is put inside the queue. After FileOpEnd event arrives,
223+ // the previous stack walk for CreateFile is popped from
224+ // the queue and the callstack parameter attached to the
212225 // event.
213226 if f .config .Kstream .StackEnrichment {
214227 f .mu .Lock ()
215228 defer f .mu .Unlock ()
216- i := f .deq .RIndex (func (evt * kevent.Kevent ) bool { return evt .StackID () == ev .StackID () })
217- if i != - 1 {
218- s := f .deq .Remove (i )
229+
230+ id := ev .StackID ()
231+ q , ok := f .buckets [id ]
232+ if ok && len (q ) > 0 {
233+ var s * kevent.Kevent
234+ s , f .buckets [id ] = q [len (q )- 1 ], q [:len (q )- 1 ]
219235 callstack := s .Kparams .MustGetSlice (kparams .Callstack )
220236 ev .AppendParam (kparams .Callstack , kparams .Slice , callstack )
221237 }
@@ -228,6 +244,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
228244 return ev , nil
229245 }
230246 }
247+
231248 return ev , nil
232249 case ktypes .ReleaseFile :
233250 fileReleaseCount .Add (1 )
@@ -297,6 +314,7 @@ func (f *fsProcessor) processEvent(e *kevent.Kevent) (*kevent.Kevent, error) {
297314 return e , f .psnap .AddMmap (e )
298315 }
299316 }
317+
300318 return e , nil
301319}
302320
@@ -336,13 +354,16 @@ func (f *fsProcessor) purge() {
336354 select {
337355 case <- f .purger .C :
338356 f .mu .Lock ()
357+
339358 // evict unmatched stack traces
340- for i := 0 ; i < f .deq .Len (); i ++ {
341- evt := f .deq .At (i )
342- if time .Since (evt .Timestamp ) > time .Second * 30 {
343- f .deq .Remove (i )
359+ for id , q := range f .buckets {
360+ for i , evt := range q {
361+ if time .Since (evt .Timestamp ) > time .Second * 30 {
362+ f .buckets [id ] = append (q [:i ], q [i + 1 :]... )
363+ }
344364 }
345365 }
366+
346367 f .mu .Unlock ()
347368 case <- f .quit :
348369 return
0 commit comments