Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ linters:
- bodyclose
# copyloopvar: detects redundant "x := x" in Go 1.22+ loop variables.
- copyloopvar
# depguard: forbids direct imports that bypass repository wrappers.
- depguard
# durationcheck: checks for suspicious duration multiplication.
- durationcheck
# errname: checks sentinel error naming (Err prefix, Error suffix).
Expand All @@ -19,6 +21,8 @@ linters:
- errorlint
# gosec: inspects source code for security problems.
- gosec
# importas: enforces consistent import aliases.
- importas
# misspell: finds commonly misspelled English words.
- misspell
# nilerr: finds code that returns nil even if err is not nil.
Expand All @@ -35,6 +39,23 @@ linters:
- unconvert

settings:
depguard:
rules:
no-direct-errors:
files:
- $all
- '!pkg/errors/**'
deny:
- pkg: errors$
desc: use github.com/pingcap/ticdc/pkg/errors outside pkg/errors
- pkg: github.com/pingcap/errors$
desc: use github.com/pingcap/ticdc/pkg/errors outside pkg/errors

importas:
alias:
- pkg: github.com/pingcap/ticdc/pkg/errors
alias: ""

revive:
rules:
- name: blank-imports
Expand Down Expand Up @@ -67,10 +88,12 @@ linters:
- G104

# ST1000: don't require package comments.
# ST1003: don't enforce naming conventions on legacy identifiers.
staticcheck:
checks:
- all
- -ST1000
- -ST1003

exclusions:
# Strict: exclude all known generated file patterns.
Expand Down
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Phony targets are targets that are not associated with files.
# Add new phony targets here to make them available in the `make` command.
.PHONY: clean fmt check check-static tidy \
.PHONY: clean fmt check check-static local-static-check tidy \
generate-protobuf generate_mock \
cdc kafka_consumer storage_consumer pulsar_consumer filter_helper \
prepare_test_binaries \
Expand Down Expand Up @@ -320,6 +320,14 @@ else
tools/bin/golangci-lint run --timeout 10m0s
endif

# Lint only code changed on the current branch (vs upstream/master by default).
# Override base with LINT_BASE=<ref>.
# make local-static-check
# make local-static-check LINT_BASE=HEAD~3
local-static-check: tools/bin/golangci-lint
$(eval BASE := $(if $(LINT_BASE),$(LINT_BASE),upstream/master))
tools/bin/golangci-lint run --timeout 10m0s --new-from-rev=$(BASE)

check-ticdc-dashboard:
@echo "check-ticdc-dashboard"
@./scripts/check-ticdc-dashboard.sh
Expand Down
2 changes: 1 addition & 1 deletion api/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (h *statusAPI) handleDebugInfo(w http.ResponseWriter, req *http.Request) {
h.writeEtcdInfo(ctx, h.server.GetEtcdClient(), w)
}

func (h *statusAPI) handleStatus(w http.ResponseWriter, req *http.Request) {
func (h *statusAPI) handleStatus(w http.ResponseWriter, _ *http.Request) {
st := status{
Version: version.ReleaseVersion,
GitHash: version.GitHash,
Expand Down
19 changes: 0 additions & 19 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/integrity"
"github.com/pingcap/ticdc/pkg/liveness"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
)

Expand Down Expand Up @@ -85,12 +84,6 @@ type VerifyTableConfig struct {
SinkURI string `json:"sink_uri"`
}

func getDefaultVerifyTableConfig() *VerifyTableConfig {
return &VerifyTableConfig{
ReplicaConfig: GetDefaultReplicaConfig(),
}
}

// ResumeChangefeedConfig is used by resume changefeed api
type ResumeChangefeedConfig struct {
PDConfig
Expand Down Expand Up @@ -1350,18 +1343,6 @@ type SyncedStatus struct {
Info string `json:"info"`
}

// toCredential generates a security.Credential from a PDConfig
func (cfg *PDConfig) toCredential() *security.Credential {
credential := &security.Credential{
CAPath: cfg.CAPath,
CertPath: cfg.CertPath,
KeyPath: cfg.KeyPath,
}
credential.CertAllowedCN = make([]string, len(cfg.CertAllowedCN))
copy(credential.CertAllowedCN, cfg.CertAllowedCN)
return credential
}

// Marshal returns the json marshal format of a ChangeFeedInfo
func (info *ChangeFeedInfo) Marshal() (string, error) {
data, err := json.Marshal(info)
Expand Down
19 changes: 4 additions & 15 deletions cmd/cdc/cli/cli_changefeed_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
package cli

import (
"bufio"
"fmt"
"os"
"strings"
"time"

v2 "github.com/pingcap/ticdc/api/v2"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/spf13/cobra"
"github.com/tikv/client-go/v2/oracle"
)
Expand All @@ -32,15 +30,6 @@ const (
tsGapWarning = 86400 * 1000
)

func readInput() (string, error) {
reader := bufio.NewReader(os.Stdin)
msg, err := reader.ReadString('\n')
if err != nil {
return "", err
}
return strings.TrimSpace(msg), nil
}

func readYOrN(cmd *cobra.Command) bool {
var yOrN string
_, err := fmt.Scan(&yOrN)
Expand All @@ -66,7 +55,7 @@ func confirmLargeDataGap(cmd *cobra.Command, currentPhysical int64, startTs uint
confirmed := readYOrN(cmd)
if !confirmed {
cmd.Printf("Abort changefeed %s.\n", command)
return cerror.ErrCliAborted.FastGenByArgs(fmt.Sprintf("cli changefeed %s", command))
return errors.ErrCliAborted.FastGen("cli changefeed %s", command)
}
}

Expand All @@ -84,7 +73,7 @@ func confirmOverwriteCheckpointTs(
confirmed := readYOrN(cmd)
if !confirmed {
cmd.Printf("Abort changefeed resume.\n")
return cerror.ErrCliAborted.FastGenByArgs("cli changefeed resume")
return errors.ErrCliAborted.FastGenByArgs("cli changefeed resume")
}

return nil
Expand All @@ -99,7 +88,7 @@ func confirmIgnoreIneligibleTables(cmd *cobra.Command) (bool, error) {
confirmed := readYOrN(cmd)
if !confirmed {
cmd.Printf("No changefeed is created because you don't want to ignore some tables.\n")
return false, cerror.ErrCliAborted.FastGenByArgs("cli changefeed create")
return false, errors.ErrCliAborted.FastGenByArgs("cli changefeed create")
}

return true, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/config-converter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
}
}

func run(cmd *cobra.Command, args []string) {
func run(_ *cobra.Command, _ []string) {
if cfgPath != "" && modelPath != "" {
fmt.Fprintln(os.Stderr, "can't specify both config and model")
os.Exit(ExitCodeInvalidFlag)
Expand Down
8 changes: 5 additions & 3 deletions cmd/storage-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cmd/util"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/canal"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
Expand Down Expand Up @@ -244,7 +244,7 @@ func (c *consumer) getNewFiles(
origDMLIdxMap[k] = m
}

err := c.externalStorage.WalkDir(ctx, opt, func(path string, size int64) error {
err := c.externalStorage.WalkDir(ctx, opt, func(path string, _ int64) error {
if cloudstorage.IsSchemaFile(path) {
err := c.parseSchemaFilePath(ctx, path)
if err != nil {
Expand Down Expand Up @@ -713,7 +713,9 @@ func (c *consumer) handleNewFiles(
}
}
}
c.flushDMLEvents(ctx, tableID)
if err := c.flushDMLEvents(ctx, tableID); err != nil {
return err
}
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/logger"
"github.com/pingcap/ticdc/pkg/version"
"go.uber.org/zap"
Expand Down Expand Up @@ -103,7 +104,7 @@ func main() {
if consumer != nil {
consumer.sink.Close()
}
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
return 1
}
return 0
Expand Down
17 changes: 11 additions & 6 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func NewController(
// detect the capture changes
c.nodeManager.RegisterNodeChangeHandler(
nodeChangeHandlerID,
func(allNodes map[node.ID]*node.Info) {
func(_ map[node.ID]*node.Info) {
c.nodeChanged.Lock()
defer c.nodeChanged.Unlock()
c.nodeChanged.changed = true
Expand Down Expand Up @@ -355,9 +355,14 @@ func (c *Controller) RequestResolvedTsFromLogCoordinator(ctx context.Context, ch
changefeedID := c.changefeedDB.GetChangefeedIDByName(changefeedDisplayName)
ids := c.nodeManager.GetAliveNodeIDs()
for _, id := range ids {
c.messageCenter.SendEvent(messaging.NewSingleTargetMessage(id, messaging.LogCoordinatorTopic, &heartbeatpb.LogCoordinatorResolvedTsRequest{
if err := c.messageCenter.SendEvent(messaging.NewSingleTargetMessage(id, messaging.LogCoordinatorTopic, &heartbeatpb.LogCoordinatorResolvedTsRequest{
ChangefeedID: changefeedID.ToPB(),
}))
})); err != nil {
log.Warn("failed to request resolved ts from log coordinator",
zap.Stringer("target", id),
zap.String("changefeed", changefeedID.DisplayName.String()),
zap.Error(err))
}
}

// wait for some time to get the resolved ts
Expand Down Expand Up @@ -686,7 +691,7 @@ func (c *Controller) CreateChangefeed(ctx context.Context, info *config.ChangeFe
return errors.Trace(ctx.Err())
case <-ticker.C:
log.Warn("changefeed is in scheduling, wait a moment", zap.String("changefeed", info.ChangefeedID.DisplayName.String()))
count += 1
count++
}
}

Expand Down Expand Up @@ -728,7 +733,7 @@ func (c *Controller) RemoveChangefeed(ctx context.Context, id common.ChangeFeedI
case <-ctx.Done():
return 0, errors.Trace(ctx.Err())
case <-ticker.C:
count += 1
count++
log.Info("wait for stop changefeed operator finished", zap.Int("count", count), zap.Any("id", id))
}
}
Expand Down Expand Up @@ -766,7 +771,7 @@ func (c *Controller) PauseChangefeed(ctx context.Context, id common.ChangeFeedID
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
count += 1
count++
log.Info("wait for stop changefeed operator finished", zap.Int("count", count), zap.Any("id", id))
}
}
Expand Down
5 changes: 3 additions & 2 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/eventservice"
"github.com/pingcap/ticdc/pkg/messaging"
Expand Down Expand Up @@ -682,7 +683,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) {
ctxRun, cancelRun := context.WithCancel(ctx)
go func() {
err := cr.Run(ctxRun)
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
t.Errorf("Coordinator Run returned unexpected error: %v", err)
}
}()
Expand Down Expand Up @@ -715,7 +716,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) {

// Use recvMessages to send event to channel
err := co.recvMessages(ctx, msg)
if err != nil && err != context.Canceled {
if err != nil && !errors.Is(err, context.Canceled) {
t.Logf("Failed to send event in goroutine %d: %v", id, err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (e *DispatcherManager) getRedoEventCollectorBatchCountAndBytes(redoSink *re
return batchCount, batchBytes
}

func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dispatcherCreateInfo, removeDDLTs bool) error {
func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dispatcherCreateInfo, _ bool) error {
start := time.Now()
batchCount, batchBytes := e.getRedoEventCollectorBatchCountAndBytes(e.redoSink)

Expand Down
Loading
Loading