From b6eaf5f074e3e1f61270ed6f23ed57d56d843c0e Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 27 May 2026 03:37:02 -0400 Subject: [PATCH 1/6] feat: publish network audit evidence contract --- .github/workflows/ci.yml | 10 +- .github/workflows/release-candidate.yml | 46 ++ .github/workflows/release.yml | 10 +- .goreleaser.yaml | 4 + buf.gen.yaml | 6 + buf.yaml | 9 + descriptors/network_audit.pb | 48 ++ plugin.contracts.json | 22 +- plugin.json | 2 +- .../protocol/v1/network_audit.fields.json | 46 ++ .../protocol/v1/network_audit.proto | 51 ++ protocol/network_audit_metadata_test.go | 95 +++ protocol/network_audit_projection_test.go | 192 +++++ protocol/network_audit_test.go | 273 +++++++ protocol/pb/network_audit.pb.go | 558 ++++++++++++++ protocol/types.go | 702 ++++++++++++++++++ scripts/check-proto.sh | 8 + scripts/check-wfctl-action-pin.sh | 53 ++ scripts/check-workflow-engine-load.sh | 49 ++ scripts/generate-proto.sh | 8 + tools.go | 5 + 21 files changed, 2186 insertions(+), 11 deletions(-) create mode 100644 .github/workflows/release-candidate.yml create mode 100644 buf.gen.yaml create mode 100644 buf.yaml create mode 100644 descriptors/network_audit.pb create mode 100644 proto/workflow_plugin_compute_core/protocol/v1/network_audit.fields.json create mode 100644 proto/workflow_plugin_compute_core/protocol/v1/network_audit.proto create mode 100644 protocol/network_audit_metadata_test.go create mode 100644 protocol/network_audit_projection_test.go create mode 100644 protocol/network_audit_test.go create mode 100644 protocol/pb/network_audit.pb.go create mode 100755 scripts/check-proto.sh create mode 100755 scripts/check-wfctl-action-pin.sh create mode 100755 scripts/check-workflow-engine-load.sh create mode 100755 scripts/generate-proto.sh create mode 100644 tools.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6b0087c..ba78805 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,8 +27,12 @@ jobs: - uses: actions/setup-go@v6 with: go-version-file: go.mod - - uses: GoCodeAlone/setup-wfctl@v1 + - uses: GoCodeAlone/setup-wfctl@bcd880980f5bbe8d192d0c20ff6279d25331f956 with: - version: v0.63.1 + version: v0.64.7 - name: Validate plugin contract - run: wfctl plugin validate-contract . + run: wfctl plugin validate-contract --require-contract-kind message . + - name: Verify wfctl action pin + run: ./scripts/check-wfctl-action-pin.sh --workflow .github/workflows/ci.yml --workflow .github/workflows/release.yml --workflow .github/workflows/release-candidate.yml --wfctl-version v0.64.7 + - name: Verify proto descriptors + run: ./scripts/check-proto.sh diff --git a/.github/workflows/release-candidate.yml b/.github/workflows/release-candidate.yml new file mode 100644 index 0000000..d075b34 --- /dev/null +++ b/.github/workflows/release-candidate.yml @@ -0,0 +1,46 @@ +name: Release Candidate +on: + workflow_dispatch: + inputs: + wfctl_version: + description: wfctl version used for contract validation + required: true + type: string + candidate_tag: + description: Candidate semver tag used for snapshot validation + required: true + type: string +permissions: + contents: read +jobs: + validate: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + - uses: actions/setup-go@v6 + with: + go-version-file: go.mod + - uses: GoCodeAlone/setup-wfctl@bcd880980f5bbe8d192d0c20ff6279d25331f956 + with: + version: ${{ inputs.wfctl_version }} + - name: Check proto generation + run: ./scripts/check-proto.sh + - name: Check plugin contract + run: ./scripts/check-workflow-engine-load.sh --mode public --wfctl-version "${{ inputs.wfctl_version }}" + - name: Build + run: go build ./... + - name: Test + run: go test ./... -race -count=1 + - name: Vet + run: go vet ./... + - name: GoReleaser snapshot + uses: goreleaser/goreleaser-action@v7 + with: + distribution: goreleaser + version: '~> v2' + args: release --snapshot --clean + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GORELEASER_CURRENT_TAG: ${{ inputs.candidate_tag }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6ff83f6..22a0d2a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,11 +15,11 @@ jobs: - uses: actions/setup-go@v6 with: go-version-file: go.mod - - uses: GoCodeAlone/setup-wfctl@v1 + - uses: GoCodeAlone/setup-wfctl@bcd880980f5bbe8d192d0c20ff6279d25331f956 with: - version: v0.63.1 + version: v0.64.7 - name: Validate plugin contract for publish (pre-build) - run: wfctl plugin validate-contract --for-publish --tag "${{ github.ref_name }}" . + run: wfctl plugin validate-contract --require-contract-kind message --for-publish --tag "${{ github.ref_name }}" . - uses: goreleaser/goreleaser-action@v7 with: distribution: goreleaser @@ -30,9 +30,9 @@ jobs: - name: Verify shipped plugin.json carries tag (post-build) run: | if [ -f .release/plugin.json ]; then - wfctl plugin validate-contract --for-publish --tag "${{ github.ref_name }}" --release-dir .release . + wfctl plugin validate-contract --require-contract-kind message --for-publish --tag "${{ github.ref_name }}" --release-dir .release . else - wfctl plugin validate-contract --for-publish --tag "${{ github.ref_name }}" --release-dir . . + wfctl plugin validate-contract --require-contract-kind message --for-publish --tag "${{ github.ref_name }}" --release-dir . . fi # workflow#765: runtime truth-check via plugin verify-capabilities. # Spawns the built plugin binary, calls PluginService.GetManifest via raw gRPC, diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 2cf5b39..f8d01fa 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -26,6 +26,10 @@ archives: name_template: "{{ .ProjectName }}-{{ .Os }}-{{ .Arch }}" files: - plugin.json + - plugin.contracts.json + - descriptors/network_audit.pb + - proto/workflow_plugin_compute_core/protocol/v1/network_audit.proto + - proto/workflow_plugin_compute_core/protocol/v1/network_audit.fields.json - LICENSE checksum: diff --git a/buf.gen.yaml b/buf.gen.yaml new file mode 100644 index 0000000..da19392 --- /dev/null +++ b/buf.gen.yaml @@ -0,0 +1,6 @@ +version: v2 +plugins: + - local: protoc-gen-go + out: . + opt: + - module=github.com/GoCodeAlone/workflow-plugin-compute-core diff --git a/buf.yaml b/buf.yaml new file mode 100644 index 0000000..c7e30e3 --- /dev/null +++ b/buf.yaml @@ -0,0 +1,9 @@ +version: v2 +modules: + - path: proto +lint: + use: + - STANDARD +breaking: + use: + - FILE diff --git a/descriptors/network_audit.pb b/descriptors/network_audit.pb new file mode 100644 index 0000000..ef10b58 --- /dev/null +++ b/descriptors/network_audit.pb @@ -0,0 +1,48 @@ + +½ + labels = 9; + int64 started_at_unix_nano = 10; + int64 finished_at_unix_nano = 11; + int64 observed_at_unix_nano = 12; +} + +message NetworkAuditProviderEvidence { + string provider_id = 1; + string plugin_name = 2; + string plugin_version = 3; + string contract_id = 4; + string contract_version = 5; + string descriptor_digest = 6; +} + +message NetworkAuditDestination { + string kind = 1; + string value = 2; +} + +message NetworkAuditResourceUsage { + int64 cpu_millis = 1; + int64 gpu_millis = 2; + int64 max_memory_bytes = 3; + int64 network_rx_bytes = 4; + int64 network_tx_bytes = 5; + int64 workspace_bytes = 6; + int64 output_bytes = 7; + string limit_hit = 8; +} + +message NetworkAuditValidationIssue { + string code = 1; + string field = 2; + string message = 3; +} diff --git a/protocol/network_audit_metadata_test.go b/protocol/network_audit_metadata_test.go new file mode 100644 index 0000000..1bca777 --- /dev/null +++ b/protocol/network_audit_metadata_test.go @@ -0,0 +1,95 @@ +package protocol_test + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" +) + +func TestNetworkAuditStaticMessageContractMetadata(t *testing.T) { + data, err := os.ReadFile(filepath.Join("..", "plugin.contracts.json")) + if err != nil { + t.Fatalf("read plugin.contracts.json: %v", err) + } + var contracts struct { + DescriptorSetRef string `json:"descriptorSetRef"` + Contracts []struct { + Kind string `json:"kind"` + ContractType string `json:"contractType"` + Mode string `json:"mode"` + ProtoPackage string `json:"protoPackage"` + MessageNames []string `json:"messageNames"` + GoImportPath string `json:"goImportPath"` + SchemaDigest string `json:"schemaDigest"` + ProtocolVersion string `json:"protocolVersion"` + } `json:"contracts"` + } + if err := json.Unmarshal(data, &contracts); err != nil { + t.Fatalf("parse plugin.contracts.json: %v", err) + } + if contracts.DescriptorSetRef != "descriptors/network_audit.pb" { + t.Fatalf("descriptorSetRef = %q", contracts.DescriptorSetRef) + } + var found bool + for _, contract := range contracts.Contracts { + if contract.ContractType != "compute.network_audit_evidence.v1" { + continue + } + found = true + if contract.Kind != "message" || contract.Mode != "strict" { + t.Fatalf("unexpected contract kind/mode: %#v", contract) + } + if contract.ProtoPackage != "workflow_plugin_compute_core.protocol.v1" { + t.Fatalf("protoPackage = %q", contract.ProtoPackage) + } + if contract.GoImportPath != "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol/pb" { + t.Fatalf("goImportPath = %q", contract.GoImportPath) + } + if contract.SchemaDigest != protocol.NetworkAuditDescriptorDigest() { + t.Fatalf("schemaDigest = %q, want %q", contract.SchemaDigest, protocol.NetworkAuditDescriptorDigest()) + } + if contract.ProtocolVersion != protocol.NetworkAuditProtocolVersion { + t.Fatalf("protocolVersion = %q", contract.ProtocolVersion) + } + wantMessages := map[string]bool{ + "workflow_plugin_compute_core.protocol.v1.NetworkAuditRecord": false, + "workflow_plugin_compute_core.protocol.v1.NetworkAuditDestination": false, + "workflow_plugin_compute_core.protocol.v1.NetworkAuditValidationIssue": false, + } + for _, name := range contract.MessageNames { + if _, ok := wantMessages[name]; ok { + wantMessages[name] = true + } + } + for name, ok := range wantMessages { + if !ok { + t.Fatalf("messageNames missing %s: %#v", name, contract.MessageNames) + } + } + } + if !found { + t.Fatal("compute.network_audit_evidence.v1 message contract not found") + } +} + +func TestNetworkAuditReleaseArchiveIncludesContractArtifacts(t *testing.T) { + data, err := os.ReadFile(filepath.Join("..", ".goreleaser.yaml")) + if err != nil { + t.Fatalf("read .goreleaser.yaml: %v", err) + } + body := string(data) + for _, want := range []string{ + "plugin.contracts.json", + "descriptors/network_audit.pb", + "proto/workflow_plugin_compute_core/protocol/v1/network_audit.proto", + "proto/workflow_plugin_compute_core/protocol/v1/network_audit.fields.json", + } { + if !strings.Contains(body, want) { + t.Fatalf(".goreleaser.yaml archive files missing %s", want) + } + } +} diff --git a/protocol/network_audit_projection_test.go b/protocol/network_audit_projection_test.go new file mode 100644 index 0000000..d945c85 --- /dev/null +++ b/protocol/network_audit_projection_test.go @@ -0,0 +1,192 @@ +package protocol_test + +import ( + "strings" + "testing" + "time" + + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" +) + +func TestNetworkAuditProjectDestination(t *testing.T) { + cases := []struct { + name string + raw string + kind protocol.NetworkAuditDestinationKind + }{ + {"endpoint", "https://collector.example.invalid/audit", protocol.NetworkAuditDestinationEndpoint}, + {"digest", protocol.CanonicalHash("destination"), protocol.NetworkAuditDestinationSHA256}, + {"artifact", "artifact://network-audit/evidence.json", protocol.NetworkAuditDestinationArtifact}, + {"lifecycle", "network-lifecycle://lease-1/egress", protocol.NetworkAuditDestinationLifecycle}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + destination, issues := protocol.ProjectNetworkAuditDestination(tc.raw) + if len(issues) != 0 { + t.Fatalf("expected projected destination, got issues: %#v", issues) + } + if destination.Kind != tc.kind || destination.Value != tc.raw { + t.Fatalf("unexpected destination: %#v", destination) + } + }) + } + + _, issues := protocol.ProjectNetworkAuditDestination("https://collector.example.invalid/audit?token=secret-token") + assertNetworkAuditIssue(t, issues, protocol.NetworkAuditValidationDestinationInvalid) + for _, issue := range issues { + if strings.Contains(issue.Message, "secret-token") { + t.Fatalf("destination issue leaked raw value: %#v", issue) + } + } +} + +func TestNetworkAuditProjectLifecycle(t *testing.T) { + destination, issues := protocol.ProjectNetworkAuditLifecycle("lease-1", "egress") + if len(issues) != 0 { + t.Fatalf("expected lifecycle projection, got issues: %#v", issues) + } + if destination.Kind != protocol.NetworkAuditDestinationLifecycle || destination.Value != "network-lifecycle://lease-1/egress" { + t.Fatalf("unexpected lifecycle destination: %#v", destination) + } + + created, err := protocol.NewNetworkAuditLifecycleDestination("lease-1", "dns") + if err != nil { + t.Fatalf("new lifecycle destination: %v", err) + } + if created.Value != "network-lifecycle://lease-1/dns" { + t.Fatalf("unexpected lifecycle destination: %#v", created) + } +} + +func TestNetworkAuditProjectLabelsProviderAndID(t *testing.T) { + labels, issues := protocol.ProjectNetworkAuditLabels(map[string]string{ + "scenario": "no-account", + }) + if len(issues) != 0 { + t.Fatalf("expected projected labels, got issues: %#v", issues) + } + labels["scenario"] = "mutated" + if got := labels["scenario"]; got != "mutated" { + t.Fatalf("expected local mutation to affect projected copy only, got %q", got) + } + + provider, issues := protocol.ProjectNetworkAuditProvider("local", "workflow-compute", "v1.2.3", "network-audit", "v1", protocol.CanonicalHash("descriptor")) + if len(issues) != 0 { + t.Fatalf("expected projected provider, got issues: %#v", issues) + } + if provider.ProviderID != "local" || provider.PluginName != "workflow-compute" { + t.Fatalf("unexpected provider projection: %#v", provider) + } + + left, err := protocol.ProjectNetworkAuditID("ab", "c") + if err != nil { + t.Fatalf("project id: %v", err) + } + right, err := protocol.ProjectNetworkAuditID("a", "bc") + if err != nil { + t.Fatalf("project id: %v", err) + } + if left == right { + t.Fatal("component digest projection collided") + } + if !strings.HasPrefix(left, "network-audit-sha256-") { + t.Fatalf("unexpected audit id: %q", left) + } + record := validNetworkAuditRecord() + record.RecordID = left + if issues := record.ValidateNetworkAudit(); len(issues) != 0 { + t.Fatalf("projected id did not validate as record id: %#v", issues) + } +} + +func TestNetworkAuditRefProjector(t *testing.T) { + record := validNetworkAuditRecord() + projector, err := protocol.NewNetworkAuditRefProjector([]byte("0123456789abcdef0123456789abcdef")) + if err != nil { + t.Fatalf("new projector: %v", err) + } + projection, err := projector.Project(record, protocol.NetworkAuditRefOptions{ + Stability: protocol.NetworkAuditRefStable, + Timestamp: record.StartedAt.In(time.FixedZone("test", -5*60*60)), + }) + if err != nil { + t.Fatalf("project ref: %v", err) + } + if projection.Epoch != protocol.NetworkAuditRefKeyEpoch { + t.Fatalf("unexpected epoch: %q", projection.Epoch) + } + if projection.Ref != "network-audit-ref-v1:stable:e1de3b6b44e27a56bc87cd84de9947cadcd77e69e768dad495f5b1766f8f3b19" { + t.Fatalf("unexpected ref vector: %q", projection.Ref) + } + if projection.Timestamp != "2023-11-14T22:13:20Z" { + t.Fatalf("timestamp was not normalized: %q", projection.Timestamp) + } + + ephemeral, err := projector.Project(record, protocol.NetworkAuditRefOptions{ + Stability: protocol.NetworkAuditRefEphemeral, + Timestamp: record.StartedAt.Add(time.Second), + }) + if err != nil { + t.Fatalf("project ephemeral ref: %v", err) + } + if ephemeral.Ref == projection.Ref || ephemeral.Stability != protocol.NetworkAuditRefEphemeral { + t.Fatalf("ephemeral ref was not distinct: %#v", ephemeral) + } + + mutated := record + mutated.ResourceUsage.NetworkTxBytes++ + mutatedProjection, err := projector.Project(mutated, protocol.NetworkAuditRefOptions{ + Stability: protocol.NetworkAuditRefStable, + Timestamp: record.StartedAt, + }) + if err != nil { + t.Fatalf("project mutated ref: %v", err) + } + if mutatedProjection.Ref == projection.Ref { + t.Fatal("ref projection did not bind full record content") + } +} + +func TestNetworkAuditLifecycleRejectsMalformedRefs(t *testing.T) { + record := validNetworkAuditRecord() + record.Destination = protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationLifecycle, + Value: "network-lifecycle://lease-1/", + } + assertNetworkAuditIssue(t, record.ValidateNetworkAudit(), protocol.NetworkAuditValidationDestinationInvalid) +} + +func TestNetworkAuditRefProjectorRejectsWeakKeys(t *testing.T) { + _, err := protocol.NewNetworkAuditRefProjector([]byte("short-secret")) + if err == nil { + t.Fatal("expected weak key rejection") + } + if strings.Contains(err.Error(), "short-secret") { + t.Fatalf("weak key error leaked raw key: %v", err) + } +} + +func TestNetworkAuditLegacyClassification(t *testing.T) { + findings := protocol.ClassifyLegacyNetworkAuditRecord(map[string]any{ + "destination": "https://collector.example.invalid/audit?token=secret-token", + "labels": map[string]any{ + "bad/key": "secret-token", + }, + "resource_usage": map[string]any{ + "network_tx_bytes": -1, + }, + }) + wantCodes := []protocol.NetworkAuditValidationCode{ + protocol.NetworkAuditValidationDestinationInvalid, + protocol.NetworkAuditValidationLabelInvalid, + protocol.NetworkAuditValidationResourceUsageInvalid, + } + for _, code := range wantCodes { + assertNetworkAuditIssue(t, findings, code) + } + for _, finding := range findings { + if strings.Contains(finding.Message, "secret-token") { + t.Fatalf("legacy finding leaked raw value: %#v", finding) + } + } +} diff --git a/protocol/network_audit_test.go b/protocol/network_audit_test.go new file mode 100644 index 0000000..9fe9c5c --- /dev/null +++ b/protocol/network_audit_test.go @@ -0,0 +1,273 @@ +package protocol_test + +import ( + "strings" + "testing" + "time" + + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" + pb "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol/pb" + "google.golang.org/protobuf/encoding/protowire" + "google.golang.org/protobuf/proto" +) + +func TestNetworkAuditProtocolVersion(t *testing.T) { + if protocol.NetworkAuditProtocolVersion != "compute.v1alpha1" { + t.Fatalf("NetworkAuditProtocolVersion = %q", protocol.NetworkAuditProtocolVersion) + } +} + +func TestNetworkAuditDestinationValidation(t *testing.T) { + validDigest := protocol.CanonicalHash("destination") + cases := []struct { + name string + destination protocol.NetworkAuditDestination + wantCode protocol.NetworkAuditValidationCode + }{ + { + name: "endpoint", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationEndpoint, + Value: "https://collector.example.invalid/audit", + }, + }, + { + name: "sha256", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationSHA256, + Value: validDigest, + }, + }, + { + name: "artifact", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationArtifact, + Value: "artifact://network-audit/evidence.json", + }, + }, + { + name: "network lifecycle", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationLifecycle, + Value: "network-lifecycle://lease-123/egress", + }, + }, + { + name: "invalid endpoint", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationEndpoint, + Value: "https://collector.example.invalid/audit?token=secret-token", + }, + wantCode: protocol.NetworkAuditValidationDestinationInvalid, + }, + { + name: "invalid sha256", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationSHA256, + Value: "sha256:not-a-digest", + }, + wantCode: protocol.NetworkAuditValidationDestinationInvalid, + }, + { + name: "invalid artifact", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationArtifact, + Value: "artifact://../secret", + }, + wantCode: protocol.NetworkAuditValidationDestinationInvalid, + }, + { + name: "invalid network lifecycle", + destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationLifecycle, + Value: "network-lifecycle://lease-123/../secret", + }, + wantCode: protocol.NetworkAuditValidationDestinationInvalid, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + record := validNetworkAuditRecord() + record.Destination = tc.destination + issues := record.ValidateNetworkAudit() + if tc.wantCode == "" { + if len(issues) != 0 { + t.Fatalf("expected valid destination, got issues: %#v", issues) + } + return + } + assertNetworkAuditIssue(t, issues, tc.wantCode) + }) + } +} + +func TestNetworkAuditValidationIssuesAreTypedAndRedacted(t *testing.T) { + record := validNetworkAuditRecord() + rawSecret := "secret-token-value" + record.ProtocolVersion = "wrong-" + rawSecret + record.RecordID = "" + record.Destination = protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationKind("side-channel"), + Value: "https://collector.example.invalid/audit?token=" + rawSecret, + } + record.ResourceUsage.CPUMillis = -1 + record.Labels = map[string]string{"bad/key": rawSecret} + record.FinishedAt = record.StartedAt.Add(-time.Second) + + issues := record.ValidateNetworkAudit() + wantCodes := []protocol.NetworkAuditValidationCode{ + protocol.NetworkAuditValidationProtocolVersionInvalid, + protocol.NetworkAuditValidationRecordIDRequired, + protocol.NetworkAuditValidationDestinationInvalid, + protocol.NetworkAuditValidationResourceUsageInvalid, + protocol.NetworkAuditValidationLabelInvalid, + protocol.NetworkAuditValidationTimeRangeInvalid, + } + for _, code := range wantCodes { + assertNetworkAuditIssue(t, issues, code) + } + for _, issue := range issues { + if issue.Code == "" { + t.Fatalf("issue missing typed code: %#v", issue) + } + if strings.Contains(issue.Message, rawSecret) { + t.Fatalf("issue leaked raw rejected value: %#v", issue) + } + } +} + +func TestNetworkAuditProtoRoundTripStrict(t *testing.T) { + record := validNetworkAuditRecord() + message := record.ToProto() + data, err := proto.MarshalOptions{Deterministic: true}.Marshal(message) + if err != nil { + t.Fatalf("marshal proto: %v", err) + } + + decoded, err := protocol.UnmarshalNetworkAuditRecordProtoStrict(data) + if err != nil { + t.Fatalf("strict unmarshal: %v", err) + } + roundTrip, err := protocol.NetworkAuditRecordFromProto(decoded) + if err != nil { + t.Fatalf("from proto: %v", err) + } + if got, want := protocol.CanonicalHash(roundTrip), protocol.CanonicalHash(record); got != want { + t.Fatalf("round trip hash mismatch: got %s want %s", got, want) + } +} + +func TestNetworkAuditProtoRejectsUnknownFields(t *testing.T) { + message := validNetworkAuditRecord().ToProto() + data, err := proto.Marshal(message) + if err != nil { + t.Fatalf("marshal proto: %v", err) + } + data = append(data, protowire.AppendTag(nil, 99, protowire.VarintType)...) + data = protowire.AppendVarint(data, 1) + if _, err := protocol.UnmarshalNetworkAuditRecordProtoStrict(data); err == nil { + t.Fatal("expected strict unmarshal to reject top-level unknown fields") + } +} + +func TestNetworkAuditProtoRejectsNestedUnknownFields(t *testing.T) { + message := validNetworkAuditRecord().ToProto() + message.Destination.ProtoReflect().SetUnknown(protowire.AppendVarint( + protowire.AppendTag(nil, 99, protowire.VarintType), + 1, + )) + data, err := proto.Marshal(message) + if err != nil { + t.Fatalf("marshal proto: %v", err) + } + if _, err := protocol.UnmarshalNetworkAuditRecordProtoStrict(data); err == nil { + t.Fatal("expected strict unmarshal to reject nested unknown fields") + } +} + +func TestNetworkAuditProtoRejectsInvalidIntegerRanges(t *testing.T) { + message := validNetworkAuditRecord().ToProto() + message.ResourceUsage.CpuMillis = -1 + if _, err := protocol.NetworkAuditRecordFromProto(message); err == nil { + t.Fatal("expected negative resource usage to be rejected") + } + + message = validNetworkAuditRecord().ToProto() + message.StartedAtUnixNano = -1 + if _, err := protocol.NetworkAuditRecordFromProto(message); err == nil { + t.Fatal("expected negative timestamp to be rejected") + } +} + +func TestNetworkAuditRejectsTooManyLabels(t *testing.T) { + record := validNetworkAuditRecord() + record.Labels = make(map[string]string, protocol.NetworkAuditMaxLabels+1) + for i := range protocol.NetworkAuditMaxLabels + 1 { + record.Labels["label"+string(rune('a'+i))] = "value" + } + issues := record.ValidateNetworkAudit() + assertNetworkAuditIssue(t, issues, protocol.NetworkAuditValidationLabelCountExceeded) +} + +func TestNetworkAuditGoProtoParity(t *testing.T) { + record := validNetworkAuditRecord() + protoRecord := record.ToProto() + if protoRecord.GetProtocolVersion() != record.ProtocolVersion { + t.Fatalf("protocol version mismatch") + } + got, err := protocol.NetworkAuditRecordFromProto(protoRecord) + if err != nil { + t.Fatalf("from proto: %v", err) + } + if got.ResourceUsage.NetworkTxBytes != record.ResourceUsage.NetworkTxBytes { + t.Fatalf("resource usage mismatch: got %d want %d", got.ResourceUsage.NetworkTxBytes, record.ResourceUsage.NetworkTxBytes) + } + + var _ *pb.NetworkAuditRecord = protoRecord +} + +func validNetworkAuditRecord() protocol.NetworkAuditRecord { + started := time.Unix(1_700_000_000, 0).UTC() + return protocol.NetworkAuditRecord{ + ProtocolVersion: protocol.NetworkAuditProtocolVersion, + RecordID: "audit-record-1", + TaskID: "task-1", + LeaseID: "lease-1", + WorkerID: "worker-1", + Provider: protocol.NetworkAuditProviderEvidence{ + ProviderID: "local", + PluginName: "workflow-compute", + PluginVersion: "v1.2.3", + ContractID: "network-audit", + ContractVersion: "v1", + DescriptorDigest: protocol.CanonicalHash("descriptor"), + }, + Destination: protocol.NetworkAuditDestination{ + Kind: protocol.NetworkAuditDestinationEndpoint, + Value: "https://collector.example.invalid/audit", + }, + ResourceUsage: protocol.ResourceUsage{ + CPUMillis: 10, + MaxMemoryBytes: 4096, + NetworkRxBytes: 128, + NetworkTxBytes: 256, + }, + Labels: map[string]string{ + "scenario": "no-account", + }, + StartedAt: started, + FinishedAt: started.Add(time.Second), + ObservedAt: started.Add(2 * time.Second), + } +} + +func assertNetworkAuditIssue(t *testing.T, issues []protocol.NetworkAuditValidationIssue, code protocol.NetworkAuditValidationCode) { + t.Helper() + for _, issue := range issues { + if issue.Code == code { + return + } + } + t.Fatalf("missing issue code %q in %#v", code, issues) +} diff --git a/protocol/pb/network_audit.pb.go b/protocol/pb/network_audit.pb.go new file mode 100644 index 0000000..f1e7877 --- /dev/null +++ b/protocol/pb/network_audit.pb.go @@ -0,0 +1,558 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc (unknown) +// source: workflow_plugin_compute_core/protocol/v1/network_audit.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type NetworkAuditRecord struct { + state protoimpl.MessageState `protogen:"open.v1"` + ProtocolVersion string `protobuf:"bytes,1,opt,name=protocol_version,json=protocolVersion,proto3" json:"protocol_version,omitempty"` + RecordId string `protobuf:"bytes,2,opt,name=record_id,json=recordId,proto3" json:"record_id,omitempty"` + TaskId string `protobuf:"bytes,3,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` + LeaseId string `protobuf:"bytes,4,opt,name=lease_id,json=leaseId,proto3" json:"lease_id,omitempty"` + WorkerId string `protobuf:"bytes,5,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty"` + Provider *NetworkAuditProviderEvidence `protobuf:"bytes,6,opt,name=provider,proto3" json:"provider,omitempty"` + Destination *NetworkAuditDestination `protobuf:"bytes,7,opt,name=destination,proto3" json:"destination,omitempty"` + ResourceUsage *NetworkAuditResourceUsage `protobuf:"bytes,8,opt,name=resource_usage,json=resourceUsage,proto3" json:"resource_usage,omitempty"` + Labels map[string]string `protobuf:"bytes,9,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + StartedAtUnixNano int64 `protobuf:"varint,10,opt,name=started_at_unix_nano,json=startedAtUnixNano,proto3" json:"started_at_unix_nano,omitempty"` + FinishedAtUnixNano int64 `protobuf:"varint,11,opt,name=finished_at_unix_nano,json=finishedAtUnixNano,proto3" json:"finished_at_unix_nano,omitempty"` + ObservedAtUnixNano int64 `protobuf:"varint,12,opt,name=observed_at_unix_nano,json=observedAtUnixNano,proto3" json:"observed_at_unix_nano,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NetworkAuditRecord) Reset() { + *x = NetworkAuditRecord{} + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NetworkAuditRecord) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NetworkAuditRecord) ProtoMessage() {} + +func (x *NetworkAuditRecord) ProtoReflect() protoreflect.Message { + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NetworkAuditRecord.ProtoReflect.Descriptor instead. +func (*NetworkAuditRecord) Descriptor() ([]byte, []int) { + return file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDescGZIP(), []int{0} +} + +func (x *NetworkAuditRecord) GetProtocolVersion() string { + if x != nil { + return x.ProtocolVersion + } + return "" +} + +func (x *NetworkAuditRecord) GetRecordId() string { + if x != nil { + return x.RecordId + } + return "" +} + +func (x *NetworkAuditRecord) GetTaskId() string { + if x != nil { + return x.TaskId + } + return "" +} + +func (x *NetworkAuditRecord) GetLeaseId() string { + if x != nil { + return x.LeaseId + } + return "" +} + +func (x *NetworkAuditRecord) GetWorkerId() string { + if x != nil { + return x.WorkerId + } + return "" +} + +func (x *NetworkAuditRecord) GetProvider() *NetworkAuditProviderEvidence { + if x != nil { + return x.Provider + } + return nil +} + +func (x *NetworkAuditRecord) GetDestination() *NetworkAuditDestination { + if x != nil { + return x.Destination + } + return nil +} + +func (x *NetworkAuditRecord) GetResourceUsage() *NetworkAuditResourceUsage { + if x != nil { + return x.ResourceUsage + } + return nil +} + +func (x *NetworkAuditRecord) GetLabels() map[string]string { + if x != nil { + return x.Labels + } + return nil +} + +func (x *NetworkAuditRecord) GetStartedAtUnixNano() int64 { + if x != nil { + return x.StartedAtUnixNano + } + return 0 +} + +func (x *NetworkAuditRecord) GetFinishedAtUnixNano() int64 { + if x != nil { + return x.FinishedAtUnixNano + } + return 0 +} + +func (x *NetworkAuditRecord) GetObservedAtUnixNano() int64 { + if x != nil { + return x.ObservedAtUnixNano + } + return 0 +} + +type NetworkAuditProviderEvidence struct { + state protoimpl.MessageState `protogen:"open.v1"` + ProviderId string `protobuf:"bytes,1,opt,name=provider_id,json=providerId,proto3" json:"provider_id,omitempty"` + PluginName string `protobuf:"bytes,2,opt,name=plugin_name,json=pluginName,proto3" json:"plugin_name,omitempty"` + PluginVersion string `protobuf:"bytes,3,opt,name=plugin_version,json=pluginVersion,proto3" json:"plugin_version,omitempty"` + ContractId string `protobuf:"bytes,4,opt,name=contract_id,json=contractId,proto3" json:"contract_id,omitempty"` + ContractVersion string `protobuf:"bytes,5,opt,name=contract_version,json=contractVersion,proto3" json:"contract_version,omitempty"` + DescriptorDigest string `protobuf:"bytes,6,opt,name=descriptor_digest,json=descriptorDigest,proto3" json:"descriptor_digest,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NetworkAuditProviderEvidence) Reset() { + *x = NetworkAuditProviderEvidence{} + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NetworkAuditProviderEvidence) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NetworkAuditProviderEvidence) ProtoMessage() {} + +func (x *NetworkAuditProviderEvidence) ProtoReflect() protoreflect.Message { + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NetworkAuditProviderEvidence.ProtoReflect.Descriptor instead. +func (*NetworkAuditProviderEvidence) Descriptor() ([]byte, []int) { + return file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDescGZIP(), []int{1} +} + +func (x *NetworkAuditProviderEvidence) GetProviderId() string { + if x != nil { + return x.ProviderId + } + return "" +} + +func (x *NetworkAuditProviderEvidence) GetPluginName() string { + if x != nil { + return x.PluginName + } + return "" +} + +func (x *NetworkAuditProviderEvidence) GetPluginVersion() string { + if x != nil { + return x.PluginVersion + } + return "" +} + +func (x *NetworkAuditProviderEvidence) GetContractId() string { + if x != nil { + return x.ContractId + } + return "" +} + +func (x *NetworkAuditProviderEvidence) GetContractVersion() string { + if x != nil { + return x.ContractVersion + } + return "" +} + +func (x *NetworkAuditProviderEvidence) GetDescriptorDigest() string { + if x != nil { + return x.DescriptorDigest + } + return "" +} + +type NetworkAuditDestination struct { + state protoimpl.MessageState `protogen:"open.v1"` + Kind string `protobuf:"bytes,1,opt,name=kind,proto3" json:"kind,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NetworkAuditDestination) Reset() { + *x = NetworkAuditDestination{} + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NetworkAuditDestination) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NetworkAuditDestination) ProtoMessage() {} + +func (x *NetworkAuditDestination) ProtoReflect() protoreflect.Message { + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NetworkAuditDestination.ProtoReflect.Descriptor instead. +func (*NetworkAuditDestination) Descriptor() ([]byte, []int) { + return file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDescGZIP(), []int{2} +} + +func (x *NetworkAuditDestination) GetKind() string { + if x != nil { + return x.Kind + } + return "" +} + +func (x *NetworkAuditDestination) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +type NetworkAuditResourceUsage struct { + state protoimpl.MessageState `protogen:"open.v1"` + CpuMillis int64 `protobuf:"varint,1,opt,name=cpu_millis,json=cpuMillis,proto3" json:"cpu_millis,omitempty"` + GpuMillis int64 `protobuf:"varint,2,opt,name=gpu_millis,json=gpuMillis,proto3" json:"gpu_millis,omitempty"` + MaxMemoryBytes int64 `protobuf:"varint,3,opt,name=max_memory_bytes,json=maxMemoryBytes,proto3" json:"max_memory_bytes,omitempty"` + NetworkRxBytes int64 `protobuf:"varint,4,opt,name=network_rx_bytes,json=networkRxBytes,proto3" json:"network_rx_bytes,omitempty"` + NetworkTxBytes int64 `protobuf:"varint,5,opt,name=network_tx_bytes,json=networkTxBytes,proto3" json:"network_tx_bytes,omitempty"` + WorkspaceBytes int64 `protobuf:"varint,6,opt,name=workspace_bytes,json=workspaceBytes,proto3" json:"workspace_bytes,omitempty"` + OutputBytes int64 `protobuf:"varint,7,opt,name=output_bytes,json=outputBytes,proto3" json:"output_bytes,omitempty"` + LimitHit string `protobuf:"bytes,8,opt,name=limit_hit,json=limitHit,proto3" json:"limit_hit,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NetworkAuditResourceUsage) Reset() { + *x = NetworkAuditResourceUsage{} + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NetworkAuditResourceUsage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NetworkAuditResourceUsage) ProtoMessage() {} + +func (x *NetworkAuditResourceUsage) ProtoReflect() protoreflect.Message { + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NetworkAuditResourceUsage.ProtoReflect.Descriptor instead. +func (*NetworkAuditResourceUsage) Descriptor() ([]byte, []int) { + return file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDescGZIP(), []int{3} +} + +func (x *NetworkAuditResourceUsage) GetCpuMillis() int64 { + if x != nil { + return x.CpuMillis + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetGpuMillis() int64 { + if x != nil { + return x.GpuMillis + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetMaxMemoryBytes() int64 { + if x != nil { + return x.MaxMemoryBytes + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetNetworkRxBytes() int64 { + if x != nil { + return x.NetworkRxBytes + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetNetworkTxBytes() int64 { + if x != nil { + return x.NetworkTxBytes + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetWorkspaceBytes() int64 { + if x != nil { + return x.WorkspaceBytes + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetOutputBytes() int64 { + if x != nil { + return x.OutputBytes + } + return 0 +} + +func (x *NetworkAuditResourceUsage) GetLimitHit() string { + if x != nil { + return x.LimitHit + } + return "" +} + +type NetworkAuditValidationIssue struct { + state protoimpl.MessageState `protogen:"open.v1"` + Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"` + Field string `protobuf:"bytes,2,opt,name=field,proto3" json:"field,omitempty"` + Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NetworkAuditValidationIssue) Reset() { + *x = NetworkAuditValidationIssue{} + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NetworkAuditValidationIssue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NetworkAuditValidationIssue) ProtoMessage() {} + +func (x *NetworkAuditValidationIssue) ProtoReflect() protoreflect.Message { + mi := &file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NetworkAuditValidationIssue.ProtoReflect.Descriptor instead. +func (*NetworkAuditValidationIssue) Descriptor() ([]byte, []int) { + return file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDescGZIP(), []int{4} +} + +func (x *NetworkAuditValidationIssue) GetCode() string { + if x != nil { + return x.Code + } + return "" +} + +func (x *NetworkAuditValidationIssue) GetField() string { + if x != nil { + return x.Field + } + return "" +} + +func (x *NetworkAuditValidationIssue) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_workflow_plugin_compute_core_protocol_v1_network_audit_proto protoreflect.FileDescriptor + +const file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDesc = "" + + "\n" + + " workflow_plugin_compute_core.protocol.v1.NetworkAuditProviderEvidence + 2, // 1: workflow_plugin_compute_core.protocol.v1.NetworkAuditRecord.destination:type_name -> workflow_plugin_compute_core.protocol.v1.NetworkAuditDestination + 3, // 2: workflow_plugin_compute_core.protocol.v1.NetworkAuditRecord.resource_usage:type_name -> workflow_plugin_compute_core.protocol.v1.NetworkAuditResourceUsage + 5, // 3: workflow_plugin_compute_core.protocol.v1.NetworkAuditRecord.labels:type_name -> workflow_plugin_compute_core.protocol.v1.NetworkAuditRecord.LabelsEntry + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_init() } +func file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_init() { + if File_workflow_plugin_compute_core_protocol_v1_network_audit_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDesc), len(file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_rawDesc)), + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_goTypes, + DependencyIndexes: file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_depIdxs, + MessageInfos: file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_msgTypes, + }.Build() + File_workflow_plugin_compute_core_protocol_v1_network_audit_proto = out.File + file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_goTypes = nil + file_workflow_plugin_compute_core_protocol_v1_network_audit_proto_depIdxs = nil +} diff --git a/protocol/types.go b/protocol/types.go index f62cfe0..9b88cc9 100644 --- a/protocol/types.go +++ b/protocol/types.go @@ -1,6 +1,7 @@ package protocol import ( + "crypto/hmac" "crypto/sha256" "encoding/hex" "encoding/json" @@ -8,13 +9,22 @@ import ( "fmt" "mime" "net/netip" + "net/url" "path" "strings" "time" + + pb "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol/pb" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" ) const Version = "compute.v1alpha1" +const NetworkAuditProtocolVersion = Version + type NetworkOperatingMode string const ( @@ -819,6 +829,698 @@ type ResourceUsage struct { LimitHit string `json:"limit_hit,omitempty"` } +const NetworkAuditMaxLabels = 32 + +const NetworkAuditRefKeyEpoch = "network-audit-ref-v1" + +type NetworkAuditDestinationKind string + +const ( + NetworkAuditDestinationEndpoint NetworkAuditDestinationKind = "endpoint" + NetworkAuditDestinationSHA256 NetworkAuditDestinationKind = "sha256" + NetworkAuditDestinationArtifact NetworkAuditDestinationKind = "artifact" + NetworkAuditDestinationLifecycle NetworkAuditDestinationKind = "network-lifecycle" +) + +type NetworkAuditValidationCode string + +const ( + NetworkAuditValidationProtocolVersionInvalid NetworkAuditValidationCode = "protocol_version_invalid" + NetworkAuditValidationRecordIDRequired NetworkAuditValidationCode = "record_id_required" + NetworkAuditValidationDestinationRequired NetworkAuditValidationCode = "destination_required" + NetworkAuditValidationDestinationInvalid NetworkAuditValidationCode = "destination_invalid" + NetworkAuditValidationResourceUsageInvalid NetworkAuditValidationCode = "resource_usage_invalid" + NetworkAuditValidationLabelInvalid NetworkAuditValidationCode = "label_invalid" + NetworkAuditValidationLabelCountExceeded NetworkAuditValidationCode = "label_count_exceeded" + NetworkAuditValidationTimeRangeInvalid NetworkAuditValidationCode = "time_range_invalid" + NetworkAuditValidationProviderInvalid NetworkAuditValidationCode = "provider_invalid" +) + +type NetworkAuditValidationIssue struct { + Code NetworkAuditValidationCode `json:"code"` + Field string `json:"field"` + Message string `json:"message"` +} + +type NetworkAuditValidationError struct { + Issues []NetworkAuditValidationIssue +} + +func (e NetworkAuditValidationError) Error() string { + if len(e.Issues) == 0 { + return "network audit validation failed" + } + return fmt.Sprintf("network audit validation failed with %d issue(s)", len(e.Issues)) +} + +type NetworkAuditDestination struct { + Kind NetworkAuditDestinationKind `json:"kind"` + Value string `json:"value"` +} + +type NetworkAuditProviderEvidence struct { + ProviderID string `json:"provider_id,omitempty"` + PluginName string `json:"plugin_name,omitempty"` + PluginVersion string `json:"plugin_version,omitempty"` + ContractID string `json:"contract_id,omitempty"` + ContractVersion string `json:"contract_version,omitempty"` + DescriptorDigest string `json:"descriptor_digest,omitempty"` +} + +type NetworkAuditRefStability string + +const ( + NetworkAuditRefStable NetworkAuditRefStability = "stable" + NetworkAuditRefEphemeral NetworkAuditRefStability = "ephemeral" +) + +type NetworkAuditRefOptions struct { + Stability NetworkAuditRefStability + Timestamp time.Time +} + +type NetworkAuditRefProjection struct { + Ref string `json:"ref"` + Epoch string `json:"epoch"` + Stability NetworkAuditRefStability `json:"stability"` + Timestamp string `json:"timestamp"` +} + +type NetworkAuditRefProjector struct { + key []byte +} + +type NetworkAuditRecord struct { + ProtocolVersion string `json:"protocol_version"` + RecordID string `json:"record_id"` + TaskID string `json:"task_id,omitempty"` + LeaseID string `json:"lease_id,omitempty"` + WorkerID string `json:"worker_id,omitempty"` + Provider NetworkAuditProviderEvidence `json:"provider,omitempty"` + Destination NetworkAuditDestination `json:"destination"` + ResourceUsage ResourceUsage `json:"resource_usage,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + StartedAt time.Time `json:"started_at,omitempty"` + FinishedAt time.Time `json:"finished_at,omitempty"` + ObservedAt time.Time `json:"observed_at,omitempty"` +} + +func ProjectNetworkAuditDestination(raw string) (NetworkAuditDestination, []NetworkAuditValidationIssue) { + value := strings.TrimSpace(raw) + var kind NetworkAuditDestinationKind + switch { + case strings.HasPrefix(value, "sha256:"): + kind = NetworkAuditDestinationSHA256 + case strings.HasPrefix(value, "artifact://"): + kind = NetworkAuditDestinationArtifact + case strings.HasPrefix(value, "network-lifecycle://"): + kind = NetworkAuditDestinationLifecycle + default: + kind = NetworkAuditDestinationEndpoint + } + destination := NetworkAuditDestination{Kind: kind, Value: value} + return destination, destination.validateNetworkAudit() +} + +func ProjectNetworkAuditLifecycle(leaseID, event string) (NetworkAuditDestination, []NetworkAuditValidationIssue) { + destination, err := NewNetworkAuditLifecycleDestination(leaseID, event) + if err != nil { + return NetworkAuditDestination{}, []NetworkAuditValidationIssue{ + networkAuditIssue(NetworkAuditValidationDestinationInvalid, "destination", "lifecycle destination is invalid"), + } + } + return destination, nil +} + +func NewNetworkAuditLifecycleDestination(leaseID, event string) (NetworkAuditDestination, error) { + if err := validateIdentifier("lease_id", leaseID); err != nil { + return NetworkAuditDestination{}, err + } + if err := validateIdentifier("event", event); err != nil { + return NetworkAuditDestination{}, err + } + destination := NetworkAuditDestination{ + Kind: NetworkAuditDestinationLifecycle, + Value: "network-lifecycle://" + leaseID + "/" + event, + } + if issues := destination.validateNetworkAudit(); len(issues) != 0 { + return NetworkAuditDestination{}, NetworkAuditValidationError{Issues: issues} + } + return destination, nil +} + +func ProjectNetworkAuditLabels(labels map[string]string) (map[string]string, []NetworkAuditValidationIssue) { + projected := copyStringMap(labels) + return projected, validateNetworkAuditLabels(projected) +} + +func ProjectNetworkAuditProvider(providerID, pluginName, pluginVersion, contractID, contractVersion, descriptorDigest string) (NetworkAuditProviderEvidence, []NetworkAuditValidationIssue) { + provider := NetworkAuditProviderEvidence{ + ProviderID: strings.TrimSpace(providerID), + PluginName: strings.TrimSpace(pluginName), + PluginVersion: strings.TrimSpace(pluginVersion), + ContractID: strings.TrimSpace(contractID), + ContractVersion: strings.TrimSpace(contractVersion), + DescriptorDigest: strings.TrimSpace(descriptorDigest), + } + return provider, provider.validateNetworkAudit() +} + +func ProjectNetworkAuditID(components ...string) (string, error) { + if len(components) == 0 { + return "", errors.New("at least one network audit id component is required") + } + hash := sha256.New() + for _, component := range components { + if strings.TrimSpace(component) == "" || strings.ContainsAny(component, "\x00\r\n\t") { + return "", errors.New("network audit id component is invalid") + } + fmt.Fprintf(hash, "%d:", len(component)) + _, _ = hash.Write([]byte(component)) + _, _ = hash.Write([]byte{0}) + } + return "network-audit-sha256-" + hex.EncodeToString(hash.Sum(nil)), nil +} + +func NewNetworkAuditRefProjector(key []byte) (NetworkAuditRefProjector, error) { + if len(key) < 32 { + return NetworkAuditRefProjector{}, errors.New("network audit ref key must be at least 32 bytes") + } + copied := make([]byte, len(key)) + copy(copied, key) + return NetworkAuditRefProjector{key: copied}, nil +} + +func (p NetworkAuditRefProjector) Project(record NetworkAuditRecord, options NetworkAuditRefOptions) (NetworkAuditRefProjection, error) { + if len(p.key) < 32 { + return NetworkAuditRefProjection{}, errors.New("network audit ref projector key is invalid") + } + if err := record.Validate(); err != nil { + return NetworkAuditRefProjection{}, err + } + stability := options.Stability + if stability == "" { + stability = NetworkAuditRefStable + } + switch stability { + case NetworkAuditRefStable, NetworkAuditRefEphemeral: + default: + return NetworkAuditRefProjection{}, errors.New("network audit ref stability is invalid") + } + timestamp := options.Timestamp + if timestamp.IsZero() { + timestamp = record.StartedAt + } + if timestamp.IsZero() { + timestamp = record.ObservedAt + } + if timestamp.IsZero() { + timestamp = record.FinishedAt + } + normalizedTimestamp := timestamp.UTC().Format(time.RFC3339Nano) + normalizedRecord := record + normalizedRecord.StartedAt = normalizedRecord.StartedAt.UTC() + normalizedRecord.FinishedAt = normalizedRecord.FinishedAt.UTC() + normalizedRecord.ObservedAt = normalizedRecord.ObservedAt.UTC() + recordDigest := CanonicalHash(normalizedRecord) + input := strings.Join([]string{ + NetworkAuditRefKeyEpoch, + string(stability), + recordDigest, + normalizedTimestamp, + "", + }, "\n") + mac := hmac.New(sha256.New, p.key) + _, _ = mac.Write([]byte(input)) + ref := NetworkAuditRefKeyEpoch + ":" + string(stability) + ":" + hex.EncodeToString(mac.Sum(nil)) + return NetworkAuditRefProjection{ + Ref: ref, + Epoch: NetworkAuditRefKeyEpoch, + Stability: stability, + Timestamp: normalizedTimestamp, + }, nil +} + +func ClassifyLegacyNetworkAuditRecord(record map[string]any) []NetworkAuditValidationIssue { + var findings []NetworkAuditValidationIssue + if rawDestination, ok := record["destination"].(string); ok { + _, issues := ProjectNetworkAuditDestination(rawDestination) + findings = append(findings, issues...) + } + if rawLabels, ok := record["labels"].(map[string]any); ok { + labels := make(map[string]string, len(rawLabels)) + for key, value := range rawLabels { + stringValue, ok := value.(string) + if !ok { + findings = append(findings, networkAuditIssue(NetworkAuditValidationLabelInvalid, "labels", "label value is invalid")) + continue + } + labels[key] = stringValue + } + _, issues := ProjectNetworkAuditLabels(labels) + findings = append(findings, issues...) + } + if rawUsage, ok := record["resource_usage"].(map[string]any); ok { + usage := ResourceUsage{ + CPUMillis: legacyInt64(rawUsage["cpu_millis"]), + GPUMillis: legacyInt64(rawUsage["gpu_millis"]), + MaxMemoryBytes: legacyInt64(rawUsage["max_memory_bytes"]), + NetworkRxBytes: legacyInt64(rawUsage["network_rx_bytes"]), + NetworkTxBytes: legacyInt64(rawUsage["network_tx_bytes"]), + WorkspaceBytes: legacyInt64(rawUsage["workspace_bytes"]), + OutputBytes: legacyInt64(rawUsage["output_bytes"]), + } + if err := validateNetworkAuditResourceUsage(usage); err != nil { + findings = append(findings, networkAuditIssue(NetworkAuditValidationResourceUsageInvalid, "resource_usage", "resource usage contains invalid counters")) + } + } + return findings +} + +func legacyInt64(value any) int64 { + switch typed := value.(type) { + case int: + return int64(typed) + case int8: + return int64(typed) + case int16: + return int64(typed) + case int32: + return int64(typed) + case int64: + return typed + case float64: + return int64(typed) + case json.Number: + converted, _ := typed.Int64() + return converted + default: + return 0 + } +} + +func (r NetworkAuditRecord) Validate() error { + issues := r.ValidateNetworkAudit() + if len(issues) == 0 { + return nil + } + return NetworkAuditValidationError{Issues: issues} +} + +func (r NetworkAuditRecord) ValidateNetworkAudit() []NetworkAuditValidationIssue { + var issues []NetworkAuditValidationIssue + if r.ProtocolVersion != NetworkAuditProtocolVersion { + issues = append(issues, networkAuditIssue(NetworkAuditValidationProtocolVersionInvalid, "protocol_version", "protocol version is not supported")) + } + if err := validateIdentifier("record_id", r.RecordID); err != nil { + issues = append(issues, networkAuditIssue(NetworkAuditValidationRecordIDRequired, "record_id", "record id is required")) + } + for _, field := range []struct { + name string + value string + }{ + {"task_id", r.TaskID}, + {"lease_id", r.LeaseID}, + {"worker_id", r.WorkerID}, + } { + if field.value != "" { + if err := validateIdentifier(field.name, field.value); err != nil { + issues = append(issues, networkAuditIssue(NetworkAuditValidationRecordIDRequired, field.name, "identifier is invalid")) + } + } + } + issues = append(issues, r.Provider.validateNetworkAudit()...) + issues = append(issues, r.Destination.validateNetworkAudit()...) + if err := validateNetworkAuditResourceUsage(r.ResourceUsage); err != nil { + issues = append(issues, networkAuditIssue(NetworkAuditValidationResourceUsageInvalid, "resource_usage", "resource usage contains invalid counters")) + } + issues = append(issues, validateNetworkAuditLabels(r.Labels)...) + if !r.StartedAt.IsZero() && !r.FinishedAt.IsZero() && r.FinishedAt.Before(r.StartedAt) { + issues = append(issues, networkAuditIssue(NetworkAuditValidationTimeRangeInvalid, "finished_at", "finish time must not precede start time")) + } + if !r.FinishedAt.IsZero() && !r.ObservedAt.IsZero() && r.ObservedAt.Before(r.FinishedAt) { + issues = append(issues, networkAuditIssue(NetworkAuditValidationTimeRangeInvalid, "observed_at", "observed time must not precede finish time")) + } + return issues +} + +func (p NetworkAuditProviderEvidence) validateNetworkAudit() []NetworkAuditValidationIssue { + var issues []NetworkAuditValidationIssue + for _, field := range []struct { + name string + value string + }{ + {"provider_id", p.ProviderID}, + {"plugin_name", p.PluginName}, + {"plugin_version", p.PluginVersion}, + {"contract_id", p.ContractID}, + {"contract_version", p.ContractVersion}, + } { + if field.value != "" { + if err := validateIdentifier(field.name, field.value); err != nil { + issues = append(issues, networkAuditIssue(NetworkAuditValidationProviderInvalid, field.name, "provider evidence identifier is invalid")) + } + } + } + if p.DescriptorDigest != "" && !validSHA256Digest(p.DescriptorDigest) { + issues = append(issues, networkAuditIssue(NetworkAuditValidationProviderInvalid, "descriptor_digest", "descriptor digest is invalid")) + } + return issues +} + +func (d NetworkAuditDestination) validateNetworkAudit() []NetworkAuditValidationIssue { + if d.Kind == "" && strings.TrimSpace(d.Value) == "" { + return []NetworkAuditValidationIssue{ + networkAuditIssue(NetworkAuditValidationDestinationRequired, "destination", "destination is required"), + } + } + if !validNetworkAuditDestination(d) { + return []NetworkAuditValidationIssue{ + networkAuditIssue(NetworkAuditValidationDestinationInvalid, "destination", "destination is invalid"), + } + } + return nil +} + +func validNetworkAuditDestination(d NetworkAuditDestination) bool { + value := strings.TrimSpace(d.Value) + if value == "" || value != d.Value || strings.ContainsAny(value, " \t\r\n\x00") { + return false + } + switch d.Kind { + case NetworkAuditDestinationEndpoint: + return validNetworkAuditEndpoint(value) + case NetworkAuditDestinationSHA256: + return validSHA256Ref(value) + case NetworkAuditDestinationArtifact: + return validateScopedRef("destination", value, "artifact://") == nil && !strings.ContainsAny(value, "\t\r\n\x00") + case NetworkAuditDestinationLifecycle: + return validNetworkLifecycleRef(value) + default: + return false + } +} + +func validNetworkAuditEndpoint(value string) bool { + parsed, err := url.Parse(value) + if err != nil { + return false + } + switch parsed.Scheme { + case "http", "https", "tcp", "udp": + default: + return false + } + return parsed.Host != "" && + parsed.User == nil && + parsed.RawQuery == "" && + parsed.Fragment == "" && + !strings.Contains(parsed.Path, "..") +} + +func validNetworkLifecycleRef(value string) bool { + if !strings.HasPrefix(value, "network-lifecycle://") { + return false + } + parsed, err := url.Parse(value) + if err != nil { + return false + } + return parsed.Scheme == "network-lifecycle" && + parsed.Host != "" && + parsed.User == nil && + parsed.RawQuery == "" && + parsed.Fragment == "" && + parsed.Path != "" && + !strings.Contains(parsed.Path, "..") && + validateIdentifier("lease_id", parsed.Host) == nil && + validateIdentifier("event", strings.TrimPrefix(parsed.Path, "/")) == nil +} + +func validateNetworkAuditResourceUsage(usage ResourceUsage) error { + var errs []error + for _, field := range []struct { + name string + value int64 + }{ + {"cpu_millis", usage.CPUMillis}, + {"gpu_millis", usage.GPUMillis}, + {"max_memory_bytes", usage.MaxMemoryBytes}, + {"network_rx_bytes", usage.NetworkRxBytes}, + {"network_tx_bytes", usage.NetworkTxBytes}, + {"workspace_bytes", usage.WorkspaceBytes}, + {"output_bytes", usage.OutputBytes}, + } { + if field.value < 0 { + errs = append(errs, fmt.Errorf("%s cannot be negative", field.name)) + } + } + if usage.LimitHit != "" && (strings.TrimSpace(usage.LimitHit) != usage.LimitHit || strings.ContainsAny(usage.LimitHit, " \t\r\n/:?&#\x00")) { + errs = append(errs, errors.New("limit_hit is invalid")) + } + return errors.Join(errs...) +} + +func validateNetworkAuditLabels(labels map[string]string) []NetworkAuditValidationIssue { + var issues []NetworkAuditValidationIssue + if len(labels) > NetworkAuditMaxLabels { + issues = append(issues, networkAuditIssue(NetworkAuditValidationLabelCountExceeded, "labels", "label count exceeds limit")) + } + for key, value := range labels { + if err := validateIdentifier("label", key); err != nil { + issues = append(issues, networkAuditIssue(NetworkAuditValidationLabelInvalid, "labels", "label key is invalid")) + } + if value == "" || strings.TrimSpace(value) != value || strings.ContainsAny(value, "\t\r\n\x00") { + issues = append(issues, networkAuditIssue(NetworkAuditValidationLabelInvalid, "labels", "label value is invalid")) + } + } + return issues +} + +func networkAuditIssue(code NetworkAuditValidationCode, field, message string) NetworkAuditValidationIssue { + return NetworkAuditValidationIssue{ + Code: code, + Field: field, + Message: message, + } +} + +func (r NetworkAuditRecord) ToProto() *pb.NetworkAuditRecord { + return &pb.NetworkAuditRecord{ + ProtocolVersion: r.ProtocolVersion, + RecordId: r.RecordID, + TaskId: r.TaskID, + LeaseId: r.LeaseID, + WorkerId: r.WorkerID, + Provider: r.Provider.toProto(), + Destination: r.Destination.toProto(), + ResourceUsage: networkAuditResourceUsageToProto(r.ResourceUsage), + Labels: copyStringMap(r.Labels), + StartedAtUnixNano: unixNanoOrZero(r.StartedAt), + FinishedAtUnixNano: unixNanoOrZero(r.FinishedAt), + ObservedAtUnixNano: unixNanoOrZero(r.ObservedAt), + } +} + +func NetworkAuditRecordFromProto(message *pb.NetworkAuditRecord) (NetworkAuditRecord, error) { + if message == nil { + return NetworkAuditRecord{}, NetworkAuditValidationError{Issues: []NetworkAuditValidationIssue{ + networkAuditIssue(NetworkAuditValidationRecordIDRequired, "record", "record is required"), + }} + } + if message.GetStartedAtUnixNano() < 0 || message.GetFinishedAtUnixNano() < 0 || message.GetObservedAtUnixNano() < 0 { + return NetworkAuditRecord{}, NetworkAuditValidationError{Issues: []NetworkAuditValidationIssue{ + networkAuditIssue(NetworkAuditValidationTimeRangeInvalid, "timestamp", "timestamp must not be negative"), + }} + } + record := NetworkAuditRecord{ + ProtocolVersion: message.GetProtocolVersion(), + RecordID: message.GetRecordId(), + TaskID: message.GetTaskId(), + LeaseID: message.GetLeaseId(), + WorkerID: message.GetWorkerId(), + Provider: networkAuditProviderFromProto(message.GetProvider()), + Destination: networkAuditDestinationFromProto(message.GetDestination()), + ResourceUsage: networkAuditResourceUsageFromProto(message.GetResourceUsage()), + Labels: copyStringMap(message.GetLabels()), + StartedAt: timeFromUnixNano(message.GetStartedAtUnixNano()), + FinishedAt: timeFromUnixNano(message.GetFinishedAtUnixNano()), + ObservedAt: timeFromUnixNano(message.GetObservedAtUnixNano()), + } + if err := record.Validate(); err != nil { + return NetworkAuditRecord{}, err + } + return record, nil +} + +func UnmarshalNetworkAuditRecordProtoStrict(data []byte) (*pb.NetworkAuditRecord, error) { + var message pb.NetworkAuditRecord + if err := (proto.UnmarshalOptions{DiscardUnknown: false}).Unmarshal(data, &message); err != nil { + return nil, err + } + if err := rejectUnknownProtoFields(&message); err != nil { + return nil, err + } + return &message, nil +} + +func rejectUnknownProtoFields(message proto.Message) error { + if message == nil { + return nil + } + reflected := message.ProtoReflect() + if len(reflected.GetUnknown()) != 0 { + return fmt.Errorf("proto message %s contains unknown fields", reflected.Descriptor().FullName()) + } + var err error + reflected.Range(func(field protoreflect.FieldDescriptor, value protoreflect.Value) bool { + if field.IsMap() { + if field.MapValue().Message() == nil { + return true + } + value.Map().Range(func(_ protoreflect.MapKey, mapValue protoreflect.Value) bool { + if nestedErr := rejectUnknownProtoFields(mapValue.Message().Interface()); nestedErr != nil { + err = nestedErr + return false + } + return true + }) + return err == nil + } + if field.IsList() && field.Message() != nil { + list := value.List() + for i := range list.Len() { + if nestedErr := rejectUnknownProtoFields(list.Get(i).Message().Interface()); nestedErr != nil { + err = nestedErr + return false + } + } + return true + } + if field.Message() != nil { + if nestedErr := rejectUnknownProtoFields(value.Message().Interface()); nestedErr != nil { + err = nestedErr + return false + } + } + return true + }) + return err +} + +func (p NetworkAuditProviderEvidence) toProto() *pb.NetworkAuditProviderEvidence { + return &pb.NetworkAuditProviderEvidence{ + ProviderId: p.ProviderID, + PluginName: p.PluginName, + PluginVersion: p.PluginVersion, + ContractId: p.ContractID, + ContractVersion: p.ContractVersion, + DescriptorDigest: p.DescriptorDigest, + } +} + +func networkAuditProviderFromProto(message *pb.NetworkAuditProviderEvidence) NetworkAuditProviderEvidence { + if message == nil { + return NetworkAuditProviderEvidence{} + } + return NetworkAuditProviderEvidence{ + ProviderID: message.GetProviderId(), + PluginName: message.GetPluginName(), + PluginVersion: message.GetPluginVersion(), + ContractID: message.GetContractId(), + ContractVersion: message.GetContractVersion(), + DescriptorDigest: message.GetDescriptorDigest(), + } +} + +func (d NetworkAuditDestination) toProto() *pb.NetworkAuditDestination { + return &pb.NetworkAuditDestination{ + Kind: string(d.Kind), + Value: d.Value, + } +} + +func networkAuditDestinationFromProto(message *pb.NetworkAuditDestination) NetworkAuditDestination { + if message == nil { + return NetworkAuditDestination{} + } + return NetworkAuditDestination{ + Kind: NetworkAuditDestinationKind(message.GetKind()), + Value: message.GetValue(), + } +} + +func networkAuditResourceUsageToProto(usage ResourceUsage) *pb.NetworkAuditResourceUsage { + return &pb.NetworkAuditResourceUsage{ + CpuMillis: usage.CPUMillis, + GpuMillis: usage.GPUMillis, + MaxMemoryBytes: usage.MaxMemoryBytes, + NetworkRxBytes: usage.NetworkRxBytes, + NetworkTxBytes: usage.NetworkTxBytes, + WorkspaceBytes: usage.WorkspaceBytes, + OutputBytes: usage.OutputBytes, + LimitHit: usage.LimitHit, + } +} + +func networkAuditResourceUsageFromProto(message *pb.NetworkAuditResourceUsage) ResourceUsage { + if message == nil { + return ResourceUsage{} + } + return ResourceUsage{ + CPUMillis: message.GetCpuMillis(), + GPUMillis: message.GetGpuMillis(), + MaxMemoryBytes: message.GetMaxMemoryBytes(), + NetworkRxBytes: message.GetNetworkRxBytes(), + NetworkTxBytes: message.GetNetworkTxBytes(), + WorkspaceBytes: message.GetWorkspaceBytes(), + OutputBytes: message.GetOutputBytes(), + LimitHit: message.GetLimitHit(), + } +} + +func unixNanoOrZero(value time.Time) int64 { + if value.IsZero() { + return 0 + } + return value.UTC().UnixNano() +} + +func timeFromUnixNano(value int64) time.Time { + if value == 0 { + return time.Time{} + } + return time.Unix(0, value).UTC() +} + +func copyStringMap(input map[string]string) map[string]string { + if len(input) == 0 { + return nil + } + output := make(map[string]string, len(input)) + for key, value := range input { + output[key] = value + } + return output +} + +func NetworkAuditDescriptorSet() *descriptorpb.FileDescriptorSet { + return &descriptorpb.FileDescriptorSet{ + File: []*descriptorpb.FileDescriptorProto{ + protodesc.ToFileDescriptorProto(pb.File_workflow_plugin_compute_core_protocol_v1_network_audit_proto), + }, + } +} + +func NetworkAuditDescriptorDigest() string { + data, err := (proto.MarshalOptions{Deterministic: true}).Marshal(NetworkAuditDescriptorSet()) + if err != nil { + return CanonicalHash(pb.File_workflow_plugin_compute_core_protocol_v1_network_audit_proto.FullName()) + } + sum := sha256.Sum256(data) + return "sha256:" + hex.EncodeToString(sum[:]) +} + type ResourceLimits struct { CPUPercent int `json:"cpu_percent,omitempty"` MemoryBytes int64 `json:"memory_bytes,omitempty"` diff --git a/scripts/check-proto.sh b/scripts/check-proto.sh new file mode 100755 index 0000000..36000f7 --- /dev/null +++ b/scripts/check-proto.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +set -euo pipefail + +cd "$(dirname "${BASH_SOURCE[0]}")/.." + +buf lint +./scripts/generate-proto.sh +git diff --exit-code -- proto protocol/pb descriptors diff --git a/scripts/check-wfctl-action-pin.sh b/scripts/check-wfctl-action-pin.sh new file mode 100755 index 0000000..e152c0c --- /dev/null +++ b/scripts/check-wfctl-action-pin.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +set -euo pipefail + +cd "$(dirname "${BASH_SOURCE[0]}")/.." + +wfctl_version="" +workflows=() + +while [[ $# -gt 0 ]]; do + case "$1" in + --workflow) + workflows+=("${2:?missing --workflow value}") + shift 2 + ;; + --wfctl-version) + wfctl_version="${2:?missing --wfctl-version value}" + shift 2 + ;; + *) + echo "unknown argument: $1" >&2 + exit 2 + ;; + esac +done + +if [[ -z "$wfctl_version" || "${#workflows[@]}" -eq 0 ]]; then + echo "--workflow and --wfctl-version are required" >&2 + exit 2 +fi + +for workflow in "${workflows[@]}"; do + if ! [[ -f "$workflow" ]]; then + echo "$workflow: workflow file not found" >&2 + exit 1 + fi + if rg -n 'GoCodeAlone/setup-wfctl@v[0-9]+\b|GoCodeAlone/setup-wfctl@main\b' "$workflow"; then + echo "$workflow: setup-wfctl must be pinned to an immutable commit SHA" >&2 + exit 1 + fi + if ! rg -n 'GoCodeAlone/setup-wfctl@[0-9a-f]{40}\b' "$workflow" >/dev/null; then + echo "$workflow: missing immutable setup-wfctl commit pin" >&2 + exit 1 + fi + if rg -n "version:[[:space:]]*${wfctl_version}\\b" "$workflow" >/dev/null; then + continue + fi + if rg -n 'wfctl_version:' "$workflow" >/dev/null && + rg -n 'version:[[:space:]]*\$\{\{ inputs\.wfctl_version \}\}' "$workflow" >/dev/null; then + continue + fi + echo "$workflow: missing wfctl version ${wfctl_version}" >&2 + exit 1 +done diff --git a/scripts/check-workflow-engine-load.sh b/scripts/check-workflow-engine-load.sh new file mode 100755 index 0000000..5d50828 --- /dev/null +++ b/scripts/check-workflow-engine-load.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env bash +set -euo pipefail + +cd "$(dirname "${BASH_SOURCE[0]}")/.." + +mode="public" +wfctl_version="" + +while [[ $# -gt 0 ]]; do + case "$1" in + --mode) + mode="${2:?missing --mode value}" + shift 2 + ;; + --wfctl-version) + wfctl_version="${2:?missing --wfctl-version value}" + shift 2 + ;; + *) + echo "unknown argument: $1" >&2 + exit 2 + ;; + esac +done + +if [[ -z "$wfctl_version" ]]; then + echo "--wfctl-version is required" >&2 + exit 2 +fi + +if [[ "$mode" == "public" && -n "${WORKFLOW_REPO:-}" ]]; then + echo "public mode must not use WORKFLOW_REPO" >&2 + exit 1 +fi + +run_wfctl() { + if [[ "$mode" == "public" ]]; then + GOWORK=off go run "github.com/GoCodeAlone/workflow/cmd/wfctl@${wfctl_version}" "$@" + return + fi + if command -v wfctl >/dev/null 2>&1 && [[ "$(wfctl version 2>/dev/null | tr -d '[:space:]')" == "$wfctl_version" ]]; then + wfctl "$@" + return + fi + GOWORK=off go run "github.com/GoCodeAlone/workflow/cmd/wfctl@${wfctl_version}" "$@" +} + +GOWORK=off go test ./protocol -run 'NetworkAuditStaticMessageContractMetadata' -count=1 +run_wfctl plugin validate-contract --require-contract-kind message . diff --git a/scripts/generate-proto.sh b/scripts/generate-proto.sh new file mode 100755 index 0000000..c068bb2 --- /dev/null +++ b/scripts/generate-proto.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +set -euo pipefail + +cd "$(dirname "${BASH_SOURCE[0]}")/.." + +buf generate +mkdir -p descriptors +protoc -I proto --descriptor_set_out=descriptors/network_audit.pb workflow_plugin_compute_core/protocol/v1/network_audit.proto diff --git a/tools.go b/tools.go new file mode 100644 index 0000000..c15ff97 --- /dev/null +++ b/tools.go @@ -0,0 +1,5 @@ +//go:build tools + +package tools + +import _ "google.golang.org/protobuf/cmd/protoc-gen-go" From 3d79e1b65188cc44b0785ec3965c6860d5e12eae Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 27 May 2026 03:41:12 -0400 Subject: [PATCH 2/6] ci: validate explicit release candidate refs --- .github/workflows/release-candidate.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/release-candidate.yml b/.github/workflows/release-candidate.yml index d075b34..498d554 100644 --- a/.github/workflows/release-candidate.yml +++ b/.github/workflows/release-candidate.yml @@ -2,6 +2,10 @@ name: Release Candidate on: workflow_dispatch: inputs: + candidate_ref: + description: Branch, tag, or commit SHA to validate + required: true + type: string wfctl_version: description: wfctl version used for contract validation required: true @@ -18,6 +22,7 @@ jobs: steps: - uses: actions/checkout@v6 with: + ref: ${{ inputs.candidate_ref }} fetch-depth: 0 - uses: actions/setup-go@v6 with: From 546161bfb5c2b746e8cd7f8c79eb2161da87e971 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 27 May 2026 03:43:11 -0400 Subject: [PATCH 3/6] fix: remove rg dependency from pin check --- scripts/check-wfctl-action-pin.sh | 10 +++---- scripts/check_wfctl_action_pin_test.go | 39 ++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 scripts/check_wfctl_action_pin_test.go diff --git a/scripts/check-wfctl-action-pin.sh b/scripts/check-wfctl-action-pin.sh index e152c0c..dd5700b 100755 --- a/scripts/check-wfctl-action-pin.sh +++ b/scripts/check-wfctl-action-pin.sh @@ -33,19 +33,19 @@ for workflow in "${workflows[@]}"; do echo "$workflow: workflow file not found" >&2 exit 1 fi - if rg -n 'GoCodeAlone/setup-wfctl@v[0-9]+\b|GoCodeAlone/setup-wfctl@main\b' "$workflow"; then + if grep -En 'GoCodeAlone/setup-wfctl@v[0-9]+([^[:alnum:]_]|$)|GoCodeAlone/setup-wfctl@main([^[:alnum:]_]|$)' "$workflow"; then echo "$workflow: setup-wfctl must be pinned to an immutable commit SHA" >&2 exit 1 fi - if ! rg -n 'GoCodeAlone/setup-wfctl@[0-9a-f]{40}\b' "$workflow" >/dev/null; then + if ! grep -Eq 'GoCodeAlone/setup-wfctl@[0-9a-f]{40}([^0-9a-f]|$)' "$workflow"; then echo "$workflow: missing immutable setup-wfctl commit pin" >&2 exit 1 fi - if rg -n "version:[[:space:]]*${wfctl_version}\\b" "$workflow" >/dev/null; then + if grep -Eq "version:[[:space:]]*${wfctl_version}([^[:alnum:]_]|$)" "$workflow"; then continue fi - if rg -n 'wfctl_version:' "$workflow" >/dev/null && - rg -n 'version:[[:space:]]*\$\{\{ inputs\.wfctl_version \}\}' "$workflow" >/dev/null; then + if grep -Eq 'wfctl_version:' "$workflow" && + grep -Eq 'version:[[:space:]]*\$\{\{ inputs\.wfctl_version \}\}' "$workflow"; then continue fi echo "$workflow: missing wfctl version ${wfctl_version}" >&2 diff --git a/scripts/check_wfctl_action_pin_test.go b/scripts/check_wfctl_action_pin_test.go new file mode 100644 index 0000000..1433f17 --- /dev/null +++ b/scripts/check_wfctl_action_pin_test.go @@ -0,0 +1,39 @@ +package scripts_test + +import ( + "os" + "os/exec" + "path/filepath" + "runtime" + "testing" +) + +func TestCheckWfctlActionPinDoesNotRequireRipgrep(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("shell scripts are not exercised on Windows") + } + + binDir := t.TempDir() + for _, name := range []string{"bash", "dirname", "grep"} { + path, err := exec.LookPath(name) + if err != nil { + t.Fatalf("find %s: %v", name, err) + } + if err := os.Symlink(path, filepath.Join(binDir, name)); err != nil { + t.Fatalf("link %s: %v", name, err) + } + } + + cmd := exec.Command( + "./check-wfctl-action-pin.sh", + "--workflow", ".github/workflows/ci.yml", + "--workflow", ".github/workflows/release.yml", + "--workflow", ".github/workflows/release-candidate.yml", + "--wfctl-version", "v0.64.7", + ) + cmd.Env = append(os.Environ(), "PATH="+binDir) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("check-wfctl-action-pin.sh should not require rg: %v\n%s", err, out) + } +} From 1844768a221ddf5317c8a49c33eea1b9b82ead5f Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 27 May 2026 03:44:18 -0400 Subject: [PATCH 4/6] ci: make proto checks self contained --- scripts/generate-proto.sh | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/scripts/generate-proto.sh b/scripts/generate-proto.sh index c068bb2..1fc84e6 100755 --- a/scripts/generate-proto.sh +++ b/scripts/generate-proto.sh @@ -3,6 +3,38 @@ set -euo pipefail cd "$(dirname "${BASH_SOURCE[0]}")/.." +export PATH="${PATH}:${HOME}/go/bin" + +if ! command -v buf >/dev/null 2>&1; then + GOWORK=off go install github.com/bufbuild/buf/cmd/buf@v1.47.2 +fi + +if ! command -v protoc-gen-go >/dev/null 2>&1; then + GOWORK=off go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.11 +fi + buf generate mkdir -p descriptors -protoc -I proto --descriptor_set_out=descriptors/network_audit.pb workflow_plugin_compute_core/protocol/v1/network_audit.proto +tmpdir="$(mktemp -d)" +trap 'rm -rf "$tmpdir"' EXIT +cat > "$tmpdir/write-network-audit-descriptor.go" <<'GO' +package main + +import ( + "os" + + "github.com/GoCodeAlone/workflow-plugin-compute-core/protocol" + "google.golang.org/protobuf/proto" +) + +func main() { + data, err := (proto.MarshalOptions{Deterministic: true}).Marshal(protocol.NetworkAuditDescriptorSet()) + if err != nil { + panic(err) + } + if err := os.WriteFile("descriptors/network_audit.pb", data, 0o644); err != nil { + panic(err) + } +} +GO +GOWORK=off go run "$tmpdir/write-network-audit-descriptor.go" From 554ca3e689d3fe0c050601198d0042038bceabb7 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 27 May 2026 03:45:27 -0400 Subject: [PATCH 5/6] ci: bootstrap proto tools before lint --- scripts/check-proto.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/check-proto.sh b/scripts/check-proto.sh index 36000f7..96e00f7 100755 --- a/scripts/check-proto.sh +++ b/scripts/check-proto.sh @@ -3,6 +3,7 @@ set -euo pipefail cd "$(dirname "${BASH_SOURCE[0]}")/.." -buf lint ./scripts/generate-proto.sh +export PATH="${PATH}:${HOME}/go/bin" +buf lint git diff --exit-code -- proto protocol/pb descriptors From 5104c1fdac2eaf473cb5d35d787421f1a3cee542 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Wed, 27 May 2026 03:50:37 -0400 Subject: [PATCH 6/6] ci: resolve exact workflow runs --- scripts/resolve-gh-run-for-ref.sh | 62 +++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100755 scripts/resolve-gh-run-for-ref.sh diff --git a/scripts/resolve-gh-run-for-ref.sh b/scripts/resolve-gh-run-for-ref.sh new file mode 100755 index 0000000..dbe4f34 --- /dev/null +++ b/scripts/resolve-gh-run-for-ref.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +set -euo pipefail + +workflow="" +commit="" +event="" +branch="" +created_after="" + +while [[ $# -gt 0 ]]; do + case "$1" in + --workflow) + workflow="${2:?missing --workflow value}" + shift 2 + ;; + --commit) + commit="${2:?missing --commit value}" + shift 2 + ;; + --event) + event="${2:?missing --event value}" + shift 2 + ;; + --branch) + branch="${2:?missing --branch value}" + shift 2 + ;; + --created-after) + created_after="${2:?missing --created-after value}" + shift 2 + ;; + *) + echo "unknown argument: $1" >&2 + exit 2 + ;; + esac +done + +if [[ -z "$workflow" || -z "$commit" || -z "$event" || -z "$branch" || -z "$created_after" ]]; then + echo "--workflow, --commit, --event, --branch, and --created-after are required" >&2 + exit 2 +fi + +runs="$( + gh run list \ + --workflow "$workflow" \ + --branch "$branch" \ + --event "$event" \ + --created ">=$created_after" \ + --limit 20 \ + --json databaseId,headSha,createdAt \ + | jq --arg commit "$commit" '[.[] | select(.headSha == $commit)]' +)" + +count="$(jq 'length' <<<"$runs")" +if [[ "$count" != "1" ]]; then + echo "expected exactly one matching run, found $count" >&2 + jq -r '.[] | "\(.databaseId) \(.headSha) \(.createdAt)"' <<<"$runs" >&2 + exit 1 +fi + +jq -r '.[0].databaseId' <<<"$runs"