diff --git a/examples/function-queue/README.md b/examples/function-queue/README.md
new file mode 100644
index 0000000..48da303
--- /dev/null
+++ b/examples/function-queue/README.md
@@ -0,0 +1,83 @@
+# Function Queue with Worker Pool Example
+
+This example demonstrates how to use `workqueue` to manage and execute functions in a worker pool pattern.
+
+## What This Example Demonstrates
+
+1. **Storing functions as work items**: Shows how to run executable functions/closures through the queue by storing comparable task metadata as the work item and keeping the functions themselves in a lookup map (since functions aren't comparable in Go).
+
+2. **Worker pool pattern**: Creates a pool of 5 concurrent goroutines that pull work from the shared queue and execute tasks in parallel.
+
+3. **Priority-based execution**: Tasks with higher priority values are executed first. The example includes tasks with priorities ranging from 40 to 100.
+
+4. **Diverse task types**: Demonstrates different types of work functions:
+   - Database operations (backup)
+   - Communication (email notifications)
+   - Data processing (reports, indexing)
+   - Maintenance (cleanup, archiving)
+
+5. **Graceful shutdown**: Workers detect when the queue is empty and shut down cleanly using context timeouts.
+
+## How to Run
+
+```bash
+cd examples/function-queue
+go run main.go
+```
+
+## Expected Output
+
+The example will:
+- Add 8 different tasks to the queue with varying priorities
+- Start 5 workers that process tasks concurrently
+- Show which worker executes which task
+- Display task execution in priority order (highest first)
+- Show workers shutting down gracefully when the queue is empty
+
+## Key Concepts
+
+### Worker Pool Pattern
+A worker pool is a common concurrency pattern where:
+- Multiple goroutines (workers) run concurrently
+- Workers pull work items from a shared queue
+- Work is distributed automatically across available workers
+- The pattern provides controlled concurrency and load balancing
+
+### Storing Functions in the Queue
+
+Since Go functions aren't comparable (they can't be used as map keys or compared with `==`), we can't use them directly as the `Key` or `Value` type in workqueue. Instead, we:
+1. Use a task ID or name as the key (which is comparable)
+2. Store comparable task metadata as the value
+3. Keep the functions themselves in a separate map keyed by task ID, then look up and execute the function when the task is taken from the queue
+
+### workqueue Features Used
+
+- **Priority**: Higher priority tasks (e.g., Database Backup: 100) execute before lower priority tasks (e.g., Archive Old Logs: 40)
+- **Take**: Workers block until a task is available
+- **Context with timeout**: Allows workers to detect when no more work is available
+- **Concurrent access**: Multiple workers safely access the same queue
+
+## Adapting for Production
+
+In a production system, you might:
+1. Add error handling and retry logic for failed tasks (see the sketch after this list)
+2. Implement task timeouts to prevent hanging
+3. Add monitoring and metrics (tasks processed, queue depth, etc.)
+4. Store task results or errors for later review
+5. Support task cancellation
+6. Implement a dead-letter queue for repeatedly failing tasks
+7. Add task dependencies (task B runs only after task A completes)
+8. Persist tasks to disk/database for durability
+9. Support scheduled/delayed task execution using `DelayedUntil`
+10. Add task expiration using `ExpiresAt` for time-sensitive work
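+
+As a minimal sketch of points 1 and 2, a worker could wrap each task in a bounded retry loop with a per-attempt timeout before calling it. The `runWithRetry` helper below is hypothetical and not part of this example; it only assumes the `func() error` task signature used in `main.go`:
+
+```go
+package main
+
+import (
+    "context"
+    "fmt"
+    "time"
+)
+
+// runWithRetry executes fn up to maxAttempts times, treating any attempt
+// that runs longer than timeout as a failure. (Hypothetical helper.)
+func runWithRetry(ctx context.Context, fn func() error, maxAttempts int, timeout time.Duration) error {
+    var lastErr error
+    for attempt := 1; attempt <= maxAttempts; attempt++ {
+        attemptCtx, cancel := context.WithTimeout(ctx, timeout)
+        done := make(chan error, 1)
+        go func() { done <- fn() }()
+        select {
+        case err := <-done:
+            cancel()
+            if err == nil {
+                return nil
+            }
+            lastErr = err
+        case <-attemptCtx.Done():
+            cancel()
+            // Note: fn keeps running in its goroutine; a production task
+            // would accept a context so it can stop early.
+            lastErr = fmt.Errorf("attempt %d: %w", attempt, attemptCtx.Err())
+        }
+    }
+    return fmt.Errorf("all %d attempts failed: %w", maxAttempts, lastErr)
+}
+
+func main() {
+    err := runWithRetry(context.Background(), func() error {
+        time.Sleep(50 * time.Millisecond) // stand-in for real work
+        return nil
+    }, 3, time.Second)
+    fmt.Println("result:", err)
+}
+```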
+
+## Real-World Use Cases
+
+This pattern is useful for:
+- Background job processing
+- Task scheduling systems
+- Event-driven architectures
+- Batch processing pipelines
+- Microservices work distribution
+- Async operation handling
+- Load balancing across workers
diff --git a/examples/function-queue/main.go b/examples/function-queue/main.go
new file mode 100644
index 0000000..f3bee31
--- /dev/null
+++ b/examples/function-queue/main.go
@@ -0,0 +1,218 @@
+package main
+
+import (
+    "context"
+    "errors"
+    "fmt"
+    "log"
+    "sync"
+    "time"
+
+    "github.com/andrewortman/workqueue"
+)
+
+// TaskMetadata holds metadata about a task.
+// It is comparable, so it can be stored in the workqueue.
+type TaskMetadata struct {
+    ID       int
+    Name     string
+    Priority int64
+}
+
+// Task pairs task metadata with the work function to execute.
+type Task struct {
+    Metadata TaskMetadata
+    Execute  func() error
+}
+
+func main() {
+    // Create a work queue where:
+    // - Key: task ID (int)
+    // - Value: TaskMetadata (comparable)
+    // We'll use a separate map to look up the actual functions.
+    queue := workqueue.NewInMemory[int, TaskMetadata](nil)
+    ctx := context.Background()
+
+    fmt.Println("=== Function Queue with Worker Pool Example ===")
+    fmt.Println()
+
+    // Create a map to store task functions.
+    // The queue stores TaskMetadata, and we look up the functions here.
+    taskFunctions := make(map[int]func() error)
+
+    // Create sample tasks with different behaviors
+    tasks := []Task{
+        {
+            Metadata: TaskMetadata{ID: 1, Name: "Database Backup", Priority: 100},
+            Execute: func() error {
+                fmt.Println(" → Starting database backup...")
+                time.Sleep(500 * time.Millisecond)
+                fmt.Println(" → Database backup completed!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 2, Name: "Send Email Notification", Priority: 80},
+            Execute: func() error {
+                fmt.Println(" → Sending email notification...")
+                time.Sleep(200 * time.Millisecond)
+                fmt.Println(" → Email sent successfully!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 3, Name: "Generate Report", Priority: 90},
+            Execute: func() error {
+                fmt.Println(" → Generating monthly report...")
+                time.Sleep(700 * time.Millisecond)
+                fmt.Println(" → Report generated!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 4, Name: "Clean Temporary Files", Priority: 50},
+            Execute: func() error {
+                fmt.Println(" → Cleaning temporary files...")
+                time.Sleep(300 * time.Millisecond)
+                fmt.Println(" → Cleanup completed!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 5, Name: "Update Cache", Priority: 85},
+            Execute: func() error {
+                fmt.Println(" → Updating cache...")
+                time.Sleep(400 * time.Millisecond)
+                fmt.Println(" → Cache updated!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 6, Name: "Process API Webhooks", Priority: 95},
+            Execute: func() error {
+                fmt.Println(" → Processing webhooks...")
+                time.Sleep(250 * time.Millisecond)
+                fmt.Println(" → Webhooks processed!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 7, Name: "Archive Old Logs", Priority: 40},
+            Execute: func() error {
+                fmt.Println(" → Archiving old logs...")
+                time.Sleep(600 * time.Millisecond)
+                fmt.Println(" → Logs archived!")
+                return nil
+            },
+        },
+        {
+            Metadata: TaskMetadata{ID: 8, Name: "Index Search Data", Priority: 75},
+            Execute: func() error {
+                fmt.Println(" → Indexing search data...")
+                time.Sleep(450 * time.Millisecond)
+                fmt.Println(" → Indexing complete!")
+                return nil
+            },
+        },
+    }
+
+    // Add tasks to the queue and store the functions in the map
+    fmt.Println("Adding tasks to the queue...")
+    for _, task := range tasks {
+        // Store the function in our map
+        taskFunctions[task.Metadata.ID] = task.Execute
+
+        // Add metadata to the queue
+        item := workqueue.WorkItem[int, TaskMetadata]{
+            Key:      task.Metadata.ID,
+            Value:    task.Metadata,
+            Priority: task.Metadata.Priority,
+        }
+        if err := queue.Put(ctx, item); err != nil {
+            log.Fatalf("Failed to add task %d: %v", task.Metadata.ID, err)
+        }
+        fmt.Printf(" Added task %d: %s (priority: %d)\n", task.Metadata.ID, task.Metadata.Name, task.Metadata.Priority)
+    }
+
+    fmt.Println()
+    fmt.Println("Starting worker pool (5 workers)...")
+    fmt.Println()
+
+    // Create a worker pool
+    const numWorkers = 5
+    var wg sync.WaitGroup
+
+    // Track completed tasks
+    completed := make(chan int, len(tasks))
+
+    // Start workers
+    for i := 1; i <= numWorkers; i++ {
+        wg.Add(1)
+        workerID := i
+        go func() {
+            defer wg.Done()
+
+            for {
+                // Try to take a task from the queue with a timeout.
+                // This allows workers to exit gracefully once the queue is empty.
+                timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
+                item, err := queue.Take(timeoutCtx)
+                cancel()
+
+                if err != nil {
+                    // A deadline error means no task became available before the timeout
+                    if errors.Is(err, context.DeadlineExceeded) {
+                        fmt.Printf("Worker %d: No more tasks, shutting down\n", workerID)
+                        return
+                    }
+                    log.Printf("Worker %d: Error taking task: %v", workerID, err)
+                    return
+                }
+
+                metadata := item.Value
+
+                // Look up the function for this task
+                taskFunc, ok := taskFunctions[metadata.ID]
+                if !ok {
+                    log.Printf("Worker %d: Function not found for task %d", workerID, metadata.ID)
+                    continue
+                }
+
+                // Execute the task
+                fmt.Printf("Worker %d: Executing task %d - %s\n", workerID, metadata.ID, metadata.Name)
+
+                // Run the function
+                if err := taskFunc(); err != nil {
+                    log.Printf("Worker %d: Task %d failed: %v", workerID, metadata.ID, err)
+                } else {
+                    fmt.Printf("Worker %d: Task %d completed successfully\n", workerID, metadata.ID)
+                }
+
+                completed <- metadata.ID
+                fmt.Println()
+            }
+        }()
+    }
+
+    // Wait for all workers to finish
+    wg.Wait()
+
+    // Close the completed channel and collect results
+    close(completed)
+    completedTasks := []int{}
+    for taskID := range completed {
+        completedTasks = append(completedTasks, taskID)
+    }
+
+    fmt.Println("=== Summary ===")
+    fmt.Printf("Total tasks completed: %d\n", len(completedTasks))
+    fmt.Println()
+    fmt.Println("This example demonstrated:")
+    fmt.Println(" 1. Storing functions/closures as work items")
+    fmt.Println(" 2. Priority-based execution (higher priority tasks run first)")
+    fmt.Println(" 3. Worker pool pattern with 5 concurrent goroutines")
+    fmt.Println(" 4. Each worker pulling and executing tasks from the queue")
+    fmt.Println(" 5. Graceful shutdown when queue is empty")
+    fmt.Println()
+    fmt.Println("Note: Due to concurrent execution, output order may vary")
+    fmt.Println("      but high-priority tasks generally execute earlier.")
+}
diff --git a/examples/url-crawler/README.md b/examples/url-crawler/README.md
new file mode 100644
index 0000000..68e69ee
--- /dev/null
+++ b/examples/url-crawler/README.md
@@ -0,0 +1,60 @@
+# URL Crawl Frontier Example
+
+This example demonstrates how to use `workqueue` as a URL crawl frontier - a queue that manages URLs to be fetched by a web crawler.
+
+## What This Example Demonstrates
+
+1. **Priority-based URL fetching**: URLs that have never been fetched get the highest priority (1000), while URLs that have already been fetched get a lower priority (500). In a real crawler, you could implement age-based priority where older fetches get higher priority.
+
+2. **Delayed re-fetching**: After a URL is fetched, it's re-added to the queue with a 1-hour delay using `DelayedUntil`. This prevents the same URL from being fetched too frequently.
+
+3. **24-hour expiration**: All URLs expire and are automatically removed from the queue after 24 hours using `ExpiresAt`.
+
+4. **Multiple workers**: The example uses 3 concurrent worker goroutines that fetch URLs from the queue, simulating a realistic crawler architecture.
+
+5. **Queue operations**: Demonstrates `Put` (adding new URLs), `Take` (fetching URLs to process), and checking queue status with `Size`.
+
+## How to Run
+
+```bash
+cd examples/url-crawler
+go run main.go
+```
+
+## Expected Output
+
+The example will:
+- Add 5 seed URLs to the queue
+- Start 3 workers that process URLs concurrently
+- Show each worker fetching URLs and re-queueing them with a delay
+- Display the final queue state showing delayed items
+- Demonstrate that delayed URLs cannot be fetched immediately
+
+## Key Concepts
+
+### Crawl Frontier
+A crawl frontier is the data structure that decides which URLs a web crawler should fetch next. It typically handles:
+- Politeness policies (delay between fetches from the same host)
+- Prioritization (important pages first)
+- Duplicate detection (don't fetch the same URL twice)
+- URL expiration (remove stale URLs)
+
+### workqueue Features Used
+
+- **Priority**: Higher priority URLs are fetched first
+- **DelayedUntil**: Prevents immediate re-fetching of URLs (politeness policy)
+- **ExpiresAt**: Automatically removes old URLs from the queue
+- **Take**: Workers block until a URL is available for fetching
+- **Put**: Add new URLs or re-add previously fetched URLs
+
+## Adapting for Production
+
+In a production crawler, you would:
+1. Actually fetch URLs using an HTTP client
+2. Parse fetched pages to discover new URLs
+3. Implement host-based delays rather than a single global delay (see the sketch after this list)
+4. Add duplicate detection logic
+5. Persist the queue to disk/database
+6. Handle robots.txt and other politeness policies
+7. Implement more sophisticated priority calculation
+8. Add error handling and retry logic
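+
+As a minimal sketch of point 3, the re-queue step could derive `DelayedUntil` from the URL's host instead of a fixed 1-hour delay. The `politenessDelay` helper and its per-host table below are hypothetical, not part of this example or of the `workqueue` API:
+
+```go
+package main
+
+import (
+    "fmt"
+    "net/url"
+    "time"
+)
+
+// politenessDelay returns how long to wait before re-fetching rawURL,
+// based on a per-host table. (Hypothetical helper, for illustration only.)
+func politenessDelay(rawURL string) time.Duration {
+    hostDelay := map[string]time.Duration{
+        "example.com":      30 * time.Minute,
+        "slow.example.org": 2 * time.Hour,
+    }
+    u, err := url.Parse(rawURL)
+    if err != nil {
+        return time.Hour // fall back to the example's global 1-hour delay
+    }
+    if d, ok := hostDelay[u.Hostname()]; ok {
+        return d
+    }
+    return time.Hour
+}
+
+func main() {
+    // Prints 30m0s: example.com has a custom politeness delay.
+    fmt.Println(politenessDelay("https://example.com/page1"))
+}
+```
+
+A worker would then re-queue a fetched URL with `DelayedUntil: time.Now().Add(politenessDelay(item.Key))` instead of the fixed one-hour delay used in `main.go`.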
diff --git a/examples/url-crawler/main.go b/examples/url-crawler/main.go
new file mode 100644
index 0000000..ccdad11
--- /dev/null
+++ b/examples/url-crawler/main.go
@@ -0,0 +1,166 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "math/rand"
+    "time"
+
+    "github.com/andrewortman/workqueue"
+)
+
+// URLInfo stores information about a URL to be crawled
+type URLInfo struct {
+    URL         string
+    LastFetched time.Time
+}
+
+func main() {
+    // Create a new work queue for URL crawling
+    // Key: URL string, Value: URLInfo
+    queue := workqueue.NewInMemory[string, URLInfo](nil)
+    ctx := context.Background()
+
+    fmt.Println("=== URL Crawl Frontier Example ===")
+    fmt.Println()
+
+    // Seed URLs to crawl
+    seedURLs := []string{
+        "https://example.com",
+        "https://example.com/page1",
+        "https://example.com/page2",
+        "https://example.com/page3",
+        "https://example.com/blog",
+    }
+
+    now := time.Now()
+
+    // Add seed URLs to the queue.
+    // URLs that haven't been fetched yet get the highest priority,
+    // and every URL expires after 24 hours.
+    fmt.Println("Adding seed URLs to the queue...")
+    for _, url := range seedURLs {
+        item := workqueue.WorkItem[string, URLInfo]{
+            Key: url,
+            Value: URLInfo{
+                URL:         url,
+                LastFetched: time.Time{}, // Never fetched
+            },
+            Priority:  1000, // High priority for never-fetched URLs
+            ExpiresAt: now.Add(24 * time.Hour),
+        }
+        if err := queue.Put(ctx, item); err != nil {
+            log.Fatalf("Failed to add URL %s: %v", url, err)
+        }
+        fmt.Printf(" Added: %s (priority: %d, expires in 24h)\n", url, item.Priority)
+    }
+
+    fmt.Println()
+    fmt.Println("Starting crawler workers...")
+    fmt.Println()
+
+    // Start 3 worker goroutines that fetch URLs
+    const numWorkers = 3
+    done := make(chan bool)
+
+    for i := 1; i <= numWorkers; i++ {
+        workerID := i
+        go func() {
+            fetchCount := 0
+            // Each worker will process up to 2 URLs
+            for fetchCount < 2 {
+                // Take a URL from the queue, giving up if nothing becomes
+                // available within 2 seconds (there are only 5 seed URLs for
+                // 3 workers, so one worker may come up short).
+                takeCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
+                item, err := queue.Take(takeCtx)
+                cancel()
+                if err != nil {
+                    log.Printf("Worker %d: no more URLs available: %v", workerID, err)
+                    break
+                }
+
+                urlInfo := item.Value
+                fetchCount++
+
+                // Simulate fetching the URL
+                fmt.Printf("Worker %d: Fetching %s (fetch #%d)\n", workerID, urlInfo.URL, fetchCount)
+
+                // Simulate varying fetch times (100-500ms)
+                sleepTime := time.Duration(100+rand.Intn(400)) * time.Millisecond
+                time.Sleep(sleepTime)
+
+                fmt.Printf("Worker %d: Completed %s (took %v)\n", workerID, urlInfo.URL, sleepTime)
+
+                // Re-add the URL with a 1-hour delay and a lower priority.
+                // This demo uses a fixed lower priority; a real crawler could
+                // scale priority with fetch age so older fetches rise again.
+                lastFetchTime := time.Now()
+                updatedItem := workqueue.WorkItem[string, URLInfo]{
+                    Key: item.Key,
+                    Value: URLInfo{
+                        URL:         urlInfo.URL,
+                        LastFetched: lastFetchTime,
+                    },
+                    // Lower priority than never-fetched URLs
+                    Priority: 500,
+                    // Delay for 1 hour before it can be fetched again
+                    DelayedUntil: lastFetchTime.Add(1 * time.Hour),
+                    // Keep the original 24-hour expiry from when the URL was first added
+                    ExpiresAt: item.ExpiresAt,
+                }
+
+                // Re-add the URL with Put (Take removed it from the queue)
+                if err := queue.Put(ctx, updatedItem); err != nil {
+                    log.Printf("Worker %d: Failed to re-queue %s: %v", workerID, urlInfo.URL, err)
+                } else {
+                    fmt.Printf("Worker %d: Re-queued %s (delayed 1h, priority: %d)\n",
+                        workerID, urlInfo.URL, updatedItem.Priority)
+                }
+                fmt.Println()
+            }
+            done <- true
+        }()
+    }
+
+    // Wait for all workers to complete
+    for i := 0; i < numWorkers; i++ {
+        <-done
+    }
+
+    fmt.Println("=== Checking Queue Status ===")
+
+    // Show the queue size
+    size, err := queue.Size(ctx)
+    if err != nil {
+        log.Fatalf("Failed to get queue size: %v", err)
+    }
+    fmt.Printf("Queue size: %d pending, %d delayed\n", size.Pending, size.Delayed)
+    fmt.Println()
+
+    // Demonstrate that delayed items cannot be fetched immediately
+    fmt.Println("Attempting to fetch another URL (should show delayed items)...")
+
+    // Create a context with a short timeout
+    timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
+    defer cancel()
+
+    _, err = queue.Take(timeoutCtx)
+    if err != nil {
+        fmt.Printf("Expected timeout: %v\n", err)
+        fmt.Println("(This is correct - all URLs are delayed for 1 hour)")
+    }
+
+    fmt.Println()
+    fmt.Println("=== Summary ===")
+    fmt.Println("This example demonstrated:")
+    fmt.Println(" 1. Adding URLs with priority (never-fetched = highest)")
+    fmt.Println(" 2. Workers fetching URLs from the queue")
+    fmt.Println(" 3. Re-queueing URLs with 1-hour delay after fetch")
+    fmt.Println(" 4. 24-hour expiration for URLs")
+    fmt.Println(" 5. Priority-based processing (higher priority first)")
+    fmt.Println()
+    fmt.Println("In a real crawler:")
+    fmt.Println(" - URLs would actually be fetched via HTTP")
+    fmt.Println(" - New URLs discovered would be added to the queue")
+    fmt.Println(" - Priority could be based on various factors (age, importance, etc.)")
+    fmt.Println(" - The crawler would run continuously, not just process a fixed number")
+}