added kafka subscriber to observability package#2863

Open
sacrana0 wants to merge 1 commit into thunder-id:main from Infosys:feature/2708

Conversation

@sacrana0
Contributor

@sacrana0 sacrana0 commented May 20, 2026

Purpose

Added support for a subscriber to publish events to Kafka.

Approach

Related Issues

Related PRs

  • N/A

Checklist

  • Followed the contribution guidelines.
  • [x] Manual test round performed and verified.
  • Documentation provided. (Add links if there are any)
    • Ran Vale and fixed all errors and warnings
  • Tests provided. (Add links if there are any)
    • Unit Tests
    • Integration Tests
  • Breaking changes. (Fill if applicable)
    • Breaking changes section filled.
    • breaking change label added.

Security checks

  • Followed secure coding standards in WSO2 Secure Coding Guidelines
  • Confirmed that this PR doesn't commit any keys, passwords, tokens, usernames, or other secrets.

Summary by CodeRabbit

  • New Features
    • Added Kafka observability output and subscriber: can be enabled via config, validates brokers/topic, supports client ID/format/retries/timeouts, routes by event categories, and generates subscriber IDs.
  • Tests
    • Added unit tests covering adapter/subscriber behavior, lifecycle, validation, error handling, and factory registration.
  • Chores
    • Updated Go module dependency versions.

Review Change Stack

@coderabbitai
Contributor

coderabbitai Bot commented May 20, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds Kafka observability output: config schema (ObservabilityKafkaConfig), Sarama async kafkaAdapter with error-drain and graceful shutdown, InitializeKafkaAdapter wiring, a KafkaSubscriber to republish events to Kafka, and unit tests.

Changes

Kafka Observability Output Sink

| Layer | File(s) | Summary |
|---|---|---|
| Configuration Schema | backend/internal/system/config/config.go | New ObservabilityKafkaConfig struct defines Kafka sink settings and is attached to ObservabilityOutputConfig to enable YAML/JSON configuration. |
| Dependency updates (go.mod) | backend/go.mod | Updated direct and indirect Go module versions required by the new Kafka adapter and its tests. |
| Kafka Adapter implementation and tests | backend/internal/system/observability/adapter/kafka_adapter.go, backend/internal/system/observability/adapter/kafka_adapter_test.go | Sarama AsyncProducer-based kafkaAdapter with validation, producer config, error-drain goroutine, mutex-guarded writes after close, no-op Flush, Close with AsyncClose and timeout wait, GetName, and unit tests verifying config validation, publish semantics, close/idempotency, and name. |
| Adapter Initialization Wiring | backend/internal/system/observability/adapter/init.go | Exported InitializeKafkaAdapter wires configuration to newKafkaAdapter using sarama.NewAsyncProducer. |
| Kafka Subscriber Implementation and tests | backend/internal/system/observability/subscriber/kafka_subscriber.go, backend/internal/system/observability/subscriber/kafka_subscriber_test.go | KafkaSubscriber registers a "kafka" factory, gates enablement via config, validates brokers/topic at Initialize, sets up formatter and adapter, computes categories (defaults to CategoryAll), generates a UUIDv7 subscriber ID, forwards events via OnEvent to the adapter with "kafka" tag, manages Close/Flush, and includes tests using a stubAdapter to verify behavior and factory registration. |

Sequence Diagrams

sequenceDiagram
  participant Adapter as kafkaAdapter
  participant Producer as sarama.AsyncProducer
  participant Errors as Producer.Errors()
  participant Drain as ErrorDrainGoroutine

  Adapter->>Producer: NewAsyncProducer(brokers, config)
  Adapter->>Drain: start error drain goroutine
  Drain->>Errors: range over Errors()

  Note over Adapter,Producer: runtime
  Adapter->>Producer: Input() <- ProducerMessage (topic, bytes)
  Errors->>Drain: send error (if any)
  Drain->>Adapter: log error with topic

  Note over Adapter,Producer: shutdown
  Adapter->>Producer: AsyncClose()
  Drain->>Drain: exit when Errors() closed
  Adapter->>Adapter: wait for done or timeout
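The adapter lifecycle in the first diagram (start an error-drain goroutine, then on shutdown call AsyncClose and wait for the drain to finish or a timeout) can be sketched roughly as follows. This is a minimal sketch, not the PR's code: `fakeProducer`, `drainAdapter`, `errBroker`, and `closeTimeout` are hypothetical stand-ins so the example runs without a Kafka broker or the Sarama dependency.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBroker and closeTimeout are illustrative values, not taken from the PR.
var errBroker = errors.New("broker unavailable")

const closeTimeout = time.Second

// fakeProducer is a hypothetical stand-in for sarama.AsyncProducer.
type fakeProducer struct {
	errs chan error
}

func (p *fakeProducer) Errors() <-chan error { return p.errs }

// AsyncClose closes the error channel, as a completed shutdown would.
func (p *fakeProducer) AsyncClose() { close(p.errs) }

type drainAdapter struct {
	producer *fakeProducer
	done     chan struct{}
	logged   []string // written only by the drain goroutine until done is closed
}

func newDrainAdapter(p *fakeProducer) *drainAdapter {
	a := &drainAdapter{producer: p, done: make(chan struct{})}
	go func() {
		defer close(a.done)
		for err := range p.Errors() { // loop exits when Errors() is closed
			a.logged = append(a.logged, err.Error())
		}
	}()
	return a
}

// Close starts shutdown and waits for the drain goroutine, up to timeout.
func (a *drainAdapter) Close(timeout time.Duration) error {
	a.producer.AsyncClose()
	select {
	case <-a.done:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("producer shutdown timed out after %s", timeout)
	}
}

func main() {
	p := &fakeProducer{errs: make(chan error, 1)}
	a := newDrainAdapter(p)
	p.errs <- errBroker
	fmt.Println(a.Close(closeTimeout), a.logged)
}
```

Reading `a.logged` after a successful Close is safe here because closing `done` happens after the drain loop has finished; after a timeout that guarantee no longer holds.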
sequenceDiagram
  participant Client as Client Code
  participant Subscriber as KafkaSubscriber
  participant Formatter as Formatter
  participant Adapter as kafkaAdapter
  participant Kafka as Kafka Broker

  Client->>Subscriber: Initialize()
  Subscriber->>Subscriber: validate brokers & topic
  Subscriber->>Adapter: InitializeKafkaAdapter(cfg)
  Subscriber->>Subscriber: generate UUIDv7 ID

  Client->>Subscriber: OnEvent(evt)
  Subscriber->>Formatter: format evt -> bytes
  Subscriber->>Adapter: Write(bytes)  (topic)
  Adapter->>Kafka: async publish

  Client->>Subscriber: Close()
  Subscriber->>Adapter: Flush()
  Subscriber->>Adapter: Close()
  Adapter->>Kafka: AsyncClose producer

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'added kafka subscriber to observability package' clearly summarizes the main change: adding Kafka subscriber functionality to the observability system. |
| Description check | ✅ Passed | The description includes a clear purpose statement, references the related issue (#2708), provides a checklist with unit tests confirmed, and security checks completed, meeting the core template requirements. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions

Dependency Validation Results

Dependency name: golang.org/x/crypto
Version: v0.51.0 (was v0.46.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/text
Version: v0.37.0 (was v0.32.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

⚠️ Please verify the scope of the dependencies usage is necessary

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/internal/system/observability/adapter/kafka_adapter.go`:
- Around line 103-115: The Write method can race with Close and panic by sending
to the producer Input after the adapter is marked closed; to fix it, make the
closed check and the send atomic by keeping ka.mu locked across the check and
the send: in kafkaAdapter.Write, remove the early ka.mu.Unlock() and perform the
ka.producer.Input() <- ... while still holding ka.mu, then unlock; reference the
kafkaAdapter.Write method, ka.mu, ka.closed and ka.producer.Input() so reviewers
can locate and update the code (this guarantees the check and send are
serialized with Close which calls AsyncClose()).

In `@backend/internal/system/observability/subscriber/kafka_subscriber.go`:
- Around line 128-143: KafkaSubscriber.Close uses ks.logger without checking for
nil which can panic if Close is called before Initialize; update Close (method
KafkaSubscriber.Close) to guard all logger calls by checking ks.logger != nil
before calling ks.logger.Info or ks.logger.Error, and ensure the early adapter
flush/close logic still runs (keep the existing ks.adapter != nil checks) but
replace direct logger usage at the start and end of the method (and inside error
branches) with conditional logging so Close is safe when ks.logger is nil (do
not change adapter behavior or return values).
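The nil-logger guard requested for KafkaSubscriber.Close might look roughly like the sketch below. The `logger` and `adapter` interfaces, the `logInfo` helper, and the error-wrapping behavior are assumptions made for illustration; the PR's actual types and return values may differ.

```go
package main

import "fmt"

// logger and adapter are hypothetical minimal interfaces for this sketch.
type logger interface{ Info(msg string) }

type adapter interface {
	Flush() error
	Close() error
}

type kafkaSubscriber struct {
	logger  logger  // nil until Initialize runs
	adapter adapter // nil until Initialize runs
}

// logInfo is a no-op when the logger has not been set yet.
func (ks *kafkaSubscriber) logInfo(msg string) {
	if ks.logger != nil {
		ks.logger.Info(msg)
	}
}

// Close is safe to call before Initialize: both fields are nil-checked.
func (ks *kafkaSubscriber) Close() error {
	ks.logInfo("closing kafka subscriber")
	if ks.adapter != nil {
		if err := ks.adapter.Flush(); err != nil {
			return fmt.Errorf("flush kafka adapter: %w", err)
		}
		if err := ks.adapter.Close(); err != nil {
			return fmt.Errorf("close kafka adapter: %w", err)
		}
	}
	ks.logInfo("kafka subscriber closed")
	return nil
}

func main() {
	ks := &kafkaSubscriber{} // Initialize was never called
	fmt.Println(ks.Close())
}
```

Routing every log call through one small helper keeps the nil check in a single place instead of repeating it at each call site.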

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 801ec88a-a6e5-48f6-93d9-7192897056a1

📥 Commits

Reviewing files that changed from the base of the PR and between 0ee2b0d and 11c8c8b.

⛔ Files ignored due to path filters (1)
  • backend/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • backend/go.mod
  • backend/internal/system/config/config.go
  • backend/internal/system/observability/adapter/init.go
  • backend/internal/system/observability/adapter/kafka_adapter.go
  • backend/internal/system/observability/adapter/kafka_adapter_test.go
  • backend/internal/system/observability/subscriber/kafka_subscriber.go
  • backend/internal/system/observability/subscriber/kafka_subscriber_test.go

Comment thread backend/internal/system/observability/adapter/kafka_adapter.go

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/internal/system/config/config.go`:
- Around line 325-326: Add documentation for the new Kafka observability config
and the new agent.store option: document observability.output.kafka and all its
keys (enabled, brokers, topic, client_id, format, retries, timeout, categories)
with example YAML and explain initialization requirements (must have non-empty
brokers and topic), failure/close semantics, and category filtering behavior in
the observability config guide (e.g.,
docs/content/guides/getting-started/configuration.mdx or observability guide);
also document the new top-level agent.store accepted values and fallback
semantics alongside other *.store config docs so users know precedence and
defaults. Reference the new config struct ObservabilityKafkaConfig and the
agent.store option in the docs, and ensure a single consolidated docs update
covers the listed changed sections.

In `@backend/internal/system/observability/adapter/kafka_adapter.go`:
- Around line 103-115: In kafkaAdapter.Write, avoid blocking forever on
ka.producer.Input() by replacing the direct send with a timeout-based select:
check ka.closed under ka.mu as you already do, then use a select that attempts
to send the *sarama.ProducerMessage to ka.producer.Input(), a case for
<-time.After(timeout) (use cfg.Timeout if appropriate, otherwise introduce a
clear constant like producerEnqueueTimeout), and a case to detect if the adapter
was closed (e.g., a non-blocking check of ka.closed under ka.mu or a separate
closed channel). On timeout return a descriptive error (e.g., "kafka producer
input enqueue timed out"); keep the mutex usage minimal (only for checking
closed) and do not hold it while blocking on the select.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 88c10d7f-8b8f-40b3-b157-87eea22bd528

📥 Commits

Reviewing files that changed from the base of the PR and between 11c8c8b and 69d6357.

⛔ Files ignored due to path filters (1)
  • backend/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • backend/go.mod
  • backend/internal/system/config/config.go
  • backend/internal/system/observability/adapter/init.go
  • backend/internal/system/observability/adapter/kafka_adapter.go
  • backend/internal/system/observability/adapter/kafka_adapter_test.go
  • backend/internal/system/observability/subscriber/kafka_subscriber.go
  • backend/internal/system/observability/subscriber/kafka_subscriber_test.go

Comment thread backend/internal/system/config/config.go
Comment on lines +103 to +115
func (ka *kafkaAdapter) Write(data []byte) error {
	ka.mu.Lock()
	if ka.closed {
		ka.mu.Unlock()
		return fmt.Errorf("kafka adapter is closed")
	}
	ka.mu.Unlock()

	ka.producer.Input() <- &sarama.ProducerMessage{
		Topic: ka.topic,
		Value: sarama.ByteEncoder(data),
	}
	return nil
Contributor

@coderabbitai coderabbitai Bot May 20, 2026

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

For github.com/IBM/sarama v1.49.0 AsyncProducer, does sending to producer.Input() block when the channel is full, and what is the recommended way to avoid indefinite blocking in producer write paths?

💡 Result:

Yes, sending a message to the AsyncProducer Input() channel in Sarama will block if the channel is full [1][2]. The Input channel is essentially an unbuffered channel [2], and the producer's internal processing must be able to keep up with your ingestion rate to prevent blocking [3][2]. To avoid indefinite blocking in your producer write paths, you should implement one of the following strategies:

  1. Non-blocking writes with select: Use a select statement with a default case when sending to the Input channel. This allows you to handle cases where the producer is overloaded by dropping the message or logging a warning instead of blocking your application [2].

         select {
         case producer.Input() <- msg:
             // Message successfully queued
         default:
             // Channel is full; handle as needed (e.g., drop, buffer, or retry)
         }

  2. Timeout-based writes: Instead of an immediate default, use a time.After or context.WithTimeout within a select block. This allows the application to wait for a short, defined period for space to become available before giving up, providing a compromise between immediate dropping and indefinite blocking [2].

  3. Monitor and drain channels: Deadlocks can occur if you do not actively drain the Errors and Successes channels [1]. Even if you do not need to process successes, the Successes channel must be consumed if Return.Successes is enabled in your configuration [1]. Similarly, the Errors channel must be drained to prevent the producer from deadlocking when the internal error buffer fills up [1].

It is also important to note that performance bottlenecks often stem from misconfiguration or improper handling of these return channels [1][3]. Always ensure that you are calling Close or AsyncClose on the producer to prevent memory leaks and ensure that all in-flight messages are flushed correctly [1].

Citations:


🏁 Script executed:

cat -n backend/internal/system/observability/adapter/kafka_adapter.go

Repository: thunder-id/thunderid

Length of output: 5020


🏁 Script executed:

grep -A2 "github.com/IBM/sarama" go.mod

Repository: thunder-id/thunderid

Length of output: 104


🏁 Script executed:

find . -name "go.mod" -type f

Repository: thunder-id/thunderid

Length of output: 138


🏁 Script executed:

grep "github.com/IBM/sarama" backend/go.mod

Repository: thunder-id/thunderid

Length of output: 107


Add timeout to producer input write to prevent indefinite blocking on channel saturation.

The ka.producer.Input() <- ... send blocks indefinitely when the async producer input channel is saturated (e.g., due to broker/network degradation). Per Sarama v1.49.0 documentation, the Input channel blocks when full; you must implement a timeout-based select pattern to avoid stalling caller request paths.

While error draining is already correctly implemented (line 84), the Write path needs bounded enqueue behavior. Use cfg.Timeout (which exists but is currently only applied to network timeouts) or a new constant for the Input channel write timeout.

💡 Proposed change (timeout-based enqueue)
 type kafkaAdapter struct {
 	producer sarama.AsyncProducer
 	topic    string
 	logger   *log.Logger
 	done     chan struct{}
+	writeTimeout time.Duration
 	mu       sync.Mutex
 	closed   bool
 }
@@
 	ka := &kafkaAdapter{
 		producer: producer,
 		topic:    cfg.Topic,
 		logger:   logger,
 		done:     make(chan struct{}),
+		writeTimeout: func() time.Duration {
+			if cfg.Timeout > 0 {
+				return cfg.Timeout
+			}
+			return 5 * time.Second
+		}(),
 	}
@@
-	ka.producer.Input() <- &sarama.ProducerMessage{
-		Topic: ka.topic,
-		Value: sarama.ByteEncoder(data),
-	}
-	return nil
+	msg := &sarama.ProducerMessage{
+		Topic: ka.topic,
+		Value: sarama.ByteEncoder(data),
+	}
+
+	select {
+	case ka.producer.Input() <- msg:
+		return nil
+	case <-time.After(ka.writeTimeout):
+		return fmt.Errorf("timed out enqueueing kafka message")
+	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/internal/system/observability/adapter/kafka_adapter.go` around lines
103 - 115, In kafkaAdapter.Write, avoid blocking forever on ka.producer.Input()
by replacing the direct send with a timeout-based select: check ka.closed under
ka.mu as you already do, then use a select that attempts to send the
*sarama.ProducerMessage to ka.producer.Input(), a case for <-time.After(timeout)
(use cfg.Timeout if appropriate, otherwise introduce a clear constant like
producerEnqueueTimeout), and a case to detect if the adapter was closed (e.g., a
non-blocking check of ka.closed under ka.mu or a separate closed channel). On
timeout return a descriptive error (e.g., "kafka producer input enqueue timed
out"); keep the mutex usage minimal (only for checking closed) and do not hold
it while blocking on the select.

Contributor Author

We don't want to drop published audits/logs/events, so it's OK for it to block. We will revisit this if we decide to change it.

Contributor

@sacrana0, understood — backpressure semantics make sense here since losing audit/observability events would be worse than momentary caller blocking. I'll keep this in mind for future reviews of this adapter.
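The trade-off being agreed on here, blocking the caller versus dropping the event, can be illustrated with plain Go channels. This is a minimal sketch under the assumption that a buffered channel stands in for the producer's input; `enqueueBlocking` and `enqueueOrDrop` are hypothetical names, not functions from the PR.

```go
package main

import "fmt"

// enqueueBlocking waits until the queue accepts the event (backpressure).
func enqueueBlocking(queue chan<- string, event string) {
	queue <- event
}

// enqueueOrDrop gives up immediately when the queue is full, so the event is lost.
func enqueueOrDrop(queue chan<- string, event string) error {
	select {
	case queue <- event:
		return nil
	default:
		return fmt.Errorf("queue full, dropped %q", event)
	}
}

func main() {
	queue := make(chan string, 1)
	enqueueBlocking(queue, "audit-1")            // fits in the buffer
	fmt.Println(enqueueOrDrop(queue, "audit-2")) // buffer is full, so this event is dropped
	fmt.Println(len(queue))                      // only audit-1 is queued
}
```

With the blocking variant the second event would have waited for a consumer instead of being discarded, which is the behavior the author prefers for audit data.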

(˳˘ ɜ˘) 🐇


✏️ Learnings added
Learnt from: sacrana0
Repo: thunder-id/thunderid PR: 2863
File: backend/internal/system/observability/adapter/kafka_adapter.go:103-115
Timestamp: 2026-05-22T09:13:41.634Z
Learning: In `backend/internal/system/observability/adapter/kafka_adapter.go`, the `Write` method intentionally blocks on `ka.producer.Input() <- msg` without a timeout. This is by design: the team prefers backpressure over dropping audit/log/observability events. Do not flag this as a bug or suggest a timeout-based select pattern for this file.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: KaveeshaPiumini
Repo: thunder-id/thunderid PR: 2835
File: backend/internal/agent/declarative_resource.go:21-39
Timestamp: 2026-05-20T05:39:33.413Z
Learning: Do not treat importing a package named "testing" in production Go files as an anti-pattern in this codebase. For .go files that are not *_test.go, this convention is used to expose *ForTest constructors (e.g., NewAgentExporterForTest, NewApplicationExporterForTest) that are guarded by a call like testing.Testing() and panic when not running under tests. When reviewing, only flag importing "testing" in non-test files if it is not part of this established *ForTest/guard+panic convention.

Contributor

@coderabbitai
Please confirm this. Is it safe to check ka.closed inside the mutex and then release the lock before the send? This is at lines 102 to 113. Is there a possibility of a panic here?

Contributor

@sahandilshan — great catch. There is a real panic risk here, but it does not come from the mutex usage itself — the lock/unlock mechanics are correct. The danger is a TOCTOU (time-of-check-time-of-use) race between checking ka.closed and the actual channel send on line 111, which happens outside the lock.

Here is the exact race:

| Time | Write goroutine | Close goroutine |
|---|---|---|
| 1 | acquires lock | |
| 2 | ka.closed → false | |
| 3 | releases lock | |
| 4 | | acquires lock |
| 5 | | sets ka.closed = true, releases lock |
| 6 | | calls ka.producer.AsyncClose() → Sarama closes the Input() channel |
| 7 | sends on now-closed Input() channel → panic: send on closed channel | |

The mutex correctly guards the closed flag itself, but the flag and the channel send are not atomic together. Sarama's AsyncClose closes the internal input channel during shutdown, so any send after that point will panic.
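The final step of that interleaving can be reproduced deterministically without Kafka. The sketch below replays the steps in a single goroutine, with a plain channel standing in for Input() and a local flag standing in for ka.closed; `sendAfterClose` is a hypothetical helper written only for this illustration.

```go
package main

import "fmt"

// sendAfterClose replays the race in one goroutine and reports whether the
// final send panicked, recovering so the program can continue.
func sendAfterClose() (panicked bool, reason string) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
			reason = fmt.Sprint(r)
		}
	}()

	input := make(chan []byte, 1) // stand-in for producer.Input()
	closed := false               // stand-in for ka.closed

	// Steps 1 to 3: the writer checks the flag and sees "not closed".
	if closed {
		return false, ""
	}

	// Steps 4 to 6: Close runs in the gap, sets the flag, and closes the channel.
	closed = true
	close(input)

	// Step 7: the writer resumes and sends on the closed channel.
	input <- []byte("event")
	return false, ""
}

func main() {
	panicked, reason := sendAfterClose()
	fmt.Println(panicked, reason)
}
```

In the real adapter the two halves run on different goroutines, so the panic only appears when the scheduler happens to produce this ordering.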

Recommended fix — introduce a closedCh chan struct{} and use a select in Write. This preserves the blocking/backpressure semantics (still blocks on channel saturation) but safely unblocks when Close is called:

type kafkaAdapter struct {
    producer sarama.AsyncProducer
    topic    string
    logger   *log.Logger
    done     chan struct{}
+   closedCh chan struct{}   // closed exactly once by Close()
    mu       sync.Mutex
    closed   bool
}
// In constructor:
ka := &kafkaAdapter{
    producer: producer,
    topic:    cfg.Topic,
    logger:   logger,
    done:     make(chan struct{}),
+   closedCh: make(chan struct{}),
}
func (ka *kafkaAdapter) Write(data []byte) error {
    ka.mu.Lock()
    if ka.closed {
        ka.mu.Unlock()
        return fmt.Errorf("kafka adapter is closed")
    }
    ka.mu.Unlock()

    msg := &sarama.ProducerMessage{
        Topic: ka.topic,
        Value: sarama.ByteEncoder(data),
    }
    select {
    case ka.producer.Input() <- msg:
        return nil
    case <-ka.closedCh:
        return fmt.Errorf("kafka adapter is closed")
    }
}
func (ka *kafkaAdapter) Close() error {
    ka.mu.Lock()
    if ka.closed {
        ka.mu.Unlock()
        return nil
    }
    ka.closed = true
    close(ka.closedCh)   // unblocks any Write stuck in the select
    ka.mu.Unlock()

    ka.producer.AsyncClose()
    // ... rest unchanged
}

This pattern:

  • Still blocks on producer.Input() when the broker is slow (preserving the agreed backpressure semantics)
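  • Closes closedCh exactly once (guarded by the closed flag + mutex), so close(ka.closedCh) itself cannot panic
  • Lets a Write that is blocked in the select when Close fires return through the closedCh case. Note that a Write which passes the closed check just before Close can still reach the select after AsyncClose has closed Input(); if the send case is chosen there, it panics, so the check and the send still need to be serialized with Close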
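One caveat about the select-based pattern: in Go, a select may still choose a send case whose channel has been closed, and that send panics. A possible way to close the remaining window, sketched here under assumptions, is to hold a read lock across the send and have Close wake blocked writers first and then wait for them before closing the input. `safeAdapter` and its plain `input` channel are hypothetical stand-ins for kafkaAdapter and producer.Input(); this is not the PR's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("adapter is closed")

// safeAdapter is a hypothetical stand-in; input plays the role of producer.Input().
type safeAdapter struct {
	input    chan []byte
	closedCh chan struct{}
	mu       sync.RWMutex
	closed   bool
	once     sync.Once
}

func newSafeAdapter() *safeAdapter {
	return &safeAdapter{input: make(chan []byte), closedCh: make(chan struct{})}
}

// Write blocks until the message is accepted or Close is called. The read
// lock is held across the send, so Close cannot close input mid-send.
func (a *safeAdapter) Write(data []byte) error {
	a.mu.RLock()
	defer a.mu.RUnlock()
	if a.closed {
		return fmt.Errorf("write: %w", errClosed)
	}
	select {
	case a.input <- data:
		return nil
	case <-a.closedCh:
		return fmt.Errorf("write: %w", errClosed)
	}
}

// Close is idempotent and never closes input while a writer is in its select.
func (a *safeAdapter) Close() {
	a.once.Do(func() {
		close(a.closedCh) // wake writers blocked in the select
		a.mu.Lock()       // wait until every writer has released its read lock
		a.closed = true
		a.mu.Unlock()
		close(a.input) // stand-in for producer.AsyncClose()
	})
}

func main() {
	a := newSafeAdapter()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = a.Write([]byte("event")) // nobody reads input, so this waits for Close
		}()
	}
	a.Close()
	wg.Wait()
	fmt.Println(a.Write([]byte("late")))
}
```

Writers still block while the input is saturated, so the backpressure semantics are kept; the cost is that Close waits for in-flight writers to return before it shuts the producer down.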

Contributor

@coderabbitai coderabbitai Bot left a comment

♻️ Duplicate comments (1)
backend/internal/system/config/config.go (1)

325-355: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Add required docs for the new Kafka observability sink before merge.

🔴 Documentation Required
This PR introduces user-facing changes that are not covered by documentation updates under docs/.
Please update the relevant documentation before merging.

Missing documentation:

  • observability.output.kafka configuration: document all new keys (enabled, brokers, topic, client_id, format, retries, timeout, categories), expected value formats (including duration format), defaults, and a complete YAML example in docs/content/guides/getting-started/configuration.mdx (or the observability configuration guide).
  • Kafka observability sink behavior: document runtime behavior in observability docs (docs/content/guides/), including initialization requirements (non-empty brokers/topic), category filtering/default behavior, retry/backpressure behavior, and shutdown/flush semantics.
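As a rough illustration of the initialization requirement listed above (non-empty brokers and topic when the sink is enabled), a validation step could look like the sketch below. The `kafkaConfig` struct only mirrors the documented key names; its field names and types are assumptions, and the real ObservabilityKafkaConfig may differ.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// kafkaConfig mirrors the documented keys; names and types are assumptions.
type kafkaConfig struct {
	Enabled    bool
	Brokers    []string
	Topic      string
	ClientID   string
	Format     string
	Retries    int
	Timeout    time.Duration
	Categories []string
}

// validate enforces the requirements only when the sink is enabled.
func (c kafkaConfig) validate() error {
	if !c.Enabled {
		return nil
	}
	if len(c.Brokers) == 0 {
		return errors.New("kafka: at least one broker is required")
	}
	for i, b := range c.Brokers {
		if strings.TrimSpace(b) == "" {
			return fmt.Errorf("kafka: broker %d is empty", i)
		}
	}
	if strings.TrimSpace(c.Topic) == "" {
		return errors.New("kafka: topic is required")
	}
	return nil
}

func main() {
	cfg := kafkaConfig{Enabled: true, Brokers: []string{"localhost:9092"}}
	fmt.Println(cfg.validate()) // topic is missing, so validation fails
}
```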

As per coding guidelines, “If ANY user-facing config/behavior changes are detected without updates under docs/, post a single consolidated documentation-required comment.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/internal/system/config/config.go` around lines 325 - 355, The PR adds
a new ObservabilityKafkaConfig struct and exposes observability.output.kafka
settings but lacks docs; add documentation under docs/content/guides (or the
observability configuration guide) describing observability.output.kafka and
each field (enabled, brokers, topic, client_id, format, retries, timeout —
specify duration format — and categories), include defaults and a complete YAML
example (showing nested ObservabilityKafkaConfig keys), and document runtime
behavior for the Kafka observability sink (initialization requirements like
non-empty brokers/topic, category filtering/defaults, retry/backpressure
behavior, and shutdown/flush semantics) so the new user-facing config and
behavior are covered before merge.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 7341e414-71f2-495c-843d-1098209a2409

📥 Commits

Reviewing files that changed from the base of the PR and between 1f1c8a8 and 25a54b1.

⛔ Files ignored due to path filters (1)
  • backend/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • backend/go.mod
  • backend/internal/system/config/config.go
  • backend/internal/system/observability/adapter/init.go
  • backend/internal/system/observability/adapter/kafka_adapter.go
  • backend/internal/system/observability/adapter/kafka_adapter_test.go
  • backend/internal/system/observability/subscriber/kafka_subscriber.go
  • backend/internal/system/observability/subscriber/kafka_subscriber_test.go

Comment thread backend/internal/system/observability/adapter/kafka_adapter.go Outdated
Comment on lines +57 to +61
saramaCfg := sarama.NewConfig()
saramaCfg.Producer.RequiredAcks = sarama.WaitForLocal
saramaCfg.Producer.Retry.Max = cfg.Retries
saramaCfg.Producer.Return.Errors = true
saramaCfg.Producer.Return.Successes = false
Contributor

What is the purpose of using the sarama package? Why can't we use a standard Kafka dependency? What does it give us in addition?

Contributor Author

Can you please mention which standard Kafka dependency you are considering? Refer to discussion #2710.

Contributor

The discussion there talked about the choice between the sarama and the franz-go. We have to properly finalise if it's okay to use either of these 3rd party dependencies with thunderid. Will check on this and get back to you.

Meanwhile, can you check on this: #2863 (comment). There I can see sarama marked as indirect, but it's a direct import, right?

@sahandilshan
Contributor

sarama is listed as // indirect but is a direct dependency

In go.mod, github.com/IBM/sarama v1.49.0 is marked // indirect, but it is directly imported in adapter/init.go, kafka_adapter.go, and kafka_adapter_test.go.

This means go mod tidy was not run after adding the import — running it would move sarama to the direct require block automatically. As it stands, make lint or any go mod tidy check in CI will fail.

Fix: Move github.com/IBM/sarama v1.49.0 to the direct require block (remove the // indirect comment), then run go mod tidy to confirm everything is consistent.

@codecov

codecov Bot commented May 25, 2026

Codecov Report

❌ Patch coverage is 75.00000% with 30 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
...ystem/observability/subscriber/kafka_subscriber.go | 58.18% | 23 Missing ⚠️
...rnal/system/observability/adapter/kafka_adapter.go | 92.06% | 4 Missing and 1 partial ⚠️
...kend/internal/system/observability/adapter/init.go | 0.00% | 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

@sacrana0
Contributor Author

Race condition in Write — can panic on concurrent shutdown

In kafka_adapter.go, the Write method checks ka.closed under the mutex, then releases the lock, then sends on ka.producer.Input(). Between the unlock and the channel send, Close() can run concurrently — it calls AsyncClose() which closes the Input channel. The subsequent send on the closed channel will panic.

// Current code — race window between Unlock and channel send
func (ka *kafkaAdapter) Write(data []byte) error {
    ka.mu.Lock()
    if ka.closed {
        ka.mu.Unlock()
        return fmt.Errorf("kafka adapter is closed")
    }
    ka.mu.Unlock()  // <-- Close() can run here

    ka.producer.Input() <- &sarama.ProducerMessage{  // <-- panic: send on closed channel
        Topic: ka.topic,
        Value: sarama.ByteEncoder(data),
    }
    return nil
}

Fix: Hold the mutex across the entire Write operation, including the channel send. This matches the pattern used by the Console and File adapters.

func (ka *kafkaAdapter) Write(data []byte) error {
    ka.mu.Lock()
    defer ka.mu.Unlock()

    if ka.closed {
        return fmt.Errorf("kafka adapter is closed")
    }

    ka.producer.Input() <- &sarama.ProducerMessage{
        Topic: ka.topic,
        Value: sarama.ByteEncoder(data),
    }
    return nil
}

Also add a concurrency test (the existing Console and File subscribers have OnEventConcurrent tests — Kafka should too). The race is detectable with go test -race.

Similar comment added by coderabbit as well - #2863 (comment)

Close is called only on server shutdown, and the publisher calls Close only after all writes have completed, so this situation never occurs.

Signed-off-by: Sachin Rana <sacrana324@gmail.com>
@github-actions

Dependency Validation Results

Dependency name: github.com/IBM/sarama
Version: v1.49.0
Approved: ❌ No - Module not found in dependency registry

Dependency name: go.opentelemetry.io/otel/metric
Version: v1.43.0
Approved: ❌ No - Module not found in dependency registry

Dependency name: golang.org/x/net
Version: v0.54.0
Approved: ❌ No - Module not found in dependency registry

Dependency name: golang.org/x/crypto
Version: v0.51.0 (was v0.46.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes

Dependency name: golang.org/x/text
Version: v0.37.0 (was v0.32.0)
Allowed range: >=v0.31.0
Approved: ✅ Yes


Next Steps

  1. Review the validation failures listed above
  2. Check if dependencies are in the approved dependency list
  3. Options to resolve:
    • Remove the unapproved dependencies from this PR
    • OR submit a PR to add these dependencies to the approved list in engineering-governance
  4. Once resolved, push changes to re-run validation

This PR is blocked until all dependencies are approved.

⚠️ Please verify the scope of the dependencies usage is necessary


2 participants