diff --git a/pkg/gateway/event_emission_test.go b/pkg/gateway/event_emission_test.go new file mode 100644 index 0000000..d1412e5 --- /dev/null +++ b/pkg/gateway/event_emission_test.go @@ -0,0 +1,524 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +package gateway_test + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/capiscio/capiscio-core/v2/pkg/badge" + "github.com/capiscio/capiscio-core/v2/pkg/gateway" + "github.com/capiscio/capiscio-core/v2/pkg/mediation" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockEventSink captures emitted events for testing. +type mockEventSink struct { + mu sync.Mutex + events []*mediation.Event +} + +func (m *mockEventSink) Receive(event *mediation.Event) { + m.mu.Lock() + defer m.mu.Unlock() + m.events = append(m.events, event) +} + +func (m *mockEventSink) Flush(ctx context.Context) error { + return nil +} + +func (m *mockEventSink) Close() error { + return nil +} + +func (m *mockEventSink) Events() []*mediation.Event { + m.mu.Lock() + defer m.mu.Unlock() + return append([]*mediation.Event{}, m.events...) +} + +func (m *mockEventSink) EventTypes() []mediation.EventType { + m.mu.Lock() + defer m.mu.Unlock() + types := make([]mediation.EventType, len(m.events)) + for i, e := range m.events { + types[i] = e.EventType + } + return types +} + +func (m *mockEventSink) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.events = nil +} + +// HasEventType returns true if an event of the given type has been received. +func (m *mockEventSink) HasEventType(eventType mediation.EventType) bool { + m.mu.Lock() + defer m.mu.Unlock() + for _, e := range m.events { + if e.EventType == eventType { + return true + } + } + return false +} + +// EventCount returns the number of events received. +func (m *mockEventSink) EventCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.events) +} + +func TestEventEmission_IdentityVerified(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create valid badge with VC + claims := &badge.Claims{ + JTI: "test-jti-events", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "2", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + req.Header.Set("X-Trace-ID", "trace-123") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing using deterministic polling + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventIdentityVerified) + }, 500*time.Millisecond, 10*time.Millisecond, "identity.verified event not emitted within timeout") + + // Verify response + assert.Equal(t, http.StatusOK, w.Code) + assert.True(t, called) + + // Verify events emitted + events := sink.Events() + require.GreaterOrEqual(t, len(events), 1, "should emit at least identity.verified") + + // Find identity.verified event + var identityVerified *mediation.Event + for _, e := range events { + if e.EventType == mediation.EventIdentityVerified { + identityVerified = e + break + } + } + require.NotNil(t, identityVerified, "should emit identity.verified event") + assert.Equal(t, "trace-123", identityVerified.Context.TraceID) + assert.NotEmpty(t, identityVerified.Context.TxnID) +} + +func TestEventEmission_IdentityInvalid_MissingBadge(t *testing.T) { + // Setup keys and verifier (won't be used since badge is missing) + pub, _, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request WITHOUT badge + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Trace-ID", "trace-456") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing using deterministic polling + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventIdentityInvalid) + }, 500*time.Millisecond, 10*time.Millisecond, "identity.invalid event not emitted within timeout") + + // Verify response + assert.Equal(t, http.StatusUnauthorized, w.Code) + assert.False(t, called) + + // Verify identity.invalid event emitted + events := sink.Events() + require.Len(t, events, 1) + assert.Equal(t, mediation.EventIdentityInvalid, events[0].EventType) + assert.Equal(t, "trace-456", events[0].Context.TraceID) +} + +func TestEventEmission_ExecutionLifecycle(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create valid badge with VC + claims := &badge.Claims{ + JTI: "test-jti-exec", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "1", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler that takes some time + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(10 * time.Millisecond) + w.WriteHeader(http.StatusOK) + }) + + // Create middleware with emitter (badge-only mode, no PDP) + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing using deterministic polling + // Wait for execution.completed as it's the last event in the sequence + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventExecutionCompleted) + }, 500*time.Millisecond, 10*time.Millisecond, "execution.completed event not emitted within timeout") + + // Verify response + assert.Equal(t, http.StatusOK, w.Code) + + // Verify event sequence: identity.verified, execution.started, execution.completed + eventTypes := sink.EventTypes() + require.GreaterOrEqual(t, len(eventTypes), 3, "should emit identity.verified, execution.started, execution.completed") + + assert.Contains(t, eventTypes, mediation.EventIdentityVerified) + assert.Contains(t, eventTypes, mediation.EventExecutionStarted) + assert.Contains(t, eventTypes, mediation.EventExecutionCompleted) +} + +func TestEventEmission_NoEmitter(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create valid badge with VC + claims := &badge.Claims{ + JTI: "test-jti-no-emitter", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "1", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + }) + + // Create middleware WITHOUT emitter (nil) + config := gateway.PEPConfig{ + RuntimeEmitter: nil, // no emitter + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Should still work without emitter + assert.Equal(t, http.StatusOK, w.Code) + assert.True(t, called) +} + +func TestEventEmission_IdentityInvalid_BadgeVerificationFailed(t *testing.T) { + // Setup keys but sign with different key to trigger verification failure + pub, _, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + _, wrongPriv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create badge signed with WRONG key + claims := &badge.Claims{ + JTI: "test-jti-wrong-key", + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "2", + }, + }, + } + + token, err := badge.SignBadge(claims, wrongPriv) + require.NoError(t, err) + + // Create handler + called := false + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request with invalid badge + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + req.Header.Set("X-Trace-ID", "trace-bad-badge") + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for async event processing + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventIdentityInvalid) + }, 500*time.Millisecond, 10*time.Millisecond, "identity.invalid event not emitted within timeout") + + // Verify response + assert.Equal(t, http.StatusUnauthorized, w.Code) + assert.False(t, called, "handler should not be called for invalid badge") + + // Verify identity.invalid event emitted (no execution events since we never authenticated) + events := sink.Events() + require.GreaterOrEqual(t, len(events), 1) + + hasIdentityInvalid := false + for _, e := range events { + if e.EventType == mediation.EventIdentityInvalid { + hasIdentityInvalid = true + // Verify error info in payload + assert.Equal(t, "VERIFICATION_FAILED", e.Payload["error_code"]) + } + } + assert.True(t, hasIdentityInvalid, "should emit identity.invalid") +} + +func TestEventEmission_OutcomeReflectsHTTPStatus(t *testing.T) { + // Setup keys and verifier + pub, priv, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + + reg := &MockRegistry{Key: pub} + verifier := badge.NewVerifier(reg) + + tests := []struct { + name string + statusCode int + expectedOutcome string + }{ + {"success_200", http.StatusOK, "success"}, + {"success_201", http.StatusCreated, "success"}, + {"client_error_400", http.StatusBadRequest, "client_error"}, + {"client_error_404", http.StatusNotFound, "client_error"}, + {"server_error_500", http.StatusInternalServerError, "server_error"}, + {"server_error_503", http.StatusServiceUnavailable, "server_error"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create event sink and emitter + sink := &mockEventSink{} + emitter := mediation.NewAsyncEmitter(mediation.AsyncEmitterConfig{ + ComponentID: "test-gateway", + ComponentType: mediation.ComponentGateway, + Version: "test", + BufferSize: 100, + Sinks: []mediation.EventSink{sink}, + }) + defer emitter.Close() + + // Create valid badge + claims := &badge.Claims{ + JTI: "test-jti-" + tt.name, + Issuer: "did:web:test.capisc.io", + Subject: "did:web:test.capisc.io:agents:test-agent", + IssuedAt: time.Now().Unix(), + Expiry: time.Now().Add(1 * time.Hour).Unix(), + VC: badge.VerifiableCredential{ + Type: []string{"VerifiableCredential", "AgentIdentity"}, + CredentialSubject: badge.CredentialSubject{ + Domain: "test.example.com", + Level: "1", + }, + }, + } + + token, err := badge.SignBadge(claims, priv) + require.NoError(t, err) + + // Create handler that returns the test status code + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.statusCode) + }) + + // Create middleware with emitter + config := gateway.PEPConfig{ + RuntimeEmitter: emitter, + } + handler := gateway.NewPolicyMiddleware(verifier, config, next) + + // Make request + req := httptest.NewRequest("GET", "/test", nil) + req.Header.Set("X-Capiscio-Badge", token) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + + // Wait for execution.completed event + require.Eventually(t, func() bool { + return sink.HasEventType(mediation.EventExecutionCompleted) + }, 500*time.Millisecond, 10*time.Millisecond, "execution.completed event not emitted within timeout") + + // Verify outcome field matches expected + events := sink.Events() + var execCompleted *mediation.Event + for _, e := range events { + if e.EventType == mediation.EventExecutionCompleted { + execCompleted = e + break + } + } + require.NotNil(t, execCompleted) + assert.Equal(t, tt.expectedOutcome, execCompleted.Payload["outcome"], + "outcome should reflect HTTP status %d", tt.statusCode) + }) + } +} diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index c173aa3..e43dd35 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -2,6 +2,7 @@ package gateway import ( + "context" "crypto" "encoding/json" "errors" @@ -16,10 +17,44 @@ import ( "github.com/capiscio/capiscio-core/v2/pkg/badge" "github.com/capiscio/capiscio-core/v2/pkg/envelope" + "github.com/capiscio/capiscio-core/v2/pkg/mediation" "github.com/capiscio/capiscio-core/v2/pkg/pip" "github.com/capiscio/capiscio-core/v2/pkg/trust" ) +// statusCapturingResponseWriter wraps http.ResponseWriter to capture the status code. +// RFC-011 §5.4: Used to derive outcome field in execution.completed events. +type statusCapturingResponseWriter struct { + http.ResponseWriter + statusCode int + written bool +} + +func (w *statusCapturingResponseWriter) WriteHeader(code int) { + if !w.written { + w.statusCode = code + w.written = true + } + w.ResponseWriter.WriteHeader(code) +} + +func (w *statusCapturingResponseWriter) Write(b []byte) (int, error) { + if !w.written { + w.statusCode = http.StatusOK + w.written = true + } + return w.ResponseWriter.Write(b) +} + +// executionState tracks the lifecycle state for RFC-011 event emission. +type executionState struct { + started bool + subjectDID string + aborted bool + abortCode string + abortMsg string +} + // NewAuthMiddleware creates a middleware that enforces Badge validity. // Deprecated: Use NewPolicyMiddleware for RFC-005 PDP integration. func NewAuthMiddleware(verifier *badge.Verifier, next http.Handler) http.Handler { @@ -79,6 +114,13 @@ type PEPConfig struct { // LocalVerifyOptions configures local verification behavior when TrustMaterial is set. LocalVerifyOptions badge.LocalVerifyOptions + + // RuntimeEmitter emits RFC-011 runtime events from the gateway. + // Per RFC-011 §4.2, event emission is non-blocking and asynchronous. + // NOTE: Default AsyncEmitter may block when buffer is full. For strict RFC-011 + // §4.2 compliance under load, configure the emitter with DropOnFull=true. + // nil = no event emission (silent mode). + RuntimeEmitter *mediation.AsyncEmitter } // defaultMaxChainDepth is the maximum chain depth per RFC-008 §9.5 RECOMMENDED. @@ -108,6 +150,7 @@ type pep struct { logger *slog.Logger bgValidator *pip.BreakGlassValidator callbacks []PolicyEventCallback + emitter *mediation.AsyncEmitter next http.Handler } @@ -116,6 +159,9 @@ type pep struct { // // When PEPConfig.TrustMaterial is set and bootstrapped, uses LocalVerifier for // badge verification (RFC-001 §2.3 compliant — no network calls on verification path). +// +// When PEPConfig.RuntimeEmitter is set, emits RFC-011 runtime events for +// identity verification, authority decisions, and request lifecycle. func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.Handler, callbacks ...PolicyEventCallback) http.Handler { p := &pep{ verifier: verifier, @@ -123,6 +169,7 @@ func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.H next: next, callbacks: callbacks, logger: config.Logger, + emitter: config.RuntimeEmitter, } if p.logger == nil { p.logger = slog.Default() @@ -138,45 +185,96 @@ func NewPolicyMiddleware(verifier *badge.Verifier, config PEPConfig, next http.H p.logger.Info("PEP using local-first verification (RFC-001 §2.3)") } + // Log emitter status + if p.emitter != nil { + p.logger.Info("PEP emitting RFC-011 runtime events") + } + return http.HandlerFunc(p.serveHTTP) } +// verifyBadge verifies the badge token using LocalVerifier or Verifier. +// Returns claims if valid, or nil with an error message for failure. +func (p *pep) verifyBadge(ctx context.Context, token string) (*badge.Claims, string) { + if p.localVerifier != nil { + result, err := p.localVerifier.Verify(ctx, token) + if err != nil { + p.logger.WarnContext(ctx, "local badge verification failed", slog.String("error", err.Error())) + return nil, err.Error() + } + // Log freshness warnings for stale material + if result.Freshness == trust.FreshnessStateStale { + p.logger.WarnContext(ctx, "badge verified with stale trust material", + slog.String("subject", result.Claims.Subject)) + } + return result.Claims, "" + } + claims, err := p.verifier.Verify(ctx, token) + if err != nil { + p.logger.WarnContext(ctx, "badge verification failed", slog.String("error", err.Error())) + return nil, err.Error() + } + return claims, "" +} + // serveHTTP implements the PEP request flow: authenticate → break-glass → cache → PDP → enforce. +// RFC-011 §7.1: Emits execution.started at entry and execution.completed or execution.aborted at exit. func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { + execStart := time.Now() + + // Generate trace/txn IDs for event correlation. + txnID := r.Header.Get(pip.TxnIDHeader) + if txnID == "" { + if u, err := uuid.NewV7(); err != nil { + p.logger.WarnContext(r.Context(), "UUIDv7 generation failed, falling back to v4", + slog.String("error", err.Error())) + txnID = uuid.New().String() + } else { + txnID = u.String() + } + r.Header.Set(pip.TxnIDHeader, txnID) + } + traceID := r.Header.Get("X-Trace-ID") + + // Execution state for RFC-011 lifecycle events. + // The defer ensures we always emit completion/aborted on every exit path. + execState := &executionState{} + sw := &statusCapturingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} + defer func() { + durationMs := time.Since(execStart).Milliseconds() + if execState.aborted { + p.emitExecutionAborted(traceID, txnID, execState.subjectDID, execState.abortCode, execState.abortMsg, durationMs) + } else if execState.started { + outcome := outcomeFromStatus(sw.statusCode) + p.emitExecutionCompleted(traceID, txnID, execState.subjectDID, outcome, durationMs) + } + // If !started && !aborted, no execution events emitted (pre-auth failures handled separately) + }() + // --- 1. Extract and verify badge (authentication) --- token := ExtractBadge(r) if token == "" { - http.Error(w, "Missing Trust Badge", http.StatusUnauthorized) + p.emitIdentityInvalid(traceID, txnID, "", "MISSING_BADGE", "no badge in request") + http.Error(sw, "Missing Trust Badge", http.StatusUnauthorized) return } - // RFC-001 §2.3: Use LocalVerifier when available (no network calls). - // Falls back to network-dependent Verifier when TrustMaterial not bootstrapped. - var claims *badge.Claims - var err error - if p.localVerifier != nil { - result, verifyErr := p.localVerifier.Verify(r.Context(), token) - if verifyErr != nil { - p.logger.WarnContext(r.Context(), "local badge verification failed", - slog.String("error", verifyErr.Error())) - http.Error(w, "Invalid Trust Badge", http.StatusUnauthorized) - return - } - claims = result.Claims - // Log freshness warnings for stale material - if result.Freshness == trust.FreshnessStateStale { - p.logger.WarnContext(r.Context(), "badge verified with stale trust material", - slog.String("subject", claims.Subject)) - } - } else { - claims, err = p.verifier.Verify(r.Context(), token) - if err != nil { - p.logger.WarnContext(r.Context(), "badge verification failed", slog.String("error", err.Error())) - http.Error(w, "Invalid Trust Badge", http.StatusUnauthorized) - return - } + claims, errMsg := p.verifyBadge(r.Context(), token) + if claims == nil { + p.emitIdentityInvalid(traceID, txnID, "", "VERIFICATION_FAILED", errMsg) + http.Error(sw, "Invalid Trust Badge", http.StatusUnauthorized) + return } + // RFC-011 §5.1: Emit identity.verified event. + p.emitIdentityVerified(traceID, txnID, claims) + + // Now that we have a verified identity, start the execution lifecycle. + // RFC-011 §7.1: Emit execution.started at execution boundary enter. + execState.subjectDID = claims.Subject + execState.started = true + p.emitExecutionStarted(traceID, txnID, claims.Subject, nil) + // Forward verified identity to upstream r.Header.Set("X-Capiscio-Subject", claims.Subject) r.Header.Set("X-Capiscio-Issuer", claims.Issuer) @@ -187,7 +285,16 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { var err error chainResult, err = p.verifyAuthorityChain(r, token, claims.Subject) if err != nil { - p.handleChainError(w, r, err) + // Get capability from request header for authority.denied event + capability := r.Header.Get("X-Capiscio-Capability-Class") + // Only mark as aborted if enforcement mode blocks (not EM-OBSERVE). + // EM-OBSERVE allows the request through, so it's not an abort. + if p.config.EnforcementMode != pip.EMObserve { + execState.aborted = true + execState.abortCode = "CHAIN_VERIFICATION_FAILED" + execState.abortMsg = err.Error() + } + p.handleChainError(sw, r, err, traceID, txnID, claims.Subject, capability) return } @@ -203,15 +310,25 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { ) chainErr := envelope.NewError(envelope.ErrCodeBadgeBindingFailed, fmt.Sprintf("leaf envelope subject_did %q does not match authenticated caller %q", leafSubject, claims.Subject)) - p.handleChainError(w, r, chainErr) + // Only mark as aborted if enforcement mode blocks + if p.config.EnforcementMode != pip.EMObserve { + execState.aborted = true + execState.abortCode = envelope.ErrCodeBadgeBindingFailed + execState.abortMsg = chainErr.Message + } + // handleChainError emits authority.denied, so no explicit emit here + p.handleChainError(sw, r, chainErr, traceID, txnID, claims.Subject, chainResult.LeafCapability) return } + // RFC-011 §5.2: Emit authority.granted after successful chain verification + p.emitAuthorityGranted(traceID, txnID, claims.Subject, chainResult) } } // If no PDP configured, operate in badge-only mode if p.config.PDPClient == nil { - p.next.ServeHTTP(w, r) + // RFC-011 §5.4: execution.started already emitted, completion via defer + p.next.ServeHTTP(sw, r) return } @@ -219,27 +336,19 @@ func (p *pep) serveHTTP(w http.ResponseWriter, r *http.Request) { pipReq := p.buildPIPRequest(r, claims, chainResult) // --- 4. Check break-glass override --- - if p.handleBreakGlass(w, r, pipReq) { + if p.handleBreakGlassWithState(sw, r, pipReq, execState) { return } // --- 5-9. Cache → PDP → enforce → obligations --- - p.evaluatePolicy(w, r, claims, pipReq) + p.evaluatePolicyWithState(sw, r, claims, pipReq, traceID, txnID, execState) } // buildPIPRequest constructs the PIP decision request from the HTTP request, badge claims, // and optional verified chain result (nil for badge-only requests). +// NOTE: serveHTTP must set X-Capiscio-Txn header before calling this function. func (p *pep) buildPIPRequest(r *http.Request, claims *badge.Claims, chain *envelope.ChainVerifyResult) *pip.DecisionRequest { txnID := r.Header.Get(pip.TxnIDHeader) - if txnID == "" { - if u, err := uuid.NewV7(); err != nil { - p.logger.ErrorContext(r.Context(), "failed to generate UUID v7 for txn_id", slog.String("error", err.Error())) - txnID = uuid.New().String() - } else { - txnID = u.String() - } - } - r.Header.Set(pip.TxnIDHeader, txnID) now := time.Now().UTC() nowStr := now.Format(time.RFC3339) @@ -394,7 +503,8 @@ func (p *pep) verifyAuthorityChain(r *http.Request, callerBadgeJWS string, calle // handleChainError maps envelope verification errors to HTTP responses. // Errors from chain verification are pre-PDP (RFC-008 §9.2 steps 2–8). -func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error) { +// RFC-011 §5.2: Emits authority.denied event on chain verification failure. +func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error, traceID, txnID, subjectDID, capability string) { var envErr *envelope.Error if errors.As(err, &envErr) { status := ChainErrorHTTPStatus(envErr.Code) @@ -405,6 +515,9 @@ func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error slog.String("enforcement_mode", p.config.EnforcementMode.String()), ) + // RFC-011 §5.2: Emit authority.denied on chain verification failure + p.emitAuthorityDenied(traceID, txnID, subjectDID, capability) + // In EM-OBSERVE, log but allow the request through (RFC-005 §6.3) if p.config.EnforcementMode == pip.EMObserve { p.logger.InfoContext(r.Context(), "chain error in EM-OBSERVE (allowing)", @@ -424,6 +537,9 @@ func (p *pep) handleChainError(w http.ResponseWriter, r *http.Request, err error } // Non-envelope error (e.g., network failure in key resolution) + // RFC-011 §5.2: Emit authority.denied for non-envelope errors too + p.emitAuthorityDenied(traceID, txnID, subjectDID, capability) + if p.config.EnforcementMode == pip.EMObserve { p.logger.WarnContext(r.Context(), "chain verification error in EM-OBSERVE (allowing)", slog.String("error", err.Error())) @@ -457,7 +573,9 @@ func ChainErrorHTTPStatus(code string) int { // handleBreakGlass checks for a valid break-glass override token. // Returns true if the request was handled (break-glass token was valid). -func (p *pep) handleBreakGlass(w http.ResponseWriter, r *http.Request, pipReq *pip.DecisionRequest) bool { +// handleBreakGlassWithState handles break-glass override with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handleBreakGlassWithState(w http.ResponseWriter, r *http.Request, pipReq *pip.DecisionRequest, _ *executionState) bool { if p.bgValidator == nil { return false } @@ -479,16 +597,18 @@ func (p *pep) handleBreakGlass(w http.ResponseWriter, r *http.Request, pipReq *p OverrideJTI: bgToken.JTI, }, pipReq) p.next.ServeHTTP(w, r) + // Break-glass is a successful path; defer will emit execution.completed return true } -// evaluatePolicy handles cache lookup, PDP query, decision enforcement, and obligations. -func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest) { +// evaluatePolicyWithState handles policy evaluation with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) evaluatePolicyWithState(w http.ResponseWriter, r *http.Request, claims *badge.Claims, pipReq *pip.DecisionRequest, traceID, txnID string, execState *executionState) { cacheKey := pip.CacheKeyComponents(claims.Subject, claims.JTI, pipReq.Action.Operation, pipReq.Resource.Identifier) event := PolicyEvent{} // --- 5. Check cache --- - if p.handleCachedDecision(w, r, cacheKey, &event, pipReq) { + if p.handleCachedDecisionWithState(w, r, cacheKey, &event, pipReq, execState) { return } @@ -502,17 +622,22 @@ func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *bad slog.String(pip.TelemetryErrorCode, pip.ErrorCodePDPUnavailable), slog.String("error", pdpErr.Error()), slog.String("enforcement_mode", p.config.EnforcementMode.String())) - p.handlePDPUnavailable(w, r, &event, pipReq) + execState.aborted = true + execState.abortCode = pip.ErrorCodePDPUnavailable + execState.abortMsg = pdpErr.Error() + p.handlePDPUnavailableWithState(w, r, &event, pipReq, execState) return } - // Validate PDP response: Decision must be ALLOW or DENY, DecisionID must be non-empty. - // A non-compliant response is treated as PDP unavailability (fail-closed except EM-OBSERVE). + // Validate PDP response if !pip.ValidDecision(resp.Decision) || resp.DecisionID == "" { p.logger.ErrorContext(r.Context(), "PDP returned non-compliant response", slog.String("decision", resp.Decision), slog.String("decision_id", resp.DecisionID)) - p.handlePDPUnavailable(w, r, &event, pipReq) + execState.aborted = true + execState.abortCode = pip.ErrorCodePDPUnavailable + execState.abortMsg = "PDP returned non-compliant response" + p.handlePDPUnavailableWithState(w, r, &event, pipReq, execState) return } @@ -530,22 +655,26 @@ func (p *pep) evaluatePolicy(w http.ResponseWriter, r *http.Request, claims *bad // --- 8. Enforce decision --- if resp.Decision == pip.DecisionDeny { - p.handlePDPDeny(w, r, resp, &event, pipReq) + execState.aborted = true + execState.abortCode = "PDP_DENY" + execState.abortMsg = resp.Reason + p.handlePDPDenyWithState(w, r, resp, &event, pipReq, execState) return } // --- 9. Handle obligations --- - if p.enforceObligations(w, r, resp.Obligations, &event, pipReq) { + if p.enforceObligationsWithState(w, r, resp.Obligations, &event, pipReq, execState) { return } + // RFC-011: execution.completed handled by defer with captured HTTP status emitPolicyEvent(p.callbacks, event, pipReq) p.next.ServeHTTP(w, r) } -// handleCachedDecision serves a cached PDP decision if available. -// Returns true if the request was handled from cache. -func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest) bool { +// handleCachedDecisionWithState serves a cached PDP decision with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handleCachedDecisionWithState(w http.ResponseWriter, r *http.Request, cacheKey string, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) bool { if p.config.DecisionCache == nil { return false } @@ -566,9 +695,13 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache slog.String(pip.TelemetryDecisionID, cached.DecisionID)) event.Decision = pip.DecisionObserve emitPolicyEvent(p.callbacks, *event, pipReq) + // EM-OBSERVE allows the request; defer handles execution.completed p.next.ServeHTTP(w, r) return true } + execState.aborted = true + execState.abortCode = "CACHED_DENY" + execState.abortMsg = cached.Reason reason := cached.Reason if reason == "" { reason = "Access denied by policy" @@ -581,6 +714,9 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache if p.config.ObligationReg != nil && len(cached.Obligations) > 0 { oblResult := p.config.ObligationReg.Enforce(r.Context(), p.config.EnforcementMode, cached.Obligations) if !oblResult.Proceed { + execState.aborted = true + execState.abortCode = "OBLIGATION_FAILED" + execState.abortMsg = "cached obligation enforcement failed" event.Decision = pip.DecisionDeny emitPolicyEvent(p.callbacks, *event, pipReq) http.Error(w, "Access denied: obligation enforcement failed", http.StatusForbidden) @@ -588,16 +724,20 @@ func (p *pep) handleCachedDecision(w http.ResponseWriter, r *http.Request, cache } } + // Cached ALLOW; defer handles execution.completed emitPolicyEvent(p.callbacks, *event, pipReq) p.next.ServeHTTP(w, r) return true } -// handlePDPUnavailable handles PDP unreachability per enforcement mode (RFC-005 §7.4). -func (p *pep) handlePDPUnavailable(w http.ResponseWriter, r *http.Request, event *PolicyEvent, pipReq *pip.DecisionRequest) { +// handlePDPUnavailableWithState handles PDP unreachability with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handlePDPUnavailableWithState(w http.ResponseWriter, r *http.Request, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) { event.ErrorCode = pip.ErrorCodePDPUnavailable if p.config.EnforcementMode == pip.EMObserve { + // EM-OBSERVE: mark as not aborted since request proceeds + execState.aborted = false event.Decision = pip.DecisionObserve event.DecisionID = "pdp-unavailable" emitPolicyEvent(p.callbacks, *event, pipReq) @@ -605,23 +745,27 @@ func (p *pep) handlePDPUnavailable(w http.ResponseWriter, r *http.Request, event return } - // EM-GUARD, EM-DELEGATE, EM-STRICT: fail-closed + // EM-GUARD, EM-DELEGATE, EM-STRICT: fail-closed (already marked aborted by caller) event.Decision = pip.DecisionDeny event.DecisionID = "pdp-unavailable" emitPolicyEvent(p.callbacks, *event, pipReq) http.Error(w, "Access denied: policy service unavailable", http.StatusForbidden) } -// handlePDPDeny handles a DENY decision from the PDP per enforcement mode. -func (p *pep) handlePDPDeny(w http.ResponseWriter, r *http.Request, resp *pip.DecisionResponse, event *PolicyEvent, pipReq *pip.DecisionRequest) { +// handlePDPDenyWithState handles a DENY decision with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) handlePDPDenyWithState(w http.ResponseWriter, r *http.Request, resp *pip.DecisionResponse, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) { switch p.config.EnforcementMode { case pip.EMObserve: + // EM-OBSERVE: mark as not aborted since request proceeds + execState.aborted = false p.logger.InfoContext(r.Context(), "PDP DENY in EM-OBSERVE (allowing)", slog.String(pip.TelemetryDecisionID, resp.DecisionID)) event.Decision = pip.DecisionObserve emitPolicyEvent(p.callbacks, *event, pipReq) p.next.ServeHTTP(w, r) default: + // Already marked aborted by caller emitPolicyEvent(p.callbacks, *event, pipReq) // RFC-008: SCOPE_INSUFFICIENT returns structured JSON 403 @@ -653,15 +797,18 @@ func (p *pep) handlePDPDeny(w http.ResponseWriter, r *http.Request, resp *pip.De } } -// enforceObligations attempts to enforce obligations from the PDP response. -// Returns true if the request was denied due to obligation failure. -func (p *pep) enforceObligations(w http.ResponseWriter, r *http.Request, obligations []pip.Obligation, event *PolicyEvent, pipReq *pip.DecisionRequest) bool { +// enforceObligationsWithState enforces obligations with execution state tracking. +// RFC-011: Execution events are handled by the caller's defer, not here. +func (p *pep) enforceObligationsWithState(w http.ResponseWriter, r *http.Request, obligations []pip.Obligation, event *PolicyEvent, pipReq *pip.DecisionRequest, execState *executionState) bool { if p.config.ObligationReg == nil || len(obligations) == 0 { return false } oblResult := p.config.ObligationReg.Enforce(r.Context(), p.config.EnforcementMode, obligations) if !oblResult.Proceed { + execState.aborted = true + execState.abortCode = "OBLIGATION_FAILED" + execState.abortMsg = "obligation enforcement failed" event.Decision = pip.DecisionDeny emitPolicyEvent(p.callbacks, *event, pipReq) http.Error(w, "Access denied: obligation enforcement failed", http.StatusForbidden) @@ -750,3 +897,150 @@ func emitPolicyEvent(callbacks []PolicyEventCallback, event PolicyEvent, req *pi }() } } + +// --- RFC-011 Event Emission Helpers --- + +// parseTrustLevel converts a trust level string to int (0-4). +// RFC-002 defines levels 0-4; unknown values default to 0. +func parseTrustLevel(level string) int { + switch level { + case "0": + return 0 + case "1": + return 1 + case "2": + return 2 + case "3": + return 3 + case "4": + return 4 + default: + return 0 + } +} + +// parseIAL converts an IAL string to int (0-3). +func parseIAL(ial string) int { + switch ial { + case "0": + return 0 + case "1": + return 1 + case "2": + return 2 + case "3": + return 3 + default: + return 0 + } +} + +// emitIdentityVerified emits an identity.verified event when badge verification succeeds. +func (p *pep) emitIdentityVerified(traceID, txnID string, claims *badge.Claims) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: claims.Subject, + TrustLevel: parseTrustLevel(claims.TrustLevel()), + } + p.emitter.EmitIdentityVerified(mctx, claims.JTI, parseTrustLevel(claims.TrustLevel()), parseIAL(claims.IAL)) +} + +// emitIdentityInvalid emits an identity.invalid event when badge verification fails. +func (p *pep) emitIdentityInvalid(traceID, txnID, badgeJTI, errorCode, reason string) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + } + p.emitter.EmitIdentityInvalid(mctx, badgeJTI, errorCode, reason) +} + +// emitAuthorityGranted emits an authority.granted event when chain verification succeeds. +func (p *pep) emitAuthorityGranted(traceID, txnID, subjectDID string, chainResult *envelope.ChainVerifyResult) { + if p.emitter == nil || chainResult == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + p.emitter.EmitCapabilityCheck(mctx, chainResult.LeafCapability, true) +} + +// emitAuthorityDenied emits an authority.denied event when chain verification fails. +// Note: Error details are logged separately; this event only signals denial. +func (p *pep) emitAuthorityDenied(traceID, txnID, subjectDID, capability string) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + // Use EmitCapabilityCheck with granted=false for denial + p.emitter.EmitCapabilityCheck(mctx, capability, false) +} + +// emitExecutionStarted emits an execution.started event at the beginning of request handling. +func (p *pep) emitExecutionStarted(traceID, txnID, subjectDID string, env *envelope.Token) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + if env != nil { + mctx.Envelope = env + } + p.emitter.EmitExecutionStarted(mctx) +} + +// emitExecutionCompleted emits an execution.completed event at the end of request handling. +func (p *pep) emitExecutionCompleted(traceID, txnID, subjectDID string, outcome string, durationMs int64) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + p.emitter.EmitExecutionCompleted(mctx, outcome, durationMs) +} + +// emitExecutionAborted emits an execution.aborted event for early terminations (RFC-011 §7.1). +func (p *pep) emitExecutionAborted(traceID, txnID, subjectDID, errorCode, reason string, durationMs int64) { + if p.emitter == nil { + return + } + mctx := &mediation.Context{ + TraceID: traceID, + TxnID: txnID, + SubjectDID: subjectDID, + } + p.emitter.EmitExecutionAborted(mctx, reason, errorCode, durationMs) +} + +// outcomeFromStatus derives the RFC-011 outcome field from an HTTP status code. +func outcomeFromStatus(code int) string { + switch { + case code >= 200 && code < 300: + return "success" + case code >= 400 && code < 500: + return "client_error" + case code >= 500: + return "server_error" + default: + return "unknown" + } +} diff --git a/pkg/mediation/emitter.go b/pkg/mediation/emitter.go index 43d91f1..a9c2368 100644 --- a/pkg/mediation/emitter.go +++ b/pkg/mediation/emitter.go @@ -374,6 +374,20 @@ func (e *AsyncEmitter) EmitExecutionCompleted(mctx *Context, outcome string, dur e.emit(event) } +// EmitExecutionAborted emits an execution.aborted event for early terminations. +func (e *AsyncEmitter) EmitExecutionAborted(mctx *Context, reason, errorCode string, durationMs int64) { + event := NewEvent(EventExecutionAborted, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + } + + event.WithPayload("reason", reason) + event.WithPayload("error_code", errorCode) + event.WithPayload("duration_ms", durationMs) + + e.emit(event) +} + // EmitTrustRevocationChecked emits a trust.revocation.checked event. func (e *AsyncEmitter) EmitTrustRevocationChecked(mctx *Context, jti string, revoked bool, cacheAgeSeconds int) { event := NewEvent(EventTrustRevocationChecked, e.emitter)