From de2864cd470ff7227adee13765f78c64d4226a70 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Wed, 27 May 2026 14:13:12 -0400 Subject: [PATCH 1/6] feat(gateway): wire RFC-011 AsyncEmitter into PEP middleware MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add RuntimeEmitter field to PEPConfig for RFC-011 event emission - Add emitter field to pep struct and wire in NewPolicyMiddleware - Emit identity.verified event after successful badge verification - Emit identity.invalid event on badge verification failure - Emit authority.granted/denied events on chain verification - Emit execution.started and execution.completed events for request lifecycle - Add helper functions for emitting events with proper mediation context - Add parseTrustLevel/parseIAL helpers to convert string levels to int Tests: - Add event_emission_test.go with tests for: - identity.verified emission on valid badge - identity.invalid emission on missing badge - execution lifecycle (started/completed) emission - graceful handling when emitter is nil Implements P1 of the Verification Locality plan. RFC: RFC-011 §5.1, §5.2, §5.4 --- pkg/gateway/event_emission_test.go | 322 +++++++++++++++++++++++++++++ pkg/gateway/middleware.go | 180 +++++++++++++++- 2 files changed, 498 insertions(+), 4 deletions(-) create mode 100644 pkg/gateway/event_emission_test.go diff --git a/pkg/gateway/event_emission_test.go b/pkg/gateway/event_emission_test.go new file mode 100644 index 0000000..a4f455c --- /dev/null +++ b/pkg/gateway/event_emission_test.go @@ -0,0 +1,322 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +package gateway_test + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/capiscio/capiscio-core/v2/pkg/badge" + "github.com/capiscio/capiscio-core/v2/pkg/gateway" + "github.com/capiscio/capiscio-core/v2/pkg/mediation" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockEventSink captures emitted events for testing. +type mockEventSink struct { + mu sync.Mutex + events []*mediation.Event +} + +func (m *mockEventSink) Receive(event *mediation.Event) { + m.mu.Lock() + defer m.mu.Unlock() + m.events = append(m.events, event) +} + +func (m *mockEventSink) Flush(ctx context.Context) error { + return nil +} + +func (m *mockEventSink) Close() error { + return nil +} + +func (m *mockEventSink) Events() []*mediation.Event { + m.mu.Lock() + defer m.mu.Unlock() + return append([]*mediation.Event{}, m.events...) +} + +func (m *mockEventSink) EventTypes() []mediation.EventType { + m.mu.Lock() + defer m.mu.Unlock() + types := make([]mediation.EventType, len(m.events)) + for i, e := range m.events { + types[i] = e.EventType + } + return types +} + +func (m *mockEventSink) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.events = nil +} + +func TestEventEmission_IdentityVerified(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create valid badge with VC + claims := &badge.Claims{ + JTI: "test-jti-events", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "2", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + req.Header.Set("X-Trace-ID", "trace-123") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing + time.Sleep(50 * time.Millisecond) + + // Verify response + assert.Equal(t, http.StatusOK, w.Code) + assert.True(t, called) + + // Verify events emitted + events := sink.Events() + require.GreaterOrEqual(t, len(events), 1, "should emit at least identity.verified") + + // Find identity.verified event + var identityVerified *mediation.Event + for _, e := range events { + if e.EventType == mediation.EventIdentityVerified { + identityVerified = e + break + } + } + require.NotNil(t, identityVerified, "should emit identity.verified event") + assert.Equal(t, "trace-123", identityVerified.Context.TraceID) + assert.NotEmpty(t, identityVerified.Context.TxnID) +} + +func TestEventEmission_IdentityInvalid_MissingBadge(t *testing.T) { + // Setup keys and verifier (won't be used since badge is missing) + pub, _, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request WITHOUT badge + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Trace-ID", "trace-456") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing + time.Sleep(50 * time.Millisecond) + + // Verify response + assert.Equal(t, http.StatusUnauthorized, w.Code) + assert.False(t, called) + + // Verify identity.invalid event emitted + events := sink.Events() + require.Len(t, events, 1) + assert.Equal(t, mediation.EventIdentityInvalid, events[0].EventType) + assert.Equal(t, "trace-456", events[0].Context.TraceID) +} + +func TestEventEmission_ExecutionLifecycle(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create valid badge with VC + claims := &badge.Claims{ + JTI: "test-jti-exec", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "1", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler that takes some time + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(10 * time.Millisecond) + w.WriteHeader(http.StatusOK) + }) + + // Create middleware with emitter (badge-only mode, no PDP) + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing + time.Sleep(50 * time.Millisecond) + + // Verify response + assert.Equal(t, http.StatusOK, w.Code) + + // Verify event sequence: identity.verified, execution.started, execution.completed + eventTypes := sink.EventTypes() + require.GreaterOrEqual(t, len(eventTypes), 3, "should emit identity.verified, execution.started, execution.completed") + + assert.Contains(t, eventTypes, mediation.EventIdentityVerified) + assert.Contains(t, eventTypes, mediation.EventExecutionStarted) + assert.Contains(t, eventTypes, mediation.EventExecutionCompleted) +} + +func TestEventEmission_NoEmitter(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create valid badge with VC + claims := &badge.Claims{ + JTI: "test-jti-no-emitter", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "1", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + }) + + // Create middleware WITHOUT emitter (nil) + config := gateway.PEPConfig{ + RuntimeEmitter: nil, // no emitter + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Should still work without emitter + assert.Equal(t, http.StatusOK, w.Code) + assert.True(t, called) +} diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index c173aa3..21bd0ef 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -16,6 +16,7 @@ import ( "github.com/capiscio/capiscio-core/v2/pkg/badge" "github.com/capiscio/capiscio-core/v2/pkg/envelope" + "github.com/capiscio/capiscio-core/v2/pkg/mediation" "github.com/capiscio/capiscio-core/v2/pkg/pip" "github.com/capiscio/capiscio-core/v2/pkg/trust" ) @@ -79,6 +80,11 @@ type PEPConfig struct { // LocalVerifyOptions configures local verification behavior when TrustMaterial is set. LocalVerifyOptions badge.LocalVerifyOptions + + // RuntimeEmitter emits RFC-011 runtime events from the gateway. + // Per RFC-011 §4.2, event emission is non-blocking and asynchronous. + // nil = no event emission (silent mode). + RuntimeEmitter *mediation.AsyncEmitter } // defaultMaxChainDepth is the maximum chain depth per RFC-008 §9.5 RECOMMENDED. @@ -108,6 +114,7 @@ type pep struct { logger *slog.Logger bgValidator *pip.BreakGlassValidator callbacks []PolicyEventCallback + emitter *mediation.AsyncEmitter next http.Handler } @@ -116,6 +123,9 @@ type pep struct { // // When PEPConfig.TrustMaterial is set and bootstrapped, uses LocalVerifier for // badge verification (RFC-001 §2.3 compliant — no network calls on verification path). +// +// When PEPConfig.RuntimeEmitter is set, emits RFC-011 runtime events for +// identity verification, authority decisions, and request lifecycle. func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.Handler, callbacks ...PolicyEventCallback) http.Handler { p := &pep{ verifier: verifier, @@ -123,6 +133,7 @@ func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.H next: next, callbacks: callbacks, logger: config.Logger, + emitter: config.RuntimeEmitter, } if p.logger == nil { p.logger = slog.Default() @@ -138,14 +149,32 @@ func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.H p.logger.Info("PEP using local-first verification (RFC-001 §2.3)") } + // Log emitter status + if p.emitter != nil { + p.logger.Info("PEP emitting RFC-011 runtime events") + } + return http.HandlerFunc(p.serveHTTP) } // serveHTTP implements the PEP request flow: authenticate → break-glass → cache → PDP → enforce. func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { + // Generate trace/txn IDs for event correlation. + txnID := r.Header.Get(pip.TxnIDHeader) + if txnID == "" { + if u, err := uuid.NewV7(); err != nil { + txnID = uuid.New().String() + } else { + txnID = u.String() + } + r.Header.Set(pip.TxnIDHeader, txnID) + } + traceID := r.Header.Get("X-Trace-ID") + // --- 1. Extract and verify badge (authentication) --- token := ExtractBadge(r) if token == "" { + p.emitIdentityInvalid(traceID, txnID, "", "MISSING_BADGE", "no badge in request") http.Error(w, "Missing Trust Badge", http.StatusUnauthorized) return } @@ -159,6 +188,7 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { if verifyErr != nil { p.logger.WarnContext(r.Context(), "local badge verification failed", slog.String("error", verifyErr.Error())) + p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", verifyErr.Error()) http.Error(w, "Invalid Trust Badge", http.StatusUnauthorized) return } @@ -172,11 +202,15 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { claims, err = p.verifier.Verify(r.Context(), token) if err != nil { p.logger.WarnContext(r.Context(), "badge verification failed", slog.String("error", err.Error())) + p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", err.Error()) http.Error(w, "Invalid Trust Badge", http.StatusUnauthorized) return } } + // RFC-011 §5.1: Emit identity.verified event. + p.emitIdentityVerified(traceID, txnID, claims) + // Forward verified identity to upstream r.Header.Set("X-Capiscio-Subject", claims.Subject) r.Header.Set("X-Capiscio-Issuer", claims.Issuer) @@ -203,15 +237,22 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { ) chainErr := envelope.NewError(envelope.ErrCodeBadgeBindingFailed, fmt.Sprintf("leaf envelope subject_did %q does not match authenticated caller %q", leafSubject, claims.Subject)) + p.emitAuthorityDenied(traceID, txnID, claims.Subject, "", envelope.ErrCodeBadgeBindingFailed, chainErr.Message) p.handleChainError(w, r, chainErr) return } + // RFC-011 §5.2: Emit authority.granted after successful chain verification + p.emitAuthorityGranted(traceID, txnID, claims.Subject, chainResult) } } // If no PDP configured, operate in badge-only mode if p.config.PDPClient == nil { + // RFC-011 §5.4: Emit execution lifecycle for badge-only mode + start := time.Now() + p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) p.next.ServeHTTP(w, r) + p.emitExecutionCompleted(traceID, txnID, "success", time.Since(start).Milliseconds()) return } @@ -224,7 +265,7 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { } // --- 5-9. Cache → PDP → enforce → obligations --- - p.evaluatePolicy(w, r, claims, pipReq) + p.evaluatePolicy(w, r, claims, pipReq, traceID, txnID) } // buildPIPRequest constructs the PIP decision request from the HTTP request, badge claims, @@ -483,12 +524,12 @@ func (p *pep) handleBreakGlass(w http.ResponseWriter, r *http.Request, pipReq *p } // evaluatePolicy handles cache lookup, PDP query, decision enforcement, and obligations. -func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest) { +func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest, traceID, txnID string) { cacheKey := pip.CacheKeyComponents(claims.Subject, claims.JTI, pipReq.Action.Operation, pipReq.Resource.Identifier) event := PolicyEvent{} // --- 5. Check cache --- - if p.handleCachedDecision(w, r, cacheKey, &event, pipReq) { + if p.handleCachedDecision(w, r, cacheKey, &event, pipReq, traceID, txnID, claims.Subject) { return } @@ -539,13 +580,19 @@ func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *bad return } + // RFC-011 §5.4: Emit execution lifecycle events for PDP mode + execStart := time.Now() + p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) + emitPolicyEvent(p.callbacks, event, pipReq) p.next.ServeHTTP(w, r) + + p.emitExecutionCompleted(traceID, txnID, "success", time.Since(execStart).Milliseconds()) } // handleCachedDecision serves a cached PDP decision if available. // Returns true if the request was handled from cache. -func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest) bool { +func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest, traceID, txnID, subjectDID string) bool { if p.config.DecisionCache == nil { return false } @@ -566,7 +613,11 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache slog.String(pip.TelemetryDecisionID, cached.DecisionID)) event.Decision = pip.DecisionObserve emitPolicyEvent(p.callbacks, *event, pipReq) + // RFC-011: emit execution events for cached EM-OBSERVE + execStart := time.Now() + p.emitExecutionStarted(traceID, txnID, subjectDID, nil) p.next.ServeHTTP(w, r) + p.emitExecutionCompleted(traceID, txnID, "success", time.Since(execStart).Milliseconds()) return true } reason := cached.Reason @@ -588,8 +639,14 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache } } + // RFC-011: emit execution events for cached ALLOW + execStart := time.Now() + p.emitExecutionStarted(traceID, txnID, subjectDID, nil) + emitPolicyEvent(p.callbacks, *event, pipReq) p.next.ServeHTTP(w, r) + + p.emitExecutionCompleted(traceID, txnID, "success", time.Since(execStart).Milliseconds()) return true } @@ -750,3 +807,118 @@ func emitPolicyEvent(callbacks []PolicyEventCallback, event PolicyEvent, req *pi }() } } + +// --- RFC-011 Event Emission Helpers --- + +// parseTrustLevel converts a trust level string to int (0-3). +func parseTrustLevel(level string) int { + switch level { + case "0": + return 0 + case "1": + return 1 + case "2": + return 2 + case "3": + return 3 + default: + return 0 + } +} + +// parseIAL converts an IAL string to int (0-3). +func parseIAL(ial string) int { + switch ial { + case "0": + return 0 + case "1": + return 1 + case "2": + return 2 + case "3": + return 3 + default: + return 0 + } +} + +// emitIdentityVerified emits an identity.verified event when badge verification succeeds. +func (p *pep) emitIdentityVerified(traceID, txnID string, claims *badge.Claims) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: claims.Subject, + TrustLevel: parseTrustLevel(claims.TrustLevel()), + } + p.emitter.EmitIdentityVerified(mctx, claims.JTI, parseTrustLevel(claims.TrustLevel()), parseIAL(claims.IAL)) +} + +// emitIdentityInvalid emits an identity.invalid event when badge verification fails. +func (p *pep) emitIdentityInvalid(traceID, txnID, badgeJTI, errorCode, reason string) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + } + p.emitter.EmitIdentityInvalid(mctx, badgeJTI, errorCode, reason) +} + +// emitAuthorityGranted emits an authority.granted event when chain verification succeeds. +func (p *pep) emitAuthorityGranted(traceID, txnID, subjectDID string, chainResult *envelope.ChainVerifyResult) { + if p.emitter == nil || chainResult == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + p.emitter.EmitCapabilityCheck(mctx, chainResult.LeafCapability, true) +} + +// emitAuthorityDenied emits an authority.denied event when chain verification fails. +func (p *pep) emitAuthorityDenied(traceID, txnID, subjectDID, capability, errorCode, reason string) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + // Use EmitCapabilityCheck with granted=false for denial + p.emitter.EmitCapabilityCheck(mctx, capability, false) +} + +// emitExecutionStarted emits an execution.started event at the beginning of request handling. +func (p *pep) emitExecutionStarted(traceID, txnID, subjectDID string, env *envelope.Token) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + if env != nil { + mctx.Envelope = env + } + p.emitter.EmitExecutionStarted(mctx) +} + +// emitExecutionCompleted emits an execution.completed event at the end of request handling. +func (p *pep) emitExecutionCompleted(traceID, txnID string, outcome string, durationMs int64) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + } + p.emitter.EmitExecutionCompleted(mctx, outcome, durationMs) +} From eaf80b2ec7eeaafde736d28aadc4c6bc45fc6088 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Wed, 27 May 2026 14:25:20 -0400 Subject: [PATCH 2/6] fix: address PR review comments - Clarify RuntimeEmitter doc comment re: blocking behavior (DropOnFull=true) - Remove duplicate txnID generation in buildPIPRequest (set by serveHTTP) - Pass LeafCapability in emitAuthorityDenied for leaf subject mismatch - Add subjectDID parameter to emitExecutionCompleted for correlation - Add authority.denied emission in handleChainError for chain failures - Replace time.Sleep with require.Eventually in tests for determinism Addresses: copilot-pull-request-reviewer comments on PR #90 --- pkg/gateway/event_emission_test.go | 38 ++++++++++++++++++++++---- pkg/gateway/middleware.go | 44 ++++++++++++++++-------------- 2 files changed, 56 insertions(+), 26 deletions(-) diff --git a/pkg/gateway/event_emission_test.go b/pkg/gateway/event_emission_test.go index a4f455c..4f88e24 100644 --- a/pkg/gateway/event_emission_test.go +++ b/pkg/gateway/event_emission_test.go @@ -62,6 +62,25 @@ func (m *mockEventSink) Reset() { m.events = nil } +// HasEventType returns true if an event of the given type has been received. +func (m *mockEventSink) HasEventType(eventType mediation.EventType) bool { + m.mu.Lock() + defer m.mu.Unlock() + for _, e := range m.events { + if e.EventType == eventType { + return true + } + } + return false +} + +// EventCount returns the number of events received. +func (m *mockEventSink) EventCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.events) +} + func TestEventEmission_IdentityVerified(t *testing.T) { // Setup keys and verifier pub, priv, err := ed25519.GenerateKey(rand.Reader) @@ -121,8 +140,10 @@ func TestEventEmission_IdentityVerified(t *testing.T) { w := httptest.NewRecorder() handler.ServeHTTP(w, req) - // Wait for async event processing - time.Sleep(50 * time.Millisecond) + // Wait for async event processing using deterministic polling + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventIdentityVerified) + }, 500*time.Millisecond, 10*time.Millisecond, "identity.verified event not emitted within timeout") // Verify response assert.Equal(t, http.StatusOK, w.Code) @@ -183,8 +204,10 @@ func TestEventEmission_IdentityInvalid_MissingBadge(t *testing.T) { w := httptest.NewRecorder() handler.ServeHTTP(w, req) - // Wait for async event processing - time.Sleep(50 * time.Millisecond) + // Wait for async event processing using deterministic polling + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventIdentityInvalid) + }, 500*time.Millisecond, 10*time.Millisecond, "identity.invalid event not emitted within timeout") // Verify response assert.Equal(t, http.StatusUnauthorized, w.Code) @@ -254,8 +277,11 @@ func TestEventEmission_ExecutionLifecycle(t *testing.T) { w := httptest.NewRecorder() handler.ServeHTTP(w, req) - // Wait for async event processing - time.Sleep(50 * time.Millisecond) + // Wait for async event processing using deterministic polling + // Wait for execution.completed as it's the last event in the sequence + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventExecutionCompleted) + }, 500*time.Millisecond, 10*time.Millisecond, "execution.completed event not emitted within timeout") // Verify response assert.Equal(t, http.StatusOK, w.Code) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 21bd0ef..3ed343c 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -83,6 +83,8 @@ type PEPConfig struct { // RuntimeEmitter emits RFC-011 runtime events from the gateway. // Per RFC-011 §4.2, event emission is non-blocking and asynchronous. + // NOTE: Default AsyncEmitter may block when buffer is full. For strict RFC-011 + // §4.2 compliance under load, configure the emitter with DropOnFull=true. // nil = no event emission (silent mode). RuntimeEmitter *mediation.AsyncEmitter } @@ -221,7 +223,9 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { var err error chainResult, err = p.verifyAuthorityChain(r, token, claims.Subject) if err != nil { - p.handleChainError(w, r, err) + // Get capability from request header for authority.denied event + capability := r.Header.Get("X-Capiscio-Capability-Class") + p.handleChainError(w, r, err, traceID, txnID, claims.Subject, capability) return } @@ -237,8 +241,8 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { ) chainErr := envelope.NewError(envelope.ErrCodeBadgeBindingFailed, fmt.Sprintf("leaf envelope subject_did %q does not match authenticated caller %q", leafSubject, claims.Subject)) - p.emitAuthorityDenied(traceID, txnID, claims.Subject, "", envelope.ErrCodeBadgeBindingFailed, chainErr.Message) - p.handleChainError(w, r, chainErr) + p.emitAuthorityDenied(traceID, txnID, claims.Subject, chainResult.LeafCapability, envelope.ErrCodeBadgeBindingFailed, chainErr.Message) + p.handleChainError(w, r, chainErr, traceID, txnID, claims.Subject, "") return } // RFC-011 §5.2: Emit authority.granted after successful chain verification @@ -252,7 +256,7 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) p.next.ServeHTTP(w, r) - p.emitExecutionCompleted(traceID, txnID, "success", time.Since(start).Milliseconds()) + p.emitExecutionCompleted(traceID, txnID, claims.Subject, "success", time.Since(start).Milliseconds()) return } @@ -270,17 +274,9 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { // buildPIPRequest constructs the PIP decision request from the HTTP request, badge claims, // and optional verified chain result (nil for badge-only requests). +// NOTE: serveHTTP must set X-Capiscio-Txn header before calling this function. func (p *pep) buildPIPRequest(r *http.Request, claims *badge.Claims, chain *envelope.ChainVerifyResult) *pip.DecisionRequest { txnID := r.Header.Get(pip.TxnIDHeader) - if txnID == "" { - if u, err := uuid.NewV7(); err != nil { - p.logger.ErrorContext(r.Context(), "failed to generate UUID v7 for txn_id", slog.String("error", err.Error())) - txnID = uuid.New().String() - } else { - txnID = u.String() - } - } - r.Header.Set(pip.TxnIDHeader, txnID) now := time.Now().UTC() nowStr := now.Format(time.RFC3339) @@ -435,7 +431,8 @@ func (p *pep) verifyAuthorityChain(r *http.Request, callerBadgeJWS string, calle // handleChainError maps envelope verification errors to HTTP responses. // Errors from chain verification are pre-PDP (RFC-008 §9.2 steps 2–8). -func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error) { +// RFC-011 §5.2: Emits authority.denied event on chain verification failure. +func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error, traceID, txnID, subjectDID, capability string) { var envErr *envelope.Error if errors.As(err, &envErr) { status := ChainErrorHTTPStatus(envErr.Code) @@ -446,6 +443,9 @@ func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error slog.String("enforcement_mode", p.config.EnforcementMode.String()), ) + // RFC-011 §5.2: Emit authority.denied on chain verification failure + p.emitAuthorityDenied(traceID, txnID, subjectDID, capability, envErr.Code, envErr.Message) + // In EM-OBSERVE, log but allow the request through (RFC-005 §6.3) if p.config.EnforcementMode == pip.EMObserve { p.logger.InfoContext(r.Context(), "chain error in EM-OBSERVE (allowing)", @@ -465,6 +465,9 @@ func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error } // Non-envelope error (e.g., network failure in key resolution) + // RFC-011 §5.2: Emit authority.denied for non-envelope errors too + p.emitAuthorityDenied(traceID, txnID, subjectDID, capability, "INTERNAL_ERROR", err.Error()) + if p.config.EnforcementMode == pip.EMObserve { p.logger.WarnContext(r.Context(), "chain verification error in EM-OBSERVE (allowing)", slog.String("error", err.Error())) @@ -587,7 +590,7 @@ func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *bad emitPolicyEvent(p.callbacks, event, pipReq) p.next.ServeHTTP(w, r) - p.emitExecutionCompleted(traceID, txnID, "success", time.Since(execStart).Milliseconds()) + p.emitExecutionCompleted(traceID, txnID, claims.Subject, "success", time.Since(execStart).Milliseconds()) } // handleCachedDecision serves a cached PDP decision if available. @@ -617,7 +620,7 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache execStart := time.Now() p.emitExecutionStarted(traceID, txnID, subjectDID, nil) p.next.ServeHTTP(w, r) - p.emitExecutionCompleted(traceID, txnID, "success", time.Since(execStart).Milliseconds()) + p.emitExecutionCompleted(traceID, txnID, subjectDID, "success", time.Since(execStart).Milliseconds()) return true } reason := cached.Reason @@ -646,7 +649,7 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache emitPolicyEvent(p.callbacks, *event, pipReq) p.next.ServeHTTP(w, r) - p.emitExecutionCompleted(traceID, txnID, "success", time.Since(execStart).Milliseconds()) + p.emitExecutionCompleted(traceID, txnID, subjectDID, "success", time.Since(execStart).Milliseconds()) return true } @@ -912,13 +915,14 @@ func (p *pep) emitExecutionStarted(traceID, txnID, subjectDID string, env *envel } // emitExecutionCompleted emits an execution.completed event at the end of request handling. -func (p *pep) emitExecutionCompleted(traceID, txnID string, outcome string, durationMs int64) { +func (p *pep) emitExecutionCompleted(traceID, txnID, subjectDID string, outcome string, durationMs int64) { if p.emitter == nil { return } mctx := &mediation.Context{ - TraceID: traceID, - TxnID: txnID, + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, } p.emitter.EmitExecutionCompleted(mctx, outcome, durationMs) } From 4e0d68db0341374dc3b67f7c1c25a4e7879c7989 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Wed, 27 May 2026 14:45:07 -0400 Subject: [PATCH 3/6] fix: address round 2 PR review comments - Remove duplicate authority.denied emission (emit only in handleChainError) - Pass LeafCapability to handleChainError for meaningful denial events - Simplify emitAuthorityDenied signature (remove unused errorCode/reason) - Add trust level 4 to parseTrustLevel (RFC-002 defines 0-4) - Add warning log when UUIDv7 generation fails and falls back to v4 Addresses: copilot-pull-request-reviewer comments on PR #90 --- pkg/gateway/middleware.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 3ed343c..ae53cc1 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -165,6 +165,8 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { txnID := r.Header.Get(pip.TxnIDHeader) if txnID == "" { if u, err := uuid.NewV7(); err != nil { + p.logger.WarnContext(r.Context(), "UUIDv7 generation failed, falling back to v4", + slog.String("error", err.Error())) txnID = uuid.New().String() } else { txnID = u.String() @@ -241,8 +243,8 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { ) chainErr := envelope.NewError(envelope.ErrCodeBadgeBindingFailed, fmt.Sprintf("leaf envelope subject_did %q does not match authenticated caller %q", leafSubject, claims.Subject)) - p.emitAuthorityDenied(traceID, txnID, claims.Subject, chainResult.LeafCapability, envelope.ErrCodeBadgeBindingFailed, chainErr.Message) - p.handleChainError(w, r, chainErr, traceID, txnID, claims.Subject, "") + // handleChainError emits authority.denied, so no explicit emit here + p.handleChainError(w, r, chainErr, traceID, txnID, claims.Subject, chainResult.LeafCapability) return } // RFC-011 §5.2: Emit authority.granted after successful chain verification @@ -444,7 +446,7 @@ func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error ) // RFC-011 §5.2: Emit authority.denied on chain verification failure - p.emitAuthorityDenied(traceID, txnID, subjectDID, capability, envErr.Code, envErr.Message) + p.emitAuthorityDenied(traceID, txnID, subjectDID, capability) // In EM-OBSERVE, log but allow the request through (RFC-005 §6.3) if p.config.EnforcementMode == pip.EMObserve { @@ -466,7 +468,7 @@ func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error // Non-envelope error (e.g., network failure in key resolution) // RFC-011 §5.2: Emit authority.denied for non-envelope errors too - p.emitAuthorityDenied(traceID, txnID, subjectDID, capability, "INTERNAL_ERROR", err.Error()) + p.emitAuthorityDenied(traceID, txnID, subjectDID, capability) if p.config.EnforcementMode == pip.EMObserve { p.logger.WarnContext(r.Context(), "chain verification error in EM-OBSERVE (allowing)", @@ -813,7 +815,8 @@ func emitPolicyEvent(callbacks []PolicyEventCallback, event PolicyEvent, req *pi // --- RFC-011 Event Emission Helpers --- -// parseTrustLevel converts a trust level string to int (0-3). +// parseTrustLevel converts a trust level string to int (0-4). +// RFC-002 defines levels 0-4; unknown values default to 0. func parseTrustLevel(level string) int { switch level { case "0": @@ -824,6 +827,8 @@ func parseTrustLevel(level string) int { return 2 case "3": return 3 + case "4": + return 4 default: return 0 } @@ -885,7 +890,8 @@ func (p *pep) emitAuthorityGranted(traceID, txnID, subjectDID string, chainResul } // emitAuthorityDenied emits an authority.denied event when chain verification fails. -func (p *pep) emitAuthorityDenied(traceID, txnID, subjectDID, capability, errorCode, reason string) { +// Note: Error details are logged separately; this event only signals denial. +func (p *pep) emitAuthorityDenied(traceID, txnID, subjectDID, capability string) { if p.emitter == nil { return } From b2b2d7c5f3d58a219ce288a2fbd9e2a18fcfe0d9 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Wed, 27 May 2026 15:00:29 -0400 Subject: [PATCH 4/6] =?UTF-8?q?feat(gateway):=20implement=20RFC-011=20?= =?UTF-8?q?=C2=A77.1=20compliant=20execution=20lifecycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add statusCapturingResponseWriter to capture HTTP status codes - Add executionState struct for tracking execution lifecycle - Implement defer-based pattern ensuring execution events on ALL exit paths - Add EmitExecutionAborted to AsyncEmitter for early terminations - Add emitExecutionAborted helper and outcomeFromStatus for middleware - Create WithState variants of key handlers for state tracking - Add tests for execution.aborted and outcome-based execution.completed - outcome field now reflects actual HTTP status (success/client_error/server_error) RFC-011 §7.1 PEP Emission Requirements: - MUST emit execution.completed OR execution.aborted on every exit path - execution.completed: outcome derived from HTTP status code - execution.aborted: emitted for auth failures, chain errors, PDP deny Addresses PR review comments: - Incomplete execution lifecycle (PRRT_kwDOQYQ-986FMftQ) - execution.completed always 'success' (PRRT_kwDOQYQ-986FMfti) - Missing test coverage (PRRT_kwDOQYQ-986FMfuU) --- pkg/gateway/event_emission_test.go | 176 ++++++++++++++ pkg/gateway/middleware.go | 356 ++++++++++++++++++++++++++++- pkg/mediation/emitter.go | 14 ++ 3 files changed, 534 insertions(+), 12 deletions(-) diff --git a/pkg/gateway/event_emission_test.go b/pkg/gateway/event_emission_test.go index 4f88e24..d0fd47a 100644 --- a/pkg/gateway/event_emission_test.go +++ b/pkg/gateway/event_emission_test.go @@ -346,3 +346,179 @@ func TestEventEmission_NoEmitter(t *testing.T) { assert.Equal(t, http.StatusOK, w.Code) assert.True(t, called) } + +func TestEventEmission_ExecutionAborted_BadgeVerificationFailed(t *testing.T) { + // Setup keys but sign with different key to trigger verification failure + pub, _, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + _, wrongPriv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create badge signed with WRONG key + claims := &badge.Claims{ + JTI: "test-jti-wrong-key", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "2", + }, + }, + } + + token, err := badge.SignBadge(claims, wrongPriv) + require.NoError(t, err) + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request with invalid badge + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + req.Header.Set("X-Trace-ID", "trace-bad-badge") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventIdentityInvalid) + }, 500*time.Millisecond, 10*time.Millisecond, "identity.invalid event not emitted within timeout") + + // Verify response + assert.Equal(t, http.StatusUnauthorized, w.Code) + assert.False(t, called, "handler should not be called for invalid badge") + + // Verify identity.invalid event emitted (no execution events since we never authenticated) + events := sink.Events() + require.GreaterOrEqual(t, len(events), 1) + + hasIdentityInvalid := false + for _, e := range events { + if e.EventType == mediation.EventIdentityInvalid { + hasIdentityInvalid = true + // Verify error info in payload + assert.Equal(t, "VERIFICATION_FAILED", e.Payload["error_code"]) + } + } + assert.True(t, hasIdentityInvalid, "should emit identity.invalid") +} + +func TestEventEmission_OutcomeReflectsHTTPStatus(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + tests := []struct { + name string + statusCode int + expectedOutcome string + }{ + {"success_200", http.StatusOK, "success"}, + {"success_201", http.StatusCreated, "success"}, + {"client_error_400", http.StatusBadRequest, "client_error"}, + {"client_error_404", http.StatusNotFound, "client_error"}, + {"server_error_500", http.StatusInternalServerError, "server_error"}, + {"server_error_503", http.StatusServiceUnavailable, "server_error"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create valid badge + claims := &badge.Claims{ + JTI: "test-jti-" + tt.name, + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "1", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler that returns the test status code + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.statusCode) + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for execution.completed event + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventExecutionCompleted) + }, 500*time.Millisecond, 10*time.Millisecond, "execution.completed event not emitted within timeout") + + // Verify outcome field matches expected + events := sink.Events() + var execCompleted *mediation.Event + for _, e := range events { + if e.EventType == mediation.EventExecutionCompleted { + execCompleted = e + break + } + } + require.NotNil(t, execCompleted) + assert.Equal(t, tt.expectedOutcome, execCompleted.Payload["outcome"], + "outcome should reflect HTTP status %d", tt.statusCode) + }) + } +} diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index ae53cc1..a835192 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -21,6 +21,39 @@ import ( "github.com/capiscio/capiscio-core/v2/pkg/trust" ) +// statusCapturingResponseWriter wraps http.ResponseWriter to capture the status code. +// RFC-011 §5.4: Used to derive outcome field in execution.completed events. +type statusCapturingResponseWriter struct { + http.ResponseWriter + statusCode int + written bool +} + +func (w *statusCapturingResponseWriter) WriteHeader(code int) { + if !w.written { + w.statusCode = code + w.written = true + } + w.ResponseWriter.WriteHeader(code) +} + +func (w *statusCapturingResponseWriter) Write(b []byte) (int, error) { + if !w.written { + w.statusCode = http.StatusOK + w.written = true + } + return w.ResponseWriter.Write(b) +} + +// executionState tracks the lifecycle state for RFC-011 event emission. +type executionState struct { + started bool + subjectDID string + aborted bool + abortCode string + abortMsg string +} + // NewAuthMiddleware creates a middleware that enforces Badge validity. // Deprecated: Use NewPolicyMiddleware for RFC-005 PDP integration. func NewAuthMiddleware(verifier *badge.Verifier, next http.Handler) http.Handler { @@ -160,7 +193,10 @@ func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.H } // serveHTTP implements the PEP request flow: authenticate → break-glass → cache → PDP → enforce. +// RFC-011 §7.1: Emits execution.started at entry and execution.completed or execution.aborted at exit. func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { + execStart := time.Now() + // Generate trace/txn IDs for event correlation. txnID := r.Header.Get(pip.TxnIDHeader) if txnID == "" { @@ -175,11 +211,26 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { } traceID := r.Header.Get("X-Trace-ID") + // Execution state for RFC-011 lifecycle events. + // The defer ensures we always emit completion/aborted on every exit path. + execState := &executionState{} + sw := &statusCapturingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} + defer func() { + durationMs := time.Since(execStart).Milliseconds() + if execState.aborted { + p.emitExecutionAborted(traceID, txnID, execState.subjectDID, execState.abortCode, execState.abortMsg, durationMs) + } else if execState.started { + outcome := outcomeFromStatus(sw.statusCode) + p.emitExecutionCompleted(traceID, txnID, execState.subjectDID, outcome, durationMs) + } + // If !started && !aborted, no execution events emitted (pre-auth failures handled separately) + }() + // --- 1. Extract and verify badge (authentication) --- token := ExtractBadge(r) if token == "" { p.emitIdentityInvalid(traceID, txnID, "", "MISSING_BADGE", "no badge in request") - http.Error(w, "Missing Trust Badge", http.StatusUnauthorized) + http.Error(sw, "Missing Trust Badge", http.StatusUnauthorized) return } @@ -193,7 +244,7 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { p.logger.WarnContext(r.Context(), "local badge verification failed", slog.String("error", verifyErr.Error())) p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", verifyErr.Error()) - http.Error(w, "Invalid Trust Badge", http.StatusUnauthorized) + http.Error(sw, "Invalid Trust Badge", http.StatusUnauthorized) return } claims = result.Claims @@ -207,7 +258,7 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { p.logger.WarnContext(r.Context(), "badge verification failed", slog.String("error", err.Error())) p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", err.Error()) - http.Error(w, "Invalid Trust Badge", http.StatusUnauthorized) + http.Error(sw, "Invalid Trust Badge", http.StatusUnauthorized) return } } @@ -215,6 +266,12 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { // RFC-011 §5.1: Emit identity.verified event. p.emitIdentityVerified(traceID, txnID, claims) + // Now that we have a verified identity, start the execution lifecycle. + // RFC-011 §7.1: Emit execution.started at execution boundary enter. + execState.subjectDID = claims.Subject + execState.started = true + p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) + // Forward verified identity to upstream r.Header.Set("X-Capiscio-Subject", claims.Subject) r.Header.Set("X-Capiscio-Issuer", claims.Issuer) @@ -227,7 +284,10 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { // Get capability from request header for authority.denied event capability := r.Header.Get("X-Capiscio-Capability-Class") - p.handleChainError(w, r, err, traceID, txnID, claims.Subject, capability) + execState.aborted = true + execState.abortCode = "CHAIN_VERIFICATION_FAILED" + execState.abortMsg = err.Error() + p.handleChainError(sw, r, err, traceID, txnID, claims.Subject, capability) return } @@ -243,8 +303,11 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { ) chainErr := envelope.NewError(envelope.ErrCodeBadgeBindingFailed, fmt.Sprintf("leaf envelope subject_did %q does not match authenticated caller %q", leafSubject, claims.Subject)) + execState.aborted = true + execState.abortCode = envelope.ErrCodeBadgeBindingFailed + execState.abortMsg = chainErr.Message // handleChainError emits authority.denied, so no explicit emit here - p.handleChainError(w, r, chainErr, traceID, txnID, claims.Subject, chainResult.LeafCapability) + p.handleChainError(sw, r, chainErr, traceID, txnID, claims.Subject, chainResult.LeafCapability) return } // RFC-011 §5.2: Emit authority.granted after successful chain verification @@ -254,11 +317,8 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { // If no PDP configured, operate in badge-only mode if p.config.PDPClient == nil { - // RFC-011 §5.4: Emit execution lifecycle for badge-only mode - start := time.Now() - p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) - p.next.ServeHTTP(w, r) - p.emitExecutionCompleted(traceID, txnID, claims.Subject, "success", time.Since(start).Milliseconds()) + // RFC-011 §5.4: execution.started already emitted, completion via defer + p.next.ServeHTTP(sw, r) return } @@ -266,12 +326,12 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { pipReq := p.buildPIPRequest(r, claims, chainResult) // --- 4. Check break-glass override --- - if p.handleBreakGlass(w, r, pipReq) { + if p.handleBreakGlassWithState(sw, r, pipReq, execState) { return } // --- 5-9. Cache → PDP → enforce → obligations --- - p.evaluatePolicy(w, r, claims, pipReq, traceID, txnID) + p.evaluatePolicyWithState(sw, r, claims, pipReq, traceID, txnID, execState) } // buildPIPRequest constructs the PIP decision request from the HTTP request, badge claims, @@ -528,6 +588,34 @@ func (p *pep) handleBreakGlass(w http.ResponseWriter, r *http.Request, pipReq *p return true } +// handleBreakGlassWithState handles break-glass override with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handleBreakGlassWithState(w http.ResponseWriter, r *http.Request, pipReq *pip.DecisionRequest, _ *executionState) bool { + if p.bgValidator == nil { + return false + } + + bgToken := extractBreakGlass(r, p.bgValidator) + if bgToken == nil { + return false + } + + p.logger.WarnContext(r.Context(), "break-glass override active", + slog.String(pip.TelemetryOverrideJTI, bgToken.JTI), + slog.String("operator", bgToken.SUB), + slog.String("reason", bgToken.Reason)) + + emitPolicyEvent(p.callbacks, PolicyEvent{ + Decision: pip.DecisionAllow, + DecisionID: "breakglass:" + bgToken.JTI, + Override: true, + OverrideJTI: bgToken.JTI, + }, pipReq) + p.next.ServeHTTP(w, r) + // Break-glass is a successful path; defer will emit execution.completed + return true +} + // evaluatePolicy handles cache lookup, PDP query, decision enforcement, and obligations. func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest, traceID, txnID string) { cacheKey := pip.CacheKeyComponents(claims.Subject, claims.JTI, pipReq.Action.Operation, pipReq.Resource.Identifier) @@ -595,6 +683,77 @@ func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *bad p.emitExecutionCompleted(traceID, txnID, claims.Subject, "success", time.Since(execStart).Milliseconds()) } +// evaluatePolicyWithState handles policy evaluation with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) evaluatePolicyWithState(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest, traceID, txnID string, execState *executionState) { + cacheKey := pip.CacheKeyComponents(claims.Subject, claims.JTI, pipReq.Action.Operation, pipReq.Resource.Identifier) + event := PolicyEvent{} + + // --- 5. Check cache --- + if p.handleCachedDecisionWithState(w, r, cacheKey, &event, pipReq, execState) { + return + } + + // --- 6. Query PDP --- + start := time.Now() + resp, pdpErr := p.config.PDPClient.Evaluate(r.Context(), pipReq) + event.PDPLatencyMs = time.Since(start).Milliseconds() + + if pdpErr != nil { + p.logger.ErrorContext(r.Context(), "PDP unavailable", + slog.String(pip.TelemetryErrorCode, pip.ErrorCodePDPUnavailable), + slog.String("error", pdpErr.Error()), + slog.String("enforcement_mode", p.config.EnforcementMode.String())) + execState.aborted = true + execState.abortCode = pip.ErrorCodePDPUnavailable + execState.abortMsg = pdpErr.Error() + p.handlePDPUnavailableWithState(w, r, &event, pipReq, execState) + return + } + + // Validate PDP response + if !pip.ValidDecision(resp.Decision) || resp.DecisionID == "" { + p.logger.ErrorContext(r.Context(), "PDP returned non-compliant response", + slog.String("decision", resp.Decision), + slog.String("decision_id", resp.DecisionID)) + execState.aborted = true + execState.abortCode = pip.ErrorCodePDPUnavailable + execState.abortMsg = "PDP returned non-compliant response" + p.handlePDPUnavailableWithState(w, r, &event, pipReq, execState) + return + } + + event.Decision = resp.Decision + event.DecisionID = resp.DecisionID + event.Obligations = obligationTypes(resp.Obligations) + + // --- 7. Cache the response --- + if p.config.DecisionCache != nil { + maxTTL := time.Until(time.Unix(claims.Expiry, 0)) + if maxTTL > 0 { + p.config.DecisionCache.Put(cacheKey, resp, maxTTL) + } + } + + // --- 8. Enforce decision --- + if resp.Decision == pip.DecisionDeny { + execState.aborted = true + execState.abortCode = "PDP_DENY" + execState.abortMsg = resp.Reason + p.handlePDPDenyWithState(w, r, resp, &event, pipReq, execState) + return + } + + // --- 9. Handle obligations --- + if p.enforceObligationsWithState(w, r, resp.Obligations, &event, pipReq, execState) { + return + } + + // RFC-011: execution.completed handled by defer with captured HTTP status + emitPolicyEvent(p.callbacks, event, pipReq) + p.next.ServeHTTP(w, r) +} + // handleCachedDecision serves a cached PDP decision if available. // Returns true if the request was handled from cache. func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest, traceID, txnID, subjectDID string) bool { @@ -733,6 +892,152 @@ func (p *pep) enforceObligations(w http.ResponseWriter, r *http.Request, obligat return false } +// handleCachedDecisionWithState serves a cached PDP decision with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handleCachedDecisionWithState(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) bool { + if p.config.DecisionCache == nil { + return false + } + + cached, ok := p.config.DecisionCache.Get(cacheKey) + if !ok { + return false + } + + event.Decision = cached.Decision + event.DecisionID = cached.DecisionID + event.CacheHit = true + event.Obligations = obligationTypes(cached.Obligations) + + if cached.Decision == pip.DecisionDeny { + if p.config.EnforcementMode == pip.EMObserve { + p.logger.InfoContext(r.Context(), "cached PDP DENY in EM-OBSERVE (allowing)", + slog.String(pip.TelemetryDecisionID, cached.DecisionID)) + event.Decision = pip.DecisionObserve + emitPolicyEvent(p.callbacks, *event, pipReq) + // EM-OBSERVE allows the request; defer handles execution.completed + p.next.ServeHTTP(w, r) + return true + } + execState.aborted = true + execState.abortCode = "CACHED_DENY" + execState.abortMsg = cached.Reason + reason := cached.Reason + if reason == "" { + reason = "Access denied by policy" + } + emitPolicyEvent(p.callbacks, *event, pipReq) + http.Error(w, reason, http.StatusForbidden) + return true + } + + if p.config.ObligationReg != nil && len(cached.Obligations) > 0 { + oblResult := p.config.ObligationReg.Enforce(r.Context(), p.config.EnforcementMode, cached.Obligations) + if !oblResult.Proceed { + execState.aborted = true + execState.abortCode = "OBLIGATION_FAILED" + execState.abortMsg = "cached obligation enforcement failed" + event.Decision = pip.DecisionDeny + emitPolicyEvent(p.callbacks, *event, pipReq) + http.Error(w, "Access denied: obligation enforcement failed", http.StatusForbidden) + return true + } + } + + // Cached ALLOW; defer handles execution.completed + emitPolicyEvent(p.callbacks, *event, pipReq) + p.next.ServeHTTP(w, r) + return true +} + +// handlePDPUnavailableWithState handles PDP unreachability with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handlePDPUnavailableWithState(w http.ResponseWriter, r *http.Request, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) { + event.ErrorCode = pip.ErrorCodePDPUnavailable + + if p.config.EnforcementMode == pip.EMObserve { + // EM-OBSERVE: mark as not aborted since request proceeds + execState.aborted = false + event.Decision = pip.DecisionObserve + event.DecisionID = "pdp-unavailable" + emitPolicyEvent(p.callbacks, *event, pipReq) + p.next.ServeHTTP(w, r) + return + } + + // EM-GUARD, EM-DELEGATE, EM-STRICT: fail-closed (already marked aborted by caller) + event.Decision = pip.DecisionDeny + event.DecisionID = "pdp-unavailable" + emitPolicyEvent(p.callbacks, *event, pipReq) + http.Error(w, "Access denied: policy service unavailable", http.StatusForbidden) +} + +// handlePDPDenyWithState handles a DENY decision with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handlePDPDenyWithState(w http.ResponseWriter, r *http.Request, resp *pip.DecisionResponse, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) { + switch p.config.EnforcementMode { + case pip.EMObserve: + // EM-OBSERVE: mark as not aborted since request proceeds + execState.aborted = false + p.logger.InfoContext(r.Context(), "PDP DENY in EM-OBSERVE (allowing)", + slog.String(pip.TelemetryDecisionID, resp.DecisionID)) + event.Decision = pip.DecisionObserve + emitPolicyEvent(p.callbacks, *event, pipReq) + p.next.ServeHTTP(w, r) + default: + // Already marked aborted by caller + emitPolicyEvent(p.callbacks, *event, pipReq) + + // RFC-008: SCOPE_INSUFFICIENT returns structured JSON 403 + if resp.ErrorCode == pip.ErrorCodeScopeInsufficient { + var presentedCap, envelopeID string + if pipReq.Action.CapabilityClass != nil { + presentedCap = *pipReq.Action.CapabilityClass + } + if pipReq.Context.EnvelopeID != nil { + envelopeID = *pipReq.Context.EnvelopeID + } + body := envelope.NewScopeInsufficientRejection( + resp.RequestedCapability, + presentedCap, + envelopeID, + pipReq.Context.TxnID, + ) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusForbidden) + _ = json.NewEncoder(w).Encode(body) + return + } + + reason := "Access denied by policy" + if resp.Reason != "" { + reason = resp.Reason + } + http.Error(w, reason, http.StatusForbidden) + } +} + +// enforceObligationsWithState enforces obligations with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) enforceObligationsWithState(w http.ResponseWriter, r *http.Request, obligations []pip.Obligation, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) bool { + if p.config.ObligationReg == nil || len(obligations) == 0 { + return false + } + + oblResult := p.config.ObligationReg.Enforce(r.Context(), p.config.EnforcementMode, obligations) + if !oblResult.Proceed { + execState.aborted = true + execState.abortCode = "OBLIGATION_FAILED" + execState.abortMsg = "obligation enforcement failed" + event.Decision = pip.DecisionDeny + emitPolicyEvent(p.callbacks, *event, pipReq) + http.Error(w, "Access denied: obligation enforcement failed", http.StatusForbidden) + return true + } + + return false +} + // ExtractBadge retrieves the badge from headers. func ExtractBadge(r *http.Request) string { // 1. X-Capiscio-Badge @@ -932,3 +1237,30 @@ func (p *pep) emitExecutionCompleted(traceID, txnID, subjectDID string, outcome } p.emitter.EmitExecutionCompleted(mctx, outcome, durationMs) } + +// emitExecutionAborted emits an execution.aborted event for early terminations (RFC-011 §7.1). +func (p *pep) emitExecutionAborted(traceID, txnID, subjectDID, errorCode, reason string, durationMs int64) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + p.emitter.EmitExecutionAborted(mctx, reason, errorCode, durationMs) +} + +// outcomeFromStatus derives the RFC-011 outcome field from an HTTP status code. +func outcomeFromStatus(code int) string { + switch { + case code >= 200 && code < 300: + return "success" + case code >= 400 && code < 500: + return "client_error" + case code >= 500: + return "server_error" + default: + return "unknown" + } +} diff --git a/pkg/mediation/emitter.go b/pkg/mediation/emitter.go index 43d91f1..a9c2368 100644 --- a/pkg/mediation/emitter.go +++ b/pkg/mediation/emitter.go @@ -374,6 +374,20 @@ func (e *AsyncEmitter) EmitExecutionCompleted(mctx *Context, outcome string, dur e.emit(event) } +// EmitExecutionAborted emits an execution.aborted event for early terminations. +func (e *AsyncEmitter) EmitExecutionAborted(mctx *Context, reason, errorCode string, durationMs int64) { + event := NewEvent(EventExecutionAborted, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + } + + event.WithPayload("reason", reason) + event.WithPayload("error_code", errorCode) + event.WithPayload("duration_ms", durationMs) + + e.emit(event) +} + // EmitTrustRevocationChecked emits a trust.revocation.checked event. func (e *AsyncEmitter) EmitTrustRevocationChecked(mctx *Context, jti string, revoked bool, cacheAgeSeconds int) { event := NewEvent(EventTrustRevocationChecked, e.emitter) From 41b700c3e07d1d6e46753d500a0c2d0b472e2d60 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Wed, 27 May 2026 16:06:12 -0400 Subject: [PATCH 5/6] fix(gateway): remove unused functions and reduce cyclomatic complexity - Remove old non-WithState handler functions (handleBreakGlass, evaluatePolicy, handleCachedDecision, handlePDPUnavailable, handlePDPDeny, enforceObligations) now that serveHTTP uses the WithState variants exclusively - Extract verifyBadge helper to reduce serveHTTP complexity from 16 to under 15 (gocyclo limit) --- pkg/gateway/middleware.go | 287 ++++---------------------------------- 1 file changed, 30 insertions(+), 257 deletions(-) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index a835192..f3b4b70 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -2,6 +2,7 @@ package gateway import ( + "context" "crypto" "encoding/json" "errors" @@ -192,6 +193,30 @@ func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.H return http.HandlerFunc(p.serveHTTP) } +// verifyBadge verifies the badge token using LocalVerifier or Verifier. +// Returns claims if valid, or nil with an error message for failure. +func (p *pep) verifyBadge(ctx context.Context, token string) (*badge.Claims, string) { + if p.localVerifier != nil { + result, err := p.localVerifier.Verify(ctx, token) + if err != nil { + p.logger.WarnContext(ctx, "local badge verification failed", slog.String("error", err.Error())) + return nil, err.Error() + } + // Log freshness warnings for stale material + if result.Freshness == trust.FreshnessStateStale { + p.logger.WarnContext(ctx, "badge verified with stale trust material", + slog.String("subject", result.Claims.Subject)) + } + return result.Claims, "" + } + claims, err := p.verifier.Verify(ctx, token) + if err != nil { + p.logger.WarnContext(ctx, "badge verification failed", slog.String("error", err.Error())) + return nil, err.Error() + } + return claims, "" +} + // serveHTTP implements the PEP request flow: authenticate → break-glass → cache → PDP → enforce. // RFC-011 §7.1: Emits execution.started at entry and execution.completed or execution.aborted at exit. func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { @@ -234,33 +259,11 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { return } - // RFC-001 §2.3: Use LocalVerifier when available (no network calls). - // Falls back to network-dependent Verifier when TrustMaterial not bootstrapped. - var claims *badge.Claims - var err error - if p.localVerifier != nil { - result, verifyErr := p.localVerifier.Verify(r.Context(), token) - if verifyErr != nil { - p.logger.WarnContext(r.Context(), "local badge verification failed", - slog.String("error", verifyErr.Error())) - p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", verifyErr.Error()) - http.Error(sw, "Invalid Trust Badge", http.StatusUnauthorized) - return - } - claims = result.Claims - // Log freshness warnings for stale material - if result.Freshness == trust.FreshnessStateStale { - p.logger.WarnContext(r.Context(), "badge verified with stale trust material", - slog.String("subject", claims.Subject)) - } - } else { - claims, err = p.verifier.Verify(r.Context(), token) - if err != nil { - p.logger.WarnContext(r.Context(), "badge verification failed", slog.String("error", err.Error())) - p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", err.Error()) - http.Error(sw, "Invalid Trust Badge", http.StatusUnauthorized) - return - } + claims, errMsg := p.verifyBadge(r.Context(), token) + if claims == nil { + p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", errMsg) + http.Error(sw, "Invalid Trust Badge", http.StatusUnauthorized) + return } // RFC-011 §5.1: Emit identity.verified event. @@ -563,31 +566,6 @@ func ChainErrorHTTPStatus(code string) int { // handleBreakGlass checks for a valid break-glass override token. // Returns true if the request was handled (break-glass token was valid). -func (p *pep) handleBreakGlass(w http.ResponseWriter, r *http.Request, pipReq *pip.DecisionRequest) bool { - if p.bgValidator == nil { - return false - } - - bgToken := extractBreakGlass(r, p.bgValidator) - if bgToken == nil { - return false - } - - p.logger.WarnContext(r.Context(), "break-glass override active", - slog.String(pip.TelemetryOverrideJTI, bgToken.JTI), - slog.String("operator", bgToken.SUB), - slog.String("reason", bgToken.Reason)) - - emitPolicyEvent(p.callbacks, PolicyEvent{ - Decision: pip.DecisionAllow, - DecisionID: "breakglass:" + bgToken.JTI, - Override: true, - OverrideJTI: bgToken.JTI, - }, pipReq) - p.next.ServeHTTP(w, r) - return true -} - // handleBreakGlassWithState handles break-glass override with execution state tracking. // RFC-011: Execution events are handled by the caller's defer, not here. func (p *pep) handleBreakGlassWithState(w http.ResponseWriter, r *http.Request, pipReq *pip.DecisionRequest, _ *executionState) bool { @@ -616,73 +594,6 @@ func (p *pep) handleBreakGlassWithState(w http.ResponseWriter, r *http.Request, return true } -// evaluatePolicy handles cache lookup, PDP query, decision enforcement, and obligations. -func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest, traceID, txnID string) { - cacheKey := pip.CacheKeyComponents(claims.Subject, claims.JTI, pipReq.Action.Operation, pipReq.Resource.Identifier) - event := PolicyEvent{} - - // --- 5. Check cache --- - if p.handleCachedDecision(w, r, cacheKey, &event, pipReq, traceID, txnID, claims.Subject) { - return - } - - // --- 6. Query PDP --- - start := time.Now() - resp, pdpErr := p.config.PDPClient.Evaluate(r.Context(), pipReq) - event.PDPLatencyMs = time.Since(start).Milliseconds() - - if pdpErr != nil { - p.logger.ErrorContext(r.Context(), "PDP unavailable", - slog.String(pip.TelemetryErrorCode, pip.ErrorCodePDPUnavailable), - slog.String("error", pdpErr.Error()), - slog.String("enforcement_mode", p.config.EnforcementMode.String())) - p.handlePDPUnavailable(w, r, &event, pipReq) - return - } - - // Validate PDP response: Decision must be ALLOW or DENY, DecisionID must be non-empty. - // A non-compliant response is treated as PDP unavailability (fail-closed except EM-OBSERVE). - if !pip.ValidDecision(resp.Decision) || resp.DecisionID == "" { - p.logger.ErrorContext(r.Context(), "PDP returned non-compliant response", - slog.String("decision", resp.Decision), - slog.String("decision_id", resp.DecisionID)) - p.handlePDPUnavailable(w, r, &event, pipReq) - return - } - - event.Decision = resp.Decision - event.DecisionID = resp.DecisionID - event.Obligations = obligationTypes(resp.Obligations) - - // --- 7. Cache the response --- - if p.config.DecisionCache != nil { - maxTTL := time.Until(time.Unix(claims.Expiry, 0)) - if maxTTL > 0 { - p.config.DecisionCache.Put(cacheKey, resp, maxTTL) - } - } - - // --- 8. Enforce decision --- - if resp.Decision == pip.DecisionDeny { - p.handlePDPDeny(w, r, resp, &event, pipReq) - return - } - - // --- 9. Handle obligations --- - if p.enforceObligations(w, r, resp.Obligations, &event, pipReq) { - return - } - - // RFC-011 §5.4: Emit execution lifecycle events for PDP mode - execStart := time.Now() - p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) - - emitPolicyEvent(p.callbacks, event, pipReq) - p.next.ServeHTTP(w, r) - - p.emitExecutionCompleted(traceID, txnID, claims.Subject, "success", time.Since(execStart).Milliseconds()) -} - // evaluatePolicyWithState handles policy evaluation with execution state tracking. // RFC-011: Execution events are handled by the caller's defer, not here. func (p *pep) evaluatePolicyWithState(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest, traceID, txnID string, execState *executionState) { @@ -754,144 +665,6 @@ func (p *pep) evaluatePolicyWithState(w http.ResponseWriter, r *http.Request, cl p.next.ServeHTTP(w, r) } -// handleCachedDecision serves a cached PDP decision if available. -// Returns true if the request was handled from cache. -func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest, traceID, txnID, subjectDID string) bool { - if p.config.DecisionCache == nil { - return false - } - - cached, ok := p.config.DecisionCache.Get(cacheKey) - if !ok { - return false - } - - event.Decision = cached.Decision - event.DecisionID = cached.DecisionID - event.CacheHit = true - event.Obligations = obligationTypes(cached.Obligations) - - if cached.Decision == pip.DecisionDeny { - if p.config.EnforcementMode == pip.EMObserve { - p.logger.InfoContext(r.Context(), "cached PDP DENY in EM-OBSERVE (allowing)", - slog.String(pip.TelemetryDecisionID, cached.DecisionID)) - event.Decision = pip.DecisionObserve - emitPolicyEvent(p.callbacks, *event, pipReq) - // RFC-011: emit execution events for cached EM-OBSERVE - execStart := time.Now() - p.emitExecutionStarted(traceID, txnID, subjectDID, nil) - p.next.ServeHTTP(w, r) - p.emitExecutionCompleted(traceID, txnID, subjectDID, "success", time.Since(execStart).Milliseconds()) - return true - } - reason := cached.Reason - if reason == "" { - reason = "Access denied by policy" - } - emitPolicyEvent(p.callbacks, *event, pipReq) - http.Error(w, reason, http.StatusForbidden) - return true - } - - if p.config.ObligationReg != nil && len(cached.Obligations) > 0 { - oblResult := p.config.ObligationReg.Enforce(r.Context(), p.config.EnforcementMode, cached.Obligations) - if !oblResult.Proceed { - event.Decision = pip.DecisionDeny - emitPolicyEvent(p.callbacks, *event, pipReq) - http.Error(w, "Access denied: obligation enforcement failed", http.StatusForbidden) - return true - } - } - - // RFC-011: emit execution events for cached ALLOW - execStart := time.Now() - p.emitExecutionStarted(traceID, txnID, subjectDID, nil) - - emitPolicyEvent(p.callbacks, *event, pipReq) - p.next.ServeHTTP(w, r) - - p.emitExecutionCompleted(traceID, txnID, subjectDID, "success", time.Since(execStart).Milliseconds()) - return true -} - -// handlePDPUnavailable handles PDP unreachability per enforcement mode (RFC-005 §7.4). -func (p *pep) handlePDPUnavailable(w http.ResponseWriter, r *http.Request, event *PolicyEvent, pipReq *pip.DecisionRequest) { - event.ErrorCode = pip.ErrorCodePDPUnavailable - - if p.config.EnforcementMode == pip.EMObserve { - event.Decision = pip.DecisionObserve - event.DecisionID = "pdp-unavailable" - emitPolicyEvent(p.callbacks, *event, pipReq) - p.next.ServeHTTP(w, r) - return - } - - // EM-GUARD, EM-DELEGATE, EM-STRICT: fail-closed - event.Decision = pip.DecisionDeny - event.DecisionID = "pdp-unavailable" - emitPolicyEvent(p.callbacks, *event, pipReq) - http.Error(w, "Access denied: policy service unavailable", http.StatusForbidden) -} - -// handlePDPDeny handles a DENY decision from the PDP per enforcement mode. -func (p *pep) handlePDPDeny(w http.ResponseWriter, r *http.Request, resp *pip.DecisionResponse, event *PolicyEvent, pipReq *pip.DecisionRequest) { - switch p.config.EnforcementMode { - case pip.EMObserve: - p.logger.InfoContext(r.Context(), "PDP DENY in EM-OBSERVE (allowing)", - slog.String(pip.TelemetryDecisionID, resp.DecisionID)) - event.Decision = pip.DecisionObserve - emitPolicyEvent(p.callbacks, *event, pipReq) - p.next.ServeHTTP(w, r) - default: - emitPolicyEvent(p.callbacks, *event, pipReq) - - // RFC-008: SCOPE_INSUFFICIENT returns structured JSON 403 - if resp.ErrorCode == pip.ErrorCodeScopeInsufficient { - var presentedCap, envelopeID string - if pipReq.Action.CapabilityClass != nil { - presentedCap = *pipReq.Action.CapabilityClass - } - if pipReq.Context.EnvelopeID != nil { - envelopeID = *pipReq.Context.EnvelopeID - } - body := envelope.NewScopeInsufficientRejection( - resp.RequestedCapability, - presentedCap, - envelopeID, - pipReq.Context.TxnID, - ) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusForbidden) - _ = json.NewEncoder(w).Encode(body) - return - } - - reason := "Access denied by policy" - if resp.Reason != "" { - reason = resp.Reason - } - http.Error(w, reason, http.StatusForbidden) - } -} - -// enforceObligations attempts to enforce obligations from the PDP response. -// Returns true if the request was denied due to obligation failure. -func (p *pep) enforceObligations(w http.ResponseWriter, r *http.Request, obligations []pip.Obligation, event *PolicyEvent, pipReq *pip.DecisionRequest) bool { - if p.config.ObligationReg == nil || len(obligations) == 0 { - return false - } - - oblResult := p.config.ObligationReg.Enforce(r.Context(), p.config.EnforcementMode, obligations) - if !oblResult.Proceed { - event.Decision = pip.DecisionDeny - emitPolicyEvent(p.callbacks, *event, pipReq) - http.Error(w, "Access denied: obligation enforcement failed", http.StatusForbidden) - return true - } - - return false -} - // handleCachedDecisionWithState serves a cached PDP decision with execution state tracking. // RFC-011: Execution events are handled by the caller's defer, not here. func (p *pep) handleCachedDecisionWithState(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) bool { From 87af836853aaef61a89885a19d2ec162106eb19c Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Wed, 27 May 2026 16:40:41 -0400 Subject: [PATCH 6/6] fix(gateway): correct EM-OBSERVE execution lifecycle and rename test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix: Only set execState.aborted when enforcement mode actually blocks. In EM-OBSERVE mode, chain errors allow the request through, so we should NOT emit execution.aborted (it's not an abort). - Rename misleading test: TestEventEmission_ExecutionAborted_BadgeVerificationFailed → TestEventEmission_IdentityInvalid_BadgeVerificationFailed (reflects what the test actually verifies) --- pkg/gateway/event_emission_test.go | 2 +- pkg/gateway/middleware.go | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/gateway/event_emission_test.go b/pkg/gateway/event_emission_test.go index d0fd47a..d1412e5 100644 --- a/pkg/gateway/event_emission_test.go +++ b/pkg/gateway/event_emission_test.go @@ -347,7 +347,7 @@ func TestEventEmission_NoEmitter(t *testing.T) { assert.True(t, called) } -func TestEventEmission_ExecutionAborted_BadgeVerificationFailed(t *testing.T) { +func TestEventEmission_IdentityInvalid_BadgeVerificationFailed(t *testing.T) { // Setup keys but sign with different key to trigger verification failure pub, _, err := ed25519.GenerateKey(rand.Reader) require.NoError(t, err) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index f3b4b70..e43dd35 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -287,9 +287,13 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { // Get capability from request header for authority.denied event capability := r.Header.Get("X-Capiscio-Capability-Class") - execState.aborted = true - execState.abortCode = "CHAIN_VERIFICATION_FAILED" - execState.abortMsg = err.Error() + // Only mark as aborted if enforcement mode blocks (not EM-OBSERVE). + // EM-OBSERVE allows the request through, so it's not an abort. + if p.config.EnforcementMode != pip.EMObserve { + execState.aborted = true + execState.abortCode = "CHAIN_VERIFICATION_FAILED" + execState.abortMsg = err.Error() + } p.handleChainError(sw, r, err, traceID, txnID, claims.Subject, capability) return } @@ -306,9 +310,12 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { ) chainErr := envelope.NewError(envelope.ErrCodeBadgeBindingFailed, fmt.Sprintf("leaf envelope subject_did %q does not match authenticated caller %q", leafSubject, claims.Subject)) - execState.aborted = true - execState.abortCode = envelope.ErrCodeBadgeBindingFailed - execState.abortMsg = chainErr.Message + // Only mark as aborted if enforcement mode blocks + if p.config.EnforcementMode != pip.EMObserve { + execState.aborted = true + execState.abortCode = envelope.ErrCodeBadgeBindingFailed + execState.abortMsg = chainErr.Message + } // handleChainError emits authority.denied, so no explicit emit here p.handleChainError(sw, r, chainErr, traceID, txnID, claims.Subject, chainResult.LeafCapability) return