diff --git a/cmd/functions.go b/cmd/functions.go index 62968b7..5386829 100644 --- a/cmd/functions.go +++ b/cmd/functions.go @@ -48,8 +48,8 @@ func buildFmeFlowRequest(endpoint string, method string, body io.Reader) (http.R } // since the JSON for published parameters has subtypes, we need to implement this ourselves -func (f *JobRequest) UnmarshalJSON(b []byte) error { - type job JobRequest +func (f *JobRequestV3) UnmarshalJSON(b []byte) error { + type job JobRequestV3 err := json.Unmarshal(b, (*job)(f)) if err != nil { return err @@ -85,9 +85,9 @@ func (f *JobRequest) UnmarshalJSON(b []byte) error { return nil } -func (f *JobRequest) MarshalJSON() ([]byte, error) { +func (f *JobRequestV3) MarshalJSON() ([]byte, error) { - type job JobRequest + type job JobRequestV3 if f.PublishedParameters != nil { for _, v := range f.PublishedParameters { b, err := json.Marshal(v) diff --git a/cmd/jobs.go b/cmd/jobs.go index 658984b..4a2f548 100644 --- a/cmd/jobs.go +++ b/cmd/jobs.go @@ -41,28 +41,28 @@ type JobStatusV4 struct { } type JobStatusV3 struct { - Request JobRequest `json:"request"` - TimeDelivered time.Time `json:"timeDelivered"` - Workspace string `json:"workspace"` - NumErrors int `json:"numErrors"` - NumLines int `json:"numLines"` - EngineHost string `json:"engineHost"` - TimeQueued time.Time `json:"timeQueued"` - CPUPct float64 `json:"cpuPct"` - Description string `json:"description"` - TimeStarted time.Time `json:"timeStarted"` - Repository string `json:"repository"` - UserName string `json:"userName"` - Result JobResult `json:"result"` - CPUTime int `json:"cpuTime"` - ID int `json:"id"` - TimeFinished time.Time `json:"timeFinished"` - EngineName string `json:"engineName"` - NumWarnings int `json:"numWarnings"` - TimeSubmitted time.Time `json:"timeSubmitted"` - ElapsedTime int `json:"elapsedTime"` - PeakMemUsage int `json:"peakMemUsage"` - Status string `json:"status"` + Request JobRequestV3 `json:"request"` + TimeDelivered time.Time `json:"timeDelivered"` + 
Workspace string `json:"workspace"` + NumErrors int `json:"numErrors"` + NumLines int `json:"numLines"` + EngineHost string `json:"engineHost"` + TimeQueued time.Time `json:"timeQueued"` + CPUPct float64 `json:"cpuPct"` + Description string `json:"description"` + TimeStarted time.Time `json:"timeStarted"` + Repository string `json:"repository"` + UserName string `json:"userName"` + Result JobResultV3 `json:"result"` + CPUTime int `json:"cpuTime"` + ID int `json:"id"` + TimeFinished time.Time `json:"timeFinished"` + EngineName string `json:"engineName"` + NumWarnings int `json:"numWarnings"` + TimeSubmitted time.Time `json:"timeSubmitted"` + ElapsedTime int `json:"elapsedTime"` + PeakMemUsage int `json:"peakMemUsage"` + Status string `json:"status"` } type JobsV4 struct { diff --git a/cmd/run.go b/cmd/run.go index 69757c4..cd973fe 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -16,6 +16,7 @@ import ( "github.com/jedib0t/go-pretty/v6/table" "github.com/spf13/cobra" + "github.com/spf13/viper" ) type PublishedParameter struct { @@ -41,7 +42,20 @@ type JobId struct { Id int `json:"id"` } -type JobRequest struct { +type JobRequestV4 struct { + Directives map[string]string `json:"directives,omitempty"` + FailureTopics []string `json:"failureTopics,omitempty"` + SuccessTopics []string `json:"successTopics,omitempty"` + MaxJobRuntime int `json:"maxJobRuntime,omitempty"` + MaxTimeInQueue int `json:"maxTimeInQueue,omitempty"` + Queue string `json:"queue,omitempty"` + Repository string `json:"repository,omitempty"` + Workspace string `json:"workspace,omitempty"` + PublishedParameters map[string]interface{} `json:"publishedParameters,omitempty"` + MaxTotalLifeTime int `json:"maxTotalLifeTime,omitempty"` +} + +type JobRequestV3 struct { PublishedParameters []interface{} `json:"-"` RawPublishedParameters []json.RawMessage `json:"publishedParameters,omitempty"` TMDirectives struct { @@ -58,7 +72,19 @@ type JobRequest struct { } `json:"NMDirectives,omitempty"` } -type JobResult struct 
{ +type JobResultV4 struct { + ID int `json:"id"` + FeatureOutputCount int `json:"featureOutputCount"` + RequesterHost string `json:"requesterHost"` + RequesterResultPort int `json:"requesterResultPort"` + Status string `json:"status"` + StatusMessage string `json:"statusMessage"` + TimeFinished time.Time `json:"timeFinished"` + TimeQueued time.Time `json:"timeQueued"` + TimeStarted time.Time `json:"timeStarted"` +} + +type JobResultV3 struct { TimeRequested time.Time `json:"timeRequested"` RequesterResultPort int `json:"requesterResultPort"` NumFeaturesOutput int `json:"numFeaturesOutput"` @@ -76,9 +102,6 @@ type runFlags struct { repository string wait bool rtc bool - ttc int - ttl int - tag string description string sourceData string successTopics []string @@ -86,6 +109,11 @@ type runFlags struct { publishedParameter []string listPublishedParameter []string nodeManagerDirective []string + directive []string + queue string + maxJobRuntime int + maxTimeInQueue int + maxTotalLifeTime int outputType string noHeaders bool } @@ -104,7 +132,7 @@ func newRunCmd() *cobra.Command { fmeflow run --repository Samples --workspace austinApartments.fmw --wait # Submit a job to a specific queue and set a time to live in the queue - fmeflow run --repository Samples --workspace austinApartments.fmw --tag Queue1 --time-to-live 120 + fmeflow run --repository Samples --workspace austinApartments.fmw --queue Queue1 --max-time-in-queue 120 # Submit a job and pass in a few published parameters fmeflow run --repository Samples --workspace austinDownload.fmw --published-parameter-list THEMES=railroad,airports --published-parameter COORDSYS=TX83-CF @@ -121,17 +149,22 @@ func newRunCmd() *cobra.Command { cmd.Flags().StringVar(&f.repository, "repository", "", "The name of the repository containing the workspace to run.") cmd.Flags().StringVar(&f.workspace, "workspace", "", "The name of the workspace to run.") cmd.Flags().BoolVar(&f.wait, "wait", false, "Submit job and wait for it to 
finish.") - cmd.Flags().StringVar(&f.tag, "tag", "", "The job routing tag for the request") cmd.Flags().StringArrayVar(&f.publishedParameter, "published-parameter", []string{}, "Published parameters defined for this workspace. Specify as Key=Value. Can be passed in multiple times. For list parameters, use the --list-published-parameter flag.") cmd.Flags().StringArrayVar(&f.listPublishedParameter, "published-parameter-list", []string{}, "A List-type published parameters defined for this workspace. Specify as Key=Value1,Value2. Can be passed in multiple times.") - cmd.Flags().StringVar(&f.sourceData, "file", "", "Upload a local file Source dataset to use to run the workspace. Note this causes the translation to run in synchonous mode whether the --wait flag is passed in or not.") - cmd.Flags().BoolVar(&f.rtc, "run-until-canceled", false, "Runs a job until it is explicitly canceled. The job will run again regardless of whether the job completed successfully, failed, or the server crashed or was shut down.") - cmd.Flags().IntVar(&f.ttc, "time-until-canceled", -1, "Time (in seconds) elapsed for a running job before it's cancelled. The minimum value is 1 second, values less than 1 second are ignored.") - cmd.Flags().IntVar(&f.ttl, "time-to-live", -1, "Time to live in the job queue (in seconds)") - cmd.Flags().StringVar(&f.description, "description", "", "Description of the request.") + cmd.Flags().StringVar(&f.sourceData, "file", "", "Upload a local file Source dataset to use to run the workspace. Note this causes the translation to run in synchonous mode whether the --wait flag is passed in or not. For v3 API only.") + cmd.Flags().BoolVar(&f.rtc, "run-until-canceled", false, "Runs a job until it is explicitly canceled. The job will run again regardless of whether the job completed successfully, failed, or the server crashed or was shut down. For v3 API only.") + cmd.Flags().StringVar(&f.description, "description", "", "Description of the request. 
For v3 API only.") cmd.Flags().StringArrayVar(&f.successTopics, "success-topic", []string{}, "Topics to notify when the job succeeds. Can be specified more than once.") cmd.Flags().StringArrayVar(&f.failureTopics, "failure-topic", []string{}, "Topics to notify when the job fails. Can be specified more than once.") - cmd.Flags().StringArrayVar(&f.nodeManagerDirective, "node-manager-directive", []string{}, "Additional NM Directives, which can include client-configured keys, to pass to the notification service for custom use by subscriptions. Specify as Key=Value Can be passed in multiple times.") + cmd.Flags().StringArrayVar(&f.nodeManagerDirective, "node-manager-directive", []string{}, "Additional NM Directives, which can include client-configured keys, to pass to the notification service for custom use by subscriptions. Specify as Key=Value Can be passed in multiple times. For v3 API only.") + cmd.Flags().StringArrayVar(&f.directive, "directive", []string{}, "Additional directives to pass to the job submission. Specify as Key=Value. Can be passed in multiple times. For v4 API only.") + cmd.Flags().StringVar(&f.queue, "queue", "", "Queue of the job to submit. Equavalent to --tag (deprecated).") + cmd.Flags().StringVar(&f.queue, "tag", "", "The queue (job routing tag) for the request.") + cmd.Flags().IntVar(&f.maxJobRuntime, "max-job-runtime", -1, "Time (in seconds) elapsed for a running job before it's cancelled. The minimum value is 1 second, values less than 1 second are ignored. Equavalent to --time-until-canceled (deprecated).") + cmd.Flags().IntVar(&f.maxJobRuntime, "time-until-canceled", -1, "Time (in seconds) elapsed for a running job before it's cancelled. The minimum value is 1 second, values less than 1 second are ignored.") + cmd.Flags().IntVar(&f.maxTimeInQueue, "max-time-in-queue", -1, "Time to live in the job queue (in seconds). 
Equavalent to --time-to-live (deprecated).") + cmd.Flags().IntVar(&f.maxTimeInQueue, "time-to-live", -1, "Time to live in the job queue (in seconds).") + cmd.Flags().IntVar(&f.maxTotalLifeTime, "max-total-life-time", -1, "Time to live including both time in the queue and run time (in seconds). The maximum value is 86400 and the minimum value is 1. For v4 API only.") cmd.Flags().StringVarP(&f.outputType, "output", "o", "table", "Specify the output type. Should be one of table, json, or custom-columns") cmd.Flags().BoolVar(&f.noHeaders, "no-headers", false, "Don't print column headers") @@ -145,6 +178,16 @@ func newRunCmd() *cobra.Command { cmd.MarkFlagsMutuallyExclusive("file", "node-manager-directive") cmd.MarkFlagsMutuallyExclusive("file", "run-until-canceled") + // deprecated flags can't be used with the equavalent new flags + cmd.MarkFlagsMutuallyExclusive("tag", "queue") + cmd.MarkFlagsMutuallyExclusive("time-until-canceled", "max-job-runtime") + cmd.MarkFlagsMutuallyExclusive("time-to-live", "max-time-in-queue") + + // mark v3 deprecated flags + cmd.Flags().MarkDeprecated("tag", "please use --queue instead") + cmd.Flags().MarkDeprecated("time-until-canceled", "please use --max-job-runtime instead") + cmd.Flags().MarkDeprecated("time-to-live", "please use --max-time-in-queue instead") + return cmd } @@ -161,60 +204,47 @@ func runRun(f *runFlags) func(cmd *cobra.Command, args []string) error { Timeout: 604800 * time.Second, } - var result JobResult - var responseData []byte + if viper.GetInt("build") >= 26018 { + var result JobResultV4 + var responseData []byte - if f.sourceData == "" { - job := &JobRequest{} + job := &JobRequestV4{} + job.PublishedParameters = make(map[string]interface{}) // get published parameters for _, parameter := range f.publishedParameter { this_parameter := strings.SplitN(parameter, "=", 2) - var a SimpleParameter - a.Name = this_parameter[0] - a.Value = this_parameter[1] - job.PublishedParameters = append(job.PublishedParameters, a) + 
job.PublishedParameters[this_parameter[0]] = this_parameter[1] } // get list published parameters for _, parameter := range f.listPublishedParameter { this_parameter := strings.SplitN(parameter, "=", 2) - var a ListParameter - a.Name = this_parameter[0] - // split on commas, unless they are escaped - a.Value = splitEscapedString(this_parameter[1], ',') - job.PublishedParameters = append(job.PublishedParameters, a) - + job.PublishedParameters[this_parameter[0]] = splitEscapedString(this_parameter[1], ',') } - // get node manager directives - for _, directive := range f.nodeManagerDirective { - this_directive := strings.Split(directive, "=") - var a Directive - a.Name = this_directive[0] - a.Value = this_directive[1] - job.NMDirectives.Directives = append(job.NMDirectives.Directives, a) + job.Directives = make(map[string]string) + for _, directive := range f.directive { + this_directive := strings.SplitN(directive, "=", 2) + job.Directives[this_directive[0]] = this_directive[1] } - if f.ttc != -1 { - job.TMDirectives.Ttc = f.ttc - } - if f.ttl != -1 { - job.TMDirectives.TTL = f.ttl - } + job.SuccessTopics = append(job.SuccessTopics, f.successTopics...) + job.FailureTopics = append(job.FailureTopics, f.failureTopics...) + job.Queue = f.queue + job.Repository = f.repository + job.Workspace = f.workspace - if f.tag != "" { - job.TMDirectives.Tag = f.tag + if f.maxJobRuntime > 0 { + job.MaxJobRuntime = f.maxJobRuntime } - job.TMDirectives.Rtc = f.rtc - - // append slice to slice - job.NMDirectives.SuccessTopics = append(job.NMDirectives.SuccessTopics, f.successTopics...) - job.NMDirectives.FailureTopics = append(job.NMDirectives.FailureTopics, f.failureTopics...) 
+ if f.maxTimeInQueue > 0 { + job.MaxTimeInQueue = f.maxTimeInQueue + } - if f.description != "" { - job.TMDirectives.Description = f.description + if f.wait && f.maxTotalLifeTime > 0 && f.maxTotalLifeTime < 86401 { + job.MaxTotalLifeTime = f.maxTotalLifeTime } jobJson, err := json.Marshal(job) @@ -222,12 +252,12 @@ func runRun(f *runFlags) func(cmd *cobra.Command, args []string) error { return err } - submitEndpoint := "submit" + syncEndpoint := "" if f.wait { - submitEndpoint = "transact" + syncEndpoint = "/sync" } - endpoint := "/fmerest/v3/transformations/" + submitEndpoint + "/" + f.repository + "/" + f.workspace + endpoint := "/fmeapiv4/jobs" + syncEndpoint request, err := buildFmeFlowRequest(endpoint, "POST", strings.NewReader(string(jobJson))) if err != nil { @@ -235,19 +265,12 @@ func runRun(f *runFlags) func(cmd *cobra.Command, args []string) error { } request.Header.Add("Content-Type", "application/json") - response, err := client.Do(&request) if err != nil { return err } else if response.StatusCode != 200 && response.StatusCode != 202 { - if response.StatusCode == 404 { - return fmt.Errorf("%w: check that the specified workspace and repository exist", errors.New(response.Status)) - } else if response.StatusCode == 422 { - return fmt.Errorf("%w: either job failed or published parameters are invalid", errors.New(response.Status)) - } else { - return errors.New(response.Status) - } + return errors.New(response.Status) } responseData, err = io.ReadAll(response.Body) @@ -275,146 +298,325 @@ func runRun(f *runFlags) func(cmd *cobra.Command, args []string) error { return err } } - } else { - // we are uploading a source file, so we want to send the file in the body as octet stream, and parameters as url parameters - file, err := os.Open(f.sourceData) - if err != nil { - return err - } - defer file.Close() - endpoint := "/fmerest/v3/transformations/transactdata/" + f.repository + "/" + f.workspace - request, err := buildFmeFlowRequest(endpoint, "POST", file) - 
if err != nil { - return err - } + if f.wait { + if f.outputType == "table" { + t := table.NewWriter() + t.SetStyle(defaultStyle) - q := request.URL.Query() + t.AppendHeader(table.Row{"ID", "Status", "Status Message", "Features Output"}) - if f.description != "" { - q.Add("opt_description", f.description) - } + t.AppendRow(table.Row{result.ID, result.Status, result.StatusMessage, result.FeatureOutputCount}) - for _, topic := range f.successTopics { - q.Add("opt_successtopics", topic) - } + if f.noHeaders { + t.ResetHeaders() + } + fmt.Fprintln(cmd.OutOrStdout(), t.Render()) - for _, topic := range f.failureTopics { - q.Add("opt_failuretopics", topic) - } + } else if f.outputType == "json" { + prettyJSON, err := prettyPrintJSON(responseData) + if err != nil { + return err + } + fmt.Fprintln(cmd.OutOrStdout(), prettyJSON) + } else if strings.HasPrefix(f.outputType, "custom-columns") { + // parse the columns and json queries + columnsString := "" + if strings.HasPrefix(f.outputType, "custom-columns=") { + columnsString = f.outputType[len("custom-columns="):] + } + if len(columnsString) == 0 { + return errors.New("custom-columns format specified but no custom columns given") + } - if f.description != "" { - endpoint += "opt_description=" + f.description - } + // we have to marshal the Items array, then create an array of marshalled items + // to pass to the creation of the table. 
+ marshalledItems := [][]byte{} - if f.tag != "" { - q.Add("opt_tag", f.tag) - } + mJson, err := json.Marshal(result) + if err != nil { + return err + } - if f.ttl != -1 { - q.Add("opt_ttl", strconv.Itoa(f.ttl)) - } + marshalledItems = append(marshalledItems, mJson) - if f.ttc != -1 { - q.Add("opt_ttc", strconv.Itoa(f.ttc)) - } + columnsInput := strings.Split(columnsString, ",") + t, err := createTableFromCustomColumns(marshalledItems, columnsInput) + if err != nil { + return err + } + if f.noHeaders { + t.ResetHeaders() + } + fmt.Fprintln(cmd.OutOrStdout(), t.Render()) - for _, parameter := range f.publishedParameter { - this_parameter := strings.SplitN(parameter, "=", 2) - q.Add(this_parameter[0], this_parameter[1]) - } - for _, parameter := range f.listPublishedParameter { - this_parameter := strings.SplitN(parameter, "=", 2) - this_list := splitEscapedString(this_parameter[1], ',') - for _, item := range this_list { - q.Add(this_parameter[0], item) + } else { + return errors.New("invalid output format specified") } } + return nil - request.URL.RawQuery = q.Encode() + } else { - request.Header.Set("Content-Type", "application/octet-stream") + var result JobResultV3 + var responseData []byte - response, err := client.Do(&request) - if err != nil { - return err - } else if response.StatusCode != 200 { - if response.StatusCode == 404 { - return fmt.Errorf("%w: check that the specified workspace and repository exist", errors.New(response.Status)) - } else { - return errors.New(response.Status) + if f.sourceData == "" { + job := &JobRequestV3{} + + // get published parameters + for _, parameter := range f.publishedParameter { + this_parameter := strings.SplitN(parameter, "=", 2) + var a SimpleParameter + a.Name = this_parameter[0] + a.Value = this_parameter[1] + job.PublishedParameters = append(job.PublishedParameters, a) } - } + // get list published parameters + for _, parameter := range f.listPublishedParameter { + this_parameter := strings.SplitN(parameter, "=", 
2) + var a ListParameter + a.Name = this_parameter[0] + // split on commas, unless they are escaped + a.Value = splitEscapedString(this_parameter[1], ',') + job.PublishedParameters = append(job.PublishedParameters, a) - responseData, err = io.ReadAll(response.Body) - if err != nil { - return err - } + } - if err := json.Unmarshal(responseData, &result); err != nil { - return err - } - } + // get node manager directives + for _, directive := range f.nodeManagerDirective { + this_directive := strings.Split(directive, "=") + var a Directive + a.Name = this_directive[0] + a.Value = this_directive[1] + job.NMDirectives.Directives = append(job.NMDirectives.Directives, a) + } + + if f.maxJobRuntime != -1 { + job.TMDirectives.Ttc = f.maxJobRuntime + } + if f.maxTimeInQueue != -1 { + job.TMDirectives.TTL = f.maxTimeInQueue + } - // the transactdata endpoint only runs synchonously - if f.wait || f.sourceData != "" { - if f.outputType == "table" { - t := table.NewWriter() - t.SetStyle(defaultStyle) + if f.queue != "" { + job.TMDirectives.Tag = f.queue + } - t.AppendHeader(table.Row{"ID", "Status", "Status Message", "Features Output"}) + job.TMDirectives.Rtc = f.rtc - t.AppendRow(table.Row{result.ID, result.Status, result.StatusMessage, result.NumFeaturesOutput}) + // append slice to slice + job.NMDirectives.SuccessTopics = append(job.NMDirectives.SuccessTopics, f.successTopics...) + job.NMDirectives.FailureTopics = append(job.NMDirectives.FailureTopics, f.failureTopics...) 
- if f.noHeaders { - t.ResetHeaders() + if f.description != "" { + job.TMDirectives.Description = f.description } - fmt.Fprintln(cmd.OutOrStdout(), t.Render()) - } else if f.outputType == "json" { - prettyJSON, err := prettyPrintJSON(responseData) + jobJson, err := json.Marshal(job) if err != nil { return err } - fmt.Fprintln(cmd.OutOrStdout(), prettyJSON) - } else if strings.HasPrefix(f.outputType, "custom-columns") { - // parse the columns and json queries - columnsString := "" - if strings.HasPrefix(f.outputType, "custom-columns=") { - columnsString = f.outputType[len("custom-columns="):] - } - if len(columnsString) == 0 { - return errors.New("custom-columns format specified but no custom columns given") + + submitEndpoint := "submit" + if f.wait { + submitEndpoint = "transact" } - // we have to marshal the Items array, then create an array of marshalled items - // to pass to the creation of the table. - marshalledItems := [][]byte{} + endpoint := "/fmerest/v3/transformations/" + submitEndpoint + "/" + f.repository + "/" + f.workspace - mJson, err := json.Marshal(result) + request, err := buildFmeFlowRequest(endpoint, "POST", strings.NewReader(string(jobJson))) if err != nil { return err } - marshalledItems = append(marshalledItems, mJson) + request.Header.Add("Content-Type", "application/json") + + response, err := client.Do(&request) - columnsInput := strings.Split(columnsString, ",") - t, err := createTableFromCustomColumns(marshalledItems, columnsInput) if err != nil { return err + } else if response.StatusCode != 200 && response.StatusCode != 202 { + if response.StatusCode == 404 { + return fmt.Errorf("%w: check that the specified workspace and repository exist", errors.New(response.Status)) + } else if response.StatusCode == 422 { + return fmt.Errorf("%w: either job failed or published parameters are invalid", errors.New(response.Status)) + } else { + return errors.New(response.Status) + } } - if f.noHeaders { - t.ResetHeaders() + + responseData, err = 
io.ReadAll(response.Body) + if err != nil { + return err } - fmt.Fprintln(cmd.OutOrStdout(), t.Render()) + if !f.wait { + var result JobId + if err := json.Unmarshal(responseData, &result); err != nil { + return err + } else { + if !jsonOutput { + fmt.Fprintln(cmd.OutOrStdout(), "Job submitted with id: "+strconv.Itoa(result.Id)) + } else { + prettyJSON, err := prettyPrintJSON(responseData) + if err != nil { + return err + } + fmt.Fprintln(cmd.OutOrStdout(), prettyJSON) + } + } + } else { + if err := json.Unmarshal(responseData, &result); err != nil { + return err + } + } } else { - return errors.New("invalid output format specified") + // we are uploading a source file, so we want to send the file in the body as octet stream, and parameters as url parameters + file, err := os.Open(f.sourceData) + if err != nil { + return err + } + defer file.Close() + + endpoint := "/fmerest/v3/transformations/transactdata/" + f.repository + "/" + f.workspace + request, err := buildFmeFlowRequest(endpoint, "POST", file) + if err != nil { + return err + } + + q := request.URL.Query() + + if f.description != "" { + q.Add("opt_description", f.description) + } + + for _, topic := range f.successTopics { + q.Add("opt_successtopics", topic) + } + + for _, topic := range f.failureTopics { + q.Add("opt_failuretopics", topic) + } + + if f.description != "" { + endpoint += "opt_description=" + f.description + } + + if f.queue != "" { + q.Add("opt_tag", f.queue) + } + + if f.maxTimeInQueue != -1 { + q.Add("opt_ttl", strconv.Itoa(f.maxTimeInQueue)) + } + + if f.maxJobRuntime != -1 { + q.Add("opt_ttc", strconv.Itoa(f.maxJobRuntime)) + } + + for _, parameter := range f.publishedParameter { + this_parameter := strings.SplitN(parameter, "=", 2) + q.Add(this_parameter[0], this_parameter[1]) + } + for _, parameter := range f.listPublishedParameter { + this_parameter := strings.SplitN(parameter, "=", 2) + this_list := splitEscapedString(this_parameter[1], ',') + for _, item := range this_list { + 
q.Add(this_parameter[0], item) + } + + } + + request.URL.RawQuery = q.Encode() + + request.Header.Set("Content-Type", "application/octet-stream") + + response, err := client.Do(&request) + if err != nil { + return err + } else if response.StatusCode != 200 { + if response.StatusCode == 404 { + return fmt.Errorf("%w: check that the specified workspace and repository exist", errors.New(response.Status)) + } else { + return errors.New(response.Status) + } + + } + + responseData, err = io.ReadAll(response.Body) + if err != nil { + return err + } + + if err := json.Unmarshal(responseData, &result); err != nil { + return err + } } + + // the transactdata endpoint only runs synchonously + if f.wait || f.sourceData != "" { + if f.outputType == "table" { + t := table.NewWriter() + t.SetStyle(defaultStyle) + + t.AppendHeader(table.Row{"ID", "Status", "Status Message", "Features Output"}) + + t.AppendRow(table.Row{result.ID, result.Status, result.StatusMessage, result.NumFeaturesOutput}) + + if f.noHeaders { + t.ResetHeaders() + } + fmt.Fprintln(cmd.OutOrStdout(), t.Render()) + + } else if f.outputType == "json" { + prettyJSON, err := prettyPrintJSON(responseData) + if err != nil { + return err + } + fmt.Fprintln(cmd.OutOrStdout(), prettyJSON) + } else if strings.HasPrefix(f.outputType, "custom-columns") { + // parse the columns and json queries + columnsString := "" + if strings.HasPrefix(f.outputType, "custom-columns=") { + columnsString = f.outputType[len("custom-columns="):] + } + if len(columnsString) == 0 { + return errors.New("custom-columns format specified but no custom columns given") + } + + // we have to marshal the Items array, then create an array of marshalled items + // to pass to the creation of the table. 
+ marshalledItems := [][]byte{} + + mJson, err := json.Marshal(result) + if err != nil { + return err + } + + marshalledItems = append(marshalledItems, mJson) + + columnsInput := strings.Split(columnsString, ",") + t, err := createTableFromCustomColumns(marshalledItems, columnsInput) + if err != nil { + return err + } + if f.noHeaders { + t.ResetHeaders() + } + fmt.Fprintln(cmd.OutOrStdout(), t.Render()) + + } else { + return errors.New("invalid output format specified") + } + } + return nil } + return nil + } } diff --git a/cmd/run_test.go b/cmd/run_v3_test.go similarity index 88% rename from cmd/run_test.go rename to cmd/run_v3_test.go index bb08e22..32fdd4f 100644 --- a/cmd/run_test.go +++ b/cmd/run_v3_test.go @@ -131,25 +131,52 @@ func TestRun(t *testing.T) { statusCode: http.StatusOK, body: responseV3ASync, args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--tag", "myqueue"}, + wantOutputRegex: "Job submitted with id: 1", + wantBodyRegEx: ".*\"TMDirectives\":{.*\"tag\":\"myqueue\".*}.*", + }, + { + name: "queue flag async", + statusCode: http.StatusOK, + body: responseV3ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--queue", "myqueue"}, wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", wantBodyRegEx: ".*\"TMDirectives\":{.*\"tag\":\"myqueue\".*}.*", }, + { name: "time to live flag async", statusCode: http.StatusOK, body: responseV3ASync, args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--time-to-live", "60"}, + wantOutputRegex: "Job submitted with id: 1", + wantBodyRegEx: ".*\"TMDirectives\":{.*\"ttl\":60.*}.*", + }, + { + name: "max time in queue flag async", + statusCode: http.StatusOK, + body: responseV3ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--max-time-in-queue", "60"}, wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", wantBodyRegEx: 
".*\"TMDirectives\":{.*\"ttl\":60.*}.*", }, + { name: "timeuntil canceled flag async", statusCode: http.StatusOK, body: responseV3ASync, args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--time-until-canceled", "60"}, + wantOutputRegex: "Job submitted with id: 1", + wantBodyRegEx: ".*\"TMDirectives\":{.*\"ttc\":60.*}.*", + }, + { + name: "max job runtime flag async", + statusCode: http.StatusOK, + body: responseV3ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--max-job-runtime", "60"}, wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", wantBodyRegEx: ".*\"TMDirectives\":{.*\"ttc\":60.*}.*", }, + { name: "published parameter async", statusCode: http.StatusOK, @@ -196,7 +223,7 @@ func TestRun(t *testing.T) { statusCode: http.StatusOK, body: responseV3Sync, args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--tag", "myqueue", "--file", f.Name()}, - wantOutputRegex: "^[\\s]*ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539[\\s]*$", + wantOutputRegex: "ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539", wantFormParams: map[string]string{"opt_tag": "myqueue"}, }, { @@ -204,7 +231,7 @@ func TestRun(t *testing.T) { statusCode: http.StatusOK, body: responseV3Sync, args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--time-to-live", "60", "--file", f.Name()}, - wantOutputRegex: "^[\\s]*ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539[\\s]*$", + wantOutputRegex: "ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539", wantFormParams: map[string]string{"opt_ttl": "60"}, }, { @@ -212,7 +239,7 @@ func TestRun(t *testing.T) { statusCode: http.StatusOK, body: 
responseV3Sync, args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--time-until-canceled", "60", "--file", f.Name()}, - wantOutputRegex: "^[\\s]*ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539[\\s]*$", + wantOutputRegex: "ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539", wantFormParams: map[string]string{"opt_ttc": "60"}, }, { diff --git a/cmd/run_v4_test.go b/cmd/run_v4_test.go new file mode 100644 index 0000000..5e5ff66 --- /dev/null +++ b/cmd/run_v4_test.go @@ -0,0 +1,224 @@ +package cmd + +import ( + "net/http" + "testing" +) + +func TestRunV4(t *testing.T) { + responseV4ASync := `{ + "id": 1 + }` + + responseV4Sync := `{ + "id": 1, + "featureOutputCount": 1539, + "requesterHost": "10.1.113.39", + "requesterResultPort": 37805, + "status": "SUCCESS", + "statusMessage": "Translation Successful", + "timeFinished": "2023-02-04T00:16:30Z", + "timeQueued": "2023-02-04T00:16:28Z", + "timeStarted": "2023-02-04T00:16:28Z" + }` + + cases := []testCase{ + { + name: "unknown flag", + statusCode: http.StatusOK, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--badflag"}, + wantErrOutputRegex: "unknown flag: --badflag", + }, + { + name: "500 bad status code", + statusCode: http.StatusInternalServerError, + wantErrText: "500 Internal Server Error", + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw"}, + fmeflowBuild: 26018, + }, + { + name: "repository flag required", + wantErrText: "required flag(s) \"repository\" not set", + args: []string{"run", "--workspace", "austinApartments.fmw"}, + }, + { + name: "workspace flag required", + wantErrText: "required flag(s) \"workspace\" not set", + args: []string{"run", "--repository", "Samples"}, + }, + { + name: "run sync job table output", + statusCode: http.StatusOK, + args: 
[]string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--wait"}, + body: responseV4Sync, + wantOutputRegex: "^[\\s]*ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539[\\s]*$", + fmeflowBuild: 26018, + }, + { + name: "run async job regular output", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + fmeflowBuild: 26018, + }, + { + name: "run async job json", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--json"}, + wantOutputJson: responseV4ASync, + fmeflowBuild: 26018, + }, + { + name: "run sync job json output", + statusCode: http.StatusOK, + body: responseV4Sync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--json", "--wait"}, + wantOutputJson: responseV4Sync, + fmeflowBuild: 26018, + }, + { + name: "failure topic flag async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--failure-topic", "FAILURE_TOPIC"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"failureTopics\":\\[\"FAILURE_TOPIC\"\\].*", + fmeflowBuild: 26018, + }, + { + name: "success topic flag async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--success-topic", "SUCCESS_TOPIC"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"successTopics\":\\[\"SUCCESS_TOPIC\"\\].*", + fmeflowBuild: 26018, + }, + { + name: "directive flag async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", 
"austinApartments.fmw", "--directive", "directive1=value1", "--directive", "directive2=value2"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"directives\":{.*\"directive1\":\"value1\".*\"directive2\":\"value2\".*}.*", + fmeflowBuild: 26018, + }, + { + name: "published parameter async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--published-parameter", "COORDSYS=TX83-CF"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"publishedParameters\":{.*\"COORDSYS\":\"TX83-CF\".*}.*", + fmeflowBuild: 26018, + }, + { + name: "published parameter list async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--published-parameter-list", "THEMES=railroad,airports"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"publishedParameters\":{.*\"THEMES\":\\[\"railroad\",\"airports\"\\].*}.*", + fmeflowBuild: 26018, + }, + { + name: "max job runtime flag async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--max-job-runtime", "10"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"maxJobRuntime\":10.*", + fmeflowBuild: 26018, + }, + { + name: "time until canceled flag async deprecated", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--time-until-canceled", "10"}, + wantOutputRegex: "Flag --time-until-canceled has been deprecated, please use --max-job-runtime instead[\\s\\S]*Job submitted with id: 1", + wantBodyRegEx: ".*\"maxJobRuntime\":10.*", + fmeflowBuild: 26018, + }, + { + name: "max job runtime invalid value ignored", + statusCode: http.StatusOK, + 
body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--max-job-runtime", "-5"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + fmeflowBuild: 26018, + }, + { + name: "max time in queue flag async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--max-time-in-queue", "60"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"maxTimeInQueue\":60.*", + fmeflowBuild: 26018, + }, + { + name: "time to live flag async deprecated", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--time-to-live", "60"}, + wantOutputRegex: "Flag --time-to-live has been deprecated, please use --max-time-in-queue instead[\\s\\S]*Job submitted with id: 1", + wantBodyRegEx: ".*\"maxTimeInQueue\":60.*", + fmeflowBuild: 26018, + }, + { + name: "max time in queue invalid value ignored", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--max-time-in-queue", "-1"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + fmeflowBuild: 26018, + }, + { + name: "max total life time flag sync", + statusCode: http.StatusOK, + body: responseV4Sync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--wait", "--max-total-life-time", "300"}, + wantOutputRegex: "^[\\s]*ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539[\\s]*$", + wantBodyRegEx: ".*\"maxTotalLifeTime\":300.*", + fmeflowBuild: 26018, + }, + { + name: "max total life time invalid value ignored", + statusCode: http.StatusOK, + body: responseV4Sync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--wait", 
"--max-total-life-time", "100000"}, + wantOutputRegex: "^[\\s]*ID[\\s]*STATUS[\\s]*STATUS MESSAGE[\\s]*FEATURES OUTPUT[\\s]*1[\\s]*SUCCESS[\\s]*Translation Successful[\\s]*1539[\\s]*$", + fmeflowBuild: 26018, + }, + { + name: "queue flag async", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--queue", "MyQueue"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"queue\":\"MyQueue\".*", + fmeflowBuild: 26018, + }, + { + name: "tag flag async deprecated", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--tag", "MyQueue"}, + wantOutputRegex: "Flag --tag has been deprecated, please use --queue instead[\\s\\S]*Job submitted with id: 1", + wantBodyRegEx: ".*\"queue\":\"MyQueue\".*", + fmeflowBuild: 26018, + }, + { + name: "published parameter and list combined", + statusCode: http.StatusOK, + body: responseV4ASync, + args: []string{"run", "--repository", "Samples", "--workspace", "austinApartments.fmw", "--published-parameter", "COORDSYS=TX83-CF", "--published-parameter-list", "THEMES=railroad,airports"}, + wantOutputRegex: "^[\\s]*Job submitted with id: 1[\\s]*$", + wantBodyRegEx: ".*\"publishedParameters\":{.*\"COORDSYS\":\"TX83-CF\".*\"THEMES\":\\[\"railroad\",\"airports\"\\].*}.*", + fmeflowBuild: 26018, + }, + } + runTests(cases, t) +} diff --git a/docs/fmeflow_run.md b/docs/fmeflow_run.md index 4b16cbc..694e550 100644 --- a/docs/fmeflow_run.md +++ b/docs/fmeflow_run.md @@ -21,7 +21,7 @@ fmeflow run [flags] fmeflow run --repository Samples --workspace austinApartments.fmw --wait # Submit a job to a specific queue and set a time to live in the queue - fmeflow run --repository Samples --workspace austinApartments.fmw --tag Queue1 --time-to-live 120 + fmeflow run --repository Samples --workspace austinApartments.fmw --queue Queue1 
--max-time-in-queue 120 # Submit a job and pass in a few published parameters fmeflow run --repository Samples --workspace austinDownload.fmw --published-parameter-list THEMES=railroad,airports --published-parameter COORDSYS=TX83-CF @@ -39,17 +39,19 @@ fmeflow run [flags] --repository string The name of the repository containing the workspace to run. --workspace string The name of the workspace to run. --wait Submit job and wait for it to finish. - --tag string The job routing tag for the request --published-parameter stringArray Published parameters defined for this workspace. Specify as Key=Value. Can be passed in multiple times. For list parameters, use the --list-published-parameter flag. --published-parameter-list stringArray A List-type published parameters defined for this workspace. Specify as Key=Value1,Value2. Can be passed in multiple times. - --file string Upload a local file Source dataset to use to run the workspace. Note this causes the translation to run in synchonous mode whether the --wait flag is passed in or not. - --run-until-canceled Runs a job until it is explicitly canceled. The job will run again regardless of whether the job completed successfully, failed, or the server crashed or was shut down. - --time-until-canceled int Time (in seconds) elapsed for a running job before it's cancelled. The minimum value is 1 second, values less than 1 second are ignored. (default -1) - --time-to-live int Time to live in the job queue (in seconds) (default -1) - --description string Description of the request. + --file string Upload a local file Source dataset to use to run the workspace. Note this causes the translation to run in synchronous mode whether the --wait flag is passed in or not. For v3 API only. + --run-until-canceled Runs a job until it is explicitly canceled. The job will run again regardless of whether the job completed successfully, failed, or the server crashed or was shut down. For v3 API only.
+ --description string Description of the request. For v3 API only. --success-topic stringArray Topics to notify when the job succeeds. Can be specified more than once. --failure-topic stringArray Topics to notify when the job fails. Can be specified more than once. - --node-manager-directive stringArray Additional NM Directives, which can include client-configured keys, to pass to the notification service for custom use by subscriptions. Specify as Key=Value Can be passed in multiple times. + --node-manager-directive stringArray Additional NM Directives, which can include client-configured keys, to pass to the notification service for custom use by subscriptions. Specify as Key=Value Can be passed in multiple times. For v3 API only. + --directive stringArray Additional directives to pass to the job submission. Specify as Key=Value. Can be passed in multiple times. For v4 API only. + --queue string Queue of the job to submit. Equivalent to --tag (deprecated). + --max-job-runtime int Time (in seconds) elapsed for a running job before it's cancelled. The minimum value is 1 second, values less than 1 second are ignored. Equivalent to --time-until-canceled (deprecated). (default -1) + --max-time-in-queue int Time to live in the job queue (in seconds). Equivalent to --time-to-live (deprecated). (default -1) + --max-total-life-time int Time to live including both time in the queue and run time (in seconds). The maximum value is 86400 and the minimum value is 1. For v4 API only. (default -1) -o, --output string Specify the output type. Should be one of table, json, or custom-columns (default "table") --no-headers Don't print column headers -h, --help help for run