diff --git a/downstreamadapter/sink/blackhole/sink.go b/downstreamadapter/sink/blackhole/sink.go index c6dc52dcda..8ac3e63f6c 100644 --- a/downstreamadapter/sink/blackhole/sink.go +++ b/downstreamadapter/sink/blackhole/sink.go @@ -19,18 +19,22 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/utils/chann" "go.uber.org/zap" ) // sink is responsible for writing data to blackhole. // Including DDL and DML. type sink struct { - eventCh chan *commonEvent.DMLEvent + eventCh *chann.UnlimitedChannel[*commonEvent.DMLEvent, any] + statistics *metrics.Statistics } -func New() (*sink, error) { +func New(changefeedID common.ChangeFeedID) (*sink, error) { return &sink{ - eventCh: make(chan *commonEvent.DMLEvent, 4096), + eventCh: chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](), + statistics: metrics.NewStatistics(changefeedID, "sink"), }, nil } @@ -50,7 +54,7 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) { // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23 // Use zap.Stringer to call String() method which applies log redaction log.Debug("BlackHoleSink: WriteEvents", zap.Stringer("dml", event)) - s.eventCh <- event + s.eventCh.Push(event) } func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error { @@ -64,6 +68,13 @@ func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error { // NOTE: don't change the log, integration test `lossy_ddl` depends on it. // ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17 log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e)) + ddlType := e.GetDDLType().String() + err := s.statistics.RecordDDLExecution(func() (string, error) { + return ddlType, nil + }) + if err != nil { + return err + } case commonEvent.TypeSyncPointEvent: default: log.Error("unknown event type", @@ -77,21 +88,35 @@ func (s *sink) AddCheckpointTs(ts uint64) { log.Debug("BlackHoleSink: Checkpoint Ts Event", zap.Uint64("ts", ts)) } -func (s *sink) Close() {} +func (s *sink) Close() { + s.eventCh.Close() + s.statistics.Close() +} func (s *sink) Run(ctx context.Context) error { for { select { case <-ctx.Done(): return nil - case event := <-s.eventCh: + default: + event, ok := s.eventCh.Get() + if !ok { + log.Info("blackhole sink event channel closed") + return nil + } + err := s.statistics.RecordBatchExecution(func() (int, int64, error) { + return int(event.Len()), event.GetSize(), nil + }) + if err != nil { + return err + } event.PostFlush() } } } func (s *sink) BatchCount() int { - return 4096 + return s.eventCh.Len() } func (s *sink) BatchBytes() int { diff --git a/downstreamadapter/sink/blackhole/sink_test.go b/downstreamadapter/sink/blackhole/sink_test.go index 9fb9474d38..cd36457bf1 100644 --- a/downstreamadapter/sink/blackhole/sink_test.go +++ b/downstreamadapter/sink/blackhole/sink_test.go @@ -19,13 +19,14 @@ import ( "testing" "time" + "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/stretchr/testify/require" ) // Test callback and tableProgress works as expected after AddDMLEvent func TestBlacHoleSinkBasicFunctionality(t *testing.T) { - sink, err := New() + sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go sink.Run(ctx) @@ -90,7 +91,7 @@ func TestBlacHoleSinkBasicFunctionality(t *testing.T) { } func TestBlackHoleSinkBatchConfig(t *testing.T) { - sink, err := New() + sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName)) require.NoError(t, err) require.Equal(t, 4096, sink.BatchCount()) require.Zero(t, sink.BatchBytes()) diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index 2429f884b2..d9d20296e7 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -65,7 +65,7 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common. case config.S3Scheme, config.FileScheme, config.GCSScheme, config.GSScheme, config.AzblobScheme, config.AzureScheme, config.CloudStorageNoopScheme: return cloudstorage.New(ctx, changefeedID, sinkURI, cfg.SinkConfig, cfg.EnableTableAcrossNodes, nil) case config.BlackHoleScheme: - return blackhole.New() + return blackhole.New(changefeedID) } return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) }