Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
```

#### Example

General purpose event bus (unlimited number of parameters of any type):

```go
func calculator(a int, b int) {
fmt.Printf("%d\n", a + b)
Expand All @@ -39,6 +42,27 @@ func main() {
}
```

You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type.
Simple event bus (only one parameter of a chosen type):

```go
type sumComponents struct {
a int
b int
}

func calculator(args sumComponents) {
fmt.Printf("%d\n", args.a + args.b)
}

func main() {
bus := EventBus.NewSimpleBus[sumComponents]()
ref := bus.Subscribe("main:calculator", calculator)
bus.Publish("main:calculator", sumComponents{20, 40})
bus.Unsubscribe("main:calculator", ref)
}
```

#### Implemented methods
* **New()**
* **Subscribe()**
Expand All @@ -56,6 +80,12 @@ New returns new EventBus with empty handlers.
bus := EventBus.New();
```

#### NewSimpleBus[T]()
The below example creates a new EventBus with handlers accepting only string arguments:
```go
bus := EventBus.NewSimpleBus[string]()
````

#### Subscribe(topic string, fn interface{}) error
Subscribe to a topic. Returns error if `fn` is not a function.
```go
Expand All @@ -64,6 +94,18 @@ func Handler() { ... }
bus.Subscribe("topic:handler", Handler)
```

#### Subscribe(topic string, fn func(T)) SubscriptionRef

Simplified version of bus returns `SubscriptionRef` as a result of all subscription methods. It can be used to unsubscribe from the topic.
```go
func Handler(param string) { ... }
...
bus := EventBus.NewSimpleBus[string]()
ref := bus.Subscribe("topic:handler", Handler)
...
bus.Unsubscribe(ref)
```

#### SubscribeOnce(topic string, fn interface{}) error
Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.
```go
Expand Down Expand Up @@ -91,6 +133,8 @@ bus.Subscribe("topic:handler", Handler)
bus.Publish("topic:handler", "Hello, World!");
```

In the case of simplified bus, there is only one argument accepted of predefined type.

#### SubscribeAsync(topic string, fn interface{}, transactional bool)
Subscribe to a topic with an asynchronous callback. Returns error if `fn` is not a function.
```go
Expand Down Expand Up @@ -147,6 +191,36 @@ func main() {
}
```

### Benchmarks

```shell
% GOMAXPROCS=1 go test -bench=.
2
goos: darwin
goarch: amd64
pkg: github.com/ziollek/EventBus
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkSynchronousPublishing 2877297 406.9 ns/op
BenchmarkAsynchronousPublishing 548457 2101 ns/op
BenchmarkSimpleSynchronousPublishing 3939492 305.5 ns/op
BenchmarkSimpleAsynchronousPublishing 1000000 1141 ns/op
PASS
ok github.com/ziollek/EventBus 9.449s

% GOMAXPROCS=2 go test -bench=.
2
goos: darwin
goarch: amd64
pkg: github.com/ziollek/EventBus
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkSynchronousPublishing-2 3146671 379.3 ns/op
BenchmarkAsynchronousPublishing-2 1050337 1148 ns/op
BenchmarkSimpleSynchronousPublishing-2 5079916 246.0 ns/op
BenchmarkSimpleAsynchronousPublishing-2 2124786 548.2 ns/op
PASS
ok github.com/ziollek/EventBus 9.677s
```

#### Notes
Documentation is available here: [godoc.org](https://godoc.org/github.com/asaskevich/EventBus).
Full information about code coverage is also available here: [EventBus on gocover.io](http://gocover.io/github.com/asaskevich/EventBus).
Expand Down
35 changes: 35 additions & 0 deletions event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,38 @@ func TestSubscribeAsync(t *testing.T) {
// t.Fail()
//}
}

func BenchmarkSynchronousPublishing(b *testing.B) {
type input struct {
number int
slice []int
name string
}
list := []int{1, 2, 3, 4, 5}

bus := New()
_ = bus.Subscribe("topic", func(i input) {})

b.ResetTimer()
for range b.N {
bus.Publish("topic", input{1, list, "test"})
}
}

func BenchmarkAsynchronousPublishing(b *testing.B) {
type input struct {
number int
slice []int
name string
}
list := []int{1, 2, 3, 4, 5}

bus := New()
_ = bus.SubscribeAsync("topic", func(i input) {}, false)

b.ResetTimer()
for range b.N {
bus.Publish("topic", input{1, list, "test"})
}
bus.WaitAsync()
}
162 changes: 162 additions & 0 deletions simple_bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package EventBus

import (
"sync"
)

type SubscriptionRef uint64

// SimpleBusSubscriber defines subscription-related bus behavior for event of specific type T
type SimpleBusSubscriber[T any] interface {
Subscribe(topic string, fn func(T)) SubscriptionRef
SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef
SubscribeOnce(topic string, fn func(T)) SubscriptionRef
SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef
Unsubscribe(topic string, ref SubscriptionRef)
}

// SimpleBusPublisher defines publishing-related bus behavior for event of specific type T
type SimpleBusPublisher[T any] interface {
Publish(topic string, arg T)
}

// SimpleBus includes global (subscribe, publish, control) bus behavior
type SimpleBus[T any] interface {
BusController
SimpleBusSubscriber[T]
SimpleBusPublisher[T]
}

type simpleHandler[T any] struct {
callBack func(T)
once bool
async bool
transactional bool
reference SubscriptionRef
sync.Mutex // lock for an event handler - useful for running async callbacks serially
}

func (h *simpleHandler[T]) Call(arg T, wg *sync.WaitGroup) {
if h.async {
wg.Add(1)
go func() {
defer wg.Done()
if h.transactional {
h.Lock()
defer h.Unlock()
}
h.callBack(arg)
}()
return
} else {
h.callBack(arg)
}
}

type listeners[T any] map[SubscriptionRef]*simpleHandler[T]

func (l listeners[T]) add(handler *simpleHandler[T]) {
l[handler.reference] = handler
}

func (l listeners[_]) delete(ref SubscriptionRef) {
delete(l, ref)
}

func newSimpleHandler[T any](fn func(T), ref SubscriptionRef, async, transactional, once bool) *simpleHandler[T] {
return &simpleHandler[T]{
callBack: fn,
reference: ref,
async: async,
transactional: transactional,
once: once,
}
}

type TypedBus[T any] struct {
handlers map[string]*listeners[T]
lastRef SubscriptionRef
sync.RWMutex
wg sync.WaitGroup
}

func NewSimpleBus[T any]() SimpleBus[T] {
return &TypedBus[T]{
handlers: make(map[string]*listeners[T]),
lastRef: SubscriptionRef(0),
}
}

func (bus *TypedBus[_]) HasCallback(topic string) bool {
bus.RLock()
defer bus.RUnlock()
_, ok := bus.handlers[topic]
return ok && len(*bus.handlers[topic]) > 0
}

func (bus *TypedBus[T]) Subscribe(topic string, fn func(T)) SubscriptionRef {
return bus.subscribe(topic, fn, false, false, false)
}

func (bus *TypedBus[T]) SubscribeOnce(topic string, fn func(T)) SubscriptionRef {
return bus.subscribe(topic, fn, false, false, true)
}

func (bus *TypedBus[T]) SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef {
return bus.subscribe(topic, fn, true, false, true)
}

func (bus *TypedBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef {
return bus.subscribe(topic, fn, true, transactional, false)
}

func (bus *TypedBus[T]) Unsubscribe(topic string, ref SubscriptionRef) {
bus.Lock()
defer bus.Unlock()
if _, ok := bus.handlers[topic]; ok {
bus.handlers[topic].delete(ref)
}
}

func (bus *TypedBus[T]) Publish(topic string, arg T) {
if subscribers, ok := bus.fetchSubscribers(topic); ok {
fire := make(chan *simpleHandler[T], len(*subscribers))
bus.Lock()
for _, subscriber := range *subscribers {
fire <- subscriber
if subscriber.once {
bus.handlers[topic].delete(subscriber.reference)
}
}
bus.Unlock()
close(fire)
// calling the callbacks will not block the whole bus
for subscriber := range fire {
subscriber.Call(arg, &bus.wg)
}
}
}

func (bus *TypedBus[T]) WaitAsync() {
bus.wg.Wait()
}

func (bus *TypedBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) SubscriptionRef {
bus.Lock()
defer bus.Unlock()
bus.lastRef++
if _, ok := bus.handlers[topic]; !ok {
bus.handlers[topic] = &listeners[T]{}
}
bus.handlers[topic].add(newSimpleHandler(fn, bus.lastRef, async, transactional, once))
return bus.lastRef
}

func (bus *TypedBus[T]) fetchSubscribers(topic string) (*listeners[T], bool) {
bus.RLock()
defer bus.RUnlock()
if handlers, ok := bus.handlers[topic]; ok && len(*handlers) > 0 {
return handlers, true
}
return nil, false
}
Loading