Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ tools/bin
tools/include
tools/workload/bin

.design
.issue
.vscode
.idea
Expand Down
29 changes: 19 additions & 10 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ import (
"go.uber.org/zap"
)

func maskSinkURIForError(sinkURI string) string {
return util.MaskSensitiveDataInURIForError(sinkURI)
}

func genSinkURIInvalidError(sinkURI string, err error) error {
return errors.WrapError(
errors.ErrSinkURIInvalid, util.MaskSensitiveDataInURLError(err), maskSinkURIForError(sinkURI))
}

// CreateChangefeed handles create changefeed request,
// it returns the changefeed's changefeedInfo that it just created
// CreateChangefeed creates a changefeed
Expand Down Expand Up @@ -135,7 +144,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
}
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI))
_ = c.Error(genSinkURIInvalidError(cfg.SinkURI, err))
return
}

Expand All @@ -144,7 +153,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
if config.IsMQScheme(scheme) {
topic, err = helper.GetTopic(sinkURIParsed)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI))
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(cfg.SinkURI)))
return
}
}
Expand Down Expand Up @@ -291,7 +300,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
}
err = sink.Verify(ctx, cfConfig, changefeedID)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI))
_ = c.Error(genSinkURIInvalidError(cfg.SinkURI, err))
return
}

Expand Down Expand Up @@ -431,7 +440,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
// verify replicaConfig
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI))
_ = c.Error(genSinkURIInvalidError(cfg.SinkURI, err))
return
}
err = replicaCfg.ValidateAndAdjust(sinkURIParsed)
Expand All @@ -445,7 +454,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
if config.IsMQScheme(scheme) {
topic, err = helper.GetTopic(sinkURIParsed)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI))
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(cfg.SinkURI)))
return
}
}
Expand Down Expand Up @@ -822,14 +831,14 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
)
sinkURIParsed, err = url.Parse(cfInfo.SinkURI)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI))
_ = c.Error(genSinkURIInvalidError(cfInfo.SinkURI, err))
return
}
scheme := sinkURIParsed.Scheme
if config.IsMQScheme(scheme) {
topic, err = helper.GetTopic(sinkURIParsed)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI))
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(cfInfo.SinkURI)))
return
}
}
Expand Down Expand Up @@ -975,7 +984,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
// verify replicaConfig
sinkURIParsed, err := url.Parse(oldCfInfo.SinkURI)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, oldCfInfo.SinkURI))
_ = c.Error(genSinkURIInvalidError(oldCfInfo.SinkURI, err))
return
}
err = oldCfInfo.Config.ValidateAndAdjust(sinkURIParsed)
Expand All @@ -989,7 +998,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
if config.IsMQScheme(scheme) {
topic, err = helper.GetTopic(sinkURIParsed)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, oldCfInfo.SinkURI))
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(oldCfInfo.SinkURI)))
return
}
}
Expand Down Expand Up @@ -1033,7 +1042,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {

err = sink.Verify(ctx, oldCfInfo.ToChangefeedConfig(), oldCfInfo.ChangefeedID)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, oldCfInfo.SinkURI))
_ = c.Error(genSinkURIInvalidError(oldCfInfo.SinkURI, err))
return
}

Expand Down
57 changes: 57 additions & 0 deletions api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"net/url"
"testing"

perrors "github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

func TestMaskSinkURIForError(t *testing.T) {
sinkURI := "kafka://127.0.0.1:9092/topic?protocol=canal-json" +
"&sasl-user=ticdc&sasl-password=verysecure&secret-access-key=rawsecret"

maskedURI := maskSinkURIForError(sinkURI)
require.NotContains(t, maskedURI, "verysecure")
require.NotContains(t, maskedURI, "rawsecret")
require.Contains(t, maskedURI, "sasl-password=xxxxx")
require.Contains(t, maskedURI, "secret-access-key=xxxxx")
require.Contains(t, maskedURI, "sasl-user=ticdc")

invalidURI := "mysql://root:verysecure@127.0.0.1/%zz"
require.Equal(t, "<invalid uri>", maskSinkURIForError(invalidURI))

err := genSinkURIInvalidError(invalidURI, mustParseURLError(t, invalidURI))
require.NotContains(t, err.Error(), "verysecure")
require.Contains(t, err.Error(), "<invalid uri>")
require.Contains(t, err.Error(), `parse "<invalid uri>"`)
require.Contains(t, err.Error(), "invalid URL escape")

err = genSinkURIInvalidError(invalidURI, perrors.Trace(mustParseURLError(t, invalidURI)))
require.NotContains(t, err.Error(), "verysecure")
require.Contains(t, err.Error(), "<invalid uri>")
require.Contains(t, err.Error(), `parse "<invalid uri>"`)
require.Contains(t, err.Error(), "invalid URL escape")
}

func mustParseURLError(t *testing.T, rawURL string) error {
t.Helper()

_, err := url.Parse(rawURL)
require.Error(t, err)
return err
}
17 changes: 13 additions & 4 deletions downstreamadapter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util"
)

type Sink interface {
Expand All @@ -50,7 +51,10 @@ type Sink interface {
func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.ChangeFeedID) (Sink, error) {
sinkURI, err := url.Parse(cfg.SinkURI)
if err != nil {
return nil, errors.WrapError(errors.ErrSinkURIInvalid, err)
return nil, errors.WrapError(
errors.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(cfg.SinkURI))
}
scheme := config.GetScheme(sinkURI)
switch scheme {
Expand All @@ -65,13 +69,17 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.
case config.BlackHoleScheme:
return blackhole.New(changefeedID)
}
return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(
util.MaskSensitiveDataInURIForError(sinkURI.String()))
}

func Verify(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.ChangeFeedID) error {
sinkURI, err := url.Parse(cfg.SinkURI)
if err != nil {
return errors.WrapError(errors.ErrSinkURIInvalid, err)
return errors.WrapError(
errors.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(cfg.SinkURI))
}
scheme := config.GetScheme(sinkURI)
switch scheme {
Expand All @@ -86,5 +94,6 @@ func Verify(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID comm
case config.BlackHoleScheme:
return nil
}
return errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
return errors.ErrSinkURIInvalid.GenWithStackByArgs(
util.MaskSensitiveDataInURIForError(sinkURI.String()))
}
77 changes: 77 additions & 0 deletions downstreamadapter/sink/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"testing"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
"github.com/stretchr/testify/require"
)

func TestUnknownSchemeMasksSensitiveSinkURI(t *testing.T) {
t.Parallel()

changefeedID := common.NewChangefeedID(common.DefaultKeyspaceName)
cfg := &config.ChangefeedConfig{
SinkURI: "unknown://127.0.0.1:9092/topic?sasl-password=verysecure&access-key=rawkey",
}

_, err := New(context.Background(), cfg, changefeedID)
require.Error(t, err)
requireMaskedSinkURIError(t, err)

err = Verify(context.Background(), cfg, changefeedID)
require.Error(t, err)
requireMaskedSinkURIError(t, err)
}

func TestParseErrorMasksSensitiveSinkURI(t *testing.T) {
t.Parallel()

changefeedID := common.NewChangefeedID(common.DefaultKeyspaceName)
cfg := &config.ChangefeedConfig{
SinkURI: "mysql://root:verysecure@127.0.0.1/%zz",
}

_, err := New(context.Background(), cfg, changefeedID)
require.Error(t, err)
requireInvalidSinkURIError(t, err)

err = Verify(context.Background(), cfg, changefeedID)
require.Error(t, err)
requireInvalidSinkURIError(t, err)
}

func requireMaskedSinkURIError(t *testing.T, err error) {
t.Helper()

errMsg := err.Error()
require.NotContains(t, errMsg, "verysecure")
require.NotContains(t, errMsg, "rawkey")
require.Contains(t, errMsg, "sasl-password=xxxxx")
require.Contains(t, errMsg, "access-key=xxxxx")
}

func requireInvalidSinkURIError(t *testing.T, err error) {
t.Helper()

errMsg := err.Error()
require.NotContains(t, errMsg, "verysecure")
require.Contains(t, errMsg, "<invalid uri>")
require.Contains(t, errMsg, `parse "<invalid uri>"`)
require.Contains(t, errMsg, "invalid URL escape")
}
5 changes: 4 additions & 1 deletion pkg/check/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ func getClusterIDBySinkURI(
) (uint64, string, bool, error) {
uri, err := url.Parse(sinkURI)
if err != nil {
return 0, "", false, cerrors.WrapError(cerrors.ErrSinkURIInvalid, err, sinkURI)
return 0, "", false, cerrors.WrapError(
cerrors.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURI))
}

scheme := config.GetScheme(uri)
Expand Down
28 changes: 23 additions & 5 deletions pkg/util/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
package util

import (
stderrors "errors"
"net"
"net/url"
"strings"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI.
Expand Down Expand Up @@ -70,7 +68,6 @@ func validOptionalPort(port string) bool {
func MaskSinkURI(uri string) (string, error) {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return "", err
}
queries := uriParsed.Query()
Expand Down Expand Up @@ -99,7 +96,6 @@ var sensitiveQueryParameterNames = []string{
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
Expand All @@ -114,3 +110,25 @@ func MaskSensitiveDataInURI(uri string) string {
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}

// MaskSensitiveDataInURIForError masks sensitive data in a URI for error messages.
func MaskSensitiveDataInURIForError(uri string) string {
maskedURI := MaskSensitiveDataInURI(uri)
if maskedURI == "" && uri != "" {
return "<invalid uri>"
}
return maskedURI
}

// MaskSensitiveDataInURLError masks the URL carried by net/url errors.
func MaskSensitiveDataInURLError(err error) error {
if err == nil {
return nil
}
var urlErr *url.Error
if !stderrors.As(err, &urlErr) {
return err
}
urlErr.URL = MaskSensitiveDataInURIForError(urlErr.URL)
return err
}
Comment on lines +124 to +134

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

In Go (and especially TiCDC), errors are frequently wrapped using errors.Trace or other wrapping mechanisms. The current implementation of MaskSensitiveDataInURLError uses a direct type assertion err.(*url.Error), which will fail to detect and mask the sensitive URL if the error is wrapped.

We should traverse the error chain to find and mask the nested *url.Error in-place, ensuring that sensitive credentials are never leaked even when the error is wrapped.

Suggested change
func MaskSensitiveDataInURLError(err error) error {
if err == nil {
return nil
}
urlErr, ok := err.(*url.Error)
if !ok {
return err
}
return &url.Error{
Op: urlErr.Op,
URL: MaskSensitiveDataInURIForError(urlErr.URL),
Err: urlErr.Err,
}
}
func MaskSensitiveDataInURLError(err error) error {
if err == nil {
return nil
}
type unwrapper interface {
Unwrap() error
}
for curr := err; curr != nil; {
if urlErr, ok := curr.(*url.Error); ok {
urlErr.URL = MaskSensitiveDataInURIForError(urlErr.URL)
return err
}
u, ok := curr.(unwrapper)
if !ok {
break
}
curr = u.Unwrap()
}
return err
}

Loading
Loading