diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..b1db07d --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,45 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +permissions: + contents: read + +jobs: + vet: + name: vet + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + cache: true + - run: go vet ./... + + lint: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + cache: true + - uses: golangci/golangci-lint-action@v9 + with: + version: v2.12 + + build: + name: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + cache: true + - run: go build ./... diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml new file mode 100644 index 0000000..d121d52 --- /dev/null +++ b/.github/workflows/security.yml @@ -0,0 +1,59 @@ +name: Security + +on: + push: + branches: [main] + pull_request: + schedule: + - cron: '0 6 * * 1' + +permissions: + contents: read + security-events: write + +jobs: + gosec: + name: gosec + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Run gosec + uses: securego/gosec@v2.26.1 + with: + args: '-no-fail -fmt sarif -out gosec.sarif ./...' + - name: Upload SARIF + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: gosec.sarif + category: gosec + + govulncheck: + name: govulncheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: 'stable' + cache: true + - run: go install golang.org/x/vuln/cmd/govulncheck@latest + - run: govulncheck ./... + + trivy: + name: trivy-fs + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Run Trivy filesystem scan + uses: aquasecurity/trivy-action@v0.36.0 + with: + scan-type: fs + format: sarif + output: trivy.sarif + ignore-unfixed: true + severity: 'HIGH,CRITICAL' + - name: Upload SARIF + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: trivy.sarif + category: trivy diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..e700c63 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,30 @@ +version: "2" + +run: + timeout: 3m + +linters: + default: none + enable: + - bodyclose + - errcheck + - gocritic + - govet + - ineffassign + - misspell + - prealloc + - revive + - staticcheck + - unconvert + - unused + + settings: + revive: + rules: + - name: var-naming + - name: empty-block + - name: unused-parameter + +issues: + max-issues-per-linter: 0 + max-same-issues: 0 diff --git a/README.md b/README.md index 56cf8ca..55e7187 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,134 @@ # taskq -Distributing task across multiple workers to manage the concurrency + +> A scaffold for a Redis-backed distributed task queue in Go — Viper-driven config, single-instance Redis client, ready for fan-out + worker pool implementation. + +[![CI](https://github.com/Chetas1/taskq/actions/workflows/ci.yml/badge.svg)](https://github.com/Chetas1/taskq/actions/workflows/ci.yml) +[![Security](https://github.com/Chetas1/taskq/actions/workflows/security.yml/badge.svg)](https://github.com/Chetas1/taskq/actions/workflows/security.yml) +[![Go Version](https://img.shields.io/badge/Go-1.21+-00ADD8?logo=go&logoColor=white)](https://go.dev/dl/) +[![Redis](https://img.shields.io/badge/Redis-DC382D?logo=redis&logoColor=white)](https://redis.io) + +> **Status: scaffold.** This repo currently boots, loads YAML config, and connects to Redis. The producer/worker logic is the next milestone — see [Roadmap](#roadmap). + +--- + +## Why this exists + +A reference Go scaffold for the building blocks of a distributed task queue: + +- **Viper-driven config** with file + env override (per `12-Factor`). +- **Single, dependency-injected Redis client** (avoids the global-singleton trap). +- **Clear seams** — `config`, `internal/app` — so producer and worker logic can be added without restructuring. + +## Target Architecture + +```mermaid +flowchart LR + subgraph Producers + P1[HTTP / gRPC API] + P2[Cron / scheduler] + end + + subgraph Redis Streams + S[(Stream: tasks)] + G1[Consumer Group: workers] + end + + subgraph Workers + W1[Worker 1] + W2[Worker 2] + Wn[Worker N] + end + + DLQ[(Stream: tasks-dead)] + M[(Metrics / Tracing)] + + P1 -- XADD --> S + P2 -- XADD --> S + S --- G1 + G1 -- XREADGROUP --> W1 + G1 -- XREADGROUP --> W2 + G1 -- XREADGROUP --> Wn + W1 -. retries exhausted .-> DLQ + W2 -. metrics .-> M +``` + +## Today's Code Path + +```mermaid +sequenceDiagram + autonumber + participant Bin as cmd/main + participant Cfg as config.LoadConfig + participant App as app.Init + participant R as Redis + Bin->>Cfg: read config/config.yaml + env + Cfg-->>Bin: *Config{Redis{Addr,Password,DB}} + Bin->>App: Init(cfg) + App->>R: redis.NewClient + PING + R-->>App: PONG +``` + +## Configuration + +`config/config.yaml`: + +```yaml +redis: + addr: "localhost:6379" + password: "" + db: 0 +``` + +Env overrides (`viper.AutomaticEnv` planned): `REDIS_ADDR`, `REDIS_PASSWORD`, `REDIS_DB`. + +## Roadmap + +| Milestone | Status | +|--------------------------------------------------|--------| +| Boot + Redis health-check | ✅ | +| Producer API: `Enqueue(task) -> taskID` | ⏳ | +| Worker pool with at-least-once semantics | ⏳ | +| Retry policy (exponential backoff + max attempts)| ⏳ | +| Dead-letter stream | ⏳ | +| Idempotency keys + dedupe | ⏳ | +| OpenTelemetry tracing | ⏳ | +| Prometheus metrics (queue depth, age, p99 latency) | ⏳ | +| Graceful shutdown (drain in-flight, ack) on SIGTERM | ⏳ | +| Integration tests with `dockertest` / `miniredis`| ⏳ | + +## Operational Concerns (when implemented) + +- **At-least-once delivery.** Plan to use Redis Streams + consumer groups; commit offsets with `XACK` only after handler succeeds. +- **Visibility timeouts.** `XCLAIM` on pending entries past the timeout (e.g. 5× expected handler latency). +- **Retries.** Exponential backoff with jitter; cap retries; drop to DLQ on exhaustion. +- **Backpressure.** Bound stream length with `MAXLEN ~ N`; expose queue depth as a metric. +- **Graceful shutdown.** Stop accepting new tasks, wait for in-flight workers up to a timeout, then `XCLAIM` to a clean-up consumer. + +## Security Considerations + +- **TLS to Redis.** Today: plaintext (`localhost`). Production: `rediss://` with CA bundle and cert pinning. +- **Auth.** Today: `password` in YAML. Production: env var or secret-manager-injected, never committed. +- **Payload trust.** Workers must treat task payloads as untrusted — validate, bound, and rate-limit. +- **Egress from workers.** If workers call external APIs, sandbox with timeouts + circuit breakers. + +## Development + +```bash +# Boot Redis (one-shot) +docker run -d --name redis-taskq -p 6379:6379 redis:7-alpine + +# Run +go run . + +# Lint + vet + vulnerability scan +go vet ./... +golangci-lint run +govulncheck ./... + +# Stop Redis +docker rm -f redis-taskq +``` + +## License + +See [LICENSE](LICENSE). diff --git a/config/config.go b/config/config.go index 2d491c5..cdb56b6 100644 --- a/config/config.go +++ b/config/config.go @@ -1,36 +1,45 @@ +// Package config loads runtime configuration from `config/config.yaml` +// and environment variables (via Viper's AutomaticEnv). Environment variables +// override file values; nested keys use `_` separators (e.g. `REDIS_ADDR`). package config import ( - "log" + "fmt" "github.com/spf13/viper" ) +// Config is the top-level taskq configuration. type Config struct { Redis Redis } +// Redis holds the connection details for the backing Redis instance / cluster. type Redis struct { Addr string Password string DB int } -func LoadConfig() *Config { +// LoadConfig reads `config/config.yaml`, applies environment overrides, and +// returns the parsed Config. Returns a wrapped error if the file is missing +// or the YAML cannot be unmarshalled. +func LoadConfig() (*Config, error) { v := viper.New() v.SetConfigName("config") v.SetConfigType("yaml") v.AddConfigPath("./config/") + v.AutomaticEnv() if err := v.ReadInConfig(); err != nil { - log.Fatal(err) + return nil, fmt.Errorf("read config: %w", err) } - var config Config - if err := v.Unmarshal(&config); err != nil { - log.Fatal(err) + var cfg Config + if err := v.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("unmarshal config: %w", err) } - return &config + return &cfg, nil } diff --git a/go.mod b/go.mod index a48149c..bc26b30 100644 --- a/go.mod +++ b/go.mod @@ -2,13 +2,15 @@ module github.com/Chetas1/taskq go 1.21.0 -require github.com/go-redis/redis v6.15.9+incompatible +require ( + github.com/go-redis/redis/v8 v8.11.5 + github.com/spf13/viper v1.19.0 +) require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -19,7 +21,6 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.19.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect diff --git a/go.sum b/go.sum index 865d5c2..d5cb7ac 100644 --- a/go.sum +++ b/go.sum @@ -2,23 +2,41 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cb github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= @@ -41,6 +59,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= @@ -50,13 +69,21 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/app/app.go b/internal/app/app.go index 44f79d7..afd62dc 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -1,34 +1,54 @@ +// Package app wires the taskq runtime: config -> Redis client -> (future) +// producer + worker pool. The current implementation only verifies Redis +// connectivity; queue logic is the next milestone (see README roadmap). package app import ( + "context" "fmt" - "sync" + "time" "github.com/Chetas1/taskq/config" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" ) -var ( - rdb *redis.Client - once sync.Once -) +// pingTimeout bounds the initial Redis health-check so a misconfigured +// Addr/Password fails fast at boot instead of hanging an operator. +const pingTimeout = 5 * time.Second -func Init(cfg *config.Config) { - initRedis(cfg) +// App holds runtime dependencies. It is intentionally non-singleton so multiple +// instances (e.g. in tests) can coexist without sharing global state. +type App struct { + Redis *redis.Client } -func initRedis(cfg *config.Config) { - once.Do(func() { - rdb = redis.NewClient(&redis.Options{ - Addr: cfg.Redis.Addr, - Password: cfg.Redis.Password, - DB: cfg.Redis.DB, - }) - - err := rdb.Ping().Err() - if err != nil { - fmt.Print(err) - } - fmt.Print("Connected to Redis!") +// New constructs an App, connects to Redis, and verifies the connection with +// PING. Returns a wrapped error if the connection or PING fails. +func New(ctx context.Context, cfg *config.Config) (*App, error) { + rdb := redis.NewClient(&redis.Options{ + Addr: cfg.Redis.Addr, + Password: cfg.Redis.Password, + DB: cfg.Redis.DB, }) + + pingCtx, cancel := context.WithTimeout(ctx, pingTimeout) + defer cancel() + + if err := rdb.Ping(pingCtx).Err(); err != nil { + _ = rdb.Close() + return nil, fmt.Errorf("redis ping %s: %w", cfg.Redis.Addr, err) + } + + return &App{Redis: rdb}, nil +} + +// Close releases the Redis connection pool. Safe to call multiple times. +func (a *App) Close() error { + if a == nil || a.Redis == nil { + return nil + } + if err := a.Redis.Close(); err != nil { + return fmt.Errorf("close redis: %w", err) + } + return nil } diff --git a/main.go b/main.go index a9ecbec..42c282f 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,48 @@ +// Command taskq boots the task-queue runtime: load config, connect to Redis, +// then block until SIGINT/SIGTERM and shut down cleanly. Producer and worker +// loops will be added behind app.App once the queue protocol stabilizes. package main import ( + "context" + "errors" + "log" + "os" + "os/signal" + "syscall" + "github.com/Chetas1/taskq/config" "github.com/Chetas1/taskq/internal/app" ) func main() { - cfg := config.LoadConfig() - app.Init(cfg) + if err := run(); err != nil { + log.Printf("fatal: %v", err) + os.Exit(1) + } +} + +func run() error { + cfg, err := config.LoadConfig() + if err != nil { + return err + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + a, err := app.New(ctx, cfg) + if err != nil { + return err + } + defer func() { + if cerr := a.Close(); cerr != nil && !errors.Is(cerr, context.Canceled) { + log.Printf("close app: %v", cerr) + } + }() + + log.Printf("taskq started; redis=%s db=%d", cfg.Redis.Addr, cfg.Redis.DB) + <-ctx.Done() + log.Printf("shutdown signal received") + return nil }