-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathannotate.go
More file actions
446 lines (420 loc) · 12.9 KB
/
annotate.go
File metadata and controls
446 lines (420 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
package zannotate
/*
* ZAnnotate Copyright 2018 Regents of the University of Michigan
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
import (
"bufio"
"encoding/csv"
"fmt"
"os/signal"
"reflect"
"slices"
"syscall"
"time"
"io"
"net"
"os"
"strings"
"sync"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
)
// inProcessIP is the unit of work that flows through the pipeline: it is
// produced by the input decoders, passed through each annotator stage, and
// finally consumed by the output encoders.
type inProcessIP struct {
	// Out is the record that the encoder stage JSON-marshals as one output
	// line; annotators add their results into it (at the top level, or inside
	// the configured annotation sub-map).
	Out map[string]interface{}
	// Ip is the parsed address handed to every annotator. It comes from
	// net.ParseIP, so it may be nil if a decoder did not reject an
	// unparseable address.
	Ip net.IP
}
// jsonToInProcess decodes one JSON input record. The record must be a JSON
// object whose ipFieldName member is a string; anything else is fatal. The
// decoded object becomes the output record as-is, so it must not already
// contain annotationFieldName (when one is configured).
//
// NOTE(review): unlike the CSV and plain-IP decoders, an unparseable address
// string is not fatal here — Ip is left nil and passed on to the annotators.
// Confirm whether that leniency is intentional.
func jsonToInProcess(line string, ipFieldName string,
	annotationFieldName string) inProcessIP {
	var retv inProcessIP
	var inParsed interface{}
	if err := jsoniter.Unmarshal([]byte(line), &inParsed); err != nil {
		log.Fatal("unable to parse input json record: ", line)
	}
	// A bare type assertion would panic on valid-but-non-object JSON such as
	// `[1,2]`, `"x"` or `null`; report it like any other malformed record.
	jsonMap, ok := inParsed.(map[string]interface{})
	if !ok {
		log.Fatal("input json record is not an object: ", line)
	}
	val, ok := jsonMap[ipFieldName]
	if !ok {
		log.Fatal("unable to find IP address field in ", line)
	}
	valS, ok := val.(string)
	if !ok {
		log.Fatal("ip is not a string in JSON for ", line)
	}
	retv.Ip = net.ParseIP(valS)
	// With no annotation field configured, annotations are written at the top
	// level under each annotator's name, so a "" key cannot collide; only
	// check when a name is actually set.
	if annotationFieldName != "" {
		if _, exists := jsonMap[annotationFieldName]; exists {
			log.Fatal("input record already contains annotation key ", line)
		}
	}
	retv.Out = jsonMap
	return retv
}
// ipToInProcess decodes a plain-text input line that holds a single IP
// address. Surrounding whitespace (including a stray '\r' left over from CRLF
// line endings) is ignored; any other unparseable content is fatal. The
// output record starts out as {"ip": <parsed address>}.
func ipToInProcess(line string) inProcessIP {
	var retv inProcessIP
	retv.Ip = net.ParseIP(strings.TrimSpace(line))
	if retv.Ip == nil {
		log.Fatal("invalid input IP address: ", line)
	}
	retv.Out = map[string]interface{}{"ip": retv.Ip}
	return retv
}
// csvToInProcess turns one parsed CSV row into an in-process record.
// Every column is copied into the output map under its header name (the IP
// column included, as its original string). The column named ipFieldName is
// parsed as the record's IP; a missing or unparseable IP column, a row whose
// width differs from the header, or a header equal to the configured
// annotationFieldName are all fatal.
func csvToInProcess(record []string, headers []string, ipFieldName, annotationFieldName string) inProcessIP {
	if len(record) != len(headers) {
		log.Fatalf("csv record length (%d) does not match header length (%d) for record: %v", len(record), len(headers), record)
	}
	var retv inProcessIP
	foundIPField := false
	outMap := make(map[string]interface{}, len(headers))
	for i, header := range headers {
		value := record[i]
		switch {
		case header == ipFieldName:
			foundIPField = true
			// csv.Reader keeps spaces after commas; don't let them break parsing.
			retv.Ip = net.ParseIP(strings.TrimSpace(value))
			if retv.Ip == nil {
				log.Fatalf("unable to parse IP at column '%d': %s", i, value)
			}
		case annotationFieldName != "" && header == annotationFieldName:
			// An empty annotation name means annotations go at the top level, so
			// an empty header column is not a collision.
			log.Fatalf("csv headers already contains annotation key '%v'", annotationFieldName)
		}
		outMap[header] = value
	}
	if !foundIPField {
		log.Fatalf("unable to find IP address field with IP field name %s in CSV record: %v", ipFieldName, record)
	}
	retv.Out = outMap
	return retv
}
// tee fans a single input channel out to two output channels. Each value read
// from in is delivered to the first output and then to the second, so both
// consumers must keep reading or the fan-out stalls. Both outputs are closed
// once in has been closed and fully drained.
func tee[T any](in <-chan T) (<-chan T, <-chan T) {
	first, second := make(chan T), make(chan T)
	go func() {
		defer func() {
			close(second)
			close(first)
		}()
		for {
			v, ok := <-in
			if !ok {
				return
			}
			first <- v
			second <- v
		}
	}()
	return first, second
}
// AnnotateRead is the single reader stage of the pipeline. It reads path
// (stdin when path is "" or "-") line by line and sends each raw line,
// including its trailing newline when present, on in. For CSV input the first
// line is consumed as the header row and stored in conf.csvHeaders before any
// data line is sent. in is closed once the input is exhausted; any read error
// other than io.EOF is fatal.
//
// NOTE(review): input is split on '\n' only, so a CSV field containing a
// quoted newline will be split across two records.
func AnnotateRead(conf *GlobalConf, path string, in chan<- string) {
	log.Debug("read thread started")
	var f *os.File
	if path == "" || path == "-" {
		log.Debug("reading input from stdin")
		f = os.Stdin
	} else {
		var err error
		f, err = os.Open(path)
		if err != nil {
			log.Fatal("unable to open input file: ", err.Error())
		}
		// Only close files we opened ourselves; stdin belongs to the process.
		defer func() {
			if err := f.Close(); err != nil {
				log.Errorf("unable to close input file: %v", err)
			}
		}()
		log.Debug("reading input from ", path)
	}
	r := bufio.NewReader(f)
	if conf.InputFileType == "csv" {
		// Need to extract the header. A header-only file with no trailing
		// newline yields the header together with io.EOF, which is still valid.
		header, err := r.ReadString('\n')
		if err != nil && (err != io.EOF || header == "") {
			log.Fatal("unable to read the CSV headers: ", err.Error())
		}
		fields, err := csv.NewReader(strings.NewReader(header)).Read()
		if err != nil {
			log.Fatal("unable to parse CSV headers: ", err.Error())
		}
		conf.csvHeaders = fields
	}
	// read IPs out of input
	for {
		line, err := r.ReadString('\n')
		if line != "" {
			in <- line
		}
		if err != nil {
			if err != io.EOF {
				log.Fatal("input unable to read file: ", err)
			}
			break
		}
	}
	close(in)
	log.Debug("read thread finished")
}
// AnnotateInputDecode is one of several decoder workers. It converts raw lines
// from AnnotateRead into inProcessIP values according to conf.InputFileType
// ("json", "csv", or plain IP addresses otherwise) and sends them on outChan.
// For JSON and CSV input, an empty annotation map is pre-created under
// conf.OutputAnnotationFieldName (when set) so annotators can fill it in.
// Blank lines are skipped; CSV lines that fail to parse are logged and
// skipped. wg.Done is called once inChan is closed and drained.
func AnnotateInputDecode(conf *GlobalConf, inChan <-chan string,
	outChan chan<- inProcessIP, wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for line := range inChan {
		// Strip the line terminator, including the '\r' of CRLF input.
		l := strings.TrimRight(line, "\r\n")
		if l == "" {
			continue
		}
		var val inProcessIP
		switch conf.InputFileType {
		case "json":
			val = jsonToInProcess(l, conf.InputIPFieldName, conf.OutputAnnotationFieldName)
		case "csv":
			row, err := csv.NewReader(strings.NewReader(l)).Read()
			if err != nil {
				log.Errorf("failed to parse CSV line (%s): %v", l, err)
				continue
			}
			val = csvToInProcess(row, conf.csvHeaders, conf.InputIPFieldName, conf.OutputAnnotationFieldName)
		default:
			// Plain IP input never gets a nested annotation map.
			outChan <- ipToInProcess(l)
			continue
		}
		if conf.OutputAnnotationFieldName != "" {
			val.Out[conf.OutputAnnotationFieldName] = make(map[string]interface{})
		}
		outChan <- val
	}
	log.Debug("decode thread ", i, " done")
}
// AnnotateOutputEncode is one of several encoder workers. It JSON-marshals the
// Out map of every record received on inChan and sends the resulting JSON text
// (without a trailing newline) on outChan. A marshalling failure is fatal.
// conf and i are currently unused but kept for signature parity with the
// other worker functions. wg.Done is called once inChan is closed and drained.
func AnnotateOutputEncode(conf *GlobalConf, inChan <-chan inProcessIP,
	outChan chan<- string, wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for rec := range inChan {
		jsonRes, err := jsoniter.Marshal(rec.Out)
		if err != nil {
			log.Fatal("Unable to marshal JSON result: ", err)
		}
		outChan <- string(jsonRes)
	}
}
// AnnotateWrite is the single writer stage. It writes every string received on
// out, followed by a newline, to path (stdout when path is "" or "-"). An
// existing output file is truncated first. Open and write errors are fatal;
// a close error is logged. wg.Done is called after the output file has been
// closed.
func AnnotateWrite(path string, out <-chan string, wg *sync.WaitGroup) {
	// Registered first so it runs last: whoever waits on wg is released only
	// after the deferred Close below has run.
	defer wg.Done()
	log.Debug("write thread started")
	var f *os.File
	if path == "" || path == "-" {
		f = os.Stdout
	} else {
		var err error
		// O_TRUNC: without it, overwriting an existing, longer file would leave
		// stale bytes after the new output.
		f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
		if err != nil {
			log.Fatal("unable to open output file: ", err.Error())
		}
		defer func() {
			if err := f.Close(); err != nil {
				log.Errorf("unable to close output file: %v", err)
			}
		}()
	}
	for n := range out {
		if _, err := f.WriteString(n + "\n"); err != nil {
			log.Fatalf("unable to write to output file: %v", err)
		}
	}
	log.Debug("write thread finished")
}
// isPtrNil reports whether i is nil: either an untyped nil interface, or a
// typed value of a nil-able kind (pointer, interface, slice, map, channel,
// function, unsafe pointer) whose value is nil. Values of any other kind are
// never nil.
func isPtrNil(i any) bool {
	if i == nil {
		return true
	}
	switch v := reflect.ValueOf(i); v.Kind() {
	case reflect.Pointer, reflect.Interface, reflect.Slice, reflect.Map,
		reflect.Chan, reflect.Func, reflect.UnsafePointer:
		// These are exactly the kinds for which reflect.Value.IsNil is defined.
		return v.IsNil()
	default:
		return false
	}
}
// AnnotateWorker runs one annotator instance over every record on inChan and
// forwards each record to outChan. The annotator's result is stored under its
// GetFieldName() key: inside the fieldName sub-map for JSON/CSV input when
// fieldName is set, otherwise directly in the record's top level. A nil result
// is replaced by an empty struct so the output never contains JSON null.
// Initialization failure is fatal. wg.Done is called once inChan is drained.
func AnnotateWorker(conf *GlobalConf, a Annotator, inChan <-chan inProcessIP,
	outChan chan<- inProcessIP, fieldName string, wg *sync.WaitGroup, i int) {
	defer wg.Done()
	name := a.GetFieldName()
	log.Debug("annotate worker (", name, ") ", i, " started")
	log.Debug("annotate worker (", name, ") ", i, " to use fieldname ", name)
	if err := a.Initialize(); err != nil {
		log.Fatal("error initializing annotate worker: ", err)
	}
	// Loop-invariant: decide once whether results go into the nested map that
	// AnnotateInputDecode pre-creates for JSON/CSV input.
	nested := fieldName != "" && slices.Contains([]string{"json", "csv"}, conf.InputFileType)
	for inProcess := range inChan {
		res := a.Annotate(inProcess.Ip)
		if isPtrNil(res) {
			res = struct{}{} // Don't return null, breaks downstream JSON parsing
		}
		if nested {
			p := inProcess.Out[fieldName].(map[string]interface{})
			p[name] = res
		} else {
			inProcess.Out[name] = res
		}
		outChan <- inProcess
	}
}
// PerSecondUpdateWorker prints a per-second scan summary as well as a Scan
// Complete/Aborted message at the end. It writes the updates to the file path
// provided (truncating it), or to stderr if the path is empty or "-".
// Every value received on outChan counts as one annotated IP; when outChan is
// closed the final "Scan Complete" line is written and the worker returns.
// On SIGINT/SIGTERM it writes a "Scan Aborted" line, syncs a user-provided
// file, and exits the process with status 128+signal.
//
// Signals are handled inside the same select loop as the counter updates, so
// the counter is only ever touched by this goroutine, and the signal
// registration and ticker are released when the worker returns.
func PerSecondUpdateWorker(filePath string, outChan <-chan string, wg *sync.WaitGroup) {
	const (
		scanCompleteStatusMsg = "Scan Complete; "
		scanAbortedStatusMsg  = "Scan Aborted; "
		perSecondStatusMsg    = ""
	)
	log.Debug("PerSecondUpdateWorker started")
	defer wg.Done()
	f := os.Stderr
	userProvidedFilePath := filePath != "-" && filePath != ""
	if userProvidedFilePath {
		var err error
		f, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
		if err != nil {
			log.Fatalf("unable to open per-second log file: %s", err.Error())
		}
		defer func(f *os.File) {
			if err := f.Close(); err != nil {
				log.Fatalf("unable to close per-second log file: %s", err.Error())
			}
		}(f)
	}
	startTime := time.Now()
	ipsAnnotated := 0
	writeStatus := func(scanStatus string) {
		timeSinceStart := time.Since(startTime)
		msg := fmt.Sprintf("%02dh:%02dm:%02ds; %s%d ips annotated; %.02f ips/sec\n",
			int(timeSinceStart.Hours()),
			int(timeSinceStart.Minutes())%60,
			int(timeSinceStart.Seconds())%60,
			scanStatus, // empty string for per-second updates and Scan Complete/Aborted for the relevant circumstance
			ipsAnnotated,
			float64(ipsAnnotated)/timeSinceStart.Seconds())
		if _, err := f.WriteString(msg); err != nil {
			log.Fatalf("unable to write to log file: %v", err)
		}
	}
	// Register before entering the loop so no signal is missed; restore the
	// default behaviour once this worker is done.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)
	// Now for the per-second, usual status update loop
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Print per-second output summary
			writeStatus(perSecondStatusMsg)
		case sig := <-sigs:
			// SIGINT or SIGTERM received
			writeStatus(scanAbortedStatusMsg)
			if userProvidedFilePath {
				// User provided an actual file to log to, let's flush it to disk before exiting
				if err := f.Sync(); err != nil {
					log.Fatalf("unable to write to log file: %v", err)
				}
			}
			os.Exit(128 + int(sig.(syscall.Signal)))
		case _, ok := <-outChan:
			if !ok {
				// output channel closed, scan complete
				writeStatus(scanCompleteStatusMsg)
				return
			}
			// IP annotated on the outbound channel
			ipsAnnotated++
		}
	}
}
// DoAnnotation runs the whole annotation pipeline described by conf and
// returns once every record has been written out. The stages are connected by
// unbuffered channels:
//
//	[read from file](1) -> [decode input data](n) -> [annotator 1](n)
//	-> [annotator 2](n) -> ... -> [annotator n](n) -> [encode output data](n)
//	-> tee -> [write to file](1) + [per-second status updates](1)
//
// Each stage's output channel is closed only after all workers of that stage
// have finished, so shutdown cascades in order.
func DoAnnotation(conf *GlobalConf) {
	// let each enabled annotator do its global initialization
	// before we ask them to generate per-thread Annotators
	for _, annotator := range Annotators {
		if !annotator.IsEnabled() {
			continue
		}
		if err := annotator.Initialize(conf); err != nil {
			log.Fatal("Annotation unable to initialize: ", err)
		}
	}
	inRaw := make(chan string)
	inDecoded := make(chan inProcessIP)
	// read input file
	go AnnotateRead(conf, conf.InputFilePath, inRaw)
	// decode input data. Every wg.Add must happen before the goroutine starts:
	// a worker that finishes immediately (e.g. on empty input) would otherwise
	// call Done on a zero counter and panic.
	var decodeWG sync.WaitGroup
	for i := 0; i < conf.InputDecodeThreads; i++ {
		decodeWG.Add(1)
		go AnnotateInputDecode(conf, inRaw, inDecoded, &decodeWG, i)
	}
	// spawn threads for each type of annotator, chaining each stage's output
	// channel into the next stage's input
	lastChannel := inDecoded
	var annotateWaitGroups []*sync.WaitGroup
	var annotateChannels []chan inProcessIP
	for _, annotator := range Annotators {
		if !annotator.IsEnabled() {
			continue
		}
		annotateWG := new(sync.WaitGroup)
		nextChannel := make(chan inProcessIP)
		for i := 0; i < annotator.GetWorkers(); i++ {
			annotateWG.Add(1)
			go AnnotateWorker(conf, annotator.MakeAnnotator(i), lastChannel, nextChannel,
				conf.OutputAnnotationFieldName, annotateWG, i)
		}
		annotateWaitGroups = append(annotateWaitGroups, annotateWG)
		annotateChannels = append(annotateChannels, nextChannel)
		lastChannel = nextChannel
	}
	// encode raw data
	var encodeWG sync.WaitGroup
	pipedEncodedOut := make(chan string)
	for i := 0; i < conf.OutputEncodeThreads; i++ {
		encodeWG.Add(1)
		go AnnotateOutputEncode(conf, lastChannel, pipedEncodedOut, &encodeWG, i)
	}
	// fan the encoded output out to the file writer and the status reporter
	var writeWG sync.WaitGroup
	encodedOut, updatesOut := tee(pipedEncodedOut)
	writeWG.Add(2)
	go AnnotateWrite(conf.OutputFilePath, encodedOut, &writeWG)
	go PerSecondUpdateWorker(conf.StatusUpdatesFilePath, updatesOut, &writeWG)
	// all workers started. close out everything in a safe order
	// inRaw: we don't need to wait on this because the reader closes its own
	// channel when it finishes; until then none of the decoder threads will
	// finish. So, just wait for them.
	decodeWG.Wait()
	close(inDecoded)
	// wait on all of the different types of annotation workers
	for i, wg := range annotateWaitGroups {
		wg.Wait()
		close(annotateChannels[i])
	}
	// wait for the encoders
	encodeWG.Wait()
	close(pipedEncodedOut)
	// wait on writing to file
	writeWG.Wait()
}