-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver_test.go
More file actions
101 lines (91 loc) · 2.48 KB
/
server_test.go
File metadata and controls
101 lines (91 loc) · 2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestPubSub starts a PubSub server in-process, connects numSubscribers
// subscribers and runs numPublishers publishers concurrently, each sending
// numMessages messages. It asserts that exactly
// numMessages * numPublishers * numSubscribers deliveries are observed across
// all subscribers: the test fails on a publish error, on a timeout before all
// deliveries arrive, or if more deliveries than expected show up.
func TestPubSub(t *testing.T) {
	const (
		numSubscribers    = 20
		numPublishers     = 20
		numMessages       = 50
		expectedTotalMsgs = numMessages * numPublishers * numSubscribers

		// how long to wait for all deliveries before failing the test
		receiveTimeout = 30 * time.Second
		// how long to keep listening after the expected count is reached, so
		// that duplicate/extra deliveries are detected rather than ignored
		settleTime = 250 * time.Millisecond
	)

	addr := "127.0.0.1:9911"
	serverLog := log.New(os.Stdout, "[PUBSUB SERVER]: ", log.Ldate|log.Ltime)
	s := NewServer(serverLog, addr)
	go func() {
		err := s.ListenAndServe()
		serverLog.Printf("server listen error: %s", err)
	}()

	// HACK to wait for PubSub HTTP server to be up
	time.Sleep(1 * time.Second)

	// wg counts down once per expected delivery; msgsReceived counts every
	// delivery, including unexpected extra ones.
	var wg sync.WaitGroup
	var msgsReceived uint64
	wg.Add(expectedTotalMsgs)

	// connect Subscribers
	for i := 0; i < numSubscribers; i++ {
		cl := NewClient(addr)
		wsCl, msgCh, errCh, err := cl.Subscribe()
		if err != nil {
			// Safe to stop immediately: this runs on the test goroutine.
			t.Fatalf("Failed to setup PubSub client[%d]: %s", i, err)
		}

		clientLog := log.New(os.Stdout, fmt.Sprintf("[PUBSUB CLIENT: %s]: ", wsCl.LocalAddr()), log.Ldate|log.Ltime)
		go func() {
			for {
				select {
				case msg, ok := <-msgCh:
					if !ok {
						clientLog.Printf("client subscription shutdown")
						return
					}
					// Use the value returned by the increment so the logged
					// total belongs to this message.
					total := atomic.AddUint64(&msgsReceived, 1)
					clientLog.Printf("client received message [totalmsgs=%d]: %s", total, msg)
					// Never call Done more often than Add accounted for: an
					// extra delivery would otherwise panic with a negative
					// WaitGroup counter instead of failing the test.
					if total <= expectedTotalMsgs {
						wg.Done()
					}
				case err, ok := <-errCh:
					if !ok {
						clientLog.Printf("client subscription shutdown")
						return
					}
					clientLog.Printf("client error: %v", err)
				}
			}
		}()
	}

	// Publishers report their first failure here. The buffer holds one error
	// per publisher so a failing publisher never blocks on send.
	pubErrCh := make(chan error, numPublishers)

	// create Publishers and send messages
	for i := 0; i < numPublishers; i++ {
		pubID := i
		go func() {
			pubLog := log.New(os.Stdout, fmt.Sprintf("[PUBSUB PUBLISHER: %d]: ", pubID), log.Ldate|log.Ltime)
			p := NewPublisher(addr)
			for j := 0; j < numMessages; j++ {
				err := p.Publish(fmt.Sprintf("[Publisher: %d] Message [%d]", pubID, j))
				if err != nil {
					pubLog.Printf("Publisher failed to send message: %s", err)
					pubErrCh <- fmt.Errorf("publisher %d, message %d: %w", pubID, j, err)
					return
				}
			}
		}()
	}

	// wait to receive all expected messages, a publisher failure, or timeout
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	select {
	case <-done:
	case err := <-pubErrCh:
		// Fail fast: the expected count can no longer be reached.
		t.Fatalf("Publisher failed to send message: %s", err)
	case <-time.After(receiveTimeout):
		t.Fatalf("Expected %d messages to be received but received %d before timing out", expectedTotalMsgs, atomic.LoadUint64(&msgsReceived))
	}

	// All expected deliveries arrived; give stray duplicates a moment to show
	// up so that "exactly" is actually verified.
	time.Sleep(settleTime)
	if got := atomic.LoadUint64(&msgsReceived); got != expectedTotalMsgs {
		t.Errorf("Expected exactly %d messages to be received but received %d", expectedTotalMsgs, got)
	}
}