Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ linters-settings:
linters:
enable-all: true
disable:
- testpackage
- gocognit
- gocyclo
- funlen
Expand Down
33 changes: 26 additions & 7 deletions cmd/catp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,26 @@ catp [OPTIONS] PATH ...
use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU) (default 1)
-pass value
filter matching, may contain multiple AND patterns separated by ^,
if filter matches, line is passed to the output (unless filtered out by -skip)
each -pass value is added with OR logic,
for example, you can use "-pass bar^baz -pass foo" to only keep lines that have (bar AND baz) OR foo
if filter matches, line is passed to the output (may be filtered out by preceding -skip)
other -pass values are evaluated if preceding pass/skip did not match,
for example, you can use "-pass bar^baz -pass foo -skip fo" to only keep lines that have (bar AND baz) OR foo, but not fox
-pass-any
finishes matching and gets the value even if previous -pass did not match,
if previous -skip matched, the line would be skipped any way.
-pass-csv value
filter matching, loads pass params from CSV file,
each line is treated as -pass, each column value is AND condition.
-progress-json string
write current progress to a file
-rate-limit float
output rate limit lines per second
-skip value
filter matching, may contain multiple AND patterns separated by ^,
if filter matches, line is removed from the output (even if it passed -pass)
each -skip value is added with OR logic,
if filter matches, line is removed from the output (may be kept if it passed preceding -pass)
for example, you can use "-skip quux^baz -skip fooO" to skip lines that have (quux AND baz) OR fooO
-skip-csv value
filter matching, loads skip params from CSV file,
each line is treated as -skip, each column value is AND condition.
-version
print version and exit
```
Expand All @@ -77,10 +85,10 @@ get-key.log: 100.0% bytes read, 1000000 lines processed, 8065.7 l/s, 41.8 MB/s,
```

Run log filtering (lines containing `foo bar` or `baz`) on multiple files in background (with `screen`) and output to a
new file.
new compressed file.

```
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
```

```
Expand All @@ -100,3 +108,14 @@ all: 32.3% bytes read, /home/logs/server-2023-07-12-09-00.log_6.zst: 5.1% bytes
# detaching from screen with ctrl+a+d
```

Filter based on a large list of needles. Values from the allow and block lists are loaded into high-performance
[Aho–Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.

```
catp -pass-csv allowlist.csv -skip-csv blocklist.csv -pass-any -output filtered.log.zst source.log.zst
```

Each source line goes through the following filtering pipeline:
* if `allowlist.csv` has at least one row whose cells are all present in the source line, the line goes to the output
* otherwise, if `blocklist.csv` has at least one row whose cells are all present in the source line, the line is skipped
* otherwise, the line goes to the output because of `-pass-any`
8 changes: 5 additions & 3 deletions cmd/catp/catp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
r := &runner{}

flag.Var(flagFunc(func(v string) error {
r.filters = append(r.filters, filter{pass: true, and: bytes.Split([]byte(v), []byte("^"))})
r.filters.addFilter(true, bytes.Split([]byte(v), []byte("^"))...)

return nil
}), "pass", "filter matching, may contain multiple AND patterns separated by ^,\n"+
Expand All @@ -44,7 +44,7 @@

flag.BoolFunc("pass-any", "finishes matching and gets the value even if previous -pass did not match,\n"+
"if previous -skip matched, the line would be skipped any way.", func(s string) error {
r.filters = append(r.filters, filter{pass: true})
r.filters.addPassAny()

Check notice on line 47 in cmd/catp/catp/app.go

View workflow job for this annotation

GitHub Actions / test (stable)

2 statement(s) on lines 46:50 are not covered by tests.

return nil
})
Expand All @@ -55,7 +55,7 @@
"each line is treated as -skip, each column value is AND condition.")

flag.Var(flagFunc(func(v string) error {
r.filters = append(r.filters, filter{pass: false, and: bytes.Split([]byte(v), []byte("^"))})
r.filters.addFilter(false, bytes.Split([]byte(v), []byte("^"))...)

return nil
}), "skip", "filter matching, may contain multiple AND patterns separated by ^,\n"+
Expand Down Expand Up @@ -94,6 +94,8 @@
}
flag.Parse()

r.filters.buildIndex()

if *ver {
fmt.Println(version.Module("github.com/bool64/progress").Version)

Expand Down
45 changes: 6 additions & 39 deletions cmd/catp/catp/catp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import (
"bufio"
"bytes"
"context"
"encoding/csv"
"encoding/json"
Expand Down Expand Up @@ -45,7 +44,7 @@
currentBytesUncompressed int64
currentLines int64

filters []filter
filters filters

currentFile *progress.CountingReader
currentTotal int64
Expand All @@ -66,13 +65,7 @@
hasCompression bool
}

type (
filter struct {
pass bool // Skip is false.
and [][]byte
}
flagFunc func(v string) error
)
// flagFunc adapts a plain callback to a flag value, so that the callback is
// invoked with the raw string of every occurrence of the flag.
type flagFunc func(v string) error

// String always returns an empty string, the adapter keeps no value to show.
func (f flagFunc) String() string {
	return ""
}

// Set hands the raw flag value over to the wrapped callback and returns
// whatever error the callback reports.
func (f flagFunc) Set(value string) error {
	err := f(value)

	return err
}
Expand Down Expand Up @@ -172,7 +165,7 @@
atomic.StoreInt64(&r.lastBytesUncompressed, currentBytesUncompressed)
}

if len(r.filters) > 0 || r.options.PrepareLine != nil {
if r.filters.isSet() || r.options.PrepareLine != nil {
m := atomic.LoadInt64(&r.matches)
pr.Matches = &m
res += fmt.Sprintf(", matches %d", m)
Expand Down Expand Up @@ -251,7 +244,7 @@

line := s.Bytes()

if !r.shouldWrite(line) {
if !r.filters.shouldWrite(line) {
continue
}

Expand Down Expand Up @@ -297,32 +290,6 @@
}
}

// shouldWrite reports whether line has to be written to the output.
//
// Filters are checked in the order they were added. The first filter whose AND
// patterns are all contained in line decides the outcome: a pass filter keeps
// the line, a skip filter drops it. A filter without patterns matches any line.
// If no filter matched, the line is kept only when none of the filters was a
// pass filter.
func (r *runner) shouldWrite(line []byte) bool {
	sawPass := false

next:
	for _, flt := range r.filters {
		sawPass = sawPass || flt.pass

		for _, needle := range flt.and {
			if !bytes.Contains(line, needle) {
				continue next
			}
		}

		// Every pattern of this filter is present in the line.
		return flt.pass
	}

	return !sawPass
}

func (r *runner) cat(filename string) (err error) { //nolint:gocyclo
var rd io.Reader

Expand Down Expand Up @@ -432,7 +399,7 @@
r.limiter = rate.NewLimiter(rate.Limit(r.rateLimit), 100)
}

if len(r.filters) > 0 || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 {
if r.filters.isSet() || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 {
r.scanFile(filename, rd, out)
} else {
r.readFile(rd, out)
Expand Down Expand Up @@ -524,7 +491,7 @@
and = append(and, []byte(v))
}

r.filters = append(r.filters, filter{pass: pass, and: and})
r.filters.addFilter(pass, and...)

Check notice on line 494 in cmd/catp/catp/catp.go

View workflow job for this annotation

GitHub Actions / test (stable)

1 statement(s) are not covered by tests.
}

return nil
Expand Down
153 changes: 153 additions & 0 deletions cmd/catp/catp/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package catp

import (
"bytes"

"github.com/cloudflare/ahocorasick"
)

// filterAnd is a list of patterns that all have to be present in a line
// for the item to match (AND condition).
type filterAnd [][]byte

// filterGroup is a run of consecutive filters that share the same action.
type filterGroup struct {
	// pass is the result reported for a line matched by this group:
	// true keeps the line, false skips it.
	pass bool

	// ors lists the alternatives of the group, a line matches the group
	// when it matches any of them (OR condition).
	ors []filterAnd

	// pre is an optional prefilter built from the first pattern of every
	// ors item. Once it is built, that first pattern is removed from the
	// respective ors item, and ids reported by the prefilter are used as
	// indexes into ors.
	pre *ahocorasick.Matcher
}

// filters is an ordered list of filter groups, groups are evaluated
// in the order they were added.
type filters struct {
	g []*filterGroup
}

// buildIndex asks every filter group to prepare its prefilter index.
func (f *filters) buildIndex() {
	for i := range f.g {
		f.g[i].buildIndex()
	}
}

// isSet reports whether at least one filter group has been added.
func (f *filters) isSet() bool {
	return len(f.g) != 0
}

// addFilterString is a string flavour of addFilter: every pattern is converted
// to bytes and the resulting AND list is added with the given action.
func (f *filters) addFilterString(pass bool, and ...string) {
	patterns := make([][]byte, len(and))

	for i := range and {
		patterns[i] = []byte(and[i])
	}

	f.addFilter(pass, patterns...)
}

// addPassAny appends a new pass group that has no patterns.
// It backs the -pass-any command line flag.
func (f *filters) addPassAny() {
	passAny := &filterGroup{pass: true}

	f.g = append(f.g, passAny)
}

Check notice on line 46 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

1 statement(s) are not covered by tests.

func (f *filters) addFilter(pass bool, and ...[]byte) {
if len(and) == 0 {
return
}

Check notice on line 51 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

1 statement(s) on lines 49:51 are not covered by tests.

var g *filterGroup

// Get current group if exists and has same pass, append new current group with new pass otherwise.
if len(f.g) != 0 {
g = f.g[len(f.g)-1]

if g.pass != pass {
g = &filterGroup{pass: pass}
f.g = append(f.g, g)
}
} else {
// Create and append the very first group.
g = &filterGroup{pass: pass}
f.g = append(f.g, g)
}

g.ors = append(g.ors, and)
}

// shouldWrite reports whether line has to be written to the output.
//
// Groups are evaluated in order, the first group that matches the line decides:
// the result is the pass value of that group. If no group matched, the line is
// kept only when none of the groups was a pass group.
func (f *filters) shouldWrite(line []byte) bool {
	keepUnmatched := true

	for _, grp := range f.g {
		// A pass group turns the filter into an allow list:
		// lines that match nothing are dropped.
		if grp.pass {
			keepUnmatched = false
		}

		if grp.match(line) {
			return grp.pass
		}
	}

	return keepUnmatched
}

func (g *filterGroup) match(line []byte) bool {
if g.pre != nil {
ids := g.pre.Match(line)
if len(ids) == 0 {
return false
}

Check notice on line 95 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

3 statement(s) on lines 91:95 are not covered by tests.

for _, id := range ids {
or := g.ors[id]

andMatched := true

for _, and := range or {
if !bytes.Contains(line, and) {
andMatched = false

break

Check notice on line 106 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

11 statement(s) are not covered by tests.
}
}

if andMatched {
return true
}

Check notice on line 112 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

2 statement(s) are not covered by tests.
}

return false

Check notice on line 115 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

1 statement(s) are not covered by tests.
}

for _, or := range g.ors {
andMatched := true

for _, and := range or {
if !bytes.Contains(line, and) {
andMatched = false

break
}
}

if andMatched {
return true
}
}

return false
}

func (g *filterGroup) buildIndex() {
if g.pre != nil {
return
}

Check notice on line 140 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

1 statement(s) on lines 138:140 are not covered by tests.

if len(g.ors) < 5 {
return
}

indexItems := make([][]byte, 0, len(g.ors))
for i, or := range g.ors {
indexItems = append(indexItems, or[0])
g.ors[i] = or[1:]
}

Check notice on line 150 in cmd/catp/catp/filter.go

View workflow job for this annotation

GitHub Actions / test (stable)

4 statement(s) are not covered by tests.

g.pre = ahocorasick.NewMatcher(indexItems)
}
26 changes: 26 additions & 0 deletions cmd/catp/catp/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package catp

import (
"bytes"
"os"
"testing"
)

// TestFilter_Match runs a skip filter followed by two pass filters over the
// lines of a fixture file and prints the lines that are kept.
func TestFilter_Match(t *testing.T) {
	var f filters

	f.addFilterString(false, "dbg")
	f.addFilterString(true, "linux", "64")
	f.addFilterString(true, "windows")

	data, err := os.ReadFile("./testdata/release-assets.yml")
	if err != nil {
		t.Fatal(err)
	}

	lines := bytes.Split(data, []byte("\n"))

	for i := range lines {
		if !f.shouldWrite(lines[i]) {
			continue
		}

		println(string(lines[i]))
	}
}
Binary file modified cmd/catp/default.pgo
Binary file not shown.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.23.0
require (
github.com/DataDog/zstd v1.5.7
github.com/bool64/dev v0.2.40
github.com/cloudflare/ahocorasick v0.0.0-20240916140611-054963ec9396
github.com/klauspost/compress v1.18.0
github.com/klauspost/pgzip v1.2.6
golang.org/x/time v0.12.0
Expand Down
Loading
Loading