diff --git a/.gitignore b/.gitignore index 5012978619..e224245024 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ tools/bin tools/include tools/workload/bin +.design .issue .vscode .idea diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 2524389312..e0eb54519e 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -52,6 +52,15 @@ import ( "go.uber.org/zap" ) +func maskSinkURIForError(sinkURI string) string { + return util.MaskSensitiveDataInURIForError(sinkURI) +} + +func genSinkURIInvalidError(sinkURI string, err error) error { + return errors.WrapError( + errors.ErrSinkURIInvalid, util.MaskSensitiveDataInURLError(err), maskSinkURIForError(sinkURI)) +} + // CreateChangefeed handles create changefeed request, // it returns the changefeed's changefeedInfo that it just created // CreateChangefeed creates a changefeed @@ -135,7 +144,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { } sinkURIParsed, err := url.Parse(cfg.SinkURI) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI)) + _ = c.Error(genSinkURIInvalidError(cfg.SinkURI, err)) return } @@ -144,7 +153,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { if config.IsMQScheme(scheme) { topic, err = helper.GetTopic(sinkURIParsed) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI)) + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(cfg.SinkURI))) return } } @@ -291,7 +300,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { } err = sink.Verify(ctx, cfConfig, changefeedID) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI)) + _ = c.Error(genSinkURIInvalidError(cfg.SinkURI, err)) return } @@ -431,7 +440,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) { // verify replicaConfig sinkURIParsed, err := url.Parse(cfg.SinkURI) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI)) + _ = c.Error(genSinkURIInvalidError(cfg.SinkURI, err)) return } err = replicaCfg.ValidateAndAdjust(sinkURIParsed) @@ -445,7 +454,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) { if config.IsMQScheme(scheme) { topic, err = helper.GetTopic(sinkURIParsed) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfg.SinkURI)) + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(cfg.SinkURI))) return } } @@ -822,14 +831,14 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { ) sinkURIParsed, err = url.Parse(cfInfo.SinkURI) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) + _ = c.Error(genSinkURIInvalidError(cfInfo.SinkURI, err)) return } scheme := sinkURIParsed.Scheme if config.IsMQScheme(scheme) { topic, err = helper.GetTopic(sinkURIParsed) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, cfInfo.SinkURI)) + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(cfInfo.SinkURI))) return } } @@ -975,7 +984,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { // verify replicaConfig sinkURIParsed, err := url.Parse(oldCfInfo.SinkURI) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, oldCfInfo.SinkURI)) + _ = c.Error(genSinkURIInvalidError(oldCfInfo.SinkURI, err)) return } err = oldCfInfo.Config.ValidateAndAdjust(sinkURIParsed) @@ -989,7 +998,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { if config.IsMQScheme(scheme) { topic, err = helper.GetTopic(sinkURIParsed) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, oldCfInfo.SinkURI)) + _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, maskSinkURIForError(oldCfInfo.SinkURI))) return } } @@ -1033,7 +1042,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { err = sink.Verify(ctx, oldCfInfo.ToChangefeedConfig(), oldCfInfo.ChangefeedID) if err != nil { - _ = c.Error(errors.WrapError(errors.ErrSinkURIInvalid, err, oldCfInfo.SinkURI)) + _ = c.Error(genSinkURIInvalidError(oldCfInfo.SinkURI, err)) return } diff --git a/api/v2/changefeed_test.go b/api/v2/changefeed_test.go new file mode 100644 index 0000000000..76a161d6e3 --- /dev/null +++ b/api/v2/changefeed_test.go @@ -0,0 +1,57 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "net/url" + "testing" + + perrors "github.com/pingcap/errors" + "github.com/stretchr/testify/require" +) + +func TestMaskSinkURIForError(t *testing.T) { + sinkURI := "kafka://127.0.0.1:9092/topic?protocol=canal-json" + + "&sasl-user=ticdc&sasl-password=verysecure&secret-access-key=rawsecret" + + maskedURI := maskSinkURIForError(sinkURI) + require.NotContains(t, maskedURI, "verysecure") + require.NotContains(t, maskedURI, "rawsecret") + require.Contains(t, maskedURI, "sasl-password=xxxxx") + require.Contains(t, maskedURI, "secret-access-key=xxxxx") + require.Contains(t, maskedURI, "sasl-user=ticdc") + + invalidURI := "mysql://root:verysecure@127.0.0.1/%zz" + require.Equal(t, "", maskSinkURIForError(invalidURI)) + + err := genSinkURIInvalidError(invalidURI, mustParseURLError(t, invalidURI)) + require.NotContains(t, err.Error(), "verysecure") + require.Contains(t, err.Error(), "") + require.Contains(t, err.Error(), `parse ""`) + require.Contains(t, err.Error(), "invalid URL escape") + + err = genSinkURIInvalidError(invalidURI, perrors.Trace(mustParseURLError(t, invalidURI))) + require.NotContains(t, err.Error(), "verysecure") + require.Contains(t, err.Error(), "") + require.Contains(t, err.Error(), `parse ""`) + require.Contains(t, err.Error(), "invalid URL escape") +} + +func mustParseURLError(t *testing.T, rawURL string) error { + t.Helper() + + _, err := url.Parse(rawURL) + require.Error(t, err) + return err +} diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go index 12a934d2c6..6e797fc448 100644 --- a/downstreamadapter/sink/sink.go +++ b/downstreamadapter/sink/sink.go @@ -26,6 +26,7 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/util" ) type Sink interface { @@ -50,7 +51,10 @@ type Sink interface { func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.ChangeFeedID) (Sink, error) { sinkURI, err := url.Parse(cfg.SinkURI) if err != nil { - return nil, errors.WrapError(errors.ErrSinkURIInvalid, err) + return nil, errors.WrapError( + errors.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(cfg.SinkURI)) } scheme := config.GetScheme(sinkURI) switch scheme { @@ -65,13 +69,17 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common. case config.BlackHoleScheme: return blackhole.New(changefeedID) } - return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) + return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs( + util.MaskSensitiveDataInURIForError(sinkURI.String())) } func Verify(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.ChangeFeedID) error { sinkURI, err := url.Parse(cfg.SinkURI) if err != nil { - return errors.WrapError(errors.ErrSinkURIInvalid, err) + return errors.WrapError( + errors.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(cfg.SinkURI)) } scheme := config.GetScheme(sinkURI) switch scheme { @@ -86,5 +94,6 @@ func Verify(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID comm case config.BlackHoleScheme: return nil } - return errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI) + return errors.ErrSinkURIInvalid.GenWithStackByArgs( + util.MaskSensitiveDataInURIForError(sinkURI.String())) } diff --git a/downstreamadapter/sink/sink_test.go b/downstreamadapter/sink/sink_test.go new file mode 100644 index 0000000000..0d812aa533 --- /dev/null +++ b/downstreamadapter/sink/sink_test.go @@ -0,0 +1,77 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "testing" + + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestUnknownSchemeMasksSensitiveSinkURI(t *testing.T) { + t.Parallel() + + changefeedID := common.NewChangefeedID(common.DefaultKeyspaceName) + cfg := &config.ChangefeedConfig{ + SinkURI: "unknown://127.0.0.1:9092/topic?sasl-password=verysecure&access-key=rawkey", + } + + _, err := New(context.Background(), cfg, changefeedID) + require.Error(t, err) + requireMaskedSinkURIError(t, err) + + err = Verify(context.Background(), cfg, changefeedID) + require.Error(t, err) + requireMaskedSinkURIError(t, err) +} + +func TestParseErrorMasksSensitiveSinkURI(t *testing.T) { + t.Parallel() + + changefeedID := common.NewChangefeedID(common.DefaultKeyspaceName) + cfg := &config.ChangefeedConfig{ + SinkURI: "mysql://root:verysecure@127.0.0.1/%zz", + } + + _, err := New(context.Background(), cfg, changefeedID) + require.Error(t, err) + requireInvalidSinkURIError(t, err) + + err = Verify(context.Background(), cfg, changefeedID) + require.Error(t, err) + requireInvalidSinkURIError(t, err) +} + +func requireMaskedSinkURIError(t *testing.T, err error) { + t.Helper() + + errMsg := err.Error() + require.NotContains(t, errMsg, "verysecure") + require.NotContains(t, errMsg, "rawkey") + require.Contains(t, errMsg, "sasl-password=xxxxx") + require.Contains(t, errMsg, "access-key=xxxxx") +} + +func requireInvalidSinkURIError(t *testing.T, err error) { + t.Helper() + + errMsg := err.Error() + require.NotContains(t, errMsg, "verysecure") + require.Contains(t, errMsg, "") + require.Contains(t, errMsg, `parse ""`) + require.Contains(t, errMsg, "invalid URL escape") +} diff --git a/pkg/check/cluster.go b/pkg/check/cluster.go index d320297a94..534fdaab41 100644 --- a/pkg/check/cluster.go +++ b/pkg/check/cluster.go @@ -103,7 +103,10 @@ func getClusterIDBySinkURI( ) (uint64, string, bool, error) { uri, err := url.Parse(sinkURI) if err != nil { - return 0, "", false, cerrors.WrapError(cerrors.ErrSinkURIInvalid, err, sinkURI) + return 0, "", false, cerrors.WrapError( + cerrors.ErrSinkURIInvalid, + util.MaskSensitiveDataInURLError(err), + util.MaskSensitiveDataInURIForError(sinkURI)) } scheme := config.GetScheme(uri) diff --git a/pkg/util/uri.go b/pkg/util/uri.go index 18bdb01bd5..fb249a96f6 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -14,12 +14,10 @@ package util import ( + stderrors "errors" "net" "net/url" "strings" - - "github.com/pingcap/log" - "go.uber.org/zap" ) // IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI. @@ -70,7 +68,6 @@ func validOptionalPort(port string) bool { func MaskSinkURI(uri string) (string, error) { uriParsed, err := url.Parse(uri) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) return "", err } queries := uriParsed.Query() @@ -99,7 +96,6 @@ var sensitiveQueryParameterNames = []string{ func MaskSensitiveDataInURI(uri string) string { uriParsed, err := url.Parse(uri) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) return "" } queries := uriParsed.Query() @@ -114,3 +110,25 @@ func MaskSensitiveDataInURI(uri string) string { uriParsed.RawQuery = queries.Encode() return uriParsed.Redacted() } + +// MaskSensitiveDataInURIForError masks sensitive data in a URI for error messages. +func MaskSensitiveDataInURIForError(uri string) string { + maskedURI := MaskSensitiveDataInURI(uri) + if maskedURI == "" && uri != "" { + return "" + } + return maskedURI +} + +// MaskSensitiveDataInURLError masks the URL carried by net/url errors. +func MaskSensitiveDataInURLError(err error) error { + if err == nil { + return nil + } + var urlErr *url.Error + if !stderrors.As(err, &urlErr) { + return err + } + urlErr.URL = MaskSensitiveDataInURIForError(urlErr.URL) + return err +} diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go new file mode 100644 index 0000000000..42bfe61f6d --- /dev/null +++ b/pkg/util/uri_test.go @@ -0,0 +1,51 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "net/url" + "testing" + + perrors "github.com/pingcap/errors" + "github.com/stretchr/testify/require" +) + +func TestMaskSensitiveDataInURIForError(t *testing.T) { + require.Equal(t, "", MaskSensitiveDataInURIForError("")) + require.Equal(t, "abc", MaskSensitiveDataInURIForError("abc")) + require.Equal(t, + "mysql://root:xxxxx@127.0.0.1:3306/?sasl-password=xxxxx", + MaskSensitiveDataInURIForError("mysql://root:verysecure@127.0.0.1:3306/?sasl-password=rawsecret")) + require.Equal(t, "", MaskSensitiveDataInURIForError("mysql://root:verysecure@127.0.0.1/%zz")) +} + +func TestMaskSensitiveDataInURLError(t *testing.T) { + rawURL := "mysql://root:verysecure@127.0.0.1/%zz" + _, err := url.Parse(rawURL) + require.Error(t, err) + + maskedErr := MaskSensitiveDataInURLError(err) + require.NotContains(t, maskedErr.Error(), "verysecure") + require.Contains(t, maskedErr.Error(), `parse ""`) + require.Contains(t, maskedErr.Error(), "invalid URL escape") + + _, wrappedErr := url.Parse(rawURL) + require.Error(t, wrappedErr) + wrappedErr = perrors.Trace(wrappedErr) + + maskedErr = MaskSensitiveDataInURLError(wrappedErr) + require.NotContains(t, maskedErr.Error(), "verysecure") + require.Contains(t, maskedErr.Error(), `parse ""`) + require.Contains(t, maskedErr.Error(), "invalid URL escape") +}