-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcstx.go
More file actions
95 lines (80 loc) · 2.2 KB
/
cstx.go
File metadata and controls
95 lines (80 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package rmqworker
import (
"sync"
"github.com/google/uuid"
"go.uber.org/zap"
"github.com/matrixbotio/rmqworker-lib/pkg/cstx"
"github.com/matrixbotio/rmqworker-lib/pkg/errs"
)
// Package-level state shared by every RMQHandler in the process.
var (
	// cstxAcksConsumerStartedLock serializes StartCSTXAcksConsumer calls so
	// that the check-and-set of cstxAcksConsumer is not racy.
	cstxAcksConsumerStartedLock sync.Mutex

	// cstxAcksConsumer is the worker consuming cross-service transaction ack
	// messages. It stays nil until StartCSTXAcksConsumer sets it.
	cstxAcksConsumer *RMQWorker
)
// StartCSTXAcksConsumer starts the consumer that collects cross-service
// transaction (CSTX) ack messages into cstx.AcksMap.
//
// Calls are serialized by cstxAcksConsumerStartedLock. Once a consumer has been
// started successfully, further calls return nil without doing anything.
//
// On the first successful call the method:
//   - declares the cstx.ExchangeName topic exchange;
//   - creates a worker bound to that exchange on a uniquely named,
//     non-durable, auto-delete queue;
//   - starts serving, sets cstx.IsCSTXAcksConsumerSet and launches
//     cstx.StartAcksCleaner in its own goroutine.
//
// If any step fails the error is returned and the package-level consumer is
// left unset, so a later call can retry the whole startup.
func (handler *RMQHandler) StartCSTXAcksConsumer() errs.APIError {
	cstxAcksConsumerStartedLock.Lock()
	defer cstxAcksConsumerStartedLock.Unlock()

	if cstxAcksConsumer != nil {
		return nil
	}

	err := handler.DeclareExchanges(map[string]string{cstx.ExchangeName: ExchangeTypeTopic})
	if err != nil {
		return err
	}

	// A random suffix gives every started consumer its own queue.
	queueName := cstx.ExchangeName + "-" + uuid.NewString()
	task := WorkerTask{
		QueueName:      queueName,
		ISQueueDurable: false,
		ISAutoDelete:   true,
		Callback: func(worker *RMQWorker, deliveryHandler RMQDeliveryHandler) {
			var ack cstx.AckMessage

			// An empty body is not decoded: ack keeps its zero value and is
			// recorded under the zero TXID.
			body := deliveryHandler.GetMessageBody()
			if len(body) > 0 {
				if err := json.Unmarshal(body, &ack); err != nil {
					worker.Logger.Error("unmarshal CrossServiceTransaction Ack message body", zap.Error(err))
					return
				}
			}

			cstx.AcksMapLock.Lock()
			cstx.AcksMap[ack.TXID] = append(cstx.AcksMap[ack.TXID], ack)
			cstx.AcksMapLock.Unlock()
		},
		ID:               queueName,
		FromExchange:     cstx.ExchangeName,
		ExchangeType:     ExchangeTypeTopic,
		ConsumersCount:   1,
		WorkerName:       queueName,
		QueueLength:      1000,
		MessagesLifetime: cstx.StandardAckMessageLifetime,
	}

	// Build and start the worker through a local variable. Publishing it to
	// cstxAcksConsumer before Serve succeeds would make every later call hit
	// the "already started" guard above and report success for a consumer
	// that never ran.
	var worker *RMQWorker
	worker, err = handler.NewRMQWorker(task)
	if err != nil {
		return err
	}
	if err := worker.Serve(); err != nil {
		return err
	}

	cstxAcksConsumer = worker
	cstx.IsCSTXAcksConsumerSet = true
	go cstx.StartAcksCleaner()

	return nil
}
// GetCSTX builds a cstx.CrossServiceTransaction from the delivery's headers.
//
// The following headers are read, each one optional:
//   - cstx.HeaderID        (string) -> ID
//   - cstx.HeaderAckNum    (int32)  -> AckNum
//   - cstx.HeaderTimeout   (int32)  -> Timeout
//   - cstx.HeaderStartedAt (int64)  -> StartedAt
//
// A header that is missing, or whose value has a different dynamic type than
// listed above, leaves the corresponding field at its zero value. Header
// values come from whoever published the message, so they are checked rather
// than asserted: a bare type assertion would panic on a mismatch.
//
// The returned transaction's Handler is always set to handler.
func (d *RMQDeliveryHandler) GetCSTX(handler *RMQHandler) cstx.CrossServiceTransaction {
	var tx cstx.CrossServiceTransaction

	if raw, exists := d.GetHeader(cstx.HeaderID); exists {
		if id, ok := raw.(string); ok {
			tx.ID = id
		}
	}

	if raw, exists := d.GetHeader(cstx.HeaderAckNum); exists {
		if ackNum, ok := raw.(int32); ok {
			tx.AckNum = ackNum
		}
	}

	if raw, exists := d.GetHeader(cstx.HeaderTimeout); exists {
		if timeout, ok := raw.(int32); ok {
			tx.Timeout = timeout
		}
	}

	if raw, exists := d.GetHeader(cstx.HeaderStartedAt); exists {
		if startedAt, ok := raw.(int64); ok {
			tx.StartedAt = startedAt
		}
	}

	tx.Handler = handler
	return tx
}