Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ jobs:
- name: golangci-lint flow
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9
with:
version: v2.9.0
version: v2.10.1
working-directory: ./flow
args: --timeout=10m
- name: golangci-lint e2e_cleanup
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20 # v9
with:
version: v2.9.0
version: v2.10.1
working-directory: ./e2e_cleanup
args: --timeout=10m
2 changes: 1 addition & 1 deletion e2e_cleanup/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peerdb/e2e_cleanup

go 1.25.0
go 1.26.0

require (
cloud.google.com/go/bigquery v1.72.0
Expand Down
6 changes: 3 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (a *FlowableActivity) SyncFlow(
var normalizeWaiting atomic.Bool
var syncingBatchID atomic.Int64
var syncState atomic.Pointer[string]
syncState.Store(shared.Ptr("setup"))
syncState.Store(new("setup"))
shutdown := common.HeartbeatRoutine(ctx, func() string {
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch
sBatchID := syncingBatchID.Load()
Expand Down Expand Up @@ -399,7 +399,7 @@ func (a *FlowableActivity) SyncFlow(
break
}
logger.Error("failed to sync records", slog.Any("error", syncErr))
syncState.Store(shared.Ptr("cleanup"))
syncState.Store(new("cleanup"))
close(syncDone)
normRequests.Close()
normResponses.Close()
Expand All @@ -418,7 +418,7 @@ func (a *FlowableActivity) SyncFlow(
}
}

syncState.Store(shared.Ptr("cleanup"))
syncState.Store(new("cleanup"))
close(syncDone)
normRequests.Close()
normResponses.Close()
Expand Down
10 changes: 5 additions & 5 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}

startTime := time.Now()
syncState.Store(shared.Ptr("syncing"))
syncState.Store(new("syncing"))
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
Expand Down Expand Up @@ -235,7 +235,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
defer dstClose(ctx)

syncState.Store(shared.Ptr("updating schema"))
syncState.Store(new("updating schema"))
if err := dstConn.ReplayTableSchemaDeltas(
ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Flags,
); err != nil {
Expand Down Expand Up @@ -319,7 +319,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
return nil, fmt.Errorf("[cdc] failed to pull records: %w", err)
}
syncState.Store(shared.Ptr("bookkeeping"))
syncState.Store(new("bookkeeping"))

syncDuration := time.Since(syncStartTime)
lastCheckpoint := recordBatchSync.GetLastCheckpoint()
Expand Down Expand Up @@ -350,13 +350,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

a.OtelManager.Metrics.CurrentBatchIdGauge.Record(ctx, res.CurrentSyncBatchID)

syncState.Store(shared.Ptr("updating schema"))
syncState.Store(new("updating schema"))
if err := a.applySchemaDeltas(ctx, config, res.TableSchemaDeltas); err != nil {
return nil, err
}

if recordBatchSync.NeedsNormalize() {
syncState.Store(shared.Ptr("normalizing"))
syncState.Store(new("normalizing"))
normRequests.Update(res.CurrentSyncBatchID)
normWaitThreshold := res.CurrentSyncBatchID - normBufferSize
if normResponses.Load() <= normWaitThreshold {
Expand Down
2 changes: 1 addition & 1 deletion flow/alerting/slack_alert_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s *SlackAlertSender) getOpenConnectionsAlertThreshold() uint32 {
}

type slackAlertConfig struct {
AuthToken string `json:"auth_token"`
AuthToken string `json:"auth_token"` //nolint:gosec // G117: credential field by design, encrypted outside
ChannelIDs []string `json:"channel_ids"`
Members []string `json:"members"`
SlotLagMBAlertThreshold uint32 `json:"slot_lag_mb_alert_threshold"`
Expand Down
11 changes: 5 additions & 6 deletions flow/cmd/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"strings"

"github.com/aws/smithy-go/ptr"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -73,7 +72,7 @@ func MaintenanceMain(ctx context.Context, args *MaintenanceCLIParams) error {
slog.InfoContext(ctx, "Assuming maintenance workflows were skipped")
return WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String("Assumed skipped by CLI Flag"),
SkippedReason: new("Assumed skipped by CLI Flag"),
CLIVersion: internal.PeerDBVersionShaShort(),
})
}
Expand Down Expand Up @@ -143,7 +142,7 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam
slog.InfoContext(ctx, "Skipping maintenance workflow due to missing k8s service", "service", args.SkipIfK8sServiceMissing)
return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String(fmt.Sprintf("K8s service %s missing", args.SkipIfK8sServiceMissing)),
SkippedReason: new(fmt.Sprintf("K8s service %s missing", args.SkipIfK8sServiceMissing)),
CLIVersion: internal.PeerDBVersionShaShort(),
CLIDeployVersion: internal.PeerDBDeploymentVersion(),
})
Expand Down Expand Up @@ -189,11 +188,11 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam
"deployApiVersion", version.DeploymentVersion, "cliDeployVersion", internal.PeerDBDeploymentVersion())
return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String("Version Mismatch: " + strings.Join(skippedReasons, ", ")),
SkippedReason: new("Version Mismatch: " + strings.Join(skippedReasons, ", ")),
CLIVersion: internal.PeerDBVersionShaShort(),
CLIDeployVersion: internal.PeerDBDeploymentVersion(),
APIVersion: version.Version,
APIDeployVersion: ptr.ToString(version.DeploymentVersion),
APIDeployVersion: shared.Val(version.DeploymentVersion),
})
}
}
Expand All @@ -208,7 +207,7 @@ func skipStartMaintenanceIfNeeded(ctx context.Context, args *MaintenanceCLIParam
slog.InfoContext(ctx, "Skipping maintenance workflow due to no mirrors")
return true, WriteMaintenanceOutputToCatalog(ctx, StartMaintenanceResult{
Skipped: true,
SkippedReason: ptr.String("No mirrors found"),
SkippedReason: new("No mirrors found"),
})
}
}
Expand Down
3 changes: 1 addition & 2 deletions flow/cmd/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func (h *FlowRequestHandler) GetDynamicSettings(
if _, err := pgx.ForEachRow(rows, []any{&name, &value}, func() error {
if idx, ok := internal.DynamicIndex[name]; ok {
settings[idx] = proto.CloneOf(settings[idx])
newValue := value // create a new string reference as value can be overwritten by the next iteration.
settings[idx].Value = &newValue
settings[idx].Value = new(value) // create a new string reference as value can be overwritten by the next iteration.
}
return nil
}); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/mysql/rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"os"
"testing"

"github.com/aws/smithy-go/ptr"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand All @@ -29,7 +28,7 @@ func TestAwsRDSIAMAuthConnectForMYSQL(t *testing.T) {
AuthConfig: &protos.AwsAuthenticationConfig_Role{
Role: &protos.AWSAuthAssumeRoleConfig{
AssumeRoleArn: os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_ASSUME_ROLE"),
ChainedRoleArn: ptr.String(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
ChainedRoleArn: new(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
},
},
},
Expand Down Expand Up @@ -60,7 +59,7 @@ func TestAwsRDSIAMAuthConnectForMYSQLViaProxy(t *testing.T) {
AuthConfig: &protos.AwsAuthenticationConfig_Role{
Role: &protos.AWSAuthAssumeRoleConfig{
AssumeRoleArn: os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_ASSUME_ROLE"),
ChainedRoleArn: ptr.String(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
ChainedRoleArn: new(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
},
},
},
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/postgres/rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"os"
"testing"

"github.com/aws/smithy-go/ptr"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand All @@ -31,7 +30,7 @@ func TestAwsRDSIAMAuthConnectForPostgres(t *testing.T) {
AuthConfig: &protos.AwsAuthenticationConfig_Role{
Role: &protos.AWSAuthAssumeRoleConfig{
AssumeRoleArn: os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_ASSUME_ROLE"),
ChainedRoleArn: ptr.String(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
ChainedRoleArn: new(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
},
},
},
Expand Down Expand Up @@ -74,7 +73,7 @@ func TestAwsRDSIAMAuthConnectForPostgresViaProxy(t *testing.T) {
AuthConfig: &protos.AwsAuthenticationConfig_Role{
Role: &protos.AWSAuthAssumeRoleConfig{
AssumeRoleArn: os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_ASSUME_ROLE"),
ChainedRoleArn: ptr.String(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
ChainedRoleArn: new(os.Getenv("FLOW_TESTS_RDS_IAM_AUTH_CHAINED_ROLE")),
},
},
},
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync/atomic"
"time"

"github.com/aws/smithy-go/ptr"
"github.com/jackc/pgx/v5/pgtype"
"github.com/snowflakedb/gosnowflake"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -92,7 +91,7 @@ func NewSnowflakeConnector(
}

additionalParams := make(map[string]*string)
additionalParams["CLIENT_SESSION_KEEP_ALIVE"] = ptr.String("true")
additionalParams["CLIENT_SESSION_KEEP_ALIVE"] = new("true")

snowflakeConfig := gosnowflake.Config{
Account: snowflakeProtoConfig.AccountId,
Expand Down
12 changes: 1 addition & 11 deletions flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sts"
smithyendpoints "github.com/aws/smithy-go/endpoints"
"github.com/aws/smithy-go/ptr"
"github.com/google/uuid"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand All @@ -33,15 +32,6 @@ const (

var s3CompatibleServiceEndpointPattern = regexp.MustCompile(`^https?://[a-zA-Z0-9.-]+(:\d+)?$`)

type AWSSecrets struct {
AccessKeyID string
SecretAccessKey string
AwsRoleArn string
Region string
Endpoint string
SessionToken string
}

type PeerAWSCredentials struct {
Credentials aws.Credentials
RoleArn *string
Expand Down Expand Up @@ -199,7 +189,7 @@ func (a *AssumeRoleBasedAWSCredentialsProvider) Retrieve(ctx context.Context) (A
}
return AWSCredentials{
AWS: retrieved,
EndpointUrl: ptr.String(a.GetEndpointURL()),
EndpointUrl: new(a.GetEndpointURL()),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type GcpServiceAccount struct {
Type string `json:"type"`
ProjectID string `json:"project_id"`
PrivateKeyID string `json:"private_key_id"`
PrivateKey string `json:"private_key"`
PrivateKey string `json:"private_key"` //nolint:gosec // G117: deserialized from Google credential files, by design
ClientEmail string `json:"client_email"`
ClientID string `json:"client_id"`
AuthURI string `json:"auth_uri"`
Expand Down
1 change: 0 additions & 1 deletion flow/e2e/clickhouse_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ func (s ClickHouseSuite) Test_MySQL_Specific_Geometric_Types() {
require.Len(s.t, row, 8, "expected 8 columns")
for j := 1; j < 8; j++ {
geometryVal := row[j].Value()
//nolint:gosec
require.Equal(s.t, expectedValues[i][j-1], geometryVal, "geometry value mismatch at row %d column %d", i+1, j)
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping {
for i := 0; i < len(tables); i += 2 {
tm = append(tm, &protos.TableMapping{
SourceTableIdentifier: AttachSchema(s, tables[i]),
DestinationTableIdentifier: s.DestinationTable(tables[i+1]),
DestinationTableIdentifier: s.DestinationTable(tables[i+1]), //nolint:gosec // G602: even length enforced above
ShardingKey: "id",
})
}
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/s3_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type S3PeerCredentials struct {
AccessKeyID string `json:"accessKeyId"`
SecretAccessKey string `json:"secretAccessKey"`
AwsRoleArn string `json:"awsRoleArn"`
SessionToken string `json:"sessionToken"`
SessionToken string `json:"sessionToken"` //nolint:gosec // G117: sometimes provided from file
Region string `json:"region"`
Endpoint string `json:"endpoint"`
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestH
if err != nil {
return nil, err
}
rootCA = shared.Ptr(base64.StdEncoding.EncodeToString(bytes))
rootCA = new(base64.StdEncoding.EncodeToString(bytes))
tlsHost = "minio.local"
default:
panic(fmt.Sprintf("invalid s3environment %d", s3environment))
Expand All @@ -97,7 +97,7 @@ func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestH
AccessKeyId: &config.AccessKeyID,
SecretAccessKey: &config.SecretAccessKey,
Region: &config.Region,
Endpoint: shared.Ptr(endpoint),
Endpoint: new(endpoint),
RootCa: rootCA,
TlsHost: tlsHost,
}
Expand Down
2 changes: 1 addition & 1 deletion flow/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peerdb/flow

go 1.25.5
go 1.26.0

require (
cloud.google.com/go v0.123.0
Expand Down
3 changes: 1 addition & 2 deletions flow/internal/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strings"
"time"

"github.com/aws/smithy-go/ptr"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"golang.org/x/exp/constraints"
Expand Down Expand Up @@ -725,7 +724,7 @@ func PeerDBMaintenanceModeEnabled(ctx context.Context, env map[string]string) (b
}

func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool shared.CatalogPool, enabled bool) error {
return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", ptr.String(strconv.FormatBool(enabled)))
return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", new(strconv.FormatBool(enabled)))
}

func PeerDBPKMEmptyBatchThrottleThresholdSeconds(ctx context.Context, env map[string]string) (int64, error) {
Expand Down
6 changes: 3 additions & 3 deletions flow/model/qrecord_avro_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMongoDBAvroSizeComputation(t *testing.T) {
"Estimated size should be within 5% of actual (upper bound)")

// for completion, print out the compression ratio
fileInfo, err := os.Stat(tmpfile.Name())
fileInfo, err := os.Stat(tmpfile.Name()) //nolint:gosec // G703: temp file path
require.NoError(t, err)
compressedSize := fileInfo.Size()
compressionRatio := float64(actualSize) / float64(compressedSize)
Expand Down Expand Up @@ -106,7 +106,7 @@ func writeAvroFileCompressed(
ConstructColumnNameAvroFieldMap(schema.Fields),
)
require.NoError(t, err)
file, err := os.Create(filePath)
file, err := os.Create(filePath) //nolint:gosec // G703: temp file path
require.NoError(t, err)
defer file.Close()

Expand Down Expand Up @@ -233,7 +233,7 @@ func (w *MeteredWriter) Write(p []byte) (int, error) {
func getActualUncompressedSize(t *testing.T, filePath string, avroSchema *QRecordAvroSchemaDefinition, numRecords int) int64 {
t.Helper()

file, err := os.Open(filePath)
file, err := os.Open(filePath) //nolint:gosec // G703: temp file path
require.NoError(t, err)
defer file.Close()
decoder, err := ocf.NewDecoder(file)
Expand Down
2 changes: 1 addition & 1 deletion flow/pkg/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peerdb/flow/pkg

go 1.25.0
go 1.26.0

require (
github.com/ClickHouse/ch-go v0.68.0
Expand Down
3 changes: 2 additions & 1 deletion flow/pkg/mongo/client_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"testing"
"time"

"github.com/PeerDB-io/peerdb/flow/pkg/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peerdb/flow/pkg/common"
)

func TestBuildClientOptions_TLS(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion flow/pkg/mongo/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"slices"
"strings"

"github.com/PeerDB-io/peerdb/flow/pkg/common"
"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/PeerDB-io/peerdb/flow/pkg/common"
)

const (
Expand Down
Loading
Loading