|
1 | 1 | # Bunny |
2 | 2 |
|
3 | | -Golang RabbitMQ Producer and Consumer |
| 3 | +A Go library that provides a high-level wrapper around RabbitMQ (AMQP) with built-in OpenTelemetry tracing, Prometheus metrics, and flexible message serialization. |
| 4 | + |
| 5 | +## Features |
| 6 | + |
| 7 | +- **Producer and Consumer** abstractions over `amqp091-go` |
| 8 | +- **Automatic reconnection** with backoff on connection/channel loss |
| 9 | +- **Publisher confirms** to ensure delivery before returning |
| 10 | +- **OpenTelemetry tracing** with trace context propagated via AMQP headers |
| 11 | +- **Prometheus metrics** for message counts and processing latency |
| 12 | +- **Pluggable serializers/deserializers**: JSON, Protocol Buffers, ProtoJSON, raw bytes, or custom |
| 13 | +- **Retry handling** via a built-in `RetryPublisher` error handler |
| 14 | +- **Functional options** pattern for clean, composable configuration |
| 15 | + |
| 16 | +## Requirements |
| 17 | + |
| 18 | +- Go 1.22.2+ |
| 19 | +- A running RabbitMQ instance (default: `localhost:5672`, credentials `guest`/`guest`) |
| 20 | + |
| 21 | +## Installation |
| 22 | + |
| 23 | +```bash |
| 24 | +go get github.com/patrickjmcd/bunny |
| 25 | +``` |
| 26 | + |
| 27 | +## Usage |
| 28 | + |
| 29 | +### Producer |
| 30 | + |
| 31 | +```go |
| 32 | +import ( |
| 33 | + "context" |
| 34 | + "github.com/patrickjmcd/bunny/producer" |
| 35 | +) |
| 36 | + |
| 37 | +p, err := producer.NewRabbitProducer( |
| 38 | + producer.WithConnectionString("amqp://guest:guest@localhost:5672/"), |
| 39 | + producer.WithExchangeName("my-exchange"), |
| 40 | + producer.WithExchangeType("topic"), |
| 41 | + producer.WithQueueName("my-queue"), |
| 42 | + producer.WithTopic("my.routing.key"), |
| 43 | + producer.WithValueSerializer(producer.NewJsonSerializer[MyMessage]()), |
| 44 | +) |
| 45 | +if err != nil { |
| 46 | + log.Fatal(err) |
| 47 | +} |
| 48 | +defer p.Close() |
| 49 | + |
| 50 | +// Wait until the connection is ready |
| 51 | +<-p.Ready |
| 52 | + |
| 53 | +err = p.ProduceMessage(context.Background(), "correlation-id", MyMessage{...}, "") |
| 54 | +``` |
| 55 | + |
| 56 | +To publish a raw AMQP message: |
| 57 | + |
| 58 | +```go |
| 59 | +import amqp "github.com/rabbitmq/amqp091-go" |
| 60 | + |
| 61 | +err = p.ProduceRaw(ctx, "correlation-id", &amqp.Publishing{ |
| 62 | + ContentType: "application/json", |
| 63 | + Body: []byte(`{"hello":"world"}`), |
| 64 | +}) |
| 65 | +``` |
| 66 | + |
| 67 | +### Consumer |
| 68 | + |
| 69 | +```go |
| 70 | +import ( |
| 71 | + "context" |
| 72 | + "github.com/patrickjmcd/bunny/consumer" |
| 73 | + amqp "github.com/rabbitmq/amqp091-go" |
| 74 | +) |
| 75 | + |
| 76 | +type MyHandler struct{} |
| 77 | + |
| 78 | +func (h *MyHandler) OnReceive(ctx context.Context, key, value interface{}) error { |
| 79 | + msg := value.(MyMessage) |
| 80 | + // process msg... |
| 81 | + return nil |
| 82 | +} |
| 83 | + |
| 84 | +type MyErrorHandler struct{} |
| 85 | + |
| 86 | +func (h *MyErrorHandler) OnError(ctx context.Context, raw *amqp.Delivery) error { |
| 87 | + // handle or dead-letter the message |
| 88 | + return nil |
| 89 | +} |
| 90 | + |
| 91 | +c, err := consumer.NewRabbitConsumer( |
| 92 | + consumer.WithConnectionString("amqp://guest:guest@localhost:5672/"), |
| 93 | + consumer.WithExchangeName("my-exchange"), |
| 94 | + consumer.WithExchangeType("topic"), |
| 95 | + consumer.WithQueueName("my-queue"), |
| 96 | + consumer.WithTopic("my.routing.key"), |
| 97 | + consumer.WithValueDeserializer(consumer.NewJSONDeserializer[MyMessage]()), |
| 98 | + consumer.WithMessageHandler(&MyHandler{}), |
| 99 | + consumer.WithMessageErrorHandler(&MyErrorHandler{}), |
| 100 | + consumer.WithPrefetchCount(10), |
| 101 | +) |
| 102 | +if err != nil { |
| 103 | + log.Fatal(err) |
| 104 | +} |
| 105 | +defer c.Close() |
| 106 | + |
| 107 | +// Blocks until ctx is cancelled or a fatal error occurs |
| 108 | +if err := c.Run(context.Background()); err != nil { |
| 109 | + log.Fatal(err) |
| 110 | +} |
| 111 | +``` |
| 112 | + |
| 113 | +### Retry Publisher |
| 114 | + |
| 115 | +Use `RetryPublisher` as the `MessageErrorHandler` to automatically re-publish failed messages to another exchange/queue. |
| 116 | + |
| 117 | +```go |
| 118 | +import ( |
| 119 | + "github.com/patrickjmcd/bunny/retry" |
| 120 | + "github.com/patrickjmcd/bunny/producer" |
| 121 | +) |
| 122 | + |
| 123 | +retryProducer, _ := producer.NewRabbitProducer( |
| 124 | + producer.WithConnectionString("amqp://guest:guest@localhost:5672/"), |
| 125 | + producer.WithExchangeName("retry-exchange"), |
| 126 | + producer.WithQueueName("retry-queue"), |
| 127 | + producer.WithTopic("retry.key"), |
| 128 | +) |
| 129 | + |
| 130 | +retryHandler := retry.NewRetryPublisherConfig( |
| 131 | + retry.WithMessagePublisher(retryProducer), |
| 132 | +) |
| 133 | + |
| 134 | +c, _ := consumer.NewRabbitConsumer( |
| 135 | + // ...other options... |
| 136 | + consumer.WithMessageErrorHandler(retryHandler), |
| 137 | +) |
| 138 | +``` |
| 139 | + |
| 140 | +## Configuration Options |
| 141 | + |
| 142 | +### Connection |
| 143 | + |
| 144 | +| Option | Description | Default | |
| 145 | +|--------|-------------|---------| |
| 146 | +| `WithConnectionString(s)` | Full AMQP URI (overrides individual fields) | — | |
| 147 | +| `WithProtocol(s)` | Protocol (`amqp` or `amqps`) | `amqp` | |
| 148 | +| `WithHost(s)` | Broker hostname | `localhost` | |
| 149 | +| `WithPort(s)` | Broker port | `5672` | |
| 150 | +| `WithUsername(s)` | Username | `guest` | |
| 151 | +| `WithVHost(s)` | Virtual host | `/` | |
| 152 | + |
| 153 | +### Exchange |
| 154 | + |
| 155 | +| Option | Description | Default | |
| 156 | +|--------|-------------|---------| |
| 157 | +| `WithExchangeName(s)` | Exchange name | `""` | |
| 158 | +| `WithExchangeType(s)` | Type: `direct`, `fanout`, `topic`, `headers` | `""` | |
| 159 | +| `WithExchangeDurable(b)` | Survive broker restart | `false` | |
| 160 | +| `WithExchangeAutoDelete(b)` | Delete when unused | `false` | |
| 161 | +| `WithExchangeInternal(b)` | Internal exchange | `false` | |
| 162 | +| `WithExchangeNoWait(b)` | Do not wait for confirmation | `false` | |
| 163 | +| `WithExchangeArgs(t)` | Additional arguments | `nil` | |
| 164 | + |
| 165 | +### Queue |
| 166 | + |
| 167 | +| Option | Description | Default | |
| 168 | +|--------|-------------|---------| |
| 169 | +| `WithQueueName(s)` | Queue name | `""` | |
| 170 | +| `WithQueueDurable(b)` | Survive broker restart | `false` | |
| 171 | +| `WithQueueAutoDelete(b)` | Delete when unused | `true` | |
| 172 | +| `WithQueueExclusive(b)` | Exclusive to connection | `false` | |
| 173 | +| `WithQueueNoWait(b)` | Do not wait for confirmation | `false` | |
| 174 | +| `WithQueueNoBind(b)` | Skip exchange binding | `false` | |
| 175 | +| `WithQueueArgs(t)` | Additional arguments | `nil` | |
| 176 | + |
| 177 | +### Producer-specific |
| 178 | + |
| 179 | +| Option | Description | Default | |
| 180 | +|--------|-------------|---------| |
| 181 | +| `WithTopic(s)` | Routing key for published messages | `"unconfigured-topic"` | |
| 182 | +| `WithValueSerializer(v)` | Serializer implementation | `NilSerializer` | |
| 183 | +| `WithPublisherMandatory(b)` | Return unroutable messages | `true` | |
| 184 | +| `WithPublisherImmediate(b)` | Fail if no immediate consumer | `false` | |
| 185 | +| `WithTracer(t)` | OpenTelemetry tracer | global OTEL tracer | |
| 186 | +| `WithTracePropagator(p)` | OTEL text map propagator | global propagator | |
| 187 | + |
| 188 | +### Consumer-specific |
| 189 | + |
| 190 | +| Option | Description | Default | |
| 191 | +|--------|-------------|---------| |
| 192 | +| `WithTopic(s)` | Routing key to bind | `"unconfigured-Topic"` | |
| 193 | +| `WithValueDeserializer(v)` | Deserializer implementation | `NilDeserializer` | |
| 194 | +| `WithMessageHandler(h)` | Handler for successfully deserialized messages | `NoOpMessageHandler` | |
| 195 | +| `WithMessageErrorHandler(h)` | Handler for failed messages | `NoOpMessageErrorHandler` | |
| 196 | +| `WithProcessingDelay(d)` | Artificial delay before processing each message | `0` | |
| 197 | +| `WithConsumerName(s)` | Consumer tag | `""` | |
| 198 | +| `WithConsumerAutoAck(b)` | Acknowledge messages automatically | `false` | |
| 199 | +| `WithConsumerExclusive(b)` | Exclusive consumer | `false` | |
| 200 | +| `WithPrefetchCount(n)` | QoS prefetch count | `0` (unlimited) | |
| 201 | +| `WithSuppressProcessingErrors(b)` | Suppress handler error logs | `false` | |
| 202 | +| `WithTracer(t)` | OpenTelemetry tracer | noop tracer | |
| 203 | +| `WithTracePropagator(p)` | OTEL text map propagator | TraceContext + Baggage | |
| 204 | + |
| 205 | +## Serializers and Deserializers |
| 206 | + |
| 207 | +### Producer serializers (`producer` package) |
| 208 | + |
| 209 | +| Type | Description | |
| 210 | +|------|-------------| |
| 211 | +| `NilSerializer` | No-op, passes body as-is | |
| 212 | +| `BytesSerializer` | Raw `[]byte` | |
| 213 | +| `JsonSerializer[T]` | JSON encoding of type `T` | |
| 214 | +| `ProtobufSerializer[T]` | Protocol Buffer binary encoding | |
| 215 | +| `ProtoJsonSerializer[T]` | Protocol Buffer JSON encoding | |
| 216 | + |
| 217 | +Implement the `ValueSerializer` interface for custom formats: |
| 218 | + |
| 219 | +```go |
| 220 | +type ValueSerializer interface { |
| 221 | + Serialize(topic string, value interface{}) ([]byte, error) |
| 222 | + GetContentType() string |
| 223 | + Close() |
| 224 | +} |
| 225 | +``` |
| 226 | + |
| 227 | +### Consumer deserializers (`consumer` package) |
| 228 | + |
| 229 | +| Type | Description | |
| 230 | +|------|-------------| |
| 231 | +| `NilDeserializer` | No-op, returns body as-is | |
| 232 | +| `StringDeserializer` | Converts body to `string` | |
| 233 | +| `JSONDeserializer[T]` | JSON decoding into type `T` | |
| 234 | +| `ProtobufDeserializer[T]` | Protocol Buffer binary decoding | |
| 235 | +| `ProtoJsonDeserializer[T]` | Protocol Buffer JSON decoding | |
| 236 | + |
| 237 | +Implement `ValueDeserializer` for custom formats: |
| 238 | + |
| 239 | +```go |
| 240 | +type ValueDeserializer interface { |
| 241 | + Deserialize(ctx context.Context, topic string, data []byte) (interface{}, error) |
| 242 | + DeserializeInto(ctx context.Context, topic string, data []byte, dest interface{}) error |
| 243 | + Close() |
| 244 | +} |
| 245 | +``` |
| 246 | + |
| 247 | +## Observability |
| 248 | + |
| 249 | +### OpenTelemetry |
| 250 | + |
| 251 | +Bunny creates spans for each produce and consume operation, propagating trace context through AMQP message headers. Pass your configured tracer and propagator via `WithTracer` and `WithTracePropagator`. |
| 252 | + |
| 253 | +### Prometheus Metrics |
| 254 | + |
| 255 | +The following metrics are registered automatically: |
| 256 | + |
| 257 | +| Metric | Type | Labels | Description | |
| 258 | +|--------|------|--------|-------------| |
| 259 | +| `bunny_produced_total` | Counter | `topic` | Total messages produced | |
| 260 | +| `bunny_receive_latency_ms` | Histogram | `exchange`, `queue`, `topic` | Time from message timestamp to receipt | |
| 261 | +| `bunny_processing_duration_ms` | Histogram | `exchange`, `queue`, `topic` | Time to process each message | |
| 262 | + |
| 263 | +## Packages |
| 264 | + |
| 265 | +| Package | Description | |
| 266 | +|---------|-------------| |
| 267 | +| `rabbit` | Low-level AMQP client with reconnection logic | |
| 268 | +| `rabbit/options` | Connection, exchange, queue, consumer, and publisher option types | |
| 269 | +| `producer` | High-level message publisher | |
| 270 | +| `consumer` | High-level message consumer | |
| 271 | +| `retry` | `MessageErrorHandler` implementation that re-publishes failed messages | |
| 272 | + |
| 273 | +## License |
| 274 | + |
| 275 | +MIT |
0 commit comments