From 3c37ba8e81dbc65918bdac01a9c92e6f5945f55f Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 10:16:27 -0700 Subject: [PATCH 01/20] wires up benthos transform pii text function --- .../transformers/transform_pii_text.go | 350 ++++++++++++++++++ 1 file changed, 350 insertions(+) create mode 100644 worker/pkg/benthos/transformers/transform_pii_text.go diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go new file mode 100644 index 0000000000..5b1989f735 --- /dev/null +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -0,0 +1,350 @@ +package transformers + +import ( + context "context" + "fmt" + "reflect" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + "github.com/redpanda-data/benthos/v4/public/bloblang" +) + +type TransformPiiTextApi interface { + Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) +} + +// +neosyncTransformerBuilder:transform:transformPiiText + +func RegisterTransformPiiText( + env *bloblang.Environment, + api TransformPiiTextApi, +) error { + spec := bloblang.NewPluginSpec(). + Description("Anonymizes and transforms freeform text."). + Category("string"). + Param(bloblang.NewAnyParam("value").Optional()). + Param(bloblang.NewFloat64Param("score_threshold"). + Default(0.5). + Optional(). + Description("The minimum score for a text to be considered PII."), + ). + Param(bloblang.NewStringParam("language"). + Optional(). + Description("The language of the text to be anonymized."), + ). + Param(bloblang.NewAnyParam("allowed_phrases"). + Optional(). + Default([]any{}). + Description("A list of phrases that will not be considered PII."), + ). + Param(bloblang.NewAnyParam("allowed_entities"). + Optional(). + Default([]any{}). + Description("A list of entities to be used for PII analysis. If not provided or empty, all entities are considered. If specified, any ad-hoc, or deny_recognizers entity names must also be provided. To see available builtin entities, cal the GetPiiTextEntities() RPC method for your account."), + ). + Param(bloblang.NewAnyParam("default_anonymizer"). + Optional(). + Description("The default anonymization configuration used for all instances of detected PII."), + ). + Param(bloblang.NewAnyParam("deny_recognizers"). + Optional(). + Default([]map[string]any{}). + Description("Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields."), + ). + Param(bloblang.NewAnyParam("entity_anonymizers"). + Optional(). + Default(map[string]any{}). + Description("A map of entity names to anonymizer configurations. The key corresponds to a recognized entity (e.g. PERSON, PHONE_NUMBER) and the value is the anonymizer configuration."), + ) + + err := env.RegisterFunctionV2( + "transform_pii_text", + spec, + func(args *bloblang.ParsedParams) (bloblang.Function, error) { + valuePtr, err := args.GetOptionalString("value") + if err != nil { + return nil, err + } + + scoreThreshold, err := args.GetFloat64("score_threshold") + if err != nil { + return nil, err + } + + language, err := args.GetOptionalString("language") + if err != nil { + return nil, err + } + + allowedPhrasesParam, err := args.Get("allowed_phrases") + if err != nil { + return nil, err + } + allowedPhrases, err := fromAnyToStringSlice(allowedPhrasesParam) + if err != nil { + return nil, err + } + + allowedEntitiesParam, err := args.Get("allowed_entities") + if err != nil { + return nil, err + } + allowedEntities, err := fromAnyToStringSlice(allowedEntitiesParam) + if err != nil { + return nil, err + } + + defaultAnonymizer, err := args.Get("default_anonymizer") + if err != nil { + return nil, err + } + // Convert to PiiAnonymizer struct + var defaultAnonymizerConfig *mgmtv1alpha1.PiiAnonymizer + if defaultAnonymizer != nil { + defaultAnonymizerConfig, err = convertToPiiAnonymizer(defaultAnonymizer) + if err != nil { + return nil, fmt.Errorf("invalid default_anonymizer config: %w", err) + } + } + + denyRecognizersRaw, err := args.Get("deny_recognizers") + if err != nil { + return nil, err + } + // Convert to PiiDenyRecognizer array + denyRecognizers, err := convertToPiiDenyRecognizerArray(denyRecognizersRaw) + if err != nil { + return nil, fmt.Errorf("invalid deny_recognizers config: %w", err) + } + + entityAnonymizersRaw, err := args.Get("entity_anonymizers") + if err != nil { + return nil, err + } + // Convert to map[string]PiiAnonymizer + entityAnonymizers, err := convertToPiiAnonymizerMap(entityAnonymizersRaw) + if err != nil { + return nil, fmt.Errorf("invalid entity_anonymizers config: %w", err) + } + + config := &mgmtv1alpha1.TransformPiiText{ + ScoreThreshold: float32(scoreThreshold), + Language: language, + AllowedPhrases: allowedPhrases, + AllowedEntities: allowedEntities, + DefaultAnonymizer: defaultAnonymizerConfig, + DenyRecognizers: denyRecognizers, + EntityAnonymizers: entityAnonymizers, + } + + return func() (any, error) { + res, err := transformPiiText(api, config, valuePtr) + if err != nil { + return nil, fmt.Errorf("unable to run transform_pii_text: %w", err) + } + return res, nil + }, nil + }, + ) + if err != nil { + return fmt.Errorf("unable to register transform_pii_text: %w", err) + } + return nil +} + +func transformPiiText(api TransformPiiTextApi, config *mgmtv1alpha1.TransformPiiText, value any) (*string, error) { + if value == nil { + return nil, nil + } + + v := reflect.ValueOf(value) + var result string + switch v.Kind() { + case reflect.Ptr: + if v.IsNil() { + return nil, nil + } + result = v.Elem().String() + case reflect.String: + result = v.String() + default: + result = v.String() + } + + if result == "" { + return &result, nil + } + + transformedResult, err := api.Transform(context.Background(), config, result) + if err != nil { + return nil, fmt.Errorf("unable to transform PII text: %w", err) + } + + return &transformedResult, nil +} + +type PiiAnonymizer struct { + Replace *Replace + Redact *Redact + Mask *Mask + Hash *Hash + Transform *Transform +} + +type Replace struct { + Value *string +} + +type Redact struct{} + +type Mask struct { + MaskingChar *string + CharsToMask *int32 + FromEnd *bool +} + +type Hash struct { + Algo *string +} + +type Transform struct { + Config map[string]interface{} +} + +type PiiDenyRecognizer struct { + Name string + DenyWords []string +} + +func convertToPiiDenyRecognizerArray(raw any) ([]*mgmtv1alpha1.PiiDenyRecognizer, error) { + denyRecognizers := make([]*mgmtv1alpha1.PiiDenyRecognizer, 0) + if raw == nil { + return denyRecognizers, nil + } + denyRecognizersRawArray, ok := raw.([]any) + if !ok { + return nil, fmt.Errorf("deny_recognizers must be an array") + } + for _, recognizer := range denyRecognizersRawArray { + recognizerMap, ok := recognizer.(map[string]any) + if !ok { + return nil, fmt.Errorf("deny_recognizer must be a map") + } + denyRecognizer, err := convertToPiiDenyRecognizer(recognizerMap) + if err != nil { + return nil, fmt.Errorf("invalid deny_recognizer config: %w", err) + } + denyRecognizers = append(denyRecognizers, denyRecognizer) + } + return denyRecognizers, nil +} + +func convertToPiiAnonymizerMap(raw any) (map[string]*mgmtv1alpha1.PiiAnonymizer, error) { + entityAnonymizers := make(map[string]*mgmtv1alpha1.PiiAnonymizer) + if raw == nil { + return entityAnonymizers, nil + } + entityAnonymizersRawMap, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("entity_anonymizers must be a map") + } + for entity, anonymizer := range entityAnonymizersRawMap { + anonymizerConfig, err := convertToPiiAnonymizer(anonymizer) + if err != nil { + return nil, fmt.Errorf("invalid entity_anonymizer config for entity %s: %w", entity, err) + } + entityAnonymizers[entity] = anonymizerConfig + } + return entityAnonymizers, nil +} + +func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { + if raw == nil { + return nil, nil + } + + configMap, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("anonymizer config must be a map") + } + + anonymizer := &mgmtv1alpha1.PiiAnonymizer{} + + // Check for each possible config type and set accordingly + if replace, ok := configMap["replace"].(map[string]any); ok { + value, ok := replace["value"].(string) + if !ok { + return nil, fmt.Errorf("replace value must be a string") + } + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Replace_{ + Replace: &mgmtv1alpha1.PiiAnonymizer_Replace{ + Value: &value, + }, + } + } else if _, ok := configMap["redact"].(map[string]any); ok { + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Redact_{ + Redact: &mgmtv1alpha1.PiiAnonymizer_Redact{}, + } + } else if mask, ok := configMap["mask"].(map[string]any); ok { + maskConfig := &Mask{} + if char, ok := mask["masking_char"].(string); ok { + maskConfig.MaskingChar = &char + } + if chars, ok := mask["chars_to_mask"].(float64); ok { + intChars := int32(chars) + maskConfig.CharsToMask = &intChars + } + if fromEnd, ok := mask["from_end"].(bool); ok { + maskConfig.FromEnd = &fromEnd + } + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Mask_{ + Mask: &mgmtv1alpha1.PiiAnonymizer_Mask{ + MaskingChar: maskConfig.MaskingChar, + CharsToMask: maskConfig.CharsToMask, + FromEnd: maskConfig.FromEnd, + }, + } + } else if hash, ok := configMap["hash"].(map[string]interface{}); ok { + if algo, ok := hash["algo"].(int32); ok { + convertedAlgo := mgmtv1alpha1.PiiAnonymizer_Hash_HashType(algo) + if _, ok := mgmtv1alpha1.PiiAnonymizer_Hash_HashType_name[int32(convertedAlgo)]; !ok { + return nil, fmt.Errorf("invalid hash algorithm: %d", convertedAlgo) + } + anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Hash_{ + Hash: &mgmtv1alpha1.PiiAnonymizer_Hash{ + Algo: &convertedAlgo, + }, + } + } + } else if _, ok := configMap["transform"].(map[string]any); ok { + return nil, fmt.Errorf("transform not currently supported") + } else { + return nil, fmt.Errorf("invalid anonymizer config: must contain one of replace, redact, mask, hash, or transform") + } + + return anonymizer, nil +} + +func convertToPiiDenyRecognizer(raw map[string]any) (*mgmtv1alpha1.PiiDenyRecognizer, error) { + name, ok := raw["name"].(string) + if !ok { + return nil, fmt.Errorf("deny_recognizer must have a name") + } + + denyWordsRaw, ok := raw["deny_words"].([]any) + if !ok { + return nil, fmt.Errorf("deny_recognizer must have deny_words array") + } + + denyWords := make([]string, 0) + for _, word := range denyWordsRaw { + if str, ok := word.(string); ok { + denyWords = append(denyWords, str) + } + } + + return &mgmtv1alpha1.PiiDenyRecognizer{ + Name: name, + DenyWords: denyWords, + }, nil +} From 0cdb5d17befe612f79e5dd0763c0f5b7e694f58f Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 10:37:48 -0700 Subject: [PATCH 02/20] adds many benthos parse tests --- .../transformers/transform_pii_text.go | 62 ++---- .../transformers/transform_pii_text_test.go | 199 ++++++++++++++++++ 2 files changed, 217 insertions(+), 44 deletions(-) create mode 100644 worker/pkg/benthos/transformers/transform_pii_text_test.go diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index 5b1989f735..dd6b8dd0be 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -48,7 +48,7 @@ func RegisterTransformPiiText( ). Param(bloblang.NewAnyParam("deny_recognizers"). Optional(). - Default([]map[string]any{}). + Default([]any{}). Description("Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields."), ). Param(bloblang.NewAnyParam("entity_anonymizers"). @@ -66,10 +66,14 @@ func RegisterTransformPiiText( return nil, err } - scoreThreshold, err := args.GetFloat64("score_threshold") + scoreThresholdParam, err := args.GetOptionalFloat64("score_threshold") if err != nil { return nil, err } + scoreThreshold := float32(0.5) + if scoreThresholdParam != nil { + scoreThreshold = float32(*scoreThresholdParam) + } language, err := args.GetOptionalString("language") if err != nil { @@ -183,39 +187,6 @@ func transformPiiText(api TransformPiiTextApi, config *mgmtv1alpha1.TransformPii return &transformedResult, nil } -type PiiAnonymizer struct { - Replace *Replace - Redact *Redact - Mask *Mask - Hash *Hash - Transform *Transform -} - -type Replace struct { - Value *string -} - -type Redact struct{} - -type Mask struct { - MaskingChar *string - CharsToMask *int32 - FromEnd *bool -} - -type Hash struct { - Algo *string -} - -type Transform struct { - Config map[string]interface{} -} - -type PiiDenyRecognizer struct { - Name string - DenyWords []string -} - func convertToPiiDenyRecognizerArray(raw any) ([]*mgmtv1alpha1.PiiDenyRecognizer, error) { denyRecognizers := make([]*mgmtv1alpha1.PiiDenyRecognizer, 0) if raw == nil { @@ -228,7 +199,7 @@ func convertToPiiDenyRecognizerArray(raw any) ([]*mgmtv1alpha1.PiiDenyRecognizer for _, recognizer := range denyRecognizersRawArray { recognizerMap, ok := recognizer.(map[string]any) if !ok { - return nil, fmt.Errorf("deny_recognizer must be a map") + return nil, fmt.Errorf("deny_recognizer must be a map, was: %T", recognizer) } denyRecognizer, err := convertToPiiDenyRecognizer(recognizerMap) if err != nil { @@ -246,7 +217,7 @@ func convertToPiiAnonymizerMap(raw any) (map[string]*mgmtv1alpha1.PiiAnonymizer, } entityAnonymizersRawMap, ok := raw.(map[string]any) if !ok { - return nil, fmt.Errorf("entity_anonymizers must be a map") + return nil, fmt.Errorf("entity_anonymizers must be a map, was: %T", raw) } for entity, anonymizer := range entityAnonymizersRawMap { anonymizerConfig, err := convertToPiiAnonymizer(anonymizer) @@ -272,13 +243,14 @@ func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { // Check for each possible config type and set accordingly if replace, ok := configMap["replace"].(map[string]any); ok { - value, ok := replace["value"].(string) - if !ok { - return nil, fmt.Errorf("replace value must be a string") + var value *string + valueParam, ok := replace["value"].(string) + if ok && valueParam != "" { + value = &valueParam } anonymizer.Config = &mgmtv1alpha1.PiiAnonymizer_Replace_{ Replace: &mgmtv1alpha1.PiiAnonymizer_Replace{ - Value: &value, + Value: value, }, } } else if _, ok := configMap["redact"].(map[string]any); ok { @@ -286,7 +258,7 @@ func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { Redact: &mgmtv1alpha1.PiiAnonymizer_Redact{}, } } else if mask, ok := configMap["mask"].(map[string]any); ok { - maskConfig := &Mask{} + maskConfig := &mgmtv1alpha1.PiiAnonymizer_Mask{} if char, ok := mask["masking_char"].(string); ok { maskConfig.MaskingChar = &char } @@ -304,8 +276,8 @@ func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { FromEnd: maskConfig.FromEnd, }, } - } else if hash, ok := configMap["hash"].(map[string]interface{}); ok { - if algo, ok := hash["algo"].(int32); ok { + } else if hash, ok := configMap["hash"].(map[string]any); ok { + if algo, ok := hash["algo"].(int64); ok { convertedAlgo := mgmtv1alpha1.PiiAnonymizer_Hash_HashType(algo) if _, ok := mgmtv1alpha1.PiiAnonymizer_Hash_HashType_name[int32(convertedAlgo)]; !ok { return nil, fmt.Errorf("invalid hash algorithm: %d", convertedAlgo) @@ -315,6 +287,8 @@ func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { Algo: &convertedAlgo, }, } + } else { + return nil, fmt.Errorf("invalid hash algorithm: %T", hash["algo"]) } } else if _, ok := configMap["transform"].(map[string]any); ok { return nil, fmt.Errorf("transform not currently supported") diff --git a/worker/pkg/benthos/transformers/transform_pii_text_test.go b/worker/pkg/benthos/transformers/transform_pii_text_test.go new file mode 100644 index 0000000000..ca62ad69b0 --- /dev/null +++ b/worker/pkg/benthos/transformers/transform_pii_text_test.go @@ -0,0 +1,199 @@ +package transformers + +import ( + "context" + "testing" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + "github.com/redpanda-data/benthos/v4/public/bloblang" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockTransformPiiTextApi struct { + transformFn func(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) +} + +func (m *mockTransformPiiTextApi) Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { + return m.transformFn(ctx, config, value) +} + +func TestRegisterTransformPiiText(t *testing.T) { + env := bloblang.NewEnvironment() + mockApi := &mockTransformPiiTextApi{ + transformFn: func(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { + return "anonymized: " + value, nil + }, + } + + err := RegisterTransformPiiText(env, mockApi) + require.NoError(t, err) + + t.Run("basic usage", func(t *testing.T) { + mapping := `transform_pii_text("sensitive data")` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with value param", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data")` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with score_threshold", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", score_threshold: 0.7)` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with language", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", language: "en")` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with allowed_phrases", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", allowed_phrases: ["data"])` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with allowed_entities", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", allowed_entities: ["PERSON", "EMAIL"])` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with default_anonymizer", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", default_anonymizer: {"replace": {"value": "REDACTED"}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with deny_recognizers", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", deny_recognizers: [{"name": "test", "deny_words": ["sensitive"]}])` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with entity_anonymizers", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"replace": {"new_value": "PERSON"}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + t.Run("with entity_anonymizers using redact", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"redact": {}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with entity_anonymizers using mask", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"mask": {"masking_char": "*", "chars_to_mask": 4, "from_end": true}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with entity_anonymizers using hash", func(t *testing.T) { + mapping := `transform_pii_text(value: "sensitive data", entity_anonymizers: {"PERSON": {"hash": {"algo": 1}}})` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query(nil) + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) + + t.Run("with value from context", func(t *testing.T) { + mapping := `transform_pii_text(value: this)` + exec, err := env.Parse(mapping) + require.NoError(t, err) + + res, err := exec.Query("sensitive data") + require.NoError(t, err) + require.NotNil(t, res) + output, ok := res.(*string) + require.True(t, ok) + assert.Equal(t, "anonymized: sensitive data", *output) + }) +} From a8b650efa1f9633054c897220d0c8d635321113f Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 11:03:38 -0700 Subject: [PATCH 03/20] adds a ton of plumbing for transform pii text api --- .../gen-javascript-transformer.md | 44 +++++ .../apps/web/@types/neosync-transformers.d.ts | 23 +++ .../javascript/functions/neosync/functions.go | 13 +- .../functions/neosync/functions_test.go | 2 +- internal/javascript/javascript.go | 10 +- worker/pkg/benthos/environment/environment.go | 10 +- worker/pkg/benthos/javascript/processor.go | 18 +- .../pkg/benthos/javascript/processor_test.go | 16 +- .../benthos/transformer_executor/executor.go | 33 +++- .../transform_pii_text_api.go | 40 +++++ .../transformers/gen_transform_pii_text.go | 159 ++++++++++++++++++ .../transformers/transform_pii_text.go | 70 +++++++- 12 files changed, 410 insertions(+), 28 deletions(-) create mode 100644 worker/pkg/benthos/transformer_executor/transform_pii_text_api.go create mode 100644 worker/pkg/benthos/transformers/gen_transform_pii_text.go diff --git a/docs/docs/transformers/gen-javascript-transformer.md b/docs/docs/transformers/gen-javascript-transformer.md index 6d94b6d54b..7f1ad55a6c 100644 --- a/docs/docs/transformers/gen-javascript-transformer.md +++ b/docs/docs/transformers/gen-javascript-transformer.md @@ -357,6 +357,50 @@ const newValue = neosync.transformLastName(value, {
+ + +### transformPiiText + +Anonymizes and transforms freeform text. + +**Parameters** + +**Value** +Type: Any +Description: Value that will be transformed + +**Config** + +| Field | Type | Default | Required | Description | +| -------- | ---- | ------- | -------- | ----------- | +| scoreThreshold | float64 | 0.5 | false | +| language | string | | false | The language of the text to be anonymized. +| allowedPhrases | any | [] | false | A list of phrases that will not be considered PII. +| allowedEntities | any | [] | false | A list of entities to be used for PII analysis. If not provided or empty, all entities are considered. If specified, any ad-hoc, or deny_recognizers entity names must also be provided. To see available builtin entities, cal the GetPiiTextEntities() RPC method for your account. +| defaultAnonymizer | any | | false | The default anonymization configuration used for all instances of detected PII. +| denyRecognizers | any | [] | false | Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields. +| entityAnonymizers | any | map[string]any{} | false | A map of entity names to anonymizer configurations. The key corresponds to a recognized entity (e.g. PERSON, PHONE_NUMBER) and the value is the anonymizer configuration.
+ +**Example** + +```javascript + +const newValue = neosync.transformPiiText(value, { + scoreThreshold: 0.5, + language: "", + allowedPhrases: [], + allowedEntities: [], + defaultAnonymizer: "", + denyRecognizers: [], + entityAnonymizers: map[string]any{}, +}); + +``` +
+ + diff --git a/frontend/apps/web/@types/neosync-transformers.d.ts b/frontend/apps/web/@types/neosync-transformers.d.ts index 75ce0af06d..362dc29dfe 100644 --- a/frontend/apps/web/@types/neosync-transformers.d.ts +++ b/frontend/apps/web/@types/neosync-transformers.d.ts @@ -155,6 +155,29 @@ declare namespace neosync { declare function transformLastName(value: any, options: TransformLastNameOptions): any; + export interface TransformPiiTextOptions { + /** */ + scoreThreshold?: number; + /** The language of the text to be anonymized. */ + language?: string; + /** A list of phrases that will not be considered PII. */ + allowedPhrases?: any[]; + /** A list of entities to be used for PII analysis. If not provided or empty, all entities are considered. If specified, any ad-hoc, or deny_recognizers entity names must also be provided. To see available builtin entities, cal the GetPiiTextEntities() RPC method for your account. */ + allowedEntities?: any[]; + /** The default anonymization configuration used for all instances of detected PII. */ + defaultAnonymizer?: any; + /** Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields. */ + denyRecognizers?: any[]; + /** A map of entity names to anonymizer configurations. The key corresponds to a recognized entity (e.g. PERSON, PHONE_NUMBER) and the value is the anonymizer configuration. */ + entityAnonymizers?: any; + } + + /** + * Anonymizes and transforms freeform text. + */ + declare function transformPiiText(value: any, options: TransformPiiTextOptions): any; + + export interface TransformStringOptions { /** Whether the original length of the input data should be preserved during transformation. If set to true, the transformation logic will ensure that the output data has the same length as the input data. */ preserveLength?: boolean; diff --git a/internal/javascript/functions/neosync/functions.go b/internal/javascript/functions/neosync/functions.go index 99863cde90..8ae2f8ee89 100644 --- a/internal/javascript/functions/neosync/functions.go +++ b/internal/javascript/functions/neosync/functions.go @@ -16,12 +16,14 @@ const ( namespace = "neosync" ) -func Get() ([]*javascript_functions.FunctionDefinition, error) { +func Get( + transformPiiTextApi transformers.TransformPiiTextApi, +) ([]*javascript_functions.FunctionDefinition, error) { generatorFns, err := getNeosyncGenerators() if err != nil { return nil, err } - transformerFns, err := getNeosyncTransformers() + transformerFns, err := getNeosyncTransformers(transformPiiTextApi) if err != nil { return nil, err } @@ -151,8 +153,13 @@ func getNeosyncGenerators() ([]*javascript_functions.FunctionDefinition, error) return fns, nil } -func getNeosyncTransformers() ([]*javascript_functions.FunctionDefinition, error) { +func getNeosyncTransformers( + transformPiiTextApi transformers.TransformPiiTextApi, +) ([]*javascript_functions.FunctionDefinition, error) { neosyncTransformers := transformers.GetNeosyncTransformers() + if transformPiiTextApi != nil { + neosyncTransformers = append(neosyncTransformers, transformers.NewTransformPiiText(transformPiiTextApi)) + } fns := make([]*javascript_functions.FunctionDefinition, 0, len(neosyncTransformers)) for _, f := range neosyncTransformers { templateData, err := f.GetJsTemplateData() diff --git a/internal/javascript/functions/neosync/functions_test.go b/internal/javascript/functions/neosync/functions_test.go index 44390cd17f..e1f629dc71 100644 --- a/internal/javascript/functions/neosync/functions_test.go +++ b/internal/javascript/functions/neosync/functions_test.go @@ -7,7 +7,7 @@ import ( ) func TestGet(t *testing.T) { - functions, err := Get() + functions, err := Get(nil) require.NoError(t, err) require.NotEmpty(t, functions) } diff --git a/internal/javascript/javascript.go b/internal/javascript/javascript.go index 7caf6d93b8..36192a7b95 100644 --- a/internal/javascript/javascript.go +++ b/internal/javascript/javascript.go @@ -8,14 +8,16 @@ import ( benthos_functions "github.com/nucleuscloud/neosync/internal/javascript/functions/benthos" neosync_functions "github.com/nucleuscloud/neosync/internal/javascript/functions/neosync" javascript_vm "github.com/nucleuscloud/neosync/internal/javascript/vm" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" ) // Comes full featured, but expects a value api that the benthos/neosync functions can manipulate func NewDefaultValueRunner( valueApi javascript_functions.ValueApi, + transformPiiTextApi transformers.TransformPiiTextApi, logger *slog.Logger, ) (*javascript_vm.Runner, error) { - functions, err := getDefaultFunctions() + functions, err := getDefaultFunctions(transformPiiTextApi) if err != nil { return nil, err } @@ -39,9 +41,11 @@ func NewDefaultRunner( ) } -func getDefaultFunctions() ([]*javascript_functions.FunctionDefinition, error) { +func getDefaultFunctions( + transformPiiTextApi transformers.TransformPiiTextApi, +) ([]*javascript_functions.FunctionDefinition, error) { benthosFns := benthos_functions.Get() - neosyncFns, err := neosync_functions.Get() + neosyncFns, err := neosync_functions.Get(transformPiiTextApi) if err != nil { return nil, err } diff --git a/worker/pkg/benthos/environment/environment.go b/worker/pkg/benthos/environment/environment.go index d0c3cd2e05..6f16adebb7 100644 --- a/worker/pkg/benthos/environment/environment.go +++ b/worker/pkg/benthos/environment/environment.go @@ -17,6 +17,7 @@ import ( neosync_benthos_connectiondata "github.com/nucleuscloud/neosync/worker/pkg/benthos/neosync_connection_data" openaigenerate "github.com/nucleuscloud/neosync/worker/pkg/benthos/openai_generate" neosync_benthos_sql "github.com/nucleuscloud/neosync/worker/pkg/benthos/sql" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" "go.opentelemetry.io/otel/metric" @@ -34,6 +35,8 @@ type RegisterConfig struct { stopChannel chan<- error blobEnv *bloblang.Environment + + transformPiiTextApi transformers.TransformPiiTextApi } type Option func(cfg *RegisterConfig) @@ -69,6 +72,11 @@ func WithBlobEnv(b *bloblang.Environment) Option { cfg.blobEnv = b } } +func WithTransformPiiTextApi(transformPiiTextApi transformers.TransformPiiTextApi) Option { + return func(cfg *RegisterConfig) { + cfg.transformPiiTextApi = transformPiiTextApi + } +} type SqlConfig struct { Provider neosync_benthos_sql.ConnectionProvider @@ -248,7 +256,7 @@ func NewWithEnvironment( ) } - err = javascript_processor.RegisterNeosyncJavascriptProcessor(env) + err = javascript_processor.RegisterNeosyncJavascriptProcessor(env, config.transformPiiTextApi) if err != nil { return nil, fmt.Errorf( "unable to register javascript processor to benthos instance: %w", diff --git a/worker/pkg/benthos/javascript/processor.go b/worker/pkg/benthos/javascript/processor.go index 87364f4cdc..736e204a64 100644 --- a/worker/pkg/benthos/javascript/processor.go +++ b/worker/pkg/benthos/javascript/processor.go @@ -10,6 +10,7 @@ import ( "github.com/nucleuscloud/neosync/internal/benthos_slogger" "github.com/nucleuscloud/neosync/internal/javascript" javascript_vm "github.com/nucleuscloud/neosync/internal/javascript/vm" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" "github.com/redpanda-data/benthos/v4/public/service" ) @@ -23,11 +24,14 @@ func javascriptProcessorConfig() *service.ConfigSpec { Field(service.NewInterpolatedStringField(codeField)) } -func RegisterNeosyncJavascriptProcessor(env *service.Environment) error { +func RegisterNeosyncJavascriptProcessor( + env *service.Environment, + transformPiiTextApi transformers.TransformPiiTextApi, +) error { return env.RegisterBatchProcessor( "neosync_javascript", javascriptProcessorConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { - return newJavascriptProcessorFromConfig(conf, mgr) + return newJavascriptProcessorFromConfig(conf, mgr, transformPiiTextApi) }) } @@ -40,6 +44,7 @@ type javascriptProcessor struct { func newJavascriptProcessorFromConfig( conf *service.ParsedConfig, mgr *service.Resources, + transformPiiTextApi transformers.TransformPiiTextApi, ) (*javascriptProcessor, error) { code, err := conf.FieldString(codeField) if err != nil { @@ -60,7 +65,7 @@ func newJavascriptProcessorFromConfig( slogger: slogger, vmPool: sync.Pool{ New: func() any { - val, err := newPoolItem(slogger) + val, err := newPoolItem(slogger, transformPiiTextApi) if err != nil { return err } @@ -133,9 +138,12 @@ func (j *javascriptProcessor) Close(ctx context.Context) error { return nil } -func newPoolItem(logger *slog.Logger) (*vmPoolItem, error) { +func newPoolItem( + logger *slog.Logger, + transformPiiTextApi transformers.TransformPiiTextApi, +) (*vmPoolItem, error) { valueApi := newBatchBenthosValueApi() - runner, err := javascript.NewDefaultValueRunner(valueApi, logger) + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, logger) if err != nil { return nil, err } diff --git a/worker/pkg/benthos/javascript/processor_test.go b/worker/pkg/benthos/javascript/processor_test.go index 92d90f85b5..08275767ea 100644 --- a/worker/pkg/benthos/javascript/processor_test.go +++ b/worker/pkg/benthos/javascript/processor_test.go @@ -26,7 +26,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -57,7 +57,7 @@ code: 'benthos.v0_msg_set_string(benthos.v0_msg_as_string() + "hello world");' `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -94,7 +94,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -144,7 +144,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -178,7 +178,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -221,7 +221,7 @@ code: | `, nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -272,7 +272,7 @@ code: | `, testServer.URL), nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) @@ -307,7 +307,7 @@ code: | `), nil) require.NoError(t, err) - proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources()) + proc, err := newJavascriptProcessorFromConfig(conf, service.MockResources(), nil) require.NoError(t, err) bCtx, done := context.WithTimeout(context.Background(), time.Second*30) diff --git a/worker/pkg/benthos/transformer_executor/executor.go b/worker/pkg/benthos/transformer_executor/executor.go index 1203ca6f2d..d0eca49460 100644 --- a/worker/pkg/benthos/transformer_executor/executor.go +++ b/worker/pkg/benthos/transformer_executor/executor.go @@ -112,7 +112,15 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - runner, err := javascript.NewDefaultValueRunner(valueApi, execCfg.logger) + transformPiiTextApi, err := newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + if err != nil { + return nil, err + } + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err } @@ -148,7 +156,15 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - runner, err := javascript.NewDefaultValueRunner(valueApi, execCfg.logger) + transformPiiTextApi, err := newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + if err != nil { + return nil, err + } + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err } @@ -731,6 +747,15 @@ func InitializeTransformerByConfigType( config.Language = execCfg.transformPiiText.defaultLanguage } + transformPiiTextApi, err := newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + if err != nil { + return nil, err + } + return &TransformerExecutor{ Opts: nil, Mutate: func(value, opts any) (any, error) { @@ -738,12 +763,10 @@ func InitializeTransformerByConfigType( if !ok { return nil, fmt.Errorf("expected value to be of type string. %T", value) } - return ee_transformer_fns.TransformPiiText( + return transformPiiTextApi.Transform( context.Background(), - execCfg.transformPiiText.analyze, execCfg.transformPiiText.anonymize, execCfg.transformPiiText.neosyncOperatorApi, config, valueStr, - execCfg.logger, ) }, }, nil diff --git a/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go b/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go new file mode 100644 index 0000000000..c50b56af9a --- /dev/null +++ b/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go @@ -0,0 +1,40 @@ +package transformer_executor + +import ( + "context" + "log/slog" + + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + ee_transformer_fns "github.com/nucleuscloud/neosync/internal/ee/transformers/functions" + "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers" +) + +type piiTextApi struct { + execConfig *transformPiiTextConfig + neosyncOperatorApi ee_transformer_fns.NeosyncOperatorApi + logger *slog.Logger +} + +func newFromExecConfig( + execConfig *transformPiiTextConfig, + neosyncOperatorApi ee_transformer_fns.NeosyncOperatorApi, + logger *slog.Logger, +) (transformers.TransformPiiTextApi, error) { + return &piiTextApi{ + execConfig: execConfig, + neosyncOperatorApi: neosyncOperatorApi, + logger: logger, + }, nil +} + +func (p *piiTextApi) Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { + return ee_transformer_fns.TransformPiiText( + ctx, + p.execConfig.analyze, + p.execConfig.anonymize, + p.neosyncOperatorApi, + config, + value, + p.logger, + ) +} diff --git a/worker/pkg/benthos/transformers/gen_transform_pii_text.go b/worker/pkg/benthos/transformers/gen_transform_pii_text.go new file mode 100644 index 0000000000..dd1085a6b3 --- /dev/null +++ b/worker/pkg/benthos/transformers/gen_transform_pii_text.go @@ -0,0 +1,159 @@ + +// Code generated by Neosync neosync_transformer_generator.go. DO NOT EDIT. +// source: transform_pii_text.go + +package transformers + +import ( + "strings" + "fmt" +) + +type TransformPiiText struct{ + api TransformPiiTextApi +} + +type TransformPiiTextOpts struct { + scoreThreshold *float64 + language *string + allowedPhrases any + allowedEntities any + defaultAnonymizer any + denyRecognizers any + entityAnonymizers any +} + +func NewTransformPiiText( + api TransformPiiTextApi, +) *TransformPiiText { + return &TransformPiiText{ + api: api, + } +} + +func NewTransformPiiTextOpts( + scoreThresholdArg *float64, + language *string, + allowedPhrasesArg any, + allowedEntitiesArg any, + defaultAnonymizerArg any, + denyRecognizersArg any, + entityAnonymizersArg any, +) (*TransformPiiTextOpts, error) { + scoreThreshold := float64(0.5) + if scoreThresholdArg != nil { + scoreThreshold = *scoreThresholdArg + } + + + return &TransformPiiTextOpts{ + scoreThreshold: &scoreThreshold, + language: language, + allowedPhrases: allowedPhrasesArg, + allowedEntities: allowedEntitiesArg, + defaultAnonymizer: defaultAnonymizerArg, + denyRecognizers: denyRecognizersArg, + entityAnonymizers: entityAnonymizersArg, + }, nil +} + +func (o *TransformPiiTextOpts) BuildBloblangString( + valuePath string, +) string { + fnStr := []string{ + "value:this.%s", + } + + params := []any{ + valuePath, + } + + + if o.scoreThreshold != nil { + fnStr = append(fnStr, "score_threshold:%v") + params = append(params, *o.scoreThreshold) + } + if o.language != nil { + fnStr = append(fnStr, "language:%q") + params = append(params, *o.language) + } + if o.allowedPhrases != nil { + fnStr = append(fnStr, "allowed_phrases:%v") + params = append(params, o.allowedPhrases) + } + if o.allowedEntities != nil { + fnStr = append(fnStr, "allowed_entities:%v") + params = append(params, o.allowedEntities) + } + if o.defaultAnonymizer != nil { + fnStr = append(fnStr, "default_anonymizer:%v") + params = append(params, o.defaultAnonymizer) + } + if o.denyRecognizers != nil { + fnStr = append(fnStr, "deny_recognizers:%v") + params = append(params, o.denyRecognizers) + } + if o.entityAnonymizers != nil { + fnStr = append(fnStr, "entity_anonymizers:%v") + params = append(params, o.entityAnonymizers) + } + + template := fmt.Sprintf("transform_pii_text(%s)", strings.Join(fnStr, ",")) + return fmt.Sprintf(template, params...) +} + +func (t *TransformPiiText) GetJsTemplateData() (*TemplateData, error) { + return &TemplateData{ + Name: "transformPiiText", + Description: "Anonymizes and transforms freeform text.", + Example: "", + }, nil +} + +func (t *TransformPiiText) ParseOptions(opts map[string]any) (any, error) { + transformerOpts := &TransformPiiTextOpts{} + + scoreThreshold, ok := opts["scoreThreshold"].(float64) + if !ok { + scoreThreshold = 0.5 + } + transformerOpts.scoreThreshold = &scoreThreshold + + var language *string + if arg, ok := opts["language"].(string); ok { + language = &arg + } + transformerOpts.language = language + + allowedPhrases, ok := opts["allowedPhrases"].(any) + if !ok { + allowedPhrases = []any{} + } + transformerOpts.allowedPhrases = allowedPhrases + + allowedEntities, ok := opts["allowedEntities"].(any) + if !ok { + allowedEntities = []any{} + } + transformerOpts.allowedEntities = allowedEntities + + var defaultAnonymizer any + if arg, ok := opts["defaultAnonymizer"].(any); ok { + defaultAnonymizer = arg + } + transformerOpts.defaultAnonymizer = defaultAnonymizer + + denyRecognizers, ok := opts["denyRecognizers"].(any) + if !ok { + denyRecognizers = []any{} + } + transformerOpts.denyRecognizers = denyRecognizers + + entityAnonymizers, ok := opts["entityAnonymizers"].(any) + if !ok { + entityAnonymizers = map[string]any{} + } + transformerOpts.entityAnonymizers = entityAnonymizers + + return transformerOpts, nil +} diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index dd6b8dd0be..d048ba9a66 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -2,6 +2,7 @@ package transformers import ( context "context" + "errors" "fmt" "reflect" @@ -13,8 +14,6 @@ type TransformPiiTextApi interface { Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) } -// +neosyncTransformerBuilder:transform:transformPiiText - func RegisterTransformPiiText( env *bloblang.Environment, api TransformPiiTextApi, @@ -156,6 +155,73 @@ func RegisterTransformPiiText( return nil } +func NewTransformPiiTextOptsFromConfig( + config *mgmtv1alpha1.TransformPiiText, +) (*TransformPiiTextOpts, error) { + if config == nil { + return nil, fmt.Errorf("config is nil") + } + scoreThreshold := float64(config.ScoreThreshold) + return NewTransformPiiTextOpts( + &scoreThreshold, + config.Language, + config.AllowedPhrases, + config.AllowedEntities, + config.DefaultAnonymizer, + config.DenyRecognizers, + config.EntityAnonymizers, + ) +} + +func (t *TransformPiiText) Transform(value, opts any) (any, error) { + parsedOpts, ok := opts.(*TransformPiiTextOpts) + if !ok { + return nil, fmt.Errorf("invalid parsed opts: %T", opts) + } + + valueStr, ok := value.(string) + if !ok { + return nil, errors.New("value is not a string") + } + + allowedPhrases, err := fromAnyToStringSlice(parsedOpts.allowedPhrases) + if err != nil { + return nil, fmt.Errorf("invalid allowed_phrases: %w", err) + } + + allowedEntities, err := fromAnyToStringSlice(parsedOpts.allowedEntities) + if err != nil { + return nil, fmt.Errorf("invalid allowed_entities: %w", err) + } + + defaultAnonymizer, err := convertToPiiAnonymizer(parsedOpts.defaultAnonymizer) + if err != nil { + return nil, fmt.Errorf("invalid default_anonymizer: %w", err) + } + + denyRecognizers, err := convertToPiiDenyRecognizerArray(parsedOpts.denyRecognizers) + if err != nil { + return nil, fmt.Errorf("invalid deny_recognizers: %w", err) + } + + entityAnonymizers, err := convertToPiiAnonymizerMap(parsedOpts.entityAnonymizers) + if err != nil { + return nil, fmt.Errorf("invalid entity_anonymizers: %w", err) + } + + config := &mgmtv1alpha1.TransformPiiText{ + ScoreThreshold: float32(*parsedOpts.scoreThreshold), + Language: parsedOpts.language, + AllowedPhrases: allowedPhrases, + AllowedEntities: allowedEntities, + DefaultAnonymizer: defaultAnonymizer, + DenyRecognizers: denyRecognizers, + EntityAnonymizers: entityAnonymizers, + } + + return transformPiiText(t.api, config, &valueStr) +} + func transformPiiText(api TransformPiiTextApi, config *mgmtv1alpha1.TransformPiiText, value any) (*string, error) { if value == nil { return nil, nil From 9c50ccc8aa64afea798fa58b08a2d866ce3e5955 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 11:23:38 -0700 Subject: [PATCH 04/20] wires up transform pii text in sync activity --- .../pkg/workflows/tablesync/activities/sync/activity.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/worker/pkg/workflows/tablesync/activities/sync/activity.go b/worker/pkg/workflows/tablesync/activities/sync/activity.go index 97b14fbbab..d4dd7132ed 100644 --- a/worker/pkg/workflows/tablesync/activities/sync/activity.go +++ b/worker/pkg/workflows/tablesync/activities/sync/activity.go @@ -45,6 +45,7 @@ type Activity struct { meter metric.Meter // optional benthosStreamManager benthosstream.BenthosStreamManagerClient temporalclient temporalclient.Client + transformPiiTextApi transformers.TransformPiiTextApi } func New( @@ -55,6 +56,7 @@ func New( meter metric.Meter, benthosStreamManager benthosstream.BenthosStreamManagerClient, temporalclient temporalclient.Client, + transformPiiTextApi transformers.TransformPiiTextApi, ) *Activity { return &Activity{ connclient: connclient, @@ -64,6 +66,7 @@ func New( meter: meter, benthosStreamManager: benthosStreamManager, temporalclient: temporalclient, + transformPiiTextApi: transformPiiTextApi, } } @@ -405,6 +408,10 @@ func (a *Activity) getBenthosEnvironment( if err != nil { return nil, fmt.Errorf("unable to register identity scramble transformer: %w", err) } + err = transformers.RegisterTransformPiiText(blobEnv, a.transformPiiTextApi) + if err != nil { + return nil, fmt.Errorf("unable to register pii text transformer: %w", err) + } benenv, err := benthos_environment.NewEnvironment( logger, benthos_environment.WithMeter(a.meter), @@ -429,6 +436,7 @@ func (a *Activity) getBenthosEnvironment( }), benthos_environment.WithStopChannel(stopActivityChan), benthos_environment.WithBlobEnv(blobEnv), + benthos_environment.WithTransformPiiTextApi(a.transformPiiTextApi), ) if err != nil { return nil, fmt.Errorf("unable to instantiate benthos environment: %w", err) From c5496a1aad2c1d1a38a1ece5a2cc2add72fae055 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 11:43:32 -0700 Subject: [PATCH 05/20] removes go generate --- .../gen-javascript-transformer.md | 44 ------------------- .../apps/web/@types/neosync-transformers.d.ts | 23 ---------- 2 files changed, 67 deletions(-) diff --git a/docs/docs/transformers/gen-javascript-transformer.md b/docs/docs/transformers/gen-javascript-transformer.md index 7f1ad55a6c..6d94b6d54b 100644 --- a/docs/docs/transformers/gen-javascript-transformer.md +++ b/docs/docs/transformers/gen-javascript-transformer.md @@ -357,50 +357,6 @@ const newValue = neosync.transformLastName(value, {
- - -### transformPiiText - -Anonymizes and transforms freeform text. - -**Parameters** - -**Value** -Type: Any -Description: Value that will be transformed - -**Config** - -| Field | Type | Default | Required | Description | -| -------- | ---- | ------- | -------- | ----------- | -| scoreThreshold | float64 | 0.5 | false | -| language | string | | false | The language of the text to be anonymized. -| allowedPhrases | any | [] | false | A list of phrases that will not be considered PII. -| allowedEntities | any | [] | false | A list of entities to be used for PII analysis. If not provided or empty, all entities are considered. If specified, any ad-hoc, or deny_recognizers entity names must also be provided. To see available builtin entities, cal the GetPiiTextEntities() RPC method for your account. -| defaultAnonymizer | any | | false | The default anonymization configuration used for all instances of detected PII. -| denyRecognizers | any | [] | false | Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields. -| entityAnonymizers | any | map[string]any{} | false | A map of entity names to anonymizer configurations. The key corresponds to a recognized entity (e.g. PERSON, PHONE_NUMBER) and the value is the anonymizer configuration.
- -**Example** - -```javascript - -const newValue = neosync.transformPiiText(value, { - scoreThreshold: 0.5, - language: "", - allowedPhrases: [], - allowedEntities: [], - defaultAnonymizer: "", - denyRecognizers: [], - entityAnonymizers: map[string]any{}, -}); - -``` -
- - diff --git a/frontend/apps/web/@types/neosync-transformers.d.ts b/frontend/apps/web/@types/neosync-transformers.d.ts index 362dc29dfe..75ce0af06d 100644 --- a/frontend/apps/web/@types/neosync-transformers.d.ts +++ b/frontend/apps/web/@types/neosync-transformers.d.ts @@ -155,29 +155,6 @@ declare namespace neosync { declare function transformLastName(value: any, options: TransformLastNameOptions): any; - export interface TransformPiiTextOptions { - /** */ - scoreThreshold?: number; - /** The language of the text to be anonymized. */ - language?: string; - /** A list of phrases that will not be considered PII. */ - allowedPhrases?: any[]; - /** A list of entities to be used for PII analysis. If not provided or empty, all entities are considered. If specified, any ad-hoc, or deny_recognizers entity names must also be provided. To see available builtin entities, cal the GetPiiTextEntities() RPC method for your account. */ - allowedEntities?: any[]; - /** The default anonymization configuration used for all instances of detected PII. */ - defaultAnonymizer?: any; - /** Configure deny lists where each word is treated as PII. Each entry should contain 'name' and 'deny_words' fields. */ - denyRecognizers?: any[]; - /** A map of entity names to anonymizer configurations. The key corresponds to a recognized entity (e.g. PERSON, PHONE_NUMBER) and the value is the anonymizer configuration. */ - entityAnonymizers?: any; - } - - /** - * Anonymizes and transforms freeform text. - */ - declare function transformPiiText(value: any, options: TransformPiiTextOptions): any; - - export interface TransformStringOptions { /** Whether the original length of the input data should be preserved during transformation. If set to true, the transformation logic will ensure that the output data has the same length as the input data. */ preserveLength?: boolean; From 15d083c53858a34454a726035b3af8c5ccdfd743 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 14:20:02 -0700 Subject: [PATCH 06/20] fixes plumbing, wires up model --- backend/internal/dtomaps/jobs.go | 13 +- backend/internal/dtomaps/transformers.go | 6 +- backend/sql/postgresql/models/models.go | 72 +++++++--- backend/sql/postgresql/models/transformers.go | 127 +++++++++++------- .../worker/workflow/datasync-workflow.go | 2 + worker/internal/cmds/worker/serve/serve.go | 6 + .../transformers/transform_pii_text.go | 69 ++++++++++ .../tablesync/activities/sync/activity.go | 22 ++- .../activities/sync/activity_test.go | 14 +- .../tablesync/workflow/register/register.go | 2 + 10 files changed, 245 insertions(+), 88 deletions(-) diff --git a/backend/internal/dtomaps/jobs.go b/backend/internal/dtomaps/jobs.go index 92a4cb4446..d9ee9fe541 100644 --- a/backend/internal/dtomaps/jobs.go +++ b/backend/internal/dtomaps/jobs.go @@ -17,7 +17,11 @@ func ToJobDto( ) (*mgmtv1alpha1.Job, error) { mappings := []*mgmtv1alpha1.JobMapping{} for _, mapping := range inputJob.Mappings { - mappings = append(mappings, mapping.ToDto()) + dto, err := mapping.ToDto() + if err != nil { + return nil, fmt.Errorf("unable to convert job mapping to dto: %w", err) + } + mappings = append(mappings, dto) } virtualForeignKeys := []*mgmtv1alpha1.VirtualForeignConstraint{} @@ -50,6 +54,11 @@ func ToJobDto( } } + sourceOptions, err := inputJob.ConnectionOptions.ToDto() + if err != nil { + return nil, fmt.Errorf("unable to convert job source options to dto: %w", err) + } + return &mgmtv1alpha1.Job{ Id: neosyncdb.UUIDString(inputJob.ID), Name: inputJob.Name, @@ -61,7 +70,7 @@ func ToJobDto( Mappings: mappings, VirtualForeignKeys: virtualForeignKeys, Source: &mgmtv1alpha1.JobSource{ - Options: inputJob.ConnectionOptions.ToDto(), + Options: sourceOptions, }, Destinations: destinations, AccountId: neosyncdb.UUIDString(inputJob.AccountID), diff --git a/backend/internal/dtomaps/transformers.go b/backend/internal/dtomaps/transformers.go index 94ec2a4a8d..516c5c2d0c 100644 --- a/backend/internal/dtomaps/transformers.go +++ b/backend/internal/dtomaps/transformers.go @@ -24,6 +24,10 @@ func ToUserDefinedTransformerDto( input.Source, ) } + dto, err := input.TransformerConfig.ToTransformerConfigDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.UserDefinedTransformer{ Id: neosyncdb.UUIDString(input.ID), Name: input.Name, @@ -31,7 +35,7 @@ func ToUserDefinedTransformerDto( Source: source, DataType: transformer.DataType, //nolint:staticcheck DataTypes: transformer.DataTypes, - Config: input.TransformerConfig.ToTransformerConfigDto(), + Config: dto, CreatedAt: timestamppb.New(input.CreatedAt.Time), UpdatedAt: timestamppb.New(input.UpdatedAt.Time), AccountId: neosyncdb.UUIDString(input.AccountID), diff --git a/backend/sql/postgresql/models/models.go b/backend/sql/postgresql/models/models.go index 2297b47fb6..e551a8b3bc 100644 --- a/backend/sql/postgresql/models/models.go +++ b/backend/sql/postgresql/models/models.go @@ -857,13 +857,17 @@ type JobMapping struct { JobMappingTransformer *JobMappingTransformerModel `json:"jobMappingTransformerModel,omitempty"` } -func (jm *JobMapping) ToDto() *mgmtv1alpha1.JobMapping { +func (jm *JobMapping) ToDto() (*mgmtv1alpha1.JobMapping, error) { + transformer, err := jm.JobMappingTransformer.ToTransformerDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.JobMapping{ Schema: jm.Schema, Table: jm.Table, Column: jm.Column, - Transformer: jm.JobMappingTransformer.ToTransformerDto(), - } + Transformer: transformer, + }, nil } func (jm *JobMapping) FromDto(dto *mgmtv1alpha1.JobMapping) error { @@ -1097,13 +1101,29 @@ type DynamoDBSourceUnmappedTransformConfig struct { S *JobMappingTransformerModel `json:"s"` } -func (s *DynamoDBSourceUnmappedTransformConfig) ToDto() *mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig { - return &mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig{ - B: s.B.ToTransformerDto(), - Boolean: s.Boolean.ToTransformerDto(), - N: s.N.ToTransformerDto(), - S: s.S.ToTransformerDto(), +func (s *DynamoDBSourceUnmappedTransformConfig) ToDto() (*mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig, error) { + b, err := s.B.ToTransformerDto() + if err != nil { + return nil, err + } + boolean, err := s.Boolean.ToTransformerDto() + if err != nil { + return nil, err + } + n, err := s.N.ToTransformerDto() + if err != nil { + return nil, err + } + str, err := s.S.ToTransformerDto() + if err != nil { + return nil, err } + return &mgmtv1alpha1.DynamoDBSourceUnmappedTransformConfig{ + B: b, + Boolean: boolean, + N: n, + S: str, + }, nil } func (s *DynamoDBSourceUnmappedTransformConfig) FromDto( @@ -1156,7 +1176,7 @@ func (s *DynamoDBSourceTableOption) FromDto(dto *mgmtv1alpha1.DynamoDBSourceTabl s.WhereClause = dto.WhereClause } -func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOptions { +func (s *DynamoDBSourceOptions) ToDto() (*mgmtv1alpha1.DynamoDBSourceConnectionOptions, error) { tables := make([]*mgmtv1alpha1.DynamoDBSourceTableOption, len(s.Tables)) for i, t := range s.Tables { tables[i] = t.ToDto() @@ -1185,12 +1205,16 @@ func (s *DynamoDBSourceOptions) ToDto() *mgmtv1alpha1.DynamoDBSourceConnectionOp }, } } + unmappedTransforms, err := s.UnmappedTransforms.ToDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.DynamoDBSourceConnectionOptions{ ConnectionId: s.ConnectionId, Tables: tables, - UnmappedTransforms: s.UnmappedTransforms.ToDto(), + UnmappedTransforms: unmappedTransforms, EnableConsistentRead: s.EnableConsistentRead, - } + }, nil } func (s *DynamoDBSourceOptions) FromDto(dto *mgmtv1alpha1.DynamoDBSourceConnectionOptions) error { @@ -1731,57 +1755,61 @@ type MysqlSourceTableOption struct { WhereClause *string `json:"whereClause,omitempty"` } -func (j *JobSourceOptions) ToDto() *mgmtv1alpha1.JobSourceOptions { +func (j *JobSourceOptions) ToDto() (*mgmtv1alpha1.JobSourceOptions, error) { if j.PostgresOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Postgres{ Postgres: j.PostgresOptions.ToDto(), }, - } + }, nil } if j.MysqlOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Mysql{ Mysql: j.MysqlOptions.ToDto(), }, - } + }, nil } if j.GenerateOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Generate{ Generate: j.GenerateOptions.ToDto(), }, - } + }, nil } if j.AiGenerateOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_AiGenerate{ AiGenerate: j.AiGenerateOptions.ToDto(), }, - } + }, nil } if j.MongoDbOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Mongodb{ Mongodb: j.MongoDbOptions.ToDto(), }, - } + }, nil } if j.DynamoDBOptions != nil { + dto, err := j.DynamoDBOptions.ToDto() + if err != nil { + return nil, err + } return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Dynamodb{ - Dynamodb: j.DynamoDBOptions.ToDto(), + Dynamodb: dto, }, - } + }, nil } if j.MssqlOptions != nil { return &mgmtv1alpha1.JobSourceOptions{ Config: &mgmtv1alpha1.JobSourceOptions_Mssql{ Mssql: j.MssqlOptions.ToDto(), }, - } + }, nil } - return nil + return nil, fmt.Errorf("invalid job source options config, received type: %T", j) } func (j *JobSourceOptions) FromDto(dto *mgmtv1alpha1.JobSourceOptions) error { diff --git a/backend/sql/postgresql/models/transformers.go b/backend/sql/postgresql/models/transformers.go index f88c010f6d..f66237e394 100644 --- a/backend/sql/postgresql/models/transformers.go +++ b/backend/sql/postgresql/models/transformers.go @@ -1,6 +1,9 @@ package pg_models import ( + "encoding/json" + "fmt" + mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" ) @@ -56,6 +59,7 @@ type TransformerConfig struct { GenerateIpAddress *GenerateIpAddressConfig `json:"generateIpAddressConfig,omitempty"` TransformUuid *TransformUuidConfig `json:"transformUuid,omitempty"` TransformScrambleIdentity *TransformScrambleIdentityConfig `json:"transformScrambleIdentity,omitempty"` + TransformPiiText []byte `json:"transformPiiText,omitempty"` } type TransformScrambleIdentityConfig struct{} @@ -400,6 +404,12 @@ func (t *TransformerConfig) FromTransformerConfigDto(tr *mgmtv1alpha1.Transforme t.TransformUuid = &TransformUuidConfig{} case *mgmtv1alpha1.TransformerConfig_TransformScrambleIdentityConfig: t.TransformScrambleIdentity = &TransformScrambleIdentityConfig{} + case *mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig: + bits, err := json.Marshal(tr.GetTransformPiiTextConfig()) + if err != nil { + return fmt.Errorf("unable to marshal transform pii text config: %w", err) + } + t.TransformPiiText = bits default: t = &TransformerConfig{} } @@ -407,16 +417,20 @@ func (t *TransformerConfig) FromTransformerConfigDto(tr *mgmtv1alpha1.Transforme return nil } -func (t *JobMappingTransformerModel) ToTransformerDto() *mgmtv1alpha1.JobMappingTransformer { +func (t *JobMappingTransformerModel) ToTransformerDto() (*mgmtv1alpha1.JobMappingTransformer, error) { if t.Config == nil { t.Config = &TransformerConfig{} } - return &mgmtv1alpha1.JobMappingTransformer{ - Config: t.Config.ToTransformerConfigDto(), + cfg, err := t.Config.ToTransformerConfigDto() + if err != nil { + return nil, fmt.Errorf("unable to convert transformer config to dto: %w", err) } + return &mgmtv1alpha1.JobMappingTransformer{ + Config: cfg, + }, nil } -func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerConfig { +func (t *TransformerConfig) ToTransformerConfigDto() (*mgmtv1alpha1.TransformerConfig, error) { switch { case t.GenerateEmail != nil: return &mgmtv1alpha1.TransformerConfig{ @@ -425,7 +439,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo EmailType: (*mgmtv1alpha1.GenerateEmailType)(t.GenerateEmail.EmailType), }, }, - } + }, nil case t.TransformEmail != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformEmailConfig{ @@ -441,13 +455,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo ), }, }, - } + }, nil case t.GenerateBool != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateBoolConfig{ GenerateBoolConfig: &mgmtv1alpha1.GenerateBool{}, }, - } + }, nil case t.GenerateCardNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCardNumberConfig{ @@ -455,17 +469,17 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo ValidLuhn: t.GenerateCardNumber.ValidLuhn, }, }, - } + }, nil case t.GenerateCity != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCityConfig{ GenerateCityConfig: &mgmtv1alpha1.GenerateCity{}, }, - } + }, nil case t.GenerateDefault != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{}, - } + }, nil case t.GenerateE164PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateE164PhoneNumberConfig{ @@ -474,13 +488,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateE164PhoneNumber.Max, }, }, - } + }, nil case t.GenerateFirstName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFirstNameConfig{ GenerateFirstNameConfig: &mgmtv1alpha1.GenerateFirstName{}, }, - } + }, nil case t.GenerateFloat64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFloat64Config{ @@ -491,19 +505,19 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Precision: t.GenerateFloat64.Precision, }, }, - } + }, nil case t.GenerateFullAddress != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullAddressConfig{ GenerateFullAddressConfig: &mgmtv1alpha1.GenerateFullAddress{}, }, - } + }, nil case t.GenerateFullName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateFullNameConfig{ GenerateFullNameConfig: &mgmtv1alpha1.GenerateFullName{}, }, - } + }, nil case t.GenerateGender != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateGenderConfig{ @@ -511,13 +525,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Abbreviate: t.GenerateGender.Abbreviate, }, }, - } + }, nil case t.GenerateInt64PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64PhoneNumberConfig{ GenerateInt64PhoneNumberConfig: &mgmtv1alpha1.GenerateInt64PhoneNumber{}, }, - } + }, nil case t.GenerateInt64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{ @@ -527,25 +541,25 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateInt64.Max, }, }, - } + }, nil case t.GenerateLastName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateLastNameConfig{ GenerateLastNameConfig: &mgmtv1alpha1.GenerateLastName{}, }, - } + }, nil case t.GenerateSha256Hash != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateSha256HashConfig{ GenerateSha256HashConfig: &mgmtv1alpha1.GenerateSha256Hash{}, }, - } + }, nil case t.GenerateSsn != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateSsnConfig{ GenerateSsnConfig: &mgmtv1alpha1.GenerateSSN{}, }, - } + }, nil case t.GenerateState != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStateConfig{ @@ -553,13 +567,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo GenerateFullName: t.GenerateState.GenerateFullName, }, }, - } + }, nil case t.GenerateStreetAddress != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStreetAddressConfig{ GenerateStreetAddressConfig: &mgmtv1alpha1.GenerateStreetAddress{}, }, - } + }, nil case t.GenerateStringPhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStringPhoneNumberConfig{ @@ -568,7 +582,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateStringPhoneNumber.Max, }, }, - } + }, nil case t.GenerateString != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateStringConfig{ @@ -577,25 +591,25 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Max: t.GenerateString.Max, }, }, - } + }, nil case t.GenerateUnixTimestamp != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUnixtimestampConfig{ GenerateUnixtimestampConfig: &mgmtv1alpha1.GenerateUnixTimestamp{}, }, - } + }, nil case t.GenerateUsername != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUsernameConfig{ GenerateUsernameConfig: &mgmtv1alpha1.GenerateUsername{}, }, - } + }, nil case t.GenerateUtcTimestamp != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUtctimestampConfig{ GenerateUtctimestampConfig: &mgmtv1alpha1.GenerateUtcTimestamp{}, }, - } + }, nil case t.GenerateUuid != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateUuidConfig{ @@ -603,13 +617,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo IncludeHyphens: t.GenerateUuid.IncludeHyphens, }, }, - } + }, nil case t.GenerateZipcode != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateZipcodeConfig{ GenerateZipcodeConfig: &mgmtv1alpha1.GenerateZipcode{}, }, - } + }, nil case t.TransformE164PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformE164PhoneNumberConfig{ @@ -617,7 +631,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformE164PhoneNumber.PreserveLength, }, }, - } + }, nil case t.TransformFirstname != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFirstNameConfig{ @@ -625,7 +639,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformFirstname.PreserveLength, }, }, - } + }, nil case t.TransformFloat64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFloat64Config{ @@ -634,7 +648,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo RandomizationRangeMax: t.TransformFloat64.RandomizationRangeMin, }, }, - } + }, nil case t.TransformFullName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformFullNameConfig{ @@ -642,7 +656,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformFullName.PreserveLength, }, }, - } + }, nil case t.TransformInt64PhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformInt64PhoneNumberConfig{ @@ -650,7 +664,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformInt64PhoneNumber.PreserveLength, }, }, - } + }, nil case t.TransformInt64 != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformInt64Config{ @@ -659,7 +673,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo RandomizationRangeMax: t.TransformInt64.RandomizationRangeMax, }, }, - } + }, nil case t.TransformLastName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformLastNameConfig{ @@ -667,7 +681,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformLastName.PreserveLength, }, }, - } + }, nil case t.TransformPhoneNumber != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformPhoneNumberConfig{ @@ -675,7 +689,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformPhoneNumber.PreserveLength, }, }, - } + }, nil case t.TransformString != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformStringConfig{ @@ -683,19 +697,19 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo PreserveLength: t.TransformString.PreserveLength, }, }, - } + }, nil case t.Passthrough != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{ PassthroughConfig: &mgmtv1alpha1.Passthrough{}, }, - } + }, nil case t.Null != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_Nullconfig{ Nullconfig: &mgmtv1alpha1.Null{}, }, - } + }, nil case t.UserDefinedTransformer != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig{ @@ -703,7 +717,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Id: t.UserDefinedTransformer.Id, }, }, - } + }, nil case t.TransformJavascript != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{ @@ -711,7 +725,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Code: t.TransformJavascript.Code, }, }, - } + }, nil case t.GenerateCategorical != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCategoricalConfig{ @@ -719,7 +733,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Categories: t.GenerateCategorical.Categories, }, }, - } + }, nil case t.TransformCharacterScramble != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformCharacterScrambleConfig{ @@ -727,7 +741,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo UserProvidedRegex: t.TransformCharacterScramble.UserProvidedRegex, }, }, - } + }, nil case t.GenerateJavascript != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateJavascriptConfig{ @@ -735,7 +749,7 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo Code: t.GenerateJavascript.Code, }, }, - } + }, nil case t.GenerateCountry != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateCountryConfig{ @@ -743,13 +757,13 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo GenerateFullName: t.GenerateCountry.GenerateFullName, }, }, - } + }, nil case t.GenerateBusinessName != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateBusinessNameConfig{ GenerateBusinessNameConfig: &mgmtv1alpha1.GenerateBusinessName{}, }, - } + }, nil case t.GenerateIpAddress != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_GenerateIpAddressConfig{ @@ -757,20 +771,31 @@ func (t *TransformerConfig) ToTransformerConfigDto() *mgmtv1alpha1.TransformerCo IpType: (*mgmtv1alpha1.GenerateIpAddressType)(t.GenerateIpAddress.IpType), }, }, - } + }, nil case t.TransformUuid != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformUuidConfig{ TransformUuidConfig: &mgmtv1alpha1.TransformUuid{}, }, - } + }, nil case t.TransformScrambleIdentity != nil: return &mgmtv1alpha1.TransformerConfig{ Config: &mgmtv1alpha1.TransformerConfig_TransformScrambleIdentityConfig{ TransformScrambleIdentityConfig: &mgmtv1alpha1.TransformScrambleIdentity{}, }, + }, nil + case t.TransformPiiText != nil: + var v *mgmtv1alpha1.TransformPiiText + err := json.Unmarshal(t.TransformPiiText, &v) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal transform pii text config: %w", err) } + return &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{ + TransformPiiTextConfig: v, + }, + }, nil default: - return &mgmtv1alpha1.TransformerConfig{} + return &mgmtv1alpha1.TransformerConfig{}, nil } } diff --git a/internal/integration-tests/worker/workflow/datasync-workflow.go b/internal/integration-tests/worker/workflow/datasync-workflow.go index 6952758afc..030d4d3846 100644 --- a/internal/integration-tests/worker/workflow/datasync-workflow.go +++ b/internal/integration-tests/worker/workflow/datasync-workflow.go @@ -116,6 +116,7 @@ func NewTestDataSyncWorkflowEnv( transformerclient := neosyncApi.OSSUnauthenticatedLicensedClients.Transformers() userclient := neosyncApi.OSSUnauthenticatedLicensedClients.Users() accounthookclient := neosyncApi.OSSUnauthenticatedLicensedClients.AccountHooks() + anonymizationclient := neosyncApi.OSSUnauthenticatedLicensedClients.Anonymize() testSuite := &testsuite.WorkflowTestSuite{} testSuite.SetLogger(log.NewStructuredLogger(testutil.GetConcurrentTestLogger(t))) env := testSuite.NewTestWorkflowEnvironment() @@ -155,6 +156,7 @@ func NewTestDataSyncWorkflowEnv( benthosstream.NewBenthosStreamManager(), neosyncApi.Mocks.TemporalClient, workflowEnv.maxIterations, + anonymizationclient, ) if workflowEnv.fakeEELicense.IsValid() { diff --git a/worker/internal/cmds/worker/serve/serve.go b/worker/internal/cmds/worker/serve/serve.go index ec7c325df9..cc3fb931c9 100644 --- a/worker/internal/cmds/worker/serve/serve.go +++ b/worker/internal/cmds/worker/serve/serve.go @@ -358,6 +358,11 @@ func serve(ctx context.Context) error { neosyncurl, connectInterceptorOption, ) + anonymizationclient := mgmtv1alpha1connect.NewAnonymizationServiceClient( + httpclient, + neosyncurl, + connectInterceptorOption, + ) sqlConnector := &sqlconnect.SqlOpenConnector{} sqlconnmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(sqlConnector)) @@ -389,6 +394,7 @@ func serve(ctx context.Context) error { streamManager, temporalClient, maxIterations, + anonymizationclient, ) schemainit_workflow_register.Register( diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index d048ba9a66..d5e00e3550 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -2,18 +2,87 @@ package transformers import ( context "context" + "encoding/json" "errors" "fmt" "reflect" + "connectrpc.com/connect" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" + "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" "github.com/redpanda-data/benthos/v4/public/bloblang" ) +// Minimal interface that includes the config and value +// To be used deep in the transformers so we don't have to be aware of the account id at the benthos level type TransformPiiTextApi interface { Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) } +// Full interface that includes the account id +type AccountTransformPiiTextApi interface { + Transform(ctx context.Context, accountId string, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) +} + +type AccountAwareAnonymizationPiiTextApi struct { + anonApi mgmtv1alpha1connect.AnonymizationServiceClient + accountId string +} + +func NewAccountAwareAnonymizationPiiTextApi( + anonApi mgmtv1alpha1connect.AnonymizationServiceClient, + accountId string, +) *AccountAwareAnonymizationPiiTextApi { + return &AccountAwareAnonymizationPiiTextApi{ + anonApi: anonApi, + accountId: accountId, + } +} + +func (a *AccountAwareAnonymizationPiiTextApi) Transform( + ctx context.Context, + config *mgmtv1alpha1.TransformPiiText, + value string, +) (string, error) { + wrapper := valueWrapper{ + Input: value, + } + bits, err := json.Marshal(wrapper) + if err != nil { + return "", fmt.Errorf("unable to marshal value: %w", err) + } + + resp, err := a.anonApi.AnonymizeSingle(ctx, connect.NewRequest(&mgmtv1alpha1.AnonymizeSingleRequest{ + InputData: string(bits), + AccountId: a.accountId, + TransformerMappings: []*mgmtv1alpha1.TransformerMapping{ + { + Expression: ".input", + Transformer: &mgmtv1alpha1.TransformerConfig{ + Config: nil, // TODO: add config + }, + }, + }, + })) + + if err != nil { + return "", fmt.Errorf("unable to anonymize text: %w", err) + } + + outputData := resp.Msg.GetOutputData() + var output valueWrapper + err = json.Unmarshal([]byte(outputData), &output) + if err != nil { + return "", fmt.Errorf("unable to unmarshal output: %w", err) + } + + return output.Input, nil +} + +type valueWrapper struct { + Input string `json:"input"` +} + func RegisterTransformPiiText( env *bloblang.Environment, api TransformPiiTextApi, diff --git a/worker/pkg/workflows/tablesync/activities/sync/activity.go b/worker/pkg/workflows/tablesync/activities/sync/activity.go index d4dd7132ed..2a177cc41b 100644 --- a/worker/pkg/workflows/tablesync/activities/sync/activity.go +++ b/worker/pkg/workflows/tablesync/activities/sync/activity.go @@ -45,7 +45,7 @@ type Activity struct { meter metric.Meter // optional benthosStreamManager benthosstream.BenthosStreamManagerClient temporalclient temporalclient.Client - transformPiiTextApi transformers.TransformPiiTextApi + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient } func New( @@ -56,7 +56,7 @@ func New( meter metric.Meter, benthosStreamManager benthosstream.BenthosStreamManagerClient, temporalclient temporalclient.Client, - transformPiiTextApi transformers.TransformPiiTextApi, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, ) *Activity { return &Activity{ connclient: connclient, @@ -66,7 +66,7 @@ func New( meter: meter, benthosStreamManager: benthosStreamManager, temporalclient: temporalclient, - transformPiiTextApi: transformPiiTextApi, + anonymizationClient: anonymizationClient, } } @@ -203,6 +203,7 @@ func (a *Activity) SyncTable( bstream, err := a.getBenthosStream( &info, + req.AccountId, benthosConfig, session, stopActivityChan, @@ -210,6 +211,7 @@ func (a *Activity) SyncTable( hasMorePages, continuationToken, identityAllocator, + a.anonymizationClient, logger, ) if err != nil { @@ -341,6 +343,7 @@ func runStream( func (a *Activity) getBenthosStream( info *activity.Info, + accountId string, benthosConfig string, session connectionmanager.SessionInterface, stopActivityChan chan error, @@ -348,17 +351,20 @@ func (a *Activity) getBenthosStream( hasMorePages neosync_benthos_sql.OnHasMorePagesFn, continuationToken *continuation_token.ContinuationToken, identityAllocator tablesync_shared.IdentityAllocator, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, logger *slog.Logger, ) (benthosstream.BenthosStreamClient, error) { benenv, err := a.getBenthosEnvironment( logger, info.Attempt > 1, + accountId, getConnectionById, session, stopActivityChan, hasMorePages, continuationToken, identityAllocator, + anonymizationClient, ) if err != nil { return nil, err @@ -396,19 +402,25 @@ func (a *Activity) getBenthosStream( func (a *Activity) getBenthosEnvironment( logger *slog.Logger, isRetry bool, + accountId string, getConnectionById func(connectionId string) (connectionmanager.ConnectionInput, error), session connectionmanager.SessionInterface, stopActivityChan chan error, hasMorePages neosync_benthos_sql.OnHasMorePagesFn, continuationToken *continuation_token.ContinuationToken, identityAllocator tablesync_shared.IdentityAllocator, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, ) (*service.Environment, error) { blobEnv := bloblang.NewEnvironment() err := transformers.RegisterTransformIdentityScramble(blobEnv, identityAllocator) if err != nil { return nil, fmt.Errorf("unable to register identity scramble transformer: %w", err) } - err = transformers.RegisterTransformPiiText(blobEnv, a.transformPiiTextApi) + transformPiiTextApiForAccount := transformers.NewAccountAwareAnonymizationPiiTextApi( + anonymizationClient, + accountId, + ) + err = transformers.RegisterTransformPiiText(blobEnv, transformPiiTextApiForAccount) if err != nil { return nil, fmt.Errorf("unable to register pii text transformer: %w", err) } @@ -436,7 +448,7 @@ func (a *Activity) getBenthosEnvironment( }), benthos_environment.WithStopChannel(stopActivityChan), benthos_environment.WithBlobEnv(blobEnv), - benthos_environment.WithTransformPiiTextApi(a.transformPiiTextApi), + benthos_environment.WithTransformPiiTextApi(transformPiiTextApiForAccount), ) if err != nil { return nil, fmt.Errorf("unable to instantiate benthos environment: %w", err) diff --git a/worker/pkg/workflows/tablesync/activities/sync/activity_test.go b/worker/pkg/workflows/tablesync/activities/sync/activity_test.go index bfe2434375..ca0141b39b 100644 --- a/worker/pkg/workflows/tablesync/activities/sync/activity_test.go +++ b/worker/pkg/workflows/tablesync/activities/sync/activity_test.go @@ -88,7 +88,7 @@ output: var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -167,7 +167,7 @@ output: var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) t.Run("valid continuation token", func(t *testing.T) { @@ -210,7 +210,7 @@ func Test_Sync_Run_No_BenthosConfig(t *testing.T) { benthosStreamManager := benthosstream.NewBenthosStreamManager() temporalclient := tmprl_mocks.NewClient(t) - activity := New(nil, nil, nil, nil, nil, benthosStreamManager, temporalclient) + activity := New(nil, nil, nil, nil, nil, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -278,7 +278,7 @@ metrics: sqlconnmanager := connectionmanager.NewConnectionManager(sqlprovider.NewProvider(&sqlconnect.SqlOpenConnector{}), connectionmanager.WithCloseOnRelease()) mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -354,7 +354,7 @@ func Test_Sync_Run_Processor_Error(t *testing.T) { mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, benthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -424,7 +424,7 @@ output: mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) @@ -515,7 +515,7 @@ output: mongoconnmanager := connectionmanager.NewConnectionManager(mongoprovider.NewProvider(), connectionmanager.WithCloseOnRelease()) var meter metric.Meter temporalclient := tmprl_mocks.NewClient(t) - activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient) + activity := New(connclient, jobclient, sqlconnmanager, mongoconnmanager, meter, mockBenthosStreamManager, temporalclient, nil) env.RegisterActivity(activity.SyncTable) _, err := env.ExecuteActivity(activity.SyncTable, &SyncTableRequest{ diff --git a/worker/pkg/workflows/tablesync/workflow/register/register.go b/worker/pkg/workflows/tablesync/workflow/register/register.go index 5b0f05db60..2f61923faa 100644 --- a/worker/pkg/workflows/tablesync/workflow/register/register.go +++ b/worker/pkg/workflows/tablesync/workflow/register/register.go @@ -27,6 +27,7 @@ func Register( benthosStreamManager benthosstream.BenthosStreamManagerClient, temporalclient client.Client, maxIterations int, + anonymizationClient mgmtv1alpha1connect.AnonymizationServiceClient, ) { tsWf := tablesync_workflow.New(maxIterations) w.RegisterWorkflow(tsWf.TableSync) @@ -39,6 +40,7 @@ func Register( meter, benthosStreamManager, temporalclient, + anonymizationClient, ) w.RegisterActivity(syncActivity.Sync) From 29b3f1d144ad7c19ee2a745fa1c40eca261a09f7 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 14:21:13 -0700 Subject: [PATCH 07/20] allows transform pii text on fe --- .../apps/web/libs/hooks/useGetTransformersHandler.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts b/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts index db0cacefa5..47b7e00acd 100644 --- a/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts +++ b/frontend/apps/web/libs/hooks/useGetTransformersHandler.ts @@ -1,6 +1,6 @@ import { TransformerHandler } from '@/components/jobs/SchemaTable/transformer-handler'; import { useQuery } from '@connectrpc/connect-query'; -import { TransformerSource, TransformersService } from '@neosync/sdk'; +import { TransformersService } from '@neosync/sdk'; import { useMemo } from 'react'; export function useGetTransformersHandler(accountId: string): { @@ -31,12 +31,7 @@ export function useGetTransformersHandler(accountId: string): { const handler = useMemo( (): TransformerHandler => - new TransformerHandler( - systemTransformers.filter( - (st) => st.source !== TransformerSource.TRANSFORM_PII_TEXT - ), - userDefinedTransformers - ), + new TransformerHandler(systemTransformers, userDefinedTransformers), [isValidating] ); From 05977b3e212a0b91abe7a5a76b3765489ec0575c Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 14:29:46 -0700 Subject: [PATCH 08/20] moves licensing around for transformpiitext --- backend/internal/cmds/mgmt/serve/connect/cmd.go | 7 +++---- backend/pkg/integration-test/mux.go | 2 +- .../services/mgmt/v1alpha1/transformers-service/service.go | 5 ++++- .../v1alpha1/transformers-service/system_transformers.go | 4 ++-- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/backend/internal/cmds/mgmt/serve/connect/cmd.go b/backend/internal/cmds/mgmt/serve/connect/cmd.go index f95a3e7cb1..68f81d0c9c 100644 --- a/backend/internal/cmds/mgmt/serve/connect/cmd.go +++ b/backend/internal/cmds/mgmt/serve/connect/cmd.go @@ -720,9 +720,8 @@ func serve(ctx context.Context) error { } transformerService := v1alpha1_transformerservice.New(&v1alpha1_transformerservice.Config{ - IsPresidioEnabled: ncloudlicense.IsValid(), - IsNeosyncCloud: ncloudlicense.IsValid(), - }, db, presEntityClient, userdataclient) + IsPresidioEnabled: cascadelicense.IsValid(), + }, db, presEntityClient, userdataclient, cascadelicense) api.Handle( mgmtv1alpha1connect.NewTransformersServiceHandler( transformerService, @@ -734,7 +733,7 @@ func serve(ctx context.Context) error { ) anonymizationService := v1alpha1_anonymizationservice.New(&v1alpha1_anonymizationservice.Config{ - IsPresidioEnabled: ncloudlicense.IsValid(), + IsPresidioEnabled: cascadelicense.IsValid(), PresidioDefaultLanguage: getPresidioDefaultLanguage(), IsAuthEnabled: isAuthEnabled, IsNeosyncCloud: ncloudlicense.IsValid(), diff --git a/backend/pkg/integration-test/mux.go b/backend/pkg/integration-test/mux.go index d3e59e1042..894e3b3d6c 100644 --- a/backend/pkg/integration-test/mux.go +++ b/backend/pkg/integration-test/mux.go @@ -209,11 +209,11 @@ func (s *NeosyncApiTestClient) setupMux( transformerService := v1alpha1_transformersservice.New( &v1alpha1_transformersservice.Config{ IsPresidioEnabled: isPresidioEnabled, - IsNeosyncCloud: isNeosyncCloud, }, neosyncdb.New(pgcontainer.DB, db_queries.New()), s.Mocks.Presidio.Entities, userclient, + license, ) sqlmanagerclient := NewTestSqlManagerClient() diff --git a/backend/services/mgmt/v1alpha1/transformers-service/service.go b/backend/services/mgmt/v1alpha1/transformers-service/service.go index 9047c21927..6b3e3af1d5 100644 --- a/backend/services/mgmt/v1alpha1/transformers-service/service.go +++ b/backend/services/mgmt/v1alpha1/transformers-service/service.go @@ -2,6 +2,7 @@ package v1alpha1_transformersservice import ( "github.com/nucleuscloud/neosync/backend/internal/userdata" + "github.com/nucleuscloud/neosync/internal/ee/license" presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio" "github.com/nucleuscloud/neosync/internal/neosyncdb" ) @@ -11,11 +12,11 @@ type Service struct { db *neosyncdb.NeosyncDb entityclient presidioapi.EntityInterface userdataclient userdata.Interface + license license.EEInterface } type Config struct { IsPresidioEnabled bool - IsNeosyncCloud bool } func New( @@ -23,11 +24,13 @@ func New( db *neosyncdb.NeosyncDb, recognizerclient presidioapi.EntityInterface, userdataclient userdata.Interface, + license license.EEInterface, ) *Service { return &Service{ cfg: cfg, db: db, entityclient: recognizerclient, userdataclient: userdataclient, + license: license, } } diff --git a/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go b/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go index 65ef921eeb..401cc4f588 100644 --- a/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go +++ b/backend/services/mgmt/v1alpha1/transformers-service/system_transformers.go @@ -1006,14 +1006,14 @@ func (s *Service) GetSystemTransformerBySource( } func (s *Service) getSystemTransformerSourceMap() map[mgmtv1alpha1.TransformerSource]*mgmtv1alpha1.SystemTransformer { - if s.cfg.IsNeosyncCloud { + if s.license.IsValid() { return allSystemTransformersSourceMap } return baseSystemTransformerSourceMap } func (s *Service) getSystemTransformers() []*mgmtv1alpha1.SystemTransformer { - if s.cfg.IsNeosyncCloud { + if s.license.IsValid() { return allSystemTransformers } return baseSystemTransformers From 46f839eda19feb1d4c9b354ddc854e732f41bf56 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 14:53:28 -0700 Subject: [PATCH 09/20] gets blobl string working properly --- .../benthos-builder/builders/processors.go | 10 +++ .../transformers/gen_transform_pii_text.go | 61 +++++++++++-------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/internal/benthos/benthos-builder/builders/processors.go b/internal/benthos/benthos-builder/builders/processors.go index 6125d33c05..db720e0814 100644 --- a/internal/benthos/benthos-builder/builders/processors.go +++ b/internal/benthos/benthos-builder/builders/processors.go @@ -856,6 +856,16 @@ func computeMutationFunction( return "", err } return opts.BuildBloblangString(formattedColPath), nil + case *mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig: + opts, err := transformers.NewTransformPiiTextOptsFromConfig(cfg.TransformPiiTextConfig) + if err != nil { + return "", err + } + bloblangString, err := opts.BuildBloblangString(formattedColPath) + if err != nil { + return "", fmt.Errorf("unable to build bloblang string for TransformPiiText: %w", err) + } + return bloblangString, nil default: return "", fmt.Errorf("unsupported transformer: %T", cfg) } diff --git a/worker/pkg/benthos/transformers/gen_transform_pii_text.go b/worker/pkg/benthos/transformers/gen_transform_pii_text.go index dd1085a6b3..61e0bc59a8 100644 --- a/worker/pkg/benthos/transformers/gen_transform_pii_text.go +++ b/worker/pkg/benthos/transformers/gen_transform_pii_text.go @@ -1,25 +1,24 @@ - -// Code generated by Neosync neosync_transformer_generator.go. DO NOT EDIT. // source: transform_pii_text.go package transformers import ( - "strings" + "encoding/json" "fmt" + "strings" ) -type TransformPiiText struct{ +type TransformPiiText struct { api TransformPiiTextApi } type TransformPiiTextOpts struct { - scoreThreshold *float64 - language *string - allowedPhrases any - allowedEntities any + scoreThreshold *float64 + language *string + allowedPhrases any + allowedEntities any defaultAnonymizer any - denyRecognizers any + denyRecognizers any entityAnonymizers any } @@ -45,21 +44,20 @@ func NewTransformPiiTextOpts( scoreThreshold = *scoreThresholdArg } - return &TransformPiiTextOpts{ - scoreThreshold: &scoreThreshold, - language: language, - allowedPhrases: allowedPhrasesArg, - allowedEntities: allowedEntitiesArg, + scoreThreshold: &scoreThreshold, + language: language, + allowedPhrases: allowedPhrasesArg, + allowedEntities: allowedEntitiesArg, defaultAnonymizer: defaultAnonymizerArg, - denyRecognizers: denyRecognizersArg, + denyRecognizers: denyRecognizersArg, entityAnonymizers: entityAnonymizersArg, }, nil } func (o *TransformPiiTextOpts) BuildBloblangString( valuePath string, -) string { +) (string, error) { fnStr := []string{ "value:this.%s", } @@ -68,7 +66,6 @@ func (o *TransformPiiTextOpts) BuildBloblangString( valuePath, } - if o.scoreThreshold != nil { fnStr = append(fnStr, "score_threshold:%v") params = append(params, *o.scoreThreshold) @@ -86,27 +83,39 @@ func (o *TransformPiiTextOpts) BuildBloblangString( params = append(params, o.allowedEntities) } if o.defaultAnonymizer != nil { - fnStr = append(fnStr, "default_anonymizer:%v") - params = append(params, o.defaultAnonymizer) + fnStr = append(fnStr, "default_anonymizer:%s") + json, err := json.Marshal(o.defaultAnonymizer) + if err != nil { + return "", fmt.Errorf("unable to marshal default_anonymizer: %w", err) + } + params = append(params, string(json)) } if o.denyRecognizers != nil { - fnStr = append(fnStr, "deny_recognizers:%v") - params = append(params, o.denyRecognizers) + fnStr = append(fnStr, "deny_recognizers:%s") + json, err := json.Marshal(o.denyRecognizers) + if err != nil { + return "", fmt.Errorf("unable to marshal deny_recognizers: %w", err) + } + params = append(params, string(json)) } if o.entityAnonymizers != nil { - fnStr = append(fnStr, "entity_anonymizers:%v") - params = append(params, o.entityAnonymizers) + fnStr = append(fnStr, "entity_anonymizers:%s") + json, err := json.Marshal(o.entityAnonymizers) + if err != nil { + return "", fmt.Errorf("unable to marshal entity_anonymizers: %w", err) + } + params = append(params, string(json)) } template := fmt.Sprintf("transform_pii_text(%s)", strings.Join(fnStr, ",")) - return fmt.Sprintf(template, params...) + return fmt.Sprintf(template, params...), nil } func (t *TransformPiiText) GetJsTemplateData() (*TemplateData, error) { return &TemplateData{ - Name: "transformPiiText", + Name: "transformPiiText", Description: "Anonymizes and transforms freeform text.", - Example: "", + Example: "", }, nil } From 1987765dfa477d15f63c23c009f7bd2db5440671 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:31:17 -0700 Subject: [PATCH 10/20] updates license to allow for EE --- backend/internal/cmds/mgmt/serve/connect/cmd.go | 4 ++-- backend/pkg/integration-test/mux.go | 1 + .../v1alpha1/anonymization-service/anonymization.go | 8 ++++---- .../mgmt/v1alpha1/anonymization-service/service.go | 7 ++++++- .../pkg/benthos/transformers/transform_pii_text.go | 12 +++++++++++- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/backend/internal/cmds/mgmt/serve/connect/cmd.go b/backend/internal/cmds/mgmt/serve/connect/cmd.go index 68f81d0c9c..bc6dab8d69 100644 --- a/backend/internal/cmds/mgmt/serve/connect/cmd.go +++ b/backend/internal/cmds/mgmt/serve/connect/cmd.go @@ -699,7 +699,7 @@ func serve(ctx context.Context) error { var presAnalyzeClient presidioapi.AnalyzeInterface var presAnonClient presidioapi.AnonymizeInterface var presEntityClient presidioapi.EntityInterface - if ncloudlicense.IsValid() { + if cascadelicense.IsValid() { analyzeClient, ok, err := getPresidioAnalyzeClient() if err != nil { return fmt.Errorf("unable to initialize presidio analyze client: %w", err) @@ -737,7 +737,7 @@ func serve(ctx context.Context) error { PresidioDefaultLanguage: getPresidioDefaultLanguage(), IsAuthEnabled: isAuthEnabled, IsNeosyncCloud: ncloudlicense.IsValid(), - }, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db) + }, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db, cascadelicense) api.Handle( mgmtv1alpha1connect.NewAnonymizationServiceHandler( anonymizationService, diff --git a/backend/pkg/integration-test/mux.go b/backend/pkg/integration-test/mux.go index 894e3b3d6c..8dce329b51 100644 --- a/backend/pkg/integration-test/mux.go +++ b/backend/pkg/integration-test/mux.go @@ -289,6 +289,7 @@ func (s *NeosyncApiTestClient) setupMux( presAnalyzeClient, presAnonClient, neosyncDb, + license, ) connectionDataService := v1alpha1_connectiondataservice.New( diff --git a/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go b/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go index e6d295f03f..f64a6ed3d3 100644 --- a/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go +++ b/backend/services/mgmt/v1alpha1/anonymization-service/anonymization.go @@ -32,7 +32,7 @@ func (s *Service) AnonymizeMany( req *connect.Request[mgmtv1alpha1.AnonymizeManyRequest], ) (*connect.Response[mgmtv1alpha1.AnonymizeManyResponse], error) { logger := logger_interceptor.GetLoggerFromContextOrDefault(ctx) - if !s.cfg.IsNeosyncCloud { + if !s.license.IsValid() { return nil, nucleuserrors.NewNotImplemented( fmt.Sprintf( "%s is not implemented in the OSS version of Neosync.", @@ -190,11 +190,11 @@ func (s *Service) AnonymizeSingle( if err != nil { return nil, err } - if !s.cfg.IsNeosyncCloud || account.AccountType == int16(neosyncdb.AccountType_Personal) { + if !s.license.IsValid() || (s.cfg.IsNeosyncCloud && account.AccountType == int16(neosyncdb.AccountType_Personal)) { for _, mapping := range req.Msg.GetTransformerMappings() { if mapping.GetTransformer().GetTransformPiiTextConfig() != nil { return nil, nucleuserrors.NewForbidden( - "TransformPiiText is not available for use. Please contact us to upgrade your account.", + "TransformPiiText is not available for use. Please contact us about upgrading your account.", ) } } @@ -203,7 +203,7 @@ func (s *Service) AnonymizeSingle( defaultTransforms.GetN().GetTransformPiiTextConfig() != nil || defaultTransforms.GetS().GetTransformPiiTextConfig() != nil { return nil, nucleuserrors.NewForbidden( - "TransformPiiText is not available for use. Please contact us to upgrade your account.", + "TransformPiiText is not available for use. Please contact us about upgrading your account.", ) } } diff --git a/backend/services/mgmt/v1alpha1/anonymization-service/service.go b/backend/services/mgmt/v1alpha1/anonymization-service/service.go index 2f6bf63fa2..38262109c9 100644 --- a/backend/services/mgmt/v1alpha1/anonymization-service/service.go +++ b/backend/services/mgmt/v1alpha1/anonymization-service/service.go @@ -3,6 +3,7 @@ package v1alpha_anonymizationservice import ( "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" "github.com/nucleuscloud/neosync/backend/internal/userdata" + "github.com/nucleuscloud/neosync/internal/ee/license" presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio" "github.com/nucleuscloud/neosync/internal/neosyncdb" "go.opentelemetry.io/otel/metric" @@ -17,13 +18,15 @@ type Service struct { analyze presidioapi.AnalyzeInterface anonymize presidioapi.AnonymizeInterface db *neosyncdb.NeosyncDb + license license.EEInterface + isNeosyncCloud bool } type Config struct { IsAuthEnabled bool + IsNeosyncCloud bool IsPresidioEnabled bool PresidioDefaultLanguage *string - IsNeosyncCloud bool } func New( @@ -35,6 +38,7 @@ func New( analyzeclient presidioapi.AnalyzeInterface, anonymizeclient presidioapi.AnonymizeInterface, db *neosyncdb.NeosyncDb, + license license.EEInterface, ) *Service { return &Service{ cfg: cfg, @@ -45,5 +49,6 @@ func New( analyze: analyzeclient, anonymize: anonymizeclient, db: db, + license: license, } } diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index d5e00e3550..834ff6c790 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -59,7 +59,9 @@ func (a *AccountAwareAnonymizationPiiTextApi) Transform( { Expression: ".input", Transformer: &mgmtv1alpha1.TransformerConfig{ - Config: nil, // TODO: add config + Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{ + TransformPiiTextConfig: config, + }, }, }, }, @@ -98,6 +100,7 @@ func RegisterTransformPiiText( ). Param(bloblang.NewStringParam("language"). Optional(). + Default("en"). Description("The language of the text to be anonymized."), ). Param(bloblang.NewAnyParam("allowed_phrases"). @@ -147,6 +150,10 @@ func RegisterTransformPiiText( if err != nil { return nil, err } + if language == nil { + defaultLanguage := "en" + language = &defaultLanguage + } allowedPhrasesParam, err := args.Get("allowed_phrases") if err != nil { @@ -314,6 +321,9 @@ func transformPiiText(api TransformPiiTextApi, config *mgmtv1alpha1.TransformPii return &result, nil } + bits, _ := json.Marshal(config) + fmt.Println("result!!!", string(bits)) + transformedResult, err := api.Transform(context.Background(), config, result) if err != nil { return nil, fmt.Errorf("unable to transform PII text: %w", err) From fb6dfdce4d561ab2333bc65a20f22ec3adccd375 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:32:17 -0700 Subject: [PATCH 11/20] fremove print --- worker/pkg/benthos/transformers/transform_pii_text.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index 834ff6c790..4ed6f12859 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -321,9 +321,6 @@ func transformPiiText(api TransformPiiTextApi, config *mgmtv1alpha1.TransformPii return &result, nil } - bits, _ := json.Marshal(config) - fmt.Println("result!!!", string(bits)) - transformedResult, err := api.Transform(context.Background(), config, result) if err != nil { return nil, fmt.Errorf("unable to transform PII text: %w", err) From 2a8825b7bb77954010dec171d5d3337dc6444de2 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:39:16 -0700 Subject: [PATCH 12/20] updates logic for handling if presidio is enabled --- backend/internal/cmds/mgmt/serve/connect/cmd.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/internal/cmds/mgmt/serve/connect/cmd.go b/backend/internal/cmds/mgmt/serve/connect/cmd.go index bc6dab8d69..20a10ee769 100644 --- a/backend/internal/cmds/mgmt/serve/connect/cmd.go +++ b/backend/internal/cmds/mgmt/serve/connect/cmd.go @@ -719,8 +719,10 @@ func serve(ctx context.Context) error { } } + isPresidioEnabled := cascadelicense.IsValid() && presAnalyzeClient != nil && presAnonClient != nil + transformerService := v1alpha1_transformerservice.New(&v1alpha1_transformerservice.Config{ - IsPresidioEnabled: cascadelicense.IsValid(), + IsPresidioEnabled: isPresidioEnabled, }, db, presEntityClient, userdataclient, cascadelicense) api.Handle( mgmtv1alpha1connect.NewTransformersServiceHandler( @@ -733,7 +735,7 @@ func serve(ctx context.Context) error { ) anonymizationService := v1alpha1_anonymizationservice.New(&v1alpha1_anonymizationservice.Config{ - IsPresidioEnabled: cascadelicense.IsValid(), + IsPresidioEnabled: isPresidioEnabled, PresidioDefaultLanguage: getPresidioDefaultLanguage(), IsAuthEnabled: isAuthEnabled, IsNeosyncCloud: ncloudlicense.IsValid(), From 978827ef5a6dab15137b51e01594980a4826f8ec Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:40:53 -0700 Subject: [PATCH 13/20] consolidate --- backend/services/mgmt/v1alpha1/anonymization-service/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/services/mgmt/v1alpha1/anonymization-service/service.go b/backend/services/mgmt/v1alpha1/anonymization-service/service.go index 38262109c9..83d8f7c197 100644 --- a/backend/services/mgmt/v1alpha1/anonymization-service/service.go +++ b/backend/services/mgmt/v1alpha1/anonymization-service/service.go @@ -19,7 +19,6 @@ type Service struct { anonymize presidioapi.AnonymizeInterface db *neosyncdb.NeosyncDb license license.EEInterface - isNeosyncCloud bool } type Config struct { From e5fc94948fdf18dd797f6f74709a2733fb23c7c1 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:44:58 -0700 Subject: [PATCH 14/20] stringifies transformed data so it is readable --- backend/sql/postgresql/models/transformers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/sql/postgresql/models/transformers.go b/backend/sql/postgresql/models/transformers.go index f66237e394..303521b3fb 100644 --- a/backend/sql/postgresql/models/transformers.go +++ b/backend/sql/postgresql/models/transformers.go @@ -59,7 +59,7 @@ type TransformerConfig struct { GenerateIpAddress *GenerateIpAddressConfig `json:"generateIpAddressConfig,omitempty"` TransformUuid *TransformUuidConfig `json:"transformUuid,omitempty"` TransformScrambleIdentity *TransformScrambleIdentityConfig `json:"transformScrambleIdentity,omitempty"` - TransformPiiText []byte `json:"transformPiiText,omitempty"` + TransformPiiText string `json:"transformPiiText,omitempty"` } type TransformScrambleIdentityConfig struct{} @@ -409,7 +409,7 @@ func (t *TransformerConfig) FromTransformerConfigDto(tr *mgmtv1alpha1.Transforme if err != nil { return fmt.Errorf("unable to marshal transform pii text config: %w", err) } - t.TransformPiiText = bits + t.TransformPiiText = string(bits) default: t = &TransformerConfig{} } @@ -784,9 +784,9 @@ func (t *TransformerConfig) ToTransformerConfigDto() (*mgmtv1alpha1.TransformerC TransformScrambleIdentityConfig: &mgmtv1alpha1.TransformScrambleIdentity{}, }, }, nil - case t.TransformPiiText != nil: + case t.TransformPiiText != "": var v *mgmtv1alpha1.TransformPiiText - err := json.Unmarshal(t.TransformPiiText, &v) + err := json.Unmarshal([]byte(t.TransformPiiText), &v) if err != nil { return nil, fmt.Errorf("unable to unmarshal transform pii text config: %w", err) } From c8ea4304bb3bbc98e06dfb797516a1955fbb3bd2 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:47:08 -0700 Subject: [PATCH 15/20] updates go tools --- tools/go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/go.mod b/tools/go.mod index 8f6c37ad85..cb4edab881 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -13,6 +13,7 @@ require ( require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.5-20250219170025-d39267d9df8f.1 // indirect + connectrpc.com/connect v1.18.1 // indirect github.com/Jeffail/gabs/v2 v2.7.0 // indirect github.com/OneOfOne/xxhash v1.2.8 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect From 06b48f5a50be32d49dc47154473eebeeb5f594d5 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:51:48 -0700 Subject: [PATCH 16/20] Adds worker tests --- .../builders/benthos-builder_test.go | 15 +++++++++++++++ .../benthos/transformers/transform_pii_text.go | 6 +++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/internal/benthos/benthos-builder/builders/benthos-builder_test.go b/internal/benthos/benthos-builder/builders/benthos-builder_test.go index 3d1b032540..548d6e4f04 100644 --- a/internal/benthos/benthos-builder/builders/benthos-builder_test.go +++ b/internal/benthos/benthos-builder/builders/benthos-builder_test.go @@ -948,6 +948,16 @@ func Test_computeMutationFunction_Validate_Bloblang_Output(t *testing.T) { }, }, }, + { + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_PII_TEXT, + Config: &mgmtv1alpha1.TransformerConfig{ + Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{ + TransformPiiTextConfig: &mgmtv1alpha1.TransformPiiText{ + Language: gotypeutil.ToPtr("en"), + }, + }, + }, + }, } emailColInfo := &sqlmanager_shared.DatabaseSchemaRow{ @@ -962,6 +972,7 @@ func Test_computeMutationFunction_Validate_Bloblang_Output(t *testing.T) { blobenv := bloblang.NewEnvironment() neosync_benthos_transformers.RegisterTransformIdentityScramble(blobenv, nil) + neosync_benthos_transformers.RegisterTransformPiiText(blobenv, nil) for _, transformer := range transformers { t.Run(fmt.Sprintf("%s_%T_lint", t.Name(), transformer.Config.Config), func(t *testing.T) { @@ -1103,6 +1114,9 @@ func Test_computeMutationFunction_Validate_Bloblang_Output_EmptyConfigs(t *testi { Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_TransformScrambleIdentityConfig{}}, }, + { + Config: &mgmtv1alpha1.TransformerConfig{Config: &mgmtv1alpha1.TransformerConfig_TransformPiiTextConfig{}}, + }, } emailColInfo := &sqlmanager_shared.DatabaseSchemaRow{ @@ -1117,6 +1131,7 @@ func Test_computeMutationFunction_Validate_Bloblang_Output_EmptyConfigs(t *testi blobenv := bloblang.NewEnvironment() neosync_benthos_transformers.RegisterTransformIdentityScramble(blobenv, nil) + neosync_benthos_transformers.RegisterTransformPiiText(blobenv, nil) for _, transformer := range transformers { t.Run(fmt.Sprintf("%s_%T_lint", t.Name(), transformer.Config.Config), func(t *testing.T) { diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index 4ed6f12859..5a84bb20d6 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -235,7 +235,11 @@ func NewTransformPiiTextOptsFromConfig( config *mgmtv1alpha1.TransformPiiText, ) (*TransformPiiTextOpts, error) { if config == nil { - return nil, fmt.Errorf("config is nil") + defaultLanguage := "en" + config = &mgmtv1alpha1.TransformPiiText{ + Language: &defaultLanguage, + ScoreThreshold: 0.5, + } } scoreThreshold := float64(config.ScoreThreshold) return NewTransformPiiTextOpts( From a500e82093aa289906318c786c84880f95f49434 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 15:53:28 -0700 Subject: [PATCH 17/20] fixes int test --- .../api/anonymization-service_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/integration-tests/api/anonymization-service_integration_test.go b/internal/integration-tests/api/anonymization-service_integration_test.go index a599a46bc0..cd46cab11d 100644 --- a/internal/integration-tests/api/anonymization-service_integration_test.go +++ b/internal/integration-tests/api/anonymization-service_integration_test.go @@ -433,7 +433,7 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeSingle_ForbiddenTr t := s.T() t.Run("OSS", func(t *testing.T) { - accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedLicensedClients.Users()) + accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedUnlicensedClients.Users()) t.Run("transformpiitext", func(t *testing.T) { t.Run("mappings", func(t *testing.T) { From 598f107f5e2a068fe84ffa554bb91752fc7c0e7a Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 16:06:29 -0700 Subject: [PATCH 18/20] fixes lints --- .../pkg/benthos/transformer_executor/executor.go | 16 ++++------------ .../transform_pii_text_api.go | 4 ++-- .../transformers/gen_transform_pii_text.go | 10 +++++----- .../benthos/transformers/transform_pii_text.go | 2 +- 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/worker/pkg/benthos/transformer_executor/executor.go b/worker/pkg/benthos/transformer_executor/executor.go index d0eca49460..8050eb6b24 100644 --- a/worker/pkg/benthos/transformer_executor/executor.go +++ b/worker/pkg/benthos/transformer_executor/executor.go @@ -112,14 +112,12 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - transformPiiTextApi, err := newFromExecConfig( + transformPiiTextApi := newFromExecConfig( execCfg.transformPiiText, execCfg.transformPiiText.neosyncOperatorApi, execCfg.logger, ) - if err != nil { - return nil, err - } + runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err @@ -156,14 +154,11 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - transformPiiTextApi, err := newFromExecConfig( + transformPiiTextApi := newFromExecConfig( execCfg.transformPiiText, execCfg.transformPiiText.neosyncOperatorApi, execCfg.logger, ) - if err != nil { - return nil, err - } runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err @@ -747,14 +742,11 @@ func InitializeTransformerByConfigType( config.Language = execCfg.transformPiiText.defaultLanguage } - transformPiiTextApi, err := newFromExecConfig( + transformPiiTextApi := newFromExecConfig( execCfg.transformPiiText, execCfg.transformPiiText.neosyncOperatorApi, execCfg.logger, ) - if err != nil { - return nil, err - } return &TransformerExecutor{ Opts: nil, diff --git a/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go b/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go index c50b56af9a..695d2c991e 100644 --- a/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go +++ b/worker/pkg/benthos/transformer_executor/transform_pii_text_api.go @@ -19,12 +19,12 @@ func newFromExecConfig( execConfig *transformPiiTextConfig, neosyncOperatorApi ee_transformer_fns.NeosyncOperatorApi, logger *slog.Logger, -) (transformers.TransformPiiTextApi, error) { +) transformers.TransformPiiTextApi { return &piiTextApi{ execConfig: execConfig, neosyncOperatorApi: neosyncOperatorApi, logger: logger, - }, nil + } } func (p *piiTextApi) Transform(ctx context.Context, config *mgmtv1alpha1.TransformPiiText, value string) (string, error) { diff --git a/worker/pkg/benthos/transformers/gen_transform_pii_text.go b/worker/pkg/benthos/transformers/gen_transform_pii_text.go index 61e0bc59a8..164b1e8d07 100644 --- a/worker/pkg/benthos/transformers/gen_transform_pii_text.go +++ b/worker/pkg/benthos/transformers/gen_transform_pii_text.go @@ -134,31 +134,31 @@ func (t *TransformPiiText) ParseOptions(opts map[string]any) (any, error) { } transformerOpts.language = language - allowedPhrases, ok := opts["allowedPhrases"].(any) + allowedPhrases, ok := opts["allowedPhrases"] if !ok { allowedPhrases = []any{} } transformerOpts.allowedPhrases = allowedPhrases - allowedEntities, ok := opts["allowedEntities"].(any) + allowedEntities, ok := opts["allowedEntities"] if !ok { allowedEntities = []any{} } transformerOpts.allowedEntities = allowedEntities var defaultAnonymizer any - if arg, ok := opts["defaultAnonymizer"].(any); ok { + if arg, ok := opts["defaultAnonymizer"]; ok { defaultAnonymizer = arg } transformerOpts.defaultAnonymizer = defaultAnonymizer - denyRecognizers, ok := opts["denyRecognizers"].(any) + denyRecognizers, ok := opts["denyRecognizers"] if !ok { denyRecognizers = []any{} } transformerOpts.denyRecognizers = denyRecognizers - entityAnonymizers, ok := opts["entityAnonymizers"].(any) + entityAnonymizers, ok := opts["entityAnonymizers"] if !ok { entityAnonymizers = map[string]any{} } diff --git a/worker/pkg/benthos/transformers/transform_pii_text.go b/worker/pkg/benthos/transformers/transform_pii_text.go index 5a84bb20d6..8ab0075455 100644 --- a/worker/pkg/benthos/transformers/transform_pii_text.go +++ b/worker/pkg/benthos/transformers/transform_pii_text.go @@ -424,7 +424,7 @@ func convertToPiiAnonymizer(raw any) (*mgmtv1alpha1.PiiAnonymizer, error) { } } else if hash, ok := configMap["hash"].(map[string]any); ok { if algo, ok := hash["algo"].(int64); ok { - convertedAlgo := mgmtv1alpha1.PiiAnonymizer_Hash_HashType(algo) + convertedAlgo := mgmtv1alpha1.PiiAnonymizer_Hash_HashType(algo) //nolint:gosec if _, ok := mgmtv1alpha1.PiiAnonymizer_Hash_HashType_name[int32(convertedAlgo)]; !ok { return nil, fmt.Errorf("invalid hash algorithm: %d", convertedAlgo) } From 7c8b3c04af3ce55c06b36b4c69ab7116f6544f52 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 16:11:05 -0700 Subject: [PATCH 19/20] conditionally enables pii transform in js executor --- .../benthos/transformer_executor/executor.go | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/worker/pkg/benthos/transformer_executor/executor.go b/worker/pkg/benthos/transformer_executor/executor.go index 8050eb6b24..45bbb0f887 100644 --- a/worker/pkg/benthos/transformer_executor/executor.go +++ b/worker/pkg/benthos/transformer_executor/executor.go @@ -112,11 +112,15 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - transformPiiTextApi := newFromExecConfig( - execCfg.transformPiiText, - execCfg.transformPiiText.neosyncOperatorApi, - execCfg.logger, - ) + var transformPiiTextApi transformers.TransformPiiTextApi + if execCfg.transformPiiText != nil { + execCfg.logger.Debug("configuring using transform pii text api in generate javascript") + transformPiiTextApi = newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + } runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { @@ -154,11 +158,15 @@ func InitializeTransformerByConfigType( } valueApi := newAnonValueApi() - transformPiiTextApi := newFromExecConfig( - execCfg.transformPiiText, - execCfg.transformPiiText.neosyncOperatorApi, - execCfg.logger, - ) + var transformPiiTextApi transformers.TransformPiiTextApi + if execCfg.transformPiiText != nil { + execCfg.logger.Debug("configuring using transform pii text api in transform javascript") + transformPiiTextApi = newFromExecConfig( + execCfg.transformPiiText, + execCfg.transformPiiText.neosyncOperatorApi, + execCfg.logger, + ) + } runner, err := javascript.NewDefaultValueRunner(valueApi, transformPiiTextApi, execCfg.logger) if err != nil { return nil, err From e32d48e9bfde03d1a69038d7fe55ffae86b3b24a Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Mon, 31 Mar 2025 16:50:15 -0700 Subject: [PATCH 20/20] Fixes test --- .../api/anonymization-service_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/integration-tests/api/anonymization-service_integration_test.go b/internal/integration-tests/api/anonymization-service_integration_test.go index cd46cab11d..97d0a2cefb 100644 --- a/internal/integration-tests/api/anonymization-service_integration_test.go +++ b/internal/integration-tests/api/anonymization-service_integration_test.go @@ -18,10 +18,10 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeMany() { t := s.T() t.Run("OSS-fail", func(t *testing.T) { - userclient := s.OSSUnauthenticatedLicensedClients.Users() + userclient := s.OSSUnauthenticatedUnlicensedClients.Users() s.setUser(s.ctx, userclient) accountId := s.createPersonalAccount(s.ctx, userclient) - resp, err := s.OSSUnauthenticatedLicensedClients.Anonymize().AnonymizeMany( + resp, err := s.OSSUnauthenticatedUnlicensedClients.Anonymize().AnonymizeMany( s.ctx, connect.NewRequest(&mgmtv1alpha1.AnonymizeManyRequest{ AccountId: accountId,