Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions docs/plans/2026-05-24-runtime-execution-contracts-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Runtime Execution Contracts Design

## Goal

Expose the public request/result shapes runtime plugins need without moving
`workflow-compute` host execution internals into compute-core. This is the next
step after runtime descriptor extraction: plugin authors can now share the same
invocation metadata, result evidence, resource usage, and service health
evidence while `workflow-compute` keeps lease authorization, workspace paths,
network binding, mounts, and supervisor integration local.

## Design

`RuntimeExecutionRequest` carries host-independent invocation metadata:
protocol version, task and lease IDs, workload kind, optional provider config,
operation, JSON input, environment values, and resource limits. It deliberately
does not include `Task`, `Lease`, workspace paths, mounted volumes, or network
callback functions.

`RuntimeExecutionResult` carries generic short-lived runtime output: timing,
exit code, stdout/stderr bytes, artifacts, result preview, artifact hash, and
resource usage.

`RuntimeServiceResult` carries service runtime health and response evidence:
timing, request/response hashes, resource usage, SLO evidence, and optional
status evidence. `SLOEvidence` and `ServiceStatusEvidence` move to compute-core
because service runtime plugins and hosts both need the same health contract.
Hashes in service status evidence are validated with the same canonical
`sha256:<64 hex chars>` shape as request and response evidence.

## Assumptions

- Runtime plugins need a stable public envelope before they need full host
execution adapters.
- Runtime output may include bytes and previews, but host policy still decides
retention, upload, signing, proof verification, and reward handling.
- Service health evidence is generic enough for compute-core; service leases,
ingress claims, and durable session lifecycle remain host/application
concerns for now.

## Rollback

To roll back, revert these additive contracts and keep the equivalent
runtime result/evidence structs local in `workflow-compute`. The JSON fields
match the existing host structs, so no state migration is introduced.

## Self-Challenge

- Laziest solution: keep only descriptors in compute-core. Rejected because a
runtime plugin cannot be useful without a stable result and evidence shape.
- Fragile assumption: `RuntimeExecutionRequest` may be too generic for every
runtime. The mitigation is that provider-specific payload stays in JSON
`input`, while host-only capabilities stay out of the contract.
- Security risk: exposing env values could normalize secret passing through
plugin contracts. This contract only describes already-resolved runtime env;
secret resolution and authorization remain host-owned.
- Validation gap: status evidence carries its own hashes (`command_hash` and
`output_hash`). The contract validates them with the same strict
`sha256:<64 hex chars>` rule as the request and response hashes, so service
health probes cannot accidentally emit ambiguous digest strings.
155 changes: 155 additions & 0 deletions protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,161 @@ func (l ResourceLimits) Validate() error {
return errors.Join(errs...)
}

// RuntimeExecutionRequest is the invocation envelope for a runtime execution.
// It references the task and lease by ID only and has no fields for workspace
// paths, mounts, or callbacks.
type RuntimeExecutionRequest struct {
// ProtocolVersion must equal Version for Validate to pass.
ProtocolVersion string `json:"protocol_version"`
// TaskID is required; Validate rejects a blank or whitespace-only value.
TaskID string `json:"task_id"`
// LeaseID is required; Validate rejects a blank or whitespace-only value.
LeaseID string `json:"lease_id"`
// WorkloadKind must be accepted by validWorkloadKind.
WorkloadKind WorkloadKind `json:"workload_kind"`
// ProviderConfig is optional; Validate only checks it when it is non-zero.
ProviderConfig ProviderConfig `json:"provider_config,omitzero"`
// Operation is optional and must not contain a space, tab, CR, LF, or NUL.
Operation string `json:"operation,omitempty"`
// Input is optional raw JSON; when present it must be syntactically valid.
Input json.RawMessage `json:"input,omitempty"`
// Env holds environment values; Validate does not inspect keys or values.
Env map[string]string `json:"env,omitempty"`
// Limits is always passed through ResourceLimits.Validate.
Limits ResourceLimits `json:"limits,omitzero"`
}

// Validate checks every field of the request and reports all violations at
// once, combined with errors.Join. It returns nil when the request is
// well-formed.
//
// Checks run in field order: protocol version, task and lease IDs, workload
// kind, provider config (only when set), operation, input JSON, and limits.
// Env is not inspected.
func (r RuntimeExecutionRequest) Validate() error {
	var problems []error
	report := func(err error) { problems = append(problems, err) }

	if r.ProtocolVersion != Version {
		report(fmt.Errorf("protocol_version must be %q", Version))
	}

	// Both identifiers are mandatory; whitespace-only values count as missing.
	for _, id := range []struct {
		value   string
		message string
	}{
		{r.TaskID, "task_id is required"},
		{r.LeaseID, "lease_id is required"},
	} {
		if strings.TrimSpace(id.value) == "" {
			report(errors.New(id.message))
		}
	}

	if !validWorkloadKind(r.WorkloadKind) {
		report(fmt.Errorf("workload_kind %q is unsupported", r.WorkloadKind))
	}

	// A zero-valued provider config means "not supplied" and is skipped.
	var unset ProviderConfig
	if r.ProviderConfig != unset {
		if providerErr := r.ProviderConfig.Validate(); providerErr != nil {
			report(fmt.Errorf("provider_config: %w", providerErr))
		}
	}

	if strings.IndexAny(r.Operation, " \t\r\n\x00") >= 0 {
		report(errors.New("operation must not contain whitespace"))
	}

	if len(r.Input) != 0 && !json.Valid(r.Input) {
		report(errors.New("input must be valid JSON"))
	}

	if limitsErr := r.Limits.Validate(); limitsErr != nil {
		report(fmt.Errorf("resource_limits: %w", limitsErr))
	}

	return errors.Join(problems...)
}

// MaxRuntimeResultPreviewBytes is the largest JSON-encoded size, in bytes,
// that RuntimeExecutionResult.Validate accepts for ResultPreview (16 KiB).
const MaxRuntimeResultPreviewBytes = 16 * 1024

// RuntimeExecutionResult is the output envelope of a runtime execution:
// timing, exit code, captured output, artifact references, a bounded result
// preview, and resource usage.
//
// NOTE(review): encoding/json's omitempty never omits a struct value, so a
// zero StartedAt or FinishedAt is still encoded. omitzero would omit them but
// changes the wire output; confirm which behavior is intended.
type RuntimeExecutionResult struct {
// StartedAt and FinishedAt are only cross-checked when both are non-zero.
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
// ExitCode is not checked by Validate; a zero value is omitted from JSON.
ExitCode int `json:"exit_code,omitempty"`
// Stdout and Stderr are raw captured bytes; Validate does not bound them.
Stdout []byte `json:"stdout,omitempty"`
Stderr []byte `json:"stderr,omitempty"`
// ArtifactHash is optional; when set it must be sha256:<64 hex chars>.
ArtifactHash string `json:"artifact_hash,omitempty"`
// Artifacts is not checked by Validate.
// NOTE(review): presumably artifact names or references; confirm with callers.
Artifacts []string `json:"artifacts,omitempty"`
// ResultPreview must JSON-encode to at most MaxRuntimeResultPreviewBytes.
ResultPreview map[string]any `json:"result_preview,omitempty"`
// ResourceUsage is not checked by Validate.
ResourceUsage ResourceUsage `json:"resource_usage,omitzero"`
}

// Validate checks the result's timing, artifact hash, and preview size, and
// returns all violations combined with errors.Join, or nil when there are none.
//
// Timing is only checked when both timestamps are set; equal timestamps are
// accepted. The preview is measured by its JSON encoding.
func (r RuntimeExecutionResult) Validate() error {
	var problems []error

	bothTimesSet := !(r.StartedAt.IsZero() || r.FinishedAt.IsZero())
	if bothTimesSet && r.FinishedAt.Before(r.StartedAt) {
		problems = append(problems, errors.New("finished_at must be after started_at"))
	}

	if hash := r.ArtifactHash; hash != "" && !validSHA256Ref(hash) {
		problems = append(problems, errors.New("artifact_hash must be sha256:<64 hex chars>"))
	}

	if len(r.ResultPreview) != 0 {
		// The size limit applies to the encoded form, so encode first.
		switch encoded, marshalErr := json.Marshal(r.ResultPreview); {
		case marshalErr != nil:
			problems = append(problems, fmt.Errorf("result_preview must be JSON-serializable: %w", marshalErr))
		case len(encoded) > MaxRuntimeResultPreviewBytes:
			problems = append(problems, fmt.Errorf("result_preview must be at most %d bytes", MaxRuntimeResultPreviewBytes))
		}
	}

	return errors.Join(problems...)
}

// SLOEvidence records the outcome of a service probe.
//
// Validate requires StatusCode and LatencyMillis to be positive and
// DeadlineMS to be non-negative; Healthy is not checked.
// NOTE(review): StatusCode presumably holds an HTTP-style status; confirm
// with the producing runtime.
type SLOEvidence struct {
	LatencyMillis int64 `json:"latency_millis,omitempty"`
	StatusCode    int   `json:"status_code,omitempty"`
	DeadlineMS    int64 `json:"deadline_ms,omitempty"`
	Healthy       bool  `json:"healthy,omitempty"`
}

// Validate reports every rule the evidence breaks, combined with errors.Join,
// or nil when the evidence is acceptable.
func (e SLOEvidence) Validate() error {
	// Rules are listed in reporting order.
	rules := []struct {
		broken  bool
		message string
	}{
		{e.StatusCode <= 0, "status_code is required"},
		{e.LatencyMillis <= 0, "latency_millis is required"},
		{e.DeadlineMS < 0, "deadline_ms must not be negative"},
	}

	var problems []error
	for _, rule := range rules {
		if rule.broken {
			problems = append(problems, errors.New(rule.message))
		}
	}
	return errors.Join(problems...)
}

// ServiceStatusEvidence is optional evidence attached to a service result.
// Validate only checks the two hash fields.
type ServiceStatusEvidence struct {
// CommandHash is optional; when set it must be sha256:<64 hex chars>.
CommandHash string `json:"command_hash,omitempty"`
// OutputHash is optional; when set it must be sha256:<64 hex chars>.
OutputHash string `json:"output_hash,omitempty"`
// Preview is not checked by Validate.
// NOTE(review): presumably an excerpt of the status output; confirm.
Preview string `json:"preview,omitempty"`
// Truncated is not checked by Validate.
// NOTE(review): presumably reports that Preview was shortened; confirm.
Truncated bool `json:"truncated,omitempty"`
}

// Validate checks that CommandHash and OutputHash, when present, have the
// sha256:<64 hex chars> shape. Empty hashes are accepted. All violations are
// returned together via errors.Join; nil means the evidence is acceptable.
func (e ServiceStatusEvidence) Validate() error {
	var problems []error

	// requireSHA256 records a problem for a non-empty, malformed hash.
	requireSHA256 := func(name, value string) {
		if value == "" || validSHA256Ref(value) {
			return
		}
		problems = append(problems, fmt.Errorf("%s must be sha256:<64 hex chars>", name))
	}

	requireSHA256("command_hash", e.CommandHash)
	requireSHA256("output_hash", e.OutputHash)

	return errors.Join(problems...)
}

// RuntimeServiceResult is the result envelope of a service runtime call:
// timing, request/response hashes, resource usage, SLO evidence, and status
// evidence.
//
// NOTE(review): encoding/json's omitempty never omits a struct value, so a
// zero StartedAt or FinishedAt is still encoded. omitzero would omit them but
// changes the wire output; confirm which behavior is intended.
type RuntimeServiceResult struct {
// StartedAt and FinishedAt are only cross-checked when both are non-zero.
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
// RequestHash is optional; when set it must be sha256:<64 hex chars>.
RequestHash string `json:"request_hash,omitempty"`
// ResponseHash is optional; when set it must be sha256:<64 hex chars>.
ResponseHash string `json:"response_hash,omitempty"`
// ResourceUsage is not checked by Validate.
ResourceUsage ResourceUsage `json:"resource_usage,omitzero"`
// SLOEvidence is always validated, so a zero value fails Validate even
// though omitzero drops it from the encoded JSON.
SLOEvidence SLOEvidence `json:"slo_evidence,omitzero"`
// StatusEvidence is always validated; a zero value passes because its
// hashes are only checked when non-empty.
StatusEvidence ServiceStatusEvidence `json:"status_evidence,omitzero"`
}

// Validate checks timing, the request and response hashes, and the nested SLO
// and status evidence. All violations are returned together via errors.Join;
// nil means the result is acceptable.
//
// Timing is only checked when both timestamps are set; equal timestamps are
// accepted. Nested evidence errors are prefixed with the field they came from.
func (r RuntimeServiceResult) Validate() error {
	var problems []error

	if started, finished := r.StartedAt, r.FinishedAt; !started.IsZero() && !finished.IsZero() && finished.Before(started) {
		problems = append(problems, errors.New("finished_at must be after started_at"))
	}

	// requireSHA256 records a problem for a non-empty, malformed hash.
	requireSHA256 := func(name, value string) {
		if value == "" || validSHA256Ref(value) {
			return
		}
		problems = append(problems, fmt.Errorf("%s must be sha256:<64 hex chars>", name))
	}
	requireSHA256("request_hash", r.RequestHash)
	requireSHA256("response_hash", r.ResponseHash)

	if sloErr := r.SLOEvidence.Validate(); sloErr != nil {
		problems = append(problems, fmt.Errorf("slo_evidence: %w", sloErr))
	}
	if statusErr := r.StatusEvidence.Validate(); statusErr != nil {
		problems = append(problems, fmt.Errorf("status_evidence: %w", statusErr))
	}

	return errors.Join(problems...)
}

type NetworkMode string

const (
Expand Down
106 changes: 106 additions & 0 deletions protocol/types_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol_test

import (
"encoding/json"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -38,6 +39,111 @@ func TestRuntimeDescriptorProducesExecutorRef(t *testing.T) {
}
}

// TestRuntimeExecutionRequestValidatesHostIndependentInvocation verifies that
// a fully populated provider request passes validation.
func TestRuntimeExecutionRequestValidatesHostIndependentInvocation(t *testing.T) {
	provider := protocol.ProviderConfig{
		PluginID:   "workflow-plugin-example",
		ProviderID: "example",
		ContractID: "example.v1",
		Version:    "v1.0.0",
		ConfigRef:  "config://providers/example",
	}
	input := mustRawMessage(t, map[string]string{"url": "https://example.test"})

	req := protocol.RuntimeExecutionRequest{
		ProtocolVersion: protocol.Version,
		TaskID:          "task-1",
		LeaseID:         "lease-1",
		WorkloadKind:    protocol.WorkloadProvider,
		ProviderConfig:  provider,
		Operation:       "capture_product",
		Input:           input,
		Env:             map[string]string{"MODE": "test"},
		Limits:          protocol.ResourceLimits{RuntimeSeconds: 30, OutputBytes: 1024},
	}

	err := req.Validate()
	if err != nil {
		t.Fatalf("request invalid: %v", err)
	}
}

// mustRawMessage JSON-encodes value and returns it as a json.RawMessage,
// failing the calling test immediately if encoding fails.
func mustRawMessage(t *testing.T, value any) json.RawMessage {
	t.Helper()
	encoded, marshalErr := json.Marshal(value)
	if marshalErr == nil {
		return encoded
	}
	t.Fatalf("marshal raw message: %v", marshalErr)
	return nil // not reached: Fatalf stops the test goroutine
}

// TestRuntimeExecutionRequestRejectsMalformedInvocation verifies that a wrong
// protocol version and a negative runtime limit are both reported.
func TestRuntimeExecutionRequestRejectsMalformedInvocation(t *testing.T) {
	malformed := protocol.RuntimeExecutionRequest{
		ProtocolVersion: "wrong",
		TaskID:          "task-1",
		LeaseID:         "lease-1",
		WorkloadKind:    protocol.WorkloadProvider,
		Limits:          protocol.ResourceLimits{RuntimeSeconds: -1},
	}

	err := malformed.Validate()
	if err == nil {
		t.Fatal("expected malformed request to fail")
	}

	// Both problems must appear in the single joined error.
	message := err.Error()
	for _, fragment := range []string{"protocol_version", "resource_limits"} {
		if !strings.Contains(message, fragment) {
			t.Fatalf("expected protocol/resource errors, got %v", err)
		}
	}
}

// TestRuntimeExecutionResultValidatesTimingAndPreview verifies that a result
// finishing before it started, with an oversized preview, reports both faults.
func TestRuntimeExecutionResultValidatesTimingAndPreview(t *testing.T) {
	// One byte over the limit before JSON framing is added.
	oversized := strings.Repeat("x", protocol.MaxRuntimeResultPreviewBytes+1)

	result := protocol.RuntimeExecutionResult{
		StartedAt:     time.Unix(10, 0).UTC(),
		FinishedAt:    time.Unix(9, 0).UTC(),
		ResultPreview: map[string]any{"payload": oversized},
	}

	err := result.Validate()
	if err == nil {
		t.Fatal("expected malformed result to fail")
	}

	message := err.Error()
	for _, fragment := range []string{"finished_at", "result_preview"} {
		if !strings.Contains(message, fragment) {
			t.Fatalf("expected timing/preview errors, got %v", err)
		}
	}
}

// TestRuntimeServiceResultValidatesSLOEvidence verifies that a well-formed
// service result passes and that clearing the status code makes it fail.
func TestRuntimeServiceResultValidatesSLOEvidence(t *testing.T) {
	valid := protocol.RuntimeServiceResult{
		StartedAt:    time.Unix(1, 0).UTC(),
		FinishedAt:   time.Unix(2, 0).UTC(),
		RequestHash:  protocol.CanonicalHash("request"),
		ResponseHash: protocol.CanonicalHash("response"),
		SLOEvidence:  protocol.SLOEvidence{StatusCode: 200, LatencyMillis: 3, Healthy: true},
	}

	err := valid.Validate()
	if err != nil {
		t.Fatalf("service result invalid: %v", err)
	}

	// Same result with the status code removed must be rejected.
	missingStatus := valid
	missingStatus.SLOEvidence.StatusCode = 0

	err = missingStatus.Validate()
	if err == nil || !strings.Contains(err.Error(), "status_code") {
		t.Fatalf("expected status code error, got %v", err)
	}
}

// TestRuntimeServiceResultValidatesStatusEvidenceHashes verifies that a
// malformed command hash inside status evidence fails the enclosing result.
func TestRuntimeServiceResultValidatesStatusEvidenceHashes(t *testing.T) {
	status := protocol.ServiceStatusEvidence{
		CommandHash: "not-a-hash",
		OutputHash:  protocol.CanonicalHash("output"),
	}

	result := protocol.RuntimeServiceResult{
		RequestHash:    protocol.CanonicalHash("request"),
		ResponseHash:   protocol.CanonicalHash("response"),
		SLOEvidence:    protocol.SLOEvidence{StatusCode: 200, LatencyMillis: 3, Healthy: true},
		StatusEvidence: status,
	}

	err := result.Validate()
	if err != nil && strings.Contains(err.Error(), "command_hash") {
		return
	}
	t.Fatalf("expected command hash error, got %v", err)
}

func TestRuntimeDescriptorFallsBackToProviderNameAndDevVersion(t *testing.T) {
ref := (protocol.RuntimeDescriptor{}).ExecutorRef("command")

Expand Down
Loading