From 80d9b471e7a7d97da0ef1e92b9bee5d15f321e76 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 6 May 2026 10:37:01 +0800 Subject: [PATCH 1/4] eventservice: turn off scan window cap and refresh resolvedTs Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/event_broker.go | 4 +++- pkg/eventservice/scan_window.go | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..1886dab108 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -434,7 +434,9 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang // this dispatcher still has pending ddl to catch up. hasPendingDDLEventInCurrentRange := dataRange.CommitTsStart < ddlState.MaxEventCommitTs && ddlState.MaxEventCommitTs <= commitTsEndBeforeWindow - scanMaxTs := task.changefeedStat.getScanMaxTs() + //scanMaxTs := task.changefeedStat.getScanMaxTs() + // fizz for test + scanMaxTs := uint64(0) if scanMaxTs > 0 { dataRange.CommitTsEnd = min(dataRange.CommitTsEnd, scanMaxTs) if dataRange.CommitTsEnd < commitTsEndBeforeWindow { diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 46cb9cffb3..9f575eee5f 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -329,6 +329,8 @@ func (c *changefeedStatus) maxScanInterval() time.Duration { } func (c *changefeedStatus) refreshMinSentResolvedTs() { + // fizz for test + return now := time.Now() minSentResolvedTs := ^uint64(0) minSentResolvedTsWithStale := ^uint64(0) From 9be1c4856115264fee1f08b1a34f9e4cdbc3e2d2 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 6 May 2026 11:00:24 +0800 Subject: [PATCH 2/4] eventservice: don't send signal resolvedTs while need to skip scan Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/event_broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 1886dab108..c75c68a5b9 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -479,7 +479,7 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang // Scan range can become empty after applying capping (for example, scan window). // Send a signal resolved-ts event (rate limited) to keep downstream responsive, // but do not advance the watermark here. - c.sendSignalResolvedTs(task) + //c.sendSignalResolvedTs(task) return false, common.DataRange{} } From 1f510b08d0a6bbd8796a6b7975079798f8e7bdbb Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 6 May 2026 11:02:20 +0800 Subject: [PATCH 3/4] eventservice: skip refresh min resolvedTs Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/event_broker.go | 2 ++ pkg/eventservice/scan_window.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index c75c68a5b9..274ce64b9d 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -270,6 +270,8 @@ func (c *eventBroker) refreshMinSentResolvedTs(ctx context.Context) error { case <-ctx.Done(): return context.Cause(ctx) case <-ticker.C: + // for test + continue c.changefeedMap.Range(func(key, value interface{}) bool { status := value.(*changefeedStatus) status.refreshMinSentResolvedTs() diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 9f575eee5f..46cb9cffb3 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -329,8 +329,6 @@ func (c *changefeedStatus) maxScanInterval() time.Duration { } func (c *changefeedStatus) refreshMinSentResolvedTs() { - // fizz for test - return now := time.Now() minSentResolvedTs := ^uint64(0) minSentResolvedTsWithStale := ^uint64(0) From dc5846a28c8461760e8878758267307885eb7da2 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Wed, 6 May 2026 11:29:22 +0800 Subject: [PATCH 4/4] eventcollector: adjust heartbeat interval to 10s Signed-off-by: dongmen <414110582@qq.com> --- downstreamadapter/eventcollector/event_collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6f9a5215b9..e543ceacce 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -39,7 +39,7 @@ import ( const ( receiveChanSize = 1024 * 8 commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests. - eventServiceHeartbeatInterval = time.Second + eventServiceHeartbeatInterval = time.Second * 10 ) // DispatcherMessage is the message send to EventService.