Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (r *LifecycleRunner) Execute(ctx context.Context) error
- [x] Lifecycle support (Init, Start, Ready)
- [ ] Named/tagged dependencies
- [ ] Scope support
- [ ] Init/Start timeout
- [ ] Startup profiler
- [ ] Lazy initialization
- [ ] Mermaid diagram

## Limitations

Expand Down
86 changes: 86 additions & 0 deletions eventbus/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package eventbus

import (
"context"
"reflect"
"sync"
)

const (
eventChanSize = 128
)

type subscriber func(ctx context.Context, data any)

type Subscriber[T any] func(ctx context.Context, data T)

type EventBus struct {
// event and channel associated with event
events map[reflect.Type][]chan any
// all subscribers for event
subs map[reflect.Type][]subscriber
}

func NewEventBus() *EventBus {
return &EventBus{
events: make(map[reflect.Type][]chan any),
subs: make(map[reflect.Type][]subscriber),
}
}

func (e *EventBus) Publish(data any) {
typ := reflect.TypeOf(data)
chans, ok := e.events[typ]
if !ok {
return
}

// fan out
for _, ch := range chans {
ch <- data
}
}

func Subscribe[T any](bus *EventBus, sub Subscriber[T]) {
typ := reflect.TypeFor[T]()
ch := make(chan any, eventChanSize)
bus.events[typ] = append(bus.events[typ], ch)
bus.subs[typ] = append(bus.subs[typ], func(ctx context.Context, data any) {
sub(ctx, data.(T))
})
}

// Build is necessary because we shouldn't change subscribers after Start
func (e *EventBus) Build() *LockedEventBus {
return &LockedEventBus{e}
}

type LockedEventBus struct {
bus *EventBus
}

func (l *LockedEventBus) Start(ctx context.Context) error {
e := l.bus
wg := sync.WaitGroup{}

for ev, chans := range e.events {
for i, ch := range chans {
sub := e.subs[ev][i]
wg.Go(func() {
for {
select {
case <-ctx.Done():
return

case data := <-ch:
sub(ctx, data)
}
}
})
}
}

wg.Wait()

return nil
}
188 changes: 188 additions & 0 deletions eventbus/bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package eventbus_test

import (
"context"
"runtime"
"sync/atomic"
"testing"
"time"

"github.com/trofkm/compoapp/eventbus"
)

// --- Event types ---

type Event1 struct{ ID string }
type Event2 struct{ ID string }
type Event3 struct{ ID string }
type Event4 struct{ ID string }
type Event5 struct{ ID string }
type Event6 struct{ ID string }
type Event7 struct{ ID string }
type Event8 struct{ ID string }
type Event9 struct{ ID string }
type Event10 struct{ ID string }

// --- Helpers ---

func startBus(b *testing.B, bus *eventbus.EventBus) context.CancelFunc {
b.Helper()
locked := bus.Build()
ctx, cancel := context.WithCancel(context.Background())
ready := make(chan struct{})
go func() {
close(ready)
locked.Start(ctx)
}()
<-ready
runtime.Gosched()
return cancel
}

// --- 1. Throughput ---

func BenchmarkThroughput_1sub(b *testing.B) {
bus := eventbus.NewEventBus()
eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {})
cancel := startBus(b, bus)
defer cancel()

for b.Loop() {
bus.Publish(Event1{ID: "x"})
}
}

func BenchmarkThroughput_10sub(b *testing.B) {
bus := eventbus.NewEventBus()
for range 10 {
eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {})
}
cancel := startBus(b, bus)
defer cancel()

for b.Loop() {
bus.Publish(Event1{ID: "x"})
}
}

func BenchmarkThroughput_100sub(b *testing.B) {
bus := eventbus.NewEventBus()
for range 100 {
eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {})
}
cancel := startBus(b, bus)
defer cancel()

for b.Loop() {
bus.Publish(Event1{ID: "x"})
}
}

// --- 2. Latency ---

func BenchmarkLatency(b *testing.B) {
bus := eventbus.NewEventBus()

var received atomic.Int64
eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {
received.Add(1)
})
cancel := startBus(b, bus)
defer cancel()

// sanity check
before := received.Load()
bus.Publish(Event1{ID: "test"})
time.Sleep(100 * time.Millisecond)
after := received.Load()
// b.Logf("sanity check: before=%d after=%d", before, after)
if after == before {
b.Fatal("subscriber never called! bus is broken")
}

for b.Loop() {
before := received.Load()
bus.Publish(Event1{ID: "x"})
deadline := time.Now().Add(time.Millisecond)
for received.Load() == before {
if time.Now().After(deadline) {
b.Fatalf("timeout! received=%d", received.Load())
}
runtime.Gosched()
}
}
}

// --- 3. Multiple event types ---

func BenchmarkMultipleTypes_10(b *testing.B) {
bus := eventbus.NewEventBus()
eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event2) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event3) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event4) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event5) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event6) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event7) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event8) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event9) {})
eventbus.Subscribe(bus, func(_ context.Context, _ Event10) {})
cancel := startBus(b, bus)
defer cancel()

events := []any{
Event1{}, Event2{}, Event3{}, Event4{}, Event5{},
Event6{}, Event7{}, Event8{}, Event9{}, Event10{},
}

for i := range b.N {
bus.Publish(events[i%10])
}
}

// --- 4. Concurrent publishers ---

func BenchmarkConcurrentPublish_2(b *testing.B) { benchConcurrent(b, 2) }
func BenchmarkConcurrentPublish_8(b *testing.B) { benchConcurrent(b, 8) }
func BenchmarkConcurrentPublish_32(b *testing.B) { benchConcurrent(b, 32) }

func benchConcurrent(b *testing.B, goroutines int) {
b.Helper()
bus := eventbus.NewEventBus()

var count atomic.Int64
eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {
count.Add(1)
})
cancel := startBus(b, bus)
defer cancel()

b.SetParallelism(goroutines)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
bus.Publish(Event1{})
}
})

// дренируем остатки
expected := int64(b.N)
deadline := time.Now().Add(5 * time.Second)
for count.Load() < expected {
if time.Now().After(deadline) {
break
}
runtime.Gosched()
}
}

// --- 5. No subscribers ---

func BenchmarkPublishNoSubs(b *testing.B) {
bus := eventbus.NewEventBus()
cancel := startBus(b, bus)
defer cancel()

for b.Loop() {
bus.Publish(Event1{})
}
}
52 changes: 52 additions & 0 deletions eventbus/samples/simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/trofkm/compoapp/eventbus"
)

func main() {
b := eventbus.NewEventBus()

eventbus.Subscribe(b, func(ctx context.Context, data string) {
fmt.Println("received string: ", data)
})

eventbus.Subscribe(b, func(ctx context.Context, data int) {
fmt.Println("received number: ", data)
})

wg := sync.WaitGroup{}

wg.Go(func() {
for i := range 10 {
b.Publish(i)
time.Sleep(1 * time.Millisecond)
}
})

wg.Go(func() {
for i := range 10 {
b.Publish(strconv.Itoa(i + 1000))
time.Sleep(1 * time.Millisecond)
}
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

errChan := make(chan error)
go func() {
defer close(errChan)
errChan <- b.Build().Start(ctx)
}()

wg.Wait()

<-errChan
}
6 changes: 6 additions & 0 deletions module/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package module

type Module struct {
Provides []any
Exports []any
}
Loading