From 1f0ee83baca4352a373bd225eb960deb7ec69329 Mon Sep 17 00:00:00 2001 From: Kirill Trofimov Date: Mon, 6 Apr 2026 18:56:17 +0300 Subject: [PATCH 1/3] add todo to readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index a9f13cb..ab2bc67 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,7 @@ func (r *LifecycleRunner) Execute(ctx context.Context) error - [x] Lifecycle support (Init, Start, Ready) - [ ] Named/tagged dependencies - [ ] Scope support +- [ ] Init/Start timeout ## Limitations From efeab848921755068192125f6d69d028c2ee2034 Mon Sep 17 00:00:00 2001 From: Kirill Trofimov Date: Sat, 11 Apr 2026 14:11:40 +0300 Subject: [PATCH 2/3] add eventbus --- README.md | 3 + eventbus/bus.go | 83 ++++++++++++++++ eventbus/bus_test.go | 188 +++++++++++++++++++++++++++++++++++++ eventbus/samples/simple.go | 52 ++++++++++ module/module.go | 6 ++ 5 files changed, 332 insertions(+) create mode 100644 eventbus/bus.go create mode 100644 eventbus/bus_test.go create mode 100644 eventbus/samples/simple.go create mode 100644 module/module.go diff --git a/README.md b/README.md index ab2bc67..25edc61 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,9 @@ func (r *LifecycleRunner) Execute(ctx context.Context) error - [ ] Named/tagged dependencies - [ ] Scope support - [ ] Init/Start timeout +- [ ] Startup profiler +- [ ] Lazy initialization +- [ ] Mermaid diagram ## Limitations diff --git a/eventbus/bus.go b/eventbus/bus.go new file mode 100644 index 0000000..17ee12b --- /dev/null +++ b/eventbus/bus.go @@ -0,0 +1,83 @@ +package eventbus + +import ( + "context" + "reflect" + "sync" +) + +type subscriber func(ctx context.Context, data any) + +type Subscriber[T any] func(ctx context.Context, data T) + +type EventBus struct { + // event and channel associated with event + events map[reflect.Type][]chan any + // all subscribers for event + subs map[reflect.Type][]subscriber +} + +func NewEventBus() *EventBus { + return &EventBus{ + events: make(map[reflect.Type][]chan any), + subs: make(map[reflect.Type][]subscriber), + } +} + +func (e *EventBus) Publish(data any) { + typ := reflect.TypeOf(data) + chans, ok := e.events[typ] + if !ok { + return + } + + // fan out + for _, ch := range chans { + ch <- data + } +} + +func Subscribe[T any](bus *EventBus, sub Subscriber[T]) { + var zero T + typ := reflect.TypeOf(&zero).Elem() + ch := make(chan any, 128) + bus.events[typ] = append(bus.events[typ], ch) + bus.subs[typ] = append(bus.subs[typ], func(ctx context.Context, data any) { + sub(ctx, data.(T)) + }) +} + +// Build is necessary because we shouldn't change subscribers after Start +func (e *EventBus) Build() *LockedEventBus { + return &LockedEventBus{e} +} + +type LockedEventBus struct { + bus *EventBus +} + +func (l *LockedEventBus) Start(ctx context.Context) error { + e := l.bus + wg := sync.WaitGroup{} + + for ev, chans := range e.events { + for i, ch := range chans { + sub := e.subs[ev][i] + wg.Go(func() { + for { + select { + case <-ctx.Done(): + return + + case data := <-ch: + sub(ctx, data) + } + } + }) + } + } + + wg.Wait() + + return nil +} diff --git a/eventbus/bus_test.go b/eventbus/bus_test.go new file mode 100644 index 0000000..54e160a --- /dev/null +++ b/eventbus/bus_test.go @@ -0,0 +1,188 @@ +package eventbus_test + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/trofkm/compoapp/eventbus" +) + +// --- Event types --- + +type Event1 struct{ ID string } +type Event2 struct{ ID string } +type Event3 struct{ ID string } +type Event4 struct{ ID string } +type Event5 struct{ ID string } +type Event6 struct{ ID string } +type Event7 struct{ ID string } +type Event8 struct{ ID string } +type Event9 struct{ ID string } +type Event10 struct{ ID string } + +// --- Helpers --- + +func startBus(b *testing.B, bus *eventbus.EventBus) context.CancelFunc { + b.Helper() + locked := bus.Build() + ctx, cancel := context.WithCancel(context.Background()) + ready := make(chan struct{}) + go func() { + close(ready) + locked.Start(ctx) + }() + <-ready + runtime.Gosched() + return cancel +} + +// --- 1. Throughput --- + +func BenchmarkThroughput_1sub(b *testing.B) { + bus := eventbus.NewEventBus() + eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {}) + cancel := startBus(b, bus) + defer cancel() + + for b.Loop() { + bus.Publish(Event1{ID: "x"}) + } +} + +func BenchmarkThroughput_10sub(b *testing.B) { + bus := eventbus.NewEventBus() + for range 10 { + eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {}) + } + cancel := startBus(b, bus) + defer cancel() + + for b.Loop() { + bus.Publish(Event1{ID: "x"}) + } +} + +func BenchmarkThroughput_100sub(b *testing.B) { + bus := eventbus.NewEventBus() + for range 100 { + eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {}) + } + cancel := startBus(b, bus) + defer cancel() + + for b.Loop() { + bus.Publish(Event1{ID: "x"}) + } +} + +// --- 2. Latency --- + +func BenchmarkLatency(b *testing.B) { + bus := eventbus.NewEventBus() + + var received atomic.Int64 + eventbus.Subscribe(bus, func(_ context.Context, _ Event1) { + received.Add(1) + }) + cancel := startBus(b, bus) + defer cancel() + + // sanity check + before := received.Load() + bus.Publish(Event1{ID: "test"}) + time.Sleep(100 * time.Millisecond) + after := received.Load() + // b.Logf("sanity check: before=%d after=%d", before, after) + if after == before { + b.Fatal("subscriber never called! bus is broken") + } + + for b.Loop() { + before := received.Load() + bus.Publish(Event1{ID: "x"}) + deadline := time.Now().Add(time.Millisecond) + for received.Load() == before { + if time.Now().After(deadline) { + b.Fatalf("timeout! received=%d", received.Load()) + } + runtime.Gosched() + } + } +} + +// --- 3. Multiple event types --- + +func BenchmarkMultipleTypes_10(b *testing.B) { + bus := eventbus.NewEventBus() + eventbus.Subscribe(bus, func(_ context.Context, _ Event1) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event2) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event3) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event4) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event5) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event6) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event7) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event8) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event9) {}) + eventbus.Subscribe(bus, func(_ context.Context, _ Event10) {}) + cancel := startBus(b, bus) + defer cancel() + + events := []any{ + Event1{}, Event2{}, Event3{}, Event4{}, Event5{}, + Event6{}, Event7{}, Event8{}, Event9{}, Event10{}, + } + + for i := range b.N { + bus.Publish(events[i%10]) + } +} + +// --- 4. Concurrent publishers --- + +func BenchmarkConcurrentPublish_2(b *testing.B) { benchConcurrent(b, 2) } +func BenchmarkConcurrentPublish_8(b *testing.B) { benchConcurrent(b, 8) } +func BenchmarkConcurrentPublish_32(b *testing.B) { benchConcurrent(b, 32) } + +func benchConcurrent(b *testing.B, goroutines int) { + b.Helper() + bus := eventbus.NewEventBus() + + var count atomic.Int64 + eventbus.Subscribe(bus, func(_ context.Context, _ Event1) { + count.Add(1) + }) + cancel := startBus(b, bus) + defer cancel() + + b.SetParallelism(goroutines) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + bus.Publish(Event1{}) + } + }) + + // дренируем остатки + expected := int64(b.N) + deadline := time.Now().Add(5 * time.Second) + for count.Load() < expected { + if time.Now().After(deadline) { + break + } + runtime.Gosched() + } +} + +// --- 5. No subscribers --- + +func BenchmarkPublishNoSubs(b *testing.B) { + bus := eventbus.NewEventBus() + cancel := startBus(b, bus) + defer cancel() + + for b.Loop() { + bus.Publish(Event1{}) + } +} diff --git a/eventbus/samples/simple.go b/eventbus/samples/simple.go new file mode 100644 index 0000000..8e5c61c --- /dev/null +++ b/eventbus/samples/simple.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/trofkm/compoapp/eventbus" +) + +func main() { + b := eventbus.NewEventBus() + + eventbus.Subscribe(b, func(ctx context.Context, data string) { + fmt.Println("received string: ", data) + }) + + eventbus.Subscribe(b, func(ctx context.Context, data int) { + fmt.Println("received number: ", data) + }) + + wg := sync.WaitGroup{} + + wg.Go(func() { + for i := range 10 { + b.Publish(i) + time.Sleep(1 * time.Millisecond) + } + }) + + wg.Go(func() { + for i := range 10 { + b.Publish(strconv.Itoa(i + 1000)) + time.Sleep(1 * time.Millisecond) + } + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + + errChan := make(chan error) + go func() { + defer close(errChan) + errChan <- b.Build().Start(ctx) + }() + + wg.Wait() + + <-errChan +} diff --git a/module/module.go b/module/module.go new file mode 100644 index 0000000..d0d83a5 --- /dev/null +++ b/module/module.go @@ -0,0 +1,6 @@ +package module + +type Module struct { + Provides []any + Exports []any +} From b15c508ec782564f8c427950edacadb07c9faaa4 Mon Sep 17 00:00:00 2001 From: Kirill Trofimov Date: Mon, 13 Apr 2026 14:33:25 +0300 Subject: [PATCH 3/3] fix ci --- eventbus/bus.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eventbus/bus.go b/eventbus/bus.go index 17ee12b..fd457a6 100644 --- a/eventbus/bus.go +++ b/eventbus/bus.go @@ -6,6 +6,10 @@ import ( "sync" ) +const ( + eventChanSize = 128 +) + type subscriber func(ctx context.Context, data any) type Subscriber[T any] func(ctx context.Context, data T) @@ -38,9 +42,8 @@ func (e *EventBus) Publish(data any) { } func Subscribe[T any](bus *EventBus, sub Subscriber[T]) { - var zero T - typ := reflect.TypeOf(&zero).Elem() - ch := make(chan any, 128) + typ := reflect.TypeFor[T]() + ch := make(chan any, eventChanSize) bus.events[typ] = append(bus.events[typ], ch) bus.subs[typ] = append(bus.subs[typ], func(ctx context.Context, data any) { sub(ctx, data.(T))