Skip to content

Commit dcbc8eb

Browse files
committed
fix(tracing): add amqp prefetch
1 parent e51ccb9 commit dcbc8eb

3 files changed

Lines changed: 19 additions & 5 deletions

File tree

services/tracing/ingress-archiver/service/transport.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
"sensorbucket.nl/sensorbucket/pkg/mq"
1515
)
1616

17-
func StartIngressDTOConsumer(conn *mq.AMQPConnection, svc *Application, queue, xchg, topic string) {
17+
func StartIngressDTOConsumer(conn *mq.AMQPConnection, svc *Application, queue, xchg, topic string, prefetch int) {
1818
consume := conn.Consume(queue, func(c *amqp091.Channel) error {
19+
if err := c.Qos(prefetch, 0, false); err != nil {
20+
return fmt.Errorf("error setting Qos with prefetch on amqp: %w", err)
21+
}
1922
_, err := c.QueueDeclare(queue, true, false, false, false, nil)
2023
if err != nil {
2124
return err
2225
}
23-
2426
// Create exchange and bind if both arguments are provided, this is optional
2527
if xchg != "" && topic != "" {
2628
if err := c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil); err != nil {

services/tracing/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"os"
1010
"os/signal"
11+
"strconv"
1112
"time"
1213

1314
"github.com/go-chi/chi/v5"
@@ -40,6 +41,7 @@ var (
4041
AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "archive-ingress")
4142
AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress")
4243
AMQP_XCHG_INGRESS_TOPIC = env.Could("AMQP_XCHG_INGRESS_TOPIC", "ingress.*")
44+
AMQP_PREFETCH = env.Could("AMQP_PREFETCH", "5")
4345
AUTH_JWKS_URL = env.Could("AUTH_JWKS_URL", "http://oathkeeper:4456/.well-known/jwks.json")
4446
)
4547

@@ -61,6 +63,11 @@ func Run(cleanup cleanupper.Cleanupper) error {
6163
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
6264
defer cancel()
6365

66+
prefetch, err := strconv.Atoi(AMQP_PREFETCH)
67+
if err != nil {
68+
return err
69+
}
70+
6471
stopProfiler, err := web.RunProfiler()
6572
if err != nil {
6673
log.Printf("could not setup profiler server: %s\n", err)
@@ -93,6 +100,7 @@ func Run(cleanup cleanupper.Cleanupper) error {
93100
go ingressarchiver.StartIngressDTOConsumer(
94101
mqConn, svc,
95102
AMQP_QUEUE_INGRESS, AMQP_XCHG_INGRESS, AMQP_XCHG_INGRESS_TOPIC,
103+
prefetch,
96104
)
97105
ingressarchiver.CreateHTTPTransport(r, svc)
98106
}
@@ -107,6 +115,7 @@ func Run(cleanup cleanupper.Cleanupper) error {
107115
AMQP_QUEUE_PIPELINEMESSAGES,
108116
AMQP_XCHG_PIPELINEMESSAGES,
109117
AMQP_XCHG_PIPELINEMESSAGES_TOPIC,
118+
prefetch,
110119
)
111120
tracinghttp := tracingtransport.NewHTTP(tracingService, HTTP_BASE)
112121
tracinghttp.SetupRoutes(r)

services/tracing/tracing/transport/mq.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"sensorbucket.nl/sensorbucket/services/tracing/tracing"
1414
)
1515

16-
func StartMQ(svc *tracing.Service, conn *mq.AMQPConnection, queue, xchg, topic string) {
17-
pipelineMessages := mq.Consume(conn, queue, setupFunc(queue, xchg, topic))
16+
func StartMQ(svc *tracing.Service, conn *mq.AMQPConnection, queue, xchg, topic string, prefetch int) {
17+
pipelineMessages := mq.Consume(conn, queue, setupFunc(prefetch, queue, xchg, topic))
1818

1919
log.Println("Measurement MQ Transport running")
2020
go processMessage(pipelineMessages, svc)
@@ -89,8 +89,11 @@ func processMessage(deliveries <-chan amqp091.Delivery, svc *tracing.Service) {
8989
}
9090
}
9191

92-
func setupFunc(queue, xchg, topic string) mq.AMQPSetupFunc {
92+
func setupFunc(prefetch int, queue, xchg, topic string) mq.AMQPSetupFunc {
9393
return func(c *amqp091.Channel) error {
94+
if err := c.Qos(prefetch, 0, false); err != nil {
95+
return fmt.Errorf("error setting Qos with prefetch on amqp: %w", err)
96+
}
9497
_, err := c.QueueDeclare(queue, true, false, false, false, nil)
9598
if err != nil {
9699
return fmt.Errorf("error declaring amqp queue: %w", err)

0 commit comments

Comments
 (0)