Skip to content

Commit b67d772

Browse files
committed
introduce SubscriptionRef, add example to README
Signed-off-by: tomasz.ziolkowski <tomasz.ziolkowski@allegro.pl>
1 parent e80cea7 commit b67d772

3 files changed

Lines changed: 106 additions & 58 deletions

File tree

README.md

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import (
2626
```
2727

2828
#### Example
29+
30+
General purpose event bus (unlimited number of parameters of any type):
31+
2932
```go
3033
func calculator(a int, b int) {
3134
fmt.Printf("%d\n", a + b)
@@ -39,6 +42,27 @@ func main() {
3942
}
4043
```
4144

45+
You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type.
46+
Simple event bus (only one parameter of a chosen type):
47+
48+
```go
49+
type sumComponents struct {
50+
a int
51+
b int
52+
}
53+
54+
func calculator(args sumComponents) {
55+
fmt.Printf("%d\n", args.a + args.b)
56+
}
57+
58+
func main() {
59+
bus := EventBus.NewSimpleBus[sumComponents]()
60+
ref := bus.Subscribe("main:calculator", calculator)
61+
bus.Publish("main:calculator", sumComponents{20, 40})
62+
bus.Unsubscribe("main:calculator", ref)
63+
}
64+
```
65+
4266
#### Implemented methods
4367
* **New()**
4468
* **Subscribe()**
@@ -56,10 +80,10 @@ New returns new EventBus with empty handlers.
5680
bus := EventBus.New();
5781
```
5882

59-
You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type.
83+
#### NewSimpleBus[T]()
6084
The below example creates a new EventBus with handlers accepting only string arguments:
6185
```go
62-
bus := EventBus.NewSimpleBus[string]();
86+
bus := EventBus.NewSimpleBus[string]()
6387
````
6488

6589
#### Subscribe(topic string, fn interface{}) error
@@ -70,6 +94,18 @@ func Handler() { ... }
7094
bus.Subscribe("topic:handler", Handler)
7195
```
7296

97+
#### Subscribe(topic string, fn func(T)) SubscriptionRef
98+
99+
Simplified version of bus returns `SubscriptionRef` as a result of all subscription methods. It can be used to unsubscribe from the topic.
100+
```go
101+
func Handler(param string) { ... }
102+
...
103+
bus := EventBus.NewSimpleBus[string]()
104+
ref := bus.Subscribe("topic:handler", Handler)
105+
...
106+
bus.Unsubscribe(ref)
107+
```
108+
73109
#### SubscribeOnce(topic string, fn interface{}) error
74110
Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.
75111
```go

generic_bus.go

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package EventBus
22

33
import (
4-
"reflect"
54
"sync"
65
)
76

7+
type SubscriptionRef uint64
8+
89
// SimpleBusSubscriber defines subscription-related bus behavior for event of specific type T
910
type SimpleBusSubscriber[T any] interface {
10-
Subscribe(topic string, fn func(T))
11-
SubscribeAsync(topic string, fn func(T), transactional bool)
12-
SubscribeOnce(topic string, fn func(T))
13-
SubscribeOnceAsync(topic string, fn func(T))
14-
Unsubscribe(topic string, handler func(T))
11+
Subscribe(topic string, fn func(T)) SubscriptionRef
12+
SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef
13+
SubscribeOnce(topic string, fn func(T)) SubscriptionRef
14+
SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef
15+
Unsubscribe(topic string, ref SubscriptionRef)
1516
}
1617

1718
// SimpleBusPublisher defines publishing-related bus behavior for event of specific type T
@@ -26,16 +27,12 @@ type SimpleBus[T any] interface {
2627
SimpleBusPublisher[T]
2728
}
2829

29-
type callBack[T any] struct {
30-
Call func(T)
31-
pointer uintptr
32-
}
33-
3430
type genericHandler[T any] struct {
35-
callBack callBack[T]
31+
callBack func(T)
3632
once bool
3733
async bool
3834
transactional bool
35+
reference SubscriptionRef
3936
sync.Mutex // lock for an event handler - useful for running async callbacks serially
4037
}
4138

@@ -48,37 +45,28 @@ func (h *genericHandler[T]) Call(arg T, wg *sync.WaitGroup) {
4845
h.Lock()
4946
defer h.Unlock()
5047
}
51-
h.callBack.Call(arg)
48+
h.callBack(arg)
5249
}()
5350
return
5451
} else {
55-
h.callBack.Call(arg)
52+
h.callBack(arg)
5653
}
5754
}
5855

59-
type listeners[T any] []*genericHandler[T]
56+
type listeners[T any] map[SubscriptionRef]*genericHandler[T]
6057

61-
func (l *listeners[T]) add(handler *genericHandler[T]) {
62-
for _, h := range *l {
63-
if h.callBack.pointer == handler.callBack.pointer {
64-
return
65-
}
66-
}
67-
*l = append(*l, handler)
58+
func (l listeners[T]) add(handler *genericHandler[T]) {
59+
l[handler.reference] = handler
6860
}
6961

70-
func (l *listeners[T]) delete(handler *genericHandler[T]) {
71-
for i, h := range *l {
72-
if h.callBack.pointer == handler.callBack.pointer {
73-
*l = append((*l)[:i], (*l)[i+1:]...)
74-
return
75-
}
76-
}
62+
func (l listeners[_]) delete(ref SubscriptionRef) {
63+
delete(l, ref)
7764
}
7865

79-
func newGenericHandler[T any](fn func(T), async, transactional, once bool) *genericHandler[T] {
66+
func newGenericHandler[T any](fn func(T), ref SubscriptionRef, async, transactional, once bool) *genericHandler[T] {
8067
return &genericHandler[T]{
81-
callBack: callBack[T]{Call: fn, pointer: reflect.ValueOf(fn).Pointer()},
68+
callBack: fn,
69+
reference: ref,
8270
async: async,
8371
transactional: transactional,
8472
once: once,
@@ -87,13 +75,15 @@ func newGenericHandler[T any](fn func(T), async, transactional, once bool) *gene
8775

8876
type GenericBus[T any] struct {
8977
handlers map[string]*listeners[T]
78+
lastRef SubscriptionRef
9079
sync.RWMutex
9180
wg sync.WaitGroup
9281
}
9382

9483
func NewSimpleBus[T any]() SimpleBus[T] {
9584
return &GenericBus[T]{
9685
handlers: make(map[string]*listeners[T]),
86+
lastRef: SubscriptionRef(0),
9787
}
9888
}
9989

@@ -104,27 +94,27 @@ func (bus *GenericBus[_]) HasCallback(topic string) bool {
10494
return ok && len(*bus.handlers[topic]) > 0
10595
}
10696

107-
func (bus *GenericBus[T]) Subscribe(topic string, fn func(T)) {
108-
bus.subscribe(topic, fn, false, false, false)
97+
func (bus *GenericBus[T]) Subscribe(topic string, fn func(T)) SubscriptionRef {
98+
return bus.subscribe(topic, fn, false, false, false)
10999
}
110100

111-
func (bus *GenericBus[T]) SubscribeOnce(topic string, fn func(T)) {
112-
bus.subscribe(topic, fn, false, false, true)
101+
func (bus *GenericBus[T]) SubscribeOnce(topic string, fn func(T)) SubscriptionRef {
102+
return bus.subscribe(topic, fn, false, false, true)
113103
}
114104

115-
func (bus *GenericBus[T]) SubscribeOnceAsync(topic string, fn func(T)) {
116-
bus.subscribe(topic, fn, true, false, true)
105+
func (bus *GenericBus[T]) SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef {
106+
return bus.subscribe(topic, fn, true, false, true)
117107
}
118108

119-
func (bus *GenericBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) {
120-
bus.subscribe(topic, fn, true, transactional, false)
109+
func (bus *GenericBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef {
110+
return bus.subscribe(topic, fn, true, transactional, false)
121111
}
122112

123-
func (bus *GenericBus[T]) Unsubscribe(topic string, fn func(T)) {
113+
func (bus *GenericBus[T]) Unsubscribe(topic string, ref SubscriptionRef) {
124114
bus.Lock()
125115
defer bus.Unlock()
126116
if _, ok := bus.handlers[topic]; ok {
127-
bus.handlers[topic].delete(newGenericHandler(fn, false, false, false))
117+
bus.handlers[topic].delete(ref)
128118
}
129119
}
130120

@@ -135,7 +125,7 @@ func (bus *GenericBus[T]) Publish(topic string, arg T) {
135125
for _, subscriber := range *subscribers {
136126
fire <- subscriber
137127
if subscriber.once {
138-
bus.handlers[topic].delete(subscriber)
128+
bus.handlers[topic].delete(subscriber.reference)
139129
}
140130
}
141131
bus.Unlock()
@@ -151,13 +141,15 @@ func (bus *GenericBus[T]) WaitAsync() {
151141
bus.wg.Wait()
152142
}
153143

154-
func (bus *GenericBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) {
144+
func (bus *GenericBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) SubscriptionRef {
155145
bus.Lock()
156146
defer bus.Unlock()
147+
bus.lastRef++
157148
if _, ok := bus.handlers[topic]; !ok {
158149
bus.handlers[topic] = &listeners[T]{}
159150
}
160-
bus.handlers[topic].add(newGenericHandler(fn, async, transactional, once))
151+
bus.handlers[topic].add(newGenericHandler(fn, bus.lastRef, async, transactional, once))
152+
return bus.lastRef
161153
}
162154

163155
func (bus *GenericBus[T]) fetchSubscribers(topic string) (*listeners[T], bool) {

generic_bus_test.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package EventBus
22

33
import (
4+
"fmt"
45
"testing"
56
"time"
67
)
@@ -28,14 +29,17 @@ func TestSimpleOnceAndManySubscribe(t *testing.T) {
2829
bus := NewSimpleBus[any]()
2930
topic := "topic"
3031
flag := 0
31-
// artificially define different functions with the same logic
32-
// subscription logic prevents from subscribe the same callback for the same topic multiple times
33-
fn1 := func(_ any) { flag += 1 }
34-
fn2 := func(_ any) { flag += 1 }
35-
fn3 := func(_ any) { flag += 1 }
36-
bus.SubscribeOnce(topic, fn1)
37-
bus.Subscribe(topic, fn2)
38-
bus.Subscribe(topic, fn3)
32+
33+
fn := func(_ any) { flag += 1 }
34+
35+
refFirst := bus.SubscribeOnce(topic, fn)
36+
refSecond := bus.Subscribe(topic, fn)
37+
refThird := bus.Subscribe(topic, fn)
38+
39+
if refFirst == refSecond || refFirst == refThird || refSecond == refThird {
40+
t.Fail()
41+
}
42+
3943
bus.Publish(topic, nil)
4044

4145
if flag != 3 {
@@ -46,12 +50,12 @@ func TestSimpleOnceAndManySubscribe(t *testing.T) {
4650
func TestSimpleUnsubscribe(t *testing.T) {
4751
bus := NewSimpleBus[any]()
4852
fn := func(_ any) {}
49-
bus.Subscribe("topic", fn)
53+
ref := bus.Subscribe("topic", fn)
5054
if !bus.HasCallback("topic") {
5155
t.Logf("Expected to have callback for topic but it is not present")
5256
t.Fail()
5357
}
54-
bus.Unsubscribe("topic", fn)
58+
bus.Unsubscribe("topic", ref)
5559
if bus.HasCallback("topic") {
5660
t.Logf("Expected to have no callback for topic after unsubscribe but it is present")
5761
t.Fail()
@@ -70,9 +74,9 @@ func TestSimpleUnsubscribeMethod(t *testing.T) {
7074
bus := NewSimpleBus[any]()
7175
h := &accumulator{val: 0}
7276

73-
bus.Subscribe("topic", h.Handle)
77+
ref := bus.Subscribe("topic", h.Handle)
7478
bus.Publish("topic", nil)
75-
bus.Unsubscribe("topic", h.Handle)
79+
bus.Unsubscribe("topic", ref)
7680
bus.Publish("topic", nil)
7781

7882
if h.val != 1 {
@@ -218,3 +222,19 @@ func BenchmarkSimpleAsynchronousPublishing(b *testing.B) {
218222
}
219223
bus.WaitAsync()
220224
}
225+
226+
type sumComponents struct {
227+
a int
228+
b int
229+
}
230+
231+
func calculator(args sumComponents) {
232+
fmt.Printf("%d\n", args.a+args.b)
233+
}
234+
235+
func TestSimpleExample(t *testing.T) {
236+
bus := NewSimpleBus[sumComponents]()
237+
ref := bus.Subscribe("main:calculator", calculator)
238+
bus.Publish("main:calculator", sumComponents{20, 40})
239+
bus.Unsubscribe("main:calculator", ref)
240+
}

0 commit comments

Comments
 (0)