diff --git a/example/rabbitmq/README_QUORUM.md b/example/rabbitmq/README_QUORUM.md new file mode 100644 index 0000000..a68516c --- /dev/null +++ b/example/rabbitmq/README_QUORUM.md @@ -0,0 +1,394 @@ +# RabbitMQ Quorum Queue 使用指南 + +## 概述 + +go-queue 的 RabbitMQ 组件支持声明 Quorum Queue 队列,并利用其原生的 Delivery Limit 机制避免消息处理失败导致的死循环问题。 + +## Quorum Queue vs Classic Queue + +| 特性 | Classic Queue | Quorum Queue | +|------|---------------|---------------| +| 数据持久化 | 按消息配置 | 始终持久化 | +| 高可用性 | 不支持 | 支持(基于 Raft 共识算法) | +| 毒消息处理 | 不支持 | 支持(Delivery Limit) | +| 适用场景 | 临时队列、低延迟要求 | 关键业务数据、高可靠性要求 | + +## 使用场景 + +**推荐使用 Quorum Queue 的场景:** +- 订单系统 +- 投票系统 +- 需要确保消息不丢失的关键业务 +- 需要避免毒消息死循环的场景 + +**使用 Classic Queue 的场景:** +- 临时队列 +- 对延迟极其敏感的场景 +- 数据安全优先级不高的场景 + +## 配置说明 + +### QueueConf 配置参数 + +```go +type QueueConf struct { + Name string // 队列名称 + QueueType string // 队列类型: "classic" 或 "quorum",默认 "classic" + Durable bool // 是否持久化 + AutoDelete bool // 自动删除 + Exclusive bool // 排他性 + NoWait bool // 是否阻塞等待 + DeliveryLimit int64 // 投递限制(仅 Quorum Queue),0 表示使用 RabbitMQ 默认值 + DeadLetterExchange string // 死信交换机 + DeadLetterRoutingKey string // 死信路由键 +} +``` + +### 声明 Quorum Queue + +```go +admin := rabbitmq.MustNewAdmin(conf) + +// 使用新的 DeclareQueueConf 方法声明队列 +queueConf := rabbitmq.QueueConf{ + Name: "orders.quorum", + QueueType: "quorum", + Durable: true, + DeliveryLimit: 20, // 超过 20 次投递后,消息将被丢弃或死信 + DeadLetterExchange: "orders.dlx", + DeadLetterRoutingKey: "failed", +} + +err := admin.DeclareQueueConf(queueConf) +if err != nil { + log.Fatal(err) +} +``` + +## Delivery Limit 机制 + +### 工作原理 + +Quorum Queue 自动跟踪每条消息的投递次数,通过消息头 `x-delivery-count` 暴露给消费者: + +1. 消息首次被投递,`x-delivery-count = 1` +2. 如果消费者处理失败并执行 `Nack(requeue=true)`,消息重新入队 +3. 下次投递时,`x-delivery-count = 2` +4. 重复此过程,直到超过 `delivery-limit` +5. 超过限制后: + - 如果配置了死信交换机:消息被发送到死信队列 + - 未配置死信交换机:消息被丢弃 + +### 避免队列阻塞 + +Delivery Limit 机制完美解决了以下问题: + +**问题场景:** +```go +// 消费者代码中存在 Bug,导致某些消息始终处理失败 +func (h Handler) Consume(message string) error { + if strings.Contains(message, "buggy") { + return errors.New("处理失败") + } + return nil +} +``` + +**如果不使用 Delivery Limit:** +1. 包含 "buggy" 的消息被消费 +2. 处理失败,Nack + requeue=true +3. 消息立即重新投递 +4. 无限循环,队列被阻塞 + +**使用 Delivery Limit:** +1. 包含 "buggy" 的消息被消费 +2. 处理失败,Nack + requeue=true,投递次数 +1 +3. 重复投递最多 20 次(默认) +4. 超过限制后,消息进入死信队列或被丢弃 +5. 队列可以继续处理其他消息 + +## 完整示例 + +### 1. 声明基础设施 + +```go +package main + +import ( + "log" + + "github.com/zeromicro/go-queue/rabbitmq" +) + +func main() { + conf := rabbitmq.RabbitConf{ + Host: "localhost", + Port: 5672, + Username: "guest", + Password: "guest", + } + admin := rabbitmq.MustNewAdmin(conf) + + // 声明死信交换机 + err := admin.DeclareExchange(rabbitmq.ExchangeConf{ + ExchangeName: "orders.dlx", + Type: "direct", + Durable: true, + }, nil) + if err != nil { + log.Fatal(err) + } + + // 声明死信队列 + err = admin.DeclareQueueConf(rabbitmq.QueueConf{ + Name: "orders.failed", + QueueType: "quorum", + Durable: true, + }) + if err != nil { + log.Fatal(err) + } + + // 绑定死信队列 + err = admin.Bind("orders.failed", "failed", "orders.dlx", false, nil) + if err != nil { + log.Fatal(err) + } + + // 声明主交换机 + err = admin.DeclareExchange(rabbitmq.ExchangeConf{ + ExchangeName: "orders", + Type: "direct", + Durable: true, + }, nil) + if err != nil { + log.Fatal(err) + } + + // 声明主队列(Quorum Queue,带 Delivery Limit) + err = admin.DeclareQueueConf(rabbitmq.QueueConf{ + Name: "orders.main", + QueueType: "quorum", + Durable: true, + DeliveryLimit: 20, // 限制投递次数 + DeadLetterExchange: "orders.dlx", // 死信交换机 + DeadLetterRoutingKey: "failed", // 死信路由键 + }) + if err != nil { + log.Fatal(err) + } + + // 绑定主队列 + err = admin.Bind("orders.main", "created", "orders", false, nil) + if err != nil { + log.Fatal(err) + } +} +``` + +### 2. 消费者配置 + +```go +package main + +import ( + "context" + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/zeromicro/go-zero/core/logc" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-queue/rabbitmq" +) + +type OrderHandler struct{} + +func (h OrderHandler) Consume(message string) error { + fmt.Printf("Processing order: %s\n", message) + + // 业务处理逻辑 + err := processOrder(message) + if err != nil { + return err // 返回错误,消息将被重新投递 + } + + return nil +} + +func main() { + listenerConf := rabbitmq.RabbitListenerConf{ + RabbitConf: rabbitmq.RabbitConf{ + Host: "localhost", + Port: 5672, + Username: "guest", + Password: "guest", + }, + ListenerQueues: []rabbitmq.ConsumerConf{ + { + Name: "orders.main", + AutoAck: false, // 必须手动确认 + }, + }, + } + + // 自定义错误处理器 + listener := rabbitmq.MustNewListener(listenerConf, OrderHandler{}, + rabbitmq.WithErrorHandler(func(ctx context.Context, msg amqp.Delivery, err error) { + // 记录详细错误信息 + deliveryCount := msg.Headers["x-delivery-count"] + logc.Errorf(ctx, "Failed to process order (delivery count: %v): %s, error: %v", + deliveryCount, string(msg.Body), err) + }), + ) + + serviceGroup := service.NewServiceGroup() + serviceGroup.Add(listener) + defer serviceGroup.Stop() + serviceGroup.Start() +} + +func processOrder(message string) error { + // 实际的业务处理逻辑 + return nil +} +``` + +### 3. 死信消息消费者(可选) + +```go +package main + +import ( + "fmt" + "log" + + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-queue/rabbitmq" +) + +type FailedOrderHandler struct{} + +func (h FailedOrderHandler) Consume(message string) error { + fmt.Printf("Processing failed order: %s\n", message) + + // 记录失败订单到数据库,发送告警等 + return nil +} + +func main() { + listenerConf := rabbitmq.RabbitListenerConf{ + RabbitConf: rabbitmq.RabbitConf{ + Host: "localhost", + Port: 5672, + Username: "guest", + Password: "guest", + }, + ListenerQueues: []rabbitmq.ConsumerConf{ + { + Name: "orders.failed", + AutoAck: true, // 死信消息可以直接自动确认 + }, + }, + } + + listener := rabbitmq.MustNewListener(listenerConf, FailedOrderHandler{}) + serviceGroup := service.NewServiceGroup() + serviceGroup.Add(listener) + defer serviceGroup.Stop() + serviceGroup.Start() +} +``` + +## 配置选项说明 + +### DeliveryLimit 值选择 + +- `-1`:无限制(不推荐,可能导致死循环) +- `0`:使用 RabbitMQ 默认值(RabbitMQ 4.0+ 默认为 20) +- `1-N`:指定最大投递次数(推荐 10-20) + +```go +// 推荐:限制 10 次投递 +DeliveryLimit: 10, + +// 无限制(谨慎使用) +DeliveryLimit: -1, + +// 使用 RabbitMQ 默认值(20) +DeliveryLimit: 0, +``` + +### 死信配置策略 + +**策略一:允许消息丢弃(简单)** +```go +// 不配置死信交换机,消息被丢弃 +queueConf := rabbitmq.QueueConf{ + Name: "orders.main", + QueueType: "quorum", + DeliveryLimit: 20, +} +``` + +**策略二:死信到专用队列(推荐)** +```go +// 失败消息进入死信队列,可以后续分析或重试 +queueConf := rabbitmq.QueueConf{ + Name: "orders.main", + QueueType: "quorum", + DeliveryLimit: 20, + DeadLetterExchange: "orders.dlx", + DeadLetterRoutingKey: "failed", +} +``` + +## 注意事项 + +1. **AutoAck 必须为 false** + ```go + ConsumerConf{ + Name: "orders.main", + AutoAck: false, // 必须手动确认,否则 Nack 不生效 + } + ``` + +2. **Delivery Limit 仅对 Quorum Queue 生效** + - Classic Queue 不会投递次数限制 + +3. **RabbitMQ 版本要求** + - RabbitMQ 3.8+ 支持 Quorum Queue + - RabbitMQ 3.11+ 支持 Delivery Limit + - RabbitMQ 4.0+ 默认 delivery-limit 为 20 + +4. **死信队列类型** + - 建议死信队列也使用 Quorum Queue 以确保数据安全 + +## 最佳实践 + +1. **为所有 Quorum Queue 配置 Delivery Limit** + ```go + DeliveryLimit: 20, + ``` + +2. **为关键队列配置死信交换机** + ```go + DeadLetterExchange: "app.dlx", + DeadLetterRoutingKey: "queue-name", + ``` + +3. **在错误处理器中记录投递次数** + ```go + rabbitmq.WithErrorHandler(func(ctx context.Context, msg amqp.Delivery, err error) { + deliveryCount := msg.Headers["x-delivery-count"] + logc.Errorf(ctx, "Error (delivery count: %v): %v", deliveryCount, err) + }) + ``` + +4. **监控死信队列** + - 定期检查死信队列中的消息数量 + - 分析失败原因,修复 bug 或调整业务逻辑 + +## 参考资料 + +- [RabbitMQ Quorum Queues 文档](https://www.rabbitmq.com/quorum-queues.html) +- [RabbitMQ Dead Letter Exchange](https://www.rabbitmq.com/dlx.html) diff --git a/example/rabbitmq/admin/admin.go b/example/rabbitmq/admin/admin.go index d0ee705..47c436f 100644 --- a/example/rabbitmq/admin/admin.go +++ b/example/rabbitmq/admin/admin.go @@ -44,4 +44,26 @@ func main() { if err != nil { log.Fatal(err) } + + err = admin.DeclareExchange(rabbitmq.ExchangeConf{ + ExchangeName: "orders.dlx", + Type: "direct", + Durable: true, + }, nil) + if err != nil { + log.Fatal(err) + } + + quorumQueueConf := rabbitmq.QueueConf{ + Name: "orders.quorum", + QueueType: "quorum", + Durable: true, + DeliveryLimit: 20, + DeadLetterExchange: "orders.dlx", + DeadLetterRoutingKey: "failed", + } + err = admin.DeclareQueueConf(quorumQueueConf) + if err != nil { + log.Fatal(err) + } } diff --git a/rabbitmq/config.go b/rabbitmq/config.go index 3b1f1ff..960025e 100644 --- a/rabbitmq/config.go +++ b/rabbitmq/config.go @@ -32,11 +32,15 @@ type RabbitSenderConf struct { } type QueueConf struct { - Name string - Durable bool `json:",default=true"` - AutoDelete bool `json:",default=false"` - Exclusive bool `json:",default=false"` - NoWait bool `json:",default=false"` + Name string + QueueType string `json:",options=classic|quorum,default=classic"` + Durable bool `json:",default=true"` + AutoDelete bool `json:",default=false"` + Exclusive bool `json:",default=false"` + NoWait bool `json:",default=false"` + DeliveryLimit int64 `json:",optional"` + DeadLetterExchange string `json:",optional"` + DeadLetterRoutingKey string `json:",optional"` } type ExchangeConf struct { diff --git a/rabbitmq/listener.go b/rabbitmq/listener.go index 90de462..d7e6ed6 100644 --- a/rabbitmq/listener.go +++ b/rabbitmq/listener.go @@ -1,10 +1,11 @@ package rabbitmq import ( + "context" "log" amqp "github.com/rabbitmq/amqp091-go" - "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/logc" "github.com/zeromicro/go-zero/core/queue" ) @@ -15,17 +16,37 @@ type ( Consume(message string) error } + ConsumeErrorHandler func(ctx context.Context, msg amqp.Delivery, err error) + + listenerOptions struct { + errorHandler ConsumeErrorHandler + } + + ListenerOption func(*listenerOptions) + RabbitListener struct { - conn *amqp.Connection - channel *amqp.Channel - forever chan bool - handler ConsumeHandler - queues RabbitListenerConf + conn *amqp.Connection + channel *amqp.Channel + forever chan bool + handler ConsumeHandler + queues RabbitListenerConf + errorHandler ConsumeErrorHandler } ) -func MustNewListener(listenerConf RabbitListenerConf, handler ConsumeHandler) queue.MessageQueue { - listener := RabbitListener{queues: listenerConf, handler: handler, forever: make(chan bool)} +func MustNewListener(listenerConf RabbitListenerConf, handler ConsumeHandler, opts ...ListenerOption) queue.MessageQueue { + var options listenerOptions + for _, opt := range opts { + opt(&options) + } + ensureListenerOptions(&options) + + listener := RabbitListener{ + queues: listenerConf, + handler: handler, + forever: make(chan bool), + errorHandler: options.errorHandler, + } conn, err := amqp.Dial(getRabbitURL(listenerConf.RabbitConf)) if err != nil { log.Fatalf("failed to connect rabbitmq, error: %v", err) @@ -43,6 +64,7 @@ func MustNewListener(listenerConf RabbitListenerConf, handler ConsumeHandler) qu func (q RabbitListener) Start() { for _, que := range q.queues.ListenerQueues { + que := que msg, err := q.channel.Consume( que.Name, "", @@ -58,8 +80,14 @@ func (q RabbitListener) Start() { go func() { for d := range msg { + ctx := context.Background() if err := q.handler.Consume(string(d.Body)); err != nil { - logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err) + q.errorHandler(ctx, d, err) + if !que.AutoAck { + if e := d.Nack(false, true); e != nil { + logc.Errorf(ctx, "nack failed, error: %v", e) + } + } } } }() @@ -73,3 +101,17 @@ func (q RabbitListener) Stop() { q.conn.Close() close(q.forever) } + +func WithErrorHandler(errorHandler ConsumeErrorHandler) ListenerOption { + return func(options *listenerOptions) { + options.errorHandler = errorHandler + } +} + +func ensureListenerOptions(options *listenerOptions) { + if options.errorHandler == nil { + options.errorHandler = func(ctx context.Context, msg amqp.Delivery, err error) { + logc.Errorf(ctx, "consume: %s, error: %v", string(msg.Body), err) + } + } +} diff --git a/rabbitmq/listener_test.go b/rabbitmq/listener_test.go new file mode 100644 index 0000000..7e3ccfe --- /dev/null +++ b/rabbitmq/listener_test.go @@ -0,0 +1,87 @@ +package rabbitmq + +import ( + "context" + "errors" + "testing" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" +) + +func TestEnsureListenerOptions(t *testing.T) { + t.Run("default error handler", func(t *testing.T) { + var opts listenerOptions + ensureListenerOptions(&opts) + assert.NotNil(t, opts.errorHandler) + }) + + t.Run("custom error handler is preserved", func(t *testing.T) { + var opts listenerOptions + called := false + customHandler := func(ctx context.Context, msg amqp.Delivery, err error) { + called = true + } + opts.errorHandler = customHandler + ensureListenerOptions(&opts) + opts.errorHandler(context.Background(), amqp.Delivery{}, nil) + assert.True(t, called, "custom handler should be preserved") + }) +} + +func TestWithErrorHandler(t *testing.T) { + t.Run("set error handler", func(t *testing.T) { + var opts listenerOptions + called := false + customHandler := func(ctx context.Context, msg amqp.Delivery, err error) { + called = true + } + opt := WithErrorHandler(customHandler) + opt(&opts) + assert.NotNil(t, opts.errorHandler) + opts.errorHandler(context.Background(), amqp.Delivery{}, nil) + assert.True(t, called, "custom handler should be called") + }) +} + +type mockConsumeHandler struct { + shouldFail bool +} + +func (m mockConsumeHandler) Consume(message string) error { + if m.shouldFail { + return errors.New("consume failed") + } + return nil +} + +func TestConsumeErrorHandler(t *testing.T) { + t.Run("error handler is called on failure", func(t *testing.T) { + errorCalled := false + var capturedMsg amqp.Delivery + var capturedErr error + + customHandler := func(ctx context.Context, msg amqp.Delivery, err error) { + errorCalled = true + capturedMsg = msg + capturedErr = err + } + + testMsg := amqp.Delivery{ + Body: []byte("test message"), + } + + opts := listenerOptions{errorHandler: customHandler} + handler := mockConsumeHandler{shouldFail: true} + + ctx := context.Background() + err := handler.Consume(string(testMsg.Body)) + if err != nil { + opts.errorHandler(ctx, testMsg, err) + } + + assert.True(t, errorCalled) + assert.Equal(t, "test message", string(capturedMsg.Body)) + assert.Equal(t, "consume failed", capturedErr.Error()) + }) +} diff --git a/rabbitmq/rabbitmqadmin.go b/rabbitmq/rabbitmqadmin.go index 17c28fd..cebaadf 100644 --- a/rabbitmq/rabbitmqadmin.go +++ b/rabbitmq/rabbitmqadmin.go @@ -62,3 +62,39 @@ func (q *Admin) Bind(queueName string, routekey string, exchange string, notWait args, ) } + +func buildQueueArgs(conf QueueConf) amqp.Table { + args := make(amqp.Table) + + if conf.QueueType != "" && conf.QueueType != "classic" { + args["x-queue-type"] = conf.QueueType + } + + if conf.DeliveryLimit != 0 { + args["x-delivery-limit"] = conf.DeliveryLimit + } + + if conf.DeadLetterExchange != "" { + args["x-dead-letter-exchange"] = conf.DeadLetterExchange + } + + if conf.DeadLetterRoutingKey != "" { + args["x-dead-letter-routing-key"] = conf.DeadLetterRoutingKey + } + + return args +} + +func (q *Admin) DeclareQueueConf(conf QueueConf) error { + args := buildQueueArgs(conf) + _, err := q.channel.QueueDeclare( + conf.Name, + conf.Durable, + conf.AutoDelete, + conf.Exclusive, + conf.NoWait, + args, + ) + + return err +} diff --git a/rabbitmq/rabbitmqadmin_test.go b/rabbitmq/rabbitmqadmin_test.go new file mode 100644 index 0000000..05931ab --- /dev/null +++ b/rabbitmq/rabbitmqadmin_test.go @@ -0,0 +1,109 @@ +package rabbitmq + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBuildQueueArgs(t *testing.T) { + t.Run("classic queue without extra args", func(t *testing.T) { + conf := QueueConf{ + Name: "test-queue", + Durable: true, + } + args := buildQueueArgs(conf) + + assert.Empty(t, args) + }) + + t.Run("quorum queue type", func(t *testing.T) { + conf := QueueConf{ + Name: "test-quorum", + QueueType: "quorum", + } + args := buildQueueArgs(conf) + + assert.Equal(t, "quorum", args["x-queue-type"]) + }) + + t.Run("classic queue type explicit", func(t *testing.T) { + conf := QueueConf{ + Name: "test-classic", + QueueType: "classic", + } + args := buildQueueArgs(conf) + + assert.Empty(t, args) + }) + + t.Run("with delivery limit", func(t *testing.T) { + conf := QueueConf{ + Name: "test-delivery-limit", + DeliveryLimit: 20, + } + args := buildQueueArgs(conf) + + assert.Equal(t, int64(20), args["x-delivery-limit"]) + }) + + t.Run("with delivery limit zero", func(t *testing.T) { + conf := QueueConf{ + Name: "test-no-delivery-limit", + DeliveryLimit: 0, + } + args := buildQueueArgs(conf) + + assert.Nil(t, args["x-delivery-limit"]) + }) + + t.Run("with dead letter exchange", func(t *testing.T) { + conf := QueueConf{ + Name: "test-dlx", + DeadLetterExchange: "dlx.exchange", + } + args := buildQueueArgs(conf) + + assert.Equal(t, "dlx.exchange", args["x-dead-letter-exchange"]) + }) + + t.Run("with dead letter routing key", func(t *testing.T) { + conf := QueueConf{ + Name: "test-dlx-rk", + DeadLetterRoutingKey: "failed", + } + args := buildQueueArgs(conf) + + assert.Equal(t, "failed", args["x-dead-letter-routing-key"]) + }) + + t.Run("quorum queue with all options", func(t *testing.T) { + conf := QueueConf{ + Name: "orders.quorum", + QueueType: "quorum", + DeliveryLimit: 20, + DeadLetterExchange: "orders.dlx", + DeadLetterRoutingKey: "failed", + } + args := buildQueueArgs(conf) + + assert.Equal(t, "quorum", args["x-queue-type"]) + assert.Equal(t, int64(20), args["x-delivery-limit"]) + assert.Equal(t, "orders.dlx", args["x-dead-letter-exchange"]) + assert.Equal(t, "failed", args["x-dead-letter-routing-key"]) + }) + + t.Run("delivery limit -1 means no limit", func(t *testing.T) { + conf := QueueConf{ + Name: "test-unlimited", + DeliveryLimit: -1, + } + args := buildQueueArgs(conf) + + assert.Equal(t, int64(-1), args["x-delivery-limit"]) + }) +} + +func TestAdmin_DeclareQueueConf_Integration(t *testing.T) { + t.Skip("integration test skipped, requires RabbitMQ server") +}