Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ func (c *EventCollector) PrepareAddDispatcher(

ds := c.getDynamicStream(target.GetMode())
areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(memoryQuota, dynstream.MemoryControlForEventCollector, "eventCollector")
err := ds.AddPath(target.GetId(), stat, areaSetting)
ds.AddArea(changefeedID.ID(), areaSetting)
err := ds.AddPath(target.GetId(), stat)
if err != nil {
log.Warn("add dispatcher to dynamic stream failed", zap.Error(err))
}
Expand Down Expand Up @@ -329,6 +330,8 @@ func (c *EventCollector) RemoveDispatcher(target dispatcher.DispatcherService) {
stat.removeMetrics()
log.Info("last dispatcher removed, clean up changefeed stat", zap.Stringer("changefeedID", target.GetChangefeedID()))
}
c.ds.RemoveArea(changefeedID.ID())
c.redoDs.RemoveArea(changefeedID.ID())
}
}

Expand Down
8 changes: 4 additions & 4 deletions logservice/logpuller/region_event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestHandleEventEntryEventOutOfOrder(t *testing.T) {
advanceResolvedTs: advanceResolvedTs,
advanceInterval: 0,
}
ds.AddPath(subID, subSpan, dynstream.AreaSettings{})
ds.AddPath(subID, subSpan)

worker := &regionRequestWorker{
requestCache: &requestCache{},
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestHandleResolvedTs(t *testing.T) {
advanceResolvedTs: advanceResolvedTs,
advanceInterval: 0,
}
ds.AddPath(subID1, subSpan, dynstream.AreaSettings{})
ds.AddPath(subID1, subSpan)
state1.region.subscribedSpan = subSpan
state1.region.lockedRangeState = &regionlock.LockedRangeState{}
state1.setInitialized()
Expand All @@ -257,7 +257,7 @@ func TestHandleResolvedTs(t *testing.T) {
advanceResolvedTs: advanceResolvedTs,
advanceInterval: 0,
}
ds.AddPath(subID2, subSpan, dynstream.AreaSettings{})
ds.AddPath(subID2, subSpan)
state2.region.subscribedSpan = subSpan
state2.region.lockedRangeState = &regionlock.LockedRangeState{}
state2.setInitialized()
Expand All @@ -281,7 +281,7 @@ func TestHandleResolvedTs(t *testing.T) {
advanceResolvedTs: advanceResolvedTs,
advanceInterval: 0,
}
ds.AddPath(subID3, subSpan, dynstream.AreaSettings{})
ds.AddPath(subID3, subSpan)
state3.region.subscribedSpan = subSpan
state3.region.lockedRangeState = &regionlock.LockedRangeState{}
state3.updateResolvedTs(8)
Expand Down
3 changes: 2 additions & 1 deletion logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ func (s *subscriptionClient) Subscribe(
s.totalSpans.Unlock()

areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(1*1024*1024*1024, dynstream.MemoryControlForPuller, "logPuller") // 1GB
s.ds.AddPath(rt.subID, rt, areaSetting)
s.ds.AddArea(0, areaSetting)
s.ds.AddPath(rt.subID, rt)
Comment on lines +364 to +365
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The AddArea call is made for every subscription, but since the area is always 0 and the settings are the same, this is redundant. This will repeatedly overwrite the settings for area 0.

It would be cleaner to call AddArea once when the subscriptionClient is initialized, for example in NewSubscriptionClient, and remove this call from the Subscribe method.


select {
case <-s.ctx.Done():
Expand Down
11 changes: 10 additions & 1 deletion utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,19 @@ type DynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] inter
// Return nil if Option.EnableMemoryControl is false.
Feedback() <-chan Feedback[A, P, D]

// AddArea registers an area with the given settings.
// The settings will be applied to the area when it has paths, and can be updated by calling AddArea again.
// Call RemoveArea when the area is no longer used to avoid settings leak.
AddArea(area A, settings AreaSettings)

// RemoveArea unregisters an area that was previously added by AddArea.
// It only removes the registered settings, and does not remove existing paths.
RemoveArea(area A)

// AddPath add the path to the dynamic stream to receive the events.
// An event of a path not already added will be dropped.
// Return ErrorTypeDuplicate if the path already exists.
AddPath(path P, dest D, area ...AreaSettings) error
AddPath(path P, dest D) error

// RemovePath removes the path from the dynamic stream.
// After this call return, future events with the path will be dropped, including events which are already in the stream.
Expand Down
52 changes: 41 additions & 11 deletions utils/dynstream/memory_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,15 +296,37 @@ type memControl[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct {
// Since this struct is global level, different streams may access it concurrently.
mutex sync.Mutex

areaStatMap map[A]*areaMemStat[A, P, T, D, H]
areaStatMap map[A]*areaMemStat[A, P, T, D, H]
areaSettingsMap map[A]AreaSettings
}

func newMemControl[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]]() *memControl[A, P, T, D, H] {
return &memControl[A, P, T, D, H]{
areaStatMap: make(map[A]*areaMemStat[A, P, T, D, H]),
areaStatMap: make(map[A]*areaMemStat[A, P, T, D, H]),
areaSettingsMap: make(map[A]AreaSettings),
}
}

func (m *memControl[A, P, T, D, H]) addArea(area A, settings AreaSettings) {
settings.fix()

m.mutex.Lock()
defer m.mutex.Unlock()

m.areaSettingsMap[area] = settings
if as, ok := m.areaStatMap[area]; ok {
as.settings.Store(&settings)
}
}
Comment on lines +310 to +320
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The settings parameter is passed by value, so it's a local copy on the stack. Storing its address with as.settings.Store(&settings) on line 318 is incorrect as the pointer will be invalid after addArea returns. This can lead to memory corruption.

You should allocate settings on the heap before storing a pointer to it. For example:

	if as, ok := m.areaStatMap[area]; ok {
		// The `settings` is a copy on the stack, we can't store a pointer to it.
		// So we need to allocate a new one on the heap.
		s := new(AreaSettings)
		*s = settings
		as.settings.Store(s)
	}


func (m *memControl[A, P, T, D, H]) removeArea(area A) {
m.mutex.Lock()
defer m.mutex.Unlock()

delete(m.areaSettingsMap, area)
// The area stat is still managed by paths' lifecycle.
}

func (m *memControl[A, P, T, D, H]) setAreaSettings(area A, settings AreaSettings) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -315,21 +337,29 @@ func (m *memControl[A, P, T, D, H]) setAreaSettings(area A, settings AreaSetting
}
}

func (m *memControl[A, P, T, D, H]) addPathToArea(path *pathInfo[A, P, T, D, H], settings AreaSettings, feedbackChan chan<- Feedback[A, P, D]) {
func (m *memControl[A, P, T, D, H]) addPathToArea(path *pathInfo[A, P, T, D, H], feedbackChan chan<- Feedback[A, P, D]) {
m.mutex.Lock()
defer m.mutex.Unlock()

area, ok := m.areaStatMap[path.area]
areaStat, ok := m.areaStatMap[path.area]
if !ok {
area = newAreaMemStat(path.area, m, settings, feedbackChan)
m.areaStatMap[path.area] = area
settings, ok := m.areaSettingsMap[path.area]
if !ok {
settings = AreaSettings{}
}
settings.fix()

areaStat = newAreaMemStat(path.area, m, settings, feedbackChan)
m.areaStatMap[path.area] = areaStat
} else if settings, ok := m.areaSettingsMap[path.area]; ok {
// Ensure the stat uses the latest settings from AddArea.
settings.fix()
areaStat.settings.Store(&settings)
}

path.areaMemStat = area
area.pathMap.Store(path.path, path)
area.pathCount.Add(1)
// Update the settings
area.settings.Store(&settings)
path.areaMemStat = areaStat
areaStat.pathMap.Store(path.path, path)
areaStat.pathCount.Add(1)
}
Comment on lines +340 to 363
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This function has a critical bug related to storing pointers to stack variables, which can lead to memory corruption.

  1. On line 357, areaStat.settings.Store(&settings) stores a pointer to settings, which is a local variable (a copy from the map). This pointer becomes invalid when addPathToArea returns.
  2. On line 352, newAreaMemStat is called with settings, which is also a local variable. The newAreaMemStat function also stores a pointer to its settings parameter, which is a copy on its own stack. This is also a bug.

To fix this, you need to ensure you are storing pointers to heap-allocated AreaSettings objects. For example, by using new(AreaSettings) before storing the pointer.


// This method is called after the path is removed.
Expand Down
24 changes: 15 additions & 9 deletions utils/dynstream/memory_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func TestMemControlAddRemovePath(t *testing.T) {
feedbackChan := make(chan Feedback[int, string, any], 10)

// Test adding path
mc.addPathToArea(path, settings, feedbackChan)
mc.addArea(path.area, settings)
mc.addPathToArea(path, feedbackChan)
require.NotNil(t, path.areaMemStat)
require.Equal(t, int64(1), path.areaMemStat.pathCount.Load())

Expand All @@ -65,7 +66,8 @@ func TestAreaMemStatAppendEvent(t *testing.T) {
algorithm: MemoryControlForPuller,
}
feedbackChan := make(chan Feedback[int, string, any], 10)
mc.addPathToArea(path1, settings, feedbackChan)
mc.addArea(path1.area, settings)
mc.addPathToArea(path1, feedbackChan)

handler := &mockHandler{}
option := NewOption()
Expand Down Expand Up @@ -173,7 +175,8 @@ func TestSetAreaSettings(t *testing.T) {
algorithm: MemoryControlForPuller,
}
feedbackChan := make(chan Feedback[int, string, any], 10)
mc.addPathToArea(path, initialSettings, feedbackChan)
mc.addArea(path.area, initialSettings)
mc.addPathToArea(path, feedbackChan)
require.Equal(t, initialSettings, *path.areaMemStat.settings.Load())

// Case 2: Set the new settings.
Expand Down Expand Up @@ -202,10 +205,11 @@ func TestGetMetrics(t *testing.T) {
metrics := mc.getMetrics()
require.Equal(t, 0, len(metrics.AreaMemoryMetrics))

mc.addPathToArea(path, AreaSettings{
mc.addArea(path.area, AreaSettings{
maxPendingSize: 100,
feedbackInterval: time.Second,
}, nil)
})
mc.addPathToArea(path, nil)
metrics = mc.getMetrics()
require.Equal(t, 1, len(metrics.AreaMemoryMetrics))
require.Equal(t, int64(0), metrics.AreaMemoryMetrics[0].UsedMemoryValue)
Expand All @@ -225,7 +229,8 @@ func TestUpdateAreaPauseState(t *testing.T) {
}

feedbackChan := make(chan Feedback[int, string, any], 10)
mc.addPathToArea(path, settings, feedbackChan)
mc.addArea(path.area, settings)
mc.addPathToArea(path, feedbackChan)
areaMemStat := path.areaMemStat

areaMemStat.totalPendingSize.Store(int64(10))
Expand Down Expand Up @@ -298,9 +303,10 @@ func TestReleaseMemory(t *testing.T) {
path3.blocking.Store(true)

// Add paths to area
mc.addPathToArea(path1, settings, feedbackChan)
mc.addPathToArea(path2, settings, feedbackChan)
mc.addPathToArea(path3, settings, feedbackChan)
mc.addArea(area, settings)
mc.addPathToArea(path1, feedbackChan)
mc.addPathToArea(path2, feedbackChan)
mc.addPathToArea(path3, feedbackChan)

// Set different last handle event timestamps
// path1: most recent (largest ts), should be released first
Expand Down
23 changes: 15 additions & 8 deletions utils/dynstream/parallel_dynamic_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,19 @@ func (s *parallelDynamicStream[A, P, T, D, H]) Feedback() <-chan Feedback[A, P,
return s.feedbackChan
}

func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...AreaSettings) error {
func (s *parallelDynamicStream[A, P, T, D, H]) AddArea(area A, settings AreaSettings) {
if s.memControl != nil {
s.memControl.addArea(area, settings)
}
}

func (s *parallelDynamicStream[A, P, T, D, H]) RemoveArea(area A) {
if s.memControl != nil {
s.memControl.removeArea(area)
}
}

func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D) error {
s.pathMap.Lock()
_, ok := s.pathMap.m[path]
if ok {
Expand All @@ -209,7 +221,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...Are
s._statAddPathCount.Add(1)
s.pathMap.Unlock()

s.setMemControl(pi, as...)
s.setMemControl(pi)

if pi.stream.closed.Load() {
return nil
Expand Down Expand Up @@ -263,13 +275,8 @@ func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics[A, P] {

func (s *parallelDynamicStream[A, P, T, D, H]) setMemControl(
pi *pathInfo[A, P, T, D, H],
as ...AreaSettings,
) {
if s.memControl != nil {
setting := AreaSettings{}
if len(as) > 0 {
setting = as[0]
}
s.memControl.addPathToArea(pi, setting, s.feedbackChan)
s.memControl.addPathToArea(pi, s.feedbackChan)
}
}
3 changes: 2 additions & 1 deletion utils/dynstream/parallel_dynamic_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func TestParallelDynamicStreamMemoryControl(t *testing.T) {
require.NotNil(t, stream.feedbackChan)
settings := AreaSettings{maxPendingSize: 1024, feedbackInterval: 10 * time.Millisecond}
// The path is belong to area 0
stream.AddPath("path1", "dest1", settings)
stream.AddArea(0, settings)
stream.AddPath("path1", "dest1")
stream.pathMap.RLock()
require.Equal(t, 1, len(stream.pathMap.m))
pi := stream.pathMap.m["path1"]
Expand Down