diff --git a/internal/temporalcli/commands.activity.go b/internal/temporalcli/commands.activity.go index 582f5d622..8d105a412 100644 --- a/internal/temporalcli/commands.activity.go +++ b/internal/temporalcli/commands.activity.go @@ -568,6 +568,39 @@ func (c *TemporalActivityTerminateCommand) run(cctx *CommandContext, args []stri return nil } +// buildInputPayloadOptions builds and returns InputPayloadOptions using at most one +// supplied values for input, inputFile, etc. It is used when input like semantics are +// implemented at other places such as for result or detail in activity complete or +// fail command handler, respectively. If both of input are inputFile are supplied, +// otherwise it return nil with error. If none of these are supplied, it returns nil without error. +func buildInputPayloadOptions( + input string, + inputFile string, + inputMeta string, + inputBase64 bool, +) (*PayloadInputOptions, error) { + if input == "" && inputFile == "" { + // no error if none are supplied. + return nil, nil + } + if input != "" && inputFile != "" { + return nil, fmt.Errorf("provide exactly one of input or inputFile") + } + + resultPayloadOpts := PayloadInputOptions{ + InputBase64: inputBase64, + } + if inputMeta != "" { + resultPayloadOpts.InputMeta = []string{inputMeta} + } + if input != "" && inputFile == "" { + resultPayloadOpts.Input = []string{input} + } else if input == "" && inputFile != "" { + resultPayloadOpts.InputFile = []string{inputFile} + } + return &resultPayloadOpts, nil +} + func (c *TemporalActivityCompleteCommand) run(cctx *CommandContext, args []string) error { cl, err := dialClient(cctx, &c.Parent.ClientOptions) if err != nil { @@ -575,8 +608,13 @@ func (c *TemporalActivityCompleteCommand) run(cctx *CommandContext, args []strin } defer cl.Close() - metadata := map[string][][]byte{"encoding": {[]byte("json/plain")}} - resultPayloads, err := CreatePayloads([][]byte{[]byte(c.Result)}, metadata, false) + resultPayloadOpts, err := buildInputPayloadOptions(c.Result, c.ResultFile, c.ResultMeta, c.ResultBase64) + if resultPayloadOpts == nil || err != nil { + // TODO: where do we check that one of result or result-file is used. also for details in activity fail command. + return fmt.Errorf("provide exactly one of result or result-file") + } + + resultPayloads, err := resultPayloadOpts.buildRawInputPayloads() if err != nil { return err } @@ -602,10 +640,15 @@ func (c *TemporalActivityFailCommand) run(cctx *CommandContext, args []string) e } defer cl.Close() + detailPayloadOpts, err := buildInputPayloadOptions(c.Detail, c.DetailFile, c.DetailMeta, c.DetailBase64) + if err != nil { + // TODO: if both detail and detail-file were used, then return error. + return fmt.Errorf("provide one of detail or detail-file, but not both") + } + var detailPayloads *common.Payloads - if len(c.Detail) > 0 { - metadata := map[string][][]byte{"encoding": {[]byte("json/plain")}} - detailPayloads, err = CreatePayloads([][]byte{[]byte(c.Detail)}, metadata, false) + if detailPayloadOpts != nil { + detailPayloads, err = detailPayloadOpts.buildRawInputPayloads() if err != nil { return err } diff --git a/internal/temporalcli/commands.activity_test.go b/internal/temporalcli/commands.activity_test.go index ec6b50a4f..267ab2682 100644 --- a/internal/temporalcli/commands.activity_test.go +++ b/internal/temporalcli/commands.activity_test.go @@ -2,14 +2,19 @@ package temporalcli_test import ( "context" + "encoding/base64" "encoding/json" "fmt" + "os" + "path/filepath" "strings" "sync" "sync/atomic" "time" + "github.com/gogo/protobuf/proto" "github.com/google/uuid" + "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" @@ -117,6 +122,264 @@ func (s *SharedServerSuite) TestActivity_Fail_InvalidDetail() { s.Nil(failed) } +// Tests activity complete --result-meta ... --result-file ... --result-base64 +func (s *SharedServerSuite) TestActivity_Complete_ResultMeta() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + activityId := "sa-complete-test-result-meta" + started := s.startActivity(activityId) + runID := started["runId"].(string) + <-activityStarted + + // although including nul (\x00) will work here, it won't in actual command line. + result := "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a" + + res := s.Execute( + "activity", "complete", + "--activity-id", activityId, + "--run-id", runID, + "--result", result, + "--result-meta", "encoding=binary/plain", + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: activityId, + RunID: runID, + }) + var actual []byte + s.NoError(handle.Get(s.Context, &actual)) + s.Equal(result, string(actual)) + + res = s.Execute( + "activity", "result", + "--activity-id", activityId, + "--run-id", runID, + "--address", s.Address(), + ) + s.NoError(res.Err) + + output := res.Stdout.String() + s.Contains(output, base64.StdEncoding.EncodeToString([]byte(result))) +} + +func (s *SharedServerSuite) TestActivity_Complete_ResultMeta_Protobuf() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + activityId := "sa-complete-test-result-meta-protobuf" + started := s.startActivity(activityId) + runID := started["runId"].(string) + <-activityStarted + + // similar to workflow input-meta test + result := &workflowservice.StartWorkflowExecutionRequest{ + WorkflowId: "result-meta-test", + Input: &common.Payloads{ + Payloads: []*common.Payload{ + {Data: []byte("xyz")}, + }, + }, + } + serialized, _ := proto.Marshal(result) + + res := s.Execute( + "activity", "complete", + "--activity-id", activityId, + "--run-id", runID, + "--result", string(serialized), + "--result-meta", "encoding=binary/protobuf", + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: activityId, + RunID: runID, + }) + var actual workflowservice.StartWorkflowExecutionRequest + s.NoError(handle.Get(s.Context, &actual)) + actualSerialized, _ := proto.Marshal(&actual) + s.Equal(serialized, actualSerialized) + s.Equal(result.WorkflowId, actual.WorkflowId) + s.Equal(result.Input.Payloads[0].Data, actual.Input.Payloads[0].Data) + +} + +func (s *SharedServerSuite) TestActivity_Complete_ResultFile() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + activityId := "sa-complete-test-result-file" + started := s.startActivity(activityId) + runID := started["runId"].(string) + <-activityStarted + + // this can include nul, as it gets stored in a file. + result := "\x48\x65\x6c\x6c\x6f\x00\x57\x6f\x72\x6c\x64" + resultFile := filepath.Join(s.T().TempDir(), "input.bin") + os.WriteFile(resultFile, []byte(result), 0644) + defer os.Remove(resultFile) + + res := s.Execute( + "activity", "complete", + "--activity-id", activityId, + "--run-id", runID, + "--result", result, + "--result-file", resultFile, + "--address", s.Address(), + ) + s.ErrorContains(res.Err, "one of result or result-file") + + res = s.Execute( + "activity", "complete", + "--activity-id", activityId, + "--run-id", runID, + "--result-file", resultFile, + "--result-meta", "encoding=binary/plain", + "--identity", identity, + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: activityId, + RunID: runID, + }) + var actual []byte + s.NoError(handle.Get(s.Context, &actual)) + s.Equal(result, string(actual)) +} + +func (s *SharedServerSuite) TestActivity_Complete_ResultBase64() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + activityId := "sa-complete-test-result-base64" + started := s.startActivity(activityId) + runID := started["runId"].(string) + <-activityStarted + + // base64 encoding works with nul character. + result := "\x48\x65\x6c\x6c\x6f\x00\x57\x6f\x72\x6c\x64" + encoded := base64.StdEncoding.EncodeToString([]byte(result)) + + res := s.Execute( + "activity", "complete", + "--activity-id", activityId, + "--run-id", runID, + "--result", encoded, + "--result-meta", "encoding=binary/plain", + "--result-base64", + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: activityId, + RunID: runID, + }) + var actual []byte + s.NoError(handle.Get(s.Context, &actual)) + s.Equal(result, string(actual)) +} + +// Tests activity fail --detail-file ... --detail-meta ... +func (s *SharedServerSuite) TestActivity_Fail_DetailMeta() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + activityId := "sa-fail-test-detail-meta" + started := s.startActivity(activityId) + runID := started["runId"].(string) + <-activityStarted + + detail := "\x48\x65\x6c\x6c\x6f\x00\x57\x6f\x72\x6c\x64" + detailFile := filepath.Join(s.T().TempDir(), "input.bin") + os.WriteFile(detailFile, []byte(detail), 0644) + defer os.Remove(detailFile) + + res := s.Execute( + "activity", "fail", + "--activity-id", activityId, + "--run-id", runID, + "--detail", detail, + "--detail-file", detailFile, + "--address", s.Address(), + ) + s.ErrorContains(res.Err, "one of detail or detail-file") + + res = s.Execute( + "activity", "fail", + "--activity-id", activityId, + "--run-id", runID, + "--reason", "testing for fail", + "--detail-file", detailFile, + "--detail-meta", "encoding=binary/plain", + "--identity", identity, + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: activityId, + RunID: runID, + }) + err := handle.Get(s.Context, nil) + s.Error(err) + s.Equal(err.Error(), "testing for fail") + + res = s.Execute( + "activity", "result", + "-o", "json", + "--activity-id", activityId, + "--run-id", runID, + "--address", s.Address(), + ) + s.Error(res.Err) + + output := res.Stdout.String() + s.Contains(output, "testing for fail") + s.Contains(output, base64.StdEncoding.EncodeToString([]byte(detail))) + + // //// TODO: is there a simpler way to check this? + // var jsonOut map[string]any + // s.NoError(json.Unmarshal(output, &jsonOut)) + // s.NotEmpty(jsonOut["failure"]) + // jsonOut = jsonOut["failure"].(map[string]any) + // s.NotEmpty(jsonOut["applicationFailureInfo"]) + // jsonOut = jsonOut["details"].(map[string]any) + // s.NotEmpty(jsonOut["payloads"]) + // jsonArr := jsonOut["payloads"].([]any) + // s.NotEmpty(jsonArr) + // jsonOut = jsonArr[0].(map[string]any) + // s.NotEmpty(jsonOut["data"]) + // s.Equal(detail, jsonOut["data"]) + +} + func (s *SharedServerSuite) TestActivityOptionsUpdate_Accept() { run := s.waitActivityStarted() wid := run.GetID() diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index 03424647a..d6141185d 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -533,12 +533,15 @@ func NewTemporalActivityCancelCommand(cctx *CommandContext, parent *TemporalActi } type TemporalActivityCompleteCommand struct { - Parent *TemporalActivityCommand - Command cobra.Command - ActivityId string - WorkflowId string - RunId string - Result string + Parent *TemporalActivityCommand + Command cobra.Command + ActivityId string + WorkflowId string + RunId string + Result string + ResultFile string + ResultMeta string + ResultBase64 bool } func NewTemporalActivityCompleteCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityCompleteCommand { @@ -557,8 +560,10 @@ func NewTemporalActivityCompleteCommand(cctx *CommandContext, parent *TemporalAc _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for workflow Activities. Omit for Standalone Activities.") s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. For workflow Activities (when --workflow-id is provided), this is the Workflow Run ID. For Standalone Activities, this is the Activity Run ID.") - s.Command.Flags().StringVar(&s.Result, "result", "", "Result `JSON` to return. Required.") - _ = cobra.MarkFlagRequired(s.Command.Flags(), "result") + s.Command.Flags().StringVar(&s.Result, "result", "", "Result to return. Use JSON content or set --result-meta to override. Can't be combined with --result-file.") + s.Command.Flags().StringVar(&s.ResultFile, "result-file", "", "A path for result file. Use JSON content or set --result-file to override. Can't be combined with --result.") + s.Command.Flags().StringVar(&s.ResultMeta, "result-meta", "", "Result payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\").") + s.Command.Flags().BoolVar(&s.ResultBase64, "result-base64", false, "Assume result is base64-encoded and attempt to decode them.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) @@ -653,13 +658,16 @@ func NewTemporalActivityExecuteCommand(cctx *CommandContext, parent *TemporalAct } type TemporalActivityFailCommand struct { - Parent *TemporalActivityCommand - Command cobra.Command - ActivityId string - WorkflowId string - RunId string - Detail string - Reason string + Parent *TemporalActivityCommand + Command cobra.Command + ActivityId string + WorkflowId string + RunId string + Detail string + DetailFile string + DetailMeta string + DetailBase64 bool + Reason string } func NewTemporalActivityFailCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityFailCommand { @@ -678,7 +686,10 @@ func NewTemporalActivityFailCommand(cctx *CommandContext, parent *TemporalActivi _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for workflow Activities. Omit for Standalone Activities.") s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. For workflow Activities (when --workflow-id is provided), this is the Workflow Run ID. For Standalone Activities, this is the Activity Run ID.") - s.Command.Flags().StringVar(&s.Detail, "detail", "", "Failure detail (JSON). Attached as the failure details payload.") + s.Command.Flags().StringVar(&s.Detail, "detail", "", "Failure detail. Attached as the failure details payload. Use JSON content or set --detail-meta to override. Can't be combined with --detail-file.") + s.Command.Flags().StringVar(&s.DetailFile, "detail-file", "", "A path for result file. Use JSON content or set --detail-file to override. Can't be combined with --detail.") + s.Command.Flags().StringVar(&s.DetailMeta, "detail-meta", "", "Result payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\").") + s.Command.Flags().BoolVar(&s.DetailBase64, "detail-base64", false, "Assume detail is base64-encoded and attempt to decode them.") s.Command.Flags().StringVar(&s.Reason, "reason", "", "Failure reason. Attached as the failure message.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 78bdf3513..dd9a07de9 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -235,8 +235,25 @@ commands: Activities, this is the Activity Run ID. - name: result type: string - description: Result `JSON` to return. - required: true + description: | + Result to return. + Use JSON content or set --result-meta to override. + Can't be combined with --result-file. + - name: result-file + type: string + description: | + A path for result file. + Use JSON content or set --result-file to override. + Can't be combined with --result. + - name: result-meta + type: string + description: | + Result payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + - name: result-base64 + type: bool + description: | + Assume result is base64-encoded and attempt to decode them. - name: temporal activity count summary: Count Standalone Activities matching a query (Experimental) @@ -326,8 +343,24 @@ commands: - name: detail type: string description: | - Failure detail (JSON). Attached as the failure details - payload. + Failure detail. Attached as the failure details payload. + Use JSON content or set --detail-meta to override. + Can't be combined with --detail-file. + - name: detail-file + type: string + description: | + A path for result file. + Use JSON content or set --detail-file to override. + Can't be combined with --detail. + - name: detail-meta + type: string + description: | + Result payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + - name: detail-base64 + type: bool + description: | + Assume detail is base64-encoded and attempt to decode them. - name: reason type: string description: |