kafka: bump sarama to fix out of order when controller failure #5483
3AceShowHand wants to merge 7 commits into
Code Review
This pull request updates the sarama dependency version and modifies initSaramaLogger in pkg/logger/log.go to restrict Sarama logging to levels below InfoLevel. The reviewer suggested simplifying the level check to level < zapcore.InfoLevel for better readability and consistency with other parts of the codebase.
📝 Walkthrough
Changes:
- Cloud storage path keys, schema validation, consumer wiring, and sink DDL write flow
- Sarama version bump and logger initialization guard
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 3 failed (3 warnings)
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 OpenGrep (1.23.0) reported `[ERROR]: unable to find a config` for:
- downstreamadapter/sink/cloudstorage/encoder_group_test.go
- downstreamadapter/sink/cloudstorage/dml_writers.go
- downstreamadapter/sink/cloudstorage/buffer_manager_test.go
/test all
/test all
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/cloudstorage/schema_file.go (1)
230-249: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
`Build` should reset receiver state before populating fields.
`Build` mutates `SchemaFile` in place but never clears existing `Columns`/`TotalColumns`. A second call can keep stale column data (especially on the early-return branch when `TableInfo` is nil), producing invalid schema artifacts.
Proposed fix
  func (t *SchemaFile) Build(event *commonEvent.DDLEvent, outputColumnID bool) {
-     t.Version = defaultSchemaFileVersion
-     t.TableVersion = event.FinishedTs
-     t.Query = event.Query
-     t.Type = event.Type
+     *t = SchemaFile{
+         Version:      defaultSchemaFileVersion,
+         TableVersion: event.FinishedTs,
+         Query:        event.Query,
+         Type:         event.Type,
+     }
      info := event.TableInfo
      if info == nil {
          t.Schema = event.GetTargetSchemaName()
          t.Table = event.GetTargetTableName()
          return
      }
      t.Schema = info.GetTargetSchemaName()
      t.Table = info.GetTargetTableName()
      t.TotalColumns = len(info.GetColumns())
+     t.Columns = make([]TableCol, 0, t.TotalColumns)
      for _, col := range info.GetColumns() {
          var tableCol TableCol
          tableCol.FromTiColumnInfo(col, outputColumnID)
          t.Columns = append(t.Columns, tableCol)
      }
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/sink/cloudstorage/schema_file.go` around lines 230 - 249, The Build method on SchemaFile mutates the receiver in place without clearing existing state before population. At the start of the Build method (before any field assignments), reset the Columns slice to empty and reset TotalColumns to zero. This ensures that subsequent calls to Build do not retain stale column data from previous invocations, especially on the early-return path when TableInfo is nil.
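A minimal sketch of the reset-before-populate idea from this finding. The `SchemaFile` type below is a simplified, hypothetical stand-in (string columns, no DDL event), not the repository's real struct. It only illustrates why assigning a fresh value to `*s` prevents stale columns from surviving the early-return branch.

```go
package main

import "fmt"

// SchemaFile is a simplified, hypothetical stand-in for the real type.
type SchemaFile struct {
	Version      int
	Table        string
	Columns      []string
	TotalColumns int
}

// Build resets the receiver before populating it, so a reused value
// never carries columns over from an earlier call.
func (s *SchemaFile) Build(table string, columns []string) {
	*s = SchemaFile{Version: 1, Table: table} // drops any previous state
	if columns == nil {
		return // early return: Columns and TotalColumns stay at zero values
	}
	s.TotalColumns = len(columns)
	s.Columns = make([]string, 0, s.TotalColumns)
	s.Columns = append(s.Columns, columns...)
}

func main() {
	var s SchemaFile
	s.Build("t1", []string{"id", "name"})
	s.Build("t2", nil) // simulates the TableInfo == nil branch
	fmt.Println(s.Table, s.TotalColumns, len(s.Columns))
}
```

Without the `*s = SchemaFile{...}` assignment, the second call in `main` would leave the two columns from `t1` attached to `t2`.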
🧹 Nitpick comments (2)
pkg/logger/log_test.go (1)
37-49: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Consider testing additional log levels (optional).
The test validates the behavior for `DebugLevel` (zap logger) and `InfoLevel` (discard logger). For completeness, you could add assertions for `WarnLevel` and `ErrorLevel` to ensure they also use the discard logger. However, the current test is focused and sufficient given that the guard condition applies uniformly to all levels ≥ Info.
🧪 Optional: Additional test coverage
      require.NoError(t, initSaramaLogger(zapcore.InfoLevel))
      require.NotSame(t, debugLogger, sarama.Logger)
      require.IsType(t, stdlog.New(nil, "", 0), sarama.Logger)
+
+     // Verify WarnLevel and ErrorLevel also use discard logger
+     require.NoError(t, initSaramaLogger(zapcore.WarnLevel))
+     require.IsType(t, stdlog.New(nil, "", 0), sarama.Logger)
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/logger/log_test.go` around lines 37 - 49, The test TestInitSaramaLoggerResetsWhenInfoEnabled currently only validates behavior for DebugLevel and InfoLevel. To improve test coverage, add additional assertions that call initSaramaLogger with WarnLevel and ErrorLevel, and verify that these levels also result in sarama.Logger being set to a discard logger (using require.IsType with stdlog.New), similar to the existing InfoLevel validation.
pkg/logger/log.go (1)
261-265: 📐 Maintainability & Code Quality | 🔵 Trivial
Consider adding a clarifying comment for the guard condition.
The condition `zapcore.InfoLevel.Enabled(level)` correctly silences Sarama when the log level is Info or higher (less verbose), but the intent may not be immediately clear to future readers. A brief comment explaining that Sarama is intentionally silenced at less verbose levels would improve maintainability.
📝 Suggested clarifying comment
  func initSaramaLogger(level zapcore.Level) error {
+     // Silence Sarama at Info or higher levels; it is noisy and not needed for normal operation.
      if zapcore.InfoLevel.Enabled(level) {
          sarama.Logger = stdlog.New(io.Discard, "[Sarama] ", stdlog.LstdFlags)
          return nil
      }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/logger/log.go` around lines 261 - 265, Add a clarifying comment above the if statement that checks `zapcore.InfoLevel.Enabled(level)` to explain that Sarama logging is intentionally silenced when the log level is Info or higher (less verbose logging levels). The comment should make it clear to future maintainers that this condition prevents overly verbose output from Sarama by discarding its logs when verbosity is lower, improving code readability and maintainability.
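To make the guard semantics concrete, here is a dependency-free sketch. `Level` is a local stand-in that mirrors the ordering of `zapcore.Level` (Debug below Info below Warn below Error), and `saramaSilenced` is a hypothetical helper, not a function from the PR. It shows that `InfoLevel.Enabled(level)` is true for Info and every less verbose level, which is the negation of `level < InfoLevel`.

```go
package main

import "fmt"

// Level mirrors the ordering of zapcore.Level; it is a local stand-in
// so the sketch has no external dependencies.
type Level int8

const (
	DebugLevel Level = iota - 1 // -1, as in zapcore
	InfoLevel                   // 0
	WarnLevel                   // 1
	ErrorLevel                  // 2
)

// Enabled reports whether lvl is at or above l, matching the behavior
// of zapcore.Level.Enabled.
func (l Level) Enabled(lvl Level) bool { return lvl >= l }

// saramaSilenced is a hypothetical helper: true when Sarama output
// would be discarded for the configured level.
func saramaSilenced(level Level) bool {
	return InfoLevel.Enabled(level)
}

func main() {
	for _, lvl := range []Level{DebugLevel, InfoLevel, WarnLevel, ErrorLevel} {
		// The two printed booleans agree for every level.
		fmt.Println(lvl, saramaSilenced(lvl), !(lvl < InfoLevel))
	}
}
```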
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 593-594: The mustGetSchemaFile function at line 593-594 panics
when a schema file is missing, which causes hard failures when DML files are
encountered before their corresponding schema files due to unordered file walk
traversal in getNewFiles. To fix this, either enforce a two-pass approach in
getNewFiles that processes all schema files before DML files, or add schema
existence validation before invoking mustGetSchemaFile in the loop to check if
the schema is present in schemaFileMap and skip or log the DML key gracefully
instead of panicking. Additionally, replace the hard panic logic at lines 523,
529, and 536 with recoverable error handling or skip logic to handle transient
storage inconsistencies.
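One possible shape for the schema-first ordering described above, as a sketch only. The path layout, `isSchemaFile`, and `tableOf` are hypothetical simplifications. The real consumer parses keys with its own helpers. The idea is to stably move schema files ahead of DML files, then skip any DML file whose schema is still unknown instead of panicking.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// isSchemaFile is a hypothetical classifier based on an assumed "/meta/" path segment.
func isSchemaFile(path string) bool { return strings.Contains(path, "/meta/") }

// tableOf is a hypothetical helper that treats the first two path segments as the table key.
func tableOf(path string) string {
	parts := strings.SplitN(path, "/", 3)
	if len(parts) < 2 {
		return path
	}
	return parts[0] + "/" + parts[1]
}

// orderSchemaFirst returns a copy with all schema files before DML files,
// preserving the relative order within each group.
func orderSchemaFirst(paths []string) []string {
	out := append([]string(nil), paths...)
	sort.SliceStable(out, func(i, j int) bool {
		return isSchemaFile(out[i]) && !isSchemaFile(out[j])
	})
	return out
}

func main() {
	paths := []string{
		"db/t/1/CDC000001.csv",
		"db/t/meta/schema_1_0000000001.json",
		"db/u/1/CDC000001.csv", // no schema file present for db/u
	}
	seen := map[string]bool{}
	for _, p := range orderSchemaFirst(paths) {
		switch {
		case isSchemaFile(p):
			seen[tableOf(p)] = true
		case !seen[tableOf(p)]:
			fmt.Println("skip, schema not available yet:", p)
		default:
			fmt.Println("apply:", p)
		}
	}
}
```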
In `@pkg/sink/cloudstorage/schema_file.go`:
- Around line 300-305: The checksumPayload struct initialization is missing the
Version field, which means schema file version changes won't be reflected in the
checksum-derived filenames. In the checksumPayload struct literal construction
within marshalForChecksum, add the Version field assignment alongside the other
fields like Table, Schema, Columns, and TotalColumns. This ensures that version
information is included in the checksum payload calculation.
- Around line 275-280: The isTableLevel() method currently uses log.Panic to
handle schema validation failures, which crashes the process instead of allowing
callers to handle the error. Refactor isTableLevel() to return an error in
addition to the boolean result (or return only an error if appropriate), and
instead of calling log.Panic when len(t.Columns) != t.TotalColumns, return a
predefined repository error as per the coding guidelines documented in
docs/agents/error-handling.md. This allows callers to handle validation failures
gracefully without causing process-level availability impact.
📒 Files selected for processing (13)
- cmd/storage-consumer/consumer.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- pkg/logger/log.go
- pkg/logger/log_test.go
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/path_key.go
- pkg/sink/cloudstorage/path_key_test.go
- pkg/sink/cloudstorage/path_test.go
- pkg/sink/cloudstorage/schema_file.go
- pkg/sink/cloudstorage/schema_file_parse.go
- pkg/sink/cloudstorage/schema_file_parse_test.go
- pkg/sink/cloudstorage/schema_file_test.go
/test all
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 445-452: The code at ParseFileIndexFromFileName extracts a
dispatcherID from the filename into fileIndex, but then immediately overwrites
it with the dispatcherID from the index path without validating they match. Add
validation after ParseFileIndexFromFileName returns to check if the extracted
dispatcherID from the filename matches the dispatcherID parameter being assigned
to FileIndexKey. If there is a mismatch, return an error indicating the
dispatcher IDs are inconsistent, as this can cause incorrect file-index state
and replay behavior.
📒 Files selected for processing (19)
- cmd/storage-consumer/consumer.go
- downstreamadapter/sink/cloudstorage/buffer_manager.go
- downstreamadapter/sink/cloudstorage/buffer_manager_test.go
- downstreamadapter/sink/cloudstorage/dml_writers.go
- downstreamadapter/sink/cloudstorage/encoder_group_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- downstreamadapter/sink/cloudstorage/task.go
- downstreamadapter/sink/cloudstorage/writer.go
- downstreamadapter/sink/cloudstorage/writer_test.go
- pkg/cloudstorage/config.go
- pkg/cloudstorage/config_test.go
- pkg/cloudstorage/main_test.go
- pkg/cloudstorage/path.go
- pkg/cloudstorage/path_key.go
- pkg/cloudstorage/path_key_test.go
- pkg/cloudstorage/path_test.go
- pkg/cloudstorage/schema_file.go
- pkg/cloudstorage/schema_file_test.go
✅ Files skipped from review due to trivial changes (1)
- downstreamadapter/sink/cloudstorage/buffer_manager.go
🚧 Files skipped from review as they are similar to previous changes (1)
- downstreamadapter/sink/cloudstorage/sink_test.go
fileIndex, err := cloudstorage.ParseFileIndexFromFileName(fileName, c.fileExtension)
if err != nil {
    return err
}
fileIndex := &cloudstorage.FileIndex{
    FileIndexKey: cloudstorage.FileIndexKey{
        DispatcherID: dispatcherID,
        EnableTableAcrossNodes: dispatcherID != "",
    },
    Idx: fileIdx,
fileIndex.FileIndexKey = cloudstorage.FileIndexKey{
    DispatcherID: dispatcherID,
    EnableTableAcrossNodes: dispatcherID != "",
}
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Validate dispatcher consistency between index-path and index-file payload.
At Line 445 and Line 449-452, the dispatcher parsed from the data filename is discarded and replaced by the dispatcher from the index path without validation. A mismatch can mis-key file-index state and replay from the wrong stream.
Proposed fix
- fileIndex, err := cloudstorage.ParseFileIndexFromFileName(fileName, c.fileExtension)
+ fileIndex, err := cloudstorage.ParseFileIndexFromFileName(fileName, c.fileExtension)
if err != nil {
return err
}
+ if fileIndex.DispatcherID != dispatcherID {
+ return errors.ErrStorageSinkInvalidFileName.GenWithStack(
+ "dispatcher mismatch between index path and file payload: path=%s, payload=%s, file=%s",
+ dispatcherID, fileIndex.DispatcherID, fileName,
+ )
+ }
fileIndex.FileIndexKey = cloudstorage.FileIndexKey{
DispatcherID: dispatcherID,
EnableTableAcrossNodes: dispatcherID != "",
}
@3AceShowHand: The following tests failed, say
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
Improvements
Chores