From db02386f258a9d7176603f9adec2c29adae86168 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 10:59:16 +0000 Subject: [PATCH 1/9] init Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index b2dc727cdc..fd34bb99d5 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -19,18 +19,19 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/utils/chann" "go.uber.org/zap" ) // sink is responsible for writing data to blackhole. // Including DDL and DML. type sink struct { - eventCh chan *commonEvent.DMLEvent + eventCh *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] } func New() (*sink, error) { return &sink{ - eventCh: make(chan *commonEvent.DMLEvent, 4096), + eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](), }, nil } @@ -50,7 +51,7 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23 // Use zap.Stringer to call String() method which applies log redaction log.Debug("BlackHoleSink: WriteEvents", zap.Stringer("dml", event)) - s.eventCh <- event + s.eventCh.Push(event) } func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { @@ -84,14 +85,19 @@ func (s *sink) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case event := <-s.eventCh: + default: + event, ok := s.eventCh.Get() + if !ok { + log.Info("blackhole sink event channel closed") + return nil + } event.PostFlush() } } } func (s *sink) BatchCount() int { - return 4096 + return s.eventCh.Len() } func (s *sink) BatchBytes() int { From 17a774ccdb07f04c2366c6567a8664b2c4fcbf28 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 11:02:31 +0000 Subject: [PATCH 2/9] chore Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index fd34bb99d5..d70eb9bc30 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -78,7 +78,9 @@ func (s *sink) AddCheckpointTs(ts uint64) { log.Debug("BlackHoleSink: Checkpoint Ts Event", zap.Uint64("ts", ts)) } -func (s *sink) Close() {} +func (s *sink) Close() { + s.eventCh.Close() +} func (s *sink) Run(ctx context.Context) error { for { From e36f989b627013aa4593ca8f9a38513fbe605891 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 11:45:49 +0000 Subject: [PATCH 3/9] add metrics Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 18 ++++++++++++++---- downstreamadapter/sink/blackhole/sink_test.go | 5 +++-- downstreamadapter/sink/sink.go | 2 +- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index d70eb9bc30..ab4ef8f397 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/utils/chann" "go.uber.org/zap" ) @@ -26,12 +27,14 @@ import ( // sink is responsible for writing data to blackhole. // Including DDL and DML. type sink struct { - eventCh *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + eventCh *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + statistics *metrics.Statistics } -func New() (*sink, error) { +func New(changefeedID common.ChangeFeedID) (*sink, error) { return &sink{ - eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](), + eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](), + statistics: metrics.NewStatistics(changefeedID, "sink"), }, nil } @@ -65,12 +68,15 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { // NOTE: don't change the log, integration test `lossy_ddl` depends on it. // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17 log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e)) + ddlType := e.GetDDLType().String() + s.statistics.RecordDDLExecution(func() (string, error) { + return ddlType, nil + }) case commonEvent.TypeSyncPointEvent: default: log.Error("unknown event type", zap.Any("event", event)) } - event.PostFlush() return nil } @@ -80,6 +86,7 @@ func (s *sink) AddCheckpointTs(ts uint64) { func (s *sink) Close() { s.eventCh.Close() + s.statistics.Close() } func (s *sink) Run(ctx context.Context) error { @@ -93,6 +100,9 @@ func (s *sink) Run(ctx context.Context) error { log.Info("blackhole sink event channel closed") return nil } + s.statistics.RecordBatchExecution(func() (int, int64, error) { + return int(event.Length), event.ApproximateSize, nil + }) event.PostFlush() } } diff --git a/downstreamadapter/sink/blackhole/sink_test.go b/downstreamadapter/sink/blackhole/sink_test.go index 9fb9474d38..cd36457bf1 100644 --- a/downstreamadapter/sink/blackhole/sink_test.go +++ b/downstreamadapter/sink/blackhole/sink_test.go @@ -19,13 +19,14 @@ import ( "testing" "time" + "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/stretchr/testify/require" ) // Test callback and tableProgress works as expected after AddDMLEvent func TestBlacHoleSinkBasicFunctionality(t *testing.T) { - sink, err := New() + sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go sink.Run(ctx) @@ -90,7 +91,7 @@ func TestBlacHoleSinkBasicFunctionality(t *testing.T) { } func TestBlackHoleSinkBatchConfig(t *testing.T) { - sink, err := New() + sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName)) require.NoError(t, err) require.Equal(t, 4096, sink.BatchCount()) require.Zero(t, sink.BatchBytes()) diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index 2429f884b2..d9d20296e7 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -65,7 +65,7 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common. case config.S3Scheme, config.FileScheme, config.GCSScheme, config.GSScheme, config.AzblobScheme, config.AzureScheme, config.CloudStorageNoopScheme: return cloudstorage.New(ctx, changefeedID, sinkURI, cfg.SinkConfig, cfg.EnableTableAcrossNodes, nil) case config.BlackHoleScheme: - return blackhole.New() + return blackhole.New(changefeedID) } return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) } From 27b4de77fa5e56b4e1c3fa2a869cdfe5222ce664 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 11:59:43 +0000 Subject: [PATCH 4/9] fix Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index ab4ef8f397..388c1f446a 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -101,7 +101,7 @@ func (s *sink) Run(ctx context.Context) error { return nil } s.statistics.RecordBatchExecution(func() (int, int64, error) { - return int(event.Length), event.ApproximateSize, nil + return int(event.Len()), event.GetSize(), nil }) event.PostFlush() } From 4eea0c99b3a51a1be357389b1482bd8aea53280d Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 12:13:16 +0000 Subject: [PATCH 5/9] update Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index 388c1f446a..49b59491c9 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -22,6 +22,11 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/utils/chann" "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + defaultFlushConcurrency = 8 ) // sink is responsible for writing data to blackhole. @@ -90,6 +95,16 @@ func (s *sink) Close() { } func (s *sink) Run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + for range defaultFlushConcurrency { + eg.Go(func() error { + return s.flush(ctx) + }) + } + return eg.Wait() +} + +func (s *sink) flush(ctx context.Context) error { for { select { case <-ctx.Done(): @@ -107,7 +122,6 @@ func (s *sink) Run(ctx context.Context) error { } } } - func (s *sink) BatchCount() int { return s.eventCh.Len() } From 12427304801c1f55ebe6e2290050ffd621e3867e Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 12:13:26 +0000 Subject: [PATCH 6/9] fmt Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 1 + 1 file changed, 1 insertion(+) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index 49b59491c9..7e0faf5b56 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -122,6 +122,7 @@ func (s *sink) flush(ctx context.Context) error { } } } + func (s *sink) BatchCount() int { return s.eventCh.Len() } From 2360eb534ca7d8aa6d9efec0aae9db04432c847a Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 12:55:02 +0000 Subject: [PATCH 7/9] update Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 31 +++++++++++++++++------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index 7e0faf5b56..4ca5b8f1ec 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -15,6 +15,7 @@ package blackhole import ( "context" + "sync/atomic" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" @@ -32,13 +33,18 @@ const ( // sink is responsible for writing data to blackhole. // Including DDL and DML. type sink struct { - eventCh *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + eventChs []*chann.UnlimitedChannel[*commonEvent.DMLEvent, any] statistics *metrics.Statistics + seq atomic.Uint64 } func New(changefeedID common.ChangeFeedID) (*sink, error) { + eventChs := make([]*chann.UnlimitedChannel[*commonEvent.DMLEvent, any], 0, defaultFlushConcurrency) + for range defaultFlushConcurrency { + eventChs = append(eventChs, chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent]()) + } return &sink{ - eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](), + eventChs: eventChs, statistics: metrics.NewStatistics(changefeedID, "sink"), }, nil } @@ -59,7 +65,8 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23 // Use zap.Stringer to call String() method which applies log redaction log.Debug("BlackHoleSink: WriteEvents", zap.Stringer("dml", event)) - s.eventCh.Push(event) + idx := s.seq.Add(1) % defaultFlushConcurrency + s.eventChs[idx].Push(event) } func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { @@ -90,27 +97,29 @@ func (s *sink) AddCheckpointTs(ts uint64) { } func (s *sink) Close() { - s.eventCh.Close() + for _, ch := range s.eventChs { + ch.Close() + } s.statistics.Close() } func (s *sink) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - for range defaultFlushConcurrency { + for idx := range defaultFlushConcurrency { eg.Go(func() error { - return s.flush(ctx) + return s.flush(ctx, idx) }) } return eg.Wait() } -func (s *sink) flush(ctx context.Context) error { +func (s *sink) flush(ctx context.Context, idx int) error { for { select { case <-ctx.Done(): return nil default: - event, ok := s.eventCh.Get() + event, ok := s.eventChs[idx].Get() if !ok { log.Info("blackhole sink event channel closed") return nil @@ -124,7 +133,11 @@ func (s *sink) flush(ctx context.Context) error { } func (s *sink) BatchCount() int { - return s.eventCh.Len() + count := 0 + for _, ch := range s.eventChs { + count += ch.Len() + } + return count } func (s *sink) BatchBytes() int { From 137ed109e3729038733221b095c104dd67d9f0c3 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 13 May 2026 13:32:42 +0000 Subject: [PATCH 8/9] update Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 40 ++++-------------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index 4ca5b8f1ec..388c1f446a 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -15,7 +15,6 @@ package blackhole import ( "context" - "sync/atomic" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" @@ -23,28 +22,18 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/utils/chann" "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const ( - defaultFlushConcurrency = 8 ) // sink is responsible for writing data to blackhole. // Including DDL and DML. type sink struct { - eventChs []*chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + eventCh *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] statistics *metrics.Statistics - seq atomic.Uint64 } func New(changefeedID common.ChangeFeedID) (*sink, error) { - eventChs := make([]*chann.UnlimitedChannel[*commonEvent.DMLEvent, any], 0, defaultFlushConcurrency) - for range defaultFlushConcurrency { - eventChs = append(eventChs, chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent]()) - } return &sink{ - eventChs: eventChs, + eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](), statistics: metrics.NewStatistics(changefeedID, "sink"), }, nil } @@ -65,8 +54,7 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23 // Use zap.Stringer to call String() method which applies log redaction log.Debug("BlackHoleSink: WriteEvents", zap.Stringer("dml", event)) - idx := s.seq.Add(1) % defaultFlushConcurrency - s.eventChs[idx].Push(event) + s.eventCh.Push(event) } func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { @@ -97,29 +85,17 @@ func (s *sink) AddCheckpointTs(ts uint64) { } func (s *sink) Close() { - for _, ch := range s.eventChs { - ch.Close() - } + s.eventCh.Close() s.statistics.Close() } func (s *sink) Run(ctx context.Context) error { - eg, ctx := errgroup.WithContext(ctx) - for idx := range defaultFlushConcurrency { - eg.Go(func() error { - return s.flush(ctx, idx) - }) - } - return eg.Wait() -} - -func (s *sink) flush(ctx context.Context, idx int) error { for { select { case <-ctx.Done(): return nil default: - event, ok := s.eventChs[idx].Get() + event, ok := s.eventCh.Get() if !ok { log.Info("blackhole sink event channel closed") return nil @@ -133,11 +109,7 @@ func (s *sink) flush(ctx context.Context, idx int) error { } func (s *sink) BatchCount() int { - count := 0 - for _, ch := range s.eventChs { - count += ch.Len() - } - return count + return s.eventCh.Len() } func (s *sink) BatchBytes() int { From 6731e5b93ac72dcf063006071e04e72fcd5fcc26 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 14 May 2026 11:20:30 +0000 Subject: [PATCH 9/9] update Signed-off-by: wk989898 --- downstreamadapter/sink/blackhole/sink.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index db0f72ff5b..8ac3e63f6c 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -69,14 +69,18 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17 log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e)) ddlType := e.GetDDLType().String() - s.statistics.RecordDDLExecution(func() (string, error) { + err := s.statistics.RecordDDLExecution(func() (string, error) { return ddlType, nil }) + if err != nil { + return err + } case commonEvent.TypeSyncPointEvent: default: log.Error("unknown event type", zap.Any("event", event)) } + event.PostFlush() return nil } @@ -100,9 +104,12 @@ func (s *sink) Run(ctx context.Context) error { log.Info("blackhole sink event channel closed") return nil } - s.statistics.RecordBatchExecution(func() (int, int64, error) { + err := s.statistics.RecordBatchExecution(func() (int, int64, error) { return int(event.Len()), event.GetSize(), nil }) + if err != nil { + return err + } event.PostFlush() } }