Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
const (
receiveChanSize = 1024 * 8
commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests.
eventServiceHeartbeatInterval = time.Second
eventServiceHeartbeatInterval = time.Second * 10
)

// DispatcherMessage is the message send to EventService.
Expand Down
8 changes: 6 additions & 2 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func (c *eventBroker) refreshMinSentResolvedTs(ctx context.Context) error {
case <-ctx.Done():
return context.Cause(ctx)
case <-ticker.C:
// for test
continue
c.changefeedMap.Range(func(key, value interface{}) bool {
status := value.(*changefeedStatus)
status.refreshMinSentResolvedTs()
Expand Down Expand Up @@ -434,7 +436,9 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
// this dispatcher still has pending ddl to catch up.
hasPendingDDLEventInCurrentRange := dataRange.CommitTsStart < ddlState.MaxEventCommitTs &&
ddlState.MaxEventCommitTs <= commitTsEndBeforeWindow
scanMaxTs := task.changefeedStat.getScanMaxTs()
//scanMaxTs := task.changefeedStat.getScanMaxTs()
// fizz for test
scanMaxTs := uint64(0)
Comment on lines +439 to +441
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The scan window capping logic has been disabled by hardcoding scanMaxTs to 0. The accompanying comment indicates this is for testing purposes. This should be reverted to use task.changefeedStat.getScanMaxTs() to ensure proper flow control and prevent potential memory issues in production.

Suggested change
//scanMaxTs := task.changefeedStat.getScanMaxTs()
// fizz for test
scanMaxTs := uint64(0)
scanMaxTs := task.changefeedStat.getScanMaxTs()

if scanMaxTs > 0 {
dataRange.CommitTsEnd = min(dataRange.CommitTsEnd, scanMaxTs)
if dataRange.CommitTsEnd < commitTsEndBeforeWindow {
Expand Down Expand Up @@ -477,7 +481,7 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang
// Scan range can become empty after applying capping (for example, scan window).
// Send a signal resolved-ts event (rate limited) to keep downstream responsive,
// but do not advance the watermark here.
c.sendSignalResolvedTs(task)
//c.sendSignalResolvedTs(task)
return false, common.DataRange{}
}

Expand Down
Loading