From a1450cea902797b58b56180a8d354a7e567abf85 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 22 Jun 2026 10:53:30 -0700 Subject: [PATCH 1/3] alpha version --- docs/design/2026-06-18-puller-memory-quota.md | 257 ++++++++++++++++++ logservice/logpuller/memory_quota.go | 114 ++++++++ logservice/logpuller/memory_quota_test.go | 100 +++++++ logservice/logpuller/region_req_cache.go | 45 ++- logservice/logpuller/region_request_worker.go | 56 ++-- .../logpuller/region_request_worker_test.go | 67 +++++ logservice/logpuller/subscription_client.go | 56 +++- .../logpuller/subscription_client_test.go | 10 +- pkg/config/debug.go | 8 + pkg/config/debug_test.go | 29 ++ pkg/metrics/log_puller.go | 16 ++ utils/dynstream/interfaces.go | 14 + utils/dynstream/memory_control.go | 6 +- 13 files changed, 748 insertions(+), 30 deletions(-) create mode 100644 docs/design/2026-06-18-puller-memory-quota.md create mode 100644 logservice/logpuller/memory_quota.go create mode 100644 logservice/logpuller/memory_quota_test.go create mode 100644 pkg/config/debug_test.go diff --git a/docs/design/2026-06-18-puller-memory-quota.md b/docs/design/2026-06-18-puller-memory-quota.md new file mode 100644 index 0000000000..f336d3e369 --- /dev/null +++ b/docs/design/2026-06-18-puller-memory-quota.md @@ -0,0 +1,257 @@ +# Puller Memory Quota and Region Scan Throttling + +## Status + +Proposed. + +## Background + +The log puller receives Region events from TiKV and pushes them into a dynamic +stream for processing. The dynamic stream currently uses a fixed 1 GiB memory +quota and applies area-level flow control with the following thresholds: + +- Pause input when pending memory reaches 80% of the quota. +- Resume input when pending memory falls below 50% of the quota. + +Once input is paused, the puller stops consuming from the TiKV gRPC stream. This +protects TiCDC from unbounded pending data, but transfers the pressure to TiKV. 
+In an extreme case, TiKV may continue running newly requested incremental Region +scans while it is unable to send their output to TiCDC, increasing TiKV memory +usage. + +The fixed 80% pause threshold also leaves a significant portion of the puller +quota unused. The puller should be able to use all available quota before +applying hard backpressure. + +## Goals + +- Give the log puller a dedicated, configurable memory quota. +- Allow the puller to use available quota up to the hard limit. +- Stop sending new Region incremental scan requests before the quota is full. +- Retain hard backpressure as the final memory protection mechanism. +- Always allow subscription cleanup requests to make progress. +- Keep the change local to the log puller where possible. + +## Non-goals + +- Limiting the total RSS of the TiCDC process. +- Accounting for all puller-related allocations, such as gRPC internals, Region + state, transaction matcher state, or Go runtime overhead. +- Cancelling Region scans that have already been sent to TiKV. +- Dynamically tuning the scan throttling thresholds. +- Providing separate quotas for individual changefeeds or subscriptions. + +The quota in this design is a logical budget for Region events admitted into the +puller processing pipeline. It is not a strict process memory limit. + +## Configuration + +Add one server-level debug configuration: + +```toml +[debug.puller] +memory-quota = 1073741824 # 1 GiB +``` + +`memory-quota` is expressed in bytes and must be greater than zero. Its default +value is 1 GiB, preserving the current effective quota. + +The following thresholds are implementation constants and are not configurable: + +```go +const ( + regionScanPauseRatio = 0.5 + regionScanResumeRatio = 0.4 +) +``` + +The hard event admission limit is always 100% of `memory-quota`. + +## Design Overview + +The subscription client owns one `PullerMemoryQuota`. All subscriptions handled +by that client share the quota. 
+ +The quota controls two independent actions: + +1. Region scan admission: stop new Region register requests at 50% usage and + resume them below 40% usage. +2. Event admission: block new Region events when admitting them would exceed the + hard quota, and admit them as soon as sufficient space becomes available. + +```text + usage >= 50% + NORMAL ---------------------------------> SCAN_THROTTLED + ^ | + | usage < 40% | usage >= 100% + | v + +------------------------------------------- FULL + | + | sufficient space released + v + SCAN_THROTTLED +``` + +`FULL` does not have a separate low-watermark. Input resumes as soon as usage +falls below 100%. This removes the current behavior where input remains paused +until usage falls from 80% to 50%. + +The 50%/40% hysteresis applies only to Region scan admission. It prevents Region +requests from repeatedly pausing and resuming when usage fluctuates around 50%. + +## Puller Memory Quota + +The quota controller belongs to `logservice/logpuller`, rather than a shared +package, because its Region scan gate and lifecycle semantics are puller-specific. +It is installed as the memory control algorithm for the puller's dynamic stream. +Dynamic stream remains responsible for pending-event memory accounting, including +normal dequeue and path removal, while the puller-owned controller makes the +backpressure and Region scan decisions. + +### Event admission + +Dynamic stream continues accepting events while pending usage is below 100%. +When an append moves usage to or above 100%, it sends `PauseArea` feedback and +the subscription client stops pushing subsequent Region events. Once usage falls +below 100%, it sends `ResumeArea` immediately. + +Feedback and already-buffered events make this a bounded soft overshoot rather +than a strict allocation barrier. The overshoot consists of events already in +the receive and dynamic stream input buffers when the threshold is crossed. 
This +matches the safety model of the existing puller flow control while allowing the +full configured quota to be used. + +### Event release + +Memory is released through the existing dynamic stream accounting when an event +is dequeued or a path is removed. The old puller 80%/50% algorithm is replaced by +the puller-owned controller and must not independently trigger flow control. + +## Region Scan Gate + +Every Region register request can initiate an incremental scan in TiKV. This +includes both the first request for a subscription and a registration retried +after a Region error. Therefore all register requests are subject to the scan +gate. + +Requests already sent to TiKV remain active. Pausing them would discard work and +could cause repeated scans after resubscription. + +The gate never blocks: + +- Deregister requests. +- Subscription cleanup. +- Local error handling. +- Resolve-lock processing. + +### Authoritative gate + +The authoritative check is immediately before sending a Region register request +on the TiKV gRPC stream. This is the only point that can guarantee no additional +incremental scan is started after the gate closes. + +If the gate closes after a request has been dequeued, the worker retains that +request and waits. It must still process deregister requests while waiting. The +request cache or send loop therefore needs a separate control path for deregister +requests so that cleanup cannot be blocked behind a waiting register request. + +### Non-authoritative producer gate + +An additional gate is checked in `divideSpanAndScheduleRegionRequests`: + +- Before each `BatchLoadRegionsWithKeyRange` call. +- While scheduling the Regions returned by a batch. + +This gate is an optimization. It avoids unnecessary PD requests, Region range +splitting, and growth of local task queues while scans are paused. 
It is called +non-authoritative because memory usage can cross 50% after this check, and +requests may already exist in the Region task queue or worker request cache. + +Waiting at this gate must exit when the context is cancelled or the subscribed +span is stopped. The send-time gate remains mandatory even when the producer +gate is present. + +## Concurrency and Ordering + +Quota state transitions and Region scan gate transitions must be atomic with +respect to memory reservation: + +- An `Acquire` that moves usage to or above 50% closes the scan gate before it + returns. +- A `Release` that moves usage below 40% opens the scan gate and wakes waiting + Region workers. +- A `Release` wakes event admission waiters whenever their request may now fit. +- State-change logs are emitted only on threshold crossings. + +The existing Region request priority continues to apply after the scan gate +opens. Deregister requests have a separate bypass and do not depend on that +priority ordering. + +## Observability + +Add metrics for: + +- Configured puller quota in bytes. +- Admitted memory in bytes. +- Available quota in bytes. +- Whether the Region scan gate is open or closed. +- Number of Region register requests waiting on the gate. +- Scan gate pause and resume counts. + +Log quota and scan gate state only on transitions. Transition logs should include +the configured quota, current usage, threshold, and number of waiting requests. + +## Failure and Shutdown Behavior + +- Context cancellation wakes all quota and Region gate waiters. +- Unsubscribe and deregister continue even while the quota is full. +- Closing the subscription client wakes blocked event pushes and Region gate + waiters through context cancellation. +- A Region worker reconnect does not bypass the scan gate for register requests. +- Quota accounting errors must not be corrected silently. 
Underflow, double + release, or leaked reservations should be surfaced during tests and with an + operational error log. + +## Compatibility + +The default 1 GiB value preserves the current configuration-free behavior with +the following intentional changes: + +- New Region scans stop at 50% puller memory usage. +- Event input continues beyond 80% while space remains. +- Hard backpressure starts only when the next event cannot fit. +- Input resumes as soon as the waiting event can fit instead of waiting for usage + to fall to 50%. + +No TiKV protocol change is required. + +## Test Plan + +### Quota unit tests + +- Input is not paused below 100% usage. +- Input pauses when usage reaches 100%. +- Input resumes immediately after usage falls below 100%. +- Existing dynamic stream path removal correctly reduces accounted memory. + +### Region gate unit tests + +- The gate closes when usage reaches 50%. +- The gate remains closed between 40% and 50%. +- The gate opens when usage falls below 40%. +- Register requests are not sent while the gate is closed. +- Both initial and retry register requests are gated. +- Deregister requests are sent while register requests are waiting. +- Cancellation and unsubscribe wake producer-side waiters. + +### Integration tests + +Use a deliberately slow event consumer and verify that: + +- Puller admitted memory remains bounded by the expected quota and receive + headroom. +- The number of active TiKV incremental scans stops increasing after the 50% + threshold is crossed. +- Existing Region streams continue making progress until hard backpressure is + required. +- Region request scheduling resumes after usage falls below 40%. diff --git a/logservice/logpuller/memory_quota.go b/logservice/logpuller/memory_quota.go new file mode 100644 index 0000000000..819f0d0d4a --- /dev/null +++ b/logservice/logpuller/memory_quota.go @@ -0,0 +1,114 @@ +// Copyright 2026 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "sync" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/metrics" + "go.uber.org/zap" +) + +const ( + regionScanPauseRatio = 0.5 + regionScanResumeRatio = 0.4 +) + +// pullerMemoryQuota applies hard backpressure at the configured quota and +// gates new region scans before the hard limit is reached. Dynamic stream owns +// the memory accounting and calls this controller whenever usage changes. +type pullerMemoryQuota struct { + mu sync.Mutex + + regionScanPaused bool + regionScanResume chan struct{} +} + +func newPullerMemoryQuota() *pullerMemoryQuota { + metrics.PullerRegionScanGate.Set(1) + return &pullerMemoryQuota{ + regionScanResume: make(chan struct{}), + } +} + +// ShouldPausePath implements dynstream.MemoryControlAlgorithm. Puller flow +// control is global, so it does not pause individual subscription paths. +func (q *pullerMemoryQuota) ShouldPausePath( + _ bool, pathPendingSize int64, _ int64, maxPendingSize uint64, _ int64, +) (bool, bool, float64) { + return false, false, float64(pathPendingSize) / float64(maxPendingSize) +} + +// ShouldPauseArea implements dynstream.MemoryControlAlgorithm. 
+func (q *pullerMemoryQuota) ShouldPauseArea( + paused bool, pendingSize int64, maxPendingSize uint64, +) (bool, bool, float64) { + usageRatio := float64(pendingSize) / float64(maxPendingSize) + q.updateRegionScanState(usageRatio, pendingSize, maxPendingSize) + + if paused { + return false, usageRatio < 1, usageRatio + } + return usageRatio >= 1, false, usageRatio +} + +func (q *pullerMemoryQuota) updateRegionScanState( + usageRatio float64, pendingSize int64, maxPendingSize uint64, +) { + q.mu.Lock() + defer q.mu.Unlock() + + switch { + case !q.regionScanPaused && usageRatio >= regionScanPauseRatio: + q.regionScanPaused = true + q.regionScanResume = make(chan struct{}) + metrics.PullerRegionScanGate.Set(0) + metrics.PullerRegionScanGateTransition.WithLabelValues("pause").Inc() + log.Info("puller pauses region scans", + zap.Int64("memoryUsage", pendingSize), + zap.Uint64("memoryQuota", maxPendingSize), + zap.Float64("memoryUsageRatio", usageRatio)) + case q.regionScanPaused && usageRatio < regionScanResumeRatio: + q.regionScanPaused = false + close(q.regionScanResume) + metrics.PullerRegionScanGate.Set(1) + metrics.PullerRegionScanGateTransition.WithLabelValues("resume").Inc() + log.Info("puller resumes region scans", + zap.Int64("memoryUsage", pendingSize), + zap.Uint64("memoryQuota", maxPendingSize), + zap.Float64("memoryUsageRatio", usageRatio)) + } +} + +func (q *pullerMemoryQuota) waitRegionScanAllowed(ctx context.Context) error { + for { + resume, paused := q.regionScanResumeNotify() + if !paused { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-resume: + } + } +} + +func (q *pullerMemoryQuota) regionScanResumeNotify() (<-chan struct{}, bool) { + q.mu.Lock() + defer q.mu.Unlock() + return q.regionScanResume, q.regionScanPaused +} diff --git a/logservice/logpuller/memory_quota_test.go b/logservice/logpuller/memory_quota_test.go new file mode 100644 index 0000000000..dcf7e44118 --- /dev/null +++ 
b/logservice/logpuller/memory_quota_test.go @@ -0,0 +1,100 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPullerMemoryQuotaThresholds(t *testing.T) { + quota := newPullerMemoryQuota() + + pause, resume, _ := quota.ShouldPauseArea(false, 49, 100) + require.False(t, pause) + require.False(t, resume) + _, scanPaused := quota.regionScanResumeNotify() + require.False(t, scanPaused) + + pause, resume, _ = quota.ShouldPauseArea(false, 50, 100) + require.False(t, pause) + require.False(t, resume) + _, scanPaused = quota.regionScanResumeNotify() + require.True(t, scanPaused) + + pause, resume, _ = quota.ShouldPauseArea(false, 100, 100) + require.True(t, pause) + require.False(t, resume) + + pause, resume, _ = quota.ShouldPauseArea(true, 99, 100) + require.False(t, pause) + require.True(t, resume) + _, scanPaused = quota.regionScanResumeNotify() + require.True(t, scanPaused) + + quota.ShouldPauseArea(false, 40, 100) + _, scanPaused = quota.regionScanResumeNotify() + require.True(t, scanPaused) + + quota.ShouldPauseArea(false, 39, 100) + _, scanPaused = quota.regionScanResumeNotify() + require.False(t, scanPaused) + require.NoError(t, quota.waitRegionScanAllowed(context.Background())) +} + +func TestPullerMemoryQuotaWaitCancellation(t *testing.T) { + quota := newPullerMemoryQuota() + quota.ShouldPauseArea(false, 50, 100) + + ctx, cancel := context.WithCancel(context.Background()) + done 
:= make(chan error, 1) + go func() { + done <- quota.waitRegionScanAllowed(ctx) + }() + + select { + case err := <-done: + t.Fatalf("wait returned before cancellation: %v", err) + case <-time.After(20 * time.Millisecond): + } + + cancel() + require.ErrorIs(t, <-done, context.Canceled) +} + +func TestProducerGateStopsWaitingAfterUnsubscribe(t *testing.T) { + quota := newPullerMemoryQuota() + quota.ShouldPauseArea(false, 50, 100) + client := &subscriptionClient{memoryQuota: quota} + span := &subscribedSpan{stoppedCh: make(chan struct{})} + + type result struct { + stopped bool + err error + } + done := make(chan result, 1) + go func() { + stopped, err := client.waitRegionScanAllowed(context.Background(), span) + done <- result{stopped: stopped, err: err} + }() + + span.stopped.Store(true) + close(span.stoppedCh) + res := <-done + require.NoError(t, res.err) + require.True(t, res.stopped) +} diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index b4478cab07..1b44625b05 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -53,6 +53,9 @@ func (r *regionReq) isStale() bool { type requestCache struct { // pending requests waiting to be sent pendingQueue chan regionReq + // controlQueue contains deregister requests. It is separate so cleanup can + // proceed while normal register requests are gated by puller memory usage. 
+ controlQueue chan regionReq // sent requests waiting for initialization (subscriptionID -> regions -> regionReq) sentRequests struct { @@ -78,6 +81,7 @@ type requestCache struct { func newRequestCache(maxPendingCount int) *requestCache { res := &requestCache{ pendingQueue: make(chan regionReq, maxPendingCount), // Large buffer to reduce blocking + controlQueue: make(chan regionReq, maxPendingCount), sentRequests: struct { sync.RWMutex regionReqs map[SubscriptionID]map[uint64]regionReq @@ -104,10 +108,14 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) ( if current < c.maxPendingCount || force { // Try to add the request req := newRegionReq(region) + queue := c.pendingQueue + if region.isStopped() { + queue = c.controlQueue + } select { case <-ctx.Done(): return false, ctx.Err() - case c.pendingQueue <- req: + case queue <- req: c.pendingCount.Inc() cost := time.Since(start) metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) @@ -142,6 +150,13 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) ( // (e.g. resolve/markStopped/markDone). func (c *requestCache) pop(ctx context.Context) (regionReq, error) { select { + case req := <-c.controlQueue: + return req, nil + default: + } + select { + case req := <-c.controlQueue: + return req, nil case req := <-c.pendingQueue: return req, nil case <-ctx.Done(): @@ -149,6 +164,21 @@ func (c *requestCache) pop(ctx context.Context) (regionReq, error) { } } +// popControlOrWait returns a deregister request while normal register requests +// are gated, or returns false after the gate is resumed. 
+func (c *requestCache) popControlOrWait( + ctx context.Context, resume <-chan struct{}, +) (regionReq, bool, error) { + select { + case req := <-c.controlQueue: + return req, true, nil + case <-resume: + return regionReq{}, false, nil + case <-ctx.Done(): + return regionReq{}, false, ctx.Err() + } +} + // markSent marks a request as sent and adds it to sent requests. // It doesn't change pendingCount: the slot is released when the request is finished/removed. func (c *requestCache) markSent(req regionReq) { @@ -265,7 +295,7 @@ func (c *requestCache) clearStaleRequest() { // If there are no in-cache region requests but pendingCount isn't 0, it means pendingCount is stale. // Reset it to avoid blocking add() forever. - if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 { + if reqCount == 0 && len(c.pendingQueue) == 0 && len(c.controlQueue) == 0 && c.pendingCount.Load() != 0 { log.Info("region worker pending request count is not equal to actual region request count, correct it", zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("actualReqCount", reqCount), @@ -297,6 +327,17 @@ LOOP: } } +CONTROL_LOOP: + for { + select { + case req := <-c.controlQueue: + regions = append(regions, req.regionInfo) + c.markDone() + default: + break CONTROL_LOOP + } + } + c.sentRequests.Lock() defer c.sentRequests.Unlock() diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 142f4c6493..ec399df7bf 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -362,6 +362,29 @@ func (s *regionRequestWorker) processRegionSendTask( // TODO: add a metric? 
return nil } + handleDeregister := func(region regionInfo) error { + subID := region.subscribedSpan.subID + req := &cdcpb.ChangeDataRequest{ + Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()}, + RequestId: uint64(subID), + Request: &cdcpb.ChangeDataRequest_Deregister_{ + Deregister: &cdcpb.ChangeDataRequest_Deregister{}, + }, + FilterLoop: region.filterLoop, + } + s.requestCache.markDone() + if err := doSend(req); err != nil { + return err + } + for _, state := range s.takeRegionStates(subID) { + state.markStopped(&requestCancelledErr{}) + regionEvent := regionEvent{ + states: []*regionFeedState{state}, + } + s.client.pushRegionEventToDS(subID, regionEvent) + } + return nil + } // Handle pre-fetched region first region := *s.preFetchForConnecting @@ -371,6 +394,21 @@ func (s *regionRequestWorker) processRegionSendTask( for { region := regionReq.regionInfo subID := region.subscribedSpan.subID + for !region.isStopped() && !region.subscribedSpan.stopped.Load() && s.client.memoryQuota != nil { + resume, paused := s.client.memoryQuota.regionScanResumeNotify() + if !paused { + break + } + controlReq, ok, err := s.requestCache.popControlOrWait(ctx, resume) + if err != nil { + return err + } + if ok { + if err := handleDeregister(controlReq.regionInfo); err != nil { + return err + } + } + } log.Debug("region request worker gets a singleRegionInfo", zap.Uint64("workerID", s.workerID), zap.Uint64("subscriptionID", uint64(subID)), @@ -380,25 +418,9 @@ func (s *regionRequestWorker) processRegionSendTask( // It means it's a special task for stopping the table. 
if region.isStopped() { - req := &cdcpb.ChangeDataRequest{ - Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()}, - RequestId: uint64(subID), - Request: &cdcpb.ChangeDataRequest_Deregister_{ - Deregister: &cdcpb.ChangeDataRequest_Deregister{}, - }, - FilterLoop: region.filterLoop, - } - s.requestCache.markDone() - if err := doSend(req); err != nil { + if err := handleDeregister(region); err != nil { return err } - for _, state := range s.takeRegionStates(subID) { - state.markStopped(&requestCancelledErr{}) - regionEvent := regionEvent{ - states: []*regionFeedState{state}, - } - s.client.pushRegionEventToDS(subID, regionEvent) - } } else if region.subscribedSpan.stopped.Load() { // It can be skipped directly because there must be no pending states from // the stopped subscribedTable, or the special singleRegionInfo for stopping diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index f9752a0eb3..9dd671f051 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -17,6 +17,7 @@ import ( "context" "io" "testing" + "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/cdcpb" @@ -37,6 +38,16 @@ type mockEventFeedV2Client struct { recvErr error } +type recordingEventFeedV2Client struct { + *mockEventFeedV2Client + sent chan *cdcpb.ChangeDataRequest +} + +func (m *recordingEventFeedV2Client) Send(req *cdcpb.ChangeDataRequest) error { + m.sent <- req + return nil +} + func (m *mockEventFeedV2Client) Send(*cdcpb.ChangeDataRequest) error { return m.sendErr } func (m *mockEventFeedV2Client) Recv() (*cdcpb.ChangeDataEvent, error) { return nil, m.recvErr } func (m *mockEventFeedV2Client) Header() (metadata.MD, error) { return metadata.MD{}, nil } @@ -337,6 +348,62 @@ func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) { require.True(t, state == nil || state.isStale(), 
"region state should be removed or marked stale after send failure") } +func TestProcessRegionSendTaskAllowsDeregisterWhileScanPaused(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + quota := newPullerMemoryQuota() + quota.ShouldPauseArea(false, 50, 100) + worker := &regionRequestWorker{ + requestCache: newRequestCache(10), + store: &requestedStore{storeAddr: "store-1"}, + client: &subscriptionClient{memoryQuota: quota}, + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + + registerRegion := prepareRegionForSendTest(createTestRegionInfo(1, 1)) + ok, err := worker.requestCache.add(ctx, registerRegion, false) + require.NoError(t, err) + require.True(t, ok) + registerReq, err := worker.requestCache.pop(ctx) + require.NoError(t, err) + worker.preFetchForConnecting = &registerReq.regionInfo + + deregisterRegion := regionInfo{subscribedSpan: &subscribedSpan{subID: 2}} + ok, err = worker.requestCache.add(ctx, deregisterRegion, true) + require.NoError(t, err) + require.True(t, ok) + + client := &recordingEventFeedV2Client{ + mockEventFeedV2Client: &mockEventFeedV2Client{}, + sent: make(chan *cdcpb.ChangeDataRequest, 2), + } + errCh := make(chan error, 1) + go func() { + errCh <- worker.processRegionSendTask(ctx, &ConnAndClient{ + Client: client, + Conn: &grpc.ClientConn{}, + }) + }() + + request := <-client.sent + require.NotNil(t, request.GetDeregister()) + require.Equal(t, uint64(2), request.RequestId) + select { + case request = <-client.sent: + t.Fatalf("register request sent while scan gate was closed: %v", request) + case <-time.After(20 * time.Millisecond): + } + + quota.ShouldPauseArea(false, 39, 100) + request = <-client.sent + require.Nil(t, request.GetDeregister()) + require.Equal(t, uint64(1), request.RequestId) + + cancel() + require.ErrorIs(t, <-errCh, context.Canceled) +} + func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) { testCases := []struct { name string diff
--git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 0ba3b4be24..097fb4288a 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -129,7 +129,8 @@ type subscribedSpan struct { kvEventsCache []common.RawKVEntry // To handle span removing. - stopped atomic.Bool + stopped atomic.Bool + stoppedCh chan struct{} // To handle stale lock resolvings. tryResolveLock func(regionID uint64, state *regionlock.LockedRangeState) @@ -194,6 +195,8 @@ type subscriptionClient struct { stores sync.Map ds dynstream.DynamicStream[int, SubscriptionID, regionEvent, *subscribedSpan, *regionEventHandler] + // memoryQuota controls puller backpressure and new region scan admission. + memoryQuota *pullerMemoryQuota // the following three fields are used to manage feedback from ds and notify other goroutines mu sync.Mutex cond *sync.Cond @@ -248,6 +251,7 @@ func NewSubscriptionClient( } subClient.ctx, subClient.cancel = context.WithCancel(context.Background()) subClient.totalSpans.spanMap = make(map[SubscriptionID]*subscribedSpan) + subClient.memoryQuota = newPullerMemoryQuota() option := dynstream.NewOption() // Note: it is max batch size of the kv sent from tikv(not committed rows) @@ -366,7 +370,9 @@ func (s *subscriptionClient) Subscribe( s.totalSpans.spanMap[subID] = rt s.totalSpans.Unlock() - areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(1*1024*1024*1024, dynstream.MemoryControlForPuller, "logPuller") // 1GB + pullerConfig := config.GetGlobalServerConfig().Debug.Puller + areaSetting := dynstream.NewAreaSettingsWithMemoryControlAlgorithm( + pullerConfig.MemoryQuota, s.memoryQuota, "logPuller") s.ds.AddPath(rt.subID, rt, areaSetting) select { @@ -492,6 +498,9 @@ func (s *subscriptionClient) setTableStopped(rt *subscribedSpan) { // Then send a special singleRegionInfo to regionRouter to deregister the table // from all TiKV instances. 
if rt.stopped.CompareAndSwap(false, true) { + if rt.stoppedCh != nil { + close(rt.stoppedCh) + } s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt, filterLoop: rt.filterLoop}, s.pdClock.CurrentTS())) if rt.rangeLock.Stop() { s.onTableDrained(rt) @@ -727,6 +736,13 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( nextSpan := span backoffBeforeLoad := false for { + stopped, err := s.waitRegionScanAllowed(ctx, subscribedSpan) + if err != nil { + return err + } + if stopped { + return nil + } if backoffBeforeLoad { if err := util.Hang(ctx, loadRegionRetryInterval); err != nil { return err @@ -763,6 +779,13 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } for _, regionMeta := range regionMetas { + stopped, err := s.waitRegionScanAllowed(ctx, subscribedSpan) + if err != nil { + return err + } + if stopped { + return nil + } regionSpan := heartbeatpb.TableSpan{ StartKey: regionMeta.StartKey, EndKey: regionMeta.EndKey, @@ -796,6 +819,34 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } } +func (s *subscriptionClient) waitRegionScanAllowed( + ctx context.Context, subscribedSpan *subscribedSpan, +) (bool, error) { + if subscribedSpan.stopped.Load() { + return true, nil + } + if s.memoryQuota == nil { + return false, nil + } + for { + resume, paused := s.memoryQuota.regionScanResumeNotify() + if !paused { + break + } + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-subscribedSpan.stoppedCh: + return true, nil + case <-resume: + } + } + if subscribedSpan.stopped.Load() { + return true, nil + } + return false, nil +} + // scheduleRegionRequest locks the region's range and send the region to regionTaskQueue, // which will be handled by handleRegions. 
func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) { @@ -1089,6 +1140,7 @@ func (s *subscriptionClient) newSubscribedSpan( startTs: startTs, filterLoop: filterLoop, rangeLock: rangeLock, + stoppedCh: make(chan struct{}), consumeKVEvents: consumeKVEvents, advanceResolvedTs: advanceResolvedTs, diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 2e87faa4fc..0b0ed247e3 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -412,7 +412,7 @@ func TestPushRegionEventToDSUnblocksOnClose(t *testing.T) { } } -func TestEnqueueRegionToAllStoresRetryWhenCacheFull(t *testing.T) { +func TestEnqueueStopRegionWhenRegisterCacheFull(t *testing.T) { ctx := context.Background() client := &subscriptionClient{} @@ -436,15 +436,9 @@ func TestEnqueueRegionToAllStoresRetryWhenCacheFull(t *testing.T) { } enqueued, err := client.enqueueRegionToAllStores(ctx, stopRegion) require.NoError(t, err) - require.False(t, enqueued) - - <-worker.requestCache.pendingQueue - worker.requestCache.markDone() - - enqueued, err = client.enqueueRegionToAllStores(ctx, stopRegion) - require.NoError(t, err) require.True(t, enqueued) require.Equal(t, 1, len(worker.requestCache.pendingQueue)) + require.Equal(t, 1, len(worker.requestCache.controlQueue)) } func TestSubscriptionWithFailedTiKV(t *testing.T) { diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 1aa6f9b924..7304b0254e 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -17,6 +17,7 @@ import ( "time" "github.com/pingcap/errors" + cerror "github.com/pingcap/ticdc/pkg/errors" ) // DebugConfig represents config for ticdc unexposed feature configurations @@ -49,12 +50,18 @@ func (c *DebugConfig) ValidateAndAdjust() error { if err := c.Scheduler.ValidateAndAdjust(); err != nil { return errors.Trace(err) } + if c.Puller.MemoryQuota == 0 { + return 
cerror.ErrInvalidServerOption.GenWithStackByArgs( + "debug.puller.memory-quota must be greater than 0") + } return nil } // PullerConfig represents config for puller type PullerConfig struct { + // MemoryQuota is the memory quota in bytes for events buffered by the puller. + MemoryQuota uint64 `toml:"memory-quota" json:"memory_quota"` // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable_resolved_ts_stuck_detection"` // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. @@ -72,6 +79,7 @@ type PullerConfig struct { // NewDefaultPullerConfig return the default puller configuration func NewDefaultPullerConfig() *PullerConfig { return &PullerConfig{ + MemoryQuota: 1024 * 1024 * 1024, // 1 GiB. EnableResolvedTsStuckDetection: false, ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), LogRegionDetails: false, diff --git a/pkg/config/debug_test.go b/pkg/config/debug_test.go new file mode 100644 index 0000000000..0889bcefc8 --- /dev/null +++ b/pkg/config/debug_test.go @@ -0,0 +1,29 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPullerMemoryQuotaConfig(t *testing.T) { + config := GetDefaultServerConfig() + require.Equal(t, uint64(1024*1024*1024), config.Debug.Puller.MemoryQuota) + require.NoError(t, config.Debug.ValidateAndAdjust()) + + config.Debug.Puller.MemoryQuota = 0 + require.Error(t, config.Debug.ValidateAndAdjust()) +} diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index e874aa4428..01d15c0f21 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -64,6 +64,20 @@ var ( Name: "resolved_ts_lag", Help: "The lag of resolved ts", }) + PullerRegionScanGate = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "log_puller", + Name: "region_scan_gate_open", + Help: "Whether the puller region scan gate is open.", + }) + PullerRegionScanGateTransition = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "log_puller", + Name: "region_scan_gate_transition_count", + Help: "The number of puller region scan gate state transitions.", + }, []string{"type"}) SubscriptionClientResolvedTsLagGauge = prometheus.NewGauge( prometheus.GaugeOpts{ @@ -164,6 +178,8 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(LogPullerPrewriteCacheRowNum) registry.MustRegister(LogPullerMatcherCount) registry.MustRegister(LogPullerResolvedTsLag) + registry.MustRegister(PullerRegionScanGate) + registry.MustRegister(PullerRegionScanGateTransition) registry.MustRegister(SubscriptionClientRequestedRegionCount) registry.MustRegister(SubscriptionClientAddRegionRequestDuration) registry.MustRegister(RegionRequestFinishScanDuration) diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index eb891fa13d..efa79b1457 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -256,6 +256,10 @@ type AreaSettings struct { // Remove it when we determine the v2 
is working well. // The algorithm of the memory control. algorithm int + // memoryControlAlgorithm overrides algorithm when it is set. It allows a + // component to own state derived from memory usage without moving that state + // into dynamic stream. + memoryControlAlgorithm MemoryControlAlgorithm // control how to batch events batchConfig batchConfig @@ -286,6 +290,16 @@ func NewAreaSettingsWithMaxPendingSize( } } +// NewAreaSettingsWithMemoryControlAlgorithm creates area settings with a +// component-owned memory control algorithm. +func NewAreaSettingsWithMemoryControlAlgorithm( + quota uint64, algorithm MemoryControlAlgorithm, component string, +) AreaSettings { + settings := NewAreaSettingsWithMaxPendingSize(quota, MemoryControlForPuller, component) + settings.memoryControlAlgorithm = algorithm + return settings +} + func NewAreaSettingsWithMaxPendingSizeAndBatchConfig( quota uint64, algorithm int, component string, batchCount int, batchBytes int, ) AreaSettings { diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go index 93af18b8ed..14b5e3792f 100644 --- a/utils/dynstream/memory_control.go +++ b/utils/dynstream/memory_control.go @@ -70,13 +70,17 @@ func newAreaMemStat[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]]( feedbackChan chan<- Feedback[A, P, D], ) *areaMemStat[A, P, T, D, H] { settings.fix() + algorithm := settings.memoryControlAlgorithm + if algorithm == nil { + algorithm = NewMemoryControlAlgorithm(settings.algorithm) + } res := &areaMemStat[A, P, T, D, H]{ area: area, memControl: memoryControl, feedbackChan: feedbackChan, lastSendFeedbackTime: atomic.Value{}, lastSizeDecreaseTime: atomic.Value{}, - algorithm: NewMemoryControlAlgorithm(settings.algorithm), + algorithm: algorithm, } res.lastAppendEventTime.Store(time.Now()) From 9cd8758c550b5ee4c0191fe46ee3623fd1b8cf37 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 22 Jun 2026 11:32:26 -0700 Subject: [PATCH 2/3] fix --- 
docs/design/2026-06-18-puller-memory-quota.md | 60 +++--- logservice/logpuller/memory_quota.go | 188 +++++++++++++++--- logservice/logpuller/memory_quota_test.go | 96 ++++++--- logservice/logpuller/region_event_handler.go | 14 +- .../logpuller/region_request_worker_test.go | 7 +- logservice/logpuller/subscription_client.go | 95 ++------- .../logpuller/subscription_client_test.go | 12 +- pkg/metrics/log_puller.go | 8 + utils/dynstream/interfaces.go | 14 -- utils/dynstream/memory_control.go | 6 +- 10 files changed, 320 insertions(+), 180 deletions(-) diff --git a/docs/design/2026-06-18-puller-memory-quota.md b/docs/design/2026-06-18-puller-memory-quota.md index f336d3e369..34f9f16560 100644 --- a/docs/design/2026-06-18-puller-memory-quota.md +++ b/docs/design/2026-06-18-puller-memory-quota.md @@ -83,7 +83,7 @@ The quota controls two independent actions: usage >= 50% NORMAL ---------------------------------> SCAN_THROTTLED ^ | - | usage < 40% | usage >= 100% + | usage < 40% | next event does not fit | v +------------------------------------------- FULL | @@ -92,9 +92,9 @@ The quota controls two independent actions: SCAN_THROTTLED ``` -`FULL` does not have a separate low-watermark. Input resumes as soon as usage -falls below 100%. This removes the current behavior where input remains paused -until usage falls from 80% to 50%. +`FULL` does not have a separate low-watermark. A waiting event proceeds as soon +as sufficient quota is released. This removes the current behavior where input +remains paused until usage falls from 80% to 50%. The 50%/40% hysteresis applies only to Region scan admission. It prevents Region requests from repeatedly pausing and resuming when usage fluctuates around 50%. @@ -103,29 +103,40 @@ requests from repeatedly pausing and resuming when usage fluctuates around 50%. The quota controller belongs to `logservice/logpuller`, rather than a shared package, because its Region scan gate and lifecycle semantics are puller-specific. 
-It is installed as the memory control algorithm for the puller's dynamic stream. -Dynamic stream remains responsible for pending-event memory accounting, including -normal dequeue and path removal, while the puller-owned controller makes the -backpressure and Region scan decisions. +It is independent from dynamic stream memory control. The puller does not enable +dynamic stream memory control, consume its feedback, or implement its memory +control algorithm interface. ### Event admission -Dynamic stream continues accepting events while pending usage is below 100%. -When an append moves usage to or above 100%, it sends `PauseArea` feedback and -the subscription client stops pushing subsequent Region events. Once usage falls -below 100%, it sends `ResumeArea` immediately. +After receiving a Region event and calculating its size, the subscription client +acquires a puller-owned reservation before calling `DynamicStream.Push`. -Feedback and already-buffered events make this a bounded soft overshoot rather -than a strict allocation barrier. The overshoot consists of events already in -the receive and dynamic stream input buffers when the threshold is crossed. This -matches the safety model of the existing puller flow control while allowing the -full configured quota to be used. +- The event is admitted immediately when it fits in the remaining quota. +- Otherwise its receive goroutine waits for a reservation release, subscription + stop, or context cancellation. +- Concurrent waiters are serialized by the quota and cannot over-admit memory. +- A single event larger than the configured quota is admitted only when it can + run alone, avoiding permanent deadlock. + +The event has already been allocated by gRPC when its size becomes known. Each +receive goroutine may therefore hold one unadmitted event while waiting. These +events are outside the logical quota, but no additional events are received by +the blocked goroutines. 
### Event release -Memory is released through the existing dynamic stream accounting when an event -is dequeued or a path is removed. The old puller 80%/50% algorithm is replaced by -the puller-owned controller and must not independently trigger flow control. +Every admitted event carries a puller reservation. The reservation is released: + +- At the start of normal dynamic stream handling, when the event leaves the + buffered pipeline. +- From `OnDrop` when dynamic stream rejects the event. +- In aggregate when its subscription is removed. +- In aggregate when the subscription client closes. + +Reservations refer to an internal subscription accounting object. Aggregate +subscription release removes that object, so a later release from an in-flight +stale event becomes a no-op instead of decrementing unrelated memory. ## Region Scan Gate @@ -229,10 +240,11 @@ No TiKV protocol change is required. ### Quota unit tests -- Input is not paused below 100% usage. -- Input pauses when usage reaches 100%. -- Input resumes immediately after usage falls below 100%. -- Existing dynamic stream path removal correctly reduces accounted memory. +- An event is admitted whenever it fits in the remaining quota. +- A non-fitting event waits and resumes as soon as sufficient quota is released. +- An oversized event runs alone without deadlocking the puller. +- Subscription removal releases all of its reservations. +- Releases from stale subscriptions do not corrupt accounting. 
### Region gate unit tests diff --git a/logservice/logpuller/memory_quota.go b/logservice/logpuller/memory_quota.go index 819f0d0d4a..653e494435 100644 --- a/logservice/logpuller/memory_quota.go +++ b/logservice/logpuller/memory_quota.go @@ -16,6 +16,7 @@ package logpuller import ( "context" "sync" + "sync/atomic" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/metrics" @@ -27,50 +28,179 @@ const ( regionScanResumeRatio = 0.4 ) -// pullerMemoryQuota applies hard backpressure at the configured quota and -// gates new region scans before the hard limit is reached. Dynamic stream owns -// the memory accounting and calls this controller whenever usage changes. +// pullerMemoryQuota is the admission controller for events entering the puller +// dynamic stream. It is intentionally independent from dynamic stream memory +// control because it also controls region scan admission. type pullerMemoryQuota struct { mu sync.Mutex + capacity uint64 + used uint64 + closed bool + + memoryReleased chan struct{} + regionScanPaused bool regionScanResume chan struct{} + + subscriptions map[SubscriptionID]*pullerMemorySubscription } -func newPullerMemoryQuota() *pullerMemoryQuota { +type pullerMemorySubscription struct { + usage uint64 +} + +type pullerMemoryReservation struct { + quota *pullerMemoryQuota + subID SubscriptionID + subscription *pullerMemorySubscription + bytes uint64 + released atomic.Bool +} + +func newPullerMemoryQuota(capacity uint64) *pullerMemoryQuota { metrics.PullerRegionScanGate.Set(1) + metrics.PullerMemoryQuota.WithLabelValues("quota").Set(float64(capacity)) + metrics.PullerMemoryQuota.WithLabelValues("used").Set(0) return &pullerMemoryQuota{ + capacity: capacity, + memoryReleased: make(chan struct{}), regionScanResume: make(chan struct{}), + subscriptions: make(map[SubscriptionID]*pullerMemorySubscription), + } +} + +func (q *pullerMemoryQuota) acquire( + ctx context.Context, subID SubscriptionID, bytes uint64, stopped <-chan struct{}, +) 
(*pullerMemoryReservation, error) { + for { + q.mu.Lock() + if q.closed { + q.mu.Unlock() + return nil, context.Canceled + } + // A single oversized event must be allowed to run alone, otherwise it can + // never make progress and permanently deadlocks the puller. + fits := q.used <= q.capacity && bytes <= q.capacity-q.used + if fits || q.used == 0 && bytes > q.capacity { + subscription := q.subscriptions[subID] + if subscription == nil { + subscription = &pullerMemorySubscription{} + q.subscriptions[subID] = subscription + } + q.used += bytes + subscription.usage += bytes + q.updateRegionScanStateLocked() + q.mu.Unlock() + return &pullerMemoryReservation{ + quota: q, + subID: subID, + subscription: subscription, + bytes: bytes, + }, nil + } + memoryReleased := q.memoryReleased + q.mu.Unlock() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-stopped: + return nil, context.Canceled + case <-memoryReleased: + } } } -// ShouldPausePath implements dynstream.MemoryControlAlgorithm. Puller flow -// control is global, so it does not pause individual subscription paths. 
-func (q *pullerMemoryQuota) ShouldPausePath( - _ bool, pathPendingSize int64, _ int64, maxPendingSize uint64, _ int64, -) (bool, bool, float64) { - return false, false, float64(pathPendingSize) / float64(maxPendingSize) +func (r *pullerMemoryReservation) release() { + if r == nil || !r.released.CompareAndSwap(false, true) { + return + } + r.quota.release(r) +} + +func (q *pullerMemoryQuota) release(reservation *pullerMemoryReservation) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed || q.subscriptions[reservation.subID] != reservation.subscription { + return + } + + usage := reservation.subscription.usage + if usage < reservation.bytes || q.used < reservation.bytes { + log.Error("puller memory quota accounting underflow", + zap.Uint64("subscriptionID", uint64(reservation.subID)), + zap.Uint64("releaseBytes", reservation.bytes), + zap.Uint64("subscriptionUsage", usage), + zap.Uint64("memoryUsage", q.used)) + return + } + + q.used -= reservation.bytes + usage -= reservation.bytes + if usage == 0 { + delete(q.subscriptions, reservation.subID) + } else { + reservation.subscription.usage = usage + } + q.notifyMemoryReleasedLocked() + q.updateRegionScanStateLocked() } -// ShouldPauseArea implements dynstream.MemoryControlAlgorithm. -func (q *pullerMemoryQuota) ShouldPauseArea( - paused bool, pendingSize int64, maxPendingSize uint64, -) (bool, bool, float64) { - usageRatio := float64(pendingSize) / float64(maxPendingSize) - q.updateRegionScanState(usageRatio, pendingSize, maxPendingSize) +// releaseSubscription releases all reservations owned by a removed +// subscription. Reservation ownership makes later stale releases no-ops. 
+func (q *pullerMemoryQuota) releaseSubscription(subID SubscriptionID) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + return + } - if paused { - return false, usageRatio < 1, usageRatio + subscription := q.subscriptions[subID] + if subscription == nil { + return + } + usage := subscription.usage + if usage > q.used { + log.Error("puller subscription memory usage exceeds total usage", + zap.Uint64("subscriptionID", uint64(subID)), + zap.Uint64("subscriptionUsage", usage), + zap.Uint64("memoryUsage", q.used)) + usage = q.used + } + q.used -= usage + delete(q.subscriptions, subID) + if usage != 0 { + q.notifyMemoryReleasedLocked() + q.updateRegionScanStateLocked() } - return usageRatio >= 1, false, usageRatio } -func (q *pullerMemoryQuota) updateRegionScanState( - usageRatio float64, pendingSize int64, maxPendingSize uint64, -) { +func (q *pullerMemoryQuota) close() { q.mu.Lock() defer q.mu.Unlock() + if q.closed { + return + } + q.closed = true + q.used = 0 + clear(q.subscriptions) + metrics.PullerMemoryQuota.WithLabelValues("used").Set(0) + close(q.memoryReleased) + if q.regionScanPaused { + q.regionScanPaused = false + close(q.regionScanResume) + } + metrics.PullerRegionScanGate.Set(1) +} +func (q *pullerMemoryQuota) notifyMemoryReleasedLocked() { + close(q.memoryReleased) + q.memoryReleased = make(chan struct{}) +} + +func (q *pullerMemoryQuota) updateRegionScanStateLocked() { + usageRatio := float64(q.used) / float64(q.capacity) switch { case !q.regionScanPaused && usageRatio >= regionScanPauseRatio: q.regionScanPaused = true @@ -78,8 +208,8 @@ func (q *pullerMemoryQuota) updateRegionScanState( metrics.PullerRegionScanGate.Set(0) metrics.PullerRegionScanGateTransition.WithLabelValues("pause").Inc() log.Info("puller pauses region scans", - zap.Int64("memoryUsage", pendingSize), - zap.Uint64("memoryQuota", maxPendingSize), + zap.Uint64("memoryUsage", q.used), + zap.Uint64("memoryQuota", q.capacity), zap.Float64("memoryUsageRatio", usageRatio)) case 
q.regionScanPaused && usageRatio < regionScanResumeRatio: q.regionScanPaused = false @@ -87,8 +217,8 @@ func (q *pullerMemoryQuota) updateRegionScanState( metrics.PullerRegionScanGate.Set(1) metrics.PullerRegionScanGateTransition.WithLabelValues("resume").Inc() log.Info("puller resumes region scans", - zap.Int64("memoryUsage", pendingSize), - zap.Uint64("memoryQuota", maxPendingSize), + zap.Uint64("memoryUsage", q.used), + zap.Uint64("memoryQuota", q.capacity), zap.Float64("memoryUsageRatio", usageRatio)) } } @@ -112,3 +242,9 @@ func (q *pullerMemoryQuota) regionScanResumeNotify() (<-chan struct{}, bool) { defer q.mu.Unlock() return q.regionScanResume, q.regionScanPaused } + +func (q *pullerMemoryQuota) usage() (used, capacity uint64) { + q.mu.Lock() + defer q.mu.Unlock() + return q.used, q.capacity +} diff --git a/logservice/logpuller/memory_quota_test.go b/logservice/logpuller/memory_quota_test.go index dcf7e44118..75f3186dc7 100644 --- a/logservice/logpuller/memory_quota_test.go +++ b/logservice/logpuller/memory_quota_test.go @@ -21,64 +21,102 @@ import ( "github.com/stretchr/testify/require" ) -func TestPullerMemoryQuotaThresholds(t *testing.T) { - quota := newPullerMemoryQuota() +func TestPullerMemoryQuotaAdmissionAndScanThresholds(t *testing.T) { + quota := newPullerMemoryQuota(100) + ctx := context.Background() - pause, resume, _ := quota.ShouldPauseArea(false, 49, 100) - require.False(t, pause) - require.False(t, resume) + first, err := quota.acquire(ctx, 1, 49, nil) + require.NoError(t, err) _, scanPaused := quota.regionScanResumeNotify() require.False(t, scanPaused) - pause, resume, _ = quota.ShouldPauseArea(false, 50, 100) - require.False(t, pause) - require.False(t, resume) + second, err := quota.acquire(ctx, 1, 1, nil) + require.NoError(t, err) _, scanPaused = quota.regionScanResumeNotify() require.True(t, scanPaused) - pause, resume, _ = quota.ShouldPauseArea(false, 100, 100) - require.True(t, pause) - require.False(t, resume) + third, err := 
quota.acquire(ctx, 1, 50, nil) + require.NoError(t, err) + used, capacity := quota.usage() + require.Equal(t, uint64(100), used) + require.Equal(t, uint64(100), capacity) - pause, resume, _ = quota.ShouldPauseArea(true, 99, 100) - require.False(t, pause) - require.True(t, resume) - _, scanPaused = quota.regionScanResumeNotify() - require.True(t, scanPaused) + blocked := make(chan *pullerMemoryReservation, 1) + go func() { + reservation, acquireErr := quota.acquire(ctx, 2, 1, nil) + require.NoError(t, acquireErr) + blocked <- reservation + }() + select { + case <-blocked: + t.Fatal("event admitted when quota was full") + case <-time.After(20 * time.Millisecond): + } - quota.ShouldPauseArea(false, 40, 100) + third.release() + fourth := <-blocked + used, _ = quota.usage() + require.Equal(t, uint64(51), used) _, scanPaused = quota.regionScanResumeNotify() require.True(t, scanPaused) - quota.ShouldPauseArea(false, 39, 100) + first.release() _, scanPaused = quota.regionScanResumeNotify() require.False(t, scanPaused) - require.NoError(t, quota.waitRegionScanAllowed(context.Background())) + require.NoError(t, quota.waitRegionScanAllowed(ctx)) + + second.release() + fourth.release() + used, _ = quota.usage() + require.Zero(t, used) } -func TestPullerMemoryQuotaWaitCancellation(t *testing.T) { - quota := newPullerMemoryQuota() - quota.ShouldPauseArea(false, 50, 100) +func TestPullerMemoryQuotaReleaseSubscription(t *testing.T) { + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(context.Background(), 1, 60, nil) + require.NoError(t, err) + + quota.releaseSubscription(1) + used, _ := quota.usage() + require.Zero(t, used) + + // A stale event can still be handled after subscription cleanup. Its release + // must not affect a later generation of accounting. 
+ newReservation, err := quota.acquire(context.Background(), 1, 20, nil) + require.NoError(t, err) + reservation.release() + used, _ = quota.usage() + require.Equal(t, uint64(20), used) + newReservation.release() +} + +func TestPullerMemoryQuotaOversizedEventRunsAlone(t *testing.T) { + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(context.Background(), 1, 101, nil) + require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) done := make(chan error, 1) go func() { - done <- quota.waitRegionScanAllowed(ctx) + _, acquireErr := quota.acquire(ctx, 2, 1, nil) + done <- acquireErr }() - select { case err := <-done: - t.Fatalf("wait returned before cancellation: %v", err) + t.Fatalf("second event admitted with oversized event: %v", err) case <-time.After(20 * time.Millisecond): } cancel() require.ErrorIs(t, <-done, context.Canceled) + reservation.release() } func TestProducerGateStopsWaitingAfterUnsubscribe(t *testing.T) { - quota := newPullerMemoryQuota() - quota.ShouldPauseArea(false, 50, 100) + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(context.Background(), 1, 50, nil) + require.NoError(t, err) + defer reservation.release() client := &subscriptionClient{memoryQuota: quota} span := &subscribedSpan{stoppedCh: make(chan struct{})} @@ -88,8 +126,8 @@ func TestProducerGateStopsWaitingAfterUnsubscribe(t *testing.T) { } done := make(chan result, 1) go func() { - stopped, err := client.waitRegionScanAllowed(context.Background(), span) - done <- result{stopped: stopped, err: err} + stopped, waitErr := client.waitRegionScanAllowed(context.Background(), span) + done <- result{stopped: stopped, err: waitErr} }() span.stopped.Store(true) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 7e96008c77..c283e55de4 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -56,6 +56,14 @@ type 
regionEvent struct { entries *cdcpb.Event_Entries_ resolvedTs uint64 + + memoryReservation *pullerMemoryReservation +} + +func (event *regionEvent) releaseMemory() { + if event != nil { + event.memoryReservation.release() + } } func (event *regionEvent) getSize() int { @@ -63,6 +71,7 @@ func (event *regionEvent) getSize() int { return 0 } size := int(unsafe.Sizeof(*event)) + size += int(unsafe.Sizeof(pullerMemoryReservation{})) if event.entries != nil { size += int(unsafe.Sizeof(*event.entries)) size += int(unsafe.Sizeof(*event.entries.Entries)) @@ -95,6 +104,9 @@ func (h *regionEventHandler) Path(event regionEvent) SubscriptionID { } func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) bool { + for i := range events { + events[i].releaseMemory() + } startTime := time.Now() hasEntries := false hasResolved := false @@ -226,7 +238,7 @@ func (h *regionEventHandler) GetType(event regionEvent) dynstream.EventType { } func (h *regionEventHandler) OnDrop(event regionEvent) interface{} { - // TODO: Distinguish between drop events caused by "path not found" errors and memory control. 
+ event.releaseMemory() state := event.mustFirstState() fields := []zap.Field{ zap.Bool("hasEntries", event.entries != nil), diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index 9dd671f051..2f729e29e5 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -352,8 +352,9 @@ func TestProcessRegionSendTaskAllowsDeregisterWhileScanPaused(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - quota := newPullerMemoryQuota() - quota.ShouldPauseArea(false, 50, 100) + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(ctx, 99, 50, nil) + require.NoError(t, err) worker := ®ionRequestWorker{ requestCache: newRequestCache(10), store: &requestedStore{storeAddr: "store-1"}, @@ -395,7 +396,7 @@ func TestProcessRegionSendTaskAllowsDeregisterWhileScanPaused(t *testing.T) { case <-time.After(20 * time.Millisecond): } - quota.ShouldPauseArea(false, 39, 100) + reservation.release() request = <-client.sent require.Nil(t, request.GetDeregister()) require.Equal(t, uint64(1), request.RequestId) diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 097fb4288a..e5756a5a94 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -197,10 +197,6 @@ type subscriptionClient struct { ds dynstream.DynamicStream[int, SubscriptionID, regionEvent, *subscribedSpan, *regionEventHandler] // memoryQuota controls puller backpressure and new region scan admission. memoryQuota *pullerMemoryQuota - // the following three fields are used to manage feedback from ds and notify other goroutines - mu sync.Mutex - cond *sync.Cond - paused atomic.Bool // the credential to connect tikv credential *security.Credential @@ -227,13 +223,13 @@ type subscriptionClient struct { // NewSubscriptionClient creates a client. 
func NewSubscriptionClient( - config *SubscriptionClientConfig, + clientConfig *SubscriptionClientConfig, pd pd.Client, lockResolver txnutil.LockResolver, credential *security.Credential, ) SubscriptionClient { subClient := &subscriptionClient{ - config: config, + config: clientConfig, stores: sync.Map{}, pd: pd, @@ -251,7 +247,8 @@ func NewSubscriptionClient( } subClient.ctx, subClient.cancel = context.WithCancel(context.Background()) subClient.totalSpans.spanMap = make(map[SubscriptionID]*subscribedSpan) - subClient.memoryQuota = newPullerMemoryQuota() + pullerConfig := config.GetGlobalServerConfig().Debug.Puller + subClient.memoryQuota = newPullerMemoryQuota(pullerConfig.MemoryQuota) option := dynstream.NewOption() // Note: it is max batch size of the kv sent from tikv(not committed rows) @@ -259,7 +256,6 @@ func NewSubscriptionClient( // TODO: Set `UseBuffer` to true until we refactor the `regionEventHandler.Handle` method so that it doesn't call any method of the dynamic stream. Currently, if `UseBuffer` is set to false, there will be a deadlock: // ds.handleLoop fetch events from `ch` -> regionEventHandler.Handle -> ds.RemovePath -> send event to `ch` option.UseBuffer = true - option.EnableMemoryControl = true ds := dynstream.NewParallelDynamicStream( "log-puller", ®ionEventHandler{subClient: subClient}, @@ -267,7 +263,6 @@ func NewSubscriptionClient( ) ds.Start() subClient.ds = ds - subClient.cond = sync.NewCond(&subClient.mu) subClient.initMetrics() return subClient @@ -302,24 +297,9 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { dsMetrics := s.ds.GetMetrics() metricSubscriptionClientDSChannelSize.Set(float64(dsMetrics.EventChanSize)) metricSubscriptionClientDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) - if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 1 { - log.Panic("subscription client should have only one area") - } - if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 0 { - areaMetric := 
dsMetrics.MemoryControl.AreaMemoryMetrics[0] - metrics.DynamicStreamMemoryUsage.WithLabelValues( - "log-puller", - "max", - "default", - "default", - ).Set(float64(areaMetric.MaxMemory())) - metrics.DynamicStreamMemoryUsage.WithLabelValues( - "log-puller", - "used", - "default", - "default", - ).Set(float64(areaMetric.MemoryUsage())) - } + usedMemory, memoryQuota := s.memoryQuota.usage() + metrics.PullerMemoryQuota.WithLabelValues("quota").Set(float64(memoryQuota)) + metrics.PullerMemoryQuota.WithLabelValues("used").Set(float64(usedMemory)) pendingRegionReqCount := 0 s.stores.Range(func(_, value any) bool { @@ -370,10 +350,7 @@ func (s *subscriptionClient) Subscribe( s.totalSpans.spanMap[subID] = rt s.totalSpans.Unlock() - pullerConfig := config.GetGlobalServerConfig().Debug.Puller - areaSetting := dynstream.NewAreaSettingsWithMemoryControlAlgorithm( - pullerConfig.MemoryQuota, s.memoryQuota, "logPuller") - s.ds.AddPath(rt.subID, rt, areaSetting) + s.ds.AddPath(rt.subID, rt) select { case <-s.ctx.Done(): @@ -408,51 +385,20 @@ func (s *subscriptionClient) wakeSubscription(subID SubscriptionID) { } func (s *subscriptionClient) pushRegionEventToDS(subID SubscriptionID, event regionEvent) { - // fast path - if !s.paused.Load() { + if s.memoryQuota == nil { s.ds.Push(subID, event) return } - // slow path: wait until paused is false - s.mu.Lock() - for s.paused.Load() { - select { - case <-s.ctx.Done(): - s.mu.Unlock() - return - default: - s.cond.Wait() - } + span := event.mustFirstState().region.subscribedSpan + reservation, err := s.memoryQuota.acquire( + s.ctx, subID, uint64(event.getSize()), span.stoppedCh) + if err != nil { + return } - s.mu.Unlock() + event.memoryReservation = reservation s.ds.Push(subID, event) } -func (s *subscriptionClient) handleDSFeedBack(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return nil - case feedback := <-s.ds.Feedback(): - switch feedback.FeedbackType { - case dynstream.PauseArea: - s.mu.Lock() - 
s.paused.Store(true) - s.mu.Unlock() - log.Info("subscription client pause push region event") - case dynstream.ResumeArea: - s.mu.Lock() - s.paused.Store(false) - s.cond.Broadcast() - s.mu.Unlock() - log.Info("subscription client resume push region event") - case dynstream.ReleasePath, dynstream.ResumePath: - // Ignore it, because it is no need to pause and resume a path in puller. - } - } - } -} - func (s *subscriptionClient) Run(ctx context.Context) error { // s.consume = consume if s.pd == nil { @@ -464,7 +410,6 @@ func (s *subscriptionClient) Run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { return s.updateMetrics(ctx) }) - g.Go(func() error { return s.handleDSFeedBack(ctx) }) g.Go(func() error { return s.handleRangeTasks(ctx) }) g.Go(func() error { return s.handleRegions(ctx, g) }) g.Go(func() error { return s.handleErrors(ctx) }) @@ -481,10 +426,9 @@ func (s *subscriptionClient) Run(ctx context.Context) error { // Close closes the client. Must be called after `Run` returns. 
func (s *subscriptionClient) Close(ctx context.Context) error { s.cancel() - s.mu.Lock() - s.paused.Store(false) - s.cond.Broadcast() - s.mu.Unlock() + if s.memoryQuota != nil { + s.memoryQuota.close() + } s.ds.Close() s.regionTaskQueue.Close() return nil @@ -512,6 +456,9 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { log.Info("subscription client stop span is finished", zap.Uint64("subscriptionID", uint64(rt.subID))) + if s.memoryQuota != nil { + s.memoryQuota.releaseSubscription(rt.subID) + } err := s.ds.RemovePath(rt.subID) if err != nil { log.Warn("subscription client remove path failed", diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 0b0ed247e3..6948257a55 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -382,18 +382,22 @@ func (s *mockDynamicStream) GetMetrics() dynstream.Metrics[int, SubscriptionID] } func TestPushRegionEventToDSUnblocksOnClose(t *testing.T) { + quota := newPullerMemoryQuota(1) + reservation, err := quota.acquire(context.Background(), 2, 1, nil) + require.NoError(t, err) + defer reservation.release() + span := &subscribedSpan{stoppedCh: make(chan struct{})} + state := &regionFeedState{region: regionInfo{subscribedSpan: span}} client := &subscriptionClient{ ds: &mockDynamicStream{}, + memoryQuota: quota, regionTaskQueue: NewPriorityQueue(), } client.ctx, client.cancel = context.WithCancel(context.Background()) - client.cond = sync.NewCond(&client.mu) - - client.paused.Store(true) done := make(chan struct{}) go func() { - client.pushRegionEventToDS(SubscriptionID(1), regionEvent{}) + client.pushRegionEventToDS(SubscriptionID(1), regionEvent{states: []*regionFeedState{state}}) close(done) }() diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index 01d15c0f21..9684c648af 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -78,6 +78,13 @@ var ( 
Name: "region_scan_gate_transition_count", Help: "The number of puller region scan gate state transitions.", }, []string{"type"}) + PullerMemoryQuota = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "log_puller", + Name: "memory_quota_bytes", + Help: "Puller memory quota and current usage in bytes.", + }, []string{"type"}) SubscriptionClientResolvedTsLagGauge = prometheus.NewGauge( prometheus.GaugeOpts{ @@ -180,6 +187,7 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(LogPullerResolvedTsLag) registry.MustRegister(PullerRegionScanGate) registry.MustRegister(PullerRegionScanGateTransition) + registry.MustRegister(PullerMemoryQuota) registry.MustRegister(SubscriptionClientRequestedRegionCount) registry.MustRegister(SubscriptionClientAddRegionRequestDuration) registry.MustRegister(RegionRequestFinishScanDuration) diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index efa79b1457..eb891fa13d 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -256,10 +256,6 @@ type AreaSettings struct { // Remove it when we determine the v2 is working well. // The algorithm of the memory control. algorithm int - // memoryControlAlgorithm overrides algorithm when it is set. It allows a - // component to own state derived from memory usage without moving that state - // into dynamic stream. - memoryControlAlgorithm MemoryControlAlgorithm // control how to batch events batchConfig batchConfig @@ -290,16 +286,6 @@ func NewAreaSettingsWithMaxPendingSize( } } -// NewAreaSettingsWithMemoryControlAlgorithm creates area settings with a -// component-owned memory control algorithm. 
-func NewAreaSettingsWithMemoryControlAlgorithm( - quota uint64, algorithm MemoryControlAlgorithm, component string, -) AreaSettings { - settings := NewAreaSettingsWithMaxPendingSize(quota, MemoryControlForPuller, component) - settings.memoryControlAlgorithm = algorithm - return settings -} - func NewAreaSettingsWithMaxPendingSizeAndBatchConfig( quota uint64, algorithm int, component string, batchCount int, batchBytes int, ) AreaSettings { diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go index 14b5e3792f..93af18b8ed 100644 --- a/utils/dynstream/memory_control.go +++ b/utils/dynstream/memory_control.go @@ -70,17 +70,13 @@ func newAreaMemStat[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]]( feedbackChan chan<- Feedback[A, P, D], ) *areaMemStat[A, P, T, D, H] { settings.fix() - algorithm := settings.memoryControlAlgorithm - if algorithm == nil { - algorithm = NewMemoryControlAlgorithm(settings.algorithm) - } res := &areaMemStat[A, P, T, D, H]{ area: area, memControl: memoryControl, feedbackChan: feedbackChan, lastSendFeedbackTime: atomic.Value{}, lastSizeDecreaseTime: atomic.Value{}, - algorithm: algorithm, + algorithm: NewMemoryControlAlgorithm(settings.algorithm), } res.lastAppendEventTime.Store(time.Now()) From 934e7b9321eb73afc5c690ab05dabbe9afba7cb6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 22 Jun 2026 14:56:35 -0700 Subject: [PATCH 3/3] wip --- docs/design/2026-06-18-puller-memory-quota.md | 313 +++++++++-- logservice/logpuller/memory_quota.go | 5 +- logservice/logpuller/priority_task.go | 5 +- .../logpuller/region_event_handler_test.go | 4 +- logservice/logpuller/region_req_cache.go | 492 ++++++++---------- logservice/logpuller/region_req_cache_test.go | 409 +++++---------- logservice/logpuller/region_request_worker.go | 157 +++--- .../logpuller/region_request_worker_test.go | 199 ++++--- logservice/logpuller/region_state.go | 6 +- logservice/logpuller/subscription_client.go | 44 +- 
.../logpuller/subscription_client_test.go | 15 +- pkg/config/debug.go | 7 +- 12 files changed, 898 insertions(+), 758 deletions(-) diff --git a/docs/design/2026-06-18-puller-memory-quota.md b/docs/design/2026-06-18-puller-memory-quota.md index 34f9f16560..1925bf5427 100644 --- a/docs/design/2026-06-18-puller-memory-quota.md +++ b/docs/design/2026-06-18-puller-memory-quota.md @@ -61,7 +61,7 @@ The following thresholds are implementation constants and are not configurable: ```go const ( regionScanPauseRatio = 0.5 - regionScanResumeRatio = 0.4 + regionScanResumeRatio = 0.2 ) ``` @@ -75,7 +75,7 @@ by that client share the quota. The quota controls two independent actions: 1. Region scan admission: stop new Region register requests at 50% usage and - resume them below 40% usage. + resume them below 20% usage. 2. Event admission: block new Region events when admitting them would exceed the hard quota, and admit them as soon as sufficient space becomes available. @@ -83,7 +83,7 @@ The quota controls two independent actions: usage >= 50% NORMAL ---------------------------------> SCAN_THROTTLED ^ | - | usage < 40% | next event does not fit + | usage < 20% | next event does not fit | v +------------------------------------------- FULL | @@ -96,26 +96,26 @@ The quota controls two independent actions: as sufficient quota is released. This removes the current behavior where input remains paused until usage falls from 80% to 50%. -The 50%/40% hysteresis applies only to Region scan admission. It prevents Region +The 50%/20% hysteresis applies only to Region scan admission. It prevents Region requests from repeatedly pausing and resuming when usage fluctuates around 50%. +The slow-consumer integration test must verify that normal steady-state usage can +fall below 20%; otherwise the resume threshold would starve new subscriptions +and Region retries and must be adjusted before rollout. 
## Puller Memory Quota The quota controller belongs to `logservice/logpuller`, rather than a shared package, because its Region scan gate and lifecycle semantics are puller-specific. -It is independent from dynamic stream memory control. The puller does not enable -dynamic stream memory control, consume its feedback, or implement its memory -control algorithm interface. ### Event admission After receiving a Region event and calculating its size, the subscription client -acquires a puller-owned reservation before calling `DynamicStream.Push`. +acquires quota before admitting the event into the processing pipeline. - The event is admitted immediately when it fits in the remaining quota. -- Otherwise its receive goroutine waits for a reservation release, subscription +- Otherwise its receive goroutine waits for memory release, subscription stop, or context cancellation. -- Concurrent waiters are serialized by the quota and cannot over-admit memory. +- Concurrent waiters use atomic compare-and-swap and cannot over-admit memory. - A single event larger than the configured quota is admitted only when it can run alone, avoiding permanent deadlock. @@ -124,19 +124,112 @@ receive goroutine may therefore hold one unadmitted event while waiting. These events are outside the logical quota, but no additional events are received by the blocked goroutines. +### Fast path + +Memory quota is on the event receive critical path. The normal path must not use +a global mutex or allocate an object per event. + +- Global admitted memory is maintained by an atomic counter. +- Acquire reserves memory with a compare-and-swap loop. +- Subscription accounting state is created at subscription setup and referenced + directly by events; Acquire does not look up a global map. +- The event stores its accounted byte count and subscription owner directly. +- A handler batch sums the accounted bytes and performs one Release operation. 
+- Waiter locks and notification channels are used only when quota is exhausted. +- Scan gate locks are used only when usage crosses a threshold. + ### Event release -Every admitted event carries a puller reservation. The reservation is released: +Accounted memory is released: -- At the start of normal dynamic stream handling, when the event leaves the - buffered pipeline. -- From `OnDrop` when dynamic stream rejects the event. +- When the consumer starts handling the event and it leaves the buffered + pipeline. +- When the pipeline rejects the event. - In aggregate when its subscription is removed. - In aggregate when the subscription client closes. -Reservations refer to an internal subscription accounting object. Aggregate -subscription release removes that object, so a later release from an in-flight -stale event becomes a no-op instead of decrementing unrelated memory. +Each subscription accounting object has an active lifecycle protected by a local +lock. Aggregate subscription release marks it inactive and releases its total +usage. A later release from an in-flight stale event becomes a no-op instead of +decrementing unrelated memory. + +## Region Request Limiting + +Region request queueing, incremental scan concurrency, and sent-request tracking +are separate responsibilities. They must not share one counter or repair each +other's state periodically. + +### Store-level scan limiter + +Each TiKV store owns one `regionScanLimiter`. A scan slot represents exactly one +Register request that has been sent but has not completed its incremental scan. +The limit is enforced across all request workers connected to the store instead +of being divided approximately between workers. + +A slot is acquired immediately before sending Register and released on exactly +one of these terminal paths: + +- The Region reports Initialized. +- TiKV returns a Region error. +- Sending Register fails. +- The subscription is deregistered. 
+- The gRPC stream reconnects and its in-flight requests are rescheduled. + +Every acquired slot is represented by an idempotent token. Releasing the token +more than once is a no-op. Counter underflow is an invariant violation and must +not be repaired at runtime. + +### Request queues + +Pending requests are split by semantics: + +- A store-level `registerQueue` contains initial subscriptions and Region error + retries. All workers for the store consume this shared priority queue. It is + subject to both the memory scan gate and the store-level scan limiter. +- Each worker has a `controlQueue` containing Deregister requests for its gRPC + stream. Deregister is broadcast to the relevant workers, bypasses the memory + scan gate and scan limiter, and is always selected first. + +Queue capacity protects local TiCDC memory only. It is not used as the number of +active incremental scans. Retry and initial-subscription priorities remain +explicit request attributes; priority never implies bypassing the limiter. + +### In-flight tracker + +Each request worker tracks only Register requests sent on its own gRPC stream. +The tracker owns the corresponding scan slot until a terminal event occurs. It +does not contain pending queues and does not maintain a second flow-control +counter. + +The following state is removed: + +- `pendingCount` and `spaceAvailable`. +- The `force` bypass derived from dynamic request priority. +- Periodic reconciliation between pending count and actual requests. +- The `region worker pending request count is not equal to actual region request + count, correct it` log and its correction path. 
+ +```text +Region discovery + | + v +store register queue + | + v +memory scan gate + | + v +store scan limiter + | + v +worker Send(Register) ---> in-flight tracker + | + +-- Initialized -> release slot + +-- Error -> release and reschedule + +-- Reconnect -> release and reschedule + +Deregister -> control queue -> worker Send(Deregister) +``` ## Region Scan Gate @@ -155,48 +248,158 @@ The gate never blocks: - Local error handling. - Resolve-lock processing. -### Authoritative gate +The gate contains an atomic paused flag and a notification mechanism used only +by waiters. Checking an open gate is one atomic load. + +### Sender gate + +The effective check is performed when a worker selects a Register request for +sending. Request selection has these rules: -The authoritative check is immediately before sending a Region register request -on the TiKV gRPC stream. This is the only point that can guarantee no additional -incremental scan is started after the gate closes. +- When the gate is open, the worker may select control or Register requests. +- When the gate is closed, the worker may select only control requests, gate + changes, or context cancellation. +- A Register selected immediately before the gate closes is allowed to finish + sending. Holding a lock across gRPC Send is explicitly forbidden. -If the gate closes after a request has been dequeued, the worker retains that -request and waits. It must still process deregister requests while waiting. The -request cache or send loop therefore needs a separate control path for deregister -requests so that cleanup cannot be blocked behind a waiting register request. +Queue selection and gate waiting are implemented in one helper. Workers do not +inspect memory quota internals or maintain a nested loop for a retained Register +request. -### Non-authoritative producer gate +### Producer gate An additional gate is checked in `divideSpanAndScheduleRegionRequests`: - Before each `BatchLoadRegionsWithKeyRange` call. 
-- While scheduling the Regions returned by a batch. This gate is an optimization. It avoids unnecessary PD requests, Region range -splitting, and growth of local task queues while scans are paused. It is called -non-authoritative because memory usage can cross 50% after this check, and -requests may already exist in the Region task queue or worker request cache. +splitting, and growth of local task queues while scans are paused. It is not +checked for every Region returned by a batch. At most one loaded batch, currently +1024 Regions, may be queued after the gate closes; the sender gate prevents those +requests from reaching TiKV. Waiting at this gate must exit when the context is cancelled or the subscribed -span is stopped. The send-time gate remains mandatory even when the producer -gate is present. +span is stopped. ## Concurrency and Ordering Quota state transitions and Region scan gate transitions must be atomic with -respect to memory reservation: +respect to memory accounting: -- An `Acquire` that moves usage to or above 50% closes the scan gate before it - returns. -- A `Release` that moves usage below 40% opens the scan gate and wakes waiting +- An Acquire that moves usage to or above 50% closes the scan gate. +- A Release that moves usage below 20% opens the scan gate and wakes waiting Region workers. - A `Release` wakes event admission waiters whenever their request may now fit. - State-change logs are emitted only on threshold crossings. -The existing Region request priority continues to apply after the scan gate -opens. Deregister requests have a separate bypass and do not depend on that -priority ordering. +Thresholds are precomputed in bytes. Acquire and Release compare integer byte +counts rather than performing floating-point division on the critical path. + +## Implementation Plan + +The refactoring is split into independently testable steps. 
Region request +limiting is completed before the new quota fast path so memory throttling is not +built on the current request-cache state machine. + +### Step 0: Establish performance and behavior baselines + +Add benchmarks and assertions around the current critical paths before changing +their implementation: + +- Parallel event admission and release with 1, 8, 32, and 128 goroutines. +- Single-subscription and many-subscription workloads. +- Region request enqueue, Send, Initialized, error, and reconnect lifecycles. +- Mutex and allocation profiles for the puller receive path. +- The maximum number of active incremental scans per store. + +The baseline records throughput, nanoseconds per operation, allocations per +operation, mutex wait time, and the number of scan requests sent during memory +pressure. + +### Step 1: Replace Region request flow control + +Introduce the store-level limiter, semantic request queues, and worker in-flight +tracker without changing memory quota behavior. + +1. Add `regionScanLimiter` to `requestedStore` so all workers for a store share + one exact limit. +2. Add an idempotent scan slot token. Attach it only after a Register request is + selected for sending. +3. Split pending Register and Deregister requests into `registerQueue` and + `controlQueue`. +4. Move sent-request state into `regionRequestTracker` owned by each worker. +5. Release scan slots from Initialized, Region error, Send failure, Deregister, + reconnect, and shutdown paths. +6. Remove `pendingCount`, `spaceAvailable`, `force`, stale count correction, and + the count-correction log. + +During migration, old and new counters must not run in parallel. The new limiter +becomes the only source of active-scan count in the same change that removes +`pendingCount`. + +Step 1 is complete when tests prove that every acquired slot reaches exactly one +terminal release and the configured per-store limit is never exceeded. 
+ +### Step 2: Introduce the standalone Region scan gate + +Add `regionScanGate` independently of memory accounting, initially leaving it +open in production tests. + +1. Implement atomic open/paused state and transition notifications. +2. Change worker request selection so a closed gate disables Register selection + but leaves Deregister selection enabled. +3. Move gate-aware selection into one queue helper and remove worker-level nested + wait loops. +4. Check the producer gate once before each PD Region batch load and remove the + per-Region gate check. +5. Add deterministic tests for pause during queueing, pause immediately before + Send, Deregister bypass, cancellation, and resume. + +Step 2 is complete when the open-gate fast path is one atomic load and no worker +depends on quota-internal channels or locks. + +### Step 3: Implement the zero-allocation memory quota fast path + +Replace the mutex-based quota and per-event reservation object. + +1. Create subscription memory state during Subscribe and store its pointer on + `subscribedSpan`. +2. Store accounted bytes and the subscription owner directly in `regionEvent`. +3. Implement global quota reservation with atomic compare-and-swap. +4. Protect only subscription lifecycle changes with a subscription-local lock. +5. Aggregate Release bytes for every handler batch. +6. Allocate waiter notification state only after quota exhaustion; Release does + not touch the waiter lock when there are no waiters. +7. Precompute pause and resume thresholds in bytes and drive `regionScanGate` + only when usage crosses them. +8. Allow an oversized event only when it can run alone. + +Acquire that races with unsubscribe must either complete under the active +subscription lifecycle or return cancellation. Aggregate unsubscribe release +must not race past an in-progress Acquire. 
+ +Step 3 is complete when quota-available Acquire and batched Release have zero +allocations, do not acquire a global mutex, and preserve exact admitted-byte +accounting under the race detector. + +### Step 4: Integrate, observe, and remove transitional code + +Connect memory usage transitions to the standalone scan gate and remove all +remaining transitional paths: + +1. Pause Region Register selection at 50% admitted memory. +2. Resume selection below 20% admitted memory. +3. Keep event admission open until the next event cannot fit. +4. Remove obsolete request-cache metrics and replace them with queue, active + scan, gate, waiter, and quota metrics. +5. Remove dead methods, compatibility branches, and comments describing deleted + algorithms. +6. Update the design document with any threshold changes justified by benchmark + or stress-test results. + +Step 4 is complete after focused unit tests, race tests, the main binary build, +and a slow-consumer integration test all pass. A mutex profile must show no quota +global lock on the normal receive path. ## Observability @@ -205,9 +408,13 @@ Add metrics for: - Configured puller quota in bytes. - Admitted memory in bytes. - Available quota in bytes. +- Number and duration of quota waiters. - Whether the Region scan gate is open or closed. - Number of Region register requests waiting on the gate. - Scan gate pause and resume counts. +- Register and control queue lengths. +- Active incremental scan slots per store and their configured limit. +- Scan slot lifetime from Register Send to Initialized or another terminal path. Log quota and scan gate state only on transitions. Transition logs should include the configured quota, current usage, threshold, and number of waiting requests. @@ -219,9 +426,9 @@ the configured quota, current usage, threshold, and number of waiting requests. - Closing the subscription client wakes blocked event pushes and Region gate waiters through context cancellation. 
- A Region worker reconnect does not bypass the scan gate for register requests. -- Quota accounting errors must not be corrected silently. Underflow, double - release, or leaked reservations should be surfaced during tests and with an - operational error log. +- Quota and scan-slot accounting errors must not be corrected silently. + Underflow, double release, or leaked ownership should be surfaced during tests + and with an operational error log. ## Compatibility @@ -243,14 +450,17 @@ No TiKV protocol change is required. - An event is admitted whenever it fits in the remaining quota. - A non-fitting event waits and resumes as soon as sufficient quota is released. - An oversized event runs alone without deadlocking the puller. -- Subscription removal releases all of its reservations. +- Subscription removal releases all of its accounted memory. - Releases from stale subscriptions do not corrupt accounting. +- Available-quota Acquire and batch Release perform zero allocations. +- Concurrent Acquire and unsubscribe preserve exact global and subscription + accounting. ### Region gate unit tests - The gate closes when usage reaches 50%. -- The gate remains closed between 40% and 50%. -- The gate opens when usage falls below 40%. +- The gate remains closed between 20% and 50%. +- The gate opens when usage falls below 20%. - Register requests are not sent while the gate is closed. - Both initial and retry register requests are gated. - Deregister requests are sent while register requests are waiting. @@ -266,4 +476,15 @@ Use a deliberately slow event consumer and verify that: threshold is crossed. - Existing Region streams continue making progress until hard backpressure is required. -- Region request scheduling resumes after usage falls below 40%. +- Region request scheduling resumes after usage falls below 20%. +- Active incremental scans for a store never exceed its configured limit. 
+ +### Performance tests + +- Benchmark quota Acquire and Release with 1, 8, 32, and 128 goroutines. +- Benchmark Region request selection with the gate open and closed. +- Compare event receive throughput and allocations against the Step 0 baseline. +- Capture mutex and allocation profiles under sustained high Region traffic. + +The quota fast path must have zero allocations per operation and must not expose +a globally contended mutex in the profile. diff --git a/logservice/logpuller/memory_quota.go b/logservice/logpuller/memory_quota.go index 653e494435..96e865969e 100644 --- a/logservice/logpuller/memory_quota.go +++ b/logservice/logpuller/memory_quota.go @@ -28,9 +28,8 @@ const ( regionScanResumeRatio = 0.4 ) -// pullerMemoryQuota is the admission controller for events entering the puller -// dynamic stream. It is intentionally independent from dynamic stream memory -// control because it also controls region scan admission. +// pullerMemoryQuota bounds memory held by admitted events and controls region +// scan admission based on current usage. 
type pullerMemoryQuota struct { mu sync.Mutex diff --git a/logservice/logpuller/priority_task.go b/logservice/logpuller/priority_task.go index 68ef9e885d..cffaa42cd2 100644 --- a/logservice/logpuller/priority_task.go +++ b/logservice/logpuller/priority_task.go @@ -33,9 +33,8 @@ const ( ) const ( - highPriorityBase = 0 - lowPriorityBase = 60 * 60 * 24 // 1 day - forcedPriorityBase = 60 * 60 // 60 minutes + highPriorityBase = 0 + lowPriorityBase = 60 * 60 * 24 // 1 day ) func (t TaskType) String() string { diff --git a/logservice/logpuller/region_event_handler_test.go b/logservice/logpuller/region_event_handler_test.go index e94d176ed0..4635a638d5 100644 --- a/logservice/logpuller/region_event_handler_test.go +++ b/logservice/logpuller/region_event_handler_test.go @@ -78,7 +78,7 @@ func TestHandleEventEntryEventOutOfOrder(t *testing.T) { ds.AddPath(subID, subSpan, dynstream.AreaSettings{}) worker := &regionRequestWorker{ - requestCache: &requestCache{}, + requestTracker: newRegionRequestTracker(), } region := newRegionInfo( tikv.RegionVerID{}, @@ -217,7 +217,7 @@ func TestHandleResolvedTs(t *testing.T) { subID1 := SubscriptionID(1) worker := &regionRequestWorker{ - requestCache: &requestCache{}, + requestTracker: newRegionRequestTracker(), } state1 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(1, 1, 1)}, uint64(subID1), worker) state1.start() diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index 1b44625b05..cf4986ec1f 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -16,367 +16,315 @@ package logpuller import ( "context" "sync" + "sync/atomic" "time" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/metrics" - "go.uber.org/atomic" + "github.com/pingcap/ticdc/utils/heap" "go.uber.org/zap" ) const ( - checkStaleRequestInterval = time.Second * 10 - requestGCLifeTime = time.Minute * 180 - addReqRetryInterval = time.Millisecond * 1 + addReqRetryInterval = 
time.Millisecond addReqRetryLimit = 3 abnormalRequestDurationInSec = 60 * 60 * 2 // 2 hours ) -// regionReq represents a wrapped region request with state +// regionReq is a pending or in-flight Region register request. type regionReq struct { regionInfo regionInfo createTime time.Time + priority int + heapIndex int } -func newRegionReq(region regionInfo) regionReq { - return regionReq{ +func newRegionReq(region regionInfo, priority int) *regionReq { + return &regionReq{ regionInfo: region, createTime: time.Now(), + priority: priority, } } -func (r *regionReq) isStale() bool { - return time.Since(r.createTime) > requestGCLifeTime +func (r *regionReq) SetHeapIndex(index int) { + r.heapIndex = index } -// requestCache manages region requests with flow control -type requestCache struct { - // pending requests waiting to be sent - pendingQueue chan regionReq - // controlQueue contains deregister requests. It is separate so cleanup can - // proceed while normal register requests are gated by puller memory usage. - controlQueue chan regionReq - - // sent requests waiting for initialization (subscriptionID -> regions -> regionReq) - sentRequests struct { - sync.RWMutex - regionReqs map[SubscriptionID]map[uint64]regionReq - } - - // pendingCount is a flow control slot counter. - // A slot is acquired when a request is successfully enqueued into pendingQueue (see add), - // and is released when the request is finished/removed (resolve/markStopped/markDone/clear). - // pop and markSent don't change it. If markSent overwrites an existing request for the same region, - // it will release a slot for the replaced request to avoid leaking pendingCount. 
- pendingCount atomic.Int64 - // maximum number of pending requests allowed - maxPendingCount int64 +func (r *regionReq) GetHeapIndex() int { + return r.heapIndex +} - // channel to signal when space becomes available - spaceAvailable chan struct{} +func (r *regionReq) LessThan(other *regionReq) bool { + if r.priority == other.priority { + return r.createTime.Before(other.createTime) + } + return r.priority < other.priority +} - lastCheckStaleRequestTime atomic.Time +// regionRegisterQueue is the bounded, store-level queue shared by all workers. +// Queue capacity limits local pending work only; active scans are controlled by +// regionScanLimiter. +type regionRegisterQueue struct { + mu sync.Mutex + requests *heap.Heap[*regionReq] + capacity int + changed chan struct{} } -func newRequestCache(maxPendingCount int) *requestCache { - res := &requestCache{ - pendingQueue: make(chan regionReq, maxPendingCount), // Large buffer to reduce blocking - controlQueue: make(chan regionReq, maxPendingCount), - sentRequests: struct { - sync.RWMutex - regionReqs map[SubscriptionID]map[uint64]regionReq - }{regionReqs: make(map[SubscriptionID]map[uint64]regionReq)}, - pendingCount: atomic.Int64{}, - maxPendingCount: int64(maxPendingCount), - spaceAvailable: make(chan struct{}, 16), // Buffered to avoid blocking +func newRegionRegisterQueue(capacity int) *regionRegisterQueue { + if capacity <= 0 { + capacity = 1 + } + return &regionRegisterQueue{ + requests: heap.NewHeap[*regionReq](), + capacity: capacity, + changed: make(chan struct{}), } - - res.lastCheckStaleRequestTime.Store(time.Now()) - return res } -// add adds a new region request to the cache -// It blocks if pendingCount >= maxPendingCount until there's space or ctx is cancelled -func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) { +func (q *regionRegisterQueue) add( + ctx context.Context, region regionInfo, priority int, +) (bool, error) { start := time.Now() ticker := 
time.NewTicker(addReqRetryInterval) defer ticker.Stop() - addReqRetryLimit := addReqRetryLimit + retries := addReqRetryLimit for { - current := c.pendingCount.Load() - if current < c.maxPendingCount || force { - // Try to add the request - req := newRegionReq(region) - queue := c.pendingQueue - if region.isStopped() { - queue = c.controlQueue - } - select { - case <-ctx.Done(): - return false, ctx.Err() - case queue <- req: - c.pendingCount.Inc() - cost := time.Since(start) - metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) - return true, nil - case <-ticker.C: - addReqRetryLimit-- - if addReqRetryLimit <= 0 { - return false, nil - } - continue - } + q.mu.Lock() + if q.requests.Len() < q.capacity { + q.requests.AddOrUpdate(newRegionReq(region, priority)) + q.notifyChangedLocked() + q.mu.Unlock() + metrics.SubscriptionClientAddRegionRequestDuration.Observe(time.Since(start).Seconds()) + return true, nil } + changed := q.changed + q.mu.Unlock() - // Wait for space to become available select { + case <-ctx.Done(): + return false, ctx.Err() + case <-changed: case <-ticker.C: - addReqRetryLimit-- - if addReqRetryLimit <= 0 { + retries-- + if retries <= 0 { return false, nil } - continue - case <-c.spaceAvailable: - continue - case <-ctx.Done(): - return false, ctx.Err() } } } -// pop gets the next pending request. -// Note: it doesn't change pendingCount. The slot acquired in add() should be released later -// (e.g. resolve/markStopped/markDone). 
-func (c *requestCache) pop(ctx context.Context) (regionReq, error) { - select { - case req := <-c.controlQueue: - return req, nil - default: - } - select { - case req := <-c.controlQueue: - return req, nil - case req := <-c.pendingQueue: - return req, nil - case <-ctx.Done(): - return regionReq{}, ctx.Err() +func (q *regionRegisterQueue) tryPopOrNotify() (*regionReq, bool, <-chan struct{}) { + q.mu.Lock() + defer q.mu.Unlock() + req, ok := q.requests.PopTop() + if ok { + q.notifyChangedLocked() + return req, true, nil } + return nil, false, q.changed } -// popControlOrWait returns a deregister request while normal register requests -// are gated, or returns false after the gate is resumed. -func (c *requestCache) popControlOrWait( - ctx context.Context, resume <-chan struct{}, -) (regionReq, bool, error) { - select { - case req := <-c.controlQueue: - return req, true, nil - case <-resume: - return regionReq{}, false, nil - case <-ctx.Done(): - return regionReq{}, false, ctx.Err() +func (q *regionRegisterQueue) len() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.requests.Len() +} + +func (q *regionRegisterQueue) clear() []regionInfo { + q.mu.Lock() + defer q.mu.Unlock() + regions := make([]regionInfo, 0, q.requests.Len()) + for { + req, ok := q.requests.PopTop() + if !ok { + break + } + regions = append(regions, req.regionInfo) } + q.notifyChangedLocked() + return regions } -// markSent marks a request as sent and adds it to sent requests. -// It doesn't change pendingCount: the slot is released when the request is finished/removed. -func (c *requestCache) markSent(req regionReq) { - c.sentRequests.Lock() - defer c.sentRequests.Unlock() +func (q *regionRegisterQueue) notifyChangedLocked() { + close(q.changed) + q.changed = make(chan struct{}) +} - m, ok := c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] +// regionScanLimiter controls the exact number of active incremental scans for +// one store. 
+type regionScanLimiter struct { + mu sync.Mutex + limit int + active int + available chan struct{} +} - if !ok { - m = make(map[uint64]regionReq) - c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] = m +func newRegionScanLimiter(limit int) *regionScanLimiter { + if limit <= 0 { + limit = 1 + } + return &regionScanLimiter{ + limit: limit, + available: make(chan struct{}), } +} - if oldReq, exists := m[req.regionInfo.verID.GetID()]; exists { - log.Warn("region request overwritten", - zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)), - zap.Uint64("regionID", req.regionInfo.verID.GetID()), - zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()), - zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()), - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("pendingQueueLen", len(c.pendingQueue))) - c.markDone() +func (l *regionScanLimiter) tryAcquireOrNotify() (*regionScanSlot, <-chan struct{}) { + l.mu.Lock() + defer l.mu.Unlock() + if l.active >= l.limit { + return nil, l.available } - m[req.regionInfo.verID.GetID()] = req + l.active++ + return &regionScanSlot{limiter: l}, nil } -// markStopped removes a sent request and releases a slot. -func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) { - c.sentRequests.Lock() - defer c.sentRequests.Unlock() +func (l *regionScanLimiter) usage() (active, limit int) { + l.mu.Lock() + defer l.mu.Unlock() + return l.active, l.limit +} - regionReqs, ok := c.sentRequests.regionReqs[subID] - if !ok { +func (l *regionScanLimiter) release() { + l.mu.Lock() + defer l.mu.Unlock() + if l.active <= 0 { + log.Error("region scan limiter underflow", zap.Int("limit", l.limit)) return } + l.active-- + close(l.available) + l.available = make(chan struct{}) +} + +// regionScanSlot is an idempotent ownership token for one active scan.
+type regionScanSlot struct { + limiter *regionScanLimiter + released atomic.Bool +} - _, exists := regionReqs[regionID] - if !exists { +func (s *regionScanSlot) release() { + if s == nil || !s.released.CompareAndSwap(false, true) { return } + s.limiter.release() +} - delete(regionReqs, regionID) - if len(regionReqs) == 0 { - delete(c.sentRequests.regionReqs, subID) - } - c.markDone() +type trackedRegionRequest struct { + req *regionReq + slot *regionScanSlot } -// resolve marks a region as initialized and removes it from sent requests -func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) bool { - c.sentRequests.Lock() - defer c.sentRequests.Unlock() - regionReqs, ok := c.sentRequests.regionReqs[subscriptionID] - if !ok { - return false +// regionRequestTracker owns the sent Register requests and scan slots for one +// worker's gRPC stream. +type regionRequestTracker struct { + mu sync.Mutex + requests map[SubscriptionID]map[uint64]trackedRegionRequest +} + +func newRegionRequestTracker() *regionRequestTracker { + return &regionRequestTracker{ + requests: make(map[SubscriptionID]map[uint64]trackedRegionRequest), } +} - req, exists := regionReqs[regionID] - if !exists { +func (t *regionRequestTracker) track(req *regionReq, slot *regionScanSlot) bool { + t.mu.Lock() + defer t.mu.Unlock() + subID := req.regionInfo.subscribedSpan.subID + regionID := req.regionInfo.verID.GetID() + regions := t.requests[subID] + if regions == nil { + regions = make(map[uint64]trackedRegionRequest) + t.requests[subID] = regions + } + if _, exists := regions[regionID]; exists { return false } + regions[regionID] = trackedRegionRequest{req: req, slot: slot} + return true +} - // Check if the subscription ID matches - if req.regionInfo.subscribedSpan.subID == subscriptionID { - delete(regionReqs, regionID) - c.markDone() - cost := time.Since(req.createTime).Seconds() - if cost > 0 && cost < abnormalRequestDurationInSec { - log.Debug("cdc resolve region request",
zap.Uint64("subID", uint64(subscriptionID)), zap.Uint64("regionID", regionID), zap.Float64("cost", cost), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue))) - metrics.RegionRequestFinishScanDuration.Observe(cost) - } else { - log.Info("region request duration abnormal, skip metric", zap.Float64("cost", cost), zap.Uint64("regionID", regionID)) - } - return true +func (t *regionRequestTracker) stop(subID SubscriptionID, regionID uint64) bool { + tracked, ok := t.remove(subID, regionID) + if ok { + tracked.slot.release() } - - return false + return ok } -// clearStaleRequest clears stale requests from the cache -// Note: Sometimes, the CDC sends the same region request to TiKV multiple times. In such cases, this method is needed to reduce the pendingSize. -func (c *requestCache) clearStaleRequest() { - if time.Since(c.lastCheckStaleRequestTime.Load()) < checkStaleRequestInterval { - return - } - c.sentRequests.Lock() - defer c.sentRequests.Unlock() - reqCount := 0 - for subID, regionReqs := range c.sentRequests.regionReqs { - for regionID, regionReq := range regionReqs { - if regionReq.regionInfo.isStopped() || - regionReq.regionInfo.subscribedSpan.stopped.Load() || - regionReq.regionInfo.lockedRangeState.Initialized.Load() || - regionReq.isStale() { - c.markDone() - log.Warn("region worker delete stale region request", - zap.Uint64("subID", uint64(subID)), - zap.Uint64("regionID", regionID), - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("pendingQueueLen", len(c.pendingQueue)), - zap.Bool("isRegionStopped", regionReq.regionInfo.isStopped()), - zap.Bool("isSubscribedSpanStopped", regionReq.regionInfo.subscribedSpan.stopped.Load()), - zap.Bool("isStale", regionReq.isStale()), - zap.Time("createTime", regionReq.createTime)) - delete(regionReqs, regionID) - } else { - reqCount++ - } - } - if len(regionReqs) == 0 { - delete(c.sentRequests.regionReqs, subID) - } +func (t *regionRequestTracker) 
resolve(subID SubscriptionID, regionID uint64) bool { + tracked, ok := t.remove(subID, regionID) + if !ok { + return false } - - // If there are no in-cache region requests but pendingCount isn't 0, it means pendingCount is stale. - // Reset it to avoid blocking add() forever. - if reqCount == 0 && len(c.pendingQueue) == 0 && len(c.controlQueue) == 0 && c.pendingCount.Load() != 0 { - log.Info("region worker pending request count is not equal to actual region request count, correct it", - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("actualReqCount", reqCount), - zap.Int("pendingQueueLen", len(c.pendingQueue))) - c.pendingCount.Store(0) - // Notify waiting add operations that there's space available. - select { - case c.spaceAvailable <- struct{}{}: - default: - } + tracked.slot.release() + cost := time.Since(tracked.req.createTime).Seconds() + if cost > 0 && cost < abnormalRequestDurationInSec { + metrics.RegionRequestFinishScanDuration.Observe(cost) + } else { + log.Info("region request duration abnormal, skip metric", + zap.Float64("cost", cost), + zap.Uint64("regionID", regionID)) } - - c.lastCheckStaleRequestTime.Store(time.Now()) + return true } -// clear removes all requests and returns them -func (c *requestCache) clear() []regionInfo { - var regions []regionInfo - - // Drain pending requests from channel -LOOP: - for { - select { - case req := <-c.pendingQueue: - regions = append(regions, req.regionInfo) - c.markDone() - default: - break LOOP - } +func (t *regionRequestTracker) remove( + subID SubscriptionID, regionID uint64, +) (trackedRegionRequest, bool) { + t.mu.Lock() + defer t.mu.Unlock() + regions := t.requests[subID] + if regions == nil { + return trackedRegionRequest{}, false + } + tracked, ok := regions[regionID] + if !ok { + return trackedRegionRequest{}, false + } + delete(regions, regionID) + if len(regions) == 0 { + delete(t.requests, subID) } + return tracked, true +} -CONTROL_LOOP: - for { - select { - case req := 
<-c.controlQueue: - regions = append(regions, req.regionInfo) - c.markDone() - default: - break CONTROL_LOOP - } +func (t *regionRequestTracker) removeSubscription(subID SubscriptionID) { + t.mu.Lock() + regions := t.requests[subID] + delete(t.requests, subID) + t.mu.Unlock() + for _, tracked := range regions { + tracked.slot.release() } +} - c.sentRequests.Lock() - defer c.sentRequests.Unlock() +func (t *regionRequestTracker) clear() []regionInfo { + t.mu.Lock() + requests := t.requests + t.requests = make(map[SubscriptionID]map[uint64]trackedRegionRequest) + t.mu.Unlock() - for subID, regionReqs := range c.sentRequests.regionReqs { - for regionID := range regionReqs { - regions = append(regions, regionReqs[regionID].regionInfo) - delete(regionReqs, regionID) - c.markDone() + var regions []regionInfo + for _, trackedRegions := range requests { + for _, tracked := range trackedRegions { + regions = append(regions, tracked.req.regionInfo) + tracked.slot.release() } - delete(c.sentRequests.regionReqs, subID) } return regions } -// getPendingCount returns the current pending count -func (c *requestCache) getPendingCount() int { - return int(c.pendingCount.Load()) -} - -func (c *requestCache) markDone() { - // Decrement pendingCount by 1, but never let it go below 0. - // Do it with CAS to avoid clobbering concurrent Inc() calls. - for { - old := c.pendingCount.Load() - if old == 0 { - break - } else if old < 0 { - if c.pendingCount.CompareAndSwap(old, 0) { - break - } - } else { - if c.pendingCount.CompareAndSwap(old, old-1) { - break - } - } - } - // Notify waiting add operations that there's space available. 
- select { - case c.spaceAvailable <- struct{}{}: - default: // If channel is full, skip notification +func (t *regionRequestTracker) len() int { + t.mu.Lock() + defer t.mu.Unlock() + count := 0 + for _, regions := range t.requests { + count += len(regions) } + return count } diff --git a/logservice/logpuller/region_req_cache_test.go b/logservice/logpuller/region_req_cache_test.go index 62706a8542..badcdd7e02 100644 --- a/logservice/logpuller/region_req_cache_test.go +++ b/logservice/logpuller/region_req_cache_test.go @@ -15,8 +15,8 @@ package logpuller import ( "context" + "sync" "testing" - "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/stretchr/testify/require" @@ -25,312 +25,187 @@ import ( func createTestRegionInfo(subID SubscriptionID, regionID uint64) regionInfo { verID := tikv.NewRegionVerID(regionID, 1, 1) - span := heartbeatpb.TableSpan{ TableID: 1, StartKey: []byte("start"), EndKey: []byte("end"), } - - subscribedSpan := &subscribedSpan{ + return newRegionInfo(verID, span, nil, &subscribedSpan{ subID: subID, startTs: 100, span: span, - } - - return newRegionInfo(verID, span, nil, subscribedSpan, false) + }, false) } -func TestRequestCacheAdd_NormalCase(t *testing.T) { - cache := newRequestCache(10) +func TestRegionRegisterQueuePriorityAndCapacity(t *testing.T) { + queue := newRegionRegisterQueue(2) ctx := context.Background() - region := createTestRegionInfo(1, 1) - - ok, err := cache.add(ctx, region, false) + ok, err := queue.add(ctx, createTestRegionInfo(1, 1), 10) require.NoError(t, err) require.True(t, ok) - require.Equal(t, 1, cache.getPendingCount()) - - // Verify the request was added to the queue - req, err := cache.pop(ctx) + ok, err = queue.add(ctx, createTestRegionInfo(1, 2), 1) require.NoError(t, err) - require.NotNil(t, req) - require.Equal(t, region.verID.GetID(), req.regionInfo.verID.GetID()) - require.Equal(t, region.subscribedSpan.subID, req.regionInfo.subscribedSpan.subID) -} - -func TestRequestCacheAdd_ForceFlag(t 
*testing.T) { - cache := newRequestCache(1) - ctx := context.Background() - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region1, false) require.True(t, ok) + ok, err = queue.add(ctx, createTestRegionInfo(1, 3), 0) require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - // Try to add another request without force - should fail due to retry limit - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx, region2, false) - require.False(t, ok) - require.NoError(t, err) - - // With force=true, it should still fail because the channel is full - // The force flag only bypasses the pendingCount check, not the channel capacity - region3 := createTestRegionInfo(1, 3) - ok, err = cache.add(ctx, region3, true) require.False(t, ok) - require.NoError(t, err) - // consume the pending queue ann add with force - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - require.Equal(t, region1.verID.GetID(), req.regionInfo.verID.GetID()) - require.Equal(t, region1.subscribedSpan.subID, req.regionInfo.subscribedSpan.subID) - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - - ok, err = cache.add(ctx, region3, true) + req, ok, _ := queue.tryPopOrNotify() require.True(t, ok) - require.NoError(t, err) - // It is 2 since region1 is unresolved - require.Equal(t, 2, cache.getPendingCount()) + require.Equal(t, uint64(2), req.regionInfo.verID.GetID()) - // resolve region1 - cache.resolve(region1.subscribedSpan.subID, region1.verID.GetID()) - require.Equal(t, 1, cache.getPendingCount()) -} - -func TestRequestCacheAdd_ContextCancellation(t *testing.T) { - cache := newRequestCache(1) - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ctx1 := context.Background() - ok, err := cache.add(ctx1, region1, false) - require.True(t, ok) + ok, err = queue.add(ctx, createTestRegionInfo(1, 3), 0) require.NoError(t, err) - - // Try to add another request with a cancelled 
context - ctx2, cancel := context.WithCancel(context.Background()) - cancel() // Cancel immediately - - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx2, region2, false) - require.False(t, ok) - require.Error(t, err) - require.Equal(t, context.Canceled, err) -} - -func TestRequestCacheAdd_RetryLimitExceeded(t *testing.T) { - cache := newRequestCache(1) - ctx := context.Background() - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region1, false) require.True(t, ok) - require.NoError(t, err) - - // Try to add another request - should eventually hit retry limit - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx, region2, false) - require.False(t, ok) - require.NoError(t, err) + req, ok, _ = queue.tryPopOrNotify() + require.True(t, ok) + require.Equal(t, uint64(3), req.regionInfo.verID.GetID()) + require.Equal(t, 1, queue.len()) } -func TestRequestCacheAdd_SpaceAvailableNotification(t *testing.T) { - cache := newRequestCache(2) - ctx := context.Background() - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region1, false) - require.True(t, ok) +func TestRegionRegisterQueueContextCancellation(t *testing.T) { + queue := newRegionRegisterQueue(1) + ok, err := queue.add(context.Background(), createTestRegionInfo(1, 1), 0) require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx, region2, false) require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 2, cache.getPendingCount()) - - // Pop a request and mark it as sent, then resolve it to free up space - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - require.Equal(t, 2, cache.getPendingCount()) // pop doesn't change pendingCount - // Mark as sent - cache.markSent(req) - require.Equal(t, 2, cache.getPendingCount()) - - // Resolve the request to free up space - success := 
cache.resolve(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) - require.True(t, success) - require.Equal(t, 1, cache.getPendingCount()) - // Now we should be able to add another request - region3 := createTestRegionInfo(1, 3) - ok, err = cache.add(ctx, region3, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 2, cache.getPendingCount()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + ok, err = queue.add(ctx, createTestRegionInfo(1, 2), 0) + require.ErrorIs(t, err, context.Canceled) + require.False(t, ok) } -func TestRequestCacheAdd_ConcurrentAdds(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - const numGoroutines = 5 - done := make(chan error, numGoroutines) - - // Start multiple goroutines adding requests concurrently - for i := 0; i < numGoroutines; i++ { - go func(id int) { - region := createTestRegionInfo(SubscriptionID(id%3), uint64(id)) - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - done <- err - }(i) +func TestRegionScanLimiterExactLimitAndIdempotentRelease(t *testing.T) { + limiter := newRegionScanLimiter(2) + slot1, _ := limiter.tryAcquireOrNotify() + slot2, _ := limiter.tryAcquireOrNotify() + slot3, notify := limiter.tryAcquireOrNotify() + require.NotNil(t, slot1) + require.NotNil(t, slot2) + require.Nil(t, slot3) + require.NotNil(t, notify) + require.Equal(t, 2, activeScanCount(limiter)) + + slot1.release() + slot1.release() + require.Equal(t, 1, activeScanCount(limiter)) + select { + case <-notify: + default: + t.Fatal("slot release must notify waiters") } - // Wait for all goroutines to complete - for i := 0; i < numGoroutines; i++ { - select { - case err := <-done: - require.NoError(t, err) - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for concurrent adds to complete") - } - } - - require.Equal(t, numGoroutines, cache.getPendingCount()) + slot3, _ = limiter.tryAcquireOrNotify() + 
require.NotNil(t, slot3) + require.Equal(t, 2, activeScanCount(limiter)) + slot2.release() + slot3.release() + require.Equal(t, 0, activeScanCount(limiter)) } -func TestRequestCacheAdd_StaleRequestCleanup(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - // Add a request and mark it as sent - region := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - - // Mark as sent - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - - // Manually set the request as stale by modifying createTime - cache.sentRequests.Lock() - regionReqs := cache.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] - regionReqs[req.regionInfo.verID.GetID()] = regionReq{ - regionInfo: req.regionInfo, - createTime: time.Now().Add(-requestGCLifeTime - time.Second), // Make it stale +func TestRegionScanLimiterNeverExceedsLimit(t *testing.T) { + const ( + limit = 4 + goroutines = 32 + ) + limiter := newRegionScanLimiter(limit) + var wg sync.WaitGroup + start := make(chan struct{}) + maxActive := 0 + var maxMu sync.Mutex + + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-start + for { + slot, notify := limiter.tryAcquireOrNotify() + if slot == nil { + <-notify + continue + } + active := activeScanCount(limiter) + maxMu.Lock() + if active > maxActive { + maxActive = active + } + maxMu.Unlock() + slot.release() + return + } + }() } - cache.sentRequests.Unlock() - - // Manually set lastCheckStaleRequestTime to bypass the time interval check - cache.lastCheckStaleRequestTime.Store(time.Now().Add(-checkStaleRequestInterval - time.Second)) - - // Manually trigger stale cleanup by calling clearStaleRequest - cache.clearStaleRequest() - - // The stale request should be cleaned up - require.Equal(t, 0, cache.getPendingCount()) + close(start) + wg.Wait() + 
require.LessOrEqual(t, maxActive, limit) + require.Equal(t, 0, activeScanCount(limiter)) } -func TestRequestCacheAdd_WithStoppedRegion(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - // Create a region info with stopped state (lockedRangeState = nil) - region := createTestRegionInfo(1, 1) - region.lockedRangeState = nil // This makes it stopped - - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - - // Mark as sent - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - - // Manually set lastCheckStaleRequestTime to bypass the time interval check - cache.lastCheckStaleRequestTime.Store(time.Now().Add(-checkStaleRequestInterval - time.Second)) - - // Manually trigger cleanup of stopped region - cache.clearStaleRequest() +func TestRegionRequestTrackerTerminalReleasePaths(t *testing.T) { + testCases := []struct { + name string + release func(*testing.T, *regionRequestTracker, SubscriptionID, uint64) + }{ + { + name: "initialized", + release: func(t *testing.T, tracker *regionRequestTracker, subID SubscriptionID, regionID uint64) { + require.True(t, tracker.resolve(subID, regionID)) + }, + }, + { + name: "region error or send failure", + release: func(t *testing.T, tracker *regionRequestTracker, subID SubscriptionID, regionID uint64) { + require.True(t, tracker.stop(subID, regionID)) + }, + }, + { + name: "deregister", + release: func(_ *testing.T, tracker *regionRequestTracker, subID SubscriptionID, _ uint64) { + tracker.removeSubscription(subID) + }, + }, + { + name: "reconnect or shutdown", + release: func(t *testing.T, tracker *regionRequestTracker, _ SubscriptionID, _ uint64) { + require.Len(t, tracker.clear(), 1) + }, + }, + } - // The stopped region should be cleaned up - require.Equal(t, 0, cache.getPendingCount()) + for _, tc := range testCases { 
+ t.Run(tc.name, func(t *testing.T) { + limiter := newRegionScanLimiter(1) + slot, _ := limiter.tryAcquireOrNotify() + tracker := newRegionRequestTracker() + req := newRegionReq(createTestRegionInfo(1, 1), 0) + require.True(t, tracker.track(req, slot)) + require.Equal(t, 1, activeScanCount(limiter)) + + tc.release(t, tracker, 1, 1) + require.Equal(t, 0, activeScanCount(limiter)) + require.Equal(t, 0, tracker.len()) + require.False(t, tracker.stop(1, 1)) + }) + } } -func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - region := createTestRegionInfo(1, 1) - - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - - // Add a duplicate request for the same region. It should not leak pendingCount even if - // markSent overwrites the existing entry. - ok, err = cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 2, cache.getPendingCount()) - - req1, err := cache.pop(ctx) - require.NoError(t, err) - cache.markSent(req1) - require.Equal(t, 2, cache.getPendingCount()) - - req2, err := cache.pop(ctx) - require.NoError(t, err) - cache.markSent(req2) - require.Equal(t, 1, cache.getPendingCount()) - - // Finish the remaining tracked request. 
- require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID())) - require.Equal(t, 0, cache.getPendingCount()) +func TestRegionRequestTrackerRejectsDuplicate(t *testing.T) { + limiter := newRegionScanLimiter(2) + tracker := newRegionRequestTracker() + req := newRegionReq(createTestRegionInfo(1, 1), 0) + slot1, _ := limiter.tryAcquireOrNotify() + slot2, _ := limiter.tryAcquireOrNotify() + require.True(t, tracker.track(req, slot1)) + require.False(t, tracker.track(newRegionReq(req.regionInfo, 0), slot2)) + slot2.release() + tracker.clear() + require.Equal(t, 0, activeScanCount(limiter)) } -func TestRequestCacheMarkStopped_ReleasesSlot(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - region := createTestRegionInfo(1, 1) - - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - req, err := cache.pop(ctx) - require.NoError(t, err) - - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - require.Contains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID) - - cache.markStopped(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) - require.Equal(t, 0, cache.getPendingCount()) - require.NotContains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID) +func activeScanCount(limiter *regionScanLimiter) int { + active, _ := limiter.usage() + return active } diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index ec399df7bf..f8b616dbaa 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -48,10 +48,12 @@ type regionRequestWorker struct { // we must always get a region to request before create a grpc stream. // only in this way we can avoid to try to connect to an offline store infinitely. 
- preFetchForConnecting *regionInfo + preFetchForConnecting *regionReq - // request cache with flow control - requestCache *requestCache + // controlQueue contains Deregister requests for this worker's gRPC stream. + controlQueue chan *regionReq + // requestTracker owns Register requests sent on this worker's gRPC stream. + requestTracker *regionRequestTracker // all regions maintained by this worker. requestedRegions struct { @@ -67,13 +69,14 @@ func newRegionRequestWorker( credential *security.Credential, g *errgroup.Group, store *requestedStore, - requestCacheSize int, + controlQueueSize int, ) *regionRequestWorker { worker := &regionRequestWorker{ - workerID: workerIDGen.Add(1), - client: client, - store: store, - requestCache: newRequestCache(requestCacheSize), + workerID: workerIDGen.Add(1), + client: client, + store: store, + controlQueue: make(chan *regionReq, controlQueueSize), + requestTracker: newRegionRequestTracker(), } worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) @@ -84,16 +87,14 @@ func newRegionRequestWorker( zap.String("addr", store.storeAddr)) } for { - req, err := worker.requestCache.pop(ctx) + req, err := worker.popRequest(ctx) if err != nil { return err } if req.regionInfo.isStopped() { - worker.requestCache.markDone() continue } - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + worker.preFetchForConnecting = req return nil } } @@ -372,7 +373,6 @@ func (s *regionRequestWorker) processRegionSendTask( }, FilterLoop: region.filterLoop, } - s.requestCache.markDone() if err := doSend(req); err != nil { return err } @@ -383,32 +383,17 @@ func (s *regionRequestWorker) processRegionSendTask( } s.client.pushRegionEventToDS(subID, regionEvent) } + s.requestTracker.removeSubscription(subID) return nil } // Handle pre-fetched region first - region := *s.preFetchForConnecting + regionReq := s.preFetchForConnecting s.preFetchForConnecting = nil - regionReq :=
newRegionReq(region) var err error for { region := regionReq.regionInfo subID := region.subscribedSpan.subID - for !region.isStopped() && !region.subscribedSpan.stopped.Load() && s.client.memoryQuota != nil { - resume, paused := s.client.memoryQuota.regionScanResumeNotify() - if !paused { - break - } - controlReq, ok, err := s.requestCache.popControlOrWait(ctx, resume) - if err != nil { - return err - } - if ok { - if err := handleDeregister(controlReq.regionInfo); err != nil { - return err - } - } - } log.Debug("region request worker gets a singleRegionInfo", zap.Uint64("workerID", s.workerID), zap.Uint64("subscriptionID", uint64(subID)), @@ -426,33 +411,62 @@ func (s *regionRequestWorker) processRegionSendTask( // the stopped subscribedTable, or the special singleRegionInfo for stopping // the table will be handled later. s.client.onRegionFail(newRegionErrorInfo(region, &storeStreamErr{})) - s.requestCache.markDone() } else { + var slot *regionScanSlot + for slot == nil { + var resume <-chan struct{} + var scanSlotAvailable <-chan struct{} + paused := false + if s.client.memoryQuota != nil { + resume, paused = s.client.memoryQuota.regionScanResumeNotify() + } + if !paused { + slot, scanSlotAvailable = s.store.scanLimiter.tryAcquireOrNotify() + if slot != nil { + break + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case controlReq := <-s.controlQueue: + if err := handleDeregister(controlReq.regionInfo); err != nil { + return err + } + case <-resume: + case <-scanSlotAvailable: + } + + if region.subscribedSpan.stopped.Load() { + s.client.onRegionFail(newRegionErrorInfo(region, &storeStreamErr{})) + break + } + } + if slot == nil { + regionReq, err = s.popRequest(ctx) + if err != nil { + return err + } + continue + } + if !s.requestTracker.track(regionReq, slot) { + slot.release() + regionReq, err = s.popRequest(ctx) + if err != nil { + return err + } + continue + } state := newRegionFeedState(region, uint64(subID), s) state.start() 
s.addRegionState(subID, region.verID.GetID(), state) - // Mark the request as sent before sending it. - // Otherwise there is a race with the receiver goroutine: - // 1. addRegionState makes the region visible to error handling. - // 2. doSend sends the request. - // 3. the receiver goroutine may receive a region error immediately. - // 4. markStopped runs before markSent, so requestCache.markStopped cannot - // find the request in sentRequests. - // 5. the sender goroutine then calls markSent and leaves a stale sent - // request behind, even though the region has already been - // unlocked/rescheduled. - // - // Tracking the request before Send keeps requestedRegions and - // sentRequests visible in the same order and avoids leaving stale - // requests in cleanup. - s.requestCache.markSent(regionReq) if err := doSend(s.createRegionRequest(region)); err != nil { state.markStopped(err) return err } } - // Try to get from cache - regionReq, err = s.requestCache.pop(ctx) + regionReq, err = s.popRequest(ctx) if err != nil { return err } @@ -524,10 +538,40 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS return subscriptions } -// add adds a region request to the worker's cache -// It blocks if the cache is full until there's space or ctx is cancelled -func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) { - return s.requestCache.add(ctx, region, force) +func (s *regionRequestWorker) addControl( + ctx context.Context, region regionInfo, +) (bool, error) { + timer := time.NewTimer(addReqRetryInterval * addReqRetryLimit) + defer timer.Stop() + select { + case <-ctx.Done(): + return false, ctx.Err() + case s.controlQueue <- newRegionReq(region, 0): + return true, nil + case <-timer.C: + return false, nil + } +} + +func (s *regionRequestWorker) popRequest(ctx context.Context) (*regionReq, error) { + for { + select { + case req := <-s.controlQueue: + return req, nil + default: + } + req, ok, 
registerChanged := s.store.registerQueue.tryPopOrNotify() + if ok { + return req, nil + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case req := <-s.controlQueue: + return req, nil + case <-registerChanged: + } + } } func (s *regionRequestWorker) clearPendingRegions() []regionInfo { @@ -535,16 +579,11 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo { // Clear pre-fetched region if s.preFetchForConnecting != nil { - region := *s.preFetchForConnecting + region := s.preFetchForConnecting.regionInfo s.preFetchForConnecting = nil regions = append(regions, region) - // The pre-fetched region was popped from pendingQueue but hasn't been marked as sent or done yet. - // Release its pendingCount slot to avoid leaking flow control credits on worker failures. - s.requestCache.markDone() } - // Clear all regions from cache - cacheRegions := s.requestCache.clear() - regions = append(regions, cacheRegions...) + regions = append(regions, s.requestTracker.clear()...) return regions } diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index 2f729e29e5..ba89ec6068 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -68,6 +68,25 @@ func prepareRegionForSendTest(region regionInfo) regionInfo { return region } +func newTestRegionRequestWorker( + queueSize int, client *subscriptionClient, +) *regionRequestWorker { + store := &requestedStore{ + storeAddr: "store-1", + registerQueue: newRegionRegisterQueue(queueSize), + scanLimiter: newRegionScanLimiter(queueSize), + } + worker := &regionRequestWorker{ + client: client, + store: store, + controlQueue: make(chan *regionReq, queueSize), + requestTracker: newRegionRequestTracker(), + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + store.requestWorkers.s = []*regionRequestWorker{worker} + return worker +} + func
TestRegionStatesOperation(t *testing.T) { worker := &regionRequestWorker{} worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) @@ -88,29 +107,23 @@ func TestRegionStatesOperation(t *testing.T) { require.Equal(t, 0, len(worker.requestedRegions.subscriptions)) } -func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) { - worker := &regionRequestWorker{ - requestCache: newRequestCache(10), - } +func TestClearPendingRegionsReturnsPreFetchedRegion(t *testing.T) { + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) ctx := context.Background() region := createTestRegionInfo(1, 1) - ok, err := worker.requestCache.add(ctx, region, false) + ok, err := worker.store.registerQueue.add(ctx, region, 0) require.NoError(t, err) require.True(t, ok) - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - require.Equal(t, 1, worker.requestCache.getPendingCount()) - - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + req, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = req regions := worker.clearPendingRegions() require.Len(t, regions, 1) require.Nil(t, worker.preFetchForConnecting) - require.Equal(t, 0, worker.requestCache.getPendingCount()) } type pushedResolvedEvent struct { @@ -281,58 +294,36 @@ func BenchmarkDispatchResolvedTsEventSmallBatchCurrent(b *testing.B) { } func TestClearPendingRegionsDoesNotReturnStoppedSentRegion(t *testing.T) { - worker := &regionRequestWorker{ - requestCache: newRequestCache(10), - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) - ctx := context.Background() region := createTestRegionInfo(1, 1) - - ok, err := worker.requestCache.add(ctx, region, false) - require.NoError(t, err) - require.True(t, ok) - - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) + req
:= newRegionReq(region, 0) + slot, _ := worker.store.scanLimiter.tryAcquireOrNotify() + require.True(t, worker.requestTracker.track(req, slot)) state := newRegionFeedState(req.regionInfo, uint64(req.regionInfo.subscribedSpan.subID), worker) state.start() worker.addRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID(), state) - // Simulate the race we are fixing in processRegionSendTask: - // once a request is visible in sentRequests, a fast region error may mark the - // region stopped before worker cleanup runs. In that case, markStopped should - // remove the sent request immediately, so clearPendingRegions must not return - // the stale region again during worker shutdown. - worker.requestCache.markSent(req) state.markStopped(errors.New("send request to store error")) worker.takeRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) - require.Equal(t, 0, worker.requestCache.getPendingCount()) + require.Equal(t, 0, activeScanCount(worker.store.scanLimiter)) require.Empty(t, worker.clearPendingRegions()) } func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) { - worker := &regionRequestWorker{ - requestCache: newRequestCache(10), - store: &requestedStore{storeAddr: "store-1"}, - client: &subscriptionClient{}, - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) ctx := context.Background() region := prepareRegionForSendTest(createTestRegionInfo(1, 1)) - ok, err := worker.requestCache.add(ctx, region, false) + ok, err := worker.store.registerQueue.add(ctx, region, 0) require.NoError(t, err) require.True(t, ok) - require.Equal(t, 1, worker.requestCache.getPendingCount()) - - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + req, ok, _ := worker.store.registerQueue.tryPopOrNotify() +
require.True(t, ok) + worker.preFetchForConnecting = req sendErr := errors.New("send failed") conn := &ConnAndClient{ @@ -342,8 +333,8 @@ func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) { err = worker.processRegionSendTask(ctx, conn) require.ErrorIs(t, err, sendErr) - require.Equal(t, 0, worker.requestCache.getPendingCount()) - require.Empty(t, worker.requestCache.sentRequests.regionReqs) + require.Equal(t, 0, activeScanCount(worker.store.scanLimiter)) + require.Equal(t, 0, worker.requestTracker.len()) state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) require.True(t, state == nil || state.isStale(), "region state should be removed or marked stale after send failure") } @@ -355,23 +346,18 @@ func TestProcessRegionSendTaskAllowsDeregisterWhileScanPaused(t *testing.T) { quota := newPullerMemoryQuota(100) reservation, err := quota.acquire(ctx, 99, 50, nil) require.NoError(t, err) - worker := &regionRequestWorker{ - requestCache: newRequestCache(10), - store: &requestedStore{storeAddr: "store-1"}, - client: &subscriptionClient{memoryQuota: quota}, - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{memoryQuota: quota}) registerRegion := prepareRegionForSendTest(createTestRegionInfo(1, 1)) - ok, err := worker.requestCache.add(ctx, registerRegion, false) + ok, err := worker.store.registerQueue.add(ctx, registerRegion, 0) require.NoError(t, err) require.True(t, ok) - registerReq, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - worker.preFetchForConnecting = &registerReq.regionInfo + registerReq, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = registerReq deregisterRegion := regionInfo{subscribedSpan: &subscribedSpan{subID: 2}} - ok, err = worker.requestCache.add(ctx, deregisterRegion, true) + ok, err = worker.addControl(ctx,
deregisterRegion) require.NoError(t, err) require.True(t, ok) @@ -405,6 +391,85 @@ func TestProcessRegionSendTaskAllowsDeregisterWhileScanPaused(t *testing.T) { require.ErrorIs(t, <-errCh, context.Canceled) } +func TestProcessRegionSendTaskSharesStoreScanLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := &requestedStore{ + storeAddr: "store-1", + registerQueue: newRegionRegisterQueue(10), + scanLimiter: newRegionScanLimiter(1), + } + newWorker := func(regionID uint64) (*regionRequestWorker, *recordingEventFeedV2Client, <-chan error) { + worker := &regionRequestWorker{ + client: &subscriptionClient{}, + store: store, + controlQueue: make(chan *regionReq, 10), + requestTracker: newRegionRequestTracker(), + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + region := prepareRegionForSendTest(createTestRegionInfo(1, regionID)) + worker.preFetchForConnecting = newRegionReq(region, 0) + client := &recordingEventFeedV2Client{ + mockEventFeedV2Client: &mockEventFeedV2Client{}, + sent: make(chan *cdcpb.ChangeDataRequest, 1), + } + errCh := make(chan error, 1) + go func() { + errCh <- worker.processRegionSendTask(ctx, &ConnAndClient{ + Client: client, + Conn: &grpc.ClientConn{}, + }) + }() + return worker, client, errCh + } + + worker1, client1, errCh1 := newWorker(1) + worker2, client2, errCh2 := newWorker(2) + + var firstWorker, secondWorker *regionRequestWorker + var secondClient *recordingEventFeedV2Client + select { + case <-client1.sent: + firstWorker, secondWorker, secondClient = worker1, worker2, client2 + case <-client2.sent: + firstWorker, secondWorker, secondClient = worker2, worker1, client1 + case <-time.After(time.Second): + t.Fatal("expected one Register request") + } + + select { + case request := <-secondClient.sent: + t.Fatalf("scan limit exceeded before first request completed: %v", request) + case <-time.After(20 * time.Millisecond): + } + require.Equal(t, 1,
activeScanCount(store.scanLimiter)) + + var firstState *regionFeedState + for _, regionID := range []uint64{1, 2} { + if state := firstWorker.getRegionState(1, regionID); state != nil { + firstState = state + break + } + } + require.NotNil(t, firstState) + firstState.setInitialized() + + select { + case <-secondClient.sent: + case <-time.After(time.Second): + t.Fatal("second Register request did not proceed after slot release") + } + require.Equal(t, 1, activeScanCount(store.scanLimiter)) + + cancel() + require.ErrorIs(t, <-errCh1, context.Canceled) + require.ErrorIs(t, <-errCh2, context.Canceled) + firstWorker.clearPendingRegions() + secondWorker.clearPendingRegions() + require.Equal(t, 0, activeScanCount(store.scanLimiter)) +} + func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) { testCases := []struct { name string @@ -422,24 +487,18 @@ func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - worker := &regionRequestWorker{ - requestCache: newRequestCache(10), - store: &requestedStore{storeAddr: "store-1"}, - client: &subscriptionClient{}, - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) ctx := context.Background() region := prepareRegionForSendTest(createTestRegionInfo(1, 1)) - ok, err := worker.requestCache.add(ctx, region, false) + ok, err := worker.store.registerQueue.add(ctx, region, 0) require.NoError(t, err) require.True(t, ok) - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + req, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = req conn := &ConnAndClient{ Client: &mockEventFeedV2Client{sendErr: tc.sendErr}, @@ -449,8 +508,8 @@ func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) {
err = worker.processRegionSendTask(ctx, conn) var streamErr *storeStreamErr require.ErrorAs(t, err, &streamErr) - require.Equal(t, 0, worker.requestCache.getPendingCount()) - require.Empty(t, worker.requestCache.sentRequests.regionReqs) + require.Equal(t, 0, activeScanCount(worker.store.scanLimiter)) + require.Equal(t, 0, worker.requestTracker.len()) state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) require.NotNil(t, state) diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index e9c21a7aad..5197dcb277 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -126,7 +126,7 @@ func (s *regionFeedState) markStopped(err error) { s.state.v = stateStopped s.state.err = err } - s.worker.requestCache.markStopped(s.region.subscribedSpan.subID, s.region.verID.GetID()) + s.worker.requestTracker.stop(s.region.subscribedSpan.subID, s.region.verID.GetID()) } // mark regionFeedState as removed if possible. 
@@ -138,7 +138,7 @@ func (s *regionFeedState) markRemoved() (changed bool) { changed = true s.matcher.clear() } - s.worker.requestCache.markStopped(s.region.subscribedSpan.subID, s.region.verID.GetID()) + s.worker.requestTracker.stop(s.region.subscribedSpan.subID, s.region.verID.GetID()) return } @@ -162,7 +162,7 @@ func (s *regionFeedState) isInitialized() bool { func (s *regionFeedState) setInitialized() { s.region.lockedRangeState.Initialized.Store(true) - s.worker.requestCache.resolve(s.region.subscribedSpan.subID, s.region.verID.GetID()) + s.worker.requestTracker.resolve(s.region.subscribedSpan.subID, s.region.verID.GetID()) } func (s *regionFeedState) getRegionID() uint64 { diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index e5756a5a94..dacfac861b 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -304,10 +304,12 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { pendingRegionReqCount := 0 s.stores.Range(func(_, value any) bool { store := value.(*requestedStore) + pendingRegionReqCount += store.registerQueue.len() + activeScanCount, _ := store.scanLimiter.usage() + pendingRegionReqCount += activeScanCount store.requestWorkers.RLock() for _, worker := range store.requestWorkers.s { - worker.requestCache.clearStaleRequest() - pendingRegionReqCount += worker.requestCache.getPendingCount() + pendingRegionReqCount += len(worker.controlQueue) } store.requestWorkers.RUnlock() return true @@ -484,9 +486,9 @@ func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) { // requestedStore represents a store that has been connected. type requestedStore struct { - storeAddr string - // Use to select a worker to send request. 
- nextWorker atomic.Uint32 + storeAddr string + registerQueue *regionRegisterQueue + scanLimiter *regionScanLimiter requestWorkers struct { sync.RWMutex @@ -494,14 +496,6 @@ type requestedStore struct { } } -func (rs *requestedStore) getRequestWorker() *regionRequestWorker { - rs.requestWorkers.RLock() - defer rs.requestWorkers.RUnlock() - - index := rs.nextWorker.Add(1) % uint32(len(rs.requestWorkers.s)) - return rs.requestWorkers.s[index] -} - // handleRegions receives regionInfo from regionTaskQueue and attach rpcCtx to them, // then send them to corresponding requestedStore. func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Group) error { @@ -514,7 +508,11 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro return rs } - rs = &requestedStore{storeAddr: storeAddr} + rs = &requestedStore{ + storeAddr: storeAddr, + registerQueue: newRegionRegisterQueue(pendingRegionRequestQueueSize), + scanLimiter: newRegionScanLimiter(pendingRegionRequestQueueSize), + } rs.requestWorkers.s = make([]*regionRequestWorker, 0, s.config.RegionRequestWorkerPerStore) s.stores.Store(storeAddr, rs) @@ -538,10 +536,19 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro defer func() { s.stores.Range(func(_, value any) bool { rs := value.(*requestedStore) + rs.registerQueue.clear() rs.requestWorkers.RLock() for _, w := range rs.requestWorkers.s { - w.requestCache.clear() + DRAIN_CONTROL: + for { + select { + case <-w.controlQueue: + default: + break DRAIN_CONTROL + } + } + w.requestTracker.clear() } rs.requestWorkers.RUnlock() @@ -582,10 +589,8 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro } store := getStore(region.rpcCtx.Addr) - worker := store.getRequestWorker() - force := regionTask.Priority() <= forcedPriorityBase - ok, err = worker.add(ctx, region, force) + ok, err = store.registerQueue.add(ctx, region, regionTask.Priority()) if err != nil { 
log.Warn("subscription client add region request failed", zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), @@ -600,7 +605,6 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro } log.Debug("subscription client will request a region", - zap.Uint64("workID", worker.workerID), zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), zap.Uint64("regionID", region.verID.GetID()), zap.String("addr", store.storeAddr)) @@ -616,7 +620,7 @@ func (s *subscriptionClient) enqueueRegionToAllStores(ctx context.Context, regio workers := rs.requestWorkers.s rs.requestWorkers.RUnlock() for _, worker := range workers { - ok, err := worker.add(ctx, region, true) + ok, err := worker.addControl(ctx, region) if err != nil { firstErr = err enqueued = false diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 6948257a55..f69907e7c7 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -92,7 +92,7 @@ func TestGenerateResolveLockTask(t *testing.T) { } worker := &regionRequestWorker{ - requestCache: &requestCache{}, + requestTracker: newRegionRequestTracker(), } // Lock another range, no task will be triggered before initialized.
res = span.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100) @@ -420,18 +420,15 @@ func TestEnqueueStopRegionWhenRegisterCacheFull(t *testing.T) { ctx := context.Background() client := &subscriptionClient{} - worker := &regionRequestWorker{ - requestCache: newRequestCache(1), - } - store := &requestedStore{storeAddr: "store-1"} - store.requestWorkers.s = []*regionRequestWorker{worker} + worker := newTestRegionRequestWorker(1, client) + store := worker.store client.stores.Store(store.storeAddr, store) dummyRegion := regionInfo{ subscribedSpan: &subscribedSpan{subID: SubscriptionID(2)}, lockedRangeState: &regionlock.LockedRangeState{}, } - ok, err := worker.add(ctx, dummyRegion, true) + ok, err := store.registerQueue.add(ctx, dummyRegion, 0) require.NoError(t, err) require.True(t, ok) @@ -441,8 +438,8 @@ func TestEnqueueStopRegionWhenRegisterCacheFull(t *testing.T) { enqueued, err := client.enqueueRegionToAllStores(ctx, stopRegion) require.NoError(t, err) require.True(t, enqueued) - require.Equal(t, 1, store.registerQueue.len()) - require.Equal(t, 1, len(worker.requestCache.pendingQueue)) - require.Equal(t, 1, len(worker.requestCache.controlQueue)) + require.Equal(t, 1, store.registerQueue.len()) + require.Equal(t, 1, len(worker.controlQueue)) } func TestSubscriptionWithFailedTiKV(t *testing.T) { diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 7304b0254e..9271aa91ea 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -69,10 +69,9 @@ type PullerConfig struct { // LogRegionDetails determines whether logs Region details or not in puller and kv-client. LogRegionDetails bool `toml:"log-region-details" json:"log_region_details"` - // PendingRegionRequestQueueSize is the total size of the pending region request queue shared across - // all puller workers connecting to a single TiKV store. This size is divided equally among all workers.
- // For example, if PendingRegionRequestQueueSize is 32 and there are 8 workers connecting to the same store, - // each worker's queue size will be 32 / 8 = 4. + // PendingRegionRequestQueueSize is the capacity of the pending Register queue + // shared by all puller workers connecting to one TiKV store. It also limits + // the number of active incremental scans for that store. PendingRegionRequestQueueSize int `toml:"pending-region-request-queue-size" json:"pending_region_request_queue_size"` }