dnm: cp pr 4030 4460 4950 and add switch for it #5369
This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
Code Review
This pull request introduces an adaptive scan window controller to dynamically adjust the scan interval based on memory usage ratios and release counts, updates congestion control to version 2 to support these metrics, and adds new Grafana panels for monitoring. It also introduces a DDL workload runner to the workload tool. Key feedback points include fixing serialization in AvailableMemory to respect the Version field, resolving connection retry and recovery issues in the DDL worker loop, ensuring nil-safety by using GetTableSpan() instead of direct field access, and making the TOML file extension check case-insensitive.
func (m AvailableMemory) Marshal() []byte {
	return m.marshalV1()
}

func (m *AvailableMemory) Unmarshal(buf *bytes.Buffer) {
	m.unmarshalV1(buf)
}

func (m AvailableMemory) GetSize() int {
	return m.sizeV1()
}
The AvailableMemory struct's Marshal() and GetSize() methods always default to V1 serialization and size calculation, completely ignoring the Version field. Additionally, unmarshalV1 and unmarshalV2 do not set the Version field on the unmarshaled struct. This leads to silent data loss or incorrect size calculations if AvailableMemory is marshaled or its size is queried directly after being unmarshaled as V2. Please ensure Version is set during unmarshaling, and respected during marshaling and size calculation.
func (m AvailableMemory) Marshal() []byte {
	if m.Version == CongestionControlVersion2 {
		return m.marshalV2()
	}
	return m.marshalV1()
}

func (m *AvailableMemory) Unmarshal(buf *bytes.Buffer) {
	_ = m.unmarshalV1(buf)
}

func (m AvailableMemory) GetSize() int {
	if m.Version == CongestionControlVersion2 {
		return m.sizeV2()
	}
	return m.sizeV1()
}

go func(workerID int, db *DBWrapper) {
	defer func() {
		plog.Info("ddl worker exited", zap.Int("worker", workerID))
		wg.Done()
	}()

	conn, err := getConnWithTimeout(db.DB, 10*time.Second)
	if err != nil {
		plog.Info("get connection failed for ddl worker", zap.Error(err))
		time.Sleep(5 * time.Second)
		return
	}
	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	plog.Info("start ddl worker", zap.Int("worker", workerID), zap.String("db", db.Name))

	for {
		task := <-r.taskCh
		if err := r.executeTask(conn, task); err != nil {
			if r.app.isConnectionError(err) {
				conn.Close()
				time.Sleep(2 * time.Second)
				newConn, err := getConnWithTimeout(db.DB, 10*time.Second)
				if err != nil {
					plog.Info("reconnect failed for ddl worker", zap.Error(err))
					time.Sleep(5 * time.Second)
					continue
				}
				conn = newConn
			}
		}
	}
}(workerID, db)
There are two critical connection handling issues in the worker loop:
- If the initial connection attempt fails, the worker goroutine logs the error and exits permanently. If all workers fail initially, the runner will end up with 0 active workers.
- If reconnection fails inside the loop, the worker continues to the next iteration and attempts to execute the next task on the closed connection, which immediately fails and inflates the error metrics incorrectly.
To fix both issues, the worker should retry connecting in a loop before starting task execution, and also retry in a loop upon connection loss before taking the next task.
go func(workerID int, db *DBWrapper) {
	defer func() {
		plog.Info("ddl worker exited", zap.Int("worker", workerID))
		wg.Done()
	}()

	var conn *sql.Conn
	var err error
	for {
		conn, err = getConnWithTimeout(db.DB, 10*time.Second)
		if err == nil {
			break
		}
		plog.Info("get connection failed for ddl worker, retrying...", zap.Error(err))
		time.Sleep(5 * time.Second)
	}
	defer func() {
		if conn != nil {
			conn.Close()
		}
	}()

	plog.Info("start ddl worker", zap.Int("worker", workerID), zap.String("db", db.Name))

	for {
		task := <-r.taskCh
		if err := r.executeTask(conn, task); err != nil {
			if r.app.isConnectionError(err) {
				conn.Close()
				for {
					time.Sleep(2 * time.Second)
					newConn, err := getConnWithTimeout(db.DB, 10*time.Second)
					if err == nil {
						conn = newConn
						break
					}
					plog.Info("reconnect failed for ddl worker, retrying...", zap.Error(err))
				}
			}
		}
	}
}(workerID, db)

zap.Int64("oldTableID", d.tableSpan.GetTableID()),
zap.Int64("currentTableID", ddl.GetTableID()),
Accessing d.tableSpan directly may bypass concurrency synchronization or nil-safety checks that are normally provided by d.GetTableSpan(). To prevent potential data races or nil-pointer panics, it is safer to use d.GetTableSpan().GetTableID().
- zap.Int64("oldTableID", d.tableSpan.GetTableID()),
- zap.Int64("currentTableID", ddl.GetTableID()),
+ zap.Int64("oldTableID", d.GetTableSpan().GetTableID()),
+ zap.Int64("currentTableID", ddl.GetTableID()),
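To make the getter argument concrete, here is a small sketch of how accessor methods can provide nil-safety and locking that a direct field read does not. The `dispatcher` and `tableSpan` types below are hypothetical simplifications; whether the real `GetTableSpan()` takes a lock or checks for nil is not shown in this PR and is assumed here purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// tableSpan is a hypothetical, simplified span type.
type tableSpan struct{ tableID int64 }

// GetTableID is safe to call on a nil *tableSpan and returns 0 in that case.
func (s *tableSpan) GetTableID() int64 {
	if s == nil {
		return 0
	}
	return s.tableID
}

// dispatcher is a hypothetical stand-in for the type behind d.
type dispatcher struct {
	mu        sync.RWMutex
	tableSpan *tableSpan
}

// GetTableSpan guards against a nil receiver and reads the field under a
// read lock (both behaviours are assumptions of this sketch).
func (d *dispatcher) GetTableSpan() *tableSpan {
	if d == nil {
		return nil
	}
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.tableSpan
}

func main() {
	var unset *dispatcher
	ready := &dispatcher{tableSpan: &tableSpan{tableID: 42}}

	// The chained getters never dereference a nil pointer, whereas
	// unset.tableSpan.GetTableID() would panic on the field access.
	fmt.Println(unset.GetTableSpan().GetTableID(), ready.GetTableSpan().GetTableID())
}
```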
if filepath.Ext(path) != ".toml" {
	return nil, errors.Errorf("ddl config must be a .toml file: %s", path)
}
The file extension check is case-sensitive. If a user provides a config file with an uppercase extension (e.g., .TOML), the validation will fail. Consider making the check case-insensitive.
- if filepath.Ext(path) != ".toml" {
- 	return nil, errors.Errorf("ddl config must be a .toml file: %s", path)
- }
+ if strings.ToLower(filepath.Ext(path)) != ".toml" {
+ 	return nil, errors.Errorf("ddl config must be a .toml file: %s", path)
+ }
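As a possible alternative to lowercasing the extension, `strings.EqualFold` compares the two strings case-insensitively without allocating a new string. The helper name `isTOMLPath` below is hypothetical and only serves to make the check testable in isolation.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isTOMLPath reports whether path ends in a .toml extension, ignoring case.
// The function name is an assumption for this sketch, not part of the PR.
func isTOMLPath(path string) bool {
	return strings.EqualFold(filepath.Ext(path), ".toml")
}

func main() {
	for _, p := range []string{"ddl.toml", "DDL.TOML", "ddl.Toml", "ddl.yaml", "toml"} {
		fmt.Printf("%-10s %v\n", p, isTOMLPath(p))
	}
}
```

Either form accepts `.TOML` and `.Toml`; the choice between `strings.ToLower` and `strings.EqualFold` is mostly a matter of taste here.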
[APPROVALNOTIFIER] This PR is NOT APPROVED
Add a per-changefeed `enable-scan-window` replica config (default false) and plumb it through the changefeed config, dispatcher info and event service.

When the switch is off, the adaptive scan-window feature is fully inert and the changefeed behaves as if the feature had never been introduced:
- event service: memory control, adaptive scan interval, base-ts capping, empty-range signal, pending-DDL local advance and scan-window metrics are all gated.
- dynstream: the memory release ratio follows the switch (0.4 off / 0.6 on); the deadlock high-water-mark stays 0.6 in both modes.
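The gating described above could look roughly like the following. This is a sketch under assumptions: the `replicaConfig` type, the `EnableScanWindow` field name, and both helper functions are hypothetical; only the default (off) and the 0.4 / 0.6 values come from the commit message.

```go
package main

import "fmt"

// Values taken from the commit message; the constant names are hypothetical.
const (
	releaseRatioScanWindowOff = 0.4
	releaseRatioScanWindowOn  = 0.6
	deadlockHighWaterMark     = 0.6 // unchanged in both modes
)

// replicaConfig is a hypothetical stand-in for the per-changefeed config.
// The zero value (false) matches the described default.
type replicaConfig struct {
	EnableScanWindow bool
}

// memoryReleaseRatio follows the switch: 0.4 when off, 0.6 when on.
func memoryReleaseRatio(cfg replicaConfig) float64 {
	if cfg.EnableScanWindow {
		return releaseRatioScanWindowOn
	}
	return releaseRatioScanWindowOff
}

// highWaterMark ignores the switch on purpose.
func highWaterMark(replicaConfig) float64 {
	return deadlockHighWaterMark
}

func main() {
	for _, cfg := range []replicaConfig{{}, {EnableScanWindow: true}} {
		fmt.Printf("enable-scan-window=%v release=%.1f high-water-mark=%.1f\n",
			cfg.EnableScanWindow, memoryReleaseRatio(cfg), highWaterMark(cfg))
	}
}
```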
7d0fe3a to 81e8500
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note