4 changes: 4 additions & 0 deletions .golangci.yml
@@ -5,6 +5,10 @@ run:
linters:
default: all
disable:
- funlen
- gocyclo
- gocognit
- err113
- embeddedstructfieldcheck
- testpackage
- noinlineerr
39 changes: 33 additions & 6 deletions cmd/catp/README.md
@@ -29,6 +29,9 @@ catp [OPTIONS] PATH ...
write first 10 seconds of CPU profile to file
-dbg-mem-prof string
write heap profile to file after 10 seconds
-end-line int
stop printing lines at this line (exclusive),
default is 0 (no limit), each input file is counted separately
-l count lines
-no-progress
disable progress printing
@@ -57,20 +60,25 @@ catp [OPTIONS] PATH ...
write current progress to a file
-rate-limit float
output rate limit lines per second
-save-matches value
save matches of previous filter group to file
-skip value
filter matching, may contain multiple AND patterns separated by ^,
if filter matches, line is removed from the output (may be kept if it passed preceding -pass)
for example, you can use "-skip quux^baz -skip fooO" to skip lines that have (quux AND baz) OR fooO
-skip-csv value
filter matching, loads skip params from CSV file,
each line is treated as -skip, each column value is AND condition.
-start-line int
start printing lines from this line (inclusive),
default is 0 (first line), each input file is counted separately
-version
print version and exit
```
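
For example, to print only lines 100 through 199 of a hypothetical `app.log` (per the help above, `-start-line` is inclusive and counts from 0, `-end-line` is exclusive):

```
catp -start-line 100 -end-line 200 app.log
```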

## Examples

Feed a file into `jq` field extractor with progress printing.
### Feed a file into `jq` field extractor with progress printing

```
catp get-key.log | jq .context.callback.Data.Nonce > get-key.jq
@@ -84,11 +92,13 @@ get-key.log: 96.8% bytes read, 967819 lines processed, 8064.9 l/s, 41.8 MB/s, el
get-key.log: 100.0% bytes read, 1000000 lines processed, 8065.7 l/s, 41.8 MB/s, elapsed 2m3.98s, remaining 0s
```

### Parallel scan of multiple files

Run log filtering (lines containing `foo bar` or `baz`) on multiple files in the background (with `screen`) and write
the output to a new compressed file.

```
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
screen -dmS foo12 ./catp -parallel 20 -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
```

```
@@ -108,14 +118,31 @@ all: 32.3% bytes read, /home/logs/server-2023-07-12-09-00.log_6.zst: 5.1% bytes
# detaching from screen with ctrl+a+d
```

Filter based on large list of needles. Values from allow and block lists are loaded into high-performance
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.
### Filter based on large list of needles

Values from allow and block lists are loaded into high-performance
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.

```
catp -pass-csv allowlist.csv -skip-csv blocklist.csv -pass-any -output filtered.log.zst source.log.zst
```

Each source line follows this filtering pipeline (see the illustration after the list):
* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into output
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is skipped

* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into
output
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is
skipped
* if not, source line gets into output because of `-pass-any`
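
For illustration only, hypothetical contents of the two CSV files from the command above (the values are made up; each row is one filter group, each comma-separated cell is an AND condition):

```
allowlist.csv:
payment,accepted
refund

blocklist.csv:
healthcheck
debug,trace
```

With these files, a line containing both `payment` and `accepted` (or containing `refund`) is written to the output, a line containing `healthcheck` that matches no allow row is skipped, and every remaining line is written because of `-pass-any`.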

### Split matches into separate files

```
catp -pass foo -save-matches foo.log.zst -pass bar^baz -save-matches 2.gz -pass qux -pass quux -output other.log input.log
```

Pipeline (illustrated with sample lines after the list):
* each line from `input.log` is read
* lines that contain `foo` are stored to `foo.log.zst`
* lines that contain `bar` and `baz` (unless they also contain `foo`, which was already matched) are stored to `2.gz`
* lines that contain `qux` or `quux` are stored to `other.log`
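
For illustration only, made-up input lines would be routed like this (assuming the usual `-pass` behavior of dropping lines that match no filter group):

```
foo something      -> foo.log.zst  (first group: foo)
foo bar baz        -> foo.log.zst  (foo already matched, later groups are not checked)
bar baz request    -> 2.gz         (second group: bar AND baz)
qux done           -> other.log    (third group: qux OR quux)
unrelated line     -> not written  (matches no -pass group)
```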
105 changes: 44 additions & 61 deletions cmd/catp/catp/app.go
@@ -7,7 +7,6 @@
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
@@ -21,13 +20,21 @@

"github.com/bool64/dev/version"
"github.com/bool64/progress"
gzip "github.com/klauspost/pgzip"
)

// Main is the entry point for catp CLI tool.
func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx
r := &runner{}

var closers []func() error
defer func() {
for _, closer := range closers {
if err := closer(); err != nil {
log.Printf("failed to close: %s\n", err.Error())
}

[Check notice on line 34 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 1 statement(s) on lines 32:34 are not covered by tests.]
}
}()

flag.Var(flagFunc(func(v string) error {
r.filters.addFilter(true, bytes.Split([]byte(v), []byte("^"))...)

@@ -62,6 +69,17 @@
"if filter matches, line is removed from the output (may be kept if it passed preceding -pass)\n"+
"for example, you can use \"-skip quux^baz -skip fooO\" to skip lines that have (quux AND baz) OR fooO")

flag.Var(flagFunc(func(v string) error {
w, closer, err := makeWriter(v)
if err != nil {
return err
}

[Check notice on line 76 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 3 statement(s) on lines 72:76 are not covered by tests.]

closers = append(closers, closer)

return r.filters.saveTo(w)

[Check notice on line 80 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 2 statement(s) are not covered by tests.]
}), "save-matches", "save matches of previous filter group to file")

flag.IntVar(&r.parallel, "parallel", 1, "number of parallel readers if multiple files are provided\n"+
"lines from different files will go to output simultaneously (out of order of files, but in order of lines in each file)\n"+
"use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU)")
@@ -79,8 +97,13 @@
"files will be written to out dir with original base names\n"+
"disables output flag")

flag.IntVar(&r.startLine, "start-line", 0, "start printing lines from this line (inclusive),\n"+
"default is 0 (first line), each input file is counted separately")
flag.IntVar(&r.endLine, "end-line", 0, "stop printing lines at this line (exclusive),\n"+
"default is 0 (no limit), each input file is counted separately")

flag.Usage = func() {
fmt.Println("catp", version.Module("github.com/bool64/progress").Version+",",
fmt.Println("catp", version.Module("github.com/bool64/progress").Version+r.options.VersionLabel+",",

[Check notice on line 106 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 7 statement(s) on lines 105:117 are not covered by tests.]
version.Info().GoVersion, strings.Join(versionExtra, " "))
fmt.Println()
fmt.Println("catp prints contents of files to STDOUT or dir/file output, \n" +
@@ -94,20 +117,6 @@
}
flag.Parse()

r.filters.buildIndex()

if *ver {
fmt.Println(version.Module("github.com/bool64/progress").Version)

return nil
}

if flag.NArg() == 0 {
flag.Usage()

return nil
}

if *cpuProfile != "" {
startProfiling(*cpuProfile, *memProfile)

@@ -122,6 +131,20 @@
}
}

r.filters.buildIndex()

if *ver {
fmt.Println(version.Module("github.com/bool64/progress").Version + r.options.VersionLabel)

return nil
}

[Check notice on line 140 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 2 statement(s) on lines 136:140 are not covered by tests.]

if flag.NArg() == 0 {
flag.Usage()

return nil
}

[Check notice on line 146 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 2 statement(s) on lines 142:146 are not covered by tests.]

var files []string

args := flag.Args()
@@ -158,61 +181,21 @@
sort.Strings(files)

if *output != "" && r.outDir == "" {
fn := *output

out, err := os.Create(fn) //nolint:gosec
w, closer, err := makeWriter(*output)
if err != nil {
return fmt.Errorf("failed to create output file %s: %w", fn, err)
return err

[Check notice on line 186 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 1 statement(s) on lines 185:187 are not covered by tests.]
}

r.output = out
compCloser := io.Closer(io.NopCloser(nil))

switch {
case strings.HasSuffix(fn, ".gz"):
gw := gzip.NewWriter(r.output)
compCloser = gw

r.output = gw
case strings.HasSuffix(fn, ".zst"):
zw, err := zstdWriter(r.output)
if err != nil {
return fmt.Errorf("zstd new writer: %w", err)
}

compCloser = zw

r.output = zw
}

w := bufio.NewWriterSize(r.output, 64*1024)
r.output = w

defer func() {
if err := w.Flush(); err != nil {
log.Fatalf("failed to flush STDOUT buffer: %s", err)
}

if err := compCloser.Close(); err != nil {
log.Fatalf("failed to close compressor: %s", err)
}

if err := out.Close(); err != nil {
log.Fatalf("failed to close output file %s: %s", *output, err)
}
}()
closers = append(closers, closer)
} else {
if isStdin {
r.output = os.Stdout
} else {
w := bufio.NewWriterSize(os.Stdout, 64*1024)
r.output = w

defer func() {
if err := w.Flush(); err != nil {
log.Fatalf("failed to flush STDOUT buffer: %s", err)
}
}()
closers = append(closers, w.Flush)

[Check notice on line 198 in cmd/catp/catp/app.go, GitHub Actions / test (stable): 3 statement(s) on lines 195:199 are not covered by tests.]
}
}
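
The `makeWriter` helper called above is defined elsewhere in this PR and does not appear in this diff. As a minimal sketch only, it plausibly consolidates the inline logic removed from `Main`: create the file, pick a compressor by extension, add a 64 KB buffer, and return a single closer that flushes and closes everything in order. The function name and exact closer ordering below are assumptions; `zstdWriter` and the `pgzip` import are taken from the surrounding code.

```go
// makeWriterSketch is a hypothetical stand-in for the real makeWriter.
// It assumes the package imports bufio, fmt, io, os, strings and
// gzip "github.com/klauspost/pgzip", and provides the zstdWriter helper.
func makeWriterSketch(fn string) (io.Writer, func() error, error) {
	out, err := os.Create(fn) //nolint:gosec
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create output file %s: %w", fn, err)
	}

	w := io.Writer(out)
	compCloser := io.Closer(io.NopCloser(nil)) // no-op unless a compressor is added

	switch {
	case strings.HasSuffix(fn, ".gz"):
		gw := gzip.NewWriter(w)
		compCloser, w = gw, gw
	case strings.HasSuffix(fn, ".zst"):
		zw, err := zstdWriter(w)
		if err != nil {
			return nil, nil, fmt.Errorf("zstd new writer: %w", err)
		}

		compCloser, w = zw, zw
	}

	bw := bufio.NewWriterSize(w, 64*1024)

	closer := func() error {
		// Flush the buffer, finish the compressed stream, then close the file.
		if err := bw.Flush(); err != nil {
			return err
		}

		if err := compCloser.Close(); err != nil {
			return err
		}

		return out.Close()
	}

	return bw, closer, nil
}
```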
