fix(logs): sarama upgrade to 1.48.0 and panic recovery #1443

Merged
kongfei605 merged 7 commits into flashcatcloud:main from kongfei605:sarama_fix
May 7, 2026

Conversation

@kongfei605
Collaborator

  1. Upgrade sarama to 1.48.0 to fix a bug in the sarama version currently in use; sarama 1.48.0 requires upgrading Go to 1.25.
  2. Add recover to the goroutines that make up the log collection pipeline.

Copilot AI review requested due to automatic review settings May 7, 2026 01:44
Contributor

Copilot AI left a comment

Pull request overview

This PR upgrades the Kafka client dependency (github.com/IBM/sarama) to v1.48.0 (and bumps the module’s Go version accordingly), and introduces panic recovery/restart wrappers for key goroutines in the logs pipeline. It also changes Kafka destination construction to return errors instead of panicking, so pipeline initialization can handle failures more gracefully.

Changes:

  • Upgrade sarama to v1.48.0 and update related Go/module dependencies (including go 1.25.0).
  • Add logs/util helpers (SafeGo, SafeGoWithRestart) to recover from panics and optionally restart goroutines with backoff, plus a Prometheus panic counter.
  • Refactor Kafka destination creation to return (*Destination, error) and propagate/handle errors during pipeline startup.
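
To make the SafeGo idea in the list above concrete, here is a minimal sketch of a panic-recovering goroutine wrapper. It is not the code in logs/util/recover.go: the names safeGo, onPanic and runDemo are hypothetical, and the onPanic hook only stands in for the place where a panic counter (Prometheus in this PR) could be incremented.

```go
package main

import (
	"fmt"
	"log"
	"runtime/debug"
)

// safeGo is a hypothetical stand-in for a SafeGo-style helper: it runs fn in a
// new goroutine and turns a panic into a log line instead of a process crash.
// onPanic is an illustrative hook (an assumption, not from the PR) where a
// metrics counter could be incremented.
func safeGo(component string, fn func(), onPanic func(r any)) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("E! [%s] recovered from panic: %v\n%s", component, r, debug.Stack())
				if onPanic != nil {
					onPanic(r)
				}
			}
		}()
		fn()
	}()
}

// runDemo starts one goroutine that panics and returns the recovered value as text.
func runDemo() string {
	recovered := make(chan any, 1)
	safeGo("demo", func() { panic("boom") }, func(r any) { recovered <- r })
	return fmt.Sprint(<-recovered)
}

func main() {
	fmt.Println("recovered:", runDemo())
}
```

The recover call has to sit in a function deferred inside the goroutine itself, because a panic can only be recovered on the goroutine where it happened.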

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| logs/util/recover.go | Adds SafeGo/SafeGoWithRestart wrappers with panic recovery and a Prometheus counter. |
| logs/sender/sender.go | Switches sender start to SafeGoWithRestart. |
| logs/processor/processor.go | Switches processor start to SafeGoWithRestart. |
| logs/pipeline/provider.go | Handles NewPipeline errors during pipeline initialization instead of assuming success. |
| logs/pipeline/pipeline.go | Updates NewPipeline to return error and propagates Kafka destination creation failures. |
| logs/input/file/tailer.go | Wraps tailer goroutines with SafeGo panic recovery. |
| logs/input/file/scanner.go | Switches scanner start to SafeGoWithRestart. |
| logs/client/kafka/producer.go | Runs async producer workers under SafeGoWithRestart. |
| logs/client/kafka/destination.go | Makes destination construction return errors; wraps async send goroutines with SafeGo. |
| go.mod | Bumps Go version and updates sarama / x/* dependencies. |
| go.sum | Updates sums for upgraded dependencies. |
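
The destination.go, pipeline.go and provider.go rows describe one pattern: a constructor returns an error instead of panicking, and the caller decides what to do with the failure. A minimal sketch of that pattern follows. The types, function names and validation rules here are hypothetical and do not reproduce the PR's Destination or NewPipeline.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// destination is a hypothetical placeholder for a Kafka destination.
type destination struct {
	brokers []string
	topic   string
}

// newDestination reports configuration problems as errors rather than panicking.
// The two checks are illustrative assumptions, not the PR's actual validation.
func newDestination(brokers []string, topic string) (*destination, error) {
	if len(brokers) == 0 {
		return nil, errors.New("kafka destination: no brokers configured")
	}
	if topic == "" {
		return nil, errors.New("kafka destination: empty topic")
	}
	return &destination{brokers: brokers, topic: topic}, nil
}

// pipeline is a hypothetical placeholder for a logs pipeline.
type pipeline struct{ dest *destination }

// newPipeline wraps and propagates the destination error to its caller.
func newPipeline(brokers []string, topic string) (*pipeline, error) {
	dest, err := newDestination(brokers, topic)
	if err != nil {
		return nil, fmt.Errorf("create pipeline: %w", err)
	}
	return &pipeline{dest: dest}, nil
}

// startPipelines keeps the pipelines that initialized and logs the ones that did not.
func startPipelines(configs [][]string, topic string) []*pipeline {
	var started []*pipeline
	for i, brokers := range configs {
		p, err := newPipeline(brokers, topic)
		if err != nil {
			log.Printf("E! pipeline %d failed to initialize: %v", i, err)
			continue
		}
		started = append(started, p)
	}
	return started
}

func main() {
	ps := startPipelines([][]string{{"kafka-1:9092"}, {}}, "logs")
	fmt.Println("pipelines started:", len(ps))
}
```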
Comments suppressed due to low confidence (1)

logs/processor/processor.go:56

  • SafeGoWithRestart will not successfully recover/restart if p.run panics because p.run's deferred p.done <- struct{}{} blocks until Stop is waiting. That can turn a panic into a permanent hang and also makes restarts unsafe (multiple run exits can block on done). Consider removing restart here or redesigning the done/stop signaling to be restart-safe (e.g., close-once semantics, buffered channel with sync.Once, or per-run done channels).
// Start starts the Processor.
func (p *Processor) Start() {
	util.SafeGoWithRestart("logs/processor", p.run, 5*time.Second)
}

// Stop stops the Processor,
// this call blocks until inputChan is flushed
func (p *Processor) Stop() {
	close(p.inputChan)
	<-p.done
}
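
One way to read the "close-once" suggestion in the comment above is sketched below. This is an assumption about a possible redesign, not the change that was merged: the processor type, the "bad" message that triggers a panic, and the absence of any backoff are all simplifications made for illustration. The point is that done is closed (never sent on), and only after a run exits naturally, so a panicking run cannot block on it.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// processor is a hypothetical, simplified stand-in for the logs Processor.
type processor struct {
	inputChan chan string
	done      chan struct{}
	doneOnce  sync.Once
	processed []string
}

func newProcessor() *processor {
	return &processor{inputChan: make(chan string), done: make(chan struct{})}
}

// handle panics on the message "bad" to simulate a bug in message handling.
func (p *processor) handle(msg string) {
	if msg == "bad" {
		panic("cannot handle message: " + msg)
	}
	p.processed = append(p.processed, msg)
}

// runOnce returns true when inputChan was closed and drained,
// and false when a panic was recovered and the run should be restarted.
func (p *processor) runOnce() (finished bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("E! [processor] recovered from panic: %v", r)
			finished = false
		}
	}()
	for msg := range p.inputChan {
		p.handle(msg)
	}
	return true
}

// start restarts runOnce after a panic (no backoff in this sketch) and
// closes done exactly once after a natural exit.
func (p *processor) start() {
	go func() {
		for !p.runOnce() {
		}
		p.doneOnce.Do(func() { close(p.done) })
	}()
}

// stop blocks until inputChan is flushed, as in the original Stop.
func (p *processor) stop() {
	close(p.inputChan)
	<-p.done
}

// runDemo feeds three messages, one of which panics, and returns what was processed.
func runDemo() []string {
	p := newProcessor()
	p.start()
	for _, m := range []string{"a", "bad", "b"} {
		p.inputChan <- m
	}
	p.stop()
	return p.processed
}

func main() {
	fmt.Println(runDemo())
}
```

Because closing a channel never blocks and sync.Once makes the close idempotent, stop returns no matter how many runs ended in a panic before the final one.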

Comment thread logs/sender/sender.go
Comment thread logs/pipeline/provider.go
Comment thread logs/client/kafka/destination.go
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 5 comments.

Comment thread logs/client/kafka/producer.go Outdated
Comment thread logs/input/file/scanner.go
Comment thread logs/client/kafka/destination.go Outdated
Comment thread logs/pipeline/provider.go
Comment thread .github/workflows/release.yaml
Copilot AI review requested due to automatic review settings May 7, 2026 03:36
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 2 comments.

Comment thread logs/util/recover.go Outdated
Comment thread logs/util/recover.go Outdated
Copilot AI review requested due to automatic review settings May 7, 2026 06:27
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 3 comments.

Comment thread logs/sender/sender.go
Comment thread logs/client/kafka/destination.go
Comment thread logs/pipeline/provider.go
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 1 comment.

Comment thread logs/sender/sender.go
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 3 comments.

Comment thread logs/util/recover.go
// automatic restart after a backoff delay.
// If stopChan is not nil, it will abort restart if stopChan is closed.
// onDone is called when fn exits naturally, or when restarts are aborted.
func SafeGoWithRestart(component string, fn func(), backoff time.Duration, stopChan chan struct{}, onDone func()) {
Comment thread logs/util/recover.go
Comment on lines +82 to +90
	if stopChan != nil {
		select {
		case <-time.After(backoff):
		case <-stopChan:
			log.Printf("I! [%s] shutdown signal received, aborting restart", component)
			return
		}
	} else {
		time.Sleep(backoff)
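
The abortable backoff in the excerpt above can also be written without the nil check, since a receive from a nil channel never proceeds inside a select. The following is a sketch under that observation, not the merged code: waitBackoff and the two demo helpers are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// waitBackoff waits for the backoff period. It returns true when the period
// elapsed (a restart may proceed) and false when stop was closed first.
// A nil stop channel is allowed: its case can never fire, so the call
// behaves like a plain sleep.
func waitBackoff(backoff time.Duration, stop <-chan struct{}) bool {
	timer := time.NewTimer(backoff)
	defer timer.Stop() // release the timer if the stop signal wins
	select {
	case <-timer.C:
		return true
	case <-stop:
		return false
	}
}

// abortedByStop reports whether an already-closed stop channel aborts a long backoff.
func abortedByStop() bool {
	stop := make(chan struct{})
	close(stop)
	return !waitBackoff(time.Hour, stop)
}

// elapsedWithoutStop reports whether a short backoff completes when stop is nil.
func elapsedWithoutStop() bool {
	return waitBackoff(time.Millisecond, nil)
}

func main() {
	fmt.Println("aborted by stop:", abortedByStop())
	fmt.Println("elapsed without stop:", elapsedWithoutStop())
}
```
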
Comment thread logs/pipeline/provider.go
Comment on lines +88 to +92
	if len(p.pipelines) == 0 {
		log.Printf("E! all %d pipelines failed to initialize, log collection is disabled", p.numberOfPipelines)
		p.dropChan = make(chan *message.Message, 1000)
		util.SafeGo("logs/provider/drop", func() {
			for range p.dropChan {
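
The fallback in the excerpt above appears to drain a channel so that producers are never blocked when no pipeline is available. A minimal sketch of such a drain loop is shown here. The dropSink type, its dropped counter and the message struct are hypothetical, and a plain go statement is used where the PR uses util.SafeGo.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// message is a hypothetical placeholder for a log message type.
type message struct{ content string }

// dropSink discards everything sent to in and counts what it discarded.
// The counter is an illustrative addition, not part of the PR excerpt.
type dropSink struct {
	in      chan *message
	done    chan struct{}
	dropped atomic.Int64
}

// newDropSink starts the drain goroutine; size is the channel buffer length.
func newDropSink(size int) *dropSink {
	s := &dropSink{in: make(chan *message, size), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for range s.in {
			s.dropped.Add(1)
		}
	}()
	return s
}

// close stops the sink, waits for the drain loop to finish,
// and returns the number of dropped messages.
func (s *dropSink) close() int64 {
	close(s.in)
	<-s.done
	return s.dropped.Load()
}

// runDemo sends ten messages into a sink with a buffer of four and returns the drop count.
func runDemo() int64 {
	s := newDropSink(4)
	for i := 0; i < 10; i++ {
		s.in <- &message{content: fmt.Sprintf("line %d", i)}
	}
	return s.close()
}

func main() {
	fmt.Println("dropped:", runDemo())
}
```
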
@kongfei605 kongfei605 merged commit 0aa23a2 into flashcatcloud:main May 7, 2026
6 of 7 checks passed
@kongfei605 kongfei605 deleted the sarama_fix branch May 7, 2026 07:52

2 participants