diff --git a/.gitignore b/.gitignore index 28b0d22..3d97323 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ *.so *.dylib bin/ +worker # Test binary, built with `go test -c` *.test diff --git a/README.md b/README.md index 7116229..75820df 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ A high-performance Go worker service that fetches events from Redis queues and b ## Features -- 🚀 **High Performance**: Concurrent processing of multiple event queues +- 🚀 **High Performance**: Efficient event processing with adaptive batching - 📊 **Adaptive Tuning**: Dynamically adjusts batch sizes and intervals based on queue length - 🔄 **Dead Letter Queue**: Automatic retry mechanism for failed events - 🛡️ **Graceful Shutdown**: Ensures all pending events are flushed before shutdown @@ -13,17 +13,13 @@ A high-performance Go worker service that fetches events from Redis queues and b ## Architecture -The worker processes three types of Discord events: +The worker processes Discord events from Redis and bulk inserts them into PostgreSQL. -- **Event Logs** (`logs:events`) - General Discord events -- **Guardian Logs** (`logs:guardian`) - Moderation/guardian actions -- **Join Logs** (`logs:join`) - User join/leave events +Events flow: -Events are: - -1. Fetched from Redis lists in batches +1. Fetched from Redis list (`logs:events`) in batches 2. Base64-decoded and parsed from JSON -3. Bulk inserted into PostgreSQL tables +3. Bulk inserted into PostgreSQL `events` table 4. Failed events are automatically moved to a DLQ for retry ## Prerequisites @@ -122,49 +118,39 @@ volumes: ## Database Schema -The worker automatically creates the following tables: - -### Events Table +The worker expects the `events` table to exist with the following schema (managed by Drizzle ORM): ```sql CREATE TABLE events ( - id TEXT PRIMARY KEY, + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), guild_id TEXT NOT NULL, - channel_id TEXT, - user_id TEXT, - event_type TEXT NOT NULL, - logged_at TIMESTAMPTZ NOT NULL, - data JSONB, - created_at TIMESTAMPTZ DEFAULT NOW() -); -``` - -### Guardian Logs Table - -```sql -CREATE TABLE guardian_logs ( - id TEXT PRIMARY KEY, - guild_id TEXT NOT NULL, - user_id TEXT, action TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - data JSONB, - inserted_at TIMESTAMPTZ DEFAULT NOW() + action_thread_id TEXT, + logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + actor_id TEXT, + channel_id TEXT, + target_id TEXT, + message_id TEXT, + reason TEXT, + duration INTEGER, + old_value JSONB, + new_value JSONB, + url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -``` - -### Join Logs Table -```sql -CREATE TABLE join_logs ( - id TEXT PRIMARY KEY, - guild_id TEXT NOT NULL, - user_id TEXT NOT NULL, - event_type TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - data JSONB, - inserted_at TIMESTAMPTZ DEFAULT NOW() -); +-- Indexes +CREATE INDEX events_guild_id_idx ON events(guild_id); +CREATE INDEX events_action_idx ON events(action); +CREATE INDEX events_logged_at_idx ON events(logged_at); +CREATE INDEX events_actor_id_idx ON events(actor_id); +CREATE INDEX events_target_id_idx ON events(target_id); +CREATE INDEX events_action_thread_id_idx ON events(action_thread_id); +CREATE INDEX events_guild_logged_idx ON events(guild_id, logged_at); +CREATE INDEX events_guild_action_logged_idx ON events(guild_id, action, logged_at); +CREATE INDEX events_guild_actor_logged_idx ON events(guild_id, actor_id, logged_at); +CREATE INDEX events_guild_target_logged_idx ON events(guild_id, target_id, logged_at); ``` ## Adaptive Tuning @@ -199,7 +185,7 @@ The worker provides structured JSON logs suitable for log aggregation systems: "count": 1000, "batch_size": 1000, "interval": 5000000000, - "message": "✅ Flushed event logs" + "message": "✅ Flushed events" } ``` diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 6e73afc..77a51db 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -90,111 +90,42 @@ func workerLoop(ctx context.Context, redisClient *redis.Client, db *database.DB, case <-ctx.Done(): return case <-ticker.C: - // Get queue lengths + // Get queue length for events eventsLen, _ := redisClient.LLen(ctx, "logs:events") - guardianLen, _ := redisClient.LLen(ctx, "logs:guardian") - joinLen, _ := redisClient.LLen(ctx, "logs:join") + totalQueue := int(eventsLen) - totalQueue := int(eventsLen + guardianLen + joinLen) - - // Adjust tuning + // Adjust tuning based on queue depth batchSize, interval := tuning.AdjustTuning(totalQueue, cfg.BatchSize, cfg.FlushInterval) // Update ticker interval ticker.Reset(interval) - // Flush logs concurrently - var wg sync.WaitGroup - wg.Add(3) - - // Events - go func() { - defer wg.Done() - count, err := flush.FlushLogs( - ctx, - redisClient, - db, - "logs:events", - batchSize, - db.BulkInsertEvents, - ) - if err != nil { - log.Error().Err(err).Msg("❌ Error flushing events") - } else if count > 0 { - log.Info(). - Int("count", count). - Int("batch_size", batchSize). - Dur("interval", interval). - Msg("✅ Flushed event logs") - } - }() - - // Guardian - go func() { - defer wg.Done() - count, err := flush.FlushLogs( - ctx, - redisClient, - db, - "logs:guardian", - batchSize, - db.BulkInsertGuardianLogs, - ) - if err != nil { - log.Error().Err(err).Msg("❌ Error flushing guardian logs") - } else if count > 0 { - log.Info(). - Int("count", count). - Int("batch_size", batchSize). - Dur("interval", interval). - Msg("✅ Flushed guardian logs") - } - }() - - // Join - go func() { - defer wg.Done() - count, err := flush.FlushLogs( - ctx, - redisClient, - db, - "logs:join", - batchSize, - db.BulkInsertJoinLogs, - ) - if err != nil { - log.Error().Err(err).Msg("❌ Error flushing join logs") - } else if count > 0 { - log.Info(). - Int("count", count). - Int("batch_size", batchSize). - Dur("interval", interval). - Msg("✅ Flushed join logs") - } - }() - - wg.Wait() + // Flush events + count, err := flush.FlushLogs( + ctx, + redisClient, + "logs:events", + batchSize, + db.BulkInsertEvents, + ) + if err != nil { + log.Error().Err(err).Msg("❌ Error flushing events") + } else if count > 0 { + log.Info(). + Int("count", count). + Int("batch_size", batchSize). + Dur("interval", interval). + Msg("✅ Flushed events") + } } } } func flushRemaining(ctx context.Context, redisClient *redis.Client, db *database.DB, batchSize int) { - // Flush events - count, _ := flush.FlushLogs(ctx, redisClient, db, "logs:events", batchSize, db.BulkInsertEvents) - if count > 0 { - log.Info().Int("count", count).Msg("Flushed event logs on shutdown") - } - - // Flush guardian - count, _ = flush.FlushLogs(ctx, redisClient, db, "logs:guardian", batchSize, db.BulkInsertGuardianLogs) - if count > 0 { - log.Info().Int("count", count).Msg("Flushed guardian logs on shutdown") - } - - // Flush join - count, _ = flush.FlushLogs(ctx, redisClient, db, "logs:join", batchSize, db.BulkInsertJoinLogs) + // Flush remaining events + count, _ := flush.FlushLogs(ctx, redisClient, "logs:events", batchSize, db.BulkInsertEvents) if count > 0 { - log.Info().Int("count", count).Msg("Flushed join logs on shutdown") + log.Info().Int("count", count).Msg("Flushed events on shutdown") } } diff --git a/go.mod b/go.mod index bd9c4b6..5c686d8 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,16 @@ module github.com/bueapp/worker -go 1.25.3 +go 1.23.0 -require github.com/stretchr/testify v1.11.1 +toolchain go1.24.10 + +require ( + github.com/jackc/pgx/v5 v5.7.6 + github.com/joho/godotenv v1.5.1 + github.com/redis/go-redis/v9 v9.17.0 + github.com/rs/zerolog v1.34.0 + github.com/stretchr/testify v1.11.1 +) require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -10,14 +18,12 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.7.6 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/joho/godotenv v1.5.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/redis/go-redis/v9 v9.17.0 // indirect - github.com/rs/zerolog v1.34.0 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/stretchr/objx v0.5.2 // indirect golang.org/x/crypto v0.37.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/go.sum b/go.sum index 28b2e2e..4c91df7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,11 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -17,6 +22,10 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -27,6 +36,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.17.0 h1:K6E+ZlYN95KSMmZeEQPbU/c++wfmEvfFB17yEAq/VhM= github.com/redis/go-redis/v9 v9.17.0/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= @@ -49,6 +60,8 @@ golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/database/database.go b/internal/database/database.go index 4585ff2..96667e5 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -21,11 +21,8 @@ func New(ctx context.Context, databaseURL string) (*DB, error) { return nil, fmt.Errorf("parse database URL: %w", err) } - // Configure connection to prefer simple protocol and disable TLS if not in URL - // This handles servers that don't support TLS - if config.ConnConfig.TLSConfig == nil { - config.ConnConfig.TLSConfig = nil // Disable TLS - } + // TLS configuration is handled by pgx based on the connection URL + // No additional TLS configuration needed here pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { @@ -39,163 +36,58 @@ func New(ctx context.Context, databaseURL string) (*DB, error) { db := &DB{pool: pool} - // Ensure tables exist - if err := db.ensureTables(ctx); err != nil { - return nil, fmt.Errorf("ensure tables: %w", err) - } - return db, nil } -// ensureTables creates the necessary tables if they don't exist -func (db *DB) ensureTables(ctx context.Context) error { - queries := []string{ - `CREATE TABLE IF NOT EXISTS events ( - id TEXT PRIMARY KEY, - guild_id TEXT NOT NULL, - channel_id TEXT, - user_id TEXT, - event_type TEXT NOT NULL, - logged_at TIMESTAMPTZ NOT NULL, - data JSONB, - created_at TIMESTAMPTZ DEFAULT NOW() - )`, - `CREATE INDEX IF NOT EXISTS idx_events_guild_id ON events(guild_id)`, - `CREATE INDEX IF NOT EXISTS idx_events_logged_at ON events(logged_at)`, - - `CREATE TABLE IF NOT EXISTS guardian_logs ( - id TEXT PRIMARY KEY, - guild_id TEXT NOT NULL, - user_id TEXT, - action TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - data JSONB, - inserted_at TIMESTAMPTZ DEFAULT NOW() - )`, - `CREATE INDEX IF NOT EXISTS idx_guardian_logs_guild_id ON guardian_logs(guild_id)`, - `CREATE INDEX IF NOT EXISTS idx_guardian_logs_created_at ON guardian_logs(created_at)`, - - `CREATE TABLE IF NOT EXISTS join_logs ( - id TEXT PRIMARY KEY, - guild_id TEXT NOT NULL, - user_id TEXT NOT NULL, - event_type TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - data JSONB, - inserted_at TIMESTAMPTZ DEFAULT NOW() - )`, - `CREATE INDEX IF NOT EXISTS idx_join_logs_guild_id ON join_logs(guild_id)`, - `CREATE INDEX IF NOT EXISTS idx_join_logs_created_at ON join_logs(created_at)`, - } - - for _, query := range queries { - if _, err := db.pool.Exec(ctx, query); err != nil { - return fmt.Errorf("execute query: %w", err) - } - } - - return nil -} - -// BulkInsertEvents inserts multiple event logs in a single transaction +// BulkInsertEvents inserts multiple events in a single transaction using the flattened schema func (db *DB) BulkInsertEvents(ctx context.Context, logs []map[string]interface{}) (int, error) { if len(logs) == 0 { return 0, nil } batch := &pgx.Batch{} - for _, log := range logs { - dataJSON, _ := json.Marshal(log["data"]) - - batch.Queue( - `INSERT INTO events (id, guild_id, channel_id, user_id, event_type, logged_at, data) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (id) DO NOTHING`, - log["id"], - log["guildId"], - log["channelId"], - log["userId"], - log["eventType"], - log["loggedAt"], - dataJSON, - ) - } - - results := db.pool.SendBatch(ctx, batch) - defer results.Close() - - inserted := 0 - for i := 0; i < len(logs); i++ { - _, err := results.Exec() - if err != nil { - log.Warn().Err(err).Msg("Failed to insert event log") - } else { - inserted++ + for _, logData := range logs { + // Marshal oldValue to JSONB if present + var oldValueJSON []byte + if oldValue, ok := logData["oldValue"]; ok && oldValue != nil { + var err error + oldValueJSON, err = json.Marshal(oldValue) + if err != nil { + log.Warn().Err(err).Msg("Failed to marshal oldValue to JSON") + continue + } } - } - return inserted, nil -} - -// BulkInsertGuardianLogs inserts multiple guardian logs in a single transaction -func (db *DB) BulkInsertGuardianLogs(ctx context.Context, logs []map[string]interface{}) (int, error) { - if len(logs) == 0 { - return 0, nil - } - - batch := &pgx.Batch{} - for _, log := range logs { - dataJSON, _ := json.Marshal(log["data"]) - - batch.Queue( - `INSERT INTO guardian_logs (id, guild_id, user_id, action, created_at, data) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (id) DO NOTHING`, - log["id"], - log["guildId"], - log["userId"], - log["action"], - log["createdAt"], - dataJSON, - ) - } - - results := db.pool.SendBatch(ctx, batch) - defer results.Close() - - inserted := 0 - for i := 0; i < len(logs); i++ { - _, err := results.Exec() - if err != nil { - log.Warn().Err(err).Msg("Failed to insert guardian log") - } else { - inserted++ + // Marshal newValue to JSONB if present + var newValueJSON []byte + if newValue, ok := logData["newValue"]; ok && newValue != nil { + var err error + newValueJSON, err = json.Marshal(newValue) + if err != nil { + log.Warn().Err(err).Msg("Failed to marshal newValue to JSON") + continue + } } - } - - return inserted, nil -} - -// BulkInsertJoinLogs inserts multiple join logs in a single transaction -func (db *DB) BulkInsertJoinLogs(ctx context.Context, logs []map[string]interface{}) (int, error) { - if len(logs) == 0 { - return 0, nil - } - - batch := &pgx.Batch{} - for _, log := range logs { - dataJSON, _ := json.Marshal(log["data"]) batch.Queue( - `INSERT INTO join_logs (id, guild_id, user_id, event_type, created_at, data) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (id) DO NOTHING`, - log["id"], - log["guildId"], - log["userId"], - log["eventType"], - log["createdAt"], - dataJSON, + `INSERT INTO events ( + guild_id, action, action_thread_id, logged_at, + actor_id, channel_id, target_id, message_id, + reason, duration, old_value, new_value, url + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, + logData["guildId"], + logData["action"], + logData["actionThreadId"], + logData["loggedAt"], + logData["actorId"], + logData["channelId"], + logData["targetId"], + logData["messageId"], + logData["reason"], + logData["duration"], + oldValueJSON, + newValueJSON, + logData["url"], ) } @@ -203,10 +95,10 @@ func (db *DB) BulkInsertJoinLogs(ctx context.Context, logs []map[string]interfac defer results.Close() inserted := 0 - for i := 0; i < len(logs); i++ { + for i := 0; i < batch.Len(); i++ { _, err := results.Exec() if err != nil { - log.Warn().Err(err).Msg("Failed to insert join log") + log.Warn().Err(err).Msg("Failed to insert event") } else { inserted++ } diff --git a/internal/database/database_test.go b/internal/database/database_test.go index dbd02ab..35da982 100644 --- a/internal/database/database_test.go +++ b/internal/database/database_test.go @@ -20,16 +20,18 @@ func TestBulkInsertEvents(t *testing.T) { expected: 0, }, { - name: "single event", + name: "single event with flattened structure", logs: []map[string]interface{}{ { - "id": "event-1", "guildId": "guild-123", - "channelId": "channel-456", - "userId": "user-789", - "eventType": "message", + "action": "USER_BAN", "loggedAt": time.Now(), - "data": map[string]string{"content": "test"}, + "actorId": "actor-456", + "channelId": "channel-789", + "targetId": "target-101", + "reason": "spam", + "oldValue": map[string]string{"status": "active"}, + "newValue": map[string]string{"status": "banned"}, }, }, wantErr: false, @@ -39,22 +41,18 @@ func TestBulkInsertEvents(t *testing.T) { name: "multiple events", logs: []map[string]interface{}{ { - "id": "event-1", - "guildId": "guild-123", - "channelId": "channel-456", - "userId": "user-789", - "eventType": "message", - "loggedAt": time.Now(), - "data": map[string]string{"content": "test1"}, + "guildId": "guild-123", + "action": "MESSAGE_DELETE", + "loggedAt": time.Now(), + "actorId": "actor-456", }, { - "id": "event-2", - "guildId": "guild-123", - "channelId": "channel-456", - "userId": "user-789", - "eventType": "message", - "loggedAt": time.Now(), - "data": map[string]string{"content": "test2"}, + "guildId": "guild-123", + "action": "MEMBER_KICK", + "loggedAt": time.Now(), + "actorId": "actor-789", + "targetId": "target-101", + "reason": "violation", }, }, wantErr: false, @@ -76,84 +74,4 @@ func TestBulkInsertEvents(t *testing.T) { } }) } -} - -func TestBulkInsertGuardianLogs(t *testing.T) { - tests := []struct { - name string - logs []map[string]interface{} - expected int - }{ - { - name: "empty logs", - logs: []map[string]interface{}{}, - expected: 0, - }, - { - name: "single guardian log", - logs: []map[string]interface{}{ - { - "id": "guard-1", - "guildId": "guild-123", - "userId": "user-789", - "action": "ban", - "createdAt": time.Now(), - "data": map[string]string{"reason": "spam"}, - }, - }, - expected: 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.name == "empty logs" { - db := &DB{} - got, _ := db.BulkInsertGuardianLogs(context.Background(), tt.logs) - if got != tt.expected { - t.Errorf("BulkInsertGuardianLogs() got = %v, want %v", got, tt.expected) - } - } - }) - } -} - -func TestBulkInsertJoinLogs(t *testing.T) { - tests := []struct { - name string - logs []map[string]interface{} - expected int - }{ - { - name: "empty logs", - logs: []map[string]interface{}{}, - expected: 0, - }, - { - name: "single join log", - logs: []map[string]interface{}{ - { - "id": "join-1", - "guildId": "guild-123", - "userId": "user-789", - "eventType": "join", - "createdAt": time.Now(), - "data": map[string]string{"status": "success"}, - }, - }, - expected: 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.name == "empty logs" { - db := &DB{} - got, _ := db.BulkInsertJoinLogs(context.Background(), tt.logs) - if got != tt.expected { - t.Errorf("BulkInsertJoinLogs() got = %v, want %v", got, tt.expected) - } - } - }) - } } \ No newline at end of file diff --git a/internal/dlq/dlq.go b/internal/dlq/dlq.go index b024d34..046df14 100644 --- a/internal/dlq/dlq.go +++ b/internal/dlq/dlq.go @@ -86,15 +86,21 @@ func processFailedLogs( return nil } - // Try to insert into events table (you might want to route to the correct table) + // Try to insert into events table count, err := db.BulkInsertEvents(ctx, parsedLogs) if err != nil { log.Error().Err(err).Msg("Failed to reprocess DLQ logs, pushing back") // Push back to DLQ for _, logData := range parsedLogs { - jsonBytes, _ := json.Marshal(logData) + jsonBytes, marshalErr := json.Marshal(logData) + if marshalErr != nil { + log.Error().Err(marshalErr).Msg("Failed to marshal log data for DLQ") + continue + } encoded := base64.StdEncoding.EncodeToString(jsonBytes) - _ = redisClient.LPush(ctx, "logs:failed", encoded) + if pushErr := redisClient.LPush(ctx, "logs:failed", encoded); pushErr != nil { + log.Error().Err(pushErr).Msg("Failed to push back to DLQ") + } } return err } diff --git a/internal/flush/flush.go b/internal/flush/flush.go index fda5c62..a68dc51 100644 --- a/internal/flush/flush.go +++ b/internal/flush/flush.go @@ -20,7 +20,6 @@ type RedisClient interface { func FlushLogs( ctx context.Context, redisClient RedisClient, - db interface{}, redisKey string, batchSize int, insertFunc func(context.Context, []map[string]interface{}) (int, error), @@ -67,7 +66,11 @@ func FlushLogs( if err != nil { // On failure, push to DLQ for _, logData := range parsedLogs { - jsonBytes, _ := json.Marshal(logData) + jsonBytes, marshalErr := json.Marshal(logData) + if marshalErr != nil { + log.Error().Err(marshalErr).Msg("Failed to marshal log data for DLQ") + continue + } encoded := base64.StdEncoding.EncodeToString(jsonBytes) if pushErr := redisClient.LPush(ctx, "logs:failed", encoded); pushErr != nil { log.Error().Err(pushErr).Msg("Failed to push to DLQ") diff --git a/internal/flush/flush_test.go b/internal/flush/flush_test.go index dcf4dcd..60aab3e 100644 --- a/internal/flush/flush_test.go +++ b/internal/flush/flush_test.go @@ -28,7 +28,7 @@ func (m *mockRedisClient) LPush(ctx context.Context, key string, values ...inter func TestFlushLogsSuccess(t *testing.T) { ctx := context.Background() - logData := map[string]interface{}{"message": "test", "level": "info"} + logData := map[string]interface{}{"guildId": "guild-123", "action": "USER_BAN"} jsonBytes, _ := json.Marshal(logData) encoded := base64.StdEncoding.EncodeToString(jsonBytes) @@ -48,7 +48,7 @@ func TestFlushLogsSuccess(t *testing.T) { return len(logs), nil } - count, err := FlushLogs(ctx, redis, nil, "logs", 10, insertFunc) + count, err := FlushLogs(ctx, redis, "logs:events", 10, insertFunc) if err != nil || count != 1 { t.Errorf("expected count=1 and no error, got count=%d and err=%v", count, err) } @@ -68,7 +68,7 @@ func TestFlushLogsEmptyRedis(t *testing.T) { }, } - count, err := FlushLogs(ctx, redis, nil, "logs", 10, nil) + count, err := FlushLogs(ctx, redis, "logs:events", 10, nil) if err != nil || count != 0 { t.Errorf("expected count=0 and no error, got count=%d and err=%v", count, err) } @@ -88,7 +88,7 @@ func TestFlushLogsLRangeError(t *testing.T) { }, } - _, err := FlushLogs(ctx, redis, nil, "logs", 10, nil) + _, err := FlushLogs(ctx, redis, "logs:events", 10, nil) if err == nil { t.Error("expected error from LRange, got nil") } @@ -96,7 +96,7 @@ func TestFlushLogsLRangeError(t *testing.T) { func TestFlushLogsInsertError(t *testing.T) { ctx := context.Background() - logData := map[string]interface{}{"message": "test"} + logData := map[string]interface{}{"guildId": "guild-123", "action": "USER_BAN"} jsonBytes, _ := json.Marshal(logData) encoded := base64.StdEncoding.EncodeToString(jsonBytes) @@ -116,7 +116,7 @@ func TestFlushLogsInsertError(t *testing.T) { return 0, errors.New("insert failed") } - _, err := FlushLogs(ctx, redis, nil, "logs", 10, insertFunc) + _, err := FlushLogs(ctx, redis, "logs:events", 10, insertFunc) if err == nil { t.Error("expected error from insert, got nil") } diff --git a/internal/models/models.go b/internal/models/models.go index 8be3c7d..7d0add9 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -2,33 +2,23 @@ package models import "time" -// EventLog represents a Discord event log entry -type EventLog struct { - ID string `json:"id"` - GuildID string `json:"guildId"` - ChannelID string `json:"channelId,omitempty"` - UserID string `json:"userId,omitempty"` - EventType string `json:"eventType"` - LoggedAt time.Time `json:"loggedAt"` - Data map[string]interface{} `json:"data,omitempty"` -} - -// GuardianLog represents a guardian action log -type GuardianLog struct { - ID string `json:"id"` - GuildID string `json:"guildId"` - UserID string `json:"userId,omitempty"` - Action string `json:"action"` - CreatedAt time.Time `json:"createdAt"` - Data map[string]interface{} `json:"data,omitempty"` -} - -// JoinLog represents a user join/leave log -type JoinLog struct { - ID string `json:"id"` - GuildID string `json:"guildId"` - UserID string `json:"userId"` - EventType string `json:"eventType"` - CreatedAt time.Time `json:"createdAt"` - Data map[string]interface{} `json:"data,omitempty"` +// Event represents a Discord event log entry with flattened structure +// This matches the events table schema from Drizzle ORM +type Event struct { + ID string `json:"id,omitempty"` + GuildID string `json:"guildId"` + Action string `json:"action"` + ActionThreadID *string `json:"actionThreadId,omitempty"` + LoggedAt time.Time `json:"loggedAt"` + ActorID *string `json:"actorId,omitempty"` + ChannelID *string `json:"channelId,omitempty"` + TargetID *string `json:"targetId,omitempty"` + MessageID *string `json:"messageId,omitempty"` + Reason *string `json:"reason,omitempty"` + Duration *int `json:"duration,omitempty"` + OldValue interface{} `json:"oldValue,omitempty"` + NewValue interface{} `json:"newValue,omitempty"` + URL *string `json:"url,omitempty"` + CreatedAt time.Time `json:"createdAt,omitempty"` + UpdatedAt time.Time `json:"updatedAt,omitempty"` } diff --git a/internal/models/models_test.go b/internal/models/models_test.go index c0c09aa..587a825 100644 --- a/internal/models/models_test.go +++ b/internal/models/models_test.go @@ -6,81 +6,77 @@ import ( "time" ) -func TestEventLogJSON(t *testing.T) { +func TestEventJSON(t *testing.T) { now := time.Now() - el := EventLog{ - ID: "1", + actorID := "actor123" + channelID := "channel123" + targetID := "target123" + reason := "test reason" + + event := Event{ + ID: "uuid-1", GuildID: "guild123", - ChannelID: "channel123", - UserID: "user123", - EventType: "message", + Action: "USER_BAN", LoggedAt: now, - Data: map[string]interface{}{"test": "data"}, + ActorID: &actorID, + ChannelID: &channelID, + TargetID: &targetID, + Reason: &reason, + OldValue: map[string]interface{}{"status": "active"}, + NewValue: map[string]interface{}{"status": "banned"}, } - data, err := json.Marshal(el) + data, err := json.Marshal(event) if err != nil { - t.Fatalf("Failed to marshal EventLog: %v", err) + t.Fatalf("Failed to marshal Event: %v", err) } - var el2 EventLog - if err := json.Unmarshal(data, &el2); err != nil { - t.Fatalf("Failed to unmarshal EventLog: %v", err) + var event2 Event + if err := json.Unmarshal(data, &event2); err != nil { + t.Fatalf("Failed to unmarshal Event: %v", err) } - if el2.ID != el.ID || el2.GuildID != el.GuildID { - t.Errorf("EventLog mismatch after JSON round-trip") + if event2.GuildID != event.GuildID || event2.Action != event.Action { + t.Errorf("Event mismatch after JSON round-trip") } -} -func TestGuardianLogJSON(t *testing.T) { - now := time.Now() - gl := GuardianLog{ - ID: "1", - GuildID: "guild123", - UserID: "user123", - Action: "kick", - CreatedAt: now, - Data: map[string]interface{}{"reason": "spam"}, + if event2.ActorID == nil || *event2.ActorID != actorID { + t.Errorf("Event ActorID mismatch after JSON round-trip") } - data, err := json.Marshal(gl) - if err != nil { - t.Fatalf("Failed to marshal GuardianLog: %v", err) + if event2.Reason == nil || *event2.Reason != reason { + t.Errorf("Event Reason mismatch after JSON round-trip") } +} - var gl2 GuardianLog - if err := json.Unmarshal(data, &gl2); err != nil { - t.Fatalf("Failed to unmarshal GuardianLog: %v", err) +func TestEventWithNilOptionalFields(t *testing.T) { + now := time.Now() + + event := Event{ + GuildID: "guild123", + Action: "MESSAGE_DELETE", + LoggedAt: now, } - if gl2.Action != gl.Action { - t.Errorf("GuardianLog mismatch after JSON round-trip") + data, err := json.Marshal(event) + if err != nil { + t.Fatalf("Failed to marshal Event with nil fields: %v", err) } -} -func TestJoinLogJSON(t *testing.T) { - now := time.Now() - jl := JoinLog{ - ID: "1", - GuildID: "guild123", - UserID: "user123", - EventType: "join", - CreatedAt: now, - Data: map[string]interface{}{"bot": false}, + var event2 Event + if err := json.Unmarshal(data, &event2); err != nil { + t.Fatalf("Failed to unmarshal Event with nil fields: %v", err) } - data, err := json.Marshal(jl) - if err != nil { - t.Fatalf("Failed to marshal JoinLog: %v", err) + if event2.ActorID != nil { + t.Errorf("Expected ActorID to be nil") } - var jl2 JoinLog - if err := json.Unmarshal(data, &jl2); err != nil { - t.Fatalf("Failed to unmarshal JoinLog: %v", err) + if event2.ChannelID != nil { + t.Errorf("Expected ChannelID to be nil") } - if jl2.UserID != jl.UserID || jl2.EventType != jl.EventType { - t.Errorf("JoinLog mismatch after JSON round-trip") + if event2.OldValue != nil { + t.Errorf("Expected OldValue to be nil") } }