83 changes: 83 additions & 0 deletions examples/function-queue/README.md
@@ -0,0 +1,83 @@
# Function Queue with Worker Pool Example

This example demonstrates how to use `workqueue` to manage and execute functions in a worker pool pattern.

## What This Example Demonstrates

1. **Storing functions as work items**: Shows how to queue executable functions/closures even though function values aren't comparable in Go: the queue holds comparable task metadata, while the functions themselves live in a lookup map keyed by task ID.

2. **Worker pool pattern**: Creates a pool of 5 concurrent goroutines that pull work from the shared queue and execute tasks in parallel.

3. **Priority-based execution**: Tasks with higher priority values are executed first. The example includes various tasks with different priorities (40-100).

4. **Diverse task types**: Demonstrates different types of work functions:
- Database operations (backup)
- Communication (email notifications)
- Data processing (reports, indexing)
- Maintenance (cleanup, archiving)

5. **Graceful shutdown**: Workers detect when the queue is empty and shut down cleanly using context timeouts.

## How to Run

```bash
cd examples/function-queue
go run main.go
```

## Expected Output

The example will:
- Add 8 different tasks to the queue with varying priorities
- Start 5 workers that process tasks concurrently
- Show which worker executes which task
- Display task execution in priority order (highest first)
- Show workers shutting down gracefully when queue is empty

## Key Concepts

### Worker Pool Pattern
A worker pool is a common concurrency pattern, sketched below, where:
- Multiple goroutines (workers) run concurrently
- Workers pull work items from a shared queue
- Work is distributed automatically across available workers
- The pattern provides controlled concurrency and load balancing
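
A minimal, generic sketch of the pattern using only the standard library, with a buffered channel standing in for the shared queue (names here are illustrative, not part of `workqueue`):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 8) // stands in for the shared work queue
	var wg sync.WaitGroup

	// Start a fixed pool of workers; the pool size caps concurrency.
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for job := range jobs { // each worker pulls from the shared channel
				fmt.Printf("worker %d processed job %d\n", id, job)
			}
		}(w)
	}

	for j := 1; j <= 8; j++ {
		jobs <- j
	}
	close(jobs) // no more work: workers drain the channel and exit
	wg.Wait()
}
```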

### Storing Functions in the Queue

Since Go functions aren't comparable (they can't be used as map keys or compared with `==`), we can't use them directly as the `Key` or `Value` type in workqueue. Instead, this example:
1. Uses the numeric task ID as the key (which is comparable)
2. Stores comparable `TaskMetadata` as the value
3. Keeps the actual function in a separate map keyed by task ID, and looks it up when the task is taken from the queue (see the sketch below)
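
A condensed sketch of this lookup-map pattern, mirroring `main.go` (assumes that file's imports plus a `ctx`; the `workqueue` calls follow the example's own usage):

```go
queue := workqueue.NewInMemory[int, TaskMetadata](nil)
taskFunctions := make(map[int]func() error) // functions live outside the queue

// Enqueue comparable metadata; stash the closure in the map under the same ID.
taskFunctions[1] = func() error { fmt.Println("backing up..."); return nil }
_ = queue.Put(ctx, workqueue.WorkItem[int, TaskMetadata]{
	Key:      1,
	Value:    TaskMetadata{ID: 1, Name: "Database Backup", Priority: 100},
	Priority: 100,
})

// Later, a worker takes the metadata and looks up the function to run.
if item, err := queue.Take(ctx); err == nil {
	if run, ok := taskFunctions[item.Value.ID]; ok {
		_ = run() // execute the task
	}
}
```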

### workqueue Features Used

- **Priority**: Higher priority tasks (e.g., Database Backup: 100) execute before lower priority tasks (e.g., Archive Old Logs: 40)
- **Take**: Workers block until a task is available
- **Context with timeout**: Allows workers to detect when no more work is available (see the sketch after this list)
- **Concurrent access**: Multiple workers safely access the same queue
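
The timeout-driven shutdown from the list above, condensed from `main.go` (`process` is a hypothetical stand-in for the task-execution code):

```go
for {
	// Bound each Take so an idle worker exits instead of blocking forever.
	timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	item, err := queue.Take(timeoutCtx)
	cancel()
	if errors.Is(err, context.DeadlineExceeded) {
		return // queue stayed empty for 2s; shut this worker down
	}
	if err != nil {
		return // unexpected error; give up
	}
	process(item) // hypothetical handler
}
```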

## Adapting for Production

In a production system, you might:
1. Add error handling and retry logic for failed tasks
2. Implement task timeouts to prevent hanging
3. Add monitoring and metrics (tasks processed, queue depth, etc.)
4. Store task results or errors for later review
5. Support task cancellation
6. Implement a dead-letter queue for repeatedly failing tasks
7. Add task dependencies (task B runs only after task A completes)
8. Persist tasks to disk/database for durability
9. Support scheduled/delayed task execution using `DelayedUntil`
10. Add task expiration using `ExpiresAt` for time-sensitive work (items 9 and 10 are sketched after this list)
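
For items 9 and 10, a sketch of the example's enqueue step extended with delay and expiration, assuming `DelayedUntil` and `ExpiresAt` are `time.Time` fields on `WorkItem` (they are described, though not shown, in this repo's url-crawler example):

```go
item := workqueue.WorkItem[int, TaskMetadata]{
	Key:      task.Metadata.ID,
	Value:    task.Metadata,
	Priority: task.Metadata.Priority,
	// Assumed time.Time fields, per the url-crawler example's description:
	DelayedUntil: time.Now().Add(10 * time.Minute), // hold back for 10 minutes
	ExpiresAt:    time.Now().Add(24 * time.Hour),   // drop if still queued after a day
}
if err := queue.Put(ctx, item); err != nil {
	log.Printf("enqueue failed: %v", err)
}
```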

## Real-World Use Cases

This pattern is useful for:
- Background job processing
- Task scheduling systems
- Event-driven architectures
- Batch processing pipelines
- Microservices work distribution
- Async operation handling
- Load balancing across workers
218 changes: 218 additions & 0 deletions examples/function-queue/main.go
@@ -0,0 +1,218 @@
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/andrewortman/workqueue"
)

// TaskMetadata represents metadata about a task.
// This is comparable and can be used in the workqueue.
type TaskMetadata struct {
	ID       int
	Name     string
	Priority int64
}

// Task represents a work function to be executed.
type Task struct {
	Metadata TaskMetadata
	Execute  func() error
}

func main() {
	// Create a work queue where:
	// - Key: task ID (int)
	// - Value: TaskMetadata (comparable)
	// We'll use a map to look up the actual functions
	queue := workqueue.NewInMemory[int, TaskMetadata](nil)
	ctx := context.Background()

	fmt.Println("=== Function Queue with Worker Pool Example ===")
	fmt.Println()

	// Create a map to store task functions
	// The queue will store TaskMetadata, and we'll look up functions here
	taskFunctions := make(map[int]func() error)

	// Create sample tasks with different behaviors
	tasks := []Task{
		{
			Metadata: TaskMetadata{ID: 1, Name: "Database Backup", Priority: 100},
			Execute: func() error {
				fmt.Println(" → Starting database backup...")
				time.Sleep(500 * time.Millisecond)
				fmt.Println(" → Database backup completed!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 2, Name: "Send Email Notification", Priority: 80},
			Execute: func() error {
				fmt.Println(" → Sending email notification...")
				time.Sleep(200 * time.Millisecond)
				fmt.Println(" → Email sent successfully!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 3, Name: "Generate Report", Priority: 90},
			Execute: func() error {
				fmt.Println(" → Generating monthly report...")
				time.Sleep(700 * time.Millisecond)
				fmt.Println(" → Report generated!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 4, Name: "Clean Temporary Files", Priority: 50},
			Execute: func() error {
				fmt.Println(" → Cleaning temporary files...")
				time.Sleep(300 * time.Millisecond)
				fmt.Println(" → Cleanup completed!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 5, Name: "Update Cache", Priority: 85},
			Execute: func() error {
				fmt.Println(" → Updating cache...")
				time.Sleep(400 * time.Millisecond)
				fmt.Println(" → Cache updated!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 6, Name: "Process API Webhooks", Priority: 95},
			Execute: func() error {
				fmt.Println(" → Processing webhooks...")
				time.Sleep(250 * time.Millisecond)
				fmt.Println(" → Webhooks processed!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 7, Name: "Archive Old Logs", Priority: 40},
			Execute: func() error {
				fmt.Println(" → Archiving old logs...")
				time.Sleep(600 * time.Millisecond)
				fmt.Println(" → Logs archived!")
				return nil
			},
		},
		{
			Metadata: TaskMetadata{ID: 8, Name: "Index Search Data", Priority: 75},
			Execute: func() error {
				fmt.Println(" → Indexing search data...")
				time.Sleep(450 * time.Millisecond)
				fmt.Println(" → Indexing complete!")
				return nil
			},
		},
	}

	// Add tasks to the queue and store functions in the map
	fmt.Println("Adding tasks to the queue...")
	for _, task := range tasks {
		// Store the function in our map
		taskFunctions[task.Metadata.ID] = task.Execute

		// Add metadata to the queue
		item := workqueue.WorkItem[int, TaskMetadata]{
			Key:      task.Metadata.ID,
			Value:    task.Metadata,
			Priority: task.Metadata.Priority,
		}
		if err := queue.Put(ctx, item); err != nil {
			log.Fatalf("Failed to add task %d: %v", task.Metadata.ID, err)
		}
		fmt.Printf(" Added task %d: %s (priority: %d)\n", task.Metadata.ID, task.Metadata.Name, task.Metadata.Priority)
	}

	fmt.Println()
	fmt.Println("Starting worker pool (5 workers)...")
	fmt.Println()

	// Create a worker pool
	const numWorkers = 5
	var wg sync.WaitGroup

	// Track completed tasks
	completed := make(chan int, len(tasks))

	// Start workers
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		workerID := i
		go func() {
			defer wg.Done()

			for {
				// Try to take a task from the queue with a timeout.
				// This allows workers to exit gracefully when the queue is empty.
				timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
				item, err := queue.Take(timeoutCtx)
				cancel()

				if err != nil {
					// A context timeout means no task became available;
					// errors.Is also matches wrapped deadline errors.
					if errors.Is(err, context.DeadlineExceeded) {
						fmt.Printf("Worker %d: No more tasks, shutting down\n", workerID)
						return
					}
					log.Printf("Worker %d: Error taking task: %v", workerID, err)
					return
				}

				metadata := item.Value

				// Look up the function for this task
				taskFunc, ok := taskFunctions[metadata.ID]
				if !ok {
					log.Printf("Worker %d: Function not found for task %d", workerID, metadata.ID)
					continue
				}

				// Execute the task
				fmt.Printf("Worker %d: Executing task %d - %s\n", workerID, metadata.ID, metadata.Name)

				// Run the function
				if err := taskFunc(); err != nil {
					log.Printf("Worker %d: Task %d failed: %v", workerID, metadata.ID, err)
				} else {
					fmt.Printf("Worker %d: Task %d completed successfully\n", workerID, metadata.ID)
				}

				completed <- metadata.ID
				fmt.Println()
			}
		}()
	}

	// Wait for all workers to finish
	wg.Wait()

	// Close the completed channel and collect results
	close(completed)
	completedTasks := []int{}
	for taskID := range completed {
		completedTasks = append(completedTasks, taskID)
	}

	fmt.Println("=== Summary ===")
	fmt.Printf("Total tasks completed: %d\n", len(completedTasks))
	fmt.Println()
	fmt.Println("This example demonstrated:")
	fmt.Println(" 1. Storing functions/closures as work items")
	fmt.Println(" 2. Priority-based execution (higher priority tasks run first)")
	fmt.Println(" 3. Worker pool pattern with 5 concurrent goroutines")
	fmt.Println(" 4. Each worker pulling and executing tasks from the queue")
	fmt.Println(" 5. Graceful shutdown when queue is empty")
	fmt.Println()
	fmt.Println("Note: Due to concurrent execution, output order may vary")
	fmt.Println(" but high-priority tasks generally execute earlier.")
}
60 changes: 60 additions & 0 deletions examples/url-crawler/README.md
@@ -0,0 +1,60 @@
# URL Crawl Frontier Example

This example demonstrates how to use `workqueue` as a URL crawl frontier - a queue that manages URLs to be fetched by a web crawler.

## What This Example Demonstrates

1. **Priority-based URL fetching**: URLs that have never been fetched get the highest priority (1000), while URLs that have been fetched get lower priority (500). In a real crawler, you could implement age-based priority where older fetches get higher priority.

2. **Delayed re-fetching**: After a URL is fetched, it's re-added to the queue with a 1-hour delay using `DelayedUntil`. This prevents the same URL from being fetched too frequently.

3. **24-hour expiration**: All URLs expire and are automatically removed from the queue after 24 hours using `ExpiresAt`.

4. **Multiple workers**: The example uses 3 concurrent worker goroutines that fetch URLs from the queue, simulating a realistic crawler architecture.

5. **Queue operations**: Demonstrates `Put` (adding new URLs), `Take` (fetching URLs to process), and checking queue status with `Size`.

## How to Run

```bash
cd examples/url-crawler
go run main.go
```

## Expected Output

The example will:
- Add 5 seed URLs to the queue
- Start 3 workers that process URLs concurrently
- Show each worker fetching URLs and re-queueing them with a delay
- Display the final queue state showing delayed items
- Demonstrate that delayed URLs cannot be fetched immediately

## Key Concepts

### Crawl Frontier
A crawl frontier is a data structure that manages which URLs a web crawler should fetch next. It typically handles:
- Politeness policies (delay between fetches from the same host)
- Prioritization (important pages first)
- Duplicate detection (don't fetch the same URL twice)
- URL expiration (remove stale URLs)

### workqueue Features Used

- **Priority**: Higher priority URLs are fetched first
- **DelayedUntil**: Prevents immediate re-fetching of URLs (politeness policy)
- **ExpiresAt**: Automatically removes old URLs from the queue
- **Take**: Workers block until a URL is available for fetching
- **Put**: Add new URLs or re-add previously fetched URLs (the sketch below combines all of these)
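
A sketch of how these pieces combine when enqueueing a URL, assuming the same `WorkItem` shape as the function-queue example and `time.Time` fields for `DelayedUntil`/`ExpiresAt` (the priorities and durations mirror the description above; the `urlInfo` type and other names are illustrative):

```go
// Illustrative value type; the example's main.go defines its own.
type urlInfo struct {
	URL        string
	FetchCount int
}

// Inside main, with queue := workqueue.NewInMemory[string, urlInfo](nil) and a ctx:
u := urlInfo{URL: "https://example.com", FetchCount: 1}
priority := int64(1000) // never fetched: highest priority
var delay time.Time     // zero value assumed to mean "available immediately"
if u.FetchCount > 0 {
	priority = 500                        // already fetched: lower priority
	delay = time.Now().Add(1 * time.Hour) // politeness delay before re-fetching
}
if err := queue.Put(ctx, workqueue.WorkItem[string, urlInfo]{
	Key:          u.URL,
	Value:        u,
	Priority:     priority,
	DelayedUntil: delay,
	ExpiresAt:    time.Now().Add(24 * time.Hour), // stale URLs drop out after a day
}); err != nil {
	log.Printf("enqueue failed: %v", err)
}
```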

## Adapting for Production

In a production crawler, you would:
1. Actually fetch URLs using an HTTP client
2. Parse fetched pages to discover new URLs
3. Implement host-based delays rather than a single global delay (see the sketch after this list)
4. Add duplicate detection logic
5. Persist the queue to disk/database
6. Handle robots.txt and other politeness policies
7. Implement more sophisticated priority calculation
8. Add error handling and retry logic
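
For item 3, one way to get per-host politeness is to track each host's next permitted fetch time and use it as `DelayedUntil` when re-queueing; a sketch under that assumption (uses `net/url`, `sync`, and `time`; all names are illustrative):

```go
var (
	mu           sync.Mutex
	nextAllowed  = map[string]time.Time{} // host -> earliest permitted fetch
	perHostDelay = 5 * time.Second
)

// delayFor reserves the next fetch slot for rawURL's host and returns
// the time to use as DelayedUntil when re-queueing that URL.
func delayFor(rawURL string) time.Time {
	host := ""
	if u, err := url.Parse(rawURL); err == nil {
		host = u.Host
	}
	mu.Lock()
	defer mu.Unlock()
	t := nextAllowed[host]
	if now := time.Now(); t.Before(now) {
		t = now
	}
	nextAllowed[host] = t.Add(perHostDelay)
	return t
}
```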