diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d5c9b332f..b2ff3209a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,7 +41,7 @@ jobs: fetch-depth: 1 - uses: WillAbides/setup-go-faster@v1.14.0 with: - go-version: "1.24" + go-version: "1.25" - name: Tidy Go modules run: go mod tidy - name: Run golang-ci-lint diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index a322e391a..17ce583a7 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -5,7 +5,7 @@ on: tags: - 'v*' env: - GO_VERSION: 1.24 + GO_VERSION: 1.25 jobs: goreleaser: diff --git a/go.mod b/go.mod index 81b4e061a..4f3e7e7b5 100644 --- a/go.mod +++ b/go.mod @@ -49,9 +49,9 @@ require ( go.opentelemetry.io/otel/metric v1.41.0 // indirect go.opentelemetry.io/otel/trace v1.41.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.51.0 - golang.org/x/sys v0.42.0 - golang.org/x/text v0.35.0 + golang.org/x/net v0.53.0 + golang.org/x/sys v0.43.0 + golang.org/x/text v0.36.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -178,7 +178,7 @@ require ( cloud.google.com/go/monitoring v1.16.3 github.com/AlekSi/pointer v1.2.0 github.com/DATA-DOG/go-sqlmock v1.5.2 - github.com/IBM/sarama v1.42.1 + github.com/IBM/sarama v1.48.0 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible github.com/NVIDIA/go-dcgm v0.0.0-20240118201113-3385e277e49f github.com/NVIDIA/go-nvml v0.12.0-2 @@ -260,8 +260,7 @@ require ( github.com/digitalocean/godo v1.132.0 // indirect github.com/docker/go-connections v0.4.0 github.com/docker/go-units v0.5.0 // indirect - github.com/eapache/go-resiliency v1.4.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/edsrzf/mmap-go v1.2.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect @@ -325,7 +324,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/klauspost/compress v1.18.4 // indirect + github.com/klauspost/compress v1.18.5 // indirect github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b github.com/leodido/go-urn v1.2.4 // indirect github.com/linode/linodego v1.46.0 // indirect @@ -342,14 +341,14 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.8 - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pierrec/lz4/v4 v4.1.26 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/prometheus/alertmanager v0.28.0 // indirect github.com/prometheus/common/assets v0.2.0 // indirect github.com/prometheus/exporter-toolkit v0.13.2 github.com/prometheus/procfs v0.15.1 - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30 // indirect github.com/shopspring/decimal v1.3.1 // indirect @@ -374,13 +373,13 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/automaxprocs v1.6.0 // indirect go.uber.org/goleak v1.3.0 // indirect - golang.org/x/crypto v0.49.0 - golang.org/x/mod v0.33.0 // indirect + golang.org/x/crypto v0.50.0 + golang.org/x/mod v0.34.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 - golang.org/x/term v0.41.0 // indirect + golang.org/x/term v0.42.0 // indirect golang.org/x/time v0.15.0 // indirect - golang.org/x/tools v0.42.0 // indirect + golang.org/x/tools v0.43.0 // indirect google.golang.org/api v0.218.0 google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 google.golang.org/grpc v1.79.3 diff --git a/go.sum b/go.sum index 724a3c7c9..42e1da02b 100644 --- a/go.sum +++ b/go.sum @@ -647,8 +647,8 @@ github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1v github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= -github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= +github.com/IBM/sarama v1.48.0 h1:9LJS0VNeg/boXxT/GLAMDKX6uSQ1mr/5F/j4v9gSeBQ= +github.com/IBM/sarama v1.48.0/go.mod h1:UhvwPF8zilmLOSd6O+ENzdycCJYwMww1U9DJOZpoCro= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= @@ -903,10 +903,8 @@ github.com/dop251/goja v0.0.0-20251103141225-af2ceb9156d7/go.mod h1:MxLav0peU43G github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo= github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= -github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= @@ -957,8 +955,6 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= -github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= @@ -1422,8 +1418,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= -github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -1658,8 +1654,8 @@ github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/pierrec/lz4/v4 v4.0.3/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pion/dtls/v3 v3.1.2 h1:gqEdOUXLtCGW+afsBLO0LtDD8GnuBBjEy6HRtyofZTc= github.com/pion/dtls/v3 v3.1.2/go.mod h1:Hw/igcX4pdY69z1Hgv5x7wJFrUkdgHwAn/Q/uo7YHRo= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= @@ -1727,8 +1723,8 @@ github.com/prometheus/prometheus v0.302.0/go.mod h1:YcyCoTbUR/TM8rY3Aoeqr0AWTu/p github.com/prometheus/sigv4 v0.1.1 h1:UJxjOqVcXctZlwDjpUpZ2OiMWJdFijgSofwLzO1Xk0Q= github.com/prometheus/sigv4 v0.1.1/go.mod h1:RAmWVKqx0bwi0Qm4lrKMXFM0nhpesBcenfCtz9qRyH8= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -2021,8 +2017,8 @@ golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= -golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2083,8 +2079,8 @@ golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= -golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= +golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -2154,8 +2150,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= -golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2311,8 +2307,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2326,8 +2322,8 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= -golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= +golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= +golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2345,8 +2341,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= -golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -2430,8 +2426,8 @@ golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= -golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= -golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= golang.org/x/tools/godoc v0.1.0-deprecated h1:o+aZ1BOj6Hsx/GBdJO/s815sqftjSnrZZwyYTHODvtk= golang.org/x/tools/godoc v0.1.0-deprecated/go.mod h1:qM63CriJ961IHWmnWa9CjZnBndniPt4a3CK0PVB9bIg= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/logs/client/kafka/destination.go b/logs/client/kafka/destination.go index a5480b2ed..578549e2e 100644 --- a/logs/client/kafka/destination.go +++ b/logs/client/kafka/destination.go @@ -5,6 +5,7 @@ package kafka import ( "context" "errors" + "fmt" "log" "strings" "sync" @@ -56,11 +57,11 @@ type Destination struct { // If `maxConcurrentBackgroundSends` > 0, then at most that many background payloads will be sent concurrently, else // there is no concurrency and the background sending pipeline will block while sending each payload. // TODO: add support for SOCKS5 -func NewDestination(endpoint logsconfig.Endpoint, contentType string, destinationsContext *client.DestinationsContext, maxConcurrentBackgroundSends int) *Destination { +func NewDestination(endpoint logsconfig.Endpoint, contentType string, destinationsContext *client.DestinationsContext, maxConcurrentBackgroundSends int) (*Destination, error) { return newDestination(endpoint, contentType, destinationsContext, time.Duration(coreconfig.ClientTimeout())*time.Second, maxConcurrentBackgroundSends) } -func newDestination(endpoint logsconfig.Endpoint, contentType string, destinationsContext *client.DestinationsContext, timeout time.Duration, maxConcurrentBackgroundSends int) *Destination { +func newDestination(endpoint logsconfig.Endpoint, contentType string, destinationsContext *client.DestinationsContext, timeout time.Duration, maxConcurrentBackgroundSends int) (*Destination, error) { if maxConcurrentBackgroundSends < 0 { maxConcurrentBackgroundSends = 0 } @@ -129,7 +130,7 @@ func newDestination(endpoint logsconfig.Endpoint, contentType string, destinatio var err error coreconfig.Config.Logs.Net.TLS.Config, err = coreconfig.Config.Logs.KafkaConfig.ClientConfig.TLSConfig() if err != nil { - panic(err) + return nil, fmt.Errorf("kafka TLS config: %w", err) } } if coreconfig.Config.Logs.SaslEnable { @@ -166,7 +167,7 @@ func newDestination(endpoint logsconfig.Endpoint, contentType string, destinatio brokers := strings.Split(endpoint.Addr, ",") c, err := New(typ, brokers, coreconfig.Config.Logs.Config) if err != nil { - panic(err) + return nil, fmt.Errorf("kafka producer: %w", err) } d := &Destination{ @@ -182,7 +183,7 @@ func newDestination(endpoint logsconfig.Endpoint, contentType string, destinatio protocol: endpoint.Protocol, origin: endpoint.Origin, } - return d + return d, nil } func (d *Destination) Close() { @@ -274,10 +275,10 @@ func (d *Destination) sendInBackground(payloadChan chan []byte) { break } d.climit <- struct{}{} - go func() { + util.SafeGo("logs/kafka/asyncSend", func() { + defer func() { <-d.climit }() d.unconditionalSend(payload) //nolint:errcheck - <-d.climit - }() + }, nil) case <-ctx.Done(): return } @@ -293,4 +294,4 @@ func (d *Destination) waitForBackoff() { ctx, cancel := context.WithDeadline(d.destinationsContext.Context(), d.blockedUntil) defer cancel() <-ctx.Done() -} +} \ No newline at end of file diff --git a/logs/client/kafka/producer.go b/logs/client/kafka/producer.go index 2d323e28a..2e8407fb8 100644 --- a/logs/client/kafka/producer.go +++ b/logs/client/kafka/producer.go @@ -1,8 +1,11 @@ +//go:build !no_logs + package kafka import ( "fmt" "log" + "time" "github.com/IBM/sarama" @@ -45,8 +48,8 @@ func New(typ string, brokers []string, config *sarama.Config) (Producer, error) asyncProducer: p, stop: stop, } - go apw.errorWorker() - go apw.successWorker() + util.SafeGoWithRestart("logs/kafka/errorWorker", apw.errorWorker, 5*time.Second, apw.stop, nil) + util.SafeGoWithRestart("logs/kafka/successWorker", apw.successWorker, 5*time.Second, apw.stop, nil) return apw, nil case SyncProducer: p, err := sarama.NewSyncProducer(brokers, config) @@ -105,4 +108,4 @@ func (p *SyncProducerWrapper) Send(msg *sarama.ProducerMessage) error { func (p *SyncProducerWrapper) Close() error { close(p.stop) return p.syncProducer.Close() -} +} \ No newline at end of file diff --git a/logs/input/file/scanner.go b/logs/input/file/scanner.go index 34fcec109..4d858d6f9 100644 --- a/logs/input/file/scanner.go +++ b/logs/input/file/scanner.go @@ -75,13 +75,13 @@ func NewScanner(sources *logsconfig.LogSources, tailingLimit int, pipelineProvid // Start starts the Scanner func (s *Scanner) Start() { - go s.run() + util.SafeGoWithRestart("logs/scanner", s.run, 5*time.Second, s.stop, nil) } // Stop stops the Scanner and its tailers in parallel, // this call returns only when all the tailers are stopped func (s *Scanner) Stop() { - s.stop <- struct{}{} + close(s.stop) s.cleanup() } @@ -386,4 +386,4 @@ func (s *Scanner) createTailer(file *File, outputChan chan *message.Message) *Ta func (s *Scanner) createRotatedTailer(file *File, outputChan chan *message.Message, pattern *regexp.Regexp) *Tailer { return NewTailer(outputChan, file, s.tailerSleepDuration, NewDecoderFromSourceWithPattern(file.Source, pattern)) -} +} \ No newline at end of file diff --git a/logs/input/file/tailer.go b/logs/input/file/tailer.go index fb7d0e19a..bafb77b45 100644 --- a/logs/input/file/tailer.go +++ b/logs/input/file/tailer.go @@ -26,6 +26,7 @@ import ( "flashcat.cloud/categraf/logs/message" "flashcat.cloud/categraf/logs/parser" "flashcat.cloud/categraf/logs/tag" + "flashcat.cloud/categraf/logs/util" ) // DefaultSleepDuration represents the amount of time the tailer waits before reading new data when no data is received @@ -159,9 +160,9 @@ func (t *Tailer) Start(offset int64, whence int) error { t.file.Source.Status.Success() t.file.Source.AddInput(t.file.Path) - go t.forwardMessages() + util.SafeGo("logs/tailer/forward", t.forwardMessages, nil) t.decoder.Start() - go t.readForever() + util.SafeGo("logs/tailer/read", t.readForever, nil) return nil } diff --git a/logs/pipeline/pipeline.go b/logs/pipeline/pipeline.go index e2db7ba81..83127e9a3 100644 --- a/logs/pipeline/pipeline.go +++ b/logs/pipeline/pipeline.go @@ -9,6 +9,8 @@ package pipeline import ( "context" + "fmt" + "log" coreconfig "flashcat.cloud/categraf/config" logsconfig "flashcat.cloud/categraf/config/logs" @@ -30,7 +32,7 @@ type Pipeline struct { } // NewPipeline returns a new Pipeline -func NewPipeline(outputChan chan *message.Message, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints, destinationsContext *client.DestinationsContext, diagnosticMessageReceiver diagnostic.MessageReceiver, serverless bool) *Pipeline { +func NewPipeline(outputChan chan *message.Message, processingRules []*logsconfig.ProcessingRule, endpoints *logsconfig.Endpoints, destinationsContext *client.DestinationsContext, diagnosticMessageReceiver diagnostic.MessageReceiver, serverless bool) (*Pipeline, error) { var ( destinations *client.Destinations strategy sender.Strategy @@ -47,10 +49,18 @@ func NewPipeline(outputChan chan *message.Message, processingRules []*logsconfig strategy = sender.NewBatchStrategy(sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs") encoder = processor.JSONEncoder case "kafka": - main := kafka.NewDestination(endpoints.Main, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend) + main, err := kafka.NewDestination(endpoints.Main, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend) + if err != nil { + return nil, fmt.Errorf("kafka main destination: %w", err) + } additionals := []client.Destination{} for _, endpoint := range endpoints.Additionals { - additionals = append(additionals, kafka.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend)) + d, err := kafka.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend) + if err != nil { + log.Printf("E! kafka additional destination: %v (topic: %s) failed to initialize: %v", endpoint.Addr, endpoint.Topic, err) + continue + } + additionals = append(additionals, d) } destinations = client.NewDestinations(main, additionals) strategy = sender.StreamStrategy @@ -64,6 +74,8 @@ func NewPipeline(outputChan chan *message.Message, processingRules []*logsconfig destinations = client.NewDestinations(main, additionals) strategy = sender.StreamStrategy encoder = processor.RawEncoder + default: + return nil, fmt.Errorf("unsupported endpoint type: %s", endpoints.Type) } senderChan := make(chan *message.Message, coreconfig.ChanSize()) @@ -80,7 +92,7 @@ func NewPipeline(outputChan chan *message.Message, processingRules []*logsconfig InputChan: inputChan, processor: processor, sender: sender, - } + }, nil } // Start launches the pipeline @@ -99,4 +111,4 @@ func (p *Pipeline) Stop() { func (p *Pipeline) Flush(ctx context.Context) { p.processor.Flush(ctx) // flush messages in the processor into the sender p.sender.Flush(ctx) // flush the sender -} +} \ No newline at end of file diff --git a/logs/pipeline/provider.go b/logs/pipeline/provider.go index 3d4950ad0..726bef9d6 100644 --- a/logs/pipeline/provider.go +++ b/logs/pipeline/provider.go @@ -9,6 +9,7 @@ package pipeline import ( "context" + "log" "sync/atomic" "flashcat.cloud/categraf/logs/diagnostic" @@ -18,6 +19,7 @@ import ( "flashcat.cloud/categraf/logs/client" "flashcat.cloud/categraf/logs/message" "flashcat.cloud/categraf/logs/restart" + "flashcat.cloud/categraf/logs/util" ) // Provider provides message channels @@ -43,6 +45,7 @@ type provider struct { destinationsContext *client.DestinationsContext serverless bool + dropChan chan *message.Message } // NewProvider returns a new Provider @@ -74,10 +77,22 @@ func (p *provider) Start() { p.outputChan = p.auditor.Channel() for i := 0; i < p.numberOfPipelines; i++ { - pipeline := NewPipeline(p.outputChan, p.processingRules, p.endpoints, p.destinationsContext, p.diagnosticMessageReceiver, p.serverless) + pipeline, err := NewPipeline(p.outputChan, p.processingRules, p.endpoints, p.destinationsContext, p.diagnosticMessageReceiver, p.serverless) + if err != nil { + log.Printf("E! failed to create pipeline %d: %v", i, err) + continue + } pipeline.Start() p.pipelines = append(p.pipelines, pipeline) } + if len(p.pipelines) == 0 { + log.Printf("E! all %d pipelines failed to initialize, log collection is disabled", p.numberOfPipelines) + p.dropChan = make(chan *message.Message, 1000) + util.SafeGo("logs/provider/drop", func() { + for range p.dropChan { + } + }, nil) + } } // Stop stops all pipelines in parallel, @@ -90,13 +105,17 @@ func (p *provider) Stop() { stopper.Stop() p.pipelines = p.pipelines[:0] p.outputChan = nil + if p.dropChan != nil { + close(p.dropChan) + p.dropChan = nil + } } // NextPipelineChan returns the next pipeline input channel func (p *provider) NextPipelineChan() chan *message.Message { pipelinesLen := len(p.pipelines) if pipelinesLen == 0 { - return nil + return p.dropChan } index := int(p.currentPipelineIndex+1) % pipelinesLen defer atomic.StoreInt32(&p.currentPipelineIndex, int32(index)) @@ -114,4 +133,4 @@ func (p *provider) Flush(ctx context.Context) { p.Flush(ctx) } } -} +} \ No newline at end of file diff --git a/logs/processor/processor.go b/logs/processor/processor.go index 7268728ac..1628cadde 100644 --- a/logs/processor/processor.go +++ b/logs/processor/processor.go @@ -11,6 +11,7 @@ import ( "context" "log" "sync" + "time" logsconfig "flashcat.cloud/categraf/config/logs" "flashcat.cloud/categraf/logs/diagnostic" @@ -26,6 +27,7 @@ type Processor struct { processingRules []*logsconfig.ProcessingRule encoder Encoder done chan struct{} + stop chan struct{} diagnosticMessageReceiver diagnostic.MessageReceiver mu sync.Mutex } @@ -38,18 +40,22 @@ func New(inputChan, outputChan chan *message.Message, processingRules []*logscon processingRules: processingRules, encoder: encoder, done: make(chan struct{}), + stop: make(chan struct{}), diagnosticMessageReceiver: diagnosticMessageReceiver, } } // Start starts the Processor. func (p *Processor) Start() { - go p.run() + util.SafeGoWithRestart("logs/processor", p.run, 5*time.Second, p.stop, func() { + close(p.done) + }) } // Stop stops the Processor, // this call blocks until inputChan is flushed func (p *Processor) Stop() { + close(p.stop) close(p.inputChan) <-p.done } @@ -74,9 +80,6 @@ func (p *Processor) Flush(ctx context.Context) { // run starts the processing of the inputChan func (p *Processor) run() { - defer func() { - p.done <- struct{}{} - }() for msg := range p.inputChan { p.processMessage(msg) p.mu.Lock() // block here if we're trying to flush synchronously @@ -123,4 +126,4 @@ func (p *Processor) applyRedactingRules(msg *message.Message) (bool, []byte) { } } return true, content -} +} \ No newline at end of file diff --git a/logs/sender/sender.go b/logs/sender/sender.go index 4e905e091..6a89c592e 100644 --- a/logs/sender/sender.go +++ b/logs/sender/sender.go @@ -9,9 +9,11 @@ package sender import ( "context" + "time" "flashcat.cloud/categraf/logs/client" "flashcat.cloud/categraf/logs/message" + "flashcat.cloud/categraf/logs/util" ) // Strategy should contain all logic to send logs to a remote destination @@ -28,6 +30,7 @@ type Sender struct { destinations *client.Destinations strategy Strategy done chan struct{} + stop chan struct{} } // NewSender returns a new sender. @@ -38,17 +41,21 @@ func NewSender(inputChan chan *message.Message, outputChan chan *message.Message destinations: destinations, strategy: strategy, done: make(chan struct{}), + stop: make(chan struct{}), } } // Start starts the sender. func (s *Sender) Start() { - go s.run() + util.SafeGoWithRestart("logs/sender", s.run, 5*time.Second, s.stop, func() { + close(s.done) + }) } // Stop stops the sender, // this call blocks until inputChan is flushed func (s *Sender) Stop() { + close(s.stop) close(s.inputChan) <-s.done s.destinations.Close() @@ -60,9 +67,6 @@ func (s *Sender) Flush(ctx context.Context) { } func (s *Sender) run() { - defer func() { - s.done <- struct{}{} - }() s.strategy.Send(s.inputChan, s.outputChan, s.send) } @@ -95,4 +99,4 @@ func (s *Sender) send(payload []byte) error { // shouldStopSending returns true if a component should stop sending logs. func shouldStopSending(err error) bool { return err == context.Canceled -} +} \ No newline at end of file diff --git a/logs/util/recover.go b/logs/util/recover.go new file mode 100644 index 000000000..de8900fcd --- /dev/null +++ b/logs/util/recover.go @@ -0,0 +1,94 @@ +//go:build !no_logs + +package util + +import ( + "log" + "runtime/debug" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var logsPipelinePanicTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "categraf_logs_pipeline_panic_total", + Help: "Total number of recovered panics in the logs pipeline.", + }, + []string{"component"}, +) + +func init() { + prometheus.MustRegister(logsPipelinePanicTotal) +} + +// SafeGo runs fn in a new goroutine with panic recovery. +// On panic it increments the panic counter for the given component, +// logs the stack trace, and calls onPanic if non-nil. +func SafeGo(component string, fn func(), onPanic func()) { + go func() { + defer func() { + if r := recover(); r != nil { + logsPipelinePanicTotal.WithLabelValues(component).Inc() + log.Printf("E! [%s] panic recovered: %v\n%s", component, r, debug.Stack()) + if onPanic != nil { + func() { + defer func() { + if onPanicErr := recover(); onPanicErr != nil { + log.Printf("E! [%s] panic in onPanic callback: %v\n%s", component, onPanicErr, debug.Stack()) + } + }() + onPanic() + }() + } + } + }() + fn() + }() +} + +// SafeGoWithRestart runs fn in a new goroutine with panic recovery and +// automatic restart after a backoff delay. +// If stopChan is not nil, it will abort restart if stopChan is closed. +// onDone is called when fn exits naturally, or when restarts are aborted. +func SafeGoWithRestart(component string, fn func(), backoff time.Duration, stopChan chan struct{}, onDone func()) { + go func() { + if onDone != nil { + defer func() { + defer func() { + if r := recover(); r != nil { + logsPipelinePanicTotal.WithLabelValues(component).Inc() + log.Printf("E! [%s] onDone panic recovered: %v\n%s", component, r, debug.Stack()) + } + }() + onDone() + }() + } + for { + panicked := true + ch := make(chan struct{}) + SafeGo(component, func() { + defer close(ch) + fn() + panicked = false + }, nil) + <-ch // wait for fn to finish or panic + + if !panicked { + return // exited naturally + } + + log.Printf("W! [%s] restarting after %v backoff", component, backoff) + if stopChan != nil { + select { + case <-time.After(backoff): + case <-stopChan: + log.Printf("I! [%s] shutdown signal received, aborting restart", component) + return + } + } else { + time.Sleep(backoff) + } + } + }() +}