Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions plugin/external/remote_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *RemoteStep) executeRequest(pc *module.PipelineContext, resolvedConfig m
if err != nil {
return nil, fmt.Errorf("remote step %q (handle %s) encode trigger_data as Struct: %w", s.name, s.handleID, err)
}
metadata, err := mapToStruct(pc.Metadata)
metadata, err := mapToStruct(remotePluginMetadata(pc.Metadata))
if err != nil {
return nil, fmt.Errorf("remote step %q (handle %s) encode metadata as Struct: %w", s.name, s.handleID, err)
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *RemoteStep) executeRequest(pc *module.PipelineContext, resolvedConfig m
if s.contract.Mode == pb.ContractMode_CONTRACT_MODE_LEGACY_STRUCT {
return req, nil
}
typedConfig, err := mapToTypedAny(s.contract.ConfigMessage, resolvedConfig, s.types)
typedConfig, err := mapToTypedAny(s.contract.ConfigMessage, stripInternalKeys(resolvedConfig), s.types)
if err != nil {
if s.contract.Mode == pb.ContractMode_CONTRACT_MODE_STRICT_PROTO {
return nil, fmt.Errorf("remote step %q STRICT_PROTO config message %q cannot use legacy Struct fallback: %w", s.name, s.contract.ConfigMessage, err)
Expand Down Expand Up @@ -177,6 +177,20 @@ func (s *RemoteStep) executeRequest(pc *module.PipelineContext, resolvedConfig m
return req, nil
}

func remotePluginMetadata(metadata map[string]any) map[string]any {
if metadata == nil {
return nil
}
filtered := make(map[string]any, len(metadata))
for key, value := range metadata {
if _, err := structpb.NewValue(value); err != nil {
continue
}
filtered[key] = value
}
return filtered
}

// Destroy releases the remote step resources.
func (s *RemoteStep) Destroy() error {
resp, err := s.client.DestroyStep(context.Background(), &pb.HandleRequest{
Expand Down
68 changes: 68 additions & 0 deletions plugin/external/remote_step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,74 @@ func TestRemoteStep_Execute_StrictContractSkipsLegacyStructEncodeForCurrent(t *t
}
}

func TestRemoteStep_Execute_FiltersUnrepresentableMetadata(t *testing.T) {
stub := &stubPluginServiceClient{}
step := NewRemoteStep("test-step", "handle-metadata", stub, nil)
pc := module.NewPipelineContext(map[string]any{"name": "typed-input"}, map[string]any{
"pipeline": "http-flow",
"_http_response_writer": make(chan int),
"_http_request": map[int]string{1: "request"},
"explicit_trace": true,
})

if _, err := step.Execute(context.Background(), pc); err != nil {
t.Fatalf("Execute returned error: %v", err)
}
if stub.lastRequest == nil {
t.Fatal("expected ExecuteStep to be called")
}
got := stub.lastRequest.Metadata.AsMap()
if got["pipeline"] != "http-flow" {
t.Fatalf("expected serializable metadata to be preserved, got %#v", got)
}
if got["explicit_trace"] != true {
t.Fatalf("expected boolean metadata to be preserved, got %#v", got)
}
if _, ok := got["_http_response_writer"]; ok {
t.Fatalf("expected response writer metadata to be filtered, got %#v", got)
}
if _, ok := got["_http_request"]; ok {
t.Fatalf("expected request metadata to be filtered, got %#v", got)
}
}

func TestRemoteStep_Execute_StrictContractStripsInternalConfigKeys(t *testing.T) {
stub := &stubPluginServiceClient{
response: &pb.ExecuteStepResponse{TypedOutput: mustAnyFromMapForTest(t, "workflow.plugin.v1.Manifest", map[string]any{
"name": "typed-output",
"version": "v1",
})},
}
contract := &pb.ContractDescriptor{
Kind: pb.ContractKind_CONTRACT_KIND_STEP,
StepType: "test.strict",
ConfigMessage: "workflow.plugin.v1.Manifest",
InputMessage: "workflow.plugin.v1.Manifest",
OutputMessage: "workflow.plugin.v1.Manifest",
Mode: pb.ContractMode_CONTRACT_MODE_STRICT_PROTO,
}
step := NewRemoteStep("test-step", "handle-strict", stub, map[string]any{
"_config_dir": "/config",
"name": "typed-config",
"version": "v1",
}, contract)
pc := module.NewPipelineContext(map[string]any{
"name": "typed-input",
"version": "v1",
}, nil)

if _, err := step.Execute(context.Background(), pc); err != nil {
t.Fatalf("STRICT_PROTO execute should strip internal config keys before typed encode; got %v", err)
}
if stub.lastRequest == nil {
t.Fatal("expected ExecuteStep to be called")
}
if stub.lastRequest.Config != nil {
t.Fatalf("expected strict step to omit legacy Config, got %v", stub.lastRequest.Config)
}
assertAnyTypeForTest(t, stub.lastRequest.TypedConfig, "workflow.plugin.v1.Manifest")
}

func TestRemoteStep_Execute_StrictContractFiltersUnknownCurrentFields(t *testing.T) {
stub := &stubPluginServiceClient{
response: &pb.ExecuteStepResponse{TypedOutput: mustAnyFromMapForTest(t, "workflow.plugin.v1.Manifest", map[string]any{
Expand Down
Loading