Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions internal/temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,15 +568,53 @@ func (c *TemporalActivityTerminateCommand) run(cctx *CommandContext, args []stri
return nil
}

// buildInputPayloadOptions builds and returns InputPayloadOptions using at most one
// supplied values for input, inputFile, etc. It is used when input like semantics are
// implemented at other places such as for result or detail in activity complete or
// fail command handler, respectively. If both of input are inputFile are supplied,
// otherwise it return nil with error. If none of these are supplied, it returns nil without error.
func buildInputPayloadOptions(
input string,
inputFile string,
inputMeta string,
inputBase64 bool,
) (*PayloadInputOptions, error) {
if input == "" && inputFile == "" {
// no error if none are supplied.
return nil, nil
}
if input != "" && inputFile != "" {
return nil, fmt.Errorf("provide exactly one of input or inputFile")
}

resultPayloadOpts := PayloadInputOptions{
InputBase64: inputBase64,
}
if inputMeta != "" {
resultPayloadOpts.InputMeta = []string{inputMeta}
}
if input != "" && inputFile == "" {
resultPayloadOpts.Input = []string{input}
} else if input == "" && inputFile != "" {
resultPayloadOpts.InputFile = []string{inputFile}
}
return &resultPayloadOpts, nil
}

func (c *TemporalActivityCompleteCommand) run(cctx *CommandContext, args []string) error {
cl, err := dialClient(cctx, &c.Parent.ClientOptions)
if err != nil {
return err
}
defer cl.Close()

metadata := map[string][][]byte{"encoding": {[]byte("json/plain")}}
resultPayloads, err := CreatePayloads([][]byte{[]byte(c.Result)}, metadata, false)
resultPayloadOpts, err := buildInputPayloadOptions(c.Result, c.ResultFile, c.ResultMeta, c.ResultBase64)
if resultPayloadOpts == nil || err != nil {
// TODO: where do we check that one of result or result-file is used. also for details in activity fail command.
return fmt.Errorf("provide exactly one of result or result-file")
}

resultPayloads, err := resultPayloadOpts.buildRawInputPayloads()
if err != nil {
return err
}
Expand All @@ -602,10 +640,15 @@ func (c *TemporalActivityFailCommand) run(cctx *CommandContext, args []string) e
}
defer cl.Close()

detailPayloadOpts, err := buildInputPayloadOptions(c.Detail, c.DetailFile, c.DetailMeta, c.DetailBase64)
if err != nil {
// TODO: if both detail and detail-file were used, then return error.
return fmt.Errorf("provide one of detail or detail-file, but not both")
}

var detailPayloads *common.Payloads
if len(c.Detail) > 0 {
metadata := map[string][][]byte{"encoding": {[]byte("json/plain")}}
detailPayloads, err = CreatePayloads([][]byte{[]byte(c.Detail)}, metadata, false)
if detailPayloadOpts != nil {
detailPayloads, err = detailPayloadOpts.buildRawInputPayloads()
if err != nil {
return err
}
Expand Down
263 changes: 263 additions & 0 deletions internal/temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package temporalcli_test

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -117,6 +122,264 @@ func (s *SharedServerSuite) TestActivity_Fail_InvalidDetail() {
s.Nil(failed)
}

// Tests activity complete --result-meta ... --result-file ... --result-base64
func (s *SharedServerSuite) TestActivity_Complete_ResultMeta() {
activityStarted := make(chan struct{})
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
close(activityStarted)
<-ctx.Done()
return nil, ctx.Err()
})

activityId := "sa-complete-test-result-meta"
started := s.startActivity(activityId)
runID := started["runId"].(string)
<-activityStarted

// although including nul (\x00) will work here, it won't in actual command line.
result := "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a"

res := s.Execute(
"activity", "complete",
"--activity-id", activityId,
"--run-id", runID,
"--result", result,
"--result-meta", "encoding=binary/plain",
"--address", s.Address(),
)
s.NoError(res.Err)

handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{
ActivityID: activityId,
RunID: runID,
})
var actual []byte
s.NoError(handle.Get(s.Context, &actual))
s.Equal(result, string(actual))

res = s.Execute(
"activity", "result",
"--activity-id", activityId,
"--run-id", runID,
"--address", s.Address(),
)
s.NoError(res.Err)

output := res.Stdout.String()
s.Contains(output, base64.StdEncoding.EncodeToString([]byte(result)))
}

func (s *SharedServerSuite) TestActivity_Complete_ResultMeta_Protobuf() {
activityStarted := make(chan struct{})
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
close(activityStarted)
<-ctx.Done()
return nil, ctx.Err()
})

activityId := "sa-complete-test-result-meta-protobuf"
started := s.startActivity(activityId)
runID := started["runId"].(string)
<-activityStarted

// similar to workflow input-meta test
result := &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: "result-meta-test",
Input: &common.Payloads{
Payloads: []*common.Payload{
{Data: []byte("xyz")},
},
},
}
serialized, _ := proto.Marshal(result)

res := s.Execute(
"activity", "complete",
"--activity-id", activityId,
"--run-id", runID,
"--result", string(serialized),
"--result-meta", "encoding=binary/protobuf",
"--address", s.Address(),
)
s.NoError(res.Err)

handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{
ActivityID: activityId,
RunID: runID,
})
var actual workflowservice.StartWorkflowExecutionRequest
s.NoError(handle.Get(s.Context, &actual))
actualSerialized, _ := proto.Marshal(&actual)
s.Equal(serialized, actualSerialized)
s.Equal(result.WorkflowId, actual.WorkflowId)
s.Equal(result.Input.Payloads[0].Data, actual.Input.Payloads[0].Data)

}

func (s *SharedServerSuite) TestActivity_Complete_ResultFile() {
activityStarted := make(chan struct{})
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
close(activityStarted)
<-ctx.Done()
return nil, ctx.Err()
})

activityId := "sa-complete-test-result-file"
started := s.startActivity(activityId)
runID := started["runId"].(string)
<-activityStarted

// this can include nul, as it gets stored in a file.
result := "\x48\x65\x6c\x6c\x6f\x00\x57\x6f\x72\x6c\x64"
resultFile := filepath.Join(s.T().TempDir(), "input.bin")
os.WriteFile(resultFile, []byte(result), 0644)
defer os.Remove(resultFile)

res := s.Execute(
"activity", "complete",
"--activity-id", activityId,
"--run-id", runID,
"--result", result,
"--result-file", resultFile,
"--address", s.Address(),
)
s.ErrorContains(res.Err, "one of result or result-file")

res = s.Execute(
"activity", "complete",
"--activity-id", activityId,
"--run-id", runID,
"--result-file", resultFile,
"--result-meta", "encoding=binary/plain",
"--identity", identity,
"--address", s.Address(),
)
s.NoError(res.Err)

handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{
ActivityID: activityId,
RunID: runID,
})
var actual []byte
s.NoError(handle.Get(s.Context, &actual))
s.Equal(result, string(actual))
}

func (s *SharedServerSuite) TestActivity_Complete_ResultBase64() {
activityStarted := make(chan struct{})
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
close(activityStarted)
<-ctx.Done()
return nil, ctx.Err()
})

activityId := "sa-complete-test-result-base64"
started := s.startActivity(activityId)
runID := started["runId"].(string)
<-activityStarted

// base64 encoding works with nul character.
result := "\x48\x65\x6c\x6c\x6f\x00\x57\x6f\x72\x6c\x64"
encoded := base64.StdEncoding.EncodeToString([]byte(result))

res := s.Execute(
"activity", "complete",
"--activity-id", activityId,
"--run-id", runID,
"--result", encoded,
"--result-meta", "encoding=binary/plain",
"--result-base64",
"--address", s.Address(),
)
s.NoError(res.Err)

handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{
ActivityID: activityId,
RunID: runID,
})
var actual []byte
s.NoError(handle.Get(s.Context, &actual))
s.Equal(result, string(actual))
}

// Tests activity fail --detail-file ... --detail-meta ...
func (s *SharedServerSuite) TestActivity_Fail_DetailMeta() {
activityStarted := make(chan struct{})
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
close(activityStarted)
<-ctx.Done()
return nil, ctx.Err()
})

activityId := "sa-fail-test-detail-meta"
started := s.startActivity(activityId)
runID := started["runId"].(string)
<-activityStarted

detail := "\x48\x65\x6c\x6c\x6f\x00\x57\x6f\x72\x6c\x64"
detailFile := filepath.Join(s.T().TempDir(), "input.bin")
os.WriteFile(detailFile, []byte(detail), 0644)
defer os.Remove(detailFile)

res := s.Execute(
"activity", "fail",
"--activity-id", activityId,
"--run-id", runID,
"--detail", detail,
"--detail-file", detailFile,
"--address", s.Address(),
)
s.ErrorContains(res.Err, "one of detail or detail-file")

res = s.Execute(
"activity", "fail",
"--activity-id", activityId,
"--run-id", runID,
"--reason", "testing for fail",
"--detail-file", detailFile,
"--detail-meta", "encoding=binary/plain",
"--identity", identity,
"--address", s.Address(),
)
s.NoError(res.Err)

handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{
ActivityID: activityId,
RunID: runID,
})
err := handle.Get(s.Context, nil)
s.Error(err)
s.Equal(err.Error(), "testing for fail")

res = s.Execute(
"activity", "result",
"-o", "json",
"--activity-id", activityId,
"--run-id", runID,
"--address", s.Address(),
)
s.Error(res.Err)

output := res.Stdout.String()
s.Contains(output, "testing for fail")
s.Contains(output, base64.StdEncoding.EncodeToString([]byte(detail)))

// //// TODO: is there a simpler way to check this?
// var jsonOut map[string]any
// s.NoError(json.Unmarshal(output, &jsonOut))
// s.NotEmpty(jsonOut["failure"])
// jsonOut = jsonOut["failure"].(map[string]any)
// s.NotEmpty(jsonOut["applicationFailureInfo"])
// jsonOut = jsonOut["details"].(map[string]any)
// s.NotEmpty(jsonOut["payloads"])
// jsonArr := jsonOut["payloads"].([]any)
// s.NotEmpty(jsonArr)
// jsonOut = jsonArr[0].(map[string]any)
// s.NotEmpty(jsonOut["data"])
// s.Equal(detail, jsonOut["data"])

}

func (s *SharedServerSuite) TestActivityOptionsUpdate_Accept() {
run := s.waitActivityStarted()
wid := run.GetID()
Expand Down
Loading
Loading