34 changes: 26 additions & 8 deletions downstreamadapter/sink/blackhole/sink.go
@@ -19,18 +19,22 @@
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/pkg/common"
 	commonEvent "github.com/pingcap/ticdc/pkg/common/event"
+	"github.com/pingcap/ticdc/pkg/metrics"
+	"github.com/pingcap/ticdc/utils/chann"
 	"go.uber.org/zap"
 )

 // sink is responsible for writing data to blackhole.
 // Including DDL and DML.
 type sink struct {
-	eventCh    chan *commonEvent.DMLEvent
+	eventCh    *chann.UnlimitedChannel[*commonEvent.DMLEvent, any]
+	statistics *metrics.Statistics
 }

-func New() (*sink, error) {
+func New(changefeedID common.ChangeFeedID) (*sink, error) {

Check failure on line 34 in downstreamadapter/sink/blackhole/sink.go (GitHub Actions / Check):
unexported-return: exported func New returns unexported type *blackhole.sink, which can be annoying to use (revive)
 	return &sink{
-		eventCh: make(chan *commonEvent.DMLEvent, 4096),
+		eventCh:    chann.NewUnlimitedChannelDefault[*commonEvent.DMLEvent](),
+		statistics: metrics.NewStatistics(changefeedID, "sink"),
 	}, nil
 }

@@ -50,7 +54,7 @@
 	// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23
 	// Use zap.Stringer to call String() method which applies log redaction
 	log.Debug("BlackHoleSink: WriteEvents", zap.Stringer("dml", event))
-	s.eventCh <- event
+	s.eventCh.Push(event)
 }

func (s *sink) FlushDMLBeforeBlock(_ commonEvent.BlockEvent) error {
@@ -64,34 +68,48 @@
 		// NOTE: don't change the log, integration test `lossy_ddl` depends on it.
 		// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L17
 		log.Debug("BlackHoleSink: DDL Event", zap.Any("ddl", e))
+		ddlType := e.GetDDLType().String()
+		s.statistics.RecordDDLExecution(func() (string, error) {

Check failure on line 72 in downstreamadapter/sink/blackhole/sink.go (GitHub Actions / Check):
Error return value of `s.statistics.RecordDDLExecution` is not checked (errcheck)
+			return ddlType, nil
+		})
Comment on lines +72 to +74
⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Handle the error return from RecordDDLExecution.

The error returned by RecordDDLExecution is silently ignored, which violates error-handling best practices and is flagged by the pipeline. If the metrics recording fails, the sink should log the error or propagate it appropriately.

🛡️ Proposed fix to check the error
 		ddlType := e.GetDDLType().String()
-		s.statistics.RecordDDLExecution(func() (string, error) {
+		if err := s.statistics.RecordDDLExecution(func() (string, error) {
 			return ddlType, nil
-		})
+		}); err != nil {
+			log.Warn("failed to record DDL execution", zap.Error(err))
+		}

As per coding guidelines, use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation.



 	case commonEvent.TypeSyncPointEvent:
 	default:
 		log.Error("unknown event type",
 			zap.Any("event", event))
 	}
 	event.PostFlush()
 	return nil
 }

 func (s *sink) AddCheckpointTs(ts uint64) {
 	log.Debug("BlackHoleSink: Checkpoint Ts Event", zap.Uint64("ts", ts))
 }

-func (s *sink) Close() {}
+func (s *sink) Close() {
+	s.eventCh.Close()
+	s.statistics.Close()
+}

 func (s *sink) Run(ctx context.Context) error {
 	for {
 		select {
 		case <-ctx.Done():
 			return nil
-		case event := <-s.eventCh:
+		default:
+			event, ok := s.eventCh.Get()
+			if !ok {
+				log.Info("blackhole sink event channel closed")
+				return nil
+			}
+			s.statistics.RecordBatchExecution(func() (int, int64, error) {

Check failure on line 103 in downstreamadapter/sink/blackhole/sink.go (GitHub Actions / Check):
Error return value of `s.statistics.RecordBatchExecution` is not checked (errcheck)
+				return int(event.Len()), event.GetSize(), nil
+			})
Comment on lines +103 to +105

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Handle the error return from RecordBatchExecution.

The error returned by RecordBatchExecution is silently ignored, which violates error-handling best practices and is flagged by the pipeline.

🛡️ Proposed fix included in Run loop refactoring above

See the previous comment on lines 97-106 for the combined fix that addresses both the busy-wait issue and error handling.

As per coding guidelines, use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation.



+			event.PostFlush()
Comment on lines +97 to 106

⚠️ Potential issue | 🟠 Major | ⚡ Quick win


Use GetWithContext to enable proper context cancellation handling.

The current pattern blocks on Get() in the default case, which cannot be interrupted by context cancellation. When the context is cancelled, the goroutine remains blocked in Get() waiting for an event, delaying graceful shutdown. Replace with GetWithContext(ctx) to make the blocking channel operation respect the context:

Proposed fix
 func (s *sink) Run(ctx context.Context) error {
 	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			event, ok := s.eventCh.Get()
-			if !ok {
-				log.Info("blackhole sink event channel closed")
-				return nil
-			}
-			s.statistics.RecordBatchExecution(func() (int, int64, error) {
-				return int(event.Len()), event.GetSize(), nil
-			})
-			event.PostFlush()
-		}
+		event, ok, err := s.eventCh.GetWithContext(ctx)
+		if err != nil {
+			return nil
+		}
+		if !ok {
+			log.Info("blackhole sink event channel closed")
+			return nil
+		}
+		s.statistics.RecordBatchExecution(func() (int, int64, error) {
+			return int(event.Len()), event.GetSize(), nil
+		})
+		event.PostFlush()
 	}
 }


}
Comment on lines 94 to 107

high

The current implementation of the Run loop has a cancellation issue. The select statement with a default block will immediately call s.eventCh.Get(), which is a blocking call. If the context is cancelled while Get() is waiting for an event, the loop will not exit until an event is received or the channel is closed. This prevents the sink from shutting down gracefully when idle. Using GetWithContext(ctx) allows the loop to respond to context cancellation while waiting for events.

		event, ok, err := s.eventCh.GetWithContext(ctx)
		if err != nil {
			return nil
		}
		if !ok {
			log.Info("blackhole sink event channel closed")
			return nil
		}
		event.PostFlush()

}
}

 func (s *sink) BatchCount() int {
-	return 4096
+	return s.eventCh.Len()
 }

func (s *sink) BatchBytes() int {
5 changes: 3 additions & 2 deletions downstreamadapter/sink/blackhole/sink_test.go
@@ -19,13 +19,14 @@ import (
 	"testing"
 	"time"

+	"github.com/pingcap/ticdc/pkg/common"
 	commonEvent "github.com/pingcap/ticdc/pkg/common/event"
 	"github.com/stretchr/testify/require"
 )

 // Test callback and tableProgress works as expected after AddDMLEvent
 func TestBlacHoleSinkBasicFunctionality(t *testing.T) {
-	sink, err := New()
+	sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	go sink.Run(ctx)
@@ -90,7 +91,7 @@
 }

 func TestBlackHoleSinkBatchConfig(t *testing.T) {
-	sink, err := New()
+	sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
 	require.NoError(t, err)
 	require.Equal(t, 4096, sink.BatchCount())

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update test expectation to match new BatchCount semantics.

The test expects BatchCount() to return 4096, which was the old constant value. The new implementation returns eventCh.Len(), the actual number of events in the channel (0 at initialization). Update the test to reflect the new behavior or add events before asserting.

🧪 Proposed fix to match new semantics

Option 1: Assert the correct initial value

 func TestBlackHoleSinkBatchConfig(t *testing.T) {
 	sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
 	require.NoError(t, err)
-	require.Equal(t, 4096, sink.BatchCount())
+	require.Equal(t, 0, sink.BatchCount())
 	require.Zero(t, sink.BatchBytes())
 }

Option 2: Test after adding events

 func TestBlackHoleSinkBatchConfig(t *testing.T) {
 	sink, err := New(common.NewChangefeedID(common.DefaultKeyspaceName))
 	require.NoError(t, err)
+	
+	// Add some test events
+	helper := commonEvent.NewEventTestHelper(t)
+	defer helper.Close()
+	helper.Tk().MustExec("use test")
+	dmlEvent := helper.DML2Event("test", "t", "insert into t values (1, 'test')")
+	sink.AddDMLEvent(dmlEvent)
+	
-	require.Equal(t, 4096, sink.BatchCount())
+	require.Equal(t, 1, sink.BatchCount())
 	require.Zero(t, sink.BatchBytes())
 }
🧰 Tools
🪛 GitHub Actions: PR Build and Unit Test (Next Gen and Classic Unit Tests)

[error] 96-96: TestBlackHoleSinkBatchConfig failed: Not equal (expected: 4096, actual: 0).


 	require.Zero(t, sink.BatchBytes())
2 changes: 1 addition & 1 deletion downstreamadapter/sink/sink.go
@@ -65,7 +65,7 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.
 	case config.S3Scheme, config.FileScheme, config.GCSScheme, config.GSScheme, config.AzblobScheme, config.AzureScheme, config.CloudStorageNoopScheme:
 		return cloudstorage.New(ctx, changefeedID, sinkURI, cfg.SinkConfig, cfg.EnableTableAcrossNodes, nil)
 	case config.BlackHoleScheme:
-		return blackhole.New()
+		return blackhole.New(changefeedID)
 	}
 	return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
 }