Skip to content

fgrzl/queues

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Dependabot Updates ci

Queues

A Go library providing a unified interface for various queue implementations, supporting multiple backends.

Features

  • Pluggable Backends: Easily switch between different queue providers.
  • Supported Providers: Azure, AWS SQS, and Pebble.
  • Batch Processing: Support for sending and receiving multiple messages at once.
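  • Visibility Timeouts: Received messages stay hidden from other consumers until the timeout expires, so each message is processed at least once without being handed to several consumers at the same time.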

Installation

To install the package, run:

go get github.com/fgrzl/queues

Usage

1️⃣ Initializing a Queue Provider

package main

import (
    "log"
    "github.com/fgrzl/queues/pebble"
)

// main opens a Pebble-backed queue provider and keeps it available for the
// lifetime of the program.
func main() {
    // Describe where the Pebble database lives, then open the provider.
    opts := pebble.QueueProviderOptions{Path: "path/to/db"}
    qp, err := pebble.NewQueueProvider(opts)
    if err != nil {
        log.Fatalf("Failed to initialize Pebble provider: %v", err)
    }
    // Release the provider when main returns.
    defer qp.Close()

    // Your code to send/receive messages
}

2️⃣ Sending Messages

📌 Sending a Single Message

package main

import (
    "context"
    "log"
    "github.com/fgrzl/queues"
)

// sendSingle sends one "Hello, Queue!" item to "example-queue" through the
// given provider. A send failure is logged with log.Fatalf.
func sendSingle(provider queues.QueueProvider) {
    queueName := "example-queue"

    // Build the item addressed to the target queue.
    msg := queues.NewQueueItem(queueName, "Hello, Queue!")

    // Hand the item to the provider.
    err := provider.Send(context.Background(), msg)
    if err != nil {
        log.Fatalf("Failed to send item to queue: %v", err)
    }

    log.Println("Message sent successfully!")
}

📌 Sending a Batch of Messages

// sendBatch sends three items to "example-queue" in a single SendBatch call
// through the given provider. A send failure is logged with log.Fatalf.
func sendBatch(provider queues.QueueProvider) {
    ctx := context.Background()
    queueName := "example-queue"

    // Create a batch of queue items. Literal payloads keep the example
    // self-contained; substitute your own message content here.
    batch := []*queues.QueueItem{
        queues.NewQueueItem(queueName, "Hello, Queue! (1 of 3)"),
        queues.NewQueueItem(queueName, "Hello, Queue! (2 of 3)"),
        queues.NewQueueItem(queueName, "Hello, Queue! (3 of 3)"),
    }

    // Send the batch to the queue
    if err := provider.SendBatch(ctx, batch); err != nil {
        log.Fatalf("Failed to send batch messages: %v", err)
    }

    log.Println("Batch messages sent successfully!")
}

3️⃣ Receiving Messages

// receiveMessages asks the provider for items from "example-queue" and logs
// the Content of each one. A receive failure is logged with log.Fatalf.
func receiveMessages(provider queues.QueueProvider) {
    queueName := "example-queue"

    // Build the receive arguments for the target queue.
    recvArgs := queues.NewReceiveArgs(queueName)

    // Fetch whatever the provider returns for these arguments.
    items, err := provider.Receive(context.Background(), &recvArgs)
    if err != nil {
        log.Fatalf("Failed to receive items from queue: %v", err)
    }

    for _, item := range items {
        log.Printf("Received message: %v", item.Content)
    }
}

4️⃣ Deleting Messages After Processing

// deleteMessage asks the provider to remove the given message and logs
// whether the removal succeeded. An error from Remove is logged with
// log.Fatalf.
func deleteMessage(provider queues.QueueProvider, message *queues.QueueItem) {
    // Ask the provider to drop the message from its queue.
    removed, err := provider.Remove(context.Background(), message)
    if err != nil {
        log.Fatalf("Failed to delete message: %v", err)
    }

    // No error but nothing removed: report it and stop.
    if !removed {
        log.Println("Message could not be deleted from the queue.")
        return
    }

    log.Println("Message successfully deleted from the queue.")
}

Running Tests

To run the tests, first start the Docker Compose services:

docker compose -f test/compose.yml up -d

Then, use the following command to run the tests:

go test ./...

The tests are located in the queues_test.go file and cover various scenarios for different queue providers.

About

Unified queue interface for Go supporting Azure, AWS SQS, and Pebble

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages