Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
89c62d0
Add automated benchdiff job to CI
fako1024 Oct 12, 2023
cbf93be
Fix and improve logging of benchmark CI job
fako1024 Oct 12, 2023
1f9dac6
Increase benchmark significance for CI
fako1024 Oct 12, 2023
80b11f4
Adapt CI benchmark arguments to avoid timeouts
fako1024 Oct 12, 2023
21707df
Hopefully fix incorrect benchmark syntax
fako1024 Oct 12, 2023
bef51c8
Use self-hosted Github runner to improve benchmark consistency
fako1024 Oct 13, 2023
bb33a18
Reduce scope of benchmark to simplify testing
fako1024 Oct 20, 2023
3b6f06b
Improve runner and notification limits
fako1024 Oct 20, 2023
b85732d
Attempt to speed up and run all benchmarks again
fako1024 Oct 20, 2023
71c36e9
Switch to self-managed benchmark & comparison
fako1024 Oct 20, 2023
6f28393
Fix Go environment availability
fako1024 Oct 20, 2023
77ca9fe
Second attempt at preserving the Go environment for benchmark script
fako1024 Oct 20, 2023
1741bb2
Third attempt at preserving environment
fako1024 Oct 20, 2023
2cc6f4d
Ensure Go env variables are added
fako1024 Oct 20, 2023
857b0c0
Try to preserve env via shell
fako1024 Oct 20, 2023
69bf085
Use template notation for shell
fako1024 Oct 20, 2023
2dd4d05
Update GitHub actions and add further debugging info
fako1024 Oct 21, 2023
814a0a2
Attempt to explicitly export PATH env variable
fako1024 Oct 21, 2023
ba45725
Store PATH in file to be read in benchmark script
fako1024 Oct 21, 2023
e990c9f
Reduce scope of benchmarks for faster debugging
fako1024 Oct 21, 2023
5f95b80
Increase test runtime for another test
fako1024 Oct 21, 2023
8eae24e
Add automatic sticky PR comment
fako1024 Oct 21, 2023
c33e868
Run benchmark only on PR
fako1024 Oct 21, 2023
3feae2e
Properly separate CI / benchmarks in different jobs
fako1024 Oct 21, 2023
ee97dd0
Fix some minor parameters
fako1024 Oct 21, 2023
f27332b
Extend parameters to run full benchmark suite again
fako1024 Oct 21, 2023
0566f70
Ensure the correct PR head is checked out and do not trigger on _read…
fako1024 Oct 21, 2023
563ef92
Temporarily limit scope of benchmark for faster debugging
fako1024 Oct 23, 2023
b0b555f
Provide CPU pinning capabilities for mock sources and optimize benchm…
fako1024 Oct 23, 2023
ac5f1b2
Limit GC for hot path of benchmark
fako1024 Oct 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .github/workflows/ci-pr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: CI Build / Test (Pull Request)

on:
pull_request:
types: [opened, synchronize]

jobs:

bench:
name: Comparative Benchmark
runs-on: [self-hosted, linux, benchmark-only]
steps:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: ^1.21
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.ref }}
fetch-depth: 0
show-progress: false

- name: Run Comparative Benchmark
id: bench
env:
BENCH_TORUN: .
BENCH_COUNT: 15
BENCH_TIME: 5s
run: |
printenv PATH > .path
/home/app/bench.sh

- name: Add / Update Benchmark Result Comment
continue-on-error: true
uses: marocchino/sticky-pull-request-comment@v2
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
header: bench
hide_and_recreate: true
hide_classify: "OUTDATED"
message: |
### Benchmark Result
<details><summary>Benchmark diff with base branch</summary>

```
${{ steps.bench.outputs.diff }}
```
</details>
12 changes: 4 additions & 8 deletions .github/workflows/go.yml → .github/workflows/ci-push.yml
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go
name: CI Build / Test (Push)

on:
- push
- pull_request

jobs:

build-linux:
name: Build on Linux
name: Build / Test on Linux
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: ^1.21
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Build for 386
run: GOOS=linux GOARCH=386 go build -v -x ./...
Expand Down
53 changes: 47 additions & 6 deletions capture/afpacket/afpacket/afpacket_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package afpacket
import (
"errors"
"io"
"runtime"
"time"

"github.com/fako1024/slimcap/capture"
"github.com/fako1024/slimcap/capture/afpacket/socket"
"github.com/fako1024/slimcap/event"
"github.com/fako1024/slimcap/link"
"golang.org/x/sys/unix"
)

const (
Expand All @@ -28,8 +30,10 @@ const (
type MockSource struct {
*Source

mockPackets chan capture.Packet
MockFd *socket.MockFileDescriptor
mockPackets chan capture.Packet
MockFd *socket.MockFileDescriptor

cpuSet *unix.CPUSet
packetAddCallbackFn func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte)
}

Expand All @@ -40,6 +44,12 @@ func (m *MockSource) PacketAddCallbackFn(fn func(payload []byte, totalLen uint32
return m
}

// CPUSet defines an explicit set of CPU cores to run / pin any background tasks on / to
func (m *MockSource) CPUSet(cpuSet *unix.CPUSet) *MockSource {
m.cpuSet = cpuSet
return m
}

// AddPacket adds a new mock packet to the source
// This can happen prior to calling run or continuously while consuming data
func (m *MockSource) AddPacket(pkt capture.Packet) error {
Expand Down Expand Up @@ -107,6 +117,17 @@ func (m *MockSource) Pipe(src capture.Source, doneReadingChan chan struct{}) cha
errChan := make(chan error)

go func(errs chan error, done chan struct{}) {

// Minimize scheduler overhead by locking this goroutine to the current thread.
// In addition, pin the task to the CPU set (if provided)
runtime.LockOSThread()
if m.cpuSet != nil {
if err := unix.SchedSetaffinity(0, m.cpuSet); err != nil {
errs <- err
return
}
}

for {
if err := m.AddPacketFromSource(src); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, capture.ErrCaptureStopped) {
Expand Down Expand Up @@ -145,6 +166,16 @@ func (m *MockSource) RunNoDrain() chan error {
errChan := make(chan error)
go func(errs chan error) {

// Minimize scheduler overhead by locking this goroutine to the current thread.
// In addition, pin the task to the CPU set (if provided)
runtime.LockOSThread()
if m.cpuSet != nil {
if err := unix.SchedSetaffinity(0, m.cpuSet); err != nil {
errs <- err
return
}
}

// Populate a slice with all packets from the channel for repeated consumption
packets := make([]capture.Packet, 0, len(m.mockPackets))
for i := 0; i < len(m.mockPackets); i++ {
Expand Down Expand Up @@ -190,20 +221,30 @@ func (m *MockSource) Close() error {

//////////////////////////////////////////////////////////////////////////////////////////////////////

func (m *MockSource) run(errChan chan error) {
func (m *MockSource) run(errs chan error) {

defer close(errs)

defer close(errChan)
// Minimize scheduler overhead by locking this goroutine to the current thread.
// In addition, pin the task to the CPU set (if provided)
runtime.LockOSThread()
if m.cpuSet != nil {
if err := unix.SchedSetaffinity(0, m.cpuSet); err != nil {
errs <- err
return
}
}

for pkt := range m.mockPackets {

m.MockFd.Put(pkt)

// Queue / trigger an event equivalent to receiving a new packet via the PPOLL syscall
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
errChan <- err
errs <- err
return
}
}

errChan <- nil
errs <- nil
}
31 changes: 31 additions & 0 deletions capture/afpacket/afpacket/afpacket_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ package afpacket
import (
"fmt"
"net"
"runtime"
"sync"
"testing"
"time"

"github.com/fako1024/slimcap/capture"
"github.com/fako1024/slimcap/link"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
)

func TestOptions(t *testing.T) {
Expand Down Expand Up @@ -228,6 +230,12 @@ func TestPipe(t *testing.T) {

func BenchmarkCaptureMethods(b *testing.B) {

// TODO: Add some logic / checking for isolated cores (and use those) and
// validation that there are sufficient cores
var cpuMaskFG, cpuMaskBG unix.CPUSet
cpuMaskFG.Set(1)
cpuMaskBG.Set(2)

testPacket, err := capture.BuildPacket(
net.ParseIP("1.2.3.4"),
net.ParseIP("4.5.6.7"),
Expand All @@ -242,14 +250,17 @@ func BenchmarkCaptureMethods(b *testing.B) {
Promiscuous(false),
)
require.Nil(b, err)
mockSrc.CPUSet(&cpuMaskBG)

for mockSrc.CanAddPackets() {
require.Nil(b, mockSrc.AddPacket(testPacket))
}
mockSrc.RunNoDrain()

b.Run("NextPacket", func(b *testing.B) {
require.Nil(b, pinToCPU(&cpuMaskFG))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, _ := mockSrc.NextPacket(nil)
_ = p
Expand All @@ -258,6 +269,7 @@ func BenchmarkCaptureMethods(b *testing.B) {

b.Run("NextPacketInPlace", func(b *testing.B) {
var p capture.Packet = mockSrc.NewPacket()
require.Nil(b, pinToCPU(&cpuMaskFG))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -279,6 +291,7 @@ func BenchmarkCaptureMethods(b *testing.B) {
b.Run("NextIPPacketInPlace", func(b *testing.B) {
pkt := mockSrc.NewPacket()
var p capture.IPLayer = pkt.IPLayer()
require.Nil(b, pinToCPU(&cpuMaskFG))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -290,7 +303,9 @@ func BenchmarkCaptureMethods(b *testing.B) {
})

b.Run("NextPacketFn", func(b *testing.B) {
require.Nil(b, pinToCPU(&cpuMaskFG))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error {
_ = payload
Expand Down Expand Up @@ -350,3 +365,19 @@ func testCaptureMethods(t *testing.T, fn func(t *testing.T, src *MockSource, i,
// Close the mock source
require.Nil(t, mockSrc.Close())
}

func pinToCPU(cpuSet *unix.CPUSet) error {

// If no CPU set is provided, do nothing
if cpuSet == nil {
return nil
}

// Set affinity and lock thread
if err := unix.SchedSetaffinity(0, cpuSet); err != nil {
return err
}
runtime.LockOSThread()

return nil
}
43 changes: 36 additions & 7 deletions capture/afpacket/afring/afring_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package afring
import (
"errors"
"io"
"runtime"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -49,6 +50,7 @@ type MockSource struct {

MockFd *socket.MockFileDescriptor

cpuSet *unix.CPUSet
packetAddCallbackFn func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte)
}

Expand Down Expand Up @@ -98,6 +100,12 @@ func (m *MockSource) PacketAddCallbackFn(fn func(payload []byte, totalLen uint32
return m
}

// CPUSet defines an explicit set of CPU cores to run / pin any background tasks on / to
func (m *MockSource) CPUSet(cpuSet *unix.CPUSet) *MockSource {
m.cpuSet = cpuSet
return m
}

// AddPacket adds a new mock packet to the source
// This can happen prior to calling run or continuously while consuming data, mimicking the
// function of an actual ring buffer. Consequently, if the ring buffer is full and elements not
Expand Down Expand Up @@ -196,6 +204,17 @@ func (m *MockSource) Pipe(src capture.Source, doneReadingChan chan struct{}) (er
errChan = make(chan error, 1)

go func(errs chan error, done chan struct{}) {

// Minimize scheduler overhead by locking this goroutine to the current thread.
// In addition, pin the task to the CPU set (if provided)
runtime.LockOSThread()
if m.cpuSet != nil {
if err := unix.SchedSetaffinity(0, m.cpuSet); err != nil {
errs <- err
return
}
}

for {
if err := m.AddPacketFromSource(src); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, capture.ErrCaptureStopped) {
Expand All @@ -222,10 +241,10 @@ func (m *MockSource) Pipe(src capture.Source, doneReadingChan chan struct{}) (er
// Run executes processing of packets in the background, mimicking the function of an actual kernel
// packet ring buffer
func (m *MockSource) Run() <-chan error {
errChan := make(chan error)
go m.run(errChan)
errs := make(chan error)
go m.run(errs)

return errChan
return errs
}

// Done notifies the mock source that no more mock packets will be added, causing the ring buffer
Expand All @@ -248,8 +267,18 @@ func (m *MockSource) ForceBlockskUnavailable() {

//////////////////////////////////////////////////////////////////////////////////////////////////////

func (m *MockSource) run(errChan chan<- error) {
defer close(errChan)
func (m *MockSource) run(errs chan<- error) {
defer close(errs)

// Minimize scheduler overhead by locking this goroutine to the current thread.
// In addition, pin the task to the CPU set (if provided)
runtime.LockOSThread()
if m.cpuSet != nil {
if err := unix.SchedSetaffinity(0, m.cpuSet); err != nil {
errs <- err
return
}
}

for block := range m.mockBlocks {

Expand All @@ -263,12 +292,12 @@ func (m *MockSource) run(errChan chan<- error) {

// Queue / trigger an event equivalent to receiving a new block via the PPOLL syscall
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
errChan <- err
errs <- err
return
}
}

errChan <- nil
errs <- nil
}

func (m *MockSource) getBlockStatus(n int) (status uint32) {
Expand Down
Loading