From 558bc1734a4ef488fbf05e1c01c11e6e2c8c89f0 Mon Sep 17 00:00:00 2001 From: ParsaJR Date: Fri, 12 Dec 2025 15:08:11 +0330 Subject: [PATCH 1/9] fix: race condition closes #2 --- main.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 15712b5..92a4e76 100644 --- a/main.go +++ b/main.go @@ -7,16 +7,21 @@ import ( "netexp/netdev" "netexp/pipeline" "os" + "sync" "time" ) var ( version = "0.3.8" - metrics []byte listen string getver bool ) +var ( + mu sync.RWMutex // guards the metrics + metrics []byte +) + func main() { // get options from flags flag.StringVar(&listen, "listen", ":9298", "network address to listen on") @@ -45,6 +50,8 @@ func serve() { }) http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + mu.RLock() + defer mu.RUnlock() w.Write(metrics) }) @@ -72,7 +79,9 @@ func gather() { panic(fmt.Errorf("could not get traffic: %w", err)) } + mu.Lock() metrics = p.Step(recv, trns) + mu.Unlock() time.Sleep(time.Second) } From ea632f340ee725f11e1ea1bcf30d6b72a64ff230 Mon Sep 17 00:00:00 2001 From: demurky Date: Fri, 12 Dec 2025 20:27:22 +0330 Subject: [PATCH 2/9] merge the math package into the series package --- pipeline/pipeline.go | 9 ++++----- {math => series}/math.go | 2 +- {math => series}/math_test.go | 8 ++++---- 3 files changed, 9 insertions(+), 10 deletions(-) rename {math => series}/math.go (96%) rename {math => series}/math_test.go (88%) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index bfcbc6b..d1f5ae5 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -2,7 +2,6 @@ package pipeline import ( "fmt" - "netexp/math" "netexp/series" ) @@ -53,8 +52,8 @@ func (p *Pipeline) Step(recv, trns int64) []byte { for _, r := range p.ranges { // Check if there exists enough sample data for the current duration range 'r' if p.trns_series.Length() >= r+1 { - trns_rate := math.Rate(p.trns_series.Samples, r+1) - recv_rate := math.Rate(p.recv_series.Samples, r+1) + trns_rate := series.Rate(p.trns_series.Samples, r+1) + recv_rate := series.Rate(p.recv_series.Samples, r+1) p.trns_rates_series[r].Record(trns_rate) p.recv_rates_series[r].Record(recv_rate) @@ -69,8 +68,8 @@ func (p *Pipeline) Step(recv, trns int64) []byte { if m > r && p.trns_rates_series[r].Length() >= m { trns_name := fmt.Sprintf("netexp_transmit_rate_%ds_max_%ds_bps", r, m) recv_name := fmt.Sprintf("netexp_receive_rate_%ds_max_%ds_bps", r, m) - register(trns_name, math.Max(p.trns_rates_series[r].Samples, m)) - register(recv_name, math.Max(p.recv_rates_series[r].Samples, m)) + register(trns_name, series.Max(p.trns_rates_series[r].Samples, m)) + register(recv_name, series.Max(p.recv_rates_series[r].Samples, m)) } } } diff --git a/math/math.go b/series/math.go similarity index 96% rename from math/math.go rename to series/math.go index 6814f5c..28575e9 100644 --- a/math/math.go +++ b/series/math.go @@ -1,4 +1,4 @@ -package math +package series func Max(series []int64, head int) int64 { var max int64 diff --git a/math/math_test.go b/series/math_test.go similarity index 88% rename from math/math_test.go rename to series/math_test.go index 5c67954..a06cd1e 100644 --- a/math/math_test.go +++ b/series/math_test.go @@ -1,7 +1,7 @@ -package math_test +package series_test import ( - "netexp/math" + "netexp/series" "strconv" "testing" ) @@ -20,7 +20,7 @@ func TestMax(t *testing.T) { for i, tc := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - got := math.Max(tc.series, tc.head) + got := series.Max(tc.series, tc.head) if got != tc.want { t.Errorf("incorrect max; got %d want %d", got, tc.want) } @@ -42,7 +42,7 @@ func TestRate(t *testing.T) { for i, tc := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - got := math.Rate(tc.series, tc.head) + got := series.Rate(tc.series, tc.head) if got != tc.want { t.Errorf("incorrect rate; got %d want %d", got, tc.want) } From f07ef2d2876a3af0c702741b016a2eec0824003d Mon Sep 17 00:00:00 2001 From: demurky Date: Fri, 12 Dec 2025 20:43:34 +0330 Subject: [PATCH 3/9] remove docker support --- .dockerignore | 4 --- .github/workflows/docker.yml | 63 ------------------------------------ Dockerfile | 38 ---------------------- 3 files changed, 105 deletions(-) delete mode 100644 .dockerignore delete mode 100644 .github/workflows/docker.yml delete mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index e8520ba..0000000 --- a/.dockerignore +++ /dev/null @@ -1,4 +0,0 @@ -netexp -Dockerfile -README.md -LICENSE diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml deleted file mode 100644 index 8608b6d..0000000 --- a/.github/workflows/docker.yml +++ /dev/null @@ -1,63 +0,0 @@ -# build and publish the project's docker image - -name: Docker Build - -on: - create: - push: - branches: [ main ] - tags: [ '*' ] - pull_request: - branches: [ main ] - -env: - REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} - -jobs: - - build: - - runs-on: ubuntu-latest - - permissions: - contents: read - packages: write - - steps: - - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Extract Docker metadata - id: metadata - uses: docker/metadata-action@v5 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - - - name: Setup Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Setup QEMU - uses: docker/setup-qemu-action@v3 - - - name: Login to Docker registry ${{ env.REGISTRY }} - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - # only run on tags - if: github.event_name != 'pull_request' && startsWith(github.ref, 'refs/tags/') - - - name: Build and push Docker image - uses: docker/build-push-action@v6 - with: - context: . - platforms: linux/amd64,linux/arm64 - tags: ${{ steps.metadata.outputs.tags }} - labels: ${{ steps.metadata.outputs.labels }} - cache-from: type=gha - cache-to: type=gha,mode=max - # only run on tags - push: ${{ github.event_name != 'pull_request' && startsWith(github.ref, 'refs/tags/') }} diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index c5a2e3b..0000000 --- a/Dockerfile +++ /dev/null @@ -1,38 +0,0 @@ -# =============== -# = build image -# =============== - -FROM --platform=$BUILDPLATFORM golang:alpine AS build - -WORKDIR /src -COPY . . - -ARG TARGETOS -ARG TARGETARCH - -ARG GOOS=$TARGETOS -ARG GOARCH=$TARGETARCH -ARG CGO_ENABLED=0 - -RUN go test -v ./... -RUN go build -trimpath -ldflags '-s -w -buildid=' - -# =============== -# = main image -# =============== - -FROM --platform=$TARGETPLATFORM alpine:3.19 - -RUN apk add --update --no-cache tini -COPY --from=build /src/netexp /usr/local/bin/netexp -ENTRYPOINT [ "tini", "--", "netexp" ] - -COPY --chmod=755 <<-'EOF' /healthcheck.sh - #!/bin/sh -eu - listen=${NETEXP_LISTEN:-:9298} - host=${listen%:*} - port=${listen#$host} - content=$(wget --quiet --tries=1 --output-document=- "http://127.0.0.1$port") - test -n "$(printf '%s\n' "$content" | wc -l)" -EOF -HEALTHCHECK CMD [ "/healthcheck.sh" ] From 077a38aca03acfd2c4d6ea151c0a98409581f986 Mon Sep 17 00:00:00 2001 From: demurky Date: Fri, 12 Dec 2025 20:44:02 +0330 Subject: [PATCH 4/9] update the testing workflow --- .github/workflows/go.yml | 32 --------------------------- .github/workflows/run-tests.yml | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 32 deletions(-) delete mode 100644 .github/workflows/go.yml create mode 100644 .github/workflows/run-tests.yml diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml deleted file mode 100644 index 5ca8609..0000000 --- a/.github/workflows/go.yml +++ /dev/null @@ -1,32 +0,0 @@ -# test and build the project -# more info: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go - -name: Go Test - -on: - push: - branches: [ main ] - pull_request: - branches: [ main ] - -jobs: - - test: - - runs-on: ubuntu-latest - - steps: - - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Setup Go - uses: actions/setup-go@v5 - with: - go-version: '1.21.4' - - - name: Build - run: go build -v ./... - - - name: Test - run: go test -v ./... diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml new file mode 100644 index 0000000..ed7dad2 --- /dev/null +++ b/.github/workflows/run-tests.yml @@ -0,0 +1,38 @@ +name: run-tests + +on: + push: + branches: + - main + + pull_request: + branches: + - main + +jobs: + + run-tests: + + strategy: + matrix: + go-version: + - stable + - oldstable + + runs-on: ubuntu-latest + + permissions: + contents: read + + steps: + + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + + - name: Run tests + run: go test -v ./... From 3466435686cbacf24366d39d014f0c407bf21ba6 Mon Sep 17 00:00:00 2001 From: demurky Date: Fri, 12 Dec 2025 20:44:18 +0330 Subject: [PATCH 5/9] bump go version to 1.25 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index fbf689a..35a1769 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module netexp -go 1.21.4 +go 1.25 From bc341f638c057aceefab689726e02ecd803d7e68 Mon Sep 17 00:00:00 2001 From: demurky Date: Fri, 12 Dec 2025 20:57:31 +0330 Subject: [PATCH 6/9] remove oldstable support from the testing workflow there's no point in it anyway since we prefer GOTOOLCHAIN=auto. --- .github/workflows/run-tests.yml | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index ed7dad2..0979ad2 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -13,12 +13,6 @@ jobs: run-tests: - strategy: - matrix: - go-version: - - stable - - oldstable - runs-on: ubuntu-latest permissions: @@ -32,7 +26,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: ${{ matrix.go-version }} + go-version: stable - name: Run tests run: go test -v ./... From 30b434ded117ef0b2e488b1c58d87fd32d745983 Mon Sep 17 00:00:00 2001 From: demurky Date: Fri, 12 Dec 2025 21:01:24 +0330 Subject: [PATCH 7/9] bump the workflow actions --- .github/workflows/run-tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 0979ad2..5a981ff 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -21,10 +21,10 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Go - uses: actions/setup-go@v5 + uses: actions/setup-go@v6 with: go-version: stable From d467a4101ede74cb5b29bdd491908b8f7772a36f Mon Sep 17 00:00:00 2001 From: ParsaJR Date: Tue, 16 Dec 2025 00:21:00 +0330 Subject: [PATCH 8/9] Custom rcu-like structure --- main.go | 36 ++++++++++++----- rcu/rcu.go | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 9 deletions(-) create mode 100644 rcu/rcu.go diff --git a/main.go b/main.go index 92a4e76..6682a41 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,8 @@ import ( "net/http" "netexp/netdev" "netexp/pipeline" + "netexp/rcu" "os" - "sync" "time" ) @@ -17,9 +17,10 @@ var ( getver bool ) +type Metrics []byte + var ( - mu sync.RWMutex // guards the metrics - metrics []byte + rcuMetrics *rcu.RCU[Metrics] ) func main() { @@ -40,6 +41,11 @@ func main() { return } + // Init rcu + rcuMetrics = rcu.New[Metrics]() + // Add the first element. + rcuMetrics.Rotate() + serve() gather() } @@ -50,9 +56,12 @@ func serve() { }) http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { - mu.RLock() - defer mu.RUnlock() - w.Write(metrics) + latest, done := rcuMetrics.Latest() + + if latest != nil { + w.Write(*latest) + done() + } }) go func() { @@ -68,6 +77,8 @@ func serve() { func gather() { p := pipeline.New([]int{1, 5, 10, 15, 30, 60}) + timer := time.NewTicker(60 * time.Second) + for { data, err := netdev.ReadNetDev() if err != nil { @@ -79,9 +90,16 @@ func gather() { panic(fmt.Errorf("could not get traffic: %w", err)) } - mu.Lock() - metrics = p.Step(recv, trns) - mu.Unlock() + m := p.Step(recv, trns) + + // Non blocking. It expected to be fast. + rcuMetrics.Assign(m) + + select { + case <-timer.C: + rcuMetrics.Rotate() + default: + } time.Sleep(time.Second) } diff --git a/rcu/rcu.go b/rcu/rcu.go new file mode 100644 index 0000000..e8f0bbd --- /dev/null +++ b/rcu/rcu.go @@ -0,0 +1,111 @@ +// read, copy, update +package rcu + +import ( + "slices" + "sync" +) + +// Represents a single unit of data that "RCU" Holds. +type Element[T any] struct { + data T + mu *sync.Mutex // guards the "refCount" down below. + refCount int // Read & Writes on "refCount" only happens under the mu lock. +} + +// RCU is a structure that provides a safe way to Write and read +// data. All readers are guaranteed to access to the second latest +// buffer, Using its "Latest()" method. +type RCU[T any] struct { + elements []Element[T] + mu sync.RWMutex +} + +func New[T any]() *RCU[T] { + return &RCU[T]{ + // 10 capacity guarantees that no reallocation occur, if and + // only if the program doesn't append more than that. Which + // is unlikely to happen if we configure a timeout deadline on + // the HTTP server. + elements: make([]Element[T], 0, 10), + mu: sync.RWMutex{}, + } +} + +// Rotate adds a new instance of Element to the Elements slice and +// also removes unreferenced elements from the beginning of the slice. +func (rcu *RCU[T]) Rotate() { + rcu.mu.Lock() + defer rcu.mu.Unlock() + + newElem := Element[T]{ + refCount: 0, + mu: &sync.Mutex{}, + } + + rcu.elements = append(rcu.elements, newElem) + + if len(rcu.elements) <= 2 { + return // So there is nothing to clean up. + } + + // Only check up to last two (protect the last two: current and + // previous elements). And do not waste your time if its lock + // acquired. + til := 0 + for i := 0; i < len(rcu.elements)-2; i++ { + + ok := rcu.elements[i].mu.TryLock() + + if !ok { + break + } + + if rcu.elements[i].refCount > 0 { + rcu.elements[i].mu.Unlock() + break // Stop if we hit a referenced element; We only remove consecutive unreferenced elements. + } + til++ + rcu.elements[i].mu.Unlock() + } + + if til > 0 { + rcu.elements = slices.Delete(rcu.elements, 0, til) + } +} + +type RefDecrementFunc func() + +// returns the most recent valid element. The caller is reponsible for +// decrementing the refCount using the returned "RefDecrementFunc". +func (rcu *RCU[T]) Latest() (*T, RefDecrementFunc) { + rcu.mu.RLock() + + if len(rcu.elements) >= 2 { + index := len(rcu.elements) - 2 + + elem := &rcu.elements[index] + rcu.mu.RUnlock() + + elem.mu.Lock() + elem.refCount++ + elem.mu.Unlock() + + return &elem.data, func() { + elem.mu.Lock() + elem.refCount-- + elem.mu.Unlock() + } + } + + rcu.mu.RUnlock() + return nil, nil +} + +// Assigns data to the last index of "elements" slice. It doesn't +// need mutual exclution, because only one goroutine manipulates the +// rcu slice. +func (rcu *RCU[T]) Assign(data T) { + l := len(rcu.elements) + rcu.elements[l-1].data = data +} From c9c65408c5809958c89e85595e3b58014537363e Mon Sep 17 00:00:00 2001 From: ParsaJR Date: Tue, 16 Dec 2025 14:30:07 +0330 Subject: [PATCH 9/9] using "sync/atomic" primitives --- main.go | 32 ++-------------- rcu/rcu.go | 108 +++-------------------------------------------------- 2 files changed, 10 insertions(+), 130 deletions(-) diff --git a/main.go b/main.go index 6682a41..6eec7ea 100644 --- a/main.go +++ b/main.go @@ -13,16 +13,11 @@ import ( var ( version = "0.3.8" + metrics rcu.RCU[[]byte] listen string getver bool ) -type Metrics []byte - -var ( - rcuMetrics *rcu.RCU[Metrics] -) - func main() { // get options from flags flag.StringVar(&listen, "listen", ":9298", "network address to listen on") @@ -41,11 +36,6 @@ func main() { return } - // Init rcu - rcuMetrics = rcu.New[Metrics]() - // Add the first element. - rcuMetrics.Rotate() - serve() gather() } @@ -56,12 +46,7 @@ func serve() { }) http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { - latest, done := rcuMetrics.Latest() - - if latest != nil { - w.Write(*latest) - done() - } + w.Write(*metrics.Load()) }) go func() { @@ -77,8 +62,6 @@ func serve() { func gather() { p := pipeline.New([]int{1, 5, 10, 15, 30, 60}) - timer := time.NewTicker(60 * time.Second) - for { data, err := netdev.ReadNetDev() if err != nil { @@ -90,16 +73,9 @@ func gather() { panic(fmt.Errorf("could not get traffic: %w", err)) } - m := p.Step(recv, trns) + buf := p.Step(recv, trns) - // Non blocking. It expected to be fast. - rcuMetrics.Assign(m) - - select { - case <-timer.C: - rcuMetrics.Rotate() - default: - } + metrics.Store(&buf) time.Sleep(time.Second) } diff --git a/rcu/rcu.go b/rcu/rcu.go index e8f0bbd..97031ee 100644 --- a/rcu/rcu.go +++ b/rcu/rcu.go @@ -1,111 +1,15 @@ -// read, copy, update package rcu -import ( - "slices" - "sync" -) +import "sync/atomic" -// Represents a single unit of data that "RCU" Holds. -type Element[T any] struct { - data T - mu *sync.Mutex // guards the "refCount" down below. - refCount int // Read & Writes on "refCount" only happens under the mu lock. -} - -// RCU is a structure that provides a safe way to Write and read -// data. All readers are guaranteed to access to the second latest -// buffer, Using its "Latest()" method. type RCU[T any] struct { - elements []Element[T] - mu sync.RWMutex -} - -func New[T any]() *RCU[T] { - return &RCU[T]{ - // 10 capacity guarantees that no reallocation occur, if and - // only if the program doesn't append more than that. Which - // is unlikely to happen if we configure a timeout deadline on - // the HTTP server. - elements: make([]Element[T], 0, 10), - mu: sync.RWMutex{}, - } -} - -// Rotate adds a new instance of Element to the Elements slice and -// also removes unreferenced elements from the beginning of the slice. -func (rcu *RCU[T]) Rotate() { - rcu.mu.Lock() - defer rcu.mu.Unlock() - - newElem := Element[T]{ - refCount: 0, - mu: &sync.Mutex{}, - } - - rcu.elements = append(rcu.elements, newElem) - - if len(rcu.elements) <= 2 { - return // So there is nothing to clean up. - } - - // Only check up to last two (protect the last two: current and - // previous elements). And do not waste your time if its lock - // acquired. - til := 0 - for i := 0; i < len(rcu.elements)-2; i++ { - - ok := rcu.elements[i].mu.TryLock() - - if !ok { - break - } - - if rcu.elements[i].refCount > 0 { - rcu.elements[i].mu.Unlock() - break // Stop if we hit a referenced element; We only remove consecutive unreferenced elements. - } - til++ - rcu.elements[i].mu.Unlock() - } - - if til > 0 { - rcu.elements = slices.Delete(rcu.elements, 0, til) - } + p atomic.Pointer[T] } -type RefDecrementFunc func() - -// returns the most recent valid element. The caller is reponsible for -// decrementing the refCount using the returned "RefDecrementFunc". -func (rcu *RCU[T]) Latest() (*T, RefDecrementFunc) { - rcu.mu.RLock() - - if len(rcu.elements) >= 2 { - index := len(rcu.elements) - 2 - - elem := &rcu.elements[index] - rcu.mu.RUnlock() - - elem.mu.Lock() - elem.refCount++ - elem.mu.Unlock() - - return &elem.data, func() { - elem.mu.Lock() - elem.refCount-- - elem.mu.Unlock() - } - } - - rcu.mu.RUnlock() - return nil, nil +func (r *RCU[T]) Store(t *T) { + r.p.Store(t) } -// Assigns data to the last index of "elements" slice. It doesn't -// need mutual exclution, because only one goroutine manipulates the -// rcu slice. -func (rcu *RCU[T]) Assign(data T) { - l := len(rcu.elements) - rcu.elements[l-1].data = data +func (r *RCU[T]) Load() *T { + return r.p.Load() }