Skip to content
Draft

[WIP] #5479

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
490 changes: 490 additions & 0 deletions docs/design/2026-06-18-puller-memory-quota.md

Large diffs are not rendered by default.

249 changes: 249 additions & 0 deletions logservice/logpuller/memory_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package logpuller

import (
"context"
"sync"
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/metrics"
"go.uber.org/zap"
)

const (
regionScanPauseRatio = 0.5
regionScanResumeRatio = 0.4
)

// pullerMemoryQuota bounds memory held by admitted events and controls region
// scan admission based on current usage.
type pullerMemoryQuota struct {
mu sync.Mutex

capacity uint64
used uint64
closed bool

memoryReleased chan struct{}

regionScanPaused bool
regionScanResume chan struct{}

subscriptions map[SubscriptionID]*pullerMemorySubscription
}

type pullerMemorySubscription struct {
usage uint64
}

type pullerMemoryReservation struct {
quota *pullerMemoryQuota
subID SubscriptionID
subscription *pullerMemorySubscription
bytes uint64
released atomic.Bool
}

func newPullerMemoryQuota(capacity uint64) *pullerMemoryQuota {
metrics.PullerRegionScanGate.Set(1)
metrics.PullerMemoryQuota.WithLabelValues("quota").Set(float64(capacity))
metrics.PullerMemoryQuota.WithLabelValues("used").Set(0)
return &pullerMemoryQuota{
capacity: capacity,
memoryReleased: make(chan struct{}),
regionScanResume: make(chan struct{}),
subscriptions: make(map[SubscriptionID]*pullerMemorySubscription),
}
}

func (q *pullerMemoryQuota) acquire(
ctx context.Context, subID SubscriptionID, bytes uint64, stopped <-chan struct{},
) (*pullerMemoryReservation, error) {
for {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
return nil, context.Canceled
}
// A single oversized event must be allowed to run alone, otherwise it can
// never make progress and permanently deadlocks the puller.
fits := q.used <= q.capacity && bytes <= q.capacity-q.used
if fits || q.used == 0 && bytes > q.capacity {
subscription := q.subscriptions[subID]
if subscription == nil {
subscription = &pullerMemorySubscription{}
q.subscriptions[subID] = subscription
}
q.used += bytes
subscription.usage += bytes
q.updateRegionScanStateLocked()
q.mu.Unlock()
return &pullerMemoryReservation{
quota: q,
subID: subID,
subscription: subscription,
bytes: bytes,
}, nil
}
memoryReleased := q.memoryReleased
q.mu.Unlock()

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-stopped:
return nil, context.Canceled
case <-memoryReleased:
}
}
}

func (r *pullerMemoryReservation) release() {
if r == nil || !r.released.CompareAndSwap(false, true) {
return
}
r.quota.release(r)
}

func (q *pullerMemoryQuota) release(reservation *pullerMemoryReservation) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed || q.subscriptions[reservation.subID] != reservation.subscription {
return
}

usage := reservation.subscription.usage
if usage < reservation.bytes || q.used < reservation.bytes {
log.Error("puller memory quota accounting underflow",
zap.Uint64("subscriptionID", uint64(reservation.subID)),
zap.Uint64("releaseBytes", reservation.bytes),
zap.Uint64("subscriptionUsage", usage),
zap.Uint64("memoryUsage", q.used))
return
}

q.used -= reservation.bytes
usage -= reservation.bytes
if usage == 0 {
delete(q.subscriptions, reservation.subID)
} else {
reservation.subscription.usage = usage
}
q.notifyMemoryReleasedLocked()
q.updateRegionScanStateLocked()
}

// releaseSubscription releases all reservations owned by a removed
// subscription. Reservation ownership makes later stale releases no-ops.
func (q *pullerMemoryQuota) releaseSubscription(subID SubscriptionID) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return
}

subscription := q.subscriptions[subID]
if subscription == nil {
return
}
usage := subscription.usage
if usage > q.used {
log.Error("puller subscription memory usage exceeds total usage",
zap.Uint64("subscriptionID", uint64(subID)),
zap.Uint64("subscriptionUsage", usage),
zap.Uint64("memoryUsage", q.used))
usage = q.used
}
q.used -= usage
delete(q.subscriptions, subID)
if usage != 0 {
q.notifyMemoryReleasedLocked()
q.updateRegionScanStateLocked()
}
}

func (q *pullerMemoryQuota) close() {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return
}
q.closed = true
q.used = 0
clear(q.subscriptions)
metrics.PullerMemoryQuota.WithLabelValues("used").Set(0)
close(q.memoryReleased)
if q.regionScanPaused {
q.regionScanPaused = false
close(q.regionScanResume)
}
metrics.PullerRegionScanGate.Set(1)
}

func (q *pullerMemoryQuota) notifyMemoryReleasedLocked() {
close(q.memoryReleased)
q.memoryReleased = make(chan struct{})
}

func (q *pullerMemoryQuota) updateRegionScanStateLocked() {
usageRatio := float64(q.used) / float64(q.capacity)
switch {
case !q.regionScanPaused && usageRatio >= regionScanPauseRatio:
q.regionScanPaused = true
q.regionScanResume = make(chan struct{})
metrics.PullerRegionScanGate.Set(0)
metrics.PullerRegionScanGateTransition.WithLabelValues("pause").Inc()
log.Info("puller pauses region scans",
zap.Uint64("memoryUsage", q.used),
zap.Uint64("memoryQuota", q.capacity),
zap.Float64("memoryUsageRatio", usageRatio))
case q.regionScanPaused && usageRatio < regionScanResumeRatio:
q.regionScanPaused = false
close(q.regionScanResume)
metrics.PullerRegionScanGate.Set(1)
metrics.PullerRegionScanGateTransition.WithLabelValues("resume").Inc()
log.Info("puller resumes region scans",
zap.Uint64("memoryUsage", q.used),
zap.Uint64("memoryQuota", q.capacity),
zap.Float64("memoryUsageRatio", usageRatio))
}
}

func (q *pullerMemoryQuota) waitRegionScanAllowed(ctx context.Context) error {
for {
resume, paused := q.regionScanResumeNotify()
if !paused {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-resume:
}
}
}

func (q *pullerMemoryQuota) regionScanResumeNotify() (<-chan struct{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
return q.regionScanResume, q.regionScanPaused
}

func (q *pullerMemoryQuota) usage() (used, capacity uint64) {
q.mu.Lock()
defer q.mu.Unlock()
return q.used, q.capacity
}
138 changes: 138 additions & 0 deletions logservice/logpuller/memory_quota_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package logpuller

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPullerMemoryQuotaAdmissionAndScanThresholds(t *testing.T) {
quota := newPullerMemoryQuota(100)
ctx := context.Background()

first, err := quota.acquire(ctx, 1, 49, nil)
require.NoError(t, err)
_, scanPaused := quota.regionScanResumeNotify()
require.False(t, scanPaused)

second, err := quota.acquire(ctx, 1, 1, nil)
require.NoError(t, err)
_, scanPaused = quota.regionScanResumeNotify()
require.True(t, scanPaused)

third, err := quota.acquire(ctx, 1, 50, nil)
require.NoError(t, err)
used, capacity := quota.usage()
require.Equal(t, uint64(100), used)
require.Equal(t, uint64(100), capacity)

blocked := make(chan *pullerMemoryReservation, 1)
go func() {
reservation, acquireErr := quota.acquire(ctx, 2, 1, nil)
require.NoError(t, acquireErr)
blocked <- reservation
}()
select {
case <-blocked:
t.Fatal("event admitted when quota was full")
case <-time.After(20 * time.Millisecond):
}

third.release()
fourth := <-blocked
used, _ = quota.usage()
require.Equal(t, uint64(51), used)
_, scanPaused = quota.regionScanResumeNotify()
require.True(t, scanPaused)

first.release()
_, scanPaused = quota.regionScanResumeNotify()
require.False(t, scanPaused)
require.NoError(t, quota.waitRegionScanAllowed(ctx))

second.release()
fourth.release()
used, _ = quota.usage()
require.Zero(t, used)
}

func TestPullerMemoryQuotaReleaseSubscription(t *testing.T) {
quota := newPullerMemoryQuota(100)
reservation, err := quota.acquire(context.Background(), 1, 60, nil)
require.NoError(t, err)

quota.releaseSubscription(1)
used, _ := quota.usage()
require.Zero(t, used)

// A stale event can still be handled after subscription cleanup. Its release
// must not affect a later generation of accounting.
newReservation, err := quota.acquire(context.Background(), 1, 20, nil)
require.NoError(t, err)
reservation.release()
used, _ = quota.usage()
require.Equal(t, uint64(20), used)
newReservation.release()
}

func TestPullerMemoryQuotaOversizedEventRunsAlone(t *testing.T) {
quota := newPullerMemoryQuota(100)
reservation, err := quota.acquire(context.Background(), 1, 101, nil)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() {
_, acquireErr := quota.acquire(ctx, 2, 1, nil)
done <- acquireErr
}()
select {
case err := <-done:
t.Fatalf("second event admitted with oversized event: %v", err)
case <-time.After(20 * time.Millisecond):
}

cancel()
require.ErrorIs(t, <-done, context.Canceled)
reservation.release()
}

func TestProducerGateStopsWaitingAfterUnsubscribe(t *testing.T) {
quota := newPullerMemoryQuota(100)
reservation, err := quota.acquire(context.Background(), 1, 50, nil)
require.NoError(t, err)
defer reservation.release()
client := &subscriptionClient{memoryQuota: quota}
span := &subscribedSpan{stoppedCh: make(chan struct{})}

type result struct {
stopped bool
err error
}
done := make(chan result, 1)
go func() {
stopped, waitErr := client.waitRegionScanAllowed(context.Background(), span)
done <- result{stopped: stopped, err: waitErr}
}()

span.stopped.Store(true)
close(span.stoppedCh)
res := <-done
require.NoError(t, res.err)
require.True(t, res.stopped)
}
Loading
Loading