-
Notifications
You must be signed in to change notification settings - Fork 1
fix: respect response_status/response_body/response_headers from pipeline context when no HTTP response written #254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
eda8934
0c33faf
5081e4f
db6365a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,8 @@ import ( | |
| "log" | ||
| "maps" | ||
| "net/http" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/CrisisTextLine/modular" | ||
| ) | ||
|
|
@@ -31,6 +33,129 @@ type httpReqContextKey struct{} | |
| // headers, path parameters, and the request body. | ||
| var HTTPRequestContextKey = httpReqContextKey{} | ||
|
|
||
| // pipelineResultKey is the unexported type for the pipeline result context key. | ||
| type pipelineResultKey struct{} | ||
|
|
||
| // PipelineResultContextKey is the context key used to capture pipeline execution | ||
| // results from TriggerWorkflow. HTTP trigger handlers store a *PipelineResultHolder | ||
| // in the context before calling TriggerWorkflow; the engine populates it with the | ||
| // pipeline's result.Current map after execution. This lets the trigger apply | ||
| // response_status/response_body/response_headers from the pipeline output when no | ||
| // step wrote directly to the HTTP response writer. | ||
| var PipelineResultContextKey = pipelineResultKey{} | ||
|
|
||
| // PipelineResultHolder is a mutable container used to pass pipeline execution | ||
| // results back through the context from the engine to the HTTP trigger handler. | ||
| type PipelineResultHolder struct { | ||
| result map[string]any | ||
| } | ||
|
|
||
| // Set stores the pipeline result in the holder. | ||
| func (h *PipelineResultHolder) Set(result map[string]any) { | ||
| h.result = result | ||
| } | ||
|
|
||
| // Get returns the stored pipeline result, or nil if not set. | ||
| func (h *PipelineResultHolder) Get() map[string]any { | ||
| return h.result | ||
| } | ||
|
|
||
| // coercePipelineStatus coerces common numeric/string types into an HTTP status | ||
| // code. Pipeline steps may emit response_status as int, int64, float64 (common | ||
| // after generic JSON decoding), json.Number, or a numeric string. | ||
| func coercePipelineStatus(v any) (int, bool) { | ||
| switch s := v.(type) { | ||
| case int: | ||
| return s, true | ||
| case int64: | ||
| status := int(s) | ||
| if int64(status) != s { | ||
| return 0, false | ||
| } | ||
| return status, true | ||
| case float64: | ||
| status := int(s) | ||
| if float64(status) != s { | ||
| return 0, false | ||
| } | ||
| return status, true | ||
| case json.Number: | ||
| i64, err := s.Int64() | ||
| if err != nil { | ||
| return 0, false | ||
| } | ||
| status := int(i64) | ||
| if int64(status) != i64 { | ||
| return 0, false | ||
| } | ||
| return status, true | ||
| case string: | ||
| n, err := strconv.Atoi(strings.TrimSpace(s)) | ||
| if err != nil { | ||
| return 0, false | ||
| } | ||
| return n, true | ||
| default: | ||
| return 0, false | ||
| } | ||
| } | ||
|
|
||
| // applyPipelineHeaders writes response headers from common map/header shapes | ||
| // that pipeline steps may emit for response_headers. | ||
| func applyPipelineHeaders(w http.ResponseWriter, rawHeaders any) { | ||
| switch headers := rawHeaders.(type) { | ||
| case map[string]any: | ||
| for k, v := range headers { | ||
| switch hv := v.(type) { | ||
| case string: | ||
| w.Header().Set(k, hv) | ||
| case []string: | ||
| for _, sv := range hv { | ||
| w.Header().Add(k, sv) | ||
| } | ||
| case []any: | ||
| for _, sv := range hv { | ||
| w.Header().Add(k, fmt.Sprint(sv)) | ||
| } | ||
| default: | ||
| w.Header().Set(k, fmt.Sprint(hv)) | ||
| } | ||
| } | ||
| case map[string]string: | ||
| for k, v := range headers { | ||
| w.Header().Set(k, v) | ||
| } | ||
| case http.Header: | ||
| for k, vals := range headers { | ||
| for _, v := range vals { | ||
| w.Header().Add(k, v) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // writePipelineContextResponse checks the result map for response_status and, | ||
| // if present, applies response_headers and writes the response. Returns true if | ||
| // the response was written from the pipeline context fields. | ||
| func writePipelineContextResponse(w http.ResponseWriter, result map[string]any) bool { | ||
| rawStatus, ok := result["response_status"] | ||
| if !ok { | ||
| return false | ||
| } | ||
| status, ok := coercePipelineStatus(rawStatus) | ||
| if !ok { | ||
| return false | ||
| } | ||
| if rawHeaders, ok := result["response_headers"]; ok { | ||
| applyPipelineHeaders(w, rawHeaders) | ||
| } | ||
| w.WriteHeader(status) | ||
| if body, ok := result["response_body"].(string); ok { | ||
| _, _ = w.Write([]byte(body)) //nolint:gosec // G705: body is pipeline step output explicitly set as response body | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| // trackedResponseWriter wraps http.ResponseWriter and tracks whether a response | ||
| // body has been written, so the HTTP trigger can fall back to the generic | ||
| // "workflow triggered" response only when the pipeline didn't write one. | ||
|
|
@@ -267,6 +392,11 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler { | |
| // to headers (e.g. Authorization), method, URL, and body. | ||
| ctx = context.WithValue(ctx, HTTPRequestContextKey, r) | ||
|
|
||
| // Inject a result holder so the engine can pass the pipeline's result.Current | ||
| // back to this handler without changing the WorkflowEngine interface. | ||
| resultHolder := &PipelineResultHolder{} | ||
| ctx = context.WithValue(ctx, PipelineResultContextKey, resultHolder) | ||
|
|
||
| // Extract data from the request to pass to the workflow. | ||
| // Include method, path, and parsed body so pipelines have full | ||
| // access to request context (consistent with CommandHandler). | ||
|
|
@@ -316,6 +446,14 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler { | |
| return | ||
| } | ||
|
|
||
| // If the pipeline set response_status in its output (without writing | ||
| // directly to the response writer), use those values to build the response. | ||
| if result := resultHolder.Get(); result != nil { | ||
| if writePipelineContextResponse(w, result) { | ||
| return | ||
| } | ||
|
Comment on lines
+449
to
+454
|
||
| } | ||
|
|
||
| // Fallback: return a generic accepted response when the pipeline doesn't | ||
| // write its own HTTP response. | ||
| w.Header().Set("Content-Type", "application/json") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This adds a new behavior path where
TriggerWorkflowpopulates aPipelineResultHolderfrom the context. There are existing unit tests forStdEngine.TriggerWorkflowinengine_test.go, but none exercising this holder population; adding a test that asserts the holder is set (and remains nil when absent) would help prevent regressions.