-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer_test.go
More file actions
88 lines (78 loc) · 1.79 KB
/
consumer_test.go
File metadata and controls
88 lines (78 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//go:build !race
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sidmal/rabbitmq/test/proto"
"github.com/stretchr/testify/assert"
"reflect"
"testing"
"time"
)
func TestConsumer_ReConsume_MayRunFalse(t *testing.T) {
cons := &consumer{}
cons.reConsume()
}
func TestConsumer_ReConsume_RabbitMQClose(t *testing.T) {
cons := &consumer{
mayRun: true,
rabbitMQ: &rabbitMq{
close: make(chan bool, 1),
waitConnection: make(chan struct{}, 1),
},
}
cons.rabbitMQ.waitConnection <- struct{}{}
tp := time.NewTimer(time.Second * 1)
exit := make(chan struct{}, 1)
go func() {
cons.reConsume()
}()
select {
case <-tp.C:
cons.rabbitMQ.close <- true
exit <- struct{}{}
}
<-exit
}
func TestConsumer_ReConsume_ConsumeError(t *testing.T) {
opts := &Options{
dsn: defaultAmqpUrl,
consumer: &ConsumerOptions{
Args: map[string]interface{}{
"some_undefined_arg": &proto.One{},
},
},
topic: "TestConsumer_ReConsume_ConsumeError",
}
rmq, err := newRabbitMq(opts)
assert.NoError(t, err)
cons := &consumer{
mayRun: true,
rabbitMQ: rmq,
minResubscribeDelay: 100 * time.Millisecond,
maxResubscribeDelay: 1 * time.Second,
}
tp := time.NewTimer(time.Second * 2)
exit := make(chan struct{}, 1)
go func() {
cons.reConsume()
}()
select {
case <-tp.C:
cons.rabbitMQ.close <- true
exit <- struct{}{}
}
<-exit
}
func TestConsumer_HandlerExecute_Error(t *testing.T) {
msg, err := NewProtobufEncoder().Marshal(&proto.One{Value: "test"})
assert.NoError(t, err)
publishing := amqp.Delivery{
Body: msg,
}
h := &handler{
msgType: reflect.TypeOf(&proto.One{}).Elem(),
}
err = h.execute(publishing, &TestFailMessageEncoder{})
assert.Error(t, err)
assert.EqualError(t, err, "TestFailMessageEncoder_Unmarshal")
}