Skip to content
This repository was archived by the owner on Dec 7, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*.so
*.dylib
bin/
worker

# Test binary, built with `go test -c`
*.test
Expand Down
78 changes: 32 additions & 46 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A high-performance Go worker service that fetches events from Redis queues and b

## Features

- 🚀 **High Performance**: Concurrent processing of multiple event queues
- 🚀 **High Performance**: Efficient event processing with adaptive batching
- 📊 **Adaptive Tuning**: Dynamically adjusts batch sizes and intervals based on queue length
- 🔄 **Dead Letter Queue**: Automatic retry mechanism for failed events
- 🛡️ **Graceful Shutdown**: Ensures all pending events are flushed before shutdown
Expand All @@ -13,17 +13,13 @@ A high-performance Go worker service that fetches events from Redis queues and b

## Architecture

The worker processes three types of Discord events:
The worker processes Discord events from Redis and bulk inserts them into PostgreSQL.

- **Event Logs** (`logs:events`) - General Discord events
- **Guardian Logs** (`logs:guardian`) - Moderation/guardian actions
- **Join Logs** (`logs:join`) - User join/leave events
Events flow:

Events are:

1. Fetched from Redis lists in batches
1. Fetched from Redis list (`logs:events`) in batches
2. Base64-decoded and parsed from JSON
3. Bulk inserted into PostgreSQL tables
3. Bulk inserted into PostgreSQL `events` table
4. Failed events are automatically moved to a DLQ for retry

## Prerequisites
Expand Down Expand Up @@ -122,49 +118,39 @@ volumes:

## Database Schema

The worker automatically creates the following tables:

### Events Table
The worker expects the `events` table to exist with the following schema (managed by Drizzle ORM):

```sql
CREATE TABLE events (
id TEXT PRIMARY KEY,
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
guild_id TEXT NOT NULL,
channel_id TEXT,
user_id TEXT,
event_type TEXT NOT NULL,
logged_at TIMESTAMPTZ NOT NULL,
data JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```

### Guardian Logs Table

```sql
CREATE TABLE guardian_logs (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
user_id TEXT,
action TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB,
inserted_at TIMESTAMPTZ DEFAULT NOW()
action_thread_id TEXT,
logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
actor_id TEXT,
channel_id TEXT,
target_id TEXT,
message_id TEXT,
reason TEXT,
duration INTEGER,
old_value JSONB,
new_value JSONB,
url TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```

### Join Logs Table

```sql
CREATE TABLE join_logs (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
user_id TEXT NOT NULL,
event_type TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
data JSONB,
inserted_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes
CREATE INDEX events_guild_id_idx ON events(guild_id);
CREATE INDEX events_action_idx ON events(action);
CREATE INDEX events_logged_at_idx ON events(logged_at);
CREATE INDEX events_actor_id_idx ON events(actor_id);
CREATE INDEX events_target_id_idx ON events(target_id);
CREATE INDEX events_action_thread_id_idx ON events(action_thread_id);
CREATE INDEX events_guild_logged_idx ON events(guild_id, logged_at);
CREATE INDEX events_guild_action_logged_idx ON events(guild_id, action, logged_at);
CREATE INDEX events_guild_actor_logged_idx ON events(guild_id, actor_id, logged_at);
CREATE INDEX events_guild_target_logged_idx ON events(guild_id, target_id, logged_at);
```

## Adaptive Tuning
Expand Down Expand Up @@ -199,7 +185,7 @@ The worker provides structured JSON logs suitable for log aggregation systems:
"count": 1000,
"batch_size": 1000,
"interval": 5000000000,
"message": "✅ Flushed event logs"
"message": "✅ Flushed events"
}
```

Expand Down
115 changes: 23 additions & 92 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,111 +90,42 @@ func workerLoop(ctx context.Context, redisClient *redis.Client, db *database.DB,
case <-ctx.Done():
return
case <-ticker.C:
// Get queue lengths
// Get queue length for events
eventsLen, _ := redisClient.LLen(ctx, "logs:events")
guardianLen, _ := redisClient.LLen(ctx, "logs:guardian")
joinLen, _ := redisClient.LLen(ctx, "logs:join")
totalQueue := int(eventsLen)

totalQueue := int(eventsLen + guardianLen + joinLen)

// Adjust tuning
// Adjust tuning based on queue depth
batchSize, interval := tuning.AdjustTuning(totalQueue, cfg.BatchSize, cfg.FlushInterval)

// Update ticker interval
ticker.Reset(interval)

// Flush logs concurrently
var wg sync.WaitGroup
wg.Add(3)

// Events
go func() {
defer wg.Done()
count, err := flush.FlushLogs(
ctx,
redisClient,
db,
"logs:events",
batchSize,
db.BulkInsertEvents,
)
if err != nil {
log.Error().Err(err).Msg("❌ Error flushing events")
} else if count > 0 {
log.Info().
Int("count", count).
Int("batch_size", batchSize).
Dur("interval", interval).
Msg("✅ Flushed event logs")
}
}()

// Guardian
go func() {
defer wg.Done()
count, err := flush.FlushLogs(
ctx,
redisClient,
db,
"logs:guardian",
batchSize,
db.BulkInsertGuardianLogs,
)
if err != nil {
log.Error().Err(err).Msg("❌ Error flushing guardian logs")
} else if count > 0 {
log.Info().
Int("count", count).
Int("batch_size", batchSize).
Dur("interval", interval).
Msg("✅ Flushed guardian logs")
}
}()

// Join
go func() {
defer wg.Done()
count, err := flush.FlushLogs(
ctx,
redisClient,
db,
"logs:join",
batchSize,
db.BulkInsertJoinLogs,
)
if err != nil {
log.Error().Err(err).Msg("❌ Error flushing join logs")
} else if count > 0 {
log.Info().
Int("count", count).
Int("batch_size", batchSize).
Dur("interval", interval).
Msg("✅ Flushed join logs")
}
}()

wg.Wait()
// Flush events
count, err := flush.FlushLogs(
ctx,
redisClient,
"logs:events",
batchSize,
db.BulkInsertEvents,
)
if err != nil {
log.Error().Err(err).Msg("❌ Error flushing events")
} else if count > 0 {
log.Info().
Int("count", count).
Int("batch_size", batchSize).
Dur("interval", interval).
Msg("✅ Flushed events")
}
}
}
}

func flushRemaining(ctx context.Context, redisClient *redis.Client, db *database.DB, batchSize int) {
// Flush events
count, _ := flush.FlushLogs(ctx, redisClient, db, "logs:events", batchSize, db.BulkInsertEvents)
if count > 0 {
log.Info().Int("count", count).Msg("Flushed event logs on shutdown")
}

// Flush guardian
count, _ = flush.FlushLogs(ctx, redisClient, db, "logs:guardian", batchSize, db.BulkInsertGuardianLogs)
if count > 0 {
log.Info().Int("count", count).Msg("Flushed guardian logs on shutdown")
}

// Flush join
count, _ = flush.FlushLogs(ctx, redisClient, db, "logs:join", batchSize, db.BulkInsertJoinLogs)
// Flush remaining events
count, _ := flush.FlushLogs(ctx, redisClient, "logs:events", batchSize, db.BulkInsertEvents)
if count > 0 {
log.Info().Int("count", count).Msg("Flushed join logs on shutdown")
log.Info().Int("count", count).Msg("Flushed events on shutdown")
}
}

Expand Down
18 changes: 12 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
module github.com/bueapp/worker

go 1.25.3
go 1.23.0

require github.com/stretchr/testify v1.11.1
toolchain go1.24.10

require (
github.com/jackc/pgx/v5 v5.7.6
github.com/joho/godotenv v1.5.1
github.com/redis/go-redis/v9 v9.17.0
github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.11.1
)

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.6 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.17.0 // indirect
github.com/rs/zerolog v1.34.0 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.13.0 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -17,6 +22,10 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand All @@ -27,6 +36,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.17.0 h1:K6E+ZlYN95KSMmZeEQPbU/c++wfmEvfFB17yEAq/VhM=
github.com/redis/go-redis/v9 v9.17.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
Expand All @@ -49,6 +60,8 @@ golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading