Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,97 @@ type ExecutorCapabilities struct {
ExecutionTiers []ExecutionSecurityTier `json:"execution_tiers,omitempty"`
}

type ProviderCapabilityStatus string

const (
ProviderCapabilitySupported ProviderCapabilityStatus = "supported"
ProviderCapabilityDegraded ProviderCapabilityStatus = "degraded"
ProviderCapabilityUnsupported ProviderCapabilityStatus = "unsupported"
)

type ProviderCapabilityReport struct {
Provider string `json:"provider"`
Status ProviderCapabilityStatus `json:"status"`
Reason string `json:"reason,omitempty"`
}

type PlacementRequirementCapabilities struct {
CapabilityTags []string `json:"capability_tags,omitempty"`
ExecutorProviders []string `json:"executor_providers,omitempty"`
ExecutionTiers []ExecutionSecurityTier `json:"execution_tiers,omitempty"`
ProofTiers []ProofTier `json:"proof_tiers,omitempty"`
CapabilityReports []ProviderCapabilityReport `json:"capability_reports,omitempty"`
Executors []ExecutorRef `json:"executors,omitempty"`
}

func ValidatePlacementRequirementsAgainstCapabilities(req PlacementRequirements, caps PlacementRequirementCapabilities) error {
var errs []error
workerCapabilityTags := map[string]struct{}{}
for _, tag := range caps.CapabilityTags {
workerCapabilityTags[tag] = struct{}{}
}
requiredCapabilityTags := map[string]struct{}{}
for i, tag := range req.RequiredCapabilities {
if err := validateIdentifier(fmt.Sprintf("task requirements required_capabilities[%d]", i), tag); err != nil {
errs = append(errs, err)
continue
}
if _, ok := requiredCapabilityTags[tag]; ok {
errs = append(errs, fmt.Errorf("task requirements required_capabilities[%d] %q is duplicated", i, tag))
}
requiredCapabilityTags[tag] = struct{}{}
if _, ok := workerCapabilityTags[tag]; !ok {
errs = append(errs, fmt.Errorf("worker missing required capability %q", tag))
}
}
if req.ExecutionSecurityTier != "" && !validExecutionSecurityTier(req.ExecutionSecurityTier) {
errs = append(errs, fmt.Errorf("task requirements execution_security_tier %q is unknown", req.ExecutionSecurityTier))
}
if req.ProofTier != "" && !validProofTier(req.ProofTier) {
errs = append(errs, fmt.Errorf("task requirements proof_tier %q is unknown", req.ProofTier))
}
if req.ExecutorProvider != "" && !contains(caps.ExecutorProviders, req.ExecutorProvider) {
errs = append(errs, fmt.Errorf("worker does not support executor provider %q", req.ExecutorProvider))
}
if report, ok := providerCapabilityReportForProvider(caps.CapabilityReports, req.ExecutorProvider); ok && report.Status != ProviderCapabilitySupported {
errs = append(errs, fmt.Errorf("worker executor provider %q is %s: %s", req.ExecutorProvider, report.Status, report.Reason))
}
if req.ExecutionSecurityTier != "" && !contains(caps.ExecutionTiers, req.ExecutionSecurityTier) {
errs = append(errs, fmt.Errorf("worker does not support execution security tier %q", req.ExecutionSecurityTier))
}
if req.ProofTier != "" && !contains(caps.ProofTiers, req.ProofTier) {
errs = append(errs, fmt.Errorf("worker does not support proof tier %q", req.ProofTier))
}
if len(caps.Executors) > 0 && !ExecutorCapabilitiesHaveSupportedMatch(req, caps) {
errs = append(errs, errors.New("worker has no supported executor matching task placement requirements"))
}
return errors.Join(errs...)
}

func ExecutorCapabilitiesHaveSupportedMatch(req PlacementRequirements, caps PlacementRequirementCapabilities) bool {
for _, executor := range caps.Executors {
if ExecutorMatchesPlacementRequirements(executor, req) {
report, ok := providerCapabilityReportForProvider(caps.CapabilityReports, executor.Provider)
if !ok || report.Status == ProviderCapabilitySupported {
return true
}
}
}
return false
}

func providerCapabilityReportForProvider(reports []ProviderCapabilityReport, provider string) (ProviderCapabilityReport, bool) {
if provider == "" {
return ProviderCapabilityReport{}, false
}
for _, report := range reports {
if report.Provider == provider {
return report, true
}
}
return ProviderCapabilityReport{}, false
}

func ResourceLimitsRequireResourceConstrainedExecutor(limits ResourceLimits) bool {
return limits.CPUPercent > 0 || limits.MemoryBytes > 0
}
Expand Down
71 changes: 71 additions & 0 deletions protocol/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,77 @@ func TestExecutorCapabilitiesHaveResourceConstrainedMatch(t *testing.T) {
}
}

func TestValidatePlacementRequirementsAgainstCapabilities(t *testing.T) {
req := protocol.PlacementRequirements{
ExecutorProvider: "sandboxed-command",
ExecutionSecurityTier: protocol.ExecutionSandboxedContainer,
ProofTier: protocol.ProofArtifactHash,
RequiredCapabilities: []string{"mobile", "mobile", "bad tag"},
}
caps := protocol.PlacementRequirementCapabilities{
CapabilityTags: []string{"mobile"},
ExecutorProviders: []string{"other-provider"},
ExecutionTiers: []protocol.ExecutionSecurityTier{protocol.ExecutionTrustedNative},
ProofTiers: []protocol.ProofTier{protocol.ProofReceiptOnly},
CapabilityReports: []protocol.ProviderCapabilityReport{{
Provider: "sandboxed-command",
Status: protocol.ProviderCapabilityUnsupported,
Reason: "runtime unavailable",
}},
Executors: []protocol.ExecutorRef{{
Provider: "sandboxed-command",
ExecutionSecurityTier: protocol.ExecutionTrustedNative,
ProofTier: protocol.ProofReceiptOnly,
}},
}

err := protocol.ValidatePlacementRequirementsAgainstCapabilities(req, caps)
if err == nil {
t.Fatal("expected placement requirements to reject incompatible capabilities")
}
for _, want := range []string{
"task requirements required_capabilities[1] \"mobile\" is duplicated",
"task requirements required_capabilities[2] must not contain whitespace",
`worker does not support executor provider "sandboxed-command"`,
`worker executor provider "sandboxed-command" is unsupported: runtime unavailable`,
`worker does not support execution security tier "sandboxed-container"`,
`worker does not support proof tier "artifact-hash"`,
"worker has no supported executor matching task placement requirements",
} {
if !strings.Contains(err.Error(), want) {
t.Fatalf("expected placement error to contain %q, got %v", want, err)
}
}

validCaps := protocol.PlacementRequirementCapabilities{
CapabilityTags: []string{"mobile"},
ExecutorProviders: []string{"sandboxed-command"},
ExecutionTiers: []protocol.ExecutionSecurityTier{protocol.ExecutionSandboxedContainer},
ProofTiers: []protocol.ProofTier{protocol.ProofArtifactHash},
Executors: []protocol.ExecutorRef{{
Provider: "sandboxed-command",
ExecutionSecurityTier: protocol.ExecutionSandboxedContainer,
ProofTier: protocol.ProofArtifactHash,
}},
}
validReq := req
validReq.RequiredCapabilities = []string{"mobile"}
if err := protocol.ValidatePlacementRequirementsAgainstCapabilities(validReq, validCaps); err != nil {
t.Fatalf("compatible placement requirements rejected: %v", err)
}

unknownReq := protocol.PlacementRequirements{
ExecutionSecurityTier: protocol.ExecutionSecurityTier("magic-vm"),
ProofTier: protocol.ProofTier("pinkie-promise"),
}
err = protocol.ValidatePlacementRequirementsAgainstCapabilities(unknownReq, protocol.PlacementRequirementCapabilities{})
if err == nil ||
!strings.Contains(err.Error(), "task requirements execution_security_tier") ||
!strings.Contains(err.Error(), "task requirements proof_tier") {
t.Fatalf("expected unknown tier errors, got %v", err)
}
}

func TestResourceLimitsRejectNegativeValues(t *testing.T) {
limits := protocol.ResourceLimits{
CPUPercent: -1,
Expand Down
Loading