diff --git a/backend/internal/cmds/mgmt/serve/connect/cmd.go b/backend/internal/cmds/mgmt/serve/connect/cmd.go index f95a3e7cb1..20a10ee769 100644 --- a/backend/internal/cmds/mgmt/serve/connect/cmd.go +++ b/backend/internal/cmds/mgmt/serve/connect/cmd.go @@ -699,7 +699,7 @@ func serve(ctx context.Context) error { var presAnalyzeClient presidioapi.AnalyzeInterface var presAnonClient presidioapi.AnonymizeInterface var presEntityClient presidioapi.EntityInterface - if ncloudlicense.IsValid() { + if cascadelicense.IsValid() { analyzeClient, ok, err := getPresidioAnalyzeClient() if err != nil { return fmt.Errorf("unable to initialize presidio analyze client: %w", err) @@ -719,10 +719,11 @@ func serve(ctx context.Context) error { } } + isPresidioEnabled := cascadelicense.IsValid() && presAnalyzeClient != nil && presAnonClient != nil + transformerService := v1alpha1_transformerservice.New(&v1alpha1_transformerservice.Config{ - IsPresidioEnabled: ncloudlicense.IsValid(), - IsNeosyncCloud: ncloudlicense.IsValid(), - }, db, presEntityClient, userdataclient) + IsPresidioEnabled: isPresidioEnabled, + }, db, presEntityClient, userdataclient, cascadelicense) api.Handle( mgmtv1alpha1connect.NewTransformersServiceHandler( transformerService, @@ -734,11 +735,11 @@ func serve(ctx context.Context) error { ) anonymizationService := v1alpha1_anonymizationservice.New(&v1alpha1_anonymizationservice.Config{ - IsPresidioEnabled: ncloudlicense.IsValid(), + IsPresidioEnabled: isPresidioEnabled, PresidioDefaultLanguage: getPresidioDefaultLanguage(), IsAuthEnabled: isAuthEnabled, IsNeosyncCloud: ncloudlicense.IsValid(), - }, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db) + }, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db, cascadelicense) api.Handle( mgmtv1alpha1connect.NewAnonymizationServiceHandler( anonymizationService, diff --git 
a/backend/internal/dtomaps/jobs.go b/backend/internal/dtomaps/jobs.go index 92a4cb4446..d9ee9fe541 100644 --- a/backend/internal/dtomaps/jobs.go +++ b/backend/internal/dtomaps/jobs.go @@ -17,7 +17,11 @@ func ToJobDto( ) (*mgmtv1alpha1.Job, error) { mappings := []*mgmtv1alpha1.JobMapping{} for _, mapping := range inputJob.Mappings { - mappings = append(mappings, mapping.ToDto()) + dto, err := mapping.ToDto() + if err != nil { + return nil, fmt.Errorf("unable to convert job mapping to dto: %w", err) + } + mappings = append(mappings, dto) } virtualForeignKeys := []*mgmtv1alpha1.VirtualForeignConstraint{} @@ -50,6 +54,11 @@ func ToJobDto( } } + sourceOptions, err := inputJob.ConnectionOptions.ToDto() + if err != nil { + return nil, fmt.Errorf("unable to convert job source options to dto: %w", err) + } + return &mgmtv1alpha1.Job{ Id: neosyncdb.UUIDString(inputJob.ID), Name: inputJob.Name, @@ -61,7 +70,7 @@ func ToJobDto( Mappings: mappings, VirtualForeignKeys: virtualForeignKeys, Source: &mgmtv1alpha1.JobSource{ - Options: inputJob.ConnectionOptions.ToDto(), + Options: sourceOptions, }, Destinations: destinations, AccountId: neosyncdb.UUIDString(inputJob.AccountID), diff --git a/backend/internal/dtomaps/transformers.go b/backend/internal/dtomaps/transformers.go index 94ec2a4a8d..516c5c2d0c 100644 --- a/backend/internal/dtomaps/transformers.go +++ b/backend/internal/dtomaps/transformers.go @@ -24,6 +24,10 @@ func ToUserDefinedTransformerDto( input.Source, ) } + dto, err := input.TransformerConfig.ToTransformerConfigDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.UserDefinedTransformer{ Id: neosyncdb.UUIDString(input.ID), Name: input.Name, @@ -31,7 +35,7 @@ func ToUserDefinedTransformerDto( Source: source, DataType: transformer.DataType, //nolint:staticcheck DataTypes: transformer.DataTypes, - Config: input.TransformerConfig.ToTransformerConfigDto(), + Config: dto, CreatedAt: timestamppb.New(input.CreatedAt.Time), UpdatedAt: 
timestamppb.New(input.UpdatedAt.Time), AccountId: neosyncdb.UUIDString(input.AccountID), diff --git a/backend/pkg/integration-test/mux.go b/backend/pkg/integration-test/mux.go index d3e59e1042..8dce329b51 100644 --- a/backend/pkg/integration-test/mux.go +++ b/backend/pkg/integration-test/mux.go @@ -209,11 +209,11 @@ func (s *NeosyncApiTestClient) setupMux( transformerService := v1alpha1_transformersservice.New( &v1alpha1_transformersservice.Config{ IsPresidioEnabled: isPresidioEnabled, - IsNeosyncCloud: isNeosyncCloud, }, neosyncdb.New(pgcontainer.DB, db_queries.New()), s.Mocks.Presidio.Entities, userclient, + license, ) sqlmanagerclient := NewTestSqlManagerClient() @@ -289,6 +289,7 @@ func (s *NeosyncApiTestClient) setupMux( presAnalyzeClient, presAnonClient, neosyncDb, + license, ) connectionDataService := v1alpha1_connectiondataservice.New( diff --git a/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go b/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go index e6d295f03f..f64a6ed3d3 100644 --- a/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go +++ b/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go @@ -32,7 +32,7 @@ func (s *Service) AnonymizeMany( req *connect.Request[mgmtv1alpha1.AnonymizeManyRequest], ) (*connect.Response[mgmtv1alpha1.AnonymizeManyResponse], error) { logger := logger_interceptor.GetLoggerFromContextOrDefault(ctx) - if !s.cfg.IsNeosyncCloud { + if !s.license.IsValid() { return nil, nucleuserrors.NewNotImplemented( fmt.Sprintf( "%s is not implemented in the OSS version of Neosync.", @@ -190,11 +190,11 @@ func (s *Service) AnonymizeSingle( if err != nil { return nil, err } - if !s.cfg.IsNeosyncCloud || account.AccountType == int16(neosyncdb.AccountType_Personal) { + if !s.license.IsValid() || (s.cfg.IsNeosyncCloud && account.AccountType == int16(neosyncdb.AccountType_Personal)) { for _, mapping := range req.Msg.GetTransformerMappings() { if 
mapping.GetTransformer().GetTransformPiiTextConfig() != nil { return nil, nucleuserrors.NewForbidden( - "TransformPiiText is not available for use. Please contact us to upgrade your account.", + "TransformPiiText is not available for use. Please contact us about upgrading your account.", ) } } @@ -203,7 +203,7 @@ func (s *Service) AnonymizeSingle( defaultTransforms.GetN().GetTransformPiiTextConfig() != nil || defaultTransforms.GetS().GetTransformPiiTextConfig() != nil { return nil, nucleuserrors.NewForbidden( - "TransformPiiText is not available for use. Please contact us to upgrade your account.", + "TransformPiiText is not available for use. Please contact us about upgrading your account.", ) } } diff --git a/backend/services/mgmt/v1alpha1/anonymization-service/service.go b/backend/services/mgmt/v1alpha1/anonymization-service/service.go index 2f6bf63fa2..83d8f7c197 100644 --- a/backend/services/mgmt/v1alpha1/anonymization-service/service.go +++ b/backend/services/mgmt/v1alpha1/anonymization-service/service.go @@ -3,6 +3,7 @@ package v1alpha_anonymizationservice import ( "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" "github.com/nucleuscloud/neosync/backend/internal/userdata" + "github.com/nucleuscloud/neosync/internal/ee/license" presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio" "github.com/nucleuscloud/neosync/internal/neosyncdb" "go.opentelemetry.io/otel/metric" @@ -17,13 +18,14 @@ type Service struct { analyze presidioapi.AnalyzeInterface anonymize presidioapi.AnonymizeInterface db *neosyncdb.NeosyncDb + license license.EEInterface } type Config struct { IsAuthEnabled bool + IsNeosyncCloud bool IsPresidioEnabled bool PresidioDefaultLanguage *string - IsNeosyncCloud bool } func New( @@ -35,6 +37,7 @@ func New( analyzeclient presidioapi.AnalyzeInterface, anonymizeclient presidioapi.AnonymizeInterface, db *neosyncdb.NeosyncDb, + license license.EEInterface, ) *Service { return &Service{ cfg: cfg, @@ 
-45,5 +48,6 @@ func New( analyze: analyzeclient, anonymize: anonymizeclient, db: db, + license: license, } } diff --git a/backend/services/mgmt/v1alpha1/transformers-service/service.go b/backend/services/mgmt/v1alpha1/transformers-service/service.go index 9047c21927..6b3e3af1d5 100644 --- a/backend/services/mgmt/v1alpha1/transformers-service/service.go +++ b/backend/services/mgmt/v1alpha1/transformers-service/service.go @@ -2,6 +2,7 @@ package v1alpha1_transformersservice import ( "github.com/nucleuscloud/neosync/backend/internal/userdata" + "github.com/nucleuscloud/neosync/internal/ee/license" presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio" "github.com/nucleuscloud/neosync/internal/neosyncdb" ) @@ -11,11 +12,11 @@ type Service struct { db *neosyncdb.NeosyncDb entityclient presidioapi.EntityInterface userdataclient userdata.Interface + license license.EEInterface } type Config struct { IsPresidioEnabled bool - IsNeosyncCloud bool } func New( @@ -23,11 +24,13 @@ func New( db *neosyncdb.NeosyncDb, recognizerclient presidioapi.EntityInterface, userdataclient userdata.Interface, + license license.EEInterface, ) *Service { return &Service{ cfg: cfg, db: db, entityclient: recognizerclient, userdataclient: userdataclient, + license: license, } } diff --git a/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go b/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go index 65ef921eeb..401cc4f588 100644 --- a/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go +++ b/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go @@ -1006,14 +1006,14 @@ func (s *Service) GetSystemTransformerBySource( } func (s *Service) getSystemTransformerSourceMap() map[mgmtv1alpha1.TransformerSource]*mgmtv1alpha1.SystemTransformer { - if s.cfg.IsNeosyncCloud { + if s.license.IsValid() { return allSystemTransformersSourceMap } return baseSystemTransformerSourceMap } func (s *Service) 
getSystemTransformers() []*mgmtv1alpha1.SystemTransformer { - if s.cfg.IsNeosyncCloud { + if s.license.IsValid() { return allSystemTransformers } return baseSystemTransformers diff --git a/backend/sql/postgresql/models/models.go b/backend/sql/postgresql/models/models.go index 2297b47fb6..e551a8b3bc 100644 --- a/backend/sql/postgresql/models/models.go +++ b/backend/sql/postgresql/models/models.go @@ -857,13 +857,17 @@ type JobMapping struct { JobMappingTransformer *JobMappingTransformerModel `json:"jobMappingTransformerModel,omitempty"` } -func (jm *JobMapping) ToDto() *mgmtv1alpha1.JobMapping { +func (jm *JobMapping) ToDto() (*mgmtv1alpha1.JobMapping, error) { + transformer, err := jm.JobMappingTransformer.ToTransformerDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.JobMapping{ Schema: jm.Schema, Table: jm.Table, Column: jm.Column, - Transformer: jm.JobMappingTransformer.ToTransformerDto(), - } + Transformer: transformer, + }, nil } func (jm *JobMapping) FromDto(dto *mgmtv1alpha1.JobMapping) error { @@ -1097,13 +1101,29 @@ type DynamoDBSourceUnmappedTransformConfig struct { S *JobMappingTransformerModel `json:"s"` } -func (s *DynamoDBSourceUnmappedTransformConfig) ToDto() *mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig { - return &mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig{ - B: s.B.ToTransformerDto(), - Boolean: s.Boolean.ToTransformerDto(), - N: s.N.ToTransformerDto(), - S: s.S.ToTransformerDto(), +func (s *DynamoDBSourceUnmappedTransformConfig) ToDto() (*mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig, error) { + b, err := s.B.ToTransformerDto() + if err != nil { + return nil, err + } + boolean, err := s.Boolean.ToTransformerDto() + if err != nil { + return nil, err + } + n, err := s.N.ToTransformerDto() + if err != nil { + return nil, err + } + str, err := s.S.ToTransformerDto() + if err != nil { + return nil, err } + return &mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig{ + B: b, + Boolean: boolean, + N: n, + S: str, + }, 
nil } func (s *DynamoDBSourceUnmappedTransformConfig) FromDto( @@ -1156,7 +1176,7 @@ func (s *DynamoDBSourceTableOption) FromDto(dto *mgmtv1alpha1.DynamoDBSourceTabl s.WhereClause = dto.WhereClause } -func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOptions { +func (s *DynamoDBSourceOptions) ToDto() (*mgmtv1alpha1.DynamoDBSourceConnectionOptions, error) { tables := make([]*mgmtv1alpha1.DynamoDBSourceTableOption, len(s.Tables)) for i, t := range s.Tables { tables[i] = t.ToDto() @@ -1185,12 +1205,16 @@ func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOp }, } } + unmappedTransforms, err := s.UnmappedTransforms.ToDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.DynamoDBSourceConnectionOptions{ ConnectionId: s.ConnectionId, Tables: tables, - UnmappedTransforms: s.UnmappedTransforms.ToDto(), + UnmappedTransforms: unmappedTransforms, EnableConsistentRead: s.EnableConsistentRead, - } + }, nil } func (s *DynamoDBSourceOptions) FromDto(dto *mgmtv1alpha1.DynamoDBSourceConnectionOptions) error { @@ -1731,57 +1755,61 @@ type MysqlSourceTableOption struct { WhereClause *string `json:"whereClause,omitempty"` } -func (j *JobSourceOptions) ToDto() *mgmtv1alpha1.JobSourceOptions { +func (j *JobSourceOptions) ToDto() (*mgmtv1alpha1.JobSourceOptions, error) { if j.PostgresOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Postgres{ Postgres: j.PostgresOptions.ToDto(), }, - } + }, nil } if j.MysqlOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Mysql{ Mysql: j.MysqlOptions.ToDto(), }, - } + }, nil } if j.GenerateOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Generate{ Generate: j.GenerateOptions.ToDto(), }, - } + }, nil } if j.AiGenerateOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_AiGenerate{ AiGenerate: 
j.AiGenerateOptions.ToDto(), }, - } + }, nil } if j.MongoDbOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Mongodb{ Mongodb: j.MongoDbOptions.ToDto(), }, - } + }, nil } if j.DynamoDBOptions != nil { + dto, err := j.DynamoDBOptions.ToDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Dynamodb{ - Dynamodb: j.DynamoDBOptions.ToDto(), + Dynamodb: dto, }, - } + }, nil } if j.MssqlOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Mssql{ Mssql: j.MssqlOptions.ToDto(), }, - } + }, nil } - return nil + return nil, fmt.Errorf("invalid job source options config, received type: %T", j) } func (j *JobSourceOptions) FromDto(dto *mgmtv1alpha1.JobSourceOptions) error { diff --git a/backend/sql/postgresql/models/transformers.go b/backend/sql/postgresql/models/transformers.go index f88c010f6d..303521b3fb 100644 --- a/backend/sql/postgresql/models/transformers.go +++ b/backend/sql/postgresql/models/transformers.go @@ -1,6 +1,9 @@ package pg_models import ( + "encoding/json" + "fmt" + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" ) @@ -56,6 +59,7 @@ type TransformerConfig struct { GenerateIpAddress *GenerateIpAddressConfig `json:"generateIpAddressConfig,omitempty"` TransformUuid *TransformUuidConfig `json:"transformUuid,omitempty"` TransformScrambleIdentity *TransformScrambleIdentityConfig `json:"transformScrambleIdentity,omitempty"` + TransformPiiText string `json:"transformPiiText,omitempty"` } type TransformScrambleIdentityConfig struct{} @@ -400,6 +404,12 @@ func (t *TransformerConfig) FromTransformerConfigDto(tr *mgmtv1alpha1.Transforme t.TransformUuid = &TransformUuidConfig{} case *mgmtv1alpha1.TransformerConfig_TransformScrambleIdentityConfig: t.TransformScrambleIdentity = &TransformScrambleIdentityConfig{} + case *mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig: + bits, 
err := json.Marshal(tr.GetTransformPiiTextConfig()) + if err != nil { + return fmt.Errorf("unable to marshal transform pii text config: %w", err) + } + t.TransformPiiText = string(bits) default: t = &TransformerConfig{} } @@ -407,16 +417,20 @@ func (t *TransformerConfig) FromTransformerConfigDto(tr *mgmtv1alpha1.Transforme return nil } -func (t *JobMappingTransformerModel) ToTransformerDto() *mgmtv1alpha1.JobMappingTransformer { +func (t *JobMappingTransformerModel) ToTransformerDto() (*mgmtv1alpha1.JobMappingTransformer, error) { if t.Config == nil { t.Config = &TransformerConfig{} } - return &mgmtv1alpha1.JobMappingTransformer{ - Config: t.Config.ToTransformerConfigDto(), + cfg, err := t.Config.ToTransformerConfigDto() + if err != nil { + return nil, fmt.Errorf("unable to convert transformer config to dto: %w", err) } + return &mgmtv1alpha1.JobMappingTransformer{ + Config: cfg, + }, nil } -func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerConfig { +func (t *TransformerConfig) ToTransformerConfigDto() (*mgmtv1alpha1.TransformerConfig, error) { switch { case t.GenerateEmail != nil: return &mgmtv1alpha1.TransformerConfig{ @@ -425,7 +439,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo EmailType: (*mgmtv1alpha1.GenerateEmailType)(t.GenerateEmail.EmailType), }, }, - } + }, nil case t.TransformEmail != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformEmailConfig{ @@ -441,13 +455,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo ), }, }, - } + }, nil case t.GenerateBool != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateBoolConfig{ GenerateBoolConfig: &mgmtv1alpha1.GenerateBool{}, }, - } + }, nil case t.GenerateCardNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCardNumberConfig{ @@ -455,17 +469,17 @@ func (t 
*TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo ValidLuhn: t.GenerateCardNumber.ValidLuhn, }, }, - } + }, nil case t.GenerateCity != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCityConfig{ GenerateCityConfig: &mgmtv1alpha1.GenerateCity{}, }, - } + }, nil case t.GenerateDefault != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{}, - } + }, nil case t.GenerateE164PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateE164PhoneNumberConfig{ @@ -474,13 +488,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateE164PhoneNumber.Max, }, }, - } + }, nil case t.GenerateFirstName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFirstNameConfig{ GenerateFirstNameConfig: &mgmtv1alpha1.GenerateFirstName{}, }, - } + }, nil case t.GenerateFloat64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFloat64Config{ @@ -491,19 +505,19 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Precision: t.GenerateFloat64.Precision, }, }, - } + }, nil case t.GenerateFullAddress != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullAddressConfig{ GenerateFullAddressConfig: &mgmtv1alpha1.GenerateFullAddress{}, }, - } + }, nil case t.GenerateFullName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, }, - } + }, nil case t.GenerateGender != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateGenderConfig{ @@ -511,13 +525,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Abbreviate: 
t.GenerateGender.Abbreviate, }, }, - } + }, nil case t.GenerateInt64PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64PhoneNumberConfig{ GenerateInt64PhoneNumberConfig: &mgmtv1alpha1.GenerateInt64PhoneNumber{}, }, - } + }, nil case t.GenerateInt64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{ @@ -527,25 +541,25 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateInt64.Max, }, }, - } + }, nil case t.GenerateLastName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateLastNameConfig{ GenerateLastNameConfig: &mgmtv1alpha1.GenerateLastName{}, }, - } + }, nil case t.GenerateSha256Hash != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateSha256HashConfig{ GenerateSha256HashConfig: &mgmtv1alpha1.GenerateSha256Hash{}, }, - } + }, nil case t.GenerateSsn != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateSsnConfig{ GenerateSsnConfig: &mgmtv1alpha1.GenerateSSN{}, }, - } + }, nil case t.GenerateState != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStateConfig{ @@ -553,13 +567,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo GenerateFullName: t.GenerateState.GenerateFullName, }, }, - } + }, nil case t.GenerateStreetAddress != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStreetAddressConfig{ GenerateStreetAddressConfig: &mgmtv1alpha1.GenerateStreetAddress{}, }, - } + }, nil case t.GenerateStringPhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStringPhoneNumberConfig{ @@ -568,7 +582,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: 
t.GenerateStringPhoneNumber.Max, }, }, - } + }, nil case t.GenerateString != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStringConfig{ @@ -577,25 +591,25 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateString.Max, }, }, - } + }, nil case t.GenerateUnixTimestamp != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUnixtimestampConfig{ GenerateUnixtimestampConfig: &mgmtv1alpha1.GenerateUnixTimestamp{}, }, - } + }, nil case t.GenerateUsername != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUsernameConfig{ GenerateUsernameConfig: &mgmtv1alpha1.GenerateUsername{}, }, - } + }, nil case t.GenerateUtcTimestamp != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUtctimestampConfig{ GenerateUtctimestampConfig: &mgmtv1alpha1.GenerateUtcTimestamp{}, }, - } + }, nil case t.GenerateUuid != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ @@ -603,13 +617,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo IncludeHyphens: t.GenerateUuid.IncludeHyphens, }, }, - } + }, nil case t.GenerateZipcode != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateZipcodeConfig{ GenerateZipcodeConfig: &mgmtv1alpha1.GenerateZipcode{}, }, - } + }, nil case t.TransformE164PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformE164PhoneNumberConfig{ @@ -617,7 +631,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformE164PhoneNumber.PreserveLength, }, }, - } + }, nil case t.TransformFirstname != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFirstNameConfig{ @@ -625,7 +639,7 @@ func 
(t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformFirstname.PreserveLength, }, }, - } + }, nil case t.TransformFloat64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFloat64Config{ @@ -634,7 +648,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo RandomizationRangeMax: t.TransformFloat64.RandomizationRangeMin, }, }, - } + }, nil case t.TransformFullName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFullNameConfig{ @@ -642,7 +656,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformFullName.PreserveLength, }, }, - } + }, nil case t.TransformInt64PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformInt64PhoneNumberConfig{ @@ -650,7 +664,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformInt64PhoneNumber.PreserveLength, }, }, - } + }, nil case t.TransformInt64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformInt64Config{ @@ -659,7 +673,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo RandomizationRangeMax: t.TransformInt64.RandomizationRangeMax, }, }, - } + }, nil case t.TransformLastName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformLastNameConfig{ @@ -667,7 +681,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformLastName.PreserveLength, }, }, - } + }, nil case t.TransformPhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformPhoneNumberConfig{ @@ -675,7 +689,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: 
t.TransformPhoneNumber.PreserveLength, }, }, - } + }, nil case t.TransformString != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformStringConfig{ @@ -683,19 +697,19 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformString.PreserveLength, }, }, - } + }, nil case t.Passthrough != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{ PassthroughConfig: &mgmtv1alpha1.Passthrough{}, }, - } + }, nil case t.Null != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{ Nullconfig: &mgmtv1alpha1.Null{}, }, - } + }, nil case t.UserDefinedTransformer != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig{ @@ -703,7 +717,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Id: t.UserDefinedTransformer.Id, }, }, - } + }, nil case t.TransformJavascript != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ @@ -711,7 +725,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Code: t.TransformJavascript.Code, }, }, - } + }, nil case t.GenerateCategorical != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCategoricalConfig{ @@ -719,7 +733,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Categories: t.GenerateCategorical.Categories, }, }, - } + }, nil case t.TransformCharacterScramble != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformCharacterScrambleConfig{ @@ -727,7 +741,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo UserProvidedRegex: t.TransformCharacterScramble.UserProvidedRegex, }, }, - } + }, nil case t.GenerateJavascript != nil: 
return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateJavascriptConfig{ @@ -735,7 +749,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Code: t.GenerateJavascript.Code, }, }, - } + }, nil case t.GenerateCountry != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCountryConfig{ @@ -743,13 +757,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo GenerateFullName: t.GenerateCountry.GenerateFullName, }, }, - } + }, nil case t.GenerateBusinessName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateBusinessNameConfig{ GenerateBusinessNameConfig: &mgmtv1alpha1.GenerateBusinessName{}, }, - } + }, nil case t.GenerateIpAddress != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateIpAddressConfig{ @@ -757,20 +771,31 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo IpType: (*mgmtv1alpha1.GenerateIpAddressType)(t.GenerateIpAddress.IpType), }, }, - } + }, nil case t.TransformUuid != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformUuidConfig{ TransformUuidConfig: &mgmtv1alpha1.TransformUuid{}, }, - } + }, nil case t.TransformScrambleIdentity != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformScrambleIdentityConfig{ TransformScrambleIdentityConfig: &mgmtv1alpha1.TransformScrambleIdentity{}, }, + }, nil + case t.TransformPiiText != "": + var v *mgmtv1alpha1.TransformPiiText + err := json.Unmarshal([]byte(t.TransformPiiText), &v) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal transform pii text config: %w", err) } + return &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{ + TransformPiiTextConfig: v, + }, + }, nil default: - return 
&mgmtv1alpha1.TransformerConfig{} + return &mgmtv1alpha1.TransformerConfig{}, nil } } diff --git a/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts b/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts index db0cacefa5..47b7e00acd 100644 --- a/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts +++ b/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts @@ -1,6 +1,6 @@ import { TransformerHandler } from '@/components/jobs/SchemaTable/transformer-handler'; import { useQuery } from '@connectrpc/connect-query'; -import { TransformerSource, TransformersService } from '@neosync/sdk'; +import { TransformersService } from '@neosync/sdk'; import { useMemo } from 'react'; export function useGetTransformersHandler(accountId: string): { @@ -31,12 +31,7 @@ export function useGetTransformersHandler(accountId: string): { const handler = useMemo( (): TransformerHandler => - new TransformerHandler( - systemTransformers.filter( - (st) => st.source !== TransformerSource.TRANSFORM_PII_TEXT - ), - userDefinedTransformers - ), + new TransformerHandler(systemTransformers, userDefinedTransformers), [isValidating] ); diff --git a/internal/benthos/benthos-builder/builders/benthos-builder_test.go b/internal/benthos/benthos-builder/builders/benthos-builder_test.go index 3d1b032540..548d6e4f04 100644 --- a/internal/benthos/benthos-builder/builders/benthos-builder_test.go +++ b/internal/benthos/benthos-builder/builders/benthos-builder_test.go @@ -948,6 +948,16 @@ func Test_computeMutationFunction_Validate_Bloblang_Output(t *testing.T) { }, }, }, + { + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_PII_TEXT, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{ + TransformPiiTextConfig: &mgmtv1alpha1.TransformPiiText{ + Language: gotypeutil.ToPtr("en"), + }, + }, + }, + }, } emailColInfo := &sqlmanager_shared.DatabaseSchemaRow{ @@ -962,6 +972,7 @@ func 
Test_computeMutationFunction_Validate_Bloblang_Output(t *testing.T) { blobenv := bloblang.NewEnvironment() neosync_benthos_transformers.RegisterTransformIdentityScramble(blobenv, nil) + neosync_benthos_transformers.RegisterTransformPiiText(blobenv, nil) for _, transformer := range transformers { t.Run(fmt.Sprintf("%s_%T_lint", t.Name(), transformer.Config.Config), func(t *testing.T) { @@ -1103,6 +1114,9 @@ func Test_computeMutationFunction_Validate_Bloblang_Output_EmptyConfigs(t *testi { Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_TransformScrambleIdentityConfig{}}, }, + { + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{}}, + }, } emailColInfo := &sqlmanager_shared.DatabaseSchemaRow{ @@ -1117,6 +1131,7 @@ func Test_computeMutationFunction_Validate_Bloblang_Output_EmptyConfigs(t *testi blobenv := bloblang.NewEnvironment() neosync_benthos_transformers.RegisterTransformIdentityScramble(blobenv, nil) + neosync_benthos_transformers.RegisterTransformPiiText(blobenv, nil) for _, transformer := range transformers { t.Run(fmt.Sprintf("%s_%T_lint", t.Name(), transformer.Config.Config), func(t *testing.T) { diff --git a/internal/benthos/benthos-builder/builders/processors.go b/internal/benthos/benthos-builder/builders/processors.go index 6125d33c05..db720e0814 100644 --- a/internal/benthos/benthos-builder/builders/processors.go +++ b/internal/benthos/benthos-builder/builders/processors.go @@ -856,6 +856,16 @@ func computeMutationFunction( return "", err } return opts.BuildBloblangString(formattedColPath), nil + case *mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig: + opts, err := transformers.NewTransformPiiTextOptsFromConfig(cfg.TransformPiiTextConfig) + if err != nil { + return "", err + } + bloblangString, err := opts.BuildBloblangString(formattedColPath) + if err != nil { + return "", fmt.Errorf("unable to build bloblang string for TransformPiiText: %w", err) + } + return 
bloblangString, nil default: return "", fmt.Errorf("unsupported transformer: %T", cfg) } diff --git a/internal/integration-tests/api/anonymization-service_integration_test.go b/internal/integration-tests/api/anonymization-service_integration_test.go index a599a46bc0..97d0a2cefb 100644 --- a/internal/integration-tests/api/anonymization-service_integration_test.go +++ b/internal/integration-tests/api/anonymization-service_integration_test.go @@ -18,10 +18,10 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeMany() { t := s.T() t.Run("OSS-fail", func(t *testing.T) { - userclient := s.OSSUnauthenticatedLicensedClients.Users() + userclient := s.OSSUnauthenticatedUnlicensedClients.Users() s.setUser(s.ctx, userclient) accountId := s.createPersonalAccount(s.ctx, userclient) - resp, err := s.OSSUnauthenticatedLicensedClients.Anonymize().AnonymizeMany( + resp, err := s.OSSUnauthenticatedUnlicensedClients.Anonymize().AnonymizeMany( s.ctx, connect.NewRequest(&mgmtv1alpha1.AnonymizeManyRequest{ AccountId: accountId, @@ -433,7 +433,7 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeSingle_ForbiddenTr t := s.T() t.Run("OSS", func(t *testing.T) { - accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedLicensedClients.Users()) + accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedUnlicensedClients.Users()) t.Run("transformpiitext", func(t *testing.T) { t.Run("mappings", func(t *testing.T) { diff --git a/internal/integration-tests/worker/workflow/datasync-workflow.go b/internal/integration-tests/worker/workflow/datasync-workflow.go index 6952758afc..030d4d3846 100644 --- a/internal/integration-tests/worker/workflow/datasync-workflow.go +++ b/internal/integration-tests/worker/workflow/datasync-workflow.go @@ -116,6 +116,7 @@ func NewTestDataSyncWorkflowEnv( transformerclient := neosyncApi.OSSUnauthenticatedLicensedClients.Transformers() userclient := neosyncApi.OSSUnauthenticatedLicensedClients.Users() accounthookclient := 
neosyncApi.OSSUnauthenticatedLicensedClients.AccountHooks() + anonymizationclient := neosyncApi.OSSUnauthenticatedLicensedClients.Anonymize() testSuite := &testsuite.WorkflowTestSuite{} testSuite.SetLogger(log.NewStructuredLogger(testutil.GetConcurrentTestLogger(t))) env := testSuite.NewTestWorkflowEnvironment() @@ -155,6 +156,7 @@ func NewTestDataSyncWorkflowEnv( benthosstream.NewBenthosStreamManager(), neosyncApi.Mocks.TemporalClient, workflowEnv.maxIterations, + anonymizationclient, ) if workflowEnv.fakeEELicense.IsValid() { diff --git a/internal/javascript/functions/neosync/functions.go b/internal/javascript/functions/neosync/functions.go index 99863cde90..8ae2f8ee89 100644 --- a/internal/javascript/functions/neosync/functions.go +++ b/internal/javascript/functions/neosync/functions.go @@ -16,12 +16,14 @@ const ( namespace = "neosync" ) -func Get() ([]*javascript_functions.FunctionDefinition, error) { +func Get( + transformPiiTextApi transformers.TransformPiiTextApi, +) ([]*javascript_functions.FunctionDefinition, error) { generatorFns, err := getNeosyncGenerators() if err != nil { return nil, err } - transformerFns, err := getNeosyncTransformers() + transformerFns, err := getNeosyncTransformers(transformPiiTextApi) if err != nil { return nil, err } @@ -151,8 +153,13 @@ func getNeosyncGenerators() ([]*javascript_functions.FunctionDefinition, error) return fns, nil } -func getNeosyncTransformers() ([]*javascript_functions.FunctionDefinition, error) { +func getNeosyncTransformers( + transformPiiTextApi transformers.TransformPiiTextApi, +) ([]*javascript_functions.FunctionDefinition, error) { neosyncTransformers := transformers.GetNeosyncTransformers() + if transformPiiTextApi != nil { + neosyncTransformers = append(neosyncTransformers, transformers.NewTransformPiiText(transformPiiTextApi)) + } fns := make([]*javascript_functions.FunctionDefinition, 0, len(neosyncTransformers)) for _, f := range neosyncTransformers { templateData, err := f.GetJsTemplateData() diff 
--git a/internal/javascript/functions/neosync/functions_test.go b/internal/javascript/functions/neosync/functions_test.go index 44390cd17f..e1f629dc71 100644 --- a/internal/javascript/functions/neosync/functions_test.go +++ b/internal/javascript/functions/neosync/functions_test.go @@ -7,7 +7,7 @@ import ( ) func TestGet(t *testing.T) { - functions, err := Get() + functions, err := Get(nil) require.NoError(t, err) require.NotEmpty(t, functions) } diff --git a/internal/javascript/javascript.go b/internal/javascript/javascript.go index 7caf6d93b8..36192a7b95 100644 --- a/internal/javascript/javascript.go +++ b/internal/javascript/javascript.go @@ -8,14 +8,16 @@ import ( benthos_functions "github.com/nucleuscloud/neosync/internal/javascript/functions/benthos" neosync_functions "github.com/nucleuscloud/neosync/internal/javascript/functions/neosync" javascript_vm "github.com/nucleuscloud/neosync/internal/javascript/vm" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" ) // Comes full featured, but expects a value api that the benthos/neosync functions can manipulate func NewDefaultValueRunner( valueApi javascript_functions.ValueApi, + transformPiiTextApi transformers.TransformPiiTextApi, logger *slog.Logger, ) (*javascript_vm.Runner, error) { - functions, err := getDefaultFunctions() + functions, err := getDefaultFunctions(transformPiiTextApi) if err != nil { return nil, err } @@ -39,9 +41,11 @@ func NewDefaultRunner( ) } -func getDefaultFunctions() ([]*javascript_functions.FunctionDefinition, error) { +func getDefaultFunctions( + transformPiiTextApi transformers.TransformPiiTextApi, +) ([]*javascript_functions.FunctionDefinition, error) { benthosFns := benthos_functions.Get() - neosyncFns, err := neosync_functions.Get() + neosyncFns, err := neosync_functions.Get(transformPiiTextApi) if err != nil { return nil, err } diff --git a/tools/go.mod b/tools/go.mod index 8f6c37ad85..cb4edab881 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -13,6 +13,7 @@ 
require ( require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.5-20250219170025-d39267d9df8f.1 // indirect + connectrpc.com/connect v1.18.1 // indirect github.com/Jeffail/gabs/v2 v2.7.0 // indirect github.com/OneOfOne/xxhash v1.2.8 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect diff --git a/worker/internal/cmds/worker/serve/serve.go b/worker/internal/cmds/worker/serve/serve.go index ec7c325df9..cc3fb931c9 100644 --- a/worker/internal/cmds/worker/serve/serve.go +++ b/worker/internal/cmds/worker/serve/serve.go @@ -358,6 +358,11 @@ func serve(ctx context.Context) error { neosyncurl, connectInterceptorOption, ) + anonymizationclient := mgmtv1alpha1connect.NewAnonymizationServiceClient( + httpclient, + neosyncurl, + connectInterceptorOption, + ) sqlConnector := &sqlconnect.SqlOpenConnector{} sqlconnmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(sqlConnector)) @@ -389,6 +394,7 @@ func serve(ctx context.Context) error { streamManager, temporalClient, maxIterations, + anonymizationclient, ) schemainit_workflow_register.Register( diff --git a/worker/pkg/benthos/environment/environment.go b/worker/pkg/benthos/environment/environment.go index d0c3cd2e05..6f16adebb7 100644 --- a/worker/pkg/benthos/environment/environment.go +++ b/worker/pkg/benthos/environment/environment.go @@ -17,6 +17,7 @@ import ( neosync_benthos_connectiondata "github.com/nucleuscloud/neosync/worker/pkg/benthos/neosync_connection_data" openaigenerate "github.com/nucleuscloud/neosync/worker/pkg/benthos/openai_generate" neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" "go.opentelemetry.io/otel/metric" @@ -34,6 +35,8 @@ type RegisterConfig struct { stopChannel chan<- error blobEnv *bloblang.Environment + + 
transformPiiTextApi transformers.TransformPiiTextApi } type Option func(cfg *RegisterConfig) @@ -69,6 +72,11 @@ func WithBlobEnv(b *bloblang.Environment) Option { cfg.blobEnv = b } } +func WithTransformPiiTextApi(transformPiiTextApi transformers.TransformPiiTextApi) Option { + return func(cfg *RegisterConfig) { + cfg.transformPiiTextApi = transformPiiTextApi + } +} type SqlConfig struct { Provider neosync_benthos_sql.ConnectionProvider @@ -248,7 +256,7 @@ func NewWithEnvironment( ) } - err = javascript_processor.RegisterNeosyncJavascriptProcessor(env) + err = javascript_processor.RegisterNeosyncJavascriptProcessor(env, config.transformPiiTextApi) if err != nil { return nil, fmt.Errorf( "unable to register javascript processor to benthos instance: %w", diff --git a/worker/pkg/benthos/javascript/processor.go b/worker/pkg/benthos/javascript/processor.go index 87364f4cdc..736e204a64 100644 --- a/worker/pkg/benthos/javascript/processor.go +++ b/worker/pkg/benthos/javascript/processor.go @@ -10,6 +10,7 @@ import ( "github.com/nucleuscloud/neosync/internal/benthos_slogger" "github.com/nucleuscloud/neosync/internal/javascript" javascript_vm "github.com/nucleuscloud/neosync/internal/javascript/vm" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" "github.com/redpanda-data/benthos/v4/public/service" ) @@ -23,11 +24,14 @@ func javascriptProcessorConfig() *service.ConfigSpec { Field(service.NewInterpolatedStringField(codeField)) } -func RegisterNeosyncJavascriptProcessor(env *service.Environment) error { +func RegisterNeosyncJavascriptProcessor( + env *service.Environment, + transformPiiTextApi transformers.TransformPiiTextApi, +) error { return env.RegisterBatchProcessor( "neosync_javascript", javascriptProcessorConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { - return newJavascriptProcessorFromConfig(conf, mgr) + return newJavascriptProcessorFromConfig(conf, mgr, transformPiiTextApi) }) } @@ -40,6 +44,7 @@ 
type javascriptProcessor struct { func newJavascriptProcessorFromConfig( conf *service.ParsedConfig, mgr *service.Resources, + transformPiiTextApi transformers.TransformPiiTextApi, ) (*javascriptProcessor, error) { code, err := conf.FieldString(codeField) if err != nil { @@ -60,7 +65,7 @@ func newJavascriptProcessorFromConfig( slogger: slogger, vmPool: sync.Pool{ New: func() any { - val, err := newPoolItem(slogger) + val, err := newPoolItem(slogger, transformPiiTextApi) if err != nil { return err } @@ -133,9 +138,12 @@ func (j *javascriptProcessor) Close(ctx context.Context) error { return nil } -func newPoolItem(logger *slog.Logger) (*vmPoolItem, error) { +func newPoolItem( + logger *slog.Logger, + transformPiiTextApi transformers.TransformPiiTextApi, +) (*vmPoolItem, error) { valueApi := newBatchBenthosValueApi() - runner, err := javascript.NewDefaultValueRunner(valueApi, logger) + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, logger) if err != nil { return nil, err } diff --git a/worker/pkg/benthos/javascript/processor_test.go b/worker/pkg/benthos/javascript/processor_test.go index 92d90f85b5..08275767ea 100644 --- a/worker/pkg/benthos/javascript/processor_test.go +++ b/worker/pkg/benthos/javascript/processor_test.go @@ -26,7 +26,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -57,7 +57,7 @@ code: 'benthos.v0_msg_set_string(benthos.v0_msg_as_string() + "hello world");' `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -94,7 +94,7 @@ 
code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -144,7 +144,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -178,7 +178,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -221,7 +221,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -272,7 +272,7 @@ code: | `, testServer.URL), nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -307,7 +307,7 @@ code: | `), nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) diff --git a/worker/pkg/benthos/transformer_executor/executor.go 
b/worker/pkg/benthos/transformer_executor/executor.go index 1203ca6f2d..45bbb0f887 100644 --- a/worker/pkg/benthos/transformer_executor/executor.go +++ b/worker/pkg/benthos/transformer_executor/executor.go @@ -112,7 +112,17 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - runner, err := javascript.NewDefaultValueRunner(valueApi, execCfg.logger) + var transformPiiTextApi transformers.TransformPiiTextApi + if execCfg.transformPiiText != nil { + execCfg.logger.Debug("configuring using transform pii text api in generate javascript") + transformPiiTextApi = newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + } + + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err } @@ -148,7 +158,16 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - runner, err := javascript.NewDefaultValueRunner(valueApi, execCfg.logger) + var transformPiiTextApi transformers.TransformPiiTextApi + if execCfg.transformPiiText != nil { + execCfg.logger.Debug("configuring using transform pii text api in transform javascript") + transformPiiTextApi = newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + } + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err } @@ -731,6 +750,12 @@ func InitializeTransformerByConfigType( config.Language = execCfg.transformPiiText.defaultLanguage } + transformPiiTextApi := newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + return &TransformerExecutor{ Opts: nil, Mutate: func(value, opts any) (any, error) { @@ -738,12 +763,10 @@ func InitializeTransformerByConfigType( if !ok { return nil, fmt.Errorf("expected value to be of type string. 
%T", value) } - return ee_transformer_fns.TransformPiiText( + return transformPiiTextApi.Transform( context.Background(), - execCfg.transformPiiText.analyze, execCfg.transformPiiText.anonymize, execCfg.transformPiiText.neosyncOperatorApi, config, valueStr, - execCfg.logger, ) }, }, nil diff --git a/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go b/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go new file mode 100644 index 0000000000..695d2c991e --- /dev/null +++ b/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go @@ -0,0 +1,40 @@ +package transformer_executor + +import ( + "context" + "log/slog" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + ee_transformer_fns "github.com/nucleuscloud/neosync/internal/ee/transformers/functions" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" +) + +type piiTextApi struct { + execConfig *transformPiiTextConfig + neosyncOperatorApi ee_transformer_fns.NeosyncOperatorApi + logger *slog.Logger +} + +func newFromExecConfig( + execConfig *transformPiiTextConfig, + neosyncOperatorApi ee_transformer_fns.NeosyncOperatorApi, + logger *slog.Logger, +) transformers.TransformPiiTextApi { + return &piiTextApi{ + execConfig: execConfig, + neosyncOperatorApi: neosyncOperatorApi, + logger: logger, + } +} + +func (p *piiTextApi) Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { + return ee_transformer_fns.TransformPiiText( + ctx, + p.execConfig.analyze, + p.execConfig.anonymize, + p.neosyncOperatorApi, + config, + value, + p.logger, + ) +} diff --git a/worker/pkg/benthos/transformers/gen_transform_pii_text.go b/worker/pkg/benthos/transformers/gen_transform_pii_text.go new file mode 100644 index 0000000000..164b1e8d07 --- /dev/null +++ b/worker/pkg/benthos/transformers/gen_transform_pii_text.go @@ -0,0 +1,186 @@ +// source: transform_pii_text.go + +package transformers + +import ( +
"encoding/json" + "fmt" + "strings" +) + +type TransformPiiText struct { + api TransformPiiTextApi +} + +type TransformPiiTextOpts struct { + scoreThreshold *float64 + language *string + allowedPhrases any + allowedEntities any + defaultAnonymizer any + denyRecognizers any + entityAnonymizers any +} + +func NewTransformPiiText( + api TransformPiiTextApi, +) *TransformPiiText { + return &TransformPiiText{ + api: api, + } +} + +func NewTransformPiiTextOpts( + scoreThresholdArg *float64, + language *string, + allowedPhrasesArg any, + allowedEntitiesArg any, + defaultAnonymizerArg any, + denyRecognizersArg any, + entityAnonymizersArg any, +) (*TransformPiiTextOpts, error) { + scoreThreshold := float64(0.5) + if scoreThresholdArg != nil { + scoreThreshold = *scoreThresholdArg + } + + return &TransformPiiTextOpts{ + scoreThreshold: &scoreThreshold, + language: language, + allowedPhrases: allowedPhrasesArg, + allowedEntities: allowedEntitiesArg, + defaultAnonymizer: defaultAnonymizerArg, + denyRecognizers: denyRecognizersArg, + entityAnonymizers: entityAnonymizersArg, + }, nil +} + +// BuildBloblangString renders these options as a transform_pii_text(...) bloblang call +// whose value argument reads this.<valuePath>. +func (o *TransformPiiTextOpts) BuildBloblangString( + valuePath string, +) (string, error) { + fnStr := []string{ + "value:this.%s", + } + + params := []any{ + valuePath, + } + + if o.scoreThreshold != nil { + fnStr = append(fnStr, "score_threshold:%v") + params = append(params, *o.scoreThreshold) + } + if o.language != nil { + fnStr = append(fnStr, "language:%q") + params = append(params, *o.language) + } + if o.allowedPhrases != nil { + // %v prints a Go slice as [a b], which is not a bloblang array literal, so emit JSON instead. + phrases, err := json.Marshal(o.allowedPhrases) + if err != nil { + return "", fmt.Errorf("unable to marshal allowed_phrases: %w", err) + } + if string(phrases) == "null" { + // A nil slice marshals to null; keep the empty array that %v used to produce. + phrases = []byte("[]") + } + fnStr = append(fnStr, "allowed_phrases:%s") + params = append(params, string(phrases)) + } + if o.allowedEntities != nil { + entities, err := json.Marshal(o.allowedEntities) + if err != nil { + return "", fmt.Errorf("unable to marshal allowed_entities: %w", err) + } + if string(entities) == "null" { + entities = []byte("[]") + } + fnStr = append(fnStr, "allowed_entities:%s") + params = append(params, string(entities)) + } + if o.defaultAnonymizer != nil { + fnStr = append(fnStr, "default_anonymizer:%s") + json, err := json.Marshal(o.defaultAnonymizer) + if err != nil { + return "", fmt.Errorf("unable to marshal default_anonymizer: %w", err) + } + params =
append(params, string(json)) + } + if o.denyRecognizers != nil { + fnStr = append(fnStr, "deny_recognizers:%s") + json, err := json.Marshal(o.denyRecognizers) + if err != nil { + return "", fmt.Errorf("unable to marshal deny_recognizers: %w", err) + } + params = append(params, string(json)) + } + if o.entityAnonymizers != nil { + fnStr = append(fnStr, "entity_anonymizers:%s") + json, err := json.Marshal(o.entityAnonymizers) + if err != nil { + return "", fmt.Errorf("unable to marshal entity_anonymizers: %w", err) + } + params = append(params, string(json)) + } + + template := fmt.Sprintf("transform_pii_text(%s)", strings.Join(fnStr, ",")) + return fmt.Sprintf(template, params...), nil +} + +func (t *TransformPiiText) GetJsTemplateData() (*TemplateData, error) { + return &TemplateData{ + Name: "transformPiiText", + Description: "Anonymizes and transforms freeform text.", + Example: "", + }, nil +} + +func (t *TransformPiiText) ParseOptions(opts map[string]any) (any, error) { + transformerOpts := &TransformPiiTextOpts{} + + scoreThreshold, ok := opts["scoreThreshold"].(float64) + if !ok { + scoreThreshold = 0.5 + } + transformerOpts.scoreThreshold = &scoreThreshold + + var language *string + if arg, ok := opts["language"].(string); ok { + language = &arg + } + transformerOpts.language = language + + allowedPhrases, ok := opts["allowedPhrases"] + if !ok { + allowedPhrases = []any{} + } + transformerOpts.allowedPhrases = allowedPhrases + + allowedEntities, ok := opts["allowedEntities"] + if !ok { + allowedEntities = []any{} + } + transformerOpts.allowedEntities = allowedEntities + + var defaultAnonymizer any + if arg, ok := opts["defaultAnonymizer"]; ok { + defaultAnonymizer = arg + } + transformerOpts.defaultAnonymizer = defaultAnonymizer + + denyRecognizers, ok := opts["denyRecognizers"] + if !ok { + denyRecognizers = []any{} + } + transformerOpts.denyRecognizers = denyRecognizers + + entityAnonymizers, ok := opts["entityAnonymizers"] + if !ok { + entityAnonymizers = 
map[string]any{} + } + transformerOpts.entityAnonymizers = entityAnonymizers + + return transformerOpts, nil +} diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go new file mode 100644 index 0000000000..8ab0075455 --- /dev/null +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -0,0 +1,470 @@ +package transformers + +import ( + context "context" + "encoding/json" + "errors" + "fmt" + "reflect" + + "connectrpc.com/connect" + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" + "github.com/redpanda-data/benthos/v4/public/bloblang" +) + +// Minimal interface that includes the config and value +// To be used deep in the transformers so we don't have to be aware of the account id at the benthos level +type TransformPiiTextApi interface { + Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) +} + +// Full interface that includes the account id +type AccountTransformPiiTextApi interface { + Transform(ctx context.Context, accountId string, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) +} + +type AccountAwareAnonymizationPiiTextApi struct { + anonApi mgmtv1alpha1connect.AnonymizationServiceClient + accountId string +} + +func NewAccountAwareAnonymizationPiiTextApi( + anonApi mgmtv1alpha1connect.AnonymizationServiceClient, + accountId string, +) *AccountAwareAnonymizationPiiTextApi { + return &AccountAwareAnonymizationPiiTextApi{ + anonApi: anonApi, + accountId: accountId, + } +} + +func (a *AccountAwareAnonymizationPiiTextApi) Transform( + ctx context.Context, + config *mgmtv1alpha1.TransformPiiText, + value string, +) (string, error) { + wrapper := valueWrapper{ + Input: value, + } + bits, err := json.Marshal(wrapper) + if err != nil { + return "", fmt.Errorf("unable to marshal value: %w", err) + } + 
+ resp, err := a.anonApi.AnonymizeSingle(ctx, connect.NewRequest(&mgmtv1alpha1.AnonymizeSingleRequest{ + InputData: string(bits), + AccountId: a.accountId, + TransformerMappings: []*mgmtv1alpha1.TransformerMapping{ + { + Expression: ".input", + Transformer: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{ + TransformPiiTextConfig: config, + }, + }, + }, + }, + })) + + if err != nil { + return "", fmt.Errorf("unable to anonymize text: %w", err) + } + + outputData := resp.Msg.GetOutputData() + var output valueWrapper + err = json.Unmarshal([]byte(outputData), &output) + if err != nil { + return "", fmt.Errorf("unable to unmarshal output: %w", err) + } + + return output.Input, nil +} + +type valueWrapper struct { + Input string `json:"input"` +} + +func RegisterTransformPiiText( + env *bloblang.Environment, + api TransformPiiTextApi, +) error { + spec := bloblang.NewPluginSpec(). + Description("Anonymizes and transforms freeform text."). + Category("string"). + Param(bloblang.NewAnyParam("value").Optional()). + Param(bloblang.NewFloat64Param("score_threshold"). + Default(0.5). + Optional(). + Description("The minimum score for a text to be considered PII."), + ). + Param(bloblang.NewStringParam("language"). + Optional(). + Default("en"). + Description("The language of the text to be anonymized."), + ). + Param(bloblang.NewAnyParam("allowed_phrases"). + Optional(). + Default([]any{}). + Description("A list of phrases that will not be considered PII."), + ). + Param(bloblang.NewAnyParam("allowed_entities"). + Optional(). + Default([]any{}). + Description("A list of entities to be used for PII analysis. If not provided or empty, all entities are considered. If specified, any ad-hoc, or deny_recognizers entity names must also be provided. To see available builtin entities, call the GetPiiTextEntities() RPC method for your account."), + ). + Param(bloblang.NewAnyParam("default_anonymizer"). + Optional().
+ Description("The default anonymization configuration used for all instances of detected PII."), + ). + Param(bloblang.NewAnyParam("deny_recognizers"). + Optional(). + Default([]any{}). + Description("Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields."), + ). + Param(bloblang.NewAnyParam("entity_anonymizers"). + Optional(). + Default(map[string]any{}). + Description("A map of entity names to anonymizer configurations. The key corresponds to a recognized entity (e.g. PERSON, PHONE_NUMBER) and the value is the anonymizer configuration."), + ) + + err := env.RegisterFunctionV2( + "transform_pii_text", + spec, + func(args *bloblang.ParsedParams) (bloblang.Function, error) { + valuePtr, err := args.GetOptionalString("value") + if err != nil { + return nil, err + } + + scoreThresholdParam, err := args.GetOptionalFloat64("score_threshold") + if err != nil { + return nil, err + } + scoreThreshold := float32(0.5) + if scoreThresholdParam != nil { + scoreThreshold = float32(*scoreThresholdParam) + } + + language, err := args.GetOptionalString("language") + if err != nil { + return nil, err + } + if language == nil { + defaultLanguage := "en" + language = &defaultLanguage + } + + allowedPhrasesParam, err := args.Get("allowed_phrases") + if err != nil { + return nil, err + } + allowedPhrases, err := fromAnyToStringSlice(allowedPhrasesParam) + if err != nil { + return nil, err + } + + allowedEntitiesParam, err := args.Get("allowed_entities") + if err != nil { + return nil, err + } + allowedEntities, err := fromAnyToStringSlice(allowedEntitiesParam) + if err != nil { + return nil, err + } + + defaultAnonymizer, err := args.Get("default_anonymizer") + if err != nil { + return nil, err + } + // Convert to PiiAnonymizer struct + var defaultAnonymizerConfig *mgmtv1alpha1.PiiAnonymizer + if defaultAnonymizer != nil { + defaultAnonymizerConfig, err = convertToPiiAnonymizer(defaultAnonymizer) + if err != nil { + return 
nil, fmt.Errorf("invalid default_anonymizer config: %w", err) + } + } + + denyRecognizersRaw, err := args.Get("deny_recognizers") + if err != nil { + return nil, err + } + // Convert to PiiDenyRecognizer array + denyRecognizers, err := convertToPiiDenyRecognizerArray(denyRecognizersRaw) + if err != nil { + return nil, fmt.Errorf("invalid deny_recognizers config: %w", err) + } + + entityAnonymizersRaw, err := args.Get("entity_anonymizers") + if err != nil { + return nil, err + } + // Convert to map[string]PiiAnonymizer + entityAnonymizers, err := convertToPiiAnonymizerMap(entityAnonymizersRaw) + if err != nil { + return nil, fmt.Errorf("invalid entity_anonymizers config: %w", err) + } + + config := &mgmtv1alpha1.TransformPiiText{ + ScoreThreshold: float32(scoreThreshold), + Language: language, + AllowedPhrases: allowedPhrases, + AllowedEntities: allowedEntities, + DefaultAnonymizer: defaultAnonymizerConfig, + DenyRecognizers: denyRecognizers, + EntityAnonymizers: entityAnonymizers, + } + + return func() (any, error) { + res, err := transformPiiText(api, config, valuePtr) + if err != nil { + return nil, fmt.Errorf("unable to run transform_pii_text: %w", err) + } + return res, nil + }, nil + }, + ) + if err != nil { + return fmt.Errorf("unable to register transform_pii_text: %w", err) + } + return nil +} + +func NewTransformPiiTextOptsFromConfig( + config *mgmtv1alpha1.TransformPiiText, +) (*TransformPiiTextOpts, error) { + if config == nil { + defaultLanguage := "en" + config = &mgmtv1alpha1.TransformPiiText{ + Language: &defaultLanguage, + ScoreThreshold: 0.5, + } + } + scoreThreshold := float64(config.ScoreThreshold) + return NewTransformPiiTextOpts( + &scoreThreshold, + config.Language, + config.AllowedPhrases, + config.AllowedEntities, + config.DefaultAnonymizer, + config.DenyRecognizers, + config.EntityAnonymizers, + ) +} + +func (t *TransformPiiText) Transform(value, opts any) (any, error) { + parsedOpts, ok := opts.(*TransformPiiTextOpts) + if !ok { + return 
nil, fmt.Errorf("invalid parsed opts: %T", opts) + } + + valueStr, ok := value.(string) + if !ok { + return nil, errors.New("value is not a string") + } + + allowedPhrases, err := fromAnyToStringSlice(parsedOpts.allowedPhrases) + if err != nil { + return nil, fmt.Errorf("invalid allowed_phrases: %w", err) + } + + allowedEntities, err := fromAnyToStringSlice(parsedOpts.allowedEntities) + if err != nil { + return nil, fmt.Errorf("invalid allowed_entities: %w", err) + } + + defaultAnonymizer, err := convertToPiiAnonymizer(parsedOpts.defaultAnonymizer) + if err != nil { + return nil, fmt.Errorf("invalid default_anonymizer: %w", err) + } + + denyRecognizers, err := convertToPiiDenyRecognizerArray(parsedOpts.denyRecognizers) + if err != nil { + return nil, fmt.Errorf("invalid deny_recognizers: %w", err) + } + + entityAnonymizers, err := convertToPiiAnonymizerMap(parsedOpts.entityAnonymizers) + if err != nil { + return nil, fmt.Errorf("invalid entity_anonymizers: %w", err) + } + + config := &mgmtv1alpha1.TransformPiiText{ + ScoreThreshold: float32(*parsedOpts.scoreThreshold), + Language: parsedOpts.language, + AllowedPhrases: allowedPhrases, + AllowedEntities: allowedEntities, + DefaultAnonymizer: defaultAnonymizer, + DenyRecognizers: denyRecognizers, + EntityAnonymizers: entityAnonymizers, + } + + return transformPiiText(t.api, config, &valueStr) +} + +func transformPiiText(api TransformPiiTextApi, config *mgmtv1alpha1.TransformPiiText, value any) (*string, error) { + if value == nil { + return nil, nil + } + + v := reflect.ValueOf(value) + var result string + switch v.Kind() { + case reflect.Ptr: + if v.IsNil() { + return nil, nil + } + result = v.Elem().String() + case reflect.String: + result = v.String() + default: + result = v.String() + } + + if result == "" { + return &result, nil + } + + transformedResult, err := api.Transform(context.Background(), config, result) + if err != nil { + return nil, fmt.Errorf("unable to transform PII text: %w", err) + } + + return 
&transformedResult, nil +} + +func convertToPiiDenyRecognizerArray(raw any) ([]*mgmtv1alpha1.PiiDenyRecognizer, error) { + denyRecognizers := make([]*mgmtv1alpha1.PiiDenyRecognizer, 0) + if raw == nil { + return denyRecognizers, nil + } + denyRecognizersRawArray, ok := raw.([]any) + if !ok { + return nil, fmt.Errorf("deny_recognizers must be an array") + } + for _, recognizer := range denyRecognizersRawArray { + recognizerMap, ok := recognizer.(map[string]any) + if !ok { + return nil, fmt.Errorf("deny_recognizer must be a map, was: %T", recognizer) + } + denyRecognizer, err := convertToPiiDenyRecognizer(recognizerMap) + if err != nil { + return nil, fmt.Errorf("invalid deny_recognizer config: %w", err) + } + denyRecognizers = append(denyRecognizers, denyRecognizer) + } + return denyRecognizers, nil +} + +func convertToPiiAnonymizerMap(raw any) (map[string]*mgmtv1alpha1.PiiAnonymizer, error) { + entityAnonymizers := make(map[string]*mgmtv1alpha1.PiiAnonymizer) + if raw == nil { + return entityAnonymizers, nil + } + entityAnonymizersRawMap, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("entity_anonymizers must be a map, was: %T", raw) + } + for entity, anonymizer := range entityAnonymizersRawMap { + anonymizerConfig, err := convertToPiiAnonymizer(anonymizer) + if err != nil { + return nil, fmt.Errorf("invalid entity_anonymizer config for entity %s: %w", entity, err) + } + entityAnonymizers[entity] = anonymizerConfig + } + return entityAnonymizers, nil +} + +func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { + if raw == nil { + return nil, nil + } + + configMap, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("anonymizer config must be a map") + } + + anonymizer := &mgmtv1alpha1.PiiAnonymizer{} + + // Check for each possible config type and set accordingly + if replace, ok := configMap["replace"].(map[string]any); ok { + var value *string + valueParam, ok := replace["value"].(string) + if ok && valueParam != "" 
{ + value = &valueParam + } + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Replace_{ + Replace: &mgmtv1alpha1.PiiAnonymizer_Replace{ + Value: value, + }, + } + } else if _, ok := configMap["redact"].(map[string]any); ok { + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Redact_{ + Redact: &mgmtv1alpha1.PiiAnonymizer_Redact{}, + } + } else if mask, ok := configMap["mask"].(map[string]any); ok { + maskConfig := &mgmtv1alpha1.PiiAnonymizer_Mask{} + if char, ok := mask["masking_char"].(string); ok { + maskConfig.MaskingChar = &char + } + if chars, ok := mask["chars_to_mask"].(float64); ok { + intChars := int32(chars) + maskConfig.CharsToMask = &intChars + } + if fromEnd, ok := mask["from_end"].(bool); ok { + maskConfig.FromEnd = &fromEnd + } + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Mask_{ + Mask: &mgmtv1alpha1.PiiAnonymizer_Mask{ + MaskingChar: maskConfig.MaskingChar, + CharsToMask: maskConfig.CharsToMask, + FromEnd: maskConfig.FromEnd, + }, + } + } else if hash, ok := configMap["hash"].(map[string]any); ok { + if algo, ok := hash["algo"].(int64); ok { + convertedAlgo := mgmtv1alpha1.PiiAnonymizer_Hash_HashType(algo) //nolint:gosec + if _, ok := mgmtv1alpha1.PiiAnonymizer_Hash_HashType_name[int32(convertedAlgo)]; !ok { + return nil, fmt.Errorf("invalid hash algorithm: %d", convertedAlgo) + } + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Hash_{ + Hash: &mgmtv1alpha1.PiiAnonymizer_Hash{ + Algo: &convertedAlgo, + }, + } + } else { + return nil, fmt.Errorf("invalid hash algorithm: %T", hash["algo"]) + } + } else if _, ok := configMap["transform"].(map[string]any); ok { + return nil, fmt.Errorf("transform not currently supported") + } else { + return nil, fmt.Errorf("invalid anonymizer config: must contain one of replace, redact, mask, hash, or transform") + } + + return anonymizer, nil +} + +func convertToPiiDenyRecognizer(raw map[string]any) (*mgmtv1alpha1.PiiDenyRecognizer, error) { + name, ok := raw["name"].(string) + if !ok { + return nil, 
fmt.Errorf("deny_recognizer must have a name") + } + + denyWordsRaw, ok := raw["deny_words"].([]any) + if !ok { + return nil, fmt.Errorf("deny_recognizer must have deny_words array") + } + + denyWords := make([]string, 0) + for _, word := range denyWordsRaw { + if str, ok := word.(string); ok { + denyWords = append(denyWords, str) + } + } + + return &mgmtv1alpha1.PiiDenyRecognizer{ + Name: name, + DenyWords: denyWords, + }, nil +} diff --git a/worker/pkg/benthos/transformers/transform_pii_text_test.go b/worker/pkg/benthos/transformers/transform_pii_text_test.go new file mode 100644 index 0000000000..ca62ad69b0 --- /dev/null +++ b/worker/pkg/benthos/transformers/transform_pii_text_test.go @@ -0,0 +1,199 @@ +package transformers + +import ( + "context" + "testing" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + "github.com/redpanda-data/benthos/v4/public/bloblang" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockTransformPiiTextApi struct { + transformFn func(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) +} + +func (m *mockTransformPiiTextApi) Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { + return m.transformFn(ctx, config, value) +} + +func TestRegisterTransformPiiText(t *testing.T) { + env := bloblang.NewEnvironment() + mockApi := &mockTransformPiiTextApi{ + transformFn: func(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { + return "anonymized: " + value, nil + }, + } + + err := RegisterTransformPiiText(env, mockApi) + require.NoError(t, err) + + t.Run("basic usage", func(t *testing.T) { + mapping := `transform_pii_text("sensitive data")` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + 
assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with value param", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data")` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with score_threshold", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", score_threshold: 0.7)` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with language", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", language: "en")` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with allowed_phrases", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", allowed_phrases: ["data"])` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with allowed_entities", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", allowed_entities: ["PERSON", "EMAIL"])` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", 
*output) + }) + + t.Run("with default_anonymizer", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", default_anonymizer: {"replace": {"value": "REDACTED"}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with deny_recognizers", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", deny_recognizers: [{"name": "test", "deny_words": ["sensitive"]}])` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with entity_anonymizers", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"replace": {"new_value": "PERSON"}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + t.Run("with entity_anonymizers using redact", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"redact": {}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with entity_anonymizers using mask", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"mask": {"masking_char": "*", "chars_to_mask": 4, "from_end": true}}})` + exec, 
err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with entity_anonymizers using hash", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"hash": {"algo": 1}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with value from context", func(t *testing.T) { + mapping := `transform_pii_text(value: this)` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query("sensitive data") + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) +} diff --git a/worker/pkg/workflows/tablesync/activities/sync/activity.go b/worker/pkg/workflows/tablesync/activities/sync/activity.go index 97b14fbbab..2a177cc41b 100644 --- a/worker/pkg/workflows/tablesync/activities/sync/activity.go +++ b/worker/pkg/workflows/tablesync/activities/sync/activity.go @@ -45,6 +45,7 @@ type Activity struct { meter metric.Meter // optional benthosStreamManager benthosstream.BenthosStreamManagerClient temporalclient temporalclient.Client + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient } func New( @@ -55,6 +56,7 @@ func New( meter metric.Meter, benthosStreamManager benthosstream.BenthosStreamManagerClient, temporalclient temporalclient.Client, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, ) *Activity { return &Activity{ connclient: connclient, @@ -64,6 +66,7 @@ func New( meter: meter, benthosStreamManager: benthosStreamManager, temporalclient: 
temporalclient, + anonymizationClient: anonymizationClient, } } @@ -200,6 +203,7 @@ func (a *Activity) SyncTable( bstream, err := a.getBenthosStream( &info, + req.AccountId, benthosConfig, session, stopActivityChan, @@ -207,6 +211,7 @@ func (a *Activity) SyncTable( hasMorePages, continuationToken, identityAllocator, + a.anonymizationClient, logger, ) if err != nil { @@ -338,6 +343,7 @@ func runStream( func (a *Activity) getBenthosStream( info *activity.Info, + accountId string, benthosConfig string, session connectionmanager.SessionInterface, stopActivityChan chan error, @@ -345,17 +351,20 @@ func (a *Activity) getBenthosStream( hasMorePages neosync_benthos_sql.OnHasMorePagesFn, continuationToken *continuation_token.ContinuationToken, identityAllocator tablesync_shared.IdentityAllocator, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, logger *slog.Logger, ) (benthosstream.BenthosStreamClient, error) { benenv, err := a.getBenthosEnvironment( logger, info.Attempt > 1, + accountId, getConnectionById, session, stopActivityChan, hasMorePages, continuationToken, identityAllocator, + anonymizationClient, ) if err != nil { return nil, err @@ -393,18 +402,28 @@ func (a *Activity) getBenthosStream( func (a *Activity) getBenthosEnvironment( logger *slog.Logger, isRetry bool, + accountId string, getConnectionById func(connectionId string) (connectionmanager.ConnectionInput, error), session connectionmanager.SessionInterface, stopActivityChan chan error, hasMorePages neosync_benthos_sql.OnHasMorePagesFn, continuationToken *continuation_token.ContinuationToken, identityAllocator tablesync_shared.IdentityAllocator, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, ) (*service.Environment, error) { blobEnv := bloblang.NewEnvironment() err := transformers.RegisterTransformIdentityScramble(blobEnv, identityAllocator) if err != nil { return nil, fmt.Errorf("unable to register identity scramble transformer: %w", err) } + 
transformPiiTextApiForAccount := transformers.NewAccountAwareAnonymizationPiiTextApi( + anonymizationClient, + accountId, + ) + err = transformers.RegisterTransformPiiText(blobEnv, transformPiiTextApiForAccount) + if err != nil { + return nil, fmt.Errorf("unable to register pii text transformer: %w", err) + } benenv, err := benthos_environment.NewEnvironment( logger, benthos_environment.WithMeter(a.meter), @@ -429,6 +448,7 @@ func (a *Activity) getBenthosEnvironment( }), benthos_environment.WithStopChannel(stopActivityChan), benthos_environment.WithBlobEnv(blobEnv), + benthos_environment.WithTransformPiiTextApi(transformPiiTextApiForAccount), ) if err != nil { return nil, fmt.Errorf("unable to instantiate benthos environment: %w", err) diff --git a/worker/pkg/workflows/tablesync/activities/sync/activity_test.go b/worker/pkg/workflows/tablesync/activities/sync/activity_test.go index bfe2434375..ca0141b39b 100644 --- a/worker/pkg/workflows/tablesync/activities/sync/activity_test.go +++ b/worker/pkg/workflows/tablesync/activities/sync/activity_test.go @@ -88,7 +88,7 @@ output: var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -167,7 +167,7 @@ output: var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) t.Run("valid continuation token", func(t *testing.T) { @@ -210,7 +210,7 @@ func Test_Sync_Run_No_BenthosConfig(t *testing.T) { benthosStreamManager := 
benthosstream.NewBenthosStreamManager() temporalclient := tmprl_mocks.NewClient(t) - activity := New(nil, nil, nil, nil, nil, benthosStreamManager, temporalclient) + activity := New(nil, nil, nil, nil, nil, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -278,7 +278,7 @@ metrics: sqlconnmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(&sqlconnect.SqlOpenConnector{}), connectionmanager.WithCloseOnRelease()) mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -354,7 +354,7 @@ func Test_Sync_Run_Processor_Error(t *testing.T) { mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -424,7 +424,7 @@ output: mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) 
@@ -515,7 +515,7 @@ output: mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) _, err := env.ExecuteActivity(activity.SyncTable, &SyncTableRequest{ diff --git a/worker/pkg/workflows/tablesync/workflow/register/register.go b/worker/pkg/workflows/tablesync/workflow/register/register.go index 5b0f05db60..2f61923faa 100644 --- a/worker/pkg/workflows/tablesync/workflow/register/register.go +++ b/worker/pkg/workflows/tablesync/workflow/register/register.go @@ -27,6 +27,7 @@ func Register( benthosStreamManager benthosstream.BenthosStreamManagerClient, temporalclient client.Client, maxIterations int, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, ) { tsWf := tablesync_workflow.New(maxIterations) w.RegisterWorkflow(tsWf.TableSync) @@ -39,6 +40,7 @@ func Register( meter, benthosStreamManager, temporalclient, + anonymizationClient, ) w.RegisterActivity(syncActivity.Sync)