diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6f9a5215b9..e543ceacce 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -39,7 +39,7 @@ import ( const ( receiveChanSize = 1024 * 8 commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests. - eventServiceHeartbeatInterval = time.Second + eventServiceHeartbeatInterval = time.Second * 10 ) // DispatcherMessage is the message send to EventService. diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..274ce64b9d 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -270,6 +270,8 @@ func (c *eventBroker) refreshMinSentResolvedTs(ctx context.Context) error { case <-ctx.Done(): return context.Cause(ctx) case <-ticker.C: + // for test + continue c.changefeedMap.Range(func(key, value interface{}) bool { status := value.(*changefeedStatus) status.refreshMinSentResolvedTs() @@ -434,7 +436,9 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang // this dispatcher still has pending ddl to catch up. hasPendingDDLEventInCurrentRange := dataRange.CommitTsStart < ddlState.MaxEventCommitTs && ddlState.MaxEventCommitTs <= commitTsEndBeforeWindow - scanMaxTs := task.changefeedStat.getScanMaxTs() + //scanMaxTs := task.changefeedStat.getScanMaxTs() + // fizz for test + scanMaxTs := uint64(0) if scanMaxTs > 0 { dataRange.CommitTsEnd = min(dataRange.CommitTsEnd, scanMaxTs) if dataRange.CommitTsEnd < commitTsEndBeforeWindow { @@ -477,7 +481,7 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang // Scan range can become empty after applying capping (for example, scan window). // Send a signal resolved-ts event (rate limited) to keep downstream responsive, // but do not advance the watermark here. - c.sendSignalResolvedTs(task) + //c.sendSignalResolvedTs(task) return false, common.DataRange{} }