This repository was archived by the owner on Feb 9, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathvolta_application.go
More file actions
218 lines (179 loc) · 4.4 KB
/
volta_application.go
File metadata and controls
218 lines (179 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package volta
import (
"errors"
"fmt"
"github.com/fatih/color"
"github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
// App holds the configuration, the RabbitMQ connection and the exchanges,
// queues, handlers and middlewares registered against it.
type App struct {
// Configuration passed to New; New fills in defaults for an empty RabbitMQ
// URL and for nil Marshal/Unmarshal functions.
config Config
// RabbitMQ connection
// connectRetries counts failed dial attempts; connect increments it after
// every failed amqp091.Dial and compares it with config.ConnectRetries.
connectRetries int
// baseConnection is the connection returned by amqp091.Dial in connect.
baseConnection *amqp091.Connection
// mutex is held by Use while it appends to middlewares.
mutex sync.Mutex
// Global Middlewares, appended through Use.
middlewares []Handler
// Exchanges; initExchanges declares every value of this map.
// NOTE(review): the key is presumably the exchange name — confirm against
// the code that fills this map.
exchanges map[string]Exchange
// Queues; initQueues declares every value whose Exchange field is non-empty
// and skips the others.
queues map[string]Queue
// Handlers; initConsumers passes each key together with its handlers to
// consume.
handlers map[string][]Handler
// Error handlers
// NOTE(review): onBindError is not used in this part of the file; presumably
// it is invoked when binding an incoming message fails — confirm.
onBindError OnBindError
}
// New returns an App configured with config.
//
// Unset fields fall back to DefaultConfig: an empty RabbitMQ URL and nil
// Marshal/Unmarshal functions are replaced by their default counterparts.
func New(config Config) *App {
	// config is a by-value copy, so the defaults can be applied to it directly
	// before it is stored in the App.
	if config.RabbitMQ == "" {
		config.RabbitMQ = DefaultConfig.RabbitMQ
	}
	if config.Marshal == nil {
		config.Marshal = DefaultConfig.Marshal
	}
	if config.Unmarshal == nil {
		config.Unmarshal = DefaultConfig.Unmarshal
	}

	return &App{config: config}
}
// initExchanges declares every exchange stored in a.exchanges by calling
// a.declareExchange, printing progress unless logging is disabled.
//
// It stops at the first failure and returns an error that names the exchange
// and wraps the underlying cause, so callers can inspect it with errors.Is or
// errors.As.
func (a *App) initExchanges() error {
	if !a.config.DisableLogging {
		color.Cyan("\nRegistering exchanges...\n")
	}

	for _, exchange := range a.exchanges {
		if err := a.declareExchange(exchange); err != nil {
			// %w renders the same text as err.Error() but keeps the cause
			// reachable through errors.Unwrap.
			return fmt.Errorf("volta: Problem with declaring exchange %s: %w", exchange.Name, err)
		}

		if !a.config.DisableLogging {
			color.HiWhite("Exchange \"%s\" registered", exchange.Name)
		}
	}

	return nil
}
// initQueues declares every queue stored in a.queues that is attached to an
// exchange, printing progress unless logging is disabled.
//
// Queues whose Exchange field is empty are skipped (and reported as skipped
// when logging is enabled). The function stops at the first failure and
// returns an error that names the queue and wraps the underlying cause.
func (a *App) initQueues() error {
	if !a.config.DisableLogging {
		color.Cyan("\nRegistering queues...\n")
	}

	for _, queue := range a.queues {
		// A queue without an exchange is not declared at all.
		if queue.Exchange == "" {
			if !a.config.DisableLogging {
				color.HiRed("Queue \"%s\" skipped (no exchange)", queue.Name)
			}
			continue
		}

		if err := a.declareQueue(queue); err != nil {
			// %w renders the same text as err.Error() but keeps the cause
			// reachable through errors.Unwrap.
			return fmt.Errorf("volta: Problem with declaring queue %s: %w", queue.Name, err)
		}

		if !a.config.DisableLogging {
			color.HiWhite("Queue \"%s\" registered", queue.Name)
		}
	}

	return nil
}
// initConsumers starts a consumer for every entry of a.handlers by passing the
// map key and its handlers to a.consume, printing progress unless logging is
// disabled.
//
// It stops at the first failure and returns an error that names the key and
// wraps the underlying cause.
func (a *App) initConsumers() error {
	if !a.config.DisableLogging {
		color.Cyan("\nRegistering consumers...\n")
	}

	for rk, handlers := range a.handlers {
		if err := a.consume(rk, handlers...); err != nil {
			// %w renders the same text as err.Error() but keeps the cause
			// reachable through errors.Unwrap.
			return fmt.Errorf("volta: Problem with consuming queue %s: %w", rk, err)
		}

		if !a.config.DisableLogging {
			color.HiWhite("Consumer \"%s\" registered", rk)
		}
	}

	return nil
}
// connect dials the RabbitMQ URL from the configuration and, on success,
// stores the new connection in a.baseConnection.
//
// A failed dial is retried after sleeping config.ConnectRetryInterval seconds.
// Once more than config.ConnectRetries attempts have failed, a generic
// connection error is returned; the individual dial errors are only printed
// (when logging is enabled).
//
// The retry counter is reset on every call, so each call gets the full retry
// budget, including reconnects after an earlier failure. a.baseConnection is
// left untouched while dialing fails, so a previously stored connection is
// never replaced by nil.
func (a *App) connect() error {
	a.connectRetries = 0

	for {
		if !a.config.DisableLogging {
			color.Cyan("Connecting to RabbitMQ...\n")
		}

		conn, err := amqp091.Dial(a.config.RabbitMQ)
		if err == nil {
			a.baseConnection = conn
			return nil
		}

		if !a.config.DisableLogging {
			color.HiRed("volta: Problem with connecting to RabbitMQ: %s", err.Error())
		}

		a.connectRetries++
		if a.connectRetries > a.config.ConnectRetries {
			return errors.New("volta: Problem with connecting to RabbitMQ")
		}

		// Wait before the next attempt; the interval is configured in seconds.
		time.Sleep(time.Duration(a.config.ConnectRetryInterval) * time.Second)
	}
}
// Listen connects to RabbitMQ, registers the exchanges, queues and consumers,
// starts a background connection watcher and then blocks the calling
// goroutine forever.
//
// It returns only when the initial connection or registration fails; the
// error of the failing step is returned unchanged. After a successful start
// it never returns.
func (a *App) Listen() error {
	if err := a.connectAndRegister(); err != nil {
		return err
	}

	// Check for connection active
	go a.watchConnection()

	// Block forever; the consumers and the watcher keep running in the
	// background.
	select {}
}

// connectAndRegister performs one full start-up pass: it connects to RabbitMQ
// and then registers exchanges, queues and consumers, in that order. It
// returns the error of the first step that fails.
func (a *App) connectAndRegister() error {
	// Connect to RabbitMQ
	if err := a.connect(); err != nil {
		return err
	}
	// Register exchanges
	if err := a.initExchanges(); err != nil {
		return err
	}
	// Register queues
	if err := a.initQueues(); err != nil {
		return err
	}
	// Register consumers
	return a.initConsumers()
}

// watchConnection checks the connection every five seconds and runs
// connectAndRegister again whenever it is missing or closed.
//
// The reconnect runs inside this single loop: a failed attempt is printed
// (when logging is enabled) and retried on the next tick instead of being
// dropped, and no additional goroutine is started per reconnect.
//
// NOTE(review): the watcher only looks at IsClosed, so it presumably cannot
// tell a deliberate Close from a lost connection and reconnects in both
// cases — confirm whether that is intended.
func (a *App) watchConnection() {
	if !a.config.DisableLogging {
		color.HiWhite("\nConnection watcher registered")
	}

	for {
		// Guard against a nil connection before calling IsClosed on it.
		if conn := a.baseConnection; conn == nil || conn.IsClosed() {
			if !a.config.DisableLogging {
				color.HiRed("Connection to RabbitMQ lost, reconnecting...")
			}
			if err := a.connectAndRegister(); err != nil && !a.config.DisableLogging {
				color.HiRed("%s", err.Error())
			}
		}
		time.Sleep(5 * time.Second)
	}
}
// MustListen calls Listen and panics with the returned error if Listen
// reports one.
func (a *App) MustListen() {
	err := a.Listen()
	if err == nil {
		return
	}
	panic(err)
}
// Close closes the connection to RabbitMQ.
//
// It returns nil when no connection has been stored yet (for example when
// Close is called before a successful connect), rather than calling Close on
// a nil connection. Otherwise it returns whatever the underlying connection's
// Close reports.
func (a *App) Close() error {
	if a.baseConnection == nil {
		return nil
	}
	return a.baseConnection.Close()
}
// MustClose calls Close and panics with the returned error if Close reports
// one.
func (a *App) MustClose() {
	err := a.Close()
	if err == nil {
		return
	}
	panic(err)
}
// Use appends the given handlers to the App's list of global middlewares.
// The list is modified while holding the App's mutex.
func (a *App) Use(middlewares ...Handler) {
	a.mutex.Lock()
	a.middlewares = append(a.middlewares, middlewares...)
	a.mutex.Unlock()
}