Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: CI

on:
push:
branches: [main]
pull_request:

permissions:
contents: read

jobs:
vet:
name: vet
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- run: go vet ./...

lint:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- uses: golangci/golangci-lint-action@v9
with:
version: v2.12

build:
name: build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
cache: true
- run: go build ./...
59 changes: 59 additions & 0 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
name: Security

on:
push:
branches: [main]
pull_request:
schedule:
- cron: '0 6 * * 1'

permissions:
contents: read
security-events: write

jobs:
gosec:
name: gosec
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run gosec
uses: securego/gosec@v2.26.1
with:
args: '-no-fail -fmt sarif -out gosec.sarif ./...'
- name: Upload SARIF
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: gosec.sarif
category: gosec

govulncheck:
name: govulncheck
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: 'stable'
cache: true
- run: go install golang.org/x/vuln/cmd/govulncheck@latest
- run: govulncheck ./...

trivy:
name: trivy-fs
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run Trivy filesystem scan
uses: aquasecurity/trivy-action@v0.36.0
with:
scan-type: fs
format: sarif
output: trivy.sarif
ignore-unfixed: true
severity: 'HIGH,CRITICAL'
- name: Upload SARIF
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: trivy.sarif
category: trivy
30 changes: 30 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
version: "2"

run:
timeout: 3m

linters:
default: none
enable:
- bodyclose
- errcheck
- gocritic
- govet
- ineffassign
- misspell
- prealloc
- revive
- staticcheck
- unconvert
- unused

settings:
revive:
rules:
- name: var-naming
- name: empty-block
- name: unused-parameter

issues:
max-issues-per-linter: 0
max-same-issues: 0
134 changes: 133 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,134 @@
# taskq
Distributing task across multiple workers to manage the concurrency

> A scaffold for a Redis-backed distributed task queue in Go — Viper-driven config, single-instance Redis client, ready for fan-out + worker pool implementation.

[![CI](https://github.com/Chetas1/taskq/actions/workflows/ci.yml/badge.svg)](https://github.com/Chetas1/taskq/actions/workflows/ci.yml)
[![Security](https://github.com/Chetas1/taskq/actions/workflows/security.yml/badge.svg)](https://github.com/Chetas1/taskq/actions/workflows/security.yml)
[![Go Version](https://img.shields.io/badge/Go-1.21+-00ADD8?logo=go&logoColor=white)](https://go.dev/dl/)
[![Redis](https://img.shields.io/badge/Redis-DC382D?logo=redis&logoColor=white)](https://redis.io)

> **Status: scaffold.** This repo currently boots, loads YAML config, and connects to Redis. The producer/worker logic is the next milestone — see [Roadmap](#roadmap).

---

## Why this exists

A reference Go scaffold for the building blocks of a distributed task queue:

- **Viper-driven config** with file + env override (per `12-Factor`).
- **Single, dependency-injected Redis client** (avoids the global-singleton trap).
- **Clear seams** — `config`, `internal/app` — so producer and worker logic can be added without restructuring.

## Target Architecture

```mermaid
flowchart LR
subgraph Producers
P1[HTTP / gRPC API]
P2[Cron / scheduler]
end

subgraph Redis Streams
S[(Stream: tasks)]
G1[Consumer Group: workers]
end

subgraph Workers
W1[Worker 1]
W2[Worker 2]
Wn[Worker N]
end

DLQ[(Stream: tasks-dead)]
M[(Metrics / Tracing)]

P1 -- XADD --> S
P2 -- XADD --> S
S --- G1
G1 -- XREADGROUP --> W1
G1 -- XREADGROUP --> W2
G1 -- XREADGROUP --> Wn
W1 -. retries exhausted .-> DLQ
W2 -. metrics .-> M
```

## Today's Code Path

```mermaid
sequenceDiagram
autonumber
participant Bin as cmd/main
participant Cfg as config.LoadConfig
participant App as app.Init
participant R as Redis
Bin->>Cfg: read config/config.yaml + env
Cfg-->>Bin: *Config{Redis{Addr,Password,DB}}
Bin->>App: Init(cfg)
App->>R: redis.NewClient + PING
R-->>App: PONG
```

## Configuration

`config/config.yaml`:

```yaml
redis:
addr: "localhost:6379"
password: ""
db: 0
```

Env overrides (`viper.AutomaticEnv` planned): `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`.

## Roadmap

| Milestone | Status |
|--------------------------------------------------|--------|
| Boot + Redis health-check | ✅ |
| Producer API: `Enqueue(task) -> taskID` | ⏳ |
| Worker pool with at-least-once semantics | ⏳ |
| Retry policy (exponential backoff + max attempts)| ⏳ |
| Dead-letter stream | ⏳ |
| Idempotency keys + dedupe | ⏳ |
| OpenTelemetry tracing | ⏳ |
| Prometheus metrics (queue depth, age, p99 latency) | ⏳ |
| Graceful shutdown (drain in-flight, ack) on SIGTERM | ⏳ |
| Integration tests with `dockertest` / `miniredis`| ⏳ |

## Operational Concerns (when implemented)

- **At-least-once delivery.** Plan to use Redis Streams + consumer groups; commit offsets with `XACK` only after handler succeeds.
- **Visibility timeouts.** `XCLAIM` on pending entries past the timeout (e.g. 5× expected handler latency).
- **Retries.** Exponential backoff with jitter; cap retries; drop to DLQ on exhaustion.
- **Backpressure.** Bound stream length with `MAXLEN ~ N`; expose queue depth as a metric.
- **Graceful shutdown.** Stop accepting new tasks, wait for in-flight workers up to a timeout, then `XCLAIM` to a clean-up consumer.

## Security Considerations

- **TLS to Redis.** Today: plaintext (`localhost`). Production: `rediss://` with CA bundle and cert pinning.
- **Auth.** Today: `password` in YAML. Production: env var or secret-manager-injected, never committed.
- **Payload trust.** Workers must treat task payloads as untrusted — validate, bound, and rate-limit.
- **Egress from workers.** If workers call external APIs, sandbox with timeouts + circuit breakers.

## Development

```bash
# Boot Redis (one-shot)
docker run -d --name redis-taskq -p 6379:6379 redis:7-alpine

# Run
go run .

# Lint + vet + vulnerability scan
go vet ./...
golangci-lint run
govulncheck ./...

# Stop Redis
docker rm -f redis-taskq
```

## License

See [LICENSE](LICENSE).
23 changes: 16 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
// Package config loads runtime configuration from `config/config.yaml`
// and environment variables (via Viper's AutomaticEnv). Environment variables
// override file values; nested keys use `_` separators (e.g. `REDIS_ADDR`).
package config

import (
"log"
"fmt"

"github.com/spf13/viper"
)

// Config is the top-level taskq configuration.
type Config struct {
Redis Redis
}

// Redis holds the connection details for the backing Redis instance / cluster.
type Redis struct {
Addr string
Password string
DB int
}

func LoadConfig() *Config {
// LoadConfig reads `config/config.yaml`, applies environment overrides, and
// returns the parsed Config. Returns a wrapped error if the file is missing
// or the YAML cannot be unmarshalled.
func LoadConfig() (*Config, error) {
v := viper.New()

v.SetConfigName("config")
v.SetConfigType("yaml")
v.AddConfigPath("./config/")
v.AutomaticEnv()

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Missing SetEnvKeyReplacer makes env var overrides for nested config keys silently ineffective

v.AutomaticEnv() is called at config/config.go:33, and the doc comment at line 1-3 explicitly claims environment variables override file values with _ separators (e.g. REDIS_ADDR). However, Viper maps config key redis.addr to env var REDIS.ADDR by default (uppercased with literal .). Without calling v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")), environment variables like REDIS_ADDR, REDIS_PASSWORD, and REDIS_DB will be silently ignored, and the YAML file values will always win. This makes the env override feature broken despite being wired up.

Suggested change
v.AutomaticEnv()
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if err := v.ReadInConfig(); err != nil {
log.Fatal(err)
return nil, fmt.Errorf("read config: %w", err)
}

var config Config
if err := v.Unmarshal(&config); err != nil {
log.Fatal(err)
var cfg Config
if err := v.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("unmarshal config: %w", err)
}

return &config
return &cfg, nil
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ module github.com/Chetas1/taskq

go 1.21.0

require github.com/go-redis/redis v6.15.9+incompatible
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/spf13/viper v1.19.0
)

require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
Expand All @@ -19,7 +21,6 @@ require (
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
Expand Down
Loading
Loading