diff --git a/docs/design/2026-06-18-puller-memory-quota.md b/docs/design/2026-06-18-puller-memory-quota.md new file mode 100644 index 0000000000..1925bf5427 --- /dev/null +++ b/docs/design/2026-06-18-puller-memory-quota.md @@ -0,0 +1,490 @@ +# Puller Memory Quota and Region Scan Throttling + +## Status + +Proposed. + +## Background + +The log puller receives Region events from TiKV and pushes them into a dynamic +stream for processing. The dynamic stream currently uses a fixed 1 GiB memory +quota and applies area-level flow control with the following thresholds: + +- Pause input when pending memory reaches 80% of the quota. +- Resume input when pending memory falls below 50% of the quota. + +Once input is paused, the puller stops consuming from the TiKV gRPC stream. This +protects TiCDC from unbounded pending data, but transfers the pressure to TiKV. +In an extreme case, TiKV may continue running newly requested incremental Region +scans while it is unable to send their output to TiCDC, increasing TiKV memory +usage. + +The fixed 80% pause threshold also leaves a significant portion of the puller +quota unused. The puller should be able to use all available quota before +applying hard backpressure. + +## Goals + +- Give the log puller a dedicated, configurable memory quota. +- Allow the puller to use available quota up to the hard limit. +- Stop sending new Region incremental scan requests before the quota is full. +- Retain hard backpressure as the final memory protection mechanism. +- Always allow subscription cleanup requests to make progress. +- Keep the change local to the log puller where possible. + +## Non-goals + +- Limiting the total RSS of the TiCDC process. +- Accounting for all puller-related allocations, such as gRPC internals, Region + state, transaction matcher state, or Go runtime overhead. +- Cancelling Region scans that have already been sent to TiKV. +- Dynamically tuning the scan throttling thresholds. 
+- Providing separate quotas for individual changefeeds or subscriptions. + +The quota in this design is a logical budget for Region events admitted into the +puller processing pipeline. It is not a strict process memory limit. + +## Configuration + +Add one server-level debug configuration: + +```toml +[debug.puller] +memory-quota = 1073741824 # 1 GiB +``` + +`memory-quota` is expressed in bytes and must be greater than zero. Its default +value is 1 GiB, preserving the current effective quota. + +The following thresholds are implementation constants and are not configurable: + +```go +const ( + regionScanPauseRatio = 0.5 + regionScanResumeRatio = 0.2 +) +``` + +The hard event admission limit is always 100% of `memory-quota`. + +## Design Overview + +The subscription client owns one `PullerMemoryQuota`. All subscriptions handled +by that client share the quota. + +The quota controls two independent actions: + +1. Region scan admission: stop new Region register requests at 50% usage and + resume them below 20% usage. +2. Event admission: block new Region events when admitting them would exceed the + hard quota, and admit them as soon as sufficient space becomes available. + +```text + usage >= 50% + NORMAL ---------------------------------> SCAN_THROTTLED + ^ | + | usage < 20% | next event does not fit + | v + +------------------------------------------- FULL + | + | sufficient space released + v + SCAN_THROTTLED +``` + +`FULL` does not have a separate low-watermark. A waiting event proceeds as soon +as sufficient quota is released. This removes the current behavior where input +remains paused until usage falls from 80% to 50%. + +The 50%/20% hysteresis applies only to Region scan admission. It prevents Region +requests from repeatedly pausing and resuming when usage fluctuates around 50%. 
+The slow-consumer integration test must verify that normal steady-state usage can +fall below 20%; otherwise the resume threshold would starve new subscriptions +and Region retries and must be adjusted before rollout. + +## Puller Memory Quota + +The quota controller belongs to `logservice/logpuller`, rather than a shared +package, because its Region scan gate and lifecycle semantics are puller-specific. + +### Event admission + +After receiving a Region event and calculating its size, the subscription client +acquires quota before admitting the event into the processing pipeline. + +- The event is admitted immediately when it fits in the remaining quota. +- Otherwise its receive goroutine waits for memory release, subscription + stop, or context cancellation. +- Concurrent waiters use atomic compare-and-swap and cannot over-admit memory. +- A single event larger than the configured quota is admitted only when it can + run alone, avoiding permanent deadlock. + +The event has already been allocated by gRPC when its size becomes known. Each +receive goroutine may therefore hold one unadmitted event while waiting. These +events are outside the logical quota, but no additional events are received by +the blocked goroutines. + +### Fast path + +Memory quota is on the event receive critical path. The normal path must not use +a global mutex or allocate an object per event. + +- Global admitted memory is maintained by an atomic counter. +- Acquire reserves memory with a compare-and-swap loop. +- Subscription accounting state is created at subscription setup and referenced + directly by events; Acquire does not look up a global map. +- The event stores its accounted byte count and subscription owner directly. +- A handler batch sums the accounted bytes and performs one Release operation. +- Waiter locks and notification channels are used only when quota is exhausted. +- Scan gate locks are used only when usage crosses a threshold. 
+ +### Event release + +Accounted memory is released: + +- When the consumer starts handling the event and it leaves the buffered + pipeline. +- When the pipeline rejects the event. +- In aggregate when its subscription is removed. +- In aggregate when the subscription client closes. + +Each subscription accounting object has an active lifecycle protected by a local +lock. Aggregate subscription release marks it inactive and releases its total +usage. A later release from an in-flight stale event becomes a no-op instead of +decrementing unrelated memory. + +## Region Request Limiting + +Region request queueing, incremental scan concurrency, and sent-request tracking +are separate responsibilities. They must not share one counter or repair each +other's state periodically. + +### Store-level scan limiter + +Each TiKV store owns one `regionScanLimiter`. A scan slot represents exactly one +Register request that has been sent but has not completed its incremental scan. +The limit is enforced across all request workers connected to the store instead +of being divided approximately between workers. + +A slot is acquired immediately before sending Register and released on exactly +one of these terminal paths: + +- The Region reports Initialized. +- TiKV returns a Region error. +- Sending Register fails. +- The subscription is deregistered. +- The gRPC stream reconnects and its in-flight requests are rescheduled. + +Every acquired slot is represented by an idempotent token. Releasing the token +more than once is a no-op. Counter underflow is an invariant violation and must +not be repaired at runtime. + +### Request queues + +Pending requests are split by semantics: + +- A store-level `registerQueue` contains initial subscriptions and Region error + retries. All workers for the store consume this shared priority queue. It is + subject to both the memory scan gate and the store-level scan limiter. 
+- Each worker has a `controlQueue` containing Deregister requests for its gRPC + stream. Deregister is broadcast to the relevant workers, bypasses the memory + scan gate and scan limiter, and is always selected first. + +Queue capacity protects local TiCDC memory only. It is not used as the number of +active incremental scans. Retry and initial-subscription priorities remain +explicit request attributes; priority never implies bypassing the limiter. + +### In-flight tracker + +Each request worker tracks only Register requests sent on its own gRPC stream. +The tracker owns the corresponding scan slot until a terminal event occurs. It +does not contain pending queues and does not maintain a second flow-control +counter. + +The following state is removed: + +- `pendingCount` and `spaceAvailable`. +- The `force` bypass derived from dynamic request priority. +- Periodic reconciliation between pending count and actual requests. +- The `region worker pending request count is not equal to actual region request + count, correct it` log and its correction path. + +```text +Region discovery + | + v +store register queue + | + v +memory scan gate + | + v +store scan limiter + | + v +worker Send(Register) ---> in-flight tracker + | + +-- Initialized -> release slot + +-- Error -> release and reschedule + +-- Reconnect -> release and reschedule + +Deregister -> control queue -> worker Send(Deregister) +``` + +## Region Scan Gate + +Every Region register request can initiate an incremental scan in TiKV. This +includes both the first request for a subscription and a registration retried +after a Region error. Therefore all register requests are subject to the scan +gate. + +Requests already sent to TiKV remain active. Pausing them would discard work and +could cause repeated scans after resubscription. + +The gate never blocks: + +- Deregister requests. +- Subscription cleanup. +- Local error handling. +- Resolve-lock processing. 
+ +The gate contains an atomic paused flag and a notification mechanism used only +by waiters. Checking an open gate is one atomic load. + +### Sender gate + +The effective check is performed when a worker selects a Register request for +sending. Request selection has these rules: + +- When the gate is open, the worker may select control or Register requests. +- When the gate is closed, the worker may select only control requests, gate + changes, or context cancellation. +- A Register selected immediately before the gate closes is allowed to finish + sending. Holding a lock across gRPC Send is explicitly forbidden. + +Queue selection and gate waiting are implemented in one helper. Workers do not +inspect memory quota internals or maintain a nested loop for a retained Register +request. + +### Producer gate + +An additional gate is checked in `divideSpanAndScheduleRegionRequests`: + +- Before each `BatchLoadRegionsWithKeyRange` call. + +This gate is an optimization. It avoids unnecessary PD requests, Region range +splitting, and growth of local task queues while scans are paused. It is not +checked for every Region returned by a batch. At most one loaded batch, currently +1024 Regions, may be queued after the gate closes; the sender gate prevents those +requests from reaching TiKV. + +Waiting at this gate must exit when the context is cancelled or the subscribed +span is stopped. + +## Concurrency and Ordering + +Quota state transitions and Region scan gate transitions must be atomic with +respect to memory accounting: + +- An Acquire that moves usage to or above 50% closes the scan gate. +- A Release that moves usage below 20% opens the scan gate and wakes waiting + Region workers. +- A `Release` wakes event admission waiters whenever their request may now fit. +- State-change logs are emitted only on threshold crossings. + +Thresholds are precomputed in bytes. 
Acquire and Release compare integer byte +counts rather than performing floating-point division on the critical path. + +## Implementation Plan + +The refactoring is split into independently testable steps. Region request +limiting is completed before the new quota fast path so memory throttling is not +built on the current request-cache state machine. + +### Step 0: Establish performance and behavior baselines + +Add benchmarks and assertions around the current critical paths before changing +their implementation: + +- Parallel event admission and release with 1, 8, 32, and 128 goroutines. +- Single-subscription and many-subscription workloads. +- Region request enqueue, Send, Initialized, error, and reconnect lifecycles. +- Mutex and allocation profiles for the puller receive path. +- The maximum number of active incremental scans per store. + +The baseline records throughput, nanoseconds per operation, allocations per +operation, mutex wait time, and the number of scan requests sent during memory +pressure. + +### Step 1: Replace Region request flow control + +Introduce the store-level limiter, semantic request queues, and worker in-flight +tracker without changing memory quota behavior. + +1. Add `regionScanLimiter` to `requestedStore` so all workers for a store share + one exact limit. +2. Add an idempotent scan slot token. Attach it only after a Register request is + selected for sending. +3. Split pending Register and Deregister requests into `registerQueue` and + `controlQueue`. +4. Move sent-request state into `regionRequestTracker` owned by each worker. +5. Release scan slots from Initialized, Region error, Send failure, Deregister, + reconnect, and shutdown paths. +6. Remove `pendingCount`, `spaceAvailable`, `force`, stale count correction, and + the count-correction log. + +During migration, old and new counters must not run in parallel. The new limiter +becomes the only source of active-scan count in the same change that removes +`pendingCount`. 
+ +Step 1 is complete when tests prove that every acquired slot reaches exactly one +terminal release and the configured per-store limit is never exceeded. + +### Step 2: Introduce the standalone Region scan gate + +Add `regionScanGate` independently of memory accounting, initially leaving it +open in production tests. + +1. Implement atomic open/paused state and transition notifications. +2. Change worker request selection so a closed gate disables Register selection + but leaves Deregister selection enabled. +3. Move gate-aware selection into one queue helper and remove worker-level nested + wait loops. +4. Check the producer gate once before each PD Region batch load and remove the + per-Region gate check. +5. Add deterministic tests for pause during queueing, pause immediately before + Send, Deregister bypass, cancellation, and resume. + +Step 2 is complete when the open-gate fast path is one atomic load and no worker +depends on quota-internal channels or locks. + +### Step 3: Implement the zero-allocation memory quota fast path + +Replace the mutex-based quota and per-event reservation object. + +1. Create subscription memory state during Subscribe and store its pointer on + `subscribedSpan`. +2. Store accounted bytes and the subscription owner directly in `regionEvent`. +3. Implement global quota reservation with atomic compare-and-swap. +4. Protect only subscription lifecycle changes with a subscription-local lock. +5. Aggregate Release bytes for every handler batch. +6. Allocate waiter notification state only after quota exhaustion; Release does + not touch the waiter lock when there are no waiters. +7. Precompute pause and resume thresholds in bytes and drive `regionScanGate` + only when usage crosses them. +8. Allow an oversized event only when it can run alone. + +Acquire that races with unsubscribe must either complete under the active +subscription lifecycle or return cancellation. 
Aggregate unsubscribe release +must not race past an in-progress Acquire. + +Step 3 is complete when quota-available Acquire and batched Release have zero +allocations, do not acquire a global mutex, and preserve exact admitted-byte +accounting under the race detector. + +### Step 4: Integrate, observe, and remove transitional code + +Connect memory usage transitions to the standalone scan gate and remove all +remaining transitional paths: + +1. Pause Region Register selection at 50% admitted memory. +2. Resume selection below 20% admitted memory. +3. Keep event admission open until the next event cannot fit. +4. Remove obsolete request-cache metrics and replace them with queue, active + scan, gate, waiter, and quota metrics. +5. Remove dead methods, compatibility branches, and comments describing deleted + algorithms. +6. Update the design document with any threshold changes justified by benchmark + or stress-test results. + +Step 4 is complete after focused unit tests, race tests, the main binary build, +and a slow-consumer integration test all pass. A mutex profile must show no quota +global lock on the normal receive path. + +## Observability + +Add metrics for: + +- Configured puller quota in bytes. +- Admitted memory in bytes. +- Available quota in bytes. +- Number and duration of quota waiters. +- Whether the Region scan gate is open or closed. +- Number of Region register requests waiting on the gate. +- Scan gate pause and resume counts. +- Register and control queue lengths. +- Active incremental scan slots per store and their configured limit. +- Scan slot lifetime from Register Send to Initialized or another terminal path. + +Log quota and scan gate state only on transitions. Transition logs should include +the configured quota, current usage, threshold, and number of waiting requests. + +## Failure and Shutdown Behavior + +- Context cancellation wakes all quota and Region gate waiters. 
+- Unsubscribe and deregister continue even while the quota is full. +- Closing the subscription client wakes blocked event pushes and Region gate + waiters through context cancellation. +- A Region worker reconnect does not bypass the scan gate for register requests. +- Quota and scan-slot accounting errors must not be corrected silently. + Underflow, double release, or leaked ownership should be surfaced during tests + and with an operational error log. + +## Compatibility + +The default 1 GiB value preserves the current configuration-free behavior with +the following intentional changes: + +- New Region scans stop at 50% puller memory usage. +- Event input continues beyond 80% while space remains. +- Hard backpressure starts only when the next event cannot fit. +- Input resumes as soon as the waiting event can fit instead of waiting for usage + to fall to 50%. + +No TiKV protocol change is required. + +## Test Plan + +### Quota unit tests + +- An event is admitted whenever it fits in the remaining quota. +- A non-fitting event waits and resumes as soon as sufficient quota is released. +- An oversized event runs alone without deadlocking the puller. +- Subscription removal releases all of its accounted memory. +- Releases from stale subscriptions do not corrupt accounting. +- Available-quota Acquire and batch Release perform zero allocations. +- Concurrent Acquire and unsubscribe preserve exact global and subscription + accounting. + +### Region gate unit tests + +- The gate closes when usage reaches 50%. +- The gate remains closed between 20% and 50%. +- The gate opens when usage falls below 20%. +- Register requests are not sent while the gate is closed. +- Both initial and retry register requests are gated. +- Deregister requests are sent while register requests are waiting. +- Cancellation and unsubscribe wake producer-side waiters. 
+ +### Integration tests + +Use a deliberately slow event consumer and verify that: + +- Puller admitted memory remains bounded by the expected quota and receive + headroom. +- The number of active TiKV incremental scans stops increasing after the 50% + threshold is crossed. +- Existing Region streams continue making progress until hard backpressure is + required. +- Region request scheduling resumes after usage falls below 20%. +- Active incremental scans for a store never exceed its configured limit. + +### Performance tests + +- Benchmark quota Acquire and Release with 1, 8, 32, and 128 goroutines. +- Benchmark Region request selection with the gate open and closed. +- Compare event receive throughput and allocations against the Step 0 baseline. +- Capture mutex and allocation profiles under sustained high Region traffic. + +The quota fast path must have zero allocations per operation and must not expose +a globally contended mutex in the profile. diff --git a/logservice/logpuller/memory_quota.go b/logservice/logpuller/memory_quota.go new file mode 100644 index 0000000000..96e865969e --- /dev/null +++ b/logservice/logpuller/memory_quota.go @@ -0,0 +1,249 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/metrics" + "go.uber.org/zap" +) + +const ( + regionScanPauseRatio = 0.5 // pause new Region scans at or above 50% usage + regionScanResumeRatio = 0.2 // resume Region scans only below 20% usage +) + +// pullerMemoryQuota bounds memory held by admitted events and controls region +// scan admission based on current usage. +type pullerMemoryQuota struct { + mu sync.Mutex + + capacity uint64 + used uint64 + closed bool + + memoryReleased chan struct{} + + regionScanPaused bool + regionScanResume chan struct{} + + subscriptions map[SubscriptionID]*pullerMemorySubscription +} + +type pullerMemorySubscription struct { + usage uint64 +} + +type pullerMemoryReservation struct { + quota *pullerMemoryQuota + subID SubscriptionID + subscription *pullerMemorySubscription + bytes uint64 + released atomic.Bool +} + +func newPullerMemoryQuota(capacity uint64) *pullerMemoryQuota { + metrics.PullerRegionScanGate.Set(1) + metrics.PullerMemoryQuota.WithLabelValues("quota").Set(float64(capacity)) + metrics.PullerMemoryQuota.WithLabelValues("used").Set(0) + return &pullerMemoryQuota{ + capacity: capacity, + memoryReleased: make(chan struct{}), + regionScanResume: make(chan struct{}), + subscriptions: make(map[SubscriptionID]*pullerMemorySubscription), + } +} + +func (q *pullerMemoryQuota) acquire( + ctx context.Context, subID SubscriptionID, bytes uint64, stopped <-chan struct{}, +) (*pullerMemoryReservation, error) { + for { + q.mu.Lock() + if q.closed { + q.mu.Unlock() + return nil, context.Canceled + } + // A single oversized event must be allowed to run alone, otherwise it can + // never make progress and permanently deadlocks the puller. 
+ fits := q.used <= q.capacity && bytes <= q.capacity-q.used + if fits || q.used == 0 && bytes > q.capacity { + subscription := q.subscriptions[subID] + if subscription == nil { + subscription = &pullerMemorySubscription{} + q.subscriptions[subID] = subscription + } + q.used += bytes + subscription.usage += bytes + q.updateRegionScanStateLocked() + q.mu.Unlock() + return &pullerMemoryReservation{ + quota: q, + subID: subID, + subscription: subscription, + bytes: bytes, + }, nil + } + memoryReleased := q.memoryReleased + q.mu.Unlock() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-stopped: + return nil, context.Canceled + case <-memoryReleased: + } + } +} + +func (r *pullerMemoryReservation) release() { + if r == nil || !r.released.CompareAndSwap(false, true) { + return + } + r.quota.release(r) +} + +func (q *pullerMemoryQuota) release(reservation *pullerMemoryReservation) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed || q.subscriptions[reservation.subID] != reservation.subscription { + return + } + + usage := reservation.subscription.usage + if usage < reservation.bytes || q.used < reservation.bytes { + log.Error("puller memory quota accounting underflow", + zap.Uint64("subscriptionID", uint64(reservation.subID)), + zap.Uint64("releaseBytes", reservation.bytes), + zap.Uint64("subscriptionUsage", usage), + zap.Uint64("memoryUsage", q.used)) + return + } + + q.used -= reservation.bytes + usage -= reservation.bytes + if usage == 0 { + delete(q.subscriptions, reservation.subID) + } else { + reservation.subscription.usage = usage + } + q.notifyMemoryReleasedLocked() + q.updateRegionScanStateLocked() +} + +// releaseSubscription releases all reservations owned by a removed +// subscription. Reservation ownership makes later stale releases no-ops. 
+func (q *pullerMemoryQuota) releaseSubscription(subID SubscriptionID) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + return + } + + subscription := q.subscriptions[subID] + if subscription == nil { + return + } + usage := subscription.usage + if usage > q.used { + log.Error("puller subscription memory usage exceeds total usage", + zap.Uint64("subscriptionID", uint64(subID)), + zap.Uint64("subscriptionUsage", usage), + zap.Uint64("memoryUsage", q.used)) + usage = q.used + } + q.used -= usage + delete(q.subscriptions, subID) + if usage != 0 { + q.notifyMemoryReleasedLocked() + q.updateRegionScanStateLocked() + } +} + +func (q *pullerMemoryQuota) close() { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + return + } + q.closed = true + q.used = 0 + clear(q.subscriptions) + metrics.PullerMemoryQuota.WithLabelValues("used").Set(0) + close(q.memoryReleased) + if q.regionScanPaused { + q.regionScanPaused = false + close(q.regionScanResume) + } + metrics.PullerRegionScanGate.Set(1) +} + +func (q *pullerMemoryQuota) notifyMemoryReleasedLocked() { + close(q.memoryReleased) + q.memoryReleased = make(chan struct{}) +} + +func (q *pullerMemoryQuota) updateRegionScanStateLocked() { + usageRatio := float64(q.used) / float64(q.capacity) + switch { + case !q.regionScanPaused && usageRatio >= regionScanPauseRatio: + q.regionScanPaused = true + q.regionScanResume = make(chan struct{}) + metrics.PullerRegionScanGate.Set(0) + metrics.PullerRegionScanGateTransition.WithLabelValues("pause").Inc() + log.Info("puller pauses region scans", + zap.Uint64("memoryUsage", q.used), + zap.Uint64("memoryQuota", q.capacity), + zap.Float64("memoryUsageRatio", usageRatio)) + case q.regionScanPaused && usageRatio < regionScanResumeRatio: + q.regionScanPaused = false + close(q.regionScanResume) + metrics.PullerRegionScanGate.Set(1) + metrics.PullerRegionScanGateTransition.WithLabelValues("resume").Inc() + log.Info("puller resumes region scans", + zap.Uint64("memoryUsage", q.used), + 
zap.Uint64("memoryQuota", q.capacity), + zap.Float64("memoryUsageRatio", usageRatio)) + } +} + +func (q *pullerMemoryQuota) waitRegionScanAllowed(ctx context.Context) error { + for { + resume, paused := q.regionScanResumeNotify() + if !paused { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-resume: + } + } +} + +func (q *pullerMemoryQuota) regionScanResumeNotify() (<-chan struct{}, bool) { + q.mu.Lock() + defer q.mu.Unlock() + return q.regionScanResume, q.regionScanPaused +} + +func (q *pullerMemoryQuota) usage() (used, capacity uint64) { + q.mu.Lock() + defer q.mu.Unlock() + return q.used, q.capacity +} diff --git a/logservice/logpuller/memory_quota_test.go b/logservice/logpuller/memory_quota_test.go new file mode 100644 index 0000000000..75f3186dc7 --- /dev/null +++ b/logservice/logpuller/memory_quota_test.go @@ -0,0 +1,138 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPullerMemoryQuotaAdmissionAndScanThresholds(t *testing.T) { + quota := newPullerMemoryQuota(100) + ctx := context.Background() + + first, err := quota.acquire(ctx, 1, 49, nil) + require.NoError(t, err) + _, scanPaused := quota.regionScanResumeNotify() + require.False(t, scanPaused) + + second, err := quota.acquire(ctx, 1, 1, nil) + require.NoError(t, err) + _, scanPaused = quota.regionScanResumeNotify() + require.True(t, scanPaused) + + third, err := quota.acquire(ctx, 1, 50, nil) + require.NoError(t, err) + used, capacity := quota.usage() + require.Equal(t, uint64(100), used) + require.Equal(t, uint64(100), capacity) + + blocked := make(chan *pullerMemoryReservation, 1) + go func() { + reservation, acquireErr := quota.acquire(ctx, 2, 1, nil) + require.NoError(t, acquireErr) + blocked <- reservation + }() + select { + case <-blocked: + t.Fatal("event admitted when quota was full") + case <-time.After(20 * time.Millisecond): + } + + third.release() + fourth := <-blocked + used, _ = quota.usage() + require.Equal(t, uint64(51), used) + _, scanPaused = quota.regionScanResumeNotify() + require.True(t, scanPaused) + + first.release() + _, scanPaused = quota.regionScanResumeNotify() + require.False(t, scanPaused) + require.NoError(t, quota.waitRegionScanAllowed(ctx)) + + second.release() + fourth.release() + used, _ = quota.usage() + require.Zero(t, used) +} + +func TestPullerMemoryQuotaReleaseSubscription(t *testing.T) { + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(context.Background(), 1, 60, nil) + require.NoError(t, err) + + quota.releaseSubscription(1) + used, _ := quota.usage() + require.Zero(t, used) + + // A stale event can still be handled after subscription cleanup. Its release + // must not affect a later generation of accounting. 
+ newReservation, err := quota.acquire(context.Background(), 1, 20, nil) + require.NoError(t, err) + reservation.release() + used, _ = quota.usage() + require.Equal(t, uint64(20), used) + newReservation.release() +} + +func TestPullerMemoryQuotaOversizedEventRunsAlone(t *testing.T) { + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(context.Background(), 1, 101, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + _, acquireErr := quota.acquire(ctx, 2, 1, nil) + done <- acquireErr + }() + select { + case err := <-done: + t.Fatalf("second event admitted with oversized event: %v", err) + case <-time.After(20 * time.Millisecond): + } + + cancel() + require.ErrorIs(t, <-done, context.Canceled) + reservation.release() +} + +func TestProducerGateStopsWaitingAfterUnsubscribe(t *testing.T) { + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(context.Background(), 1, 50, nil) + require.NoError(t, err) + defer reservation.release() + client := &subscriptionClient{memoryQuota: quota} + span := &subscribedSpan{stoppedCh: make(chan struct{})} + + type result struct { + stopped bool + err error + } + done := make(chan result, 1) + go func() { + stopped, waitErr := client.waitRegionScanAllowed(context.Background(), span) + done <- result{stopped: stopped, err: waitErr} + }() + + span.stopped.Store(true) + close(span.stoppedCh) + res := <-done + require.NoError(t, res.err) + require.True(t, res.stopped) +} diff --git a/logservice/logpuller/priority_task.go b/logservice/logpuller/priority_task.go index 68ef9e885d..cffaa42cd2 100644 --- a/logservice/logpuller/priority_task.go +++ b/logservice/logpuller/priority_task.go @@ -33,9 +33,8 @@ const ( ) const ( - highPriorityBase = 0 - lowPriorityBase = 60 * 60 * 24 // 1 day - forcedPriorityBase = 60 * 60 // 60 minutes + highPriorityBase = 0 + lowPriorityBase = 60 * 60 * 24 // 1 day ) func (t TaskType) String() 
string { diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 7e96008c77..c283e55de4 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -56,6 +56,14 @@ type regionEvent struct { entries *cdcpb.Event_Entries_ resolvedTs uint64 + + memoryReservation *pullerMemoryReservation +} + +func (event *regionEvent) releaseMemory() { + if event != nil { + event.memoryReservation.release() + } } func (event *regionEvent) getSize() int { @@ -63,6 +71,7 @@ func (event *regionEvent) getSize() int { return 0 } size := int(unsafe.Sizeof(*event)) + size += int(unsafe.Sizeof(pullerMemoryReservation{})) if event.entries != nil { size += int(unsafe.Sizeof(*event.entries)) size += int(unsafe.Sizeof(*event.entries.Entries)) @@ -95,6 +104,9 @@ func (h *regionEventHandler) Path(event regionEvent) SubscriptionID { } func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) bool { + for i := range events { + events[i].releaseMemory() + } startTime := time.Now() hasEntries := false hasResolved := false @@ -226,7 +238,7 @@ func (h *regionEventHandler) GetType(event regionEvent) dynstream.EventType { } func (h *regionEventHandler) OnDrop(event regionEvent) interface{} { - // TODO: Distinguish between drop events caused by "path not found" errors and memory control. 
+ event.releaseMemory() state := event.mustFirstState() fields := []zap.Field{ zap.Bool("hasEntries", event.entries != nil), diff --git a/logservice/logpuller/region_event_handler_test.go b/logservice/logpuller/region_event_handler_test.go index e94d176ed0..4635a638d5 100644 --- a/logservice/logpuller/region_event_handler_test.go +++ b/logservice/logpuller/region_event_handler_test.go @@ -78,7 +78,7 @@ func TestHandleEventEntryEventOutOfOrder(t *testing.T) { ds.AddPath(subID, subSpan, dynstream.AreaSettings{}) worker := ®ionRequestWorker{ - requestCache: &requestCache{}, + requestTracker: newRegionRequestTracker(), } region := newRegionInfo( tikv.RegionVerID{}, @@ -217,7 +217,7 @@ func TestHandleResolvedTs(t *testing.T) { subID1 := SubscriptionID(1) worker := ®ionRequestWorker{ - requestCache: &requestCache{}, + requestTracker: newRegionRequestTracker(), } state1 := newRegionFeedState(regionInfo{verID: tikv.NewRegionVerID(1, 1, 1)}, uint64(subID1), worker) state1.start() diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index b4478cab07..cf4986ec1f 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -16,326 +16,315 @@ package logpuller import ( "context" "sync" + "sync/atomic" "time" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/metrics" - "go.uber.org/atomic" + "github.com/pingcap/ticdc/utils/heap" "go.uber.org/zap" ) const ( - checkStaleRequestInterval = time.Second * 10 - requestGCLifeTime = time.Minute * 180 - addReqRetryInterval = time.Millisecond * 1 + addReqRetryInterval = time.Millisecond addReqRetryLimit = 3 abnormalRequestDurationInSec = 60 * 60 * 2 // 2 hours ) -// regionReq represents a wrapped region request with state +// regionReq is a pending or in-flight Region register request. 
type regionReq struct { regionInfo regionInfo createTime time.Time + priority int + heapIndex int } -func newRegionReq(region regionInfo) regionReq { - return regionReq{ +func newRegionReq(region regionInfo, priority int) *regionReq { + return ®ionReq{ regionInfo: region, createTime: time.Now(), + priority: priority, } } -func (r *regionReq) isStale() bool { - return time.Since(r.createTime) > requestGCLifeTime +func (r *regionReq) SetHeapIndex(index int) { + r.heapIndex = index } -// requestCache manages region requests with flow control -type requestCache struct { - // pending requests waiting to be sent - pendingQueue chan regionReq +func (r *regionReq) GetHeapIndex() int { + return r.heapIndex +} - // sent requests waiting for initialization (subscriptionID -> regions -> regionReq) - sentRequests struct { - sync.RWMutex - regionReqs map[SubscriptionID]map[uint64]regionReq +func (r *regionReq) LessThan(other *regionReq) bool { + if r.priority == other.priority { + return r.createTime.Before(other.createTime) } + return r.priority < other.priority +} - // pendingCount is a flow control slot counter. - // A slot is acquired when a request is successfully enqueued into pendingQueue (see add), - // and is released when the request is finished/removed (resolve/markStopped/markDone/clear). - // pop and markSent don't change it. If markSent overwrites an existing request for the same region, - // it will release a slot for the replaced request to avoid leaking pendingCount. - pendingCount atomic.Int64 - // maximum number of pending requests allowed - maxPendingCount int64 - - // channel to signal when space becomes available - spaceAvailable chan struct{} - - lastCheckStaleRequestTime atomic.Time +// regionRegisterQueue is the bounded, store-level queue shared by all workers. +// Queue capacity limits local pending work only; active scans are controlled by +// regionScanLimiter. 
+type regionRegisterQueue struct { + mu sync.Mutex + requests *heap.Heap[*regionReq] + capacity int + changed chan struct{} } -func newRequestCache(maxPendingCount int) *requestCache { - res := &requestCache{ - pendingQueue: make(chan regionReq, maxPendingCount), // Large buffer to reduce blocking - sentRequests: struct { - sync.RWMutex - regionReqs map[SubscriptionID]map[uint64]regionReq - }{regionReqs: make(map[SubscriptionID]map[uint64]regionReq)}, - pendingCount: atomic.Int64{}, - maxPendingCount: int64(maxPendingCount), - spaceAvailable: make(chan struct{}, 16), // Buffered to avoid blocking +func newRegionRegisterQueue(capacity int) *regionRegisterQueue { + if capacity <= 0 { + capacity = 1 + } + return ®ionRegisterQueue{ + requests: heap.NewHeap[*regionReq](), + capacity: capacity, + changed: make(chan struct{}), } - - res.lastCheckStaleRequestTime.Store(time.Now()) - return res } -// add adds a new region request to the cache -// It blocks if pendingCount >= maxPendingCount until there's space or ctx is cancelled -func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (bool, error) { +func (q *regionRegisterQueue) add( + ctx context.Context, region regionInfo, priority int, +) (bool, error) { start := time.Now() ticker := time.NewTicker(addReqRetryInterval) defer ticker.Stop() - addReqRetryLimit := addReqRetryLimit + retries := addReqRetryLimit for { - current := c.pendingCount.Load() - if current < c.maxPendingCount || force { - // Try to add the request - req := newRegionReq(region) - select { - case <-ctx.Done(): - return false, ctx.Err() - case c.pendingQueue <- req: - c.pendingCount.Inc() - cost := time.Since(start) - metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) - return true, nil - case <-ticker.C: - addReqRetryLimit-- - if addReqRetryLimit <= 0 { - return false, nil - } - continue - } + q.mu.Lock() + if q.requests.Len() < q.capacity { + q.requests.AddOrUpdate(newRegionReq(region, priority)) + 
q.notifyChangedLocked() + q.mu.Unlock() + metrics.SubscriptionClientAddRegionRequestDuration.Observe(time.Since(start).Seconds()) + return true, nil } + changed := q.changed + q.mu.Unlock() - // Wait for space to become available select { + case <-ctx.Done(): + return false, ctx.Err() + case <-changed: case <-ticker.C: - addReqRetryLimit-- - if addReqRetryLimit <= 0 { + retries-- + if retries <= 0 { return false, nil } - continue - case <-c.spaceAvailable: - continue - case <-ctx.Done(): - return false, ctx.Err() } } } -// pop gets the next pending request. -// Note: it doesn't change pendingCount. The slot acquired in add() should be released later -// (e.g. resolve/markStopped/markDone). -func (c *requestCache) pop(ctx context.Context) (regionReq, error) { - select { - case req := <-c.pendingQueue: - return req, nil - case <-ctx.Done(): - return regionReq{}, ctx.Err() +func (q *regionRegisterQueue) tryPopOrNotify() (*regionReq, bool, <-chan struct{}) { + q.mu.Lock() + defer q.mu.Unlock() + req, ok := q.requests.PopTop() + if ok { + q.notifyChangedLocked() + return req, true, nil } + return nil, false, q.changed } -// markSent marks a request as sent and adds it to sent requests. -// It doesn't change pendingCount: the slot is released when the request is finished/removed. 
-func (c *requestCache) markSent(req regionReq) { - c.sentRequests.Lock() - defer c.sentRequests.Unlock() +func (q *regionRegisterQueue) len() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.requests.Len() +} - m, ok := c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] +func (q *regionRegisterQueue) clear() []regionInfo { + q.mu.Lock() + defer q.mu.Unlock() + regions := make([]regionInfo, 0, q.requests.Len()) + for { + req, ok := q.requests.PopTop() + if !ok { + break + } + regions = append(regions, req.regionInfo) + } + q.notifyChangedLocked() + return regions +} - if !ok { - m = make(map[uint64]regionReq) - c.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] = m +func (q *regionRegisterQueue) notifyChangedLocked() { + close(q.changed) + q.changed = make(chan struct{}) +} + +// regionScanLimiter controls the exact number of active incremental scans for +// one store. +type regionScanLimiter struct { + mu sync.Mutex + limit int + active int + available chan struct{} +} + +func newRegionScanLimiter(limit int) *regionScanLimiter { + if limit <= 0 { + limit = 1 } + return ®ionScanLimiter{ + limit: limit, + available: make(chan struct{}), + } +} - if oldReq, exists := m[req.regionInfo.verID.GetID()]; exists { - log.Warn("region request overwritten", - zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)), - zap.Uint64("regionID", req.regionInfo.verID.GetID()), - zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()), - zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()), - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("pendingQueueLen", len(c.pendingQueue))) - c.markDone() +func (l *regionScanLimiter) tryAcquireOrNotify() (*regionScanSlot, <-chan struct{}) { + l.mu.Lock() + defer l.mu.Unlock() + if l.active >= l.limit { + return nil, l.available } - m[req.regionInfo.verID.GetID()] = req + l.active++ + return ®ionScanSlot{limiter: l}, nil } -// markStopped removes a sent request and releases 
a slot. -func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) { - c.sentRequests.Lock() - defer c.sentRequests.Unlock() +func (l *regionScanLimiter) usage() (active, limit int) { + l.mu.Lock() + defer l.mu.Unlock() + return l.active, l.limit +} - regionReqs, ok := c.sentRequests.regionReqs[subID] - if !ok { +func (l *regionScanLimiter) release() { + l.mu.Lock() + defer l.mu.Unlock() + if l.active <= 0 { + log.Error("region scan limiter underflow", zap.Int("limit", l.limit)) return } + l.active-- + close(l.available) + l.available = make(chan struct{}) +} + +// regionScanSlot is an idempotent ownership token for one active scan. +type regionScanSlot struct { + limiter *regionScanLimiter + released atomic.Bool +} - _, exists := regionReqs[regionID] - if !exists { +func (s *regionScanSlot) release() { + if s == nil || !s.released.CompareAndSwap(false, true) { return } + s.limiter.release() +} - delete(regionReqs, regionID) - if len(regionReqs) == 0 { - delete(c.sentRequests.regionReqs, subID) - } - c.markDone() +type trackedRegionRequest struct { + req *regionReq + slot *regionScanSlot } -// resolve marks a region as initialized and removes it from sent requests -func (c *requestCache) resolve(subscriptionID SubscriptionID, regionID uint64) bool { - c.sentRequests.Lock() - defer c.sentRequests.Unlock() - regionReqs, ok := c.sentRequests.regionReqs[subscriptionID] - if !ok { - return false +// regionRequestTracker owns the sent Register requests and scan slots for one +// worker's gRPC stream. 
+type regionRequestTracker struct { + mu sync.Mutex + requests map[SubscriptionID]map[uint64]trackedRegionRequest +} + +func newRegionRequestTracker() *regionRequestTracker { + return ®ionRequestTracker{ + requests: make(map[SubscriptionID]map[uint64]trackedRegionRequest), } +} - req, exists := regionReqs[regionID] - if !exists { +func (t *regionRequestTracker) track(req *regionReq, slot *regionScanSlot) bool { + t.mu.Lock() + defer t.mu.Unlock() + subID := req.regionInfo.subscribedSpan.subID + regionID := req.regionInfo.verID.GetID() + regions := t.requests[subID] + if regions == nil { + regions = make(map[uint64]trackedRegionRequest) + t.requests[subID] = regions + } + if _, exists := regions[regionID]; exists { return false } + regions[regionID] = trackedRegionRequest{req: req, slot: slot} + return true +} - // Check if the subscription ID matches - if req.regionInfo.subscribedSpan.subID == subscriptionID { - delete(regionReqs, regionID) - c.markDone() - cost := time.Since(req.createTime).Seconds() - if cost > 0 && cost < abnormalRequestDurationInSec { - log.Debug("cdc resolve region request", zap.Uint64("subID", uint64(subscriptionID)), zap.Uint64("regionID", regionID), zap.Float64("cost", cost), zap.Int("pendingCount", int(c.pendingCount.Load())), zap.Int("pendingQueueLen", len(c.pendingQueue))) - metrics.RegionRequestFinishScanDuration.Observe(cost) - } else { - log.Info("region request duration abnormal, skip metric", zap.Float64("cost", cost), zap.Uint64("regionID", regionID)) - } - return true +func (t *regionRequestTracker) stop(subID SubscriptionID, regionID uint64) bool { + tracked, ok := t.remove(subID, regionID) + if ok { + tracked.slot.release() } - - return false + return ok } -// clearStaleRequest clears stale requests from the cache -// Note: Sometimes, the CDC sends the same region request to TiKV multiple times. In such cases, this method is needed to reduce the pendingSize. 
-func (c *requestCache) clearStaleRequest() { - if time.Since(c.lastCheckStaleRequestTime.Load()) < checkStaleRequestInterval { - return +func (t *regionRequestTracker) resolve(subID SubscriptionID, regionID uint64) bool { + tracked, ok := t.remove(subID, regionID) + if !ok { + return false } - c.sentRequests.Lock() - defer c.sentRequests.Unlock() - reqCount := 0 - for subID, regionReqs := range c.sentRequests.regionReqs { - for regionID, regionReq := range regionReqs { - if regionReq.regionInfo.isStopped() || - regionReq.regionInfo.subscribedSpan.stopped.Load() || - regionReq.regionInfo.lockedRangeState.Initialized.Load() || - regionReq.isStale() { - c.markDone() - log.Warn("region worker delete stale region request", - zap.Uint64("subID", uint64(subID)), - zap.Uint64("regionID", regionID), - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("pendingQueueLen", len(c.pendingQueue)), - zap.Bool("isRegionStopped", regionReq.regionInfo.isStopped()), - zap.Bool("isSubscribedSpanStopped", regionReq.regionInfo.subscribedSpan.stopped.Load()), - zap.Bool("isStale", regionReq.isStale()), - zap.Time("createTime", regionReq.createTime)) - delete(regionReqs, regionID) - } else { - reqCount++ - } - } - if len(regionReqs) == 0 { - delete(c.sentRequests.regionReqs, subID) - } + tracked.slot.release() + cost := time.Since(tracked.req.createTime).Seconds() + if cost > 0 && cost < abnormalRequestDurationInSec { + metrics.RegionRequestFinishScanDuration.Observe(cost) + } else { + log.Info("region request duration abnormal, skip metric", + zap.Float64("cost", cost), + zap.Uint64("regionID", regionID)) } + return true +} - // If there are no in-cache region requests but pendingCount isn't 0, it means pendingCount is stale. - // Reset it to avoid blocking add() forever. 
- if reqCount == 0 && len(c.pendingQueue) == 0 && c.pendingCount.Load() != 0 { - log.Info("region worker pending request count is not equal to actual region request count, correct it", - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("actualReqCount", reqCount), - zap.Int("pendingQueueLen", len(c.pendingQueue))) - c.pendingCount.Store(0) - // Notify waiting add operations that there's space available. - select { - case c.spaceAvailable <- struct{}{}: - default: - } +func (t *regionRequestTracker) remove( + subID SubscriptionID, regionID uint64, +) (trackedRegionRequest, bool) { + t.mu.Lock() + defer t.mu.Unlock() + regions := t.requests[subID] + if regions == nil { + return trackedRegionRequest{}, false } - - c.lastCheckStaleRequestTime.Store(time.Now()) + tracked, ok := regions[regionID] + if !ok { + return trackedRegionRequest{}, false + } + delete(regions, regionID) + if len(regions) == 0 { + delete(t.requests, subID) + } + return tracked, true } -// clear removes all requests and returns them -func (c *requestCache) clear() []regionInfo { - var regions []regionInfo - - // Drain pending requests from channel -LOOP: - for { - select { - case req := <-c.pendingQueue: - regions = append(regions, req.regionInfo) - c.markDone() - default: - break LOOP - } +func (t *regionRequestTracker) removeSubscription(subID SubscriptionID) { + t.mu.Lock() + regions := t.requests[subID] + delete(t.requests, subID) + t.mu.Unlock() + for _, tracked := range regions { + tracked.slot.release() } +} - c.sentRequests.Lock() - defer c.sentRequests.Unlock() +func (t *regionRequestTracker) clear() []regionInfo { + t.mu.Lock() + requests := t.requests + t.requests = make(map[SubscriptionID]map[uint64]trackedRegionRequest) + t.mu.Unlock() - for subID, regionReqs := range c.sentRequests.regionReqs { - for regionID := range regionReqs { - regions = append(regions, regionReqs[regionID].regionInfo) - delete(regionReqs, regionID) - c.markDone() + var regions []regionInfo + for _, 
trackedRegions := range requests { + for _, tracked := range trackedRegions { + regions = append(regions, tracked.req.regionInfo) + tracked.slot.release() } - delete(c.sentRequests.regionReqs, subID) } return regions } -// getPendingCount returns the current pending count -func (c *requestCache) getPendingCount() int { - return int(c.pendingCount.Load()) -} - -func (c *requestCache) markDone() { - // Decrement pendingCount by 1, but never let it go below 0. - // Do it with CAS to avoid clobbering concurrent Inc() calls. - for { - old := c.pendingCount.Load() - if old == 0 { - break - } else if old < 0 { - if c.pendingCount.CompareAndSwap(old, 0) { - break - } - } else { - if c.pendingCount.CompareAndSwap(old, old-1) { - break - } - } - } - // Notify waiting add operations that there's space available. - select { - case c.spaceAvailable <- struct{}{}: - default: // If channel is full, skip notification +func (t *regionRequestTracker) len() int { + t.mu.Lock() + defer t.mu.Unlock() + count := 0 + for _, regions := range t.requests { + count += len(regions) } + return count } diff --git a/logservice/logpuller/region_req_cache_test.go b/logservice/logpuller/region_req_cache_test.go index 62706a8542..badcdd7e02 100644 --- a/logservice/logpuller/region_req_cache_test.go +++ b/logservice/logpuller/region_req_cache_test.go @@ -15,8 +15,8 @@ package logpuller import ( "context" + "sync" "testing" - "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/stretchr/testify/require" @@ -25,312 +25,187 @@ import ( func createTestRegionInfo(subID SubscriptionID, regionID uint64) regionInfo { verID := tikv.NewRegionVerID(regionID, 1, 1) - span := heartbeatpb.TableSpan{ TableID: 1, StartKey: []byte("start"), EndKey: []byte("end"), } - - subscribedSpan := &subscribedSpan{ + return newRegionInfo(verID, span, nil, &subscribedSpan{ subID: subID, startTs: 100, span: span, - } - - return newRegionInfo(verID, span, nil, subscribedSpan, false) + }, false) } -func 
TestRequestCacheAdd_NormalCase(t *testing.T) { - cache := newRequestCache(10) +func TestRegionRegisterQueuePriorityAndCapacity(t *testing.T) { + queue := newRegionRegisterQueue(2) ctx := context.Background() - region := createTestRegionInfo(1, 1) - - ok, err := cache.add(ctx, region, false) + ok, err := queue.add(ctx, createTestRegionInfo(1, 1), 10) require.NoError(t, err) require.True(t, ok) - require.Equal(t, 1, cache.getPendingCount()) - - // Verify the request was added to the queue - req, err := cache.pop(ctx) + ok, err = queue.add(ctx, createTestRegionInfo(1, 2), 1) require.NoError(t, err) - require.NotNil(t, req) - require.Equal(t, region.verID.GetID(), req.regionInfo.verID.GetID()) - require.Equal(t, region.subscribedSpan.subID, req.regionInfo.subscribedSpan.subID) -} - -func TestRequestCacheAdd_ForceFlag(t *testing.T) { - cache := newRequestCache(1) - ctx := context.Background() - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region1, false) require.True(t, ok) + ok, err = queue.add(ctx, createTestRegionInfo(1, 3), 0) require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - // Try to add another request without force - should fail due to retry limit - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx, region2, false) - require.False(t, ok) - require.NoError(t, err) - - // With force=true, it should still fail because the channel is full - // The force flag only bypasses the pendingCount check, not the channel capacity - region3 := createTestRegionInfo(1, 3) - ok, err = cache.add(ctx, region3, true) require.False(t, ok) - require.NoError(t, err) - // consume the pending queue ann add with force - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - require.Equal(t, region1.verID.GetID(), req.regionInfo.verID.GetID()) - require.Equal(t, region1.subscribedSpan.subID, req.regionInfo.subscribedSpan.subID) - cache.markSent(req) - require.Equal(t, 1, 
cache.getPendingCount()) - - ok, err = cache.add(ctx, region3, true) + req, ok, _ := queue.tryPopOrNotify() require.True(t, ok) - require.NoError(t, err) - // It is 2 since region1 is unresolved - require.Equal(t, 2, cache.getPendingCount()) + require.Equal(t, uint64(2), req.regionInfo.verID.GetID()) - // resolve region1 - cache.resolve(region1.subscribedSpan.subID, region1.verID.GetID()) - require.Equal(t, 1, cache.getPendingCount()) -} - -func TestRequestCacheAdd_ContextCancellation(t *testing.T) { - cache := newRequestCache(1) - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ctx1 := context.Background() - ok, err := cache.add(ctx1, region1, false) - require.True(t, ok) + ok, err = queue.add(ctx, createTestRegionInfo(1, 3), 0) require.NoError(t, err) - - // Try to add another request with a cancelled context - ctx2, cancel := context.WithCancel(context.Background()) - cancel() // Cancel immediately - - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx2, region2, false) - require.False(t, ok) - require.Error(t, err) - require.Equal(t, context.Canceled, err) -} - -func TestRequestCacheAdd_RetryLimitExceeded(t *testing.T) { - cache := newRequestCache(1) - ctx := context.Background() - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region1, false) require.True(t, ok) - require.NoError(t, err) - - // Try to add another request - should eventually hit retry limit - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx, region2, false) - require.False(t, ok) - require.NoError(t, err) + req, ok, _ = queue.tryPopOrNotify() + require.True(t, ok) + require.Equal(t, uint64(3), req.regionInfo.verID.GetID()) + require.Equal(t, 1, queue.len()) } -func TestRequestCacheAdd_SpaceAvailableNotification(t *testing.T) { - cache := newRequestCache(2) - ctx := context.Background() - - // Fill up the cache - region1 := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region1, false) - 
require.True(t, ok) +func TestRegionRegisterQueueContextCancellation(t *testing.T) { + queue := newRegionRegisterQueue(1) + ok, err := queue.add(context.Background(), createTestRegionInfo(1, 1), 0) require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - region2 := createTestRegionInfo(1, 2) - ok, err = cache.add(ctx, region2, false) require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 2, cache.getPendingCount()) - - // Pop a request and mark it as sent, then resolve it to free up space - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - require.Equal(t, 2, cache.getPendingCount()) // pop doesn't change pendingCount - // Mark as sent - cache.markSent(req) - require.Equal(t, 2, cache.getPendingCount()) - - // Resolve the request to free up space - success := cache.resolve(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) - require.True(t, success) - require.Equal(t, 1, cache.getPendingCount()) - // Now we should be able to add another request - region3 := createTestRegionInfo(1, 3) - ok, err = cache.add(ctx, region3, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 2, cache.getPendingCount()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + ok, err = queue.add(ctx, createTestRegionInfo(1, 2), 0) + require.ErrorIs(t, err, context.Canceled) + require.False(t, ok) } -func TestRequestCacheAdd_ConcurrentAdds(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - const numGoroutines = 5 - done := make(chan error, numGoroutines) - - // Start multiple goroutines adding requests concurrently - for i := 0; i < numGoroutines; i++ { - go func(id int) { - region := createTestRegionInfo(SubscriptionID(id%3), uint64(id)) - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - done <- err - }(i) +func TestRegionScanLimiterExactLimitAndIdempotentRelease(t *testing.T) { + limiter := 
newRegionScanLimiter(2) + slot1, _ := limiter.tryAcquireOrNotify() + slot2, _ := limiter.tryAcquireOrNotify() + slot3, notify := limiter.tryAcquireOrNotify() + require.NotNil(t, slot1) + require.NotNil(t, slot2) + require.Nil(t, slot3) + require.NotNil(t, notify) + require.Equal(t, 2, activeScanCount(limiter)) + + slot1.release() + slot1.release() + require.Equal(t, 1, activeScanCount(limiter)) + select { + case <-notify: + default: + t.Fatal("slot release must notify waiters") } - // Wait for all goroutines to complete - for i := 0; i < numGoroutines; i++ { - select { - case err := <-done: - require.NoError(t, err) - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for concurrent adds to complete") - } - } - - require.Equal(t, numGoroutines, cache.getPendingCount()) + slot3, _ = limiter.tryAcquireOrNotify() + require.NotNil(t, slot3) + require.Equal(t, 2, activeScanCount(limiter)) + slot2.release() + slot3.release() + require.Equal(t, 0, activeScanCount(limiter)) } -func TestRequestCacheAdd_StaleRequestCleanup(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - // Add a request and mark it as sent - region := createTestRegionInfo(1, 1) - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - - // Mark as sent - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - - // Manually set the request as stale by modifying createTime - cache.sentRequests.Lock() - regionReqs := cache.sentRequests.regionReqs[req.regionInfo.subscribedSpan.subID] - regionReqs[req.regionInfo.verID.GetID()] = regionReq{ - regionInfo: req.regionInfo, - createTime: time.Now().Add(-requestGCLifeTime - time.Second), // Make it stale +func TestRegionScanLimiterNeverExceedsLimit(t *testing.T) { + const ( + limit = 4 + goroutines = 32 + ) + limiter := newRegionScanLimiter(limit) + var wg sync.WaitGroup + start := make(chan 
struct{}) + maxActive := 0 + var maxMu sync.Mutex + + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-start + for { + slot, notify := limiter.tryAcquireOrNotify() + if slot == nil { + <-notify + continue + } + active := activeScanCount(limiter) + maxMu.Lock() + if active > maxActive { + maxActive = active + } + maxMu.Unlock() + slot.release() + return + } + }() } - cache.sentRequests.Unlock() - - // Manually set lastCheckStaleRequestTime to bypass the time interval check - cache.lastCheckStaleRequestTime.Store(time.Now().Add(-checkStaleRequestInterval - time.Second)) - - // Manually trigger stale cleanup by calling clearStaleRequest - cache.clearStaleRequest() - - // The stale request should be cleaned up - require.Equal(t, 0, cache.getPendingCount()) + close(start) + wg.Wait() + require.LessOrEqual(t, maxActive, limit) + require.Equal(t, 0, activeScanCount(limiter)) } -func TestRequestCacheAdd_WithStoppedRegion(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - // Create a region info with stopped state (lockedRangeState = nil) - region := createTestRegionInfo(1, 1) - region.lockedRangeState = nil // This makes it stopped - - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - req, err := cache.pop(ctx) - require.NoError(t, err) - require.NotNil(t, req) - - // Mark as sent - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - - // Manually set lastCheckStaleRequestTime to bypass the time interval check - cache.lastCheckStaleRequestTime.Store(time.Now().Add(-checkStaleRequestInterval - time.Second)) - - // Manually trigger cleanup of stopped region - cache.clearStaleRequest() +func TestRegionRequestTrackerTerminalReleasePaths(t *testing.T) { + testCases := []struct { + name string + release func(*testing.T, *regionRequestTracker, SubscriptionID, uint64) + }{ + { + name: "initialized", + release: 
func(t *testing.T, tracker *regionRequestTracker, subID SubscriptionID, regionID uint64) { + require.True(t, tracker.resolve(subID, regionID)) + }, + }, + { + name: "region error or send failure", + release: func(t *testing.T, tracker *regionRequestTracker, subID SubscriptionID, regionID uint64) { + require.True(t, tracker.stop(subID, regionID)) + }, + }, + { + name: "deregister", + release: func(_ *testing.T, tracker *regionRequestTracker, subID SubscriptionID, _ uint64) { + tracker.removeSubscription(subID) + }, + }, + { + name: "reconnect or shutdown", + release: func(t *testing.T, tracker *regionRequestTracker, _ SubscriptionID, _ uint64) { + require.Len(t, tracker.clear(), 1) + }, + }, + } - // The stopped region should be cleaned up - require.Equal(t, 0, cache.getPendingCount()) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + limiter := newRegionScanLimiter(1) + slot, _ := limiter.tryAcquireOrNotify() + tracker := newRegionRequestTracker() + req := newRegionReq(createTestRegionInfo(1, 1), 0) + require.True(t, tracker.track(req, slot)) + require.Equal(t, 1, activeScanCount(limiter)) + + tc.release(t, tracker, 1, 1) + require.Equal(t, 0, activeScanCount(limiter)) + require.Equal(t, 0, tracker.len()) + require.False(t, tracker.stop(1, 1)) + }) + } } -func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - region := createTestRegionInfo(1, 1) - - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - - // Add a duplicate request for the same region. It should not leak pendingCount even if - // markSent overwrites the existing entry. 
- ok, err = cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 2, cache.getPendingCount()) - - req1, err := cache.pop(ctx) - require.NoError(t, err) - cache.markSent(req1) - require.Equal(t, 2, cache.getPendingCount()) - - req2, err := cache.pop(ctx) - require.NoError(t, err) - cache.markSent(req2) - require.Equal(t, 1, cache.getPendingCount()) - - // Finish the remaining tracked request. - require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID())) - require.Equal(t, 0, cache.getPendingCount()) +func TestRegionRequestTrackerRejectsDuplicate(t *testing.T) { + limiter := newRegionScanLimiter(2) + tracker := newRegionRequestTracker() + req := newRegionReq(createTestRegionInfo(1, 1), 0) + slot1, _ := limiter.tryAcquireOrNotify() + slot2, _ := limiter.tryAcquireOrNotify() + require.True(t, tracker.track(req, slot1)) + require.False(t, tracker.track(newRegionReq(req.regionInfo, 0), slot2)) + slot2.release() + tracker.clear() + require.Equal(t, 0, activeScanCount(limiter)) } -func TestRequestCacheMarkStopped_ReleasesSlot(t *testing.T) { - cache := newRequestCache(10) - ctx := context.Background() - - region := createTestRegionInfo(1, 1) - - ok, err := cache.add(ctx, region, false) - require.True(t, ok) - require.NoError(t, err) - require.Equal(t, 1, cache.getPendingCount()) - - req, err := cache.pop(ctx) - require.NoError(t, err) - - cache.markSent(req) - require.Equal(t, 1, cache.getPendingCount()) - require.Contains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID) - - cache.markStopped(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) - require.Equal(t, 0, cache.getPendingCount()) - require.NotContains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID) +func activeScanCount(limiter *regionScanLimiter) int { + active, _ := limiter.usage() + return active } diff --git a/logservice/logpuller/region_request_worker.go 
b/logservice/logpuller/region_request_worker.go index 142f4c6493..f8b616dbaa 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -48,10 +48,12 @@ type regionRequestWorker struct { // we must always get a region to request before create a grpc stream. // only in this way we can avoid to try to connect to an offline store infinitely. - preFetchForConnecting *regionInfo + preFetchForConnecting *regionReq - // request cache with flow control - requestCache *requestCache + // controlQueue contains Deregister requests for this worker's gRPC stream. + controlQueue chan *regionReq + // requestTracker owns Register requests sent on this worker's gRPC stream. + requestTracker *regionRequestTracker // all regions maintained by this worker. requestedRegions struct { @@ -67,13 +69,14 @@ func newRegionRequestWorker( credential *security.Credential, g *errgroup.Group, store *requestedStore, - requestCacheSize int, + controlQueueSize int, ) *regionRequestWorker { worker := ®ionRequestWorker{ - workerID: workerIDGen.Add(1), - client: client, - store: store, - requestCache: newRequestCache(requestCacheSize), + workerID: workerIDGen.Add(1), + client: client, + store: store, + controlQueue: make(chan *regionReq, controlQueueSize), + requestTracker: newRegionRequestTracker(), } worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) @@ -84,16 +87,14 @@ func newRegionRequestWorker( zap.String("addr", store.storeAddr)) } for { - req, err := worker.requestCache.pop(ctx) + req, err := worker.popRequest(ctx) if err != nil { return err } if req.regionInfo.isStopped() { - worker.requestCache.markDone() continue } - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + worker.preFetchForConnecting = req return nil } } @@ -362,11 +363,33 @@ func (s *regionRequestWorker) processRegionSendTask( // TODO: add a metric? 
return nil } + handleDeregister := func(region regionInfo) error { + subID := region.subscribedSpan.subID + req := &cdcpb.ChangeDataRequest{ + Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()}, + RequestId: uint64(subID), + Request: &cdcpb.ChangeDataRequest_Deregister_{ + Deregister: &cdcpb.ChangeDataRequest_Deregister{}, + }, + FilterLoop: region.filterLoop, + } + if err := doSend(req); err != nil { + return err + } + for _, state := range s.takeRegionStates(subID) { + state.markStopped(&requestCancelledErr{}) + regionEvent := regionEvent{ + states: []*regionFeedState{state}, + } + s.client.pushRegionEventToDS(subID, regionEvent) + } + s.requestTracker.removeSubscription(subID) + return nil + } // Handle pre-fetched region first - region := *s.preFetchForConnecting + regionReq := s.preFetchForConnecting s.preFetchForConnecting = nil - regionReq := newRegionReq(region) var err error for { region := regionReq.regionInfo @@ -380,57 +403,70 @@ func (s *regionRequestWorker) processRegionSendTask( // It means it's a special task for stopping the table. 
if region.isStopped() { - req := &cdcpb.ChangeDataRequest{ - Header: &cdcpb.Header{ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver()}, - RequestId: uint64(subID), - Request: &cdcpb.ChangeDataRequest_Deregister_{ - Deregister: &cdcpb.ChangeDataRequest_Deregister{}, - }, - FilterLoop: region.filterLoop, - } - s.requestCache.markDone() - if err := doSend(req); err != nil { + if err := handleDeregister(region); err != nil { return err } - for _, state := range s.takeRegionStates(subID) { - state.markStopped(&requestCancelledErr{}) - regionEvent := regionEvent{ - states: []*regionFeedState{state}, - } - s.client.pushRegionEventToDS(subID, regionEvent) - } } else if region.subscribedSpan.stopped.Load() { // It can be skipped directly because there must be no pending states from // the stopped subscribedTable, or the special singleRegionInfo for stopping // the table will be handled later. s.client.onRegionFail(newRegionErrorInfo(region, &storeStreamErr{})) - s.requestCache.markDone() } else { + var slot *regionScanSlot + for slot == nil { + var resume <-chan struct{} + var scanSlotAvailable <-chan struct{} + paused := false + if s.client.memoryQuota != nil { + resume, paused = s.client.memoryQuota.regionScanResumeNotify() + } + if !paused { + slot, scanSlotAvailable = s.store.scanLimiter.tryAcquireOrNotify() + if slot != nil { + break + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case controlReq := <-s.controlQueue: + if err := handleDeregister(controlReq.regionInfo); err != nil { + return err + } + case <-resume: + case <-scanSlotAvailable: + } + + if region.subscribedSpan.stopped.Load() { + s.client.onRegionFail(newRegionErrorInfo(region, &storeStreamErr{})) + break + } + } + if slot == nil { + regionReq, err = s.popRequest(ctx) + if err != nil { + return err + } + continue + } + if !s.requestTracker.track(regionReq, slot) { + slot.release() + regionReq, err = s.popRequest(ctx) + if err != nil { + return err + } + continue + } state 
:= newRegionFeedState(region, uint64(subID), s) state.start() s.addRegionState(subID, region.verID.GetID(), state) - // Mark the request as sent before sending it. - // Otherwise there is a race with the receiver goroutine: - // 1. addRegionState makes the region visible to error handling. - // 2. doSend sends the request. - // 3. the receiver goroutine may receive a region error immediately. - // 4. markStopped runs before markSent, so requestCache.markStopped cannot - // find the request in sentRequests. - // 5. the sender goroutine then calls markSent and leaves a stale sent - // request behind, even though the region has already been - // unlocked/rescheduled. - // - // Tracking the request before Send keeps requestedRegions and - // sentRequests visible in the same order and avoids leaving stale - // requests in cleanup. - s.requestCache.markSent(regionReq) if err := doSend(s.createRegionRequest(region)); err != nil { state.markStopped(err) return err } } - // Try to get from cache - regionReq, err = s.requestCache.pop(ctx) + regionReq, err = s.popRequest(ctx) if err != nil { return err } @@ -502,10 +538,40 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS return subscriptions } -// add adds a region request to the worker's cache -// It blocks if the cache is full until there's space or ctx is cancelled -func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) { - return s.requestCache.add(ctx, region, force) +func (s *regionRequestWorker) addControl( + ctx context.Context, region regionInfo, +) (bool, error) { + timer := time.NewTimer(addReqRetryInterval * addReqRetryLimit) + defer timer.Stop() + select { + case <-ctx.Done(): + return false, ctx.Err() + case s.controlQueue <- newRegionReq(region, 0): + return true, nil + case <-timer.C: + return false, nil + } +} + +func (s *regionRequestWorker) popRequest(ctx context.Context) (*regionReq, error) { + for { + select { + case req := 
<-s.controlQueue: + return req, nil + default: + } + req, ok, registerChanged := s.store.registerQueue.tryPopOrNotify() + if ok { + return req, nil + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case req := <-s.controlQueue: + return req, nil + case <-registerChanged: + } + } } func (s *regionRequestWorker) clearPendingRegions() []regionInfo { @@ -513,16 +579,11 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo { // Clear pre-fetched region if s.preFetchForConnecting != nil { - region := *s.preFetchForConnecting + region := s.preFetchForConnecting.regionInfo s.preFetchForConnecting = nil regions = append(regions, region) - // The pre-fetched region was popped from pendingQueue but hasn't been marked as sent or done yet. - // Release its pendingCount slot to avoid leaking flow control credits on worker failures. - s.requestCache.markDone() } - // Clear all regions from cache - cacheRegions := s.requestCache.clear() - regions = append(regions, cacheRegions...) + regions = append(regions, s.requestTracker.clear()...) 
return regions } diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index f9752a0eb3..ba89ec6068 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -17,6 +17,7 @@ import ( "context" "io" "testing" + "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/cdcpb" @@ -37,6 +38,16 @@ type mockEventFeedV2Client struct { recvErr error } +type recordingEventFeedV2Client struct { + *mockEventFeedV2Client + sent chan *cdcpb.ChangeDataRequest +} + +func (m *recordingEventFeedV2Client) Send(req *cdcpb.ChangeDataRequest) error { + m.sent <- req + return nil +} + func (m *mockEventFeedV2Client) Send(*cdcpb.ChangeDataRequest) error { return m.sendErr } func (m *mockEventFeedV2Client) Recv() (*cdcpb.ChangeDataEvent, error) { return nil, m.recvErr } func (m *mockEventFeedV2Client) Header() (metadata.MD, error) { return metadata.MD{}, nil } @@ -57,6 +68,25 @@ func prepareRegionForSendTest(region regionInfo) regionInfo { return region } +func newTestRegionRequestWorker( + queueSize int, client *subscriptionClient, +) *regionRequestWorker { + store := &requestedStore{ + storeAddr: "store-1", + registerQueue: newRegionRegisterQueue(queueSize), + scanLimiter: newRegionScanLimiter(queueSize), + } + worker := ®ionRequestWorker{ + client: client, + store: store, + controlQueue: make(chan *regionReq, queueSize), + requestTracker: newRegionRequestTracker(), + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + store.requestWorkers.s = []*regionRequestWorker{worker} + return worker +} + func TestRegionStatesOperation(t *testing.T) { worker := ®ionRequestWorker{} worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) @@ -77,29 +107,23 @@ func TestRegionStatesOperation(t *testing.T) { require.Equal(t, 0, len(worker.requestedRegions.subscriptions)) } -func 
TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) { - worker := ®ionRequestWorker{ - requestCache: newRequestCache(10), - } +func TestClearPendingRegionsReturnsPreFetchedRegion(t *testing.T) { + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) ctx := context.Background() region := createTestRegionInfo(1, 1) - ok, err := worker.requestCache.add(ctx, region, false) + ok, err := worker.store.registerQueue.add(ctx, region, 0) require.NoError(t, err) require.True(t, ok) - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - require.Equal(t, 1, worker.requestCache.getPendingCount()) - - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + req, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = req regions := worker.clearPendingRegions() require.Len(t, regions, 1) require.Nil(t, worker.preFetchForConnecting) - require.Equal(t, 0, worker.requestCache.getPendingCount()) } type pushedResolvedEvent struct { @@ -270,58 +294,36 @@ func BenchmarkDispatchResolvedTsEventSmallBatchCurrent(b *testing.B) { } func TestClearPendingRegionsDoesNotReturnStoppedSentRegion(t *testing.T) { - worker := ®ionRequestWorker{ - requestCache: newRequestCache(10), - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) - ctx := context.Background() region := createTestRegionInfo(1, 1) - - ok, err := worker.requestCache.add(ctx, region, false) - require.NoError(t, err) - require.True(t, ok) - - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) + req := newRegionReq(region, 0) + slot, _ := worker.store.scanLimiter.tryAcquireOrNotify() + require.True(t, worker.requestTracker.track(req, slot)) state := newRegionFeedState(req.regionInfo, uint64(req.regionInfo.subscribedSpan.subID), worker) state.start() 
worker.addRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID(), state) - // Simulate the race we are fixing in processRegionSendTask: - // once a request is visible in sentRequests, a fast region error may mark the - // region stopped before worker cleanup runs. In that case, markStopped should - // remove the sent request immediately, so clearPendingRegions must not return - // the stale region again during worker shutdown. - worker.requestCache.markSent(req) state.markStopped(errors.New("send request to store error")) worker.takeRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) - require.Equal(t, 0, worker.requestCache.getPendingCount()) + require.Equal(t, 0, activeScanCount(worker.store.scanLimiter)) require.Empty(t, worker.clearPendingRegions()) } func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) { - worker := ®ionRequestWorker{ - requestCache: newRequestCache(10), - store: &requestedStore{storeAddr: "store-1"}, - client: &subscriptionClient{}, - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) ctx := context.Background() region := prepareRegionForSendTest(createTestRegionInfo(1, 1)) - ok, err := worker.requestCache.add(ctx, region, false) + ok, err := worker.store.registerQueue.add(ctx, region, 0) require.NoError(t, err) require.True(t, ok) - require.Equal(t, 1, worker.requestCache.getPendingCount()) - - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + req, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = req sendErr := errors.New("send failed") conn := &ConnAndClient{ @@ -331,12 +333,143 @@ func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) { err = worker.processRegionSendTask(ctx, conn) 
require.ErrorIs(t, err, sendErr) - require.Equal(t, 0, worker.requestCache.getPendingCount()) - require.Empty(t, worker.requestCache.sentRequests.regionReqs) + require.Equal(t, 0, activeScanCount(worker.store.scanLimiter)) + require.Equal(t, 0, worker.requestTracker.len()) state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) require.True(t, state == nil || state.isStale(), "region state should be removed or marked stale after send failure") } +func TestProcessRegionSendTaskAllowsDeregisterWhileScanPaused(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + quota := newPullerMemoryQuota(100) + reservation, err := quota.acquire(ctx, 99, 50, nil) + require.NoError(t, err) + worker := newTestRegionRequestWorker(10, &subscriptionClient{memoryQuota: quota}) + + registerRegion := prepareRegionForSendTest(createTestRegionInfo(1, 1)) + ok, err := worker.store.registerQueue.add(ctx, registerRegion, 0) + require.NoError(t, err) + require.True(t, ok) + registerReq, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = registerReq + + deregisterRegion := regionInfo{subscribedSpan: &subscribedSpan{subID: 2}} + ok, err = worker.addControl(ctx, deregisterRegion) + require.NoError(t, err) + require.True(t, ok) + + client := &recordingEventFeedV2Client{ + mockEventFeedV2Client: &mockEventFeedV2Client{}, + sent: make(chan *cdcpb.ChangeDataRequest, 2), + } + errCh := make(chan error, 1) + go func() { + errCh <- worker.processRegionSendTask(ctx, &ConnAndClient{ + Client: client, + Conn: &grpc.ClientConn{}, + }) + }() + + request := <-client.sent + require.NotNil(t, request.GetDeregister()) + require.Equal(t, uint64(2), request.RequestId) + select { + case request = <-client.sent: + t.Fatalf("register request sent while scan gate was closed: %v", request) + case <-time.After(20 * time.Millisecond): + } + + reservation.release() + request = 
<-client.sent + require.Nil(t, request.GetDeregister()) + require.Equal(t, uint64(1), request.RequestId) + + cancel() + require.ErrorIs(t, <-errCh, context.Canceled) +} + +func TestProcessRegionSendTaskSharesStoreScanLimit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + store := &requestedStore{ + storeAddr: "store-1", + registerQueue: newRegionRegisterQueue(10), + scanLimiter: newRegionScanLimiter(1), + } + newWorker := func(regionID uint64) (*regionRequestWorker, *recordingEventFeedV2Client, <-chan error) { + worker := ®ionRequestWorker{ + client: &subscriptionClient{}, + store: store, + controlQueue: make(chan *regionReq, 10), + requestTracker: newRegionRequestTracker(), + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + region := prepareRegionForSendTest(createTestRegionInfo(1, regionID)) + worker.preFetchForConnecting = newRegionReq(region, 0) + client := &recordingEventFeedV2Client{ + mockEventFeedV2Client: &mockEventFeedV2Client{}, + sent: make(chan *cdcpb.ChangeDataRequest, 1), + } + errCh := make(chan error, 1) + go func() { + errCh <- worker.processRegionSendTask(ctx, &ConnAndClient{ + Client: client, + Conn: &grpc.ClientConn{}, + }) + }() + return worker, client, errCh + } + + worker1, client1, errCh1 := newWorker(1) + worker2, client2, errCh2 := newWorker(2) + + var firstWorker, secondWorker *regionRequestWorker + var secondClient *recordingEventFeedV2Client + select { + case <-client1.sent: + firstWorker, secondWorker, secondClient = worker1, worker2, client2 + case <-client2.sent: + firstWorker, secondWorker, secondClient = worker2, worker1, client1 + case <-time.After(time.Second): + t.Fatal("expected one Register request") + } + + select { + case request := <-secondClient.sent: + t.Fatalf("scan limit exceeded before first request completed: %v", request) + case <-time.After(20 * time.Millisecond): + } + require.Equal(t, 1, activeScanCount(store.scanLimiter)) + + 
var firstState *regionFeedState + for _, regionID := range []uint64{1, 2} { + if state := firstWorker.getRegionState(1, regionID); state != nil { + firstState = state + break + } + } + require.NotNil(t, firstState) + firstState.setInitialized() + + select { + case <-secondClient.sent: + case <-time.After(time.Second): + t.Fatal("second Register request did not proceed after slot release") + } + require.Equal(t, 1, activeScanCount(store.scanLimiter)) + + cancel() + require.ErrorIs(t, <-errCh1, context.Canceled) + require.ErrorIs(t, <-errCh2, context.Canceled) + firstWorker.clearPendingRegions() + secondWorker.clearPendingRegions() + require.Equal(t, 0, activeScanCount(store.scanLimiter)) +} + func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) { testCases := []struct { name string @@ -354,24 +487,18 @@ func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - worker := ®ionRequestWorker{ - requestCache: newRequestCache(10), - store: &requestedStore{storeAddr: "store-1"}, - client: &subscriptionClient{}, - } - worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + worker := newTestRegionRequestWorker(10, &subscriptionClient{}) ctx := context.Background() region := prepareRegionForSendTest(createTestRegionInfo(1, 1)) - ok, err := worker.requestCache.add(ctx, region, false) + ok, err := worker.store.registerQueue.add(ctx, region, 0) require.NoError(t, err) require.True(t, ok) - req, err := worker.requestCache.pop(ctx) - require.NoError(t, err) - worker.preFetchForConnecting = new(regionInfo) - *worker.preFetchForConnecting = req.regionInfo + req, ok, _ := worker.store.registerQueue.tryPopOrNotify() + require.True(t, ok) + worker.preFetchForConnecting = req conn := &ConnAndClient{ Client: &mockEventFeedV2Client{sendErr: tc.sendErr}, @@ -381,8 +508,8 @@ func TestProcessRegionSendTaskSendEOFIsRetriable(t *testing.T) { err = worker.processRegionSendTask(ctx, 
conn) var streamErr *storeStreamErr require.ErrorAs(t, err, &streamErr) - require.Equal(t, 0, worker.requestCache.getPendingCount()) - require.Empty(t, worker.requestCache.sentRequests.regionReqs) + require.Equal(t, 0, activeScanCount(worker.store.scanLimiter)) + require.Equal(t, 0, worker.requestTracker.len()) state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) require.NotNil(t, state) diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index e9c21a7aad..5197dcb277 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -126,7 +126,7 @@ func (s *regionFeedState) markStopped(err error) { s.state.v = stateStopped s.state.err = err } - s.worker.requestCache.markStopped(s.region.subscribedSpan.subID, s.region.verID.GetID()) + s.worker.requestTracker.stop(s.region.subscribedSpan.subID, s.region.verID.GetID()) } // mark regionFeedState as removed if possible. @@ -138,7 +138,7 @@ func (s *regionFeedState) markRemoved() (changed bool) { changed = true s.matcher.clear() } - s.worker.requestCache.markStopped(s.region.subscribedSpan.subID, s.region.verID.GetID()) + s.worker.requestTracker.stop(s.region.subscribedSpan.subID, s.region.verID.GetID()) return } @@ -162,7 +162,7 @@ func (s *regionFeedState) isInitialized() bool { func (s *regionFeedState) setInitialized() { s.region.lockedRangeState.Initialized.Store(true) - s.worker.requestCache.resolve(s.region.subscribedSpan.subID, s.region.verID.GetID()) + s.worker.requestTracker.resolve(s.region.subscribedSpan.subID, s.region.verID.GetID()) } func (s *regionFeedState) getRegionID() uint64 { diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 0ba3b4be24..dacfac861b 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -129,7 +129,8 @@ type subscribedSpan struct { kvEventsCache 
[]common.RawKVEntry // To handle span removing. - stopped atomic.Bool + stopped atomic.Bool + stoppedCh chan struct{} // To handle stale lock resolvings. tryResolveLock func(regionID uint64, state *regionlock.LockedRangeState) @@ -194,10 +195,8 @@ type subscriptionClient struct { stores sync.Map ds dynstream.DynamicStream[int, SubscriptionID, regionEvent, *subscribedSpan, *regionEventHandler] - // the following three fields are used to manage feedback from ds and notify other goroutines - mu sync.Mutex - cond *sync.Cond - paused atomic.Bool + // memoryQuota controls puller backpressure and new region scan admission. + memoryQuota *pullerMemoryQuota // the credential to connect tikv credential *security.Credential @@ -224,13 +223,13 @@ type subscriptionClient struct { // NewSubscriptionClient creates a client. func NewSubscriptionClient( - config *SubscriptionClientConfig, + clientConfig *SubscriptionClientConfig, pd pd.Client, lockResolver txnutil.LockResolver, credential *security.Credential, ) SubscriptionClient { subClient := &subscriptionClient{ - config: config, + config: clientConfig, stores: sync.Map{}, pd: pd, @@ -248,6 +247,8 @@ func NewSubscriptionClient( } subClient.ctx, subClient.cancel = context.WithCancel(context.Background()) subClient.totalSpans.spanMap = make(map[SubscriptionID]*subscribedSpan) + pullerConfig := config.GetGlobalServerConfig().Debug.Puller + subClient.memoryQuota = newPullerMemoryQuota(pullerConfig.MemoryQuota) option := dynstream.NewOption() // Note: it is max batch size of the kv sent from tikv(not committed rows) @@ -255,7 +256,6 @@ func NewSubscriptionClient( // TODO: Set `UseBuffer` to true until we refactor the `regionEventHandler.Handle` method so that it doesn't call any method of the dynamic stream. 
Currently, if `UseBuffer` is set to false, there will be a deadlock: // ds.handleLoop fetch events from `ch` -> regionEventHandler.Handle -> ds.RemovePath -> send event to `ch` option.UseBuffer = true - option.EnableMemoryControl = true ds := dynstream.NewParallelDynamicStream( "log-puller", ®ionEventHandler{subClient: subClient}, @@ -263,7 +263,6 @@ func NewSubscriptionClient( ) ds.Start() subClient.ds = ds - subClient.cond = sync.NewCond(&subClient.mu) subClient.initMetrics() return subClient @@ -298,32 +297,19 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { dsMetrics := s.ds.GetMetrics() metricSubscriptionClientDSChannelSize.Set(float64(dsMetrics.EventChanSize)) metricSubscriptionClientDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) - if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 1 { - log.Panic("subscription client should have only one area") - } - if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 0 { - areaMetric := dsMetrics.MemoryControl.AreaMemoryMetrics[0] - metrics.DynamicStreamMemoryUsage.WithLabelValues( - "log-puller", - "max", - "default", - "default", - ).Set(float64(areaMetric.MaxMemory())) - metrics.DynamicStreamMemoryUsage.WithLabelValues( - "log-puller", - "used", - "default", - "default", - ).Set(float64(areaMetric.MemoryUsage())) - } + usedMemory, memoryQuota := s.memoryQuota.usage() + metrics.PullerMemoryQuota.WithLabelValues("quota").Set(float64(memoryQuota)) + metrics.PullerMemoryQuota.WithLabelValues("used").Set(float64(usedMemory)) pendingRegionReqCount := 0 s.stores.Range(func(_, value any) bool { store := value.(*requestedStore) + pendingRegionReqCount += store.registerQueue.len() + activeScanCount, _ := store.scanLimiter.usage() + pendingRegionReqCount += activeScanCount store.requestWorkers.RLock() for _, worker := range store.requestWorkers.s { - worker.requestCache.clearStaleRequest() - pendingRegionReqCount += worker.requestCache.getPendingCount() + pendingRegionReqCount += 
len(worker.controlQueue) } store.requestWorkers.RUnlock() return true @@ -366,8 +352,7 @@ func (s *subscriptionClient) Subscribe( s.totalSpans.spanMap[subID] = rt s.totalSpans.Unlock() - areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(1*1024*1024*1024, dynstream.MemoryControlForPuller, "logPuller") // 1GB - s.ds.AddPath(rt.subID, rt, areaSetting) + s.ds.AddPath(rt.subID, rt) select { case <-s.ctx.Done(): @@ -402,51 +387,20 @@ func (s *subscriptionClient) wakeSubscription(subID SubscriptionID) { } func (s *subscriptionClient) pushRegionEventToDS(subID SubscriptionID, event regionEvent) { - // fast path - if !s.paused.Load() { + if s.memoryQuota == nil { s.ds.Push(subID, event) return } - // slow path: wait until paused is false - s.mu.Lock() - for s.paused.Load() { - select { - case <-s.ctx.Done(): - s.mu.Unlock() - return - default: - s.cond.Wait() - } + span := event.mustFirstState().region.subscribedSpan + reservation, err := s.memoryQuota.acquire( + s.ctx, subID, uint64(event.getSize()), span.stoppedCh) + if err != nil { + return } - s.mu.Unlock() + event.memoryReservation = reservation s.ds.Push(subID, event) } -func (s *subscriptionClient) handleDSFeedBack(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return nil - case feedback := <-s.ds.Feedback(): - switch feedback.FeedbackType { - case dynstream.PauseArea: - s.mu.Lock() - s.paused.Store(true) - s.mu.Unlock() - log.Info("subscription client pause push region event") - case dynstream.ResumeArea: - s.mu.Lock() - s.paused.Store(false) - s.cond.Broadcast() - s.mu.Unlock() - log.Info("subscription client resume push region event") - case dynstream.ReleasePath, dynstream.ResumePath: - // Ignore it, because it is no need to pause and resume a path in puller. 
- } - } - } -} - func (s *subscriptionClient) Run(ctx context.Context) error { // s.consume = consume if s.pd == nil { @@ -458,7 +412,6 @@ func (s *subscriptionClient) Run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { return s.updateMetrics(ctx) }) - g.Go(func() error { return s.handleDSFeedBack(ctx) }) g.Go(func() error { return s.handleRangeTasks(ctx) }) g.Go(func() error { return s.handleRegions(ctx, g) }) g.Go(func() error { return s.handleErrors(ctx) }) @@ -475,10 +428,9 @@ func (s *subscriptionClient) Run(ctx context.Context) error { // Close closes the client. Must be called after `Run` returns. func (s *subscriptionClient) Close(ctx context.Context) error { s.cancel() - s.mu.Lock() - s.paused.Store(false) - s.cond.Broadcast() - s.mu.Unlock() + if s.memoryQuota != nil { + s.memoryQuota.close() + } s.ds.Close() s.regionTaskQueue.Close() return nil @@ -492,6 +444,9 @@ func (s *subscriptionClient) setTableStopped(rt *subscribedSpan) { // Then send a special singleRegionInfo to regionRouter to deregister the table // from all TiKV instances. if rt.stopped.CompareAndSwap(false, true) { + if rt.stoppedCh != nil { + close(rt.stoppedCh) + } s.regionTaskQueue.Push(NewRegionPriorityTask(TaskHighPrior, regionInfo{subscribedSpan: rt, filterLoop: rt.filterLoop}, s.pdClock.CurrentTS())) if rt.rangeLock.Stop() { s.onTableDrained(rt) @@ -503,6 +458,9 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { log.Info("subscription client stop span is finished", zap.Uint64("subscriptionID", uint64(rt.subID))) + if s.memoryQuota != nil { + s.memoryQuota.releaseSubscription(rt.subID) + } err := s.ds.RemovePath(rt.subID) if err != nil { log.Warn("subscription client remove path failed", @@ -528,9 +486,9 @@ func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) { // requestedStore represents a store that has been connected. type requestedStore struct { - storeAddr string - // Use to select a worker to send request. 
- nextWorker atomic.Uint32 + storeAddr string + registerQueue *regionRegisterQueue + scanLimiter *regionScanLimiter requestWorkers struct { sync.RWMutex @@ -538,14 +496,6 @@ type requestedStore struct { } } -func (rs *requestedStore) getRequestWorker() *regionRequestWorker { - rs.requestWorkers.RLock() - defer rs.requestWorkers.RUnlock() - - index := rs.nextWorker.Add(1) % uint32(len(rs.requestWorkers.s)) - return rs.requestWorkers.s[index] -} - // handleRegions receives regionInfo from regionTaskQueue and attach rpcCtx to them, // then send them to corresponding requestedStore. func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Group) error { @@ -558,7 +508,11 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro return rs } - rs = &requestedStore{storeAddr: storeAddr} + rs = &requestedStore{ + storeAddr: storeAddr, + registerQueue: newRegionRegisterQueue(pendingRegionRequestQueueSize), + scanLimiter: newRegionScanLimiter(pendingRegionRequestQueueSize), + } rs.requestWorkers.s = make([]*regionRequestWorker, 0, s.config.RegionRequestWorkerPerStore) s.stores.Store(storeAddr, rs) @@ -582,10 +536,19 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro defer func() { s.stores.Range(func(_, value any) bool { rs := value.(*requestedStore) + rs.registerQueue.clear() rs.requestWorkers.RLock() for _, w := range rs.requestWorkers.s { - w.requestCache.clear() + DRAIN_CONTROL: + for { + select { + case <-w.controlQueue: + default: + break DRAIN_CONTROL + } + } + w.requestTracker.clear() } rs.requestWorkers.RUnlock() @@ -626,10 +589,8 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro } store := getStore(region.rpcCtx.Addr) - worker := store.getRequestWorker() - force := regionTask.Priority() <= forcedPriorityBase - ok, err = worker.add(ctx, region, force) + ok, err = store.registerQueue.add(ctx, region, regionTask.Priority()) if err != nil { 
log.Warn("subscription client add region request failed", zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), @@ -644,7 +605,6 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro } log.Debug("subscription client will request a region", - zap.Uint64("workID", worker.workerID), zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID)), zap.Uint64("regionID", region.verID.GetID()), zap.String("addr", store.storeAddr)) @@ -660,7 +620,7 @@ func (s *subscriptionClient) enqueueRegionToAllStores(ctx context.Context, regio workers := rs.requestWorkers.s rs.requestWorkers.RUnlock() for _, worker := range workers { - ok, err := worker.add(ctx, region, true) + ok, err := worker.addControl(ctx, region) if err != nil { firstErr = err enqueued = false @@ -727,6 +687,13 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( nextSpan := span backoffBeforeLoad := false for { + stopped, err := s.waitRegionScanAllowed(ctx, subscribedSpan) + if err != nil { + return err + } + if stopped { + return nil + } if backoffBeforeLoad { if err := util.Hang(ctx, loadRegionRetryInterval); err != nil { return err @@ -763,6 +730,13 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } for _, regionMeta := range regionMetas { + stopped, err := s.waitRegionScanAllowed(ctx, subscribedSpan) + if err != nil { + return err + } + if stopped { + return nil + } regionSpan := heartbeatpb.TableSpan{ StartKey: regionMeta.StartKey, EndKey: regionMeta.EndKey, @@ -796,6 +770,34 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } } +func (s *subscriptionClient) waitRegionScanAllowed( + ctx context.Context, subscribedSpan *subscribedSpan, +) (bool, error) { + if subscribedSpan.stopped.Load() { + return true, nil + } + if s.memoryQuota == nil { + return false, nil + } + for { + resume, paused := s.memoryQuota.regionScanResumeNotify() + if !paused { + break + } + select { + case <-ctx.Done(): + return false, 
ctx.Err() + case <-subscribedSpan.stoppedCh: + return true, nil + case <-resume: + } + } + if subscribedSpan.stopped.Load() { + return true, nil + } + return false, nil +} + // scheduleRegionRequest locks the region's range and send the region to regionTaskQueue, // which will be handled by handleRegions. func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) { @@ -1089,6 +1091,7 @@ func (s *subscriptionClient) newSubscribedSpan( startTs: startTs, filterLoop: filterLoop, rangeLock: rangeLock, + stoppedCh: make(chan struct{}), consumeKVEvents: consumeKVEvents, advanceResolvedTs: advanceResolvedTs, diff --git a/logservice/logpuller/subscription_client_test.go b/logservice/logpuller/subscription_client_test.go index 2e87faa4fc..f69907e7c7 100644 --- a/logservice/logpuller/subscription_client_test.go +++ b/logservice/logpuller/subscription_client_test.go @@ -92,7 +92,7 @@ func TestGenerateResolveLockTask(t *testing.T) { } worker := ®ionRequestWorker{ - requestCache: &requestCache{}, + requestTracker: newRegionRequestTracker(), } // Lock another range, no task will be triggered before initialized. 
res = span.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100) @@ -382,18 +382,22 @@ func (s *mockDynamicStream) GetMetrics() dynstream.Metrics[int, SubscriptionID] } func TestPushRegionEventToDSUnblocksOnClose(t *testing.T) { + quota := newPullerMemoryQuota(1) + reservation, err := quota.acquire(context.Background(), 2, 1, nil) + require.NoError(t, err) + defer reservation.release() + span := &subscribedSpan{stoppedCh: make(chan struct{})} + state := &regionFeedState{region: regionInfo{subscribedSpan: span}} client := &subscriptionClient{ ds: &mockDynamicStream{}, + memoryQuota: quota, regionTaskQueue: NewPriorityQueue(), } client.ctx, client.cancel = context.WithCancel(context.Background()) - client.cond = sync.NewCond(&client.mu) - - client.paused.Store(true) done := make(chan struct{}) go func() { - client.pushRegionEventToDS(SubscriptionID(1), regionEvent{}) + client.pushRegionEventToDS(SubscriptionID(1), regionEvent{states: []*regionFeedState{state}}) close(done) }() @@ -412,22 +416,19 @@ func TestPushRegionEventToDSUnblocksOnClose(t *testing.T) { } } -func TestEnqueueRegionToAllStoresRetryWhenCacheFull(t *testing.T) { +func TestEnqueueStopRegionWhenRegisterCacheFull(t *testing.T) { ctx := context.Background() client := &subscriptionClient{} - worker := &regionRequestWorker{ - requestCache: newRequestCache(1), - } - store := &requestedStore{storeAddr: "store-1"} - store.requestWorkers.s = []*regionRequestWorker{worker} + worker := newTestRegionRequestWorker(1, client) + store := worker.store client.stores.Store(store.storeAddr, store) dummyRegion := regionInfo{ subscribedSpan: &subscribedSpan{subID: SubscriptionID(2)}, lockedRangeState: &regionlock.LockedRangeState{}, } - ok, err := worker.add(ctx, dummyRegion, true) + ok, err := store.registerQueue.add(ctx, dummyRegion, 0) require.NoError(t, err) require.True(t, ok) @@ -436,15 +437,9 @@ func TestEnqueueRegionToAllStoresRetryWhenCacheFull(t *testing.T) { } enqueued, err := 
client.enqueueRegionToAllStores(ctx, stopRegion) require.NoError(t, err) - require.False(t, enqueued) - - <-worker.requestCache.pendingQueue - worker.requestCache.markDone() - - enqueued, err = client.enqueueRegionToAllStores(ctx, stopRegion) - require.NoError(t, err) require.True(t, enqueued) - require.Equal(t, 1, len(worker.requestCache.pendingQueue)) + require.Equal(t, 1, store.registerQueue.len()) + require.Equal(t, 1, len(worker.controlQueue)) } func TestSubscriptionWithFailedTiKV(t *testing.T) { diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 1aa6f9b924..9271aa91ea 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -17,6 +17,7 @@ import ( "time" "github.com/pingcap/errors" + cerror "github.com/pingcap/ticdc/pkg/errors" ) // DebugConfig represents config for ticdc unexposed feature configurations @@ -49,12 +50,18 @@ func (c *DebugConfig) ValidateAndAdjust() error { if err := c.Scheduler.ValidateAndAdjust(); err != nil { return errors.Trace(err) } + if c.Puller.MemoryQuota == 0 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "debug.puller.memory-quota must be greater than 0") + } return nil } // PullerConfig represents config for puller type PullerConfig struct { + // MemoryQuota is the memory quota in bytes for events buffered by the puller. + MemoryQuota uint64 `toml:"memory-quota" json:"memory_quota"` // EnableResolvedTsStuckDetection is used to enable resolved ts stuck detection. EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable_resolved_ts_stuck_detection"` // ResolvedTsStuckInterval is the interval of checking resolved ts stuck. @@ -62,16 +69,16 @@ type PullerConfig struct { // LogRegionDetails determines whether logs Region details or not in puller and kv-client. 
LogRegionDetails bool `toml:"log-region-details" json:"log_region_details"` - // PendingRegionRequestQueueSize is the total size of the pending region request queue shared across - // all puller workers connecting to a single TiKV store. This size is divided equally among all workers. - // For example, if PendingRegionRequestQueueSize is 32 and there are 8 workers connecting to the same store, - // each worker's queue size will be 32 / 8 = 4. + // PendingRegionRequestQueueSize is the capacity of the pending Register queue + // shared by all puller workers connecting to one TiKV store. It also limits + // the number of active incremental scans for that store. PendingRegionRequestQueueSize int `toml:"pending-region-request-queue-size" json:"pending_region_request_queue_size"` } // NewDefaultPullerConfig return the default puller configuration func NewDefaultPullerConfig() *PullerConfig { return &PullerConfig{ + MemoryQuota: 1024 * 1024 * 1024, // 1 GiB. EnableResolvedTsStuckDetection: false, ResolvedTsStuckInterval: TomlDuration(5 * time.Minute), LogRegionDetails: false, diff --git a/pkg/config/debug_test.go b/pkg/config/debug_test.go new file mode 100644 index 0000000000..0889bcefc8 --- /dev/null +++ b/pkg/config/debug_test.go @@ -0,0 +1,29 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package config + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPullerMemoryQuotaConfig(t *testing.T) { + config := GetDefaultServerConfig() + require.Equal(t, uint64(1024*1024*1024), config.Debug.Puller.MemoryQuota) + require.NoError(t, config.Debug.ValidateAndAdjust()) + + config.Debug.Puller.MemoryQuota = 0 + require.Error(t, config.Debug.ValidateAndAdjust()) +} diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index e874aa4428..9684c648af 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -64,6 +64,27 @@ var ( Name: "resolved_ts_lag", Help: "The lag of resolved ts", }) + PullerRegionScanGate = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "log_puller", + Name: "region_scan_gate_open", + Help: "Whether the puller region scan gate is open.", + }) + PullerRegionScanGateTransition = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "log_puller", + Name: "region_scan_gate_transition_count", + Help: "The number of puller region scan gate state transitions.", + }, []string{"type"}) + PullerMemoryQuota = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "log_puller", + Name: "memory_quota_bytes", + Help: "Puller memory quota and current usage in bytes.", + }, []string{"type"}) SubscriptionClientResolvedTsLagGauge = prometheus.NewGauge( prometheus.GaugeOpts{ @@ -164,6 +185,9 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(LogPullerPrewriteCacheRowNum) registry.MustRegister(LogPullerMatcherCount) registry.MustRegister(LogPullerResolvedTsLag) + registry.MustRegister(PullerRegionScanGate) + registry.MustRegister(PullerRegionScanGateTransition) + registry.MustRegister(PullerMemoryQuota) registry.MustRegister(SubscriptionClientRequestedRegionCount) registry.MustRegister(SubscriptionClientAddRegionRequestDuration) 
registry.MustRegister(RegionRequestFinishScanDuration)