From 227ab8c154081cd42f240f1133c580ea32b0a3bc Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sat, 6 Jun 2026 02:36:31 -0400 Subject: [PATCH 1/3] refactor: use compute-core task proof sdk --- go.mod | 6 +- go.sum | 5 +- internal/plugin/client.go | 157 --------------------------------- internal/plugin/plugin_test.go | 2 +- internal/plugin/sign.go | 2 +- internal/plugin/step.go | 26 +++--- module_dependency_test.go | 39 ++++++++ 7 files changed, 61 insertions(+), 176 deletions(-) delete mode 100644 internal/plugin/client.go create mode 100644 module_dependency_test.go diff --git a/go.mod b/go.mod index 1e14863..3d289e5 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,12 @@ go 1.26.0 require ( github.com/GoCodeAlone/workflow v0.64.0 - github.com/GoCodeAlone/workflow-compute v0.0.0-20260523064329-58fe23d9f596 + github.com/GoCodeAlone/workflow-plugin-compute-core v0.4.0 golang.org/x/net v0.54.0 ) require ( + github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/BurntSushi/toml v1.6.0 // indirect github.com/DataDog/datadog-go/v5 v5.8.3 // indirect github.com/GoCodeAlone/go-plugin v1.7.0 // indirect @@ -41,7 +42,6 @@ require ( github.com/cloudevents/sdk-go/v2 v2.16.2 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect - github.com/containerd/log v0.1.0 // indirect github.com/danieljoos/wincred v1.2.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect @@ -96,8 +96,6 @@ require ( github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/sys/sequential v0.6.0 // indirect - github.com/moby/term v0.5.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/go.sum b/go.sum index 025238f..9168ab4 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/GoCodeAlone/modular/modules/eventbus/v2 v2.8.0 h1:buYs0TGNbAZgtTq1Qb+ github.com/GoCodeAlone/modular/modules/eventbus/v2 v2.8.0/go.mod h1:329flAKmwrPq2JEwu9iltWv6A83H/Di82Xze+kvdKDw= github.com/GoCodeAlone/workflow v0.64.0 h1:2CpbYPwIqdGDb3xi3YJpwcteIum4ehBSrnRql/1YvB4= github.com/GoCodeAlone/workflow v0.64.0/go.mod h1:659GGDrw3QJ7b625y9rf8QhKIpt1VCoEG0MxKu5tGQs= -github.com/GoCodeAlone/workflow-compute v0.0.0-20260523064329-58fe23d9f596 h1:+ydGmjmP5Sh7l/qdtC+k9G4S65Xxlkxie/fXoIg4fuk= -github.com/GoCodeAlone/workflow-compute v0.0.0-20260523064329-58fe23d9f596/go.mod h1:T8yGXrRBm2USwkRFvMaoq4aPDt/f7JciZY9Y/l/upYs= +github.com/GoCodeAlone/workflow-plugin-compute-core v0.4.0 h1:0jpBwHsX3YorjPWdz/rBs39TmgK+ipOttBJjRXF95gQ= +github.com/GoCodeAlone/workflow-plugin-compute-core v0.4.0/go.mod h1:1T6uCpUWPCNk6XPYgKq5CL/7LkD24MphKYsYVzF4jnI= github.com/GoCodeAlone/yaegi v0.17.2 h1:WK6Y6e0t1a6U7r+S2dN3CGWW1PizYD3zO0zneToZPxM= github.com/GoCodeAlone/yaegi v0.17.2/go.mod h1:z5Pr6Wse6QJcQvpgxTxzMAevFarH0N37TG88Y9dprx0= github.com/IBM/sarama v1.47.0 h1:GcQFEd12+KzfPYeLgN69Fh7vLCtYRhVIx0rO4TZO318= @@ -417,6 +417,7 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/plugin/client.go b/internal/plugin/client.go deleted file mode 100644 index 17a8384..0000000 --- a/internal/plugin/client.go +++ /dev/null @@ -1,157 +0,0 @@ -package plugin - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net" - "net/http" - "net/url" - "strings" - "time" - - "github.com/GoCodeAlone/workflow-compute/pkg/protocol" -) - -type computeClient struct { - baseURL *url.URL - token string - http *http.Client -} - -type taskList struct { - Tasks []protocol.Task `json:"tasks"` - Stalls []taskStall `json:"stalls,omitempty"` -} - -type taskStall struct { - TaskID string `json:"task_id,omitempty"` - LeaseID string `json:"lease_id,omitempty"` - AgentID string `json:"agent_id,omitempty"` - Reason string `json:"reason"` - AgeMS int64 `json:"age_ms"` -} - -func newComputeClient(serverURL, token string, timeout time.Duration) (*computeClient, error) { - parsed, err := url.ParseRequestURI(serverURL) - if err != nil || (parsed.Scheme != "http" && parsed.Scheme != "https") { - return nil, fmt.Errorf("server_url must be absolute http(s) URL") - } - if token != "" && parsed.Scheme != "https" && !isLoopbackHost(parsed.Hostname()) { - return nil, fmt.Errorf("server_url must use https when auth token is set") - } - if timeout <= 0 { - timeout = 30 * time.Second - } - return &computeClient{ - baseURL: parsed, - token: token, - http: &http.Client{Timeout: timeout}, - }, nil -} - -func isLoopbackHost(host string) bool { - host = strings.TrimSpace(strings.Trim(host, "[]")) - if host == "localhost" { - return true - } - ip := net.ParseIP(host) - return ip != nil && ip.IsLoopback() -} - -func (c *computeClient) submitTask(ctx context.Context, task protocol.Task) (protocol.Task, error) { - var out struct { - Task protocol.Task `json:"task"` - } - if err := c.doJSON(ctx, http.MethodPost, "/v1/tasks", task, http.StatusCreated, &out); err != nil { - return protocol.Task{}, err - } - return out.Task, nil -} - -func (c *computeClient) listTasks(ctx context.Context) (taskList, error) { - var out taskList - if err := c.doJSON(ctx, http.MethodGet, "/v1/tasks", nil, http.StatusOK, &out); err != nil { - return taskList{}, err - } - return out, nil -} - -func (c *computeClient) taskSnapshot(ctx context.Context, id string) (protocol.Task, bool, []taskStall, error) { - list, err := c.listTasks(ctx) - if err != nil { - return protocol.Task{}, false, nil, err - } - matchingStalls := make([]taskStall, 0) - for _, stall := range list.Stalls { - if stall.TaskID == id { - matchingStalls = append(matchingStalls, stall) - } - } - for _, task := range list.Tasks { - if task.ID == id { - return task, true, matchingStalls, nil - } - } - return protocol.Task{}, false, matchingStalls, nil -} - -func (c *computeClient) listProofs(ctx context.Context) ([]protocol.ProofReceipt, error) { - var out struct { - Proofs []protocol.ProofReceipt `json:"proofs"` - } - if err := c.doJSON(ctx, http.MethodGet, "/v1/proofs", nil, http.StatusOK, &out); err != nil { - return nil, err - } - return out.Proofs, nil -} - -func (c *computeClient) findProof(ctx context.Context, taskID string) (protocol.ProofReceipt, bool, error) { - proofs, err := c.listProofs(ctx) - if err != nil { - return protocol.ProofReceipt{}, false, err - } - for _, proof := range proofs { - if proof.TaskID == taskID { - return proof, true, nil - } - } - return protocol.ProofReceipt{}, false, nil -} - -func (c *computeClient) doJSON(ctx context.Context, method, path string, body any, want int, out any) error { - var requestBody *bytes.Reader - if body != nil { - data, err := json.Marshal(body) - if err != nil { - return fmt.Errorf("marshal request: %w", err) - } - requestBody = bytes.NewReader(data) - } else { - requestBody = bytes.NewReader(nil) - } - endpoint := c.baseURL.JoinPath(path) - req, err := http.NewRequestWithContext(ctx, method, endpoint.String(), requestBody) - if err != nil { - return fmt.Errorf("create request: %w", err) - } - if body != nil { - req.Header.Set("Content-Type", "application/json") - } - if c.token != "" { - req.Header.Set("Authorization", "Bearer "+c.token) - } - resp, err := c.http.Do(req) - if err != nil { - return fmt.Errorf("%s %s: %w", method, path, err) - } - defer resp.Body.Close() - if resp.StatusCode != want { - return fmt.Errorf("%s %s: got status %d want %d", method, path, resp.StatusCode, want) - } - if out == nil { - return nil - } - return protocol.DecodeStrict(resp.Body, out) -} diff --git a/internal/plugin/plugin_test.go b/internal/plugin/plugin_test.go index 18c0dc5..dac4acd 100644 --- a/internal/plugin/plugin_test.go +++ b/internal/plugin/plugin_test.go @@ -10,7 +10,7 @@ import ( "strings" "testing" - "github.com/GoCodeAlone/workflow-compute/pkg/protocol" + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" ) const testProviderImageRef = "ghcr.io/gocodealone/workflow-plugin-product-capture/product-capture-browser@sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" diff --git a/internal/plugin/sign.go b/internal/plugin/sign.go index 90437d6..3963f8a 100644 --- a/internal/plugin/sign.go +++ b/internal/plugin/sign.go @@ -6,7 +6,7 @@ import ( "encoding/json" "time" - "github.com/GoCodeAlone/workflow-compute/pkg/protocol" + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" ) func buildTask(cfg taskConfig, workload protocol.WorkloadSpec) protocol.Task { diff --git a/internal/plugin/step.go b/internal/plugin/step.go index 2d9b0bd..8b63f04 100644 --- a/internal/plugin/step.go +++ b/internal/plugin/step.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/GoCodeAlone/workflow-compute/pkg/protocol" + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" ) @@ -37,7 +37,7 @@ func (c connectionConfig) validate() error { return errors.Join(errs...) } -func (c connectionConfig) client(ctx context.Context, metadata, runtimeConfig map[string]any) (*computeClient, error) { +func (c connectionConfig) client(ctx context.Context, metadata, runtimeConfig map[string]any) (*protocol.Client, error) { _ = ctx token, err := resolveRuntimeRef(c.AuthTokenRef, metadata, runtimeConfig) if err != nil { @@ -50,7 +50,11 @@ func (c connectionConfig) client(ctx context.Context, metadata, runtimeConfig ma return nil, err } } - return newComputeClient(c.ServerURL, token, timeout) + return protocol.NewClient(protocol.ClientConfig{ + ServerURL: c.ServerURL, + Token: token, + Timeout: timeout, + }) } type taskConfig struct { @@ -184,7 +188,7 @@ func (s *productCaptureStep) Execute(ctx context.Context, _ map[string]any, _ ma if err := workload.Validate(); err != nil { return errorResult(err.Error()), nil } - task, err := client.submitTask(ctx, buildTask(s.config.taskConfig, workload)) + task, err := client.SubmitTask(ctx, buildTask(s.config.taskConfig, workload)) if err != nil { return errorResult(err.Error()), nil } @@ -223,13 +227,13 @@ func (s *productCaptureStep) productCaptureProviderConfig() protocol.ProviderCon return cfg } -func (s *productCaptureStep) waitForProductCapture(ctx context.Context, client *computeClient, taskID string) (map[string]any, error) { +func (s *productCaptureStep) waitForProductCapture(ctx context.Context, client *protocol.Client, taskID string) (map[string]any, error) { pollInterval := durationOrDefault(s.config.PollInterval, time.Second) timeout := durationOrDefault(s.config.WaitTimeout, 5*time.Minute) waitCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() for { - task, found, stalls, err := client.taskSnapshot(waitCtx, taskID) + task, found, stalls, err := client.TaskSnapshot(waitCtx, taskID) if err != nil { return nil, err } @@ -246,7 +250,7 @@ func (s *productCaptureStep) waitForProductCapture(ctx context.Context, client * return output, nil } if isTerminalTaskStatus(task.Status) { - proof, hasProof, err := client.findProof(waitCtx, task.ID) + proof, hasProof, err := client.FindProof(waitCtx, task.ID) if err != nil { return nil, err } @@ -370,7 +374,7 @@ func flattenedPreviewField(key string) bool { } } -func addStallOutput(output map[string]any, stall taskStall) { +func addStallOutput(output map[string]any, stall protocol.TaskStall) { output["stall_reason"] = stall.Reason if stall.LeaseID != "" { output["lease_id"] = stall.LeaseID @@ -383,18 +387,18 @@ func addStallOutput(output map[string]any, stall taskStall) { } } -func taskWaitError(task protocol.Task, stalls []taskStall) string { +func taskWaitError(task protocol.Task, stalls []protocol.TaskStall) string { if len(stalls) > 0 { return fmt.Sprintf("task %q stalled: %s", task.ID, stalls[0].Reason) } return fmt.Sprintf("task %q %s", task.ID, task.Status) } -func actionableStalls(stalls []taskStall, requireProof bool) []taskStall { +func actionableStalls(stalls []protocol.TaskStall, requireProof bool) []protocol.TaskStall { if requireProof { return stalls } - actionable := make([]taskStall, 0, len(stalls)) + actionable := make([]protocol.TaskStall, 0, len(stalls)) for _, stall := range stalls { if stall.Reason == "proof_missing" { continue diff --git a/module_dependency_test.go b/module_dependency_test.go new file mode 100644 index 0000000..28dcd2e --- /dev/null +++ b/module_dependency_test.go @@ -0,0 +1,39 @@ +package productcapture + +import ( + "os" + "strings" + "testing" +) + +func TestProductCaptureUsesPublicComputeCoreSDK(t *testing.T) { + goMod, err := os.ReadFile("go.mod") + if err != nil { + t.Fatalf("read go.mod: %v", err) + } + text := string(goMod) + if strings.Contains(text, "github.com/GoCodeAlone/workflow-compute") { + t.Fatal("product-capture must not depend on private workflow-compute; use workflow-plugin-compute-core") + } + if !strings.Contains(text, "github.com/GoCodeAlone/workflow-plugin-compute-core") { + t.Fatal("product-capture must consume the public workflow-plugin-compute-core SDK") + } + for _, path := range []string{ + "internal/plugin/sign.go", + "internal/plugin/step.go", + "internal/plugin/plugin_test.go", + } { + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + if strings.Contains(string(data), "github.com/GoCodeAlone/workflow-compute/pkg/protocol") { + t.Fatalf("%s imports private workflow-compute protocol package", path) + } + } + if _, err := os.Stat("internal/plugin/client.go"); err == nil { + t.Fatal("product-capture must use compute-core protocol.Client instead of a duplicate local compute client") + } else if !os.IsNotExist(err) { + t.Fatalf("stat internal/plugin/client.go: %v", err) + } +} From f2e57dbc9e1b6a1f3312e530441deaa141f4c7f3 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sat, 6 Jun 2026 02:39:49 -0400 Subject: [PATCH 2/3] docs: add BuyMyWishlist product capture usage --- README.md | 3 ++ docs/buymywishlist-live-usage.md | 83 ++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 docs/buymywishlist-live-usage.md diff --git a/README.md b/README.md index 558d253..8f0aef5 100644 --- a/README.md +++ b/README.md @@ -56,3 +56,6 @@ workflow-compute. The step owns product-capture-specific validation and submits a generic provider workload using the `product-capture.browser.v1` contract. `workflow-plugin-compute` should only provide generic dispatch/wait/catalog plumbing. + +BuyMyWishlist live wiring details are in +[`docs/buymywishlist-live-usage.md`](docs/buymywishlist-live-usage.md). diff --git a/docs/buymywishlist-live-usage.md b/docs/buymywishlist-live-usage.md new file mode 100644 index 0000000..456245e --- /dev/null +++ b/docs/buymywishlist-live-usage.md @@ -0,0 +1,83 @@ +# BuyMyWishlist Live Usage + +`workflow-plugin-product-capture` lets BuyMyWishlist submit product URLs to a +wfcompute deployment through a generic provider workload. BuyMyWishlist should +own the user-facing wishlist/product workflow; wfcompute owns task admission, +lease placement, agent execution, proof verification, and artifact retention. + +## wfcompute Prerequisites + +Before BuyMyWishlist enables live capture, the target wfcompute environment must +have: + +- provider contract `product-capture.browser.v1` registered from this plugin; +- a network product such as `bmw-product-capture` whose provider config points + at `workflow-plugin-product-capture` provider `browser`; +- a digest-pinned provider image ref from this plugin release; +- a promoted provider package or runtime image available to agents; +- at least one online agent advertising executor provider + `product-capture-browser`, workload kind `provider`, execution tier + `sandboxed-container`, and proof tier `artifact-hash`; +- a scoped task token for BuyMyWishlist. Do not use a dashboard admin, + bootstrap, or operator token from the application. + +The deployment is not live-ready until a BMW-shaped provider task returns an +accepted proof from a `product-capture-browser` agent in the target wfcompute +environment. + +## Workflow Step + +Use `step.product_capture` with a secret reference for the scoped wfcompute +token: + +```yaml +steps: + - id: capture_product + type: step.product_capture + config: + server_url: https:// + auth_token_ref: secret:wfcompute_product_capture_token + product_id: bmw-product-capture + org_id: + pool_id: + policy_id: + timeout_seconds: 120 + url_field: product_url + allowed_hosts: + - www.amazon.com + - amazon.com + provider_image_ref: ghcr.io/gocodealone/workflow-plugin-product-capture/product-capture-browser@sha256: + capture_timeout_seconds: 60 + max_html_bytes: 1048576 + max_image_count: 8 + poll_interval: 2s + wait_timeout: 5m +``` + +The step submits a generic `provider` workload with operation +`capture_product`. It does not call a product-capture-specific wfcompute API. + +## Application Handling + +BuyMyWishlist should treat the proof preview as user-confirmation data, not as a +silent purchase instruction. Expected fields include `title`, `canonical_url`, +`external_id`, `price`, `currency`, `seller`, `ships_from`, +`shipping_summary`, `image_url`, `images`, `availability`, and +`requires_user_confirmation`. + +The app should persist the wfcompute `task_id`, `proof_id`, artifact hash, and +selected preview fields with the wishlist item. It should not store raw HTML, +provider cookies, wfcompute admin credentials, browser runtime paths, or +operator-only artifacts. + +## Failure Handling + +- If the step returns `error`, keep the wishlist item in a user-actionable + review state. +- If no accepted proof arrives before `wait_timeout`, retry by submitting a new + task rather than mutating the old task. +- If wfcompute reports no compatible agent capacity, keep capture disabled for + live traffic until the provider package and `product-capture-browser` agents + are promoted again. +- If the product URL host is outside `allowed_hosts`, reject it in + BuyMyWishlist before submission. From 824feb03a3ca9a21fceea3838d781c950778cda1 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sat, 6 Jun 2026 02:46:36 -0400 Subject: [PATCH 3/3] test: harden compute SDK boundary check --- module_dependency_test.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/module_dependency_test.go b/module_dependency_test.go index 28dcd2e..f94281a 100644 --- a/module_dependency_test.go +++ b/module_dependency_test.go @@ -1,7 +1,9 @@ -package productcapture +package productcapture_test import ( + "io/fs" "os" + "path/filepath" "strings" "testing" ) @@ -18,18 +20,24 @@ func TestProductCaptureUsesPublicComputeCoreSDK(t *testing.T) { if !strings.Contains(text, "github.com/GoCodeAlone/workflow-plugin-compute-core") { t.Fatal("product-capture must consume the public workflow-plugin-compute-core SDK") } - for _, path := range []string{ - "internal/plugin/sign.go", - "internal/plugin/step.go", - "internal/plugin/plugin_test.go", - } { + err = filepath.WalkDir("internal/plugin", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || filepath.Ext(path) != ".go" { + return nil + } data, err := os.ReadFile(path) if err != nil { - t.Fatalf("read %s: %v", path, err) + return err } if strings.Contains(string(data), "github.com/GoCodeAlone/workflow-compute/pkg/protocol") { t.Fatalf("%s imports private workflow-compute protocol package", path) } + return nil + }) + if err != nil { + t.Fatalf("scan internal/plugin: %v", err) } if _, err := os.Stat("internal/plugin/client.go"); err == nil { t.Fatal("product-capture must use compute-core protocol.Client instead of a duplicate local compute client")