Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions backend/internal/cmds/mgmt/serve/connect/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func serve(ctx context.Context) error {
var presAnalyzeClient presidioapi.AnalyzeInterface
var presAnonClient presidioapi.AnonymizeInterface
var presEntityClient presidioapi.EntityInterface
if ncloudlicense.IsValid() {
if cascadelicense.IsValid() {
analyzeClient, ok, err := getPresidioAnalyzeClient()
if err != nil {
return fmt.Errorf("unable to initialize presidio analyze client: %w", err)
Expand All @@ -719,10 +719,11 @@ func serve(ctx context.Context) error {
}
}

isPresidioEnabled := cascadelicense.IsValid() && presAnalyzeClient != nil && presAnonClient != nil

transformerService := v1alpha1_transformerservice.New(&v1alpha1_transformerservice.Config{
IsPresidioEnabled: ncloudlicense.IsValid(),
IsNeosyncCloud: ncloudlicense.IsValid(),
}, db, presEntityClient, userdataclient)
IsPresidioEnabled: isPresidioEnabled,
}, db, presEntityClient, userdataclient, cascadelicense)
api.Handle(
mgmtv1alpha1connect.NewTransformersServiceHandler(
transformerService,
Expand All @@ -734,11 +735,11 @@ func serve(ctx context.Context) error {
)

anonymizationService := v1alpha1_anonymizationservice.New(&v1alpha1_anonymizationservice.Config{
IsPresidioEnabled: ncloudlicense.IsValid(),
IsPresidioEnabled: isPresidioEnabled,
PresidioDefaultLanguage: getPresidioDefaultLanguage(),
IsAuthEnabled: isAuthEnabled,
IsNeosyncCloud: ncloudlicense.IsValid(),
}, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db)
}, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db, cascadelicense)
api.Handle(
mgmtv1alpha1connect.NewAnonymizationServiceHandler(
anonymizationService,
Expand Down
13 changes: 11 additions & 2 deletions backend/internal/dtomaps/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ func ToJobDto(
) (*mgmtv1alpha1.Job, error) {
mappings := []*mgmtv1alpha1.JobMapping{}
for _, mapping := range inputJob.Mappings {
mappings = append(mappings, mapping.ToDto())
dto, err := mapping.ToDto()
if err != nil {
return nil, fmt.Errorf("unable to convert job mapping to dto: %w", err)
}
mappings = append(mappings, dto)
}

virtualForeignKeys := []*mgmtv1alpha1.VirtualForeignConstraint{}
Expand Down Expand Up @@ -50,6 +54,11 @@ func ToJobDto(
}
}

sourceOptions, err := inputJob.ConnectionOptions.ToDto()
if err != nil {
return nil, fmt.Errorf("unable to convert job source options to dto: %w", err)
}

return &mgmtv1alpha1.Job{
Id: neosyncdb.UUIDString(inputJob.ID),
Name: inputJob.Name,
Expand All @@ -61,7 +70,7 @@ func ToJobDto(
Mappings: mappings,
VirtualForeignKeys: virtualForeignKeys,
Source: &mgmtv1alpha1.JobSource{
Options: inputJob.ConnectionOptions.ToDto(),
Options: sourceOptions,
},
Destinations: destinations,
AccountId: neosyncdb.UUIDString(inputJob.AccountID),
Expand Down
6 changes: 5 additions & 1 deletion backend/internal/dtomaps/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ func ToUserDefinedTransformerDto(
input.Source,
)
}
dto, err := input.TransformerConfig.ToTransformerConfigDto()
if err != nil {
return nil, err
}
return &mgmtv1alpha1.UserDefinedTransformer{
Id: neosyncdb.UUIDString(input.ID),
Name: input.Name,
Description: input.Description,
Source: source,
DataType: transformer.DataType, //nolint:staticcheck
DataTypes: transformer.DataTypes,
Config: input.TransformerConfig.ToTransformerConfigDto(),
Config: dto,
CreatedAt: timestamppb.New(input.CreatedAt.Time),
UpdatedAt: timestamppb.New(input.UpdatedAt.Time),
AccountId: neosyncdb.UUIDString(input.AccountID),
Expand Down
3 changes: 2 additions & 1 deletion backend/pkg/integration-test/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@ func (s *NeosyncApiTestClient) setupMux(
transformerService := v1alpha1_transformersservice.New(
&v1alpha1_transformersservice.Config{
IsPresidioEnabled: isPresidioEnabled,
IsNeosyncCloud: isNeosyncCloud,
},
neosyncdb.New(pgcontainer.DB, db_queries.New()),
s.Mocks.Presidio.Entities,
userclient,
license,
)

sqlmanagerclient := NewTestSqlManagerClient()
Expand Down Expand Up @@ -289,6 +289,7 @@ func (s *NeosyncApiTestClient) setupMux(
presAnalyzeClient,
presAnonClient,
neosyncDb,
license,
)

connectionDataService := v1alpha1_connectiondataservice.New(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Service) AnonymizeMany(
req *connect.Request[mgmtv1alpha1.AnonymizeManyRequest],
) (*connect.Response[mgmtv1alpha1.AnonymizeManyResponse], error) {
logger := logger_interceptor.GetLoggerFromContextOrDefault(ctx)
if !s.cfg.IsNeosyncCloud {
if !s.license.IsValid() {
return nil, nucleuserrors.NewNotImplemented(
fmt.Sprintf(
"%s is not implemented in the OSS version of Neosync.",
Expand Down Expand Up @@ -190,11 +190,11 @@ func (s *Service) AnonymizeSingle(
if err != nil {
return nil, err
}
if !s.cfg.IsNeosyncCloud || account.AccountType == int16(neosyncdb.AccountType_Personal) {
if !s.license.IsValid() || (s.cfg.IsNeosyncCloud && account.AccountType == int16(neosyncdb.AccountType_Personal)) {
for _, mapping := range req.Msg.GetTransformerMappings() {
if mapping.GetTransformer().GetTransformPiiTextConfig() != nil {
return nil, nucleuserrors.NewForbidden(
"TransformPiiText is not available for use. Please contact us to upgrade your account.",
"TransformPiiText is not available for use. Please contact us about upgrading your account.",
)
}
}
Expand All @@ -203,7 +203,7 @@ func (s *Service) AnonymizeSingle(
defaultTransforms.GetN().GetTransformPiiTextConfig() != nil ||
defaultTransforms.GetS().GetTransformPiiTextConfig() != nil {
return nil, nucleuserrors.NewForbidden(
"TransformPiiText is not available for use. Please contact us to upgrade your account.",
"TransformPiiText is not available for use. Please contact us about upgrading your account.",
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha_anonymizationservice
import (
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
"github.com/nucleuscloud/neosync/backend/internal/userdata"
"github.com/nucleuscloud/neosync/internal/ee/license"
presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio"
"github.com/nucleuscloud/neosync/internal/neosyncdb"
"go.opentelemetry.io/otel/metric"
Expand All @@ -17,13 +18,14 @@ type Service struct {
analyze presidioapi.AnalyzeInterface
anonymize presidioapi.AnonymizeInterface
db *neosyncdb.NeosyncDb
license license.EEInterface
}

type Config struct {
IsAuthEnabled bool
IsNeosyncCloud bool
IsPresidioEnabled bool
PresidioDefaultLanguage *string
IsNeosyncCloud bool
}

func New(
Expand All @@ -35,6 +37,7 @@ func New(
analyzeclient presidioapi.AnalyzeInterface,
anonymizeclient presidioapi.AnonymizeInterface,
db *neosyncdb.NeosyncDb,
license license.EEInterface,
) *Service {
return &Service{
cfg: cfg,
Expand All @@ -45,5 +48,6 @@ func New(
analyze: analyzeclient,
anonymize: anonymizeclient,
db: db,
license: license,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1_transformersservice

import (
"github.com/nucleuscloud/neosync/backend/internal/userdata"
"github.com/nucleuscloud/neosync/internal/ee/license"
presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio"
"github.com/nucleuscloud/neosync/internal/neosyncdb"
)
Expand All @@ -11,23 +12,25 @@ type Service struct {
db *neosyncdb.NeosyncDb
entityclient presidioapi.EntityInterface
userdataclient userdata.Interface
license license.EEInterface
}

type Config struct {
IsPresidioEnabled bool
IsNeosyncCloud bool
}

func New(
cfg *Config,
db *neosyncdb.NeosyncDb,
recognizerclient presidioapi.EntityInterface,
userdataclient userdata.Interface,
license license.EEInterface,
) *Service {
return &Service{
cfg: cfg,
db: db,
entityclient: recognizerclient,
userdataclient: userdataclient,
license: license,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1006,14 +1006,14 @@ func (s *Service) GetSystemTransformerBySource(
}

func (s *Service) getSystemTransformerSourceMap() map[mgmtv1alpha1.TransformerSource]*mgmtv1alpha1.SystemTransformer {
if s.cfg.IsNeosyncCloud {
if s.license.IsValid() {
return allSystemTransformersSourceMap
}
return baseSystemTransformerSourceMap
}

func (s *Service) getSystemTransformers() []*mgmtv1alpha1.SystemTransformer {
if s.cfg.IsNeosyncCloud {
if s.license.IsValid() {
return allSystemTransformers
}
return baseSystemTransformers
Expand Down
72 changes: 50 additions & 22 deletions backend/sql/postgresql/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,13 +857,17 @@ type JobMapping struct {
JobMappingTransformer *JobMappingTransformerModel `json:"jobMappingTransformerModel,omitempty"`
}

func (jm *JobMapping) ToDto() *mgmtv1alpha1.JobMapping {
func (jm *JobMapping) ToDto() (*mgmtv1alpha1.JobMapping, error) {
transformer, err := jm.JobMappingTransformer.ToTransformerDto()
if err != nil {
return nil, err
}
return &mgmtv1alpha1.JobMapping{
Schema: jm.Schema,
Table: jm.Table,
Column: jm.Column,
Transformer: jm.JobMappingTransformer.ToTransformerDto(),
}
Transformer: transformer,
}, nil
}

func (jm *JobMapping) FromDto(dto *mgmtv1alpha1.JobMapping) error {
Expand Down Expand Up @@ -1097,13 +1101,29 @@ type DynamoDBSourceUnmappedTransformConfig struct {
S *JobMappingTransformerModel `json:"s"`
}

func (s *DynamoDBSourceUnmappedTransformConfig) ToDto() *mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig {
return &mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig{
B: s.B.ToTransformerDto(),
Boolean: s.Boolean.ToTransformerDto(),
N: s.N.ToTransformerDto(),
S: s.S.ToTransformerDto(),
func (s *DynamoDBSourceUnmappedTransformConfig) ToDto() (*mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig, error) {
b, err := s.B.ToTransformerDto()
if err != nil {
return nil, err
}
boolean, err := s.Boolean.ToTransformerDto()
if err != nil {
return nil, err
}
n, err := s.N.ToTransformerDto()
if err != nil {
return nil, err
}
str, err := s.S.ToTransformerDto()
if err != nil {
return nil, err
}
return &mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig{
B: b,
Boolean: boolean,
N: n,
S: str,
}, nil
}

func (s *DynamoDBSourceUnmappedTransformConfig) FromDto(
Expand Down Expand Up @@ -1156,7 +1176,7 @@ func (s *DynamoDBSourceTableOption) FromDto(dto *mgmtv1alpha1.DynamoDBSourceTabl
s.WhereClause = dto.WhereClause
}

func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOptions {
func (s *DynamoDBSourceOptions) ToDto() (*mgmtv1alpha1.DynamoDBSourceConnectionOptions, error) {
tables := make([]*mgmtv1alpha1.DynamoDBSourceTableOption, len(s.Tables))
for i, t := range s.Tables {
tables[i] = t.ToDto()
Expand Down Expand Up @@ -1185,12 +1205,16 @@ func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOp
},
}
}
unmappedTransforms, err := s.UnmappedTransforms.ToDto()
if err != nil {
return nil, err
}
return &mgmtv1alpha1.DynamoDBSourceConnectionOptions{
ConnectionId: s.ConnectionId,
Tables: tables,
UnmappedTransforms: s.UnmappedTransforms.ToDto(),
UnmappedTransforms: unmappedTransforms,
EnableConsistentRead: s.EnableConsistentRead,
}
}, nil
}

func (s *DynamoDBSourceOptions) FromDto(dto *mgmtv1alpha1.DynamoDBSourceConnectionOptions) error {
Expand Down Expand Up @@ -1731,57 +1755,61 @@ type MysqlSourceTableOption struct {
WhereClause *string `json:"whereClause,omitempty"`
}

func (j *JobSourceOptions) ToDto() *mgmtv1alpha1.JobSourceOptions {
func (j *JobSourceOptions) ToDto() (*mgmtv1alpha1.JobSourceOptions, error) {
if j.PostgresOptions != nil {
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_Postgres{
Postgres: j.PostgresOptions.ToDto(),
},
}
}, nil
}
if j.MysqlOptions != nil {
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_Mysql{
Mysql: j.MysqlOptions.ToDto(),
},
}
}, nil
}
if j.GenerateOptions != nil {
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_Generate{
Generate: j.GenerateOptions.ToDto(),
},
}
}, nil
}
if j.AiGenerateOptions != nil {
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_AiGenerate{
AiGenerate: j.AiGenerateOptions.ToDto(),
},
}
}, nil
}
if j.MongoDbOptions != nil {
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_Mongodb{
Mongodb: j.MongoDbOptions.ToDto(),
},
}
}, nil
}
if j.DynamoDBOptions != nil {
dto, err := j.DynamoDBOptions.ToDto()
if err != nil {
return nil, err
}
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_Dynamodb{
Dynamodb: j.DynamoDBOptions.ToDto(),
Dynamodb: dto,
},
}
}, nil
}
if j.MssqlOptions != nil {
return &mgmtv1alpha1.JobSourceOptions{
Config: &mgmtv1alpha1.JobSourceOptions_Mssql{
Mssql: j.MssqlOptions.ToDto(),
},
}
}, nil
}
return nil
return nil, fmt.Errorf("invalid job source options config, received type: %T", j)
}

func (j *JobSourceOptions) FromDto(dto *mgmtv1alpha1.JobSourceOptions) error {
Expand Down
Loading