
Conversation

@andrewwormald
Collaborator

Implement the EventStreamer interface and support test containers

@coderabbitai

coderabbitai bot commented Dec 12, 2025

Walkthrough

Adds a Redis-backed Event Streamer implementation in adapters/wredis/streamer.go, with Sender and Receiver types built on Redis Streams (XADD, XREADGROUP, XACK), consumer-group management, JSON event marshalling, stream-ID parsing and polling behaviour. Exposes NewStreamer, NewSender and NewReceiver. Adds tests: parsestream_test.go (stream-ID parsing, edge cases, error handling, polling and ack behaviour) and streamer_test.go (integration via Testcontainers Redis). Changes adapters/wredis/store_test.go to use the external wredis_test package form. Adds an integration_tests job to .github/workflows/tests.yml and makes a minor newline formatting fix.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Focus areas:
    • Streamer send/receive flows: XADD, XREADGROUP, XACK and error handling.
    • Consumer-group creation semantics (from-beginning vs from-latest) and existing-group handling.
    • parseStreamID correctness, sequence/overflow and uniqueness guarantees.
    • Receiver polling, pending-message prioritisation and concurrency/synchronisation.
    • Tests using testcontainers: setup, DB flush/isolation and assertions.
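
The focus areas above revolve around how an event becomes a stream entry and back. As a rough, dependency-free sketch of that JSON round trip (the Event shape and helper names here are illustrative, not the actual workflow types):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event mirrors the shape the walkthrough describes; the fields here are
// illustrative, not the actual workflow.Event definition.
type Event struct {
	ID        int64             `json:"id"`
	ForeignID string            `json:"foreign_id"`
	Type      int               `json:"type"`
	Headers   map[string]string `json:"headers"`
	CreatedAt time.Time         `json:"created_at"`
}

// toStreamValues marshals an event into the field map handed to XADD.
func toStreamValues(e Event) (map[string]any, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	return map[string]any{"event": string(b)}, nil
}

// fromStreamValues reverses the mapping, as a receiver's parseEvent would
// after XREADGROUP returns a message.
func fromStreamValues(values map[string]any) (Event, error) {
	raw, ok := values["event"].(string)
	if !ok {
		return Event{}, fmt.Errorf("missing or non-string event field")
	}
	var e Event
	if err := json.Unmarshal([]byte(raw), &e); err != nil {
		return Event{}, err
	}
	return e, nil
}

func main() {
	in := Event{ForeignID: "42", Type: 1, Headers: map[string]string{"k": "v"}}
	values, _ := toStreamValues(in)
	out, err := fromStreamValues(values)
	fmt.Println(err == nil && out.ForeignID == "42" && out.Headers["k"] == "v") // true
}
```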

Possibly related PRs

Poem

🐇 I hopped along the Redis stream tonight,
I XADD a whisper, JSON snug and light,
XREADGROUP hummed and XACK replied,
Testcontainers watched as events aligned,
I twitched my nose — the stream ran right.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 9.09%, which is below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve it.
✅ Passed checks (2 passed)
  • Title check — ✅ Passed: the title accurately summarises the main changes (adding a Redis event streamer implementation and container-based tests to the wredis adapter package).
  • Description check — ✅ Passed: the description states the objective (implement the EventStreamer interface and support test containers), which aligns with the actual changes made.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch andreww-wredis-streamer-integration-test


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
.github/workflows/tests.yml (1)

54-81: Well-structured integration test job.

The new integration_tests job correctly isolates containerised tests from the main test suite. The Docker availability check and explicit timeouts are good practices.

Consider whether these adapter tests could run in parallel to reduce CI time, as there are no dependencies between Kafka and Redis tests.

adapters/wredis/streamer.go (2)

22-31: Unused mutex field.

The mu sync.RWMutex field in Streamer is declared but never used. If it's intended for future thread-safety requirements, consider removing it until needed to avoid confusion.

 type Streamer struct {
 	client redis.UniversalClient
-	mu     sync.RWMutex
 }

107-136: String-based error checking is fragile.

Comparing error messages via err.Error() is brittle as Redis error strings may change between versions. Consider using redis.HasErrorPrefix or checking for specific error types.

-		if err != nil && err.Error() != "ERR no such key" {
+		if err != nil && !isNoSuchKeyError(err) {
 			return nil, nil, err
 		}

Similarly for line 133:

-		if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
+		if err != nil && !isBusyGroupError(err) {
 			return nil, nil, err
 		}

You could add helper functions or use strings.Contains with the error prefix for slightly more robustness.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0f5d767 and a3ea551.

📒 Files selected for processing (4)
  • .github/workflows/tests.yml (1 hunks)
  • adapters/wredis/store_test.go (2 hunks)
  • adapters/wredis/streamer.go (1 hunks)
  • adapters/wredis/streamer_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
adapters/wredis/streamer_test.go (4)
run.go (1)
  • Run (11-17)
eventstreamer.go (1)
  • EventStreamer (10-13)
adapters/wredis/streamer.go (1)
  • NewStreamer (27-31)
adapters/adaptertest/eventstreaming.go (1)
  • RunEventStreamerTest (44-187)
adapters/wredis/store_test.go (1)
adapters/wredis/store.go (1)
  • New (25-29)
adapters/wredis/streamer.go (2)
eventstreamer.go (8)
  • EventStreamer (10-13)
  • EventSender (17-20)
  • ReceiverOption (51-51)
  • EventReceiver (24-27)
  • ReceiverOptions (46-49)
  • Header (34-34)
  • Ack (32-32)
  • StreamFromLatest (62-66)
event.go (1)
  • Event (13-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests (Testcontainers)
🔇 Additional comments (2)
adapters/wredis/store_test.go (1)

1-14: Good refactor to external test package.

Switching to wredis_test ensures tests exercise the public API, improving test quality and catching export issues early.

adapters/wredis/streamer_test.go (1)

16-42: Well-structured integration test.

The test follows the same pattern as store_test.go, using Testcontainers for an ephemeral Redis instance and the standard adapter test suite. The factory pattern with FlushDB ensures test isolation.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
adapters/wredis/streamer.go (1)

139-178: This does not read “pending messages first”, and malformed-event handling causes silent loss

  • Line 140 + Lines 144-147: ">" reads new messages, not pending ones (comment is misleading).
  • Lines 156-160: acking on parse failure drops the message on the floor; Line 158 also ignores XAck errors (observability + potential re-delivery weirdness). This matches prior review feedback.

Either:

  1. actually read pending first (e.g., Streams: {streamKey, "0"} / XAUTOCLAIM), or
  2. change the comment and stop acking malformed events (or dead-letter them), and never ignore XAck errors.
- // Try to read pending messages first
+ // Read new messages for this consumer group

   pendingMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{
     Group:    consumerGroup,
     Consumer: r.name,
     Streams:  []string{streamKey, ">"},
     Count:    1,
     Block:    100 * time.Millisecond,
   }).Result()

   ...

   event, err := r.parseEvent(msg)
   if err != nil {
-    // Acknowledge bad message and continue
-    r.client.XAck(ctx, streamKey, consumerGroup, msg.ID)
-    continue
+    // Don't ack malformed messages silently; bubble up (or dead-letter).
+    return nil, nil, fmt.Errorf("parse event from redis message %s: %w", msg.ID, err)
   }
🧹 Nitpick comments (1)
adapters/wredis/parsestream_test.go (1)

151-180: Tighten the “negative sequence” edge-case test (it doesn’t test negatives)

Lines 153-168 describe negative-sequence handling, but the test only re-checks valid IDs. Either:

  • drop the negative-sequence commentary (and consider removing the negative-seq branch in parseStreamID), or
  • change parseStreamID parsing to make -1 reachable (e.g., strings.Cut/SplitN) and then assert the expected error.
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between a3ea551 and abc5f1f.

📒 Files selected for processing (3)
  • adapters/wredis/parsestream_test.go (1 hunks)
  • adapters/wredis/streamer.go (1 hunks)
  • adapters/wredis/streamer_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • adapters/wredis/streamer_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
adapters/wredis/streamer.go (2)
eventstreamer.go (7)
  • EventStreamer (10-13)
  • EventSender (17-20)
  • ReceiverOption (51-51)
  • EventReceiver (24-27)
  • ReceiverOptions (46-49)
  • Header (34-34)
  • StreamFromLatest (62-66)
event.go (1)
  • Event (13-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests (Testcontainers)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
adapters/wredis/streamer.go (2)

19-19: Unused constant can be removed.

The cursorKeyPrefix constant is defined but never referenced in this file. Consider removing it or adding a comment explaining its reserved purpose if it's intended for future use.

-	cursorKeyPrefix = "workflow:cursor:"

24-27: Unused mutex field can be removed.

The Streamer struct declares a sync.RWMutex that is never locked or unlocked. Since Streamer currently has no mutable shared state requiring synchronisation, this field appears unnecessary.

 type Streamer struct {
 	client redis.UniversalClient
-	mu     sync.RWMutex
 }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between abc5f1f and 5f9e5bf.

📒 Files selected for processing (2)
  • adapters/wredis/parsestream_test.go (1 hunks)
  • adapters/wredis/streamer.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
adapters/wredis/streamer.go (2)
eventstreamer.go (8)
  • EventStreamer (10-13)
  • EventSender (17-20)
  • ReceiverOption (51-51)
  • EventReceiver (24-27)
  • ReceiverOptions (46-49)
  • Header (34-34)
  • Ack (32-32)
  • StreamFromLatest (62-66)
event.go (1)
  • Event (13-28)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests (Testcontainers)
🔇 Additional comments (3)
adapters/wredis/parsestream_test.go (3)

12-127: Excellent test coverage for parseStreamID.

The table-driven tests comprehensively cover valid inputs, invalid formats, boundary conditions, and overflow protection. The test case names are descriptive, and error message validation ensures helpful feedback.
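
As a reference for the pattern being praised, a stripped-down table-driven test shape, using a tiny stand-in parser rather than the real parseStreamID:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitID is an illustrative stand-in for parseStreamID, kept tiny so the
// table-driven pattern itself is the focus.
func splitID(id string) (int64, int64, error) {
	ts, seq, ok := strings.Cut(id, "-")
	if !ok {
		return 0, 0, fmt.Errorf("invalid format: %s", id)
	}
	t, err := strconv.ParseInt(ts, 10, 64)
	if err != nil {
		return 0, 0, err
	}
	s, err := strconv.ParseInt(seq, 10, 64)
	if err != nil {
		return 0, 0, err
	}
	return t, s, nil
}

func main() {
	cases := []struct {
		name    string
		input   string
		wantErr bool
	}{
		{name: "valid ID", input: "1700000000000-0", wantErr: false},
		{name: "missing separator", input: "1700000000000", wantErr: true},
		{name: "non-numeric sequence", input: "1700000000000-abc", wantErr: true},
	}
	for _, tc := range cases {
		_, _, err := splitID(tc.input)
		fmt.Printf("%s: ok=%v\n", tc.name, (err != nil) == tc.wantErr) // each line reports ok=true
	}
}
```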


129-152: LGTM!

The collision prevention test effectively validates that distinct stream IDs map to unique event IDs. The use of a map to detect duplicates is appropriate.


186-244: Excellent integration test validates error handling semantics.

This test effectively verifies that malformed messages trigger an error return rather than silent acknowledgement, preventing data loss. The assertion that the message remains pending (line 243) confirms proper error recovery behaviour. The use of testcontainers provides realistic Redis interaction.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
adapters/wredis/parsestream_test.go (1)

129-152: Collision test is fine; consider require.NotContains for clearer intent
This works, but you can simplify the collision assertion by using a require.NotContains(t, seenIDs, id) prior to inserting, to avoid manual if + t.Errorf.
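
For illustration, the collision check reduces to a simple membership test; a dependency-free sketch (helper name assumed — in the actual test, testify's require.NotContains(t, seenIDs, id) is the one-liner):

```go
package main

import "fmt"

// requireNotContains mimics the testify assertion for an int64-keyed set:
// it reports whether inserting id would collide with an existing entry.
func requireNotContains(seen map[int64]struct{}, id int64) bool {
	_, dup := seen[id]
	return !dup
}

func main() {
	seen := map[int64]struct{}{}
	for _, id := range []int64{1, 2, 2, 3} {
		if !requireNotContains(seen, id) {
			fmt.Printf("duplicate ID: %d\n", id) // fires once, for the second 2
			continue
		}
		seen[id] = struct{}{}
	}
	fmt.Println(len(seen)) // 3
}
```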

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 5f9e5bf and 2c6e701.

📒 Files selected for processing (1)
  • adapters/wredis/parsestream_test.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests (Testcontainers)
🔇 Additional comments (3)
adapters/wredis/parsestream_test.go (3)

154-184: Comment now matches behaviour (bounds), nice
The “sequence bounds validation” subtest and its comment line up with the assertions (0 and 999999), which removes the previous mismatch.


12-126: Range variable shadowing is unnecessary on Go 1.25.3

The suggested tc := tc pattern addresses a loop variable capture issue that existed before Go 1.22. However, this repository requires Go 1.25.3 (from adapters/wredis/go.mod), where each range iteration automatically receives its own variable copy. The code as written is correct; no modification is needed.


186-205: No action required. The repository targets Go 1.25.3, and testing.T.Context() has been available since Go 1.24, so the code compiles as-is and the suggested alternative is unnecessary.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
adapters/wredis/streamer.go (3)

209-228: parseEvent should defensively handle string and []byte stream values.
This is still assuming msg.Values["event"] is always string. go-redis can surface bulk strings as []byte depending on client/config.

diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go
@@
 func (r *Receiver) parseEvent(msg redis.XMessage) (*workflow.Event, error) {
-	eventData, ok := msg.Values["event"].(string)
-	if !ok {
-		return nil, fmt.Errorf("invalid event data format")
-	}
+	raw, ok := msg.Values["event"]
+	if !ok {
+		return nil, fmt.Errorf("missing event field")
+	}
+	var eventData string
+	switch v := raw.(type) {
+	case string:
+		eventData = v
+	case []byte:
+		eventData = string(v)
+	default:
+		return nil, fmt.Errorf("invalid event data type: %T", raw)
+	}

245-300: parseStreamID overflow check is incorrect for timestamp == maxTimestamp (can overflow int64).
With timestamp == 9,223,372,036,854 and sequence near 999,999, timestamp*1_000_000 + sequence exceeds math.MaxInt64. The check currently permits that, causing wraparound.

Also: negative timestamps should be rejected, and strings.Cut avoids edge-cases with extra dashes.

diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go
@@
 func parseStreamID(streamID string) (int64, error) {
@@
-	parts := strings.Split(streamID, "-")
-	if len(parts) != 2 {
+	timestampStr, sequenceStr, ok := strings.Cut(streamID, "-")
+	if !ok {
 		return 0, fmt.Errorf("invalid stream ID format: %s", streamID)
 	}
-
-	timestampStr := parts[0]
-	sequenceStr := parts[1]
@@
 	timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
 	if err != nil {
 		return 0, fmt.Errorf("invalid timestamp in stream ID %s: %w", streamID, err)
 	}
+	if timestamp < 0 {
+		return 0, fmt.Errorf("timestamp cannot be negative in stream ID %s", streamID)
+	}
@@
 	sequence, err := strconv.ParseInt(sequenceStr, 10, 64)
 	if err != nil {
 		return 0, fmt.Errorf("invalid sequence in stream ID %s: %w", streamID, err)
 	}
@@
-	const maxTimestamp = 9223372036854
-	if timestamp > maxTimestamp {
+	const (
+		factor                    = int64(1_000_000)
+		maxTimestamp              = int64(9223372036854) // floor(MaxInt64 / factor)
+		maxSequenceAtMaxTimestamp = int64(775807)        // MaxInt64 - maxTimestamp*factor
+	)
+	if timestamp > maxTimestamp || (timestamp == maxTimestamp && sequence > maxSequenceAtMaxTimestamp) {
 		return 0, fmt.Errorf("timestamp too large in stream ID %s: would cause overflow", streamID)
 	}
-	if sequence >= 1000000 {
+	if sequence >= factor {
 		return 0, fmt.Errorf("sequence too large in stream ID %s: must be less than 1,000,000", streamID)
 	}
@@
-	combinedID := timestamp*1000000 + sequence
+	combinedID := timestamp*factor + sequence
 	return combinedID, nil
 }
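
Assembling the diff hunks above, a self-contained version of the overflow-safe parser could look like this (it keeps the decimal timestamp*1_000_000 + sequence mapping, with the bounds suggested in the review):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseStreamID maps a Redis stream ID "timestamp-sequence" onto a single
// int64 as timestamp*1_000_000 + sequence, rejecting anything that would
// overflow int64 or fall outside the expected ranges.
func parseStreamID(streamID string) (int64, error) {
	timestampStr, sequenceStr, ok := strings.Cut(streamID, "-")
	if !ok {
		return 0, fmt.Errorf("invalid stream ID format: %s", streamID)
	}
	timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid timestamp in stream ID %s: %w", streamID, err)
	}
	if timestamp < 0 {
		return 0, fmt.Errorf("timestamp cannot be negative in stream ID %s", streamID)
	}
	sequence, err := strconv.ParseInt(sequenceStr, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid sequence in stream ID %s: %w", streamID, err)
	}
	if sequence < 0 {
		return 0, fmt.Errorf("sequence cannot be negative in stream ID %s", streamID)
	}
	const (
		factor                    = int64(1_000_000)
		maxTimestamp              = int64(9223372036854) // floor(MaxInt64 / factor)
		maxSequenceAtMaxTimestamp = int64(775807)        // MaxInt64 - maxTimestamp*factor
	)
	if sequence >= factor {
		return 0, fmt.Errorf("sequence too large in stream ID %s: must be less than 1,000,000", streamID)
	}
	if timestamp > maxTimestamp || (timestamp == maxTimestamp && sequence > maxSequenceAtMaxTimestamp) {
		return 0, fmt.Errorf("timestamp too large in stream ID %s: would cause overflow", streamID)
	}
	return timestamp*factor + sequence, nil
}

func main() {
	id, err := parseStreamID("1700000000000-5")
	fmt.Println(id, err) // 1700000000000000005 <nil>
	_, err = parseStreamID("9223372036854-775808")
	fmt.Println(err != nil) // true: would exceed MaxInt64
}
```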

104-138: Consumer naming will collide across instances; brittle error matching allows BUSYGROUP race in StreamFromLatest.

  • Two Receiver instances with the same r.name on the same topic will use identical Consumer identities in XReadGroup, causing them to share message consumption rather than load-balance properly.
  • Error matching using err.Error() != "exact string" is fragile across Redis/go-redis versions and breaks with server message variations.
  • In StreamFromLatest, concurrent XGroupCreateMkStream calls will cause a race: the second instance receives BUSYGROUP and currently returns an error rather than tolerating it.

Generate a unique consumer identifier per Receiver instance, keep the consumer group stable, and use errors.As() or prefix-based checking for error handling:

diff --git a/adapters/wredis/streamer.go b/adapters/wredis/streamer.go
@@
 import (
 	"context"
+	"crypto/rand"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"log"
+	"math/big"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
@@
 type Receiver struct {
 	client  redis.UniversalClient
 	topic   string
 	name    string
+	consumer string
 	options workflow.ReceiverOptions
 }
@@
 func (s *Streamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) {
@@
+	consumer := name
+	// Generate unique suffix per receiver instance to avoid consumer identity collisions across instances.
+	if n, err := rand.Int(rand.Reader, big.NewInt(1<<62)); err == nil {
+		consumer = fmt.Sprintf("%s-%d", name, n.Int64())
+	}
 	return &Receiver{
 		client:  s.client,
 		topic:   topic,
 		name:    name,
+		consumer: consumer,
 		options: options,
 	}, nil
 }
@@
 	if r.options.StreamFromLatest {
@@
-		if err != nil && err.Error() != "ERR no such key" {
+		// "ERR no such key" means stream doesn't exist yet; treat as no groups.
+		if err != nil && !strings.HasPrefix(err.Error(), "ERR no such key") {
 			return nil, nil, err
 		}
@@
 			_, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "$").Result()
-			if err != nil {
+			// BUSYGROUP is benign here (another instance created the group concurrently).
+			if err != nil && !redis.HasErrorPrefix(err, "BUSYGROUP") {
 				return nil, nil, err
 			}
 		}
 	} else {
 		// Create consumer group if it doesn't exist, starting from beginning
 		_, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "0").Result()
-		if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
+		if err != nil && !redis.HasErrorPrefix(err, "BUSYGROUP") {
 			return nil, nil, err
 		}
 	}
 
 	for ctx.Err() == nil {
 		pendingMsgs, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{
 			Group:    consumerGroup,
-			Consumer: r.name,
+			Consumer: r.consumer,
🧹 Nitpick comments (2)
adapters/wredis/streamer.go (2)

24-56: Streamer.mu is currently unused; consider removing or using it to avoid misleading intent.
Not a blocker, but it reads as if Streamer is intended to guard shared state. If there’s no shared mutable state, dropping it simplifies the API surface.


65-89: Consider using time.Now().UTC() for workflow.Event.CreatedAt to avoid mixed timezones.
If the rest of the system expects UTC timestamps, storing local time here will create inconsistent behaviour across hosts.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 2c6e701 and 74c014e.

📒 Files selected for processing (2)
  • adapters/wredis/parsestream_test.go (1 hunks)
  • adapters/wredis/streamer.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • adapters/wredis/parsestream_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: Integration Tests (Testcontainers)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
adapters/wredis/streamer.go (4)

103-133: Brittle err.Error() == ... comparisons; also add a comment for intentionally-ignored errors.
Exact string matches (Line 107-108, Line 130-131) are fragile across Redis / go-redis versions and make the “ignore” rationale unclear. This was previously flagged and still applies.

@@
-		groups, err := r.client.XInfoGroups(ctx, streamKey).Result()
-		if err != nil && err.Error() != "ERR no such key" {
+		groups, err := r.client.XInfoGroups(ctx, streamKey).Result()
+		// If the stream doesn't exist yet, Redis returns a "no such key" error; that's OK for StreamFromLatest.
+		if err != nil && !strings.Contains(err.Error(), "no such key") {
 			return nil, nil, err
 		}
@@
-	_, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "0").Result()
-	if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
+	_, err := r.client.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "0").Result()
+	// BUSYGROUP means the group already exists; that's OK for idempotent initialisation.
+	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
 		return nil, nil, err
 	}

135-203: Consumer identity is coupled to name; scaling + recovery semantics are unclear (pending is per-consumer).
Using Consumer: r.name (Line 139, 175) means multiple receiver instances with the same name will share a Redis “consumer” identity, which can lead to duplicate processing/races and makes pending ownership ambiguous. This was previously flagged and still applies. Also, reading pending with "0" only addresses this consumer’s pending; it won’t recover messages left pending by a dead consumer without XAUTOCLAIM/XCLAIM.

@@
 type Receiver struct {
 	client  redis.UniversalClient
 	topic   string
 	name    string
+	consumer string
 	options workflow.ReceiverOptions
 }
@@
 func (s *Streamer) NewReceiver(ctx context.Context, topic string, name string, opts ...workflow.ReceiverOption) (workflow.EventReceiver, error) {
@@
 	return &Receiver{
 		client:  s.client,
 		topic:   topic,
 		name:    name,
+		consumer: fmt.Sprintf("%s-%d", name, time.Now().UnixNano()),
 		options: options,
 	}, nil
 }
@@
-			Consumer: r.name,
+			Consumer: r.consumer,
@@
-			Consumer: r.name,
+			Consumer: r.consumer,

208-227: parseEvent should accept both string and []byte stream values.
go-redis stream values are map[string]any; depending on producer/client config you may see []byte. This was previously suggested and is still not implemented.

 func (r *Receiver) parseEvent(msg redis.XMessage) (*workflow.Event, error) {
-	eventData, ok := msg.Values["event"].(string)
-	if !ok {
-		return nil, fmt.Errorf("invalid event data format")
-	}
+	raw, ok := msg.Values["event"]
+	if !ok {
+		return nil, fmt.Errorf("missing event field")
+	}
+	var eventData string
+	switch v := raw.(type) {
+	case string:
+		eventData = v
+	case []byte:
+		eventData = string(v)
+	default:
+		return nil, fmt.Errorf("invalid event data type: %T", raw)
+	}

243-298: parseStreamID has a hard throughput cap and dead-path checks; consider bit-packing + strings.Cut.

  • strings.Split makes some validation branches effectively unreachable and is less direct than strings.Cut.
  • sequence >= 1_000_000 is an artificial cap; Redis stream sequences can exceed this under load, causing failures (and/or forcing you into a collision-prone mapping). This was previously raised and still applies.
 func parseStreamID(streamID string) (int64, error) {
@@
-	parts := strings.Split(streamID, "-")
-	if len(parts) != 2 {
+	timestampStr, sequenceStr, ok := strings.Cut(streamID, "-")
+	if !ok {
 		return 0, fmt.Errorf("invalid stream ID format: %s", streamID)
 	}
-
-	timestampStr := parts[0]
-	sequenceStr := parts[1]
@@
-	// Check for potential overflow before combining
-	// We need timestamp*1_000_000 + sequence to fit in int64
-	// Max int64 is 9,223,372,036,854,775,807
-	// So timestamp should not exceed 9,223,372,036,854
-	const maxTimestamp = 9223372036854
-	if timestamp > maxTimestamp {
-		return 0, fmt.Errorf("timestamp too large in stream ID %s: would cause overflow", streamID)
-	}
-	if sequence >= 1000000 {
-		return 0, fmt.Errorf("sequence too large in stream ID %s: must be less than 1,000,000", streamID)
-	}
-	if sequence < 0 {
-		return 0, fmt.Errorf("sequence cannot be negative in stream ID %s", streamID)
-	}
-
-	// Combine timestamp and sequence to create unique ID
-	combinedID := timestamp*1000000 + sequence
-	return combinedID, nil
+	if sequence < 0 {
+		return 0, fmt.Errorf("sequence cannot be negative in stream ID %s", streamID)
+	}
+
+	// Bit-pack: preserves ordering and avoids decimal throughput caps.
+	const seqBits = 21 // allows 0..2,097,151 sequences per millisecond
+	const maxTimestamp = int64(^uint64(0)>>1) >> seqBits
+	const maxSequence = int64(1<<seqBits) - 1
+	if timestamp > maxTimestamp {
+		return 0, fmt.Errorf("timestamp too large in stream ID %s: would cause overflow", streamID)
+	}
+	if sequence > maxSequence {
+		return 0, fmt.Errorf("sequence too large in stream ID %s: must be <= %d", streamID, maxSequence)
+	}
+
+	return (timestamp << seqBits) | sequence, nil
 }
🧹 Nitpick comments (4)
adapters/wredis/streamer.go (2)

24-51: Consider validating topic/name early (empty strings create odd Redis keys).
Right now NewSender/NewReceiver accept empty values and defer failure to Redis operations.
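
A sketch of the suggested early validation (function name and rules are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"strings"
)

// validateStreamArgs rejects empty or whitespace-only topic/name before any
// Redis key is built, so failures surface at construction time rather than
// on the first Redis operation.
func validateStreamArgs(topic, name string) error {
	if strings.TrimSpace(topic) == "" {
		return fmt.Errorf("topic must be non-empty")
	}
	if strings.TrimSpace(name) == "" {
		return fmt.Errorf("name must be non-empty")
	}
	return nil
}

func main() {
	fmt.Println(validateStreamArgs("orders", "worker"))  // <nil>
	fmt.Println(validateStreamArgs("", "worker") != nil) // true
}
```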


60-84: Prefer UTC for CreatedAt to avoid mixed timezones across producers.
If consumers compare/serialise times, time.Now().UTC() tends to be safer.

adapters/wredis/parsestream_test.go (2)

16-131: Tests are tightly coupled to the current timestamp*1_000_000 + sequence mapping.
If you change parseStreamID to avoid the 1,000,000 cap (recommended), these hard-coded expectedID values will need updating; consider computing expected IDs via a local helper mirroring the intended mapping to reduce churn.


255-299: Good coverage for PollFrequency; consider extracting a Redis-container helper to cut repetition.
The container/client bootstrapping pattern is repeated across tests and could be factored to a small helper for readability.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between bd63d5a and 761dfa5.

📒 Files selected for processing (2)
  • adapters/wredis/parsestream_test.go (1 hunks)
  • adapters/wredis/streamer.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
adapters/wredis/streamer.go (4)
eventstreamer.go (7)
  • EventStreamer (10-13)
  • EventSender (17-20)
  • ReceiverOption (51-51)
  • EventReceiver (24-27)
  • ReceiverOptions (46-49)
  • Header (34-34)
  • StreamFromLatest (62-66)
event.go (1)
  • Event (13-28)
marshal.go (1)
  • Marshal (8-10)
unmarshal.go (1)
  • Unmarshal (6-8)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Integration Tests (Testcontainers)
🔇 Additional comments (2)
adapters/wredis/parsestream_test.go (2)

190-253: Good: malformed payload returns an error and stays pending (no silent acknowledgement).
This asserts the intended “don’t ack on parse failure” behaviour end-to-end.


301-370: Good: ack works after Recv context cancellation.
This directly protects against “can’t ack because the receive context timed out”.

@sonarqubecloud

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
adapters/wredis/streamer.go (4)

107-107: Consider using strings.Contains for more robust error matching.

The exact error string comparison is fragile if Redis or go-redis changes the format. Using strings.Contains(err.Error(), "no such key") would be more resilient.

-		if err != nil && err.Error() != "ERR no such key" {
+		if err != nil && !strings.Contains(err.Error(), "no such key") {

208-212: Consider handling both string and []byte types for robustness.

Whilst the Sender always writes strings, defensively handling []byte would improve robustness against edge cases or future changes.

 func (r *Receiver) parseEvent(msg redis.XMessage) (*workflow.Event, error) {
-	eventData, ok := msg.Values["event"].(string)
-	if !ok {
-		return nil, fmt.Errorf("invalid event data format")
+	raw, ok := msg.Values["event"]
+	if !ok {
+		return nil, fmt.Errorf("missing event field")
+	}
+	var eventData string
+	switch v := raw.(type) {
+	case string:
+		eventData = v
+	case []byte:
+		eventData = string(v)
+	default:
+		return nil, fmt.Errorf("invalid event data type: %T", raw)
 	}

291-293: Dead code: negative sequence check is unreachable.

With strings.Split(streamID, "-"), an input like "123--1" produces three parts and fails at line 253-254. An input like "123-(-1)" would fail at ParseInt. There's no valid path where sequence < 0 after successful parsing.

Consider removing this check or switching to strings.Cut as suggested in prior reviews:

-	if sequence < 0 {
-		return 0, fmt.Errorf("sequence cannot be negative in stream ID %s", streamID)
-	}

60-66: Consider using time.Now().UTC() for consistent timestamps.

Using time.Now() includes local timezone information. For event timestamps that may be compared across systems, UTC is typically preferred.

 	event := &workflow.Event{
 		ForeignID: foreignID,
 		Type:      statusType,
 		Headers:   headers,
-		CreatedAt: time.Now(),
+		CreatedAt: time.Now().UTC(),
 	}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 761dfa5 and 1e08766.

📒 Files selected for processing (2)
  • adapters/wredis/parsestream_test.go (1 hunks)
  • adapters/wredis/streamer.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
adapters/wredis/parsestream_test.go (2)
adapters/wredis/streamer.go (2)
  • NewStreamer (24-28)
  • Receiver (90-95)
eventstreamer.go (2)
  • WithReceiverPollFrequency (53-57)
  • StreamFromLatest (62-66)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: core (1)
  • GitHub Check: Integration Tests (Testcontainers)
🔇 Additional comments (1)
adapters/wredis/parsestream_test.go (1)

1-437: LGTM!

The test suite is comprehensive with good coverage of:

  • Stream ID parsing with table-driven tests
  • Collision prevention verification
  • Edge cases and overflow protection
  • Error handling for malformed messages
  • Polling frequency configuration
  • Context cancellation resilience
  • Concurrent receiver creation

Resource cleanup follows testcontainers-go v0.40.0 best practices with CleanupContainer called immediately after Run, and Redis clients are properly closed via t.Cleanup.

@andrewwormald andrewwormald merged commit 27e7b2e into main Dec 15, 2025
12 of 13 checks passed