@@ -20,11 +20,12 @@ package event
2020
2121import (
2222 "expvar"
23+ "sync"
24+ "time"
25+
2326 "github.com/rabbitstack/fibratus/pkg/event/params"
2427 "github.com/rabbitstack/fibratus/pkg/util/multierror"
2528 log "github.com/sirupsen/logrus"
26- "sync"
27- "time"
2829)
2930
3031// maxQueueTTLPeriod specifies the maximum period
@@ -61,6 +62,8 @@ type StackwalkDecorator struct {
6162
6263 flusher * time.Ticker
6364 quit chan struct {}
65+
66+ procs map [uint32 ]* Event // stores CreateProcess events with surrogate parent
6467}
6568
6669// NewStackwalkDecorator creates a new callstack return
@@ -70,6 +73,7 @@ func NewStackwalkDecorator(q *Queue) *StackwalkDecorator {
7073 s := & StackwalkDecorator {
7174 q : q ,
7275 buckets : make (map [uint64 ][]* Event ),
76+ procs : make (map [uint32 ]* Event ),
7377 flusher : time .NewTicker (flusherInterval ),
7478 quit : make (chan struct {}, 1 ),
7579 }
@@ -84,6 +88,13 @@ func (s *StackwalkDecorator) Push(e *Event) {
8488 s .mux .Lock ()
8589 defer s .mux .Unlock ()
8690
91+ // the process is created on behalf of brokered
92+ // process and the callstack return addresses
93+ // need to be obtained from the surrogate process
94+ if e .IsSurrogateProcess () {
95+ s .procs [e .Params .MustGetPid ()] = e
96+ }
97+
8798 // append the event to the bucket indexed by stack id
8899 id := e .StackID ()
89100 q , ok := s .buckets [id ]
@@ -121,9 +132,39 @@ func (s *StackwalkDecorator) Pop(e *Event) *Event {
121132 return e
122133 }
123134
135+ if evt .IsSurrogateProcess () && s .procs [evt .Params .MustGetPid ()] != nil {
136+ delete (s .procs , evt .Params .MustGetPid ())
137+ }
138+
124139 callstack := e .Params .MustGetSlice (params .Callstack )
125140 evt .AppendParam (params .Callstack , params .Slice , callstack )
126141
142+ // obtain the callstack from the CreateThread event
143+ // generated by the surrogate process, such as Seclogon.
144+ // If the remote process id is present in the procs map
145+ // the stack is attached to the cached event and then
146+ // pushed to the queue immediately
147+ if (evt .IsCreateRemoteThread () && evt .PS != nil ) &&
148+ (evt .PS .IsSeclogonSvc () || evt .PS .IsAppinfoSvc ()) {
149+ pid := evt .Params .MustGetPid ()
150+ ev , ok := s .procs [pid ]
151+ if ok {
152+ ev .AppendParam (params .Callstack , params .Slice , callstack )
153+ _ = s .q .push (ev )
154+ delete (s .procs , pid )
155+ // find the most recent CreateProcess event and
156+ // remove it from buckets as we have the callstack
157+ qu := s .buckets [ev .StackID ()]
158+ for i := len (qu ) - 1 ; i >= 0 ; i -- {
159+ proc := qu [i ]
160+ if ! proc .IsCreateProcess () && proc .Params .MustGetPid () != pid {
161+ continue
162+ }
163+ s .buckets [ev .StackID ()] = append (qu [:i ], qu [i + 1 :]... )
164+ }
165+ }
166+ }
167+
127168 return evt
128169}
129170
0 commit comments