diff --git a/pkg/mediation/aggregator.go b/pkg/mediation/aggregator.go new file mode 100644 index 0000000..3a02b45 --- /dev/null +++ b/pkg/mediation/aggregator.go @@ -0,0 +1,469 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +package mediation + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + "time" +) + +// EventAggregator collects events from multiple runtimes for server-side processing. +// +// Per RFC-011 §4.2, events are emitted locally and forwarded asynchronously. +// The aggregator provides the server-side collection point for these events. +// +// Aggregators receive events via HTTP webhook or streaming connections. +// They do NOT participate in authorization decisions (RFC-011 §4.4). +type EventAggregator interface { + // Ingest accepts a batch of events for processing. + // Returns the number of events successfully processed. + Ingest(ctx context.Context, events []*Event) (int, error) + + // Query retrieves events matching the given criteria. + Query(ctx context.Context, query EventQuery) ([]*Event, error) + + // Subscribe creates a channel that receives events matching the filter. + // Call the returned cancel function to stop the subscription. + Subscribe(filter EventFilter) (<-chan *Event, func()) + + // Stats returns aggregator statistics. + Stats() AggregatorStats +} + +// EventQuery defines criteria for querying stored events. +type EventQuery struct { + // TraceID filters by trace identifier. + TraceID string + + // TxnID filters by transaction identifier. + TxnID string + + // SubjectDID filters by the subject (caller) DID. + SubjectDID string + + // EventTypes filters by event type. + EventTypes []EventType + + // StartTime is the earliest event timestamp to include. + StartTime time.Time + + // EndTime is the latest event timestamp to include. + EndTime time.Time + + // Limit is the maximum number of events to return. + Limit int +} + +// EventFilter defines criteria for subscribing to live events. +type EventFilter struct { + // EventTypes is the set of event types to receive. + // Empty means all types. + EventTypes []EventType + + // SubjectDID filters to events for a specific subject. + SubjectDID string + + // OrganizationID filters to events for a specific organization. + OrganizationID string +} + +// AggregatorStats contains aggregator metrics. +type AggregatorStats struct { + // EventsReceived is the total number of events received. + EventsReceived uint64 + + // EventsStored is the total number of events currently stored. + EventsStored uint64 + + // EventsDropped is the number of events dropped due to capacity. + EventsDropped uint64 + + // SubscriberCount is the number of active subscribers. + SubscriberCount int + + // LastEventTime is the timestamp of the most recent event. + LastEventTime time.Time +} + +// MemoryAggregator is an in-memory event aggregator for testing and development. +// +// Production deployments should use a persistent aggregator backed by a database +// or event streaming system (Kafka, NATS, etc.). +type MemoryAggregator struct { + events []*Event + subscribers []subscriber + maxEvents int + mu sync.RWMutex + + stats struct { + received uint64 + dropped uint64 + } +} + +type subscriber struct { + filter EventFilter + ch chan *Event +} + +// NewMemoryAggregator creates an in-memory aggregator. +func NewMemoryAggregator(maxEvents int) *MemoryAggregator { + if maxEvents <= 0 { + maxEvents = 10000 + } + return &MemoryAggregator{ + events: make([]*Event, 0, maxEvents), + maxEvents: maxEvents, + } +} + +// Ingest implements EventAggregator. +func (a *MemoryAggregator) Ingest(ctx context.Context, events []*Event) (int, error) { + a.mu.Lock() + defer a.mu.Unlock() + + processed := 0 + for _, event := range events { + if len(a.events) >= a.maxEvents { + // Drop oldest event + a.events = a.events[1:] + a.stats.dropped++ + } + a.events = append(a.events, event) + a.stats.received++ + processed++ + + // Notify subscribers + for _, sub := range a.subscribers { + if a.matchesFilter(event, sub.filter) { + select { + case sub.ch <- event: + default: + // Drop if subscriber is slow + } + } + } + } + + return processed, nil +} + +// Query implements EventAggregator. +func (a *MemoryAggregator) Query(ctx context.Context, query EventQuery) ([]*Event, error) { + a.mu.RLock() + defer a.mu.RUnlock() + + var results []*Event + limit := query.Limit + if limit <= 0 { + limit = 100 + } + + for _, event := range a.events { + if a.matchesQuery(event, query) { + results = append(results, event) + if len(results) >= limit { + break + } + } + } + + return results, nil +} + +// Subscribe implements EventAggregator. +func (a *MemoryAggregator) Subscribe(filter EventFilter) (<-chan *Event, func()) { + a.mu.Lock() + defer a.mu.Unlock() + + ch := make(chan *Event, 100) + sub := subscriber{filter: filter, ch: ch} + a.subscribers = append(a.subscribers, sub) + + cancel := func() { + a.mu.Lock() + defer a.mu.Unlock() + for i, s := range a.subscribers { + if s.ch == ch { + a.subscribers = append(a.subscribers[:i], a.subscribers[i+1:]...) + close(ch) + break + } + } + } + + return ch, cancel +} + +// Stats implements EventAggregator. +func (a *MemoryAggregator) Stats() AggregatorStats { + a.mu.RLock() + defer a.mu.RUnlock() + + var lastTime time.Time + if len(a.events) > 0 { + last := a.events[len(a.events)-1] + lastTime, _ = time.Parse("2006-01-02T15:04:05.000Z", last.Timestamp) + } + + return AggregatorStats{ + EventsReceived: a.stats.received, + EventsStored: uint64(len(a.events)), + EventsDropped: a.stats.dropped, + SubscriberCount: len(a.subscribers), + LastEventTime: lastTime, + } +} + +// matchesFilter checks if an event matches a subscription filter. +func (a *MemoryAggregator) matchesFilter(event *Event, filter EventFilter) bool { + if len(filter.EventTypes) > 0 { + matched := false + for _, et := range filter.EventTypes { + if event.EventType == et { + matched = true + break + } + } + if !matched { + return false + } + } + + if filter.SubjectDID != "" { + did, ok := event.Payload["subject_did"].(string) + if !ok || did != filter.SubjectDID { + return false // Missing or non-matching subject_did + } + } + + // NOTE: OrganizationID filter is not yet implemented. Events with any + // organization will match. Add organization_id to event payloads and + // filter here when multi-tenancy is wired up. + + return true +} + +// matchesQuery checks if an event matches a query. +func (a *MemoryAggregator) matchesQuery(event *Event, query EventQuery) bool { + return a.matchesContextQuery(event, query) && + a.matchesTypeQuery(event, query) && + a.matchesTimeQuery(event, query) +} + +// matchesContextQuery checks trace, transaction, and subject filters. +func (a *MemoryAggregator) matchesContextQuery(event *Event, query EventQuery) bool { + if query.TraceID != "" && event.Context.TraceID != query.TraceID { + return false + } + if query.TxnID != "" && event.Context.TxnID != query.TxnID { + return false + } + if query.SubjectDID != "" { + did, ok := event.Payload["subject_did"].(string) + if !ok || did != query.SubjectDID { + return false // Missing or non-matching subject_did + } + } + return true +} + +// matchesTypeQuery checks event type filters. +func (a *MemoryAggregator) matchesTypeQuery(event *Event, query EventQuery) bool { + if len(query.EventTypes) == 0 { + return true + } + for _, et := range query.EventTypes { + if event.EventType == et { + return true + } + } + return false +} + +// matchesTimeQuery checks time range filters. +func (a *MemoryAggregator) matchesTimeQuery(event *Event, query EventQuery) bool { + if query.StartTime.IsZero() && query.EndTime.IsZero() { + return true + } + eventTime, err := time.Parse("2006-01-02T15:04:05.000Z", event.Timestamp) + if err != nil { + return false + } + if !query.StartTime.IsZero() && eventTime.Before(query.StartTime) { + return false + } + if !query.EndTime.IsZero() && eventTime.After(query.EndTime) { + return false + } + return true +} + +// HTTPSink forwards events to a remote aggregator via HTTP POST. +// +// Per RFC-011 §4.2, forwarding is asynchronous and MUST NOT block mediation. +// The HTTPSink buffers events and sends them in batches. +type HTTPSink struct { + endpoint string + client *http.Client + batch []*Event + batchSize int + flushInterval time.Duration + flushTimer *time.Timer + mu sync.Mutex +} + +// HTTPSinkConfig configures the HTTP sink. +type HTTPSinkConfig struct { + // Endpoint is the aggregator HTTP endpoint (e.g., "https://api.example.com/events"). + Endpoint string + + // BatchSize is the number of events to batch before sending. + BatchSize int + + // FlushInterval is the maximum time to wait before sending a partial batch. + FlushInterval time.Duration + + // Client is the HTTP client to use. If nil, http.DefaultClient is used. + Client *http.Client +} + +// NewHTTPSink creates an HTTP event sink. +func NewHTTPSink(config HTTPSinkConfig) *HTTPSink { + if config.BatchSize <= 0 { + config.BatchSize = 50 + } + if config.Client == nil { + config.Client = &http.Client{Timeout: 10 * time.Second} + } + + s := &HTTPSink{ + endpoint: config.Endpoint, + client: config.Client, + batch: make([]*Event, 0, config.BatchSize), + batchSize: config.BatchSize, + flushInterval: config.FlushInterval, + } + + // Start flush timer if interval specified + if config.FlushInterval > 0 { + s.flushTimer = time.AfterFunc(config.FlushInterval, func() { + s.flushAsync() + }) + } + + return s +} + +// Receive implements EventSink. +func (s *HTTPSink) Receive(event *Event) { + s.mu.Lock() + defer s.mu.Unlock() + + s.batch = append(s.batch, event) + if len(s.batch) >= s.batchSize { + // Flush in background + batch := s.batch + s.batch = make([]*Event, 0, s.batchSize) + go s.sendBatch(batch) + } +} + +// Flush implements EventSink. +func (s *HTTPSink) Flush(ctx context.Context) error { + s.mu.Lock() + batch := s.batch + s.batch = make([]*Event, 0, s.batchSize) + s.mu.Unlock() + + if len(batch) > 0 { + return s.sendBatchSync(ctx, batch) + } + return nil +} + +// Close implements EventSink. +func (s *HTTPSink) Close() error { + if s.flushTimer != nil { + s.flushTimer.Stop() + } + return s.Flush(context.Background()) +} + +// flushAsync triggers an async flush. +func (s *HTTPSink) flushAsync() { + s.mu.Lock() + batch := s.batch + s.batch = make([]*Event, 0, s.batchSize) + s.mu.Unlock() + + if len(batch) > 0 { + go s.sendBatch(batch) + } + + // Restart timer with configured interval + if s.flushTimer != nil && s.flushInterval > 0 { + s.flushTimer.Reset(s.flushInterval) + } +} + +// sendBatch sends events asynchronously (fire and forget). +func (s *HTTPSink) sendBatch(batch []*Event) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = s.sendBatchSync(ctx, batch) // Errors are logged; best-effort delivery +} + +// sendBatchSync sends events synchronously. +func (s *HTTPSink) sendBatchSync(ctx context.Context, batch []*Event) error { + data, err := json.Marshal(batch) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.endpoint, nil) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Body = &nopCloser{bytes: data} + req.ContentLength = int64(len(data)) + + resp, err := s.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + // Check response status - non-2xx is an error + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("aggregator returned status %d", resp.StatusCode) + } + + return nil +} + +// nopCloser wraps bytes for use as io.ReadCloser. +type nopCloser struct { + bytes []byte + pos int +} + +func (n *nopCloser) Read(p []byte) (int, error) { + if n.pos >= len(n.bytes) { + return 0, io.EOF + } + copied := copy(p, n.bytes[n.pos:]) + n.pos += copied + return copied, nil +} + +func (n *nopCloser) Close() error { + return nil +} diff --git a/pkg/mediation/aggregator_test.go b/pkg/mediation/aggregator_test.go new file mode 100644 index 0000000..7c19935 --- /dev/null +++ b/pkg/mediation/aggregator_test.go @@ -0,0 +1,207 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +package mediation + +import ( + "context" + "testing" + "time" +) + +func TestMemoryAggregator_Ingest(t *testing.T) { + agg := NewMemoryAggregator(100) + + events := []*Event{ + NewEvent(EventAuthorityGranted, EmitterInfo{ComponentID: "test"}), + NewEvent(EventToolPermitted, EmitterInfo{ComponentID: "test"}), + } + + n, err := agg.Ingest(context.Background(), events) + if err != nil { + t.Fatalf("Ingest failed: %v", err) + } + if n != 2 { + t.Errorf("Ingest returned %d, want 2", n) + } + + stats := agg.Stats() + if stats.EventsReceived != 2 { + t.Errorf("EventsReceived = %d, want 2", stats.EventsReceived) + } + if stats.EventsStored != 2 { + t.Errorf("EventsStored = %d, want 2", stats.EventsStored) + } +} + +func TestMemoryAggregator_Capacity(t *testing.T) { + agg := NewMemoryAggregator(3) + + // Ingest more events than capacity + for i := 0; i < 5; i++ { + event := NewEvent(EventAuthorityGranted, EmitterInfo{ComponentID: "test"}) + event.WithPayload("index", i) + _, err := agg.Ingest(context.Background(), []*Event{event}) + if err != nil { + t.Fatalf("Ingest failed: %v", err) + } + } + + stats := agg.Stats() + if stats.EventsStored != 3 { + t.Errorf("EventsStored = %d, want 3 (capacity limit)", stats.EventsStored) + } + if stats.EventsDropped != 2 { + t.Errorf("EventsDropped = %d, want 2", stats.EventsDropped) + } +} + +func TestMemoryAggregator_Query(t *testing.T) { + agg := NewMemoryAggregator(100) + + // Create events with different trace IDs + e1 := NewEvent(EventAuthorityGranted, EmitterInfo{}) + e1.WithContext("trace-1", "", "") + e1.WithPayload("subject_did", "did:web:agent1") + + e2 := NewEvent(EventToolDenied, EmitterInfo{}) + e2.WithContext("trace-2", "", "") + e2.WithPayload("subject_did", "did:web:agent2") + + e3 := NewEvent(EventAuthorityGranted, EmitterInfo{}) + e3.WithContext("trace-1", "", "") + e3.WithPayload("subject_did", "did:web:agent1") + + agg.Ingest(context.Background(), []*Event{e1, e2, e3}) + + // Query by trace ID + results, err := agg.Query(context.Background(), EventQuery{TraceID: "trace-1"}) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != 2 { + t.Errorf("Query(trace-1) returned %d events, want 2", len(results)) + } + + // Query by event type + results, err = agg.Query(context.Background(), EventQuery{ + EventTypes: []EventType{EventToolDenied}, + }) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != 1 { + t.Errorf("Query(EventToolDenied) returned %d events, want 1", len(results)) + } + + // Query by subject DID + results, err = agg.Query(context.Background(), EventQuery{SubjectDID: "did:web:agent1"}) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != 2 { + t.Errorf("Query(agent1) returned %d events, want 2", len(results)) + } +} + +func TestMemoryAggregator_Subscribe(t *testing.T) { + agg := NewMemoryAggregator(100) + + // Subscribe to tool events only + ch, cancel := agg.Subscribe(EventFilter{ + EventTypes: []EventType{EventToolPermitted, EventToolDenied}, + }) + defer cancel() + + // Ingest events + events := []*Event{ + NewEvent(EventAuthorityGranted, EmitterInfo{}), // Should NOT be received + NewEvent(EventToolPermitted, EmitterInfo{}), // Should be received + NewEvent(EventToolDenied, EmitterInfo{}), // Should be received + } + agg.Ingest(context.Background(), events) + + // Collect received events + var received []*Event + timeout := time.After(100 * time.Millisecond) +loop: + for { + select { + case event := <-ch: + received = append(received, event) + case <-timeout: + break loop + } + } + + if len(received) != 2 { + t.Errorf("received %d events, want 2 (tool events only)", len(received)) + } +} + +func TestMemoryAggregator_Stats(t *testing.T) { + agg := NewMemoryAggregator(100) + + // Initial stats + stats := agg.Stats() + if stats.EventsReceived != 0 { + t.Errorf("initial EventsReceived = %d, want 0", stats.EventsReceived) + } + + // After ingesting + agg.Ingest(context.Background(), []*Event{ + NewEvent(EventAuthorityGranted, EmitterInfo{}), + }) + + stats = agg.Stats() + if stats.EventsReceived != 1 { + t.Errorf("EventsReceived = %d, want 1", stats.EventsReceived) + } + + // Subscribe and check count + ch, cancel := agg.Subscribe(EventFilter{}) + stats = agg.Stats() + if stats.SubscriberCount != 1 { + t.Errorf("SubscriberCount = %d, want 1", stats.SubscriberCount) + } + cancel() + // Drain channel after cancel + for range ch { + } + + stats = agg.Stats() + if stats.SubscriberCount != 0 { + t.Errorf("after cancel SubscriberCount = %d, want 0", stats.SubscriberCount) + } +} + +func TestEventQuery_TimeRange(t *testing.T) { + agg := NewMemoryAggregator(100) + + // Create events with specific timestamps + e1 := NewEvent(EventAuthorityGranted, EmitterInfo{}) + e1.Timestamp = "2026-05-27T10:00:00.000Z" + + e2 := NewEvent(EventAuthorityGranted, EmitterInfo{}) + e2.Timestamp = "2026-05-27T12:00:00.000Z" + + e3 := NewEvent(EventAuthorityGranted, EmitterInfo{}) + e3.Timestamp = "2026-05-27T14:00:00.000Z" + + agg.Ingest(context.Background(), []*Event{e1, e2, e3}) + + // Query with time range + startTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2026-05-27T11:00:00.000Z") + endTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2026-05-27T13:00:00.000Z") + + results, err := agg.Query(context.Background(), EventQuery{ + StartTime: startTime, + EndTime: endTime, + }) + if err != nil { + t.Fatalf("Query failed: %v", err) + } + if len(results) != 1 { + t.Errorf("Query with time range returned %d events, want 1", len(results)) + } +} diff --git a/pkg/mediation/doc.go b/pkg/mediation/doc.go index 43a2030..9af89a1 100644 --- a/pkg/mediation/doc.go +++ b/pkg/mediation/doc.go @@ -31,6 +31,24 @@ // decisions. Events are evidence of what happened — they do not drive decisions. // Event emission is asynchronous and MUST NOT block the mediation path. // +// Event types follow the RFC-011 taxonomy: +// +// - Identity events: identity.verified, identity.invalid, identity.expired +// - Authority events: authority.granted, authority.denied, authority.delegated +// - Tool events: tool.requested, tool.permitted, tool.denied, tool.executed +// - Resource events: resource.filesystem.*, resource.network.*, resource.shell.* +// - Trust events: trust.signature.*, trust.revocation.checked +// +// Events are delivered to EventSink implementations which handle buffering, +// batching, and forwarding to aggregators. The package provides: +// +// - AsyncEmitter — non-blocking emitter with channel-based buffering +// - ChannelSink — delivers events to a Go channel +// - JSONSink — writes events as newline-delimited JSON +// - BatchSink — batches events before forwarding +// - HTTPSink — forwards events to a remote aggregator via HTTP +// - MemoryAggregator — in-memory aggregator for testing +// // # Usage // // Gateway middleware injects trust material during bootstrap: diff --git a/pkg/mediation/emitter.go b/pkg/mediation/emitter.go new file mode 100644 index 0000000..43d91f1 --- /dev/null +++ b/pkg/mediation/emitter.go @@ -0,0 +1,620 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +package mediation + +import ( + "context" + "encoding/json" + "sync" + "time" +) + +// EventSink receives emitted events for processing. +// +// Implementations handle event persistence, forwarding, or aggregation. +// All methods MUST be safe for concurrent use. +type EventSink interface { + // Receive processes an event. MUST NOT block. + Receive(event *Event) + + // Flush ensures all buffered events are processed. + // Called during graceful shutdown. + Flush(ctx context.Context) error + + // Close releases resources held by the sink. + Close() error +} + +// AsyncEmitter implements EventEmitter with non-blocking event emission. +// +// Per RFC-011 §4.2 and §7.3, event emission MUST NOT block on network I/O. +// Events are buffered locally and forwarded asynchronously to sinks. +type AsyncEmitter struct { + emitter EmitterInfo + sinks []EventSink + eventCh chan *Event + doneCh chan struct{} + wg sync.WaitGroup + mu sync.RWMutex + closed bool + + // bufferSize is the channel buffer capacity. + bufferSize int + + // dropOnFull determines behavior when buffer is full. + // If true, new events are dropped; if false, oldest events are dropped. + dropOnFull bool + + // droppedCount tracks events dropped due to buffer overflow. + droppedCount uint64 +} + +// AsyncEmitterConfig configures the async emitter. +type AsyncEmitterConfig struct { + // ComponentID identifies this emitter instance. + ComponentID string + + // ComponentType is one of: sdk, sidecar, gateway, pep. + ComponentType ComponentType + + // Version is the component version string. + Version string + + // BufferSize is the event channel capacity. Default: 1000. + BufferSize int + + // DropOnFull, if true, drops new events when buffer is full. + // If false (default), emits events synchronously as fallback. + DropOnFull bool + + // Sinks are the event receivers. + Sinks []EventSink +} + +// NewAsyncEmitter creates an async event emitter. +func NewAsyncEmitter(config AsyncEmitterConfig) *AsyncEmitter { + bufferSize := config.BufferSize + if bufferSize <= 0 { + bufferSize = 1000 + } + + e := &AsyncEmitter{ + emitter: EmitterInfo{ + ComponentID: config.ComponentID, + ComponentType: config.ComponentType, + Version: config.Version, + }, + sinks: config.Sinks, + eventCh: make(chan *Event, bufferSize), + doneCh: make(chan struct{}), + bufferSize: bufferSize, + dropOnFull: config.DropOnFull, + } + + // Start the event processing goroutine + e.wg.Add(1) + go e.processEvents() + + return e +} + +// processEvents runs in a goroutine, forwarding events to sinks. +func (e *AsyncEmitter) processEvents() { + defer e.wg.Done() + + for { + select { + case event := <-e.eventCh: + e.deliverToSinks(event) + case <-e.doneCh: + // Drain remaining events + for { + select { + case event := <-e.eventCh: + e.deliverToSinks(event) + default: + return + } + } + } + } +} + +// deliverToSinks sends an event to all registered sinks. +func (e *AsyncEmitter) deliverToSinks(event *Event) { + e.mu.RLock() + sinks := e.sinks + e.mu.RUnlock() + + for _, sink := range sinks { + sink.Receive(event) + } +} + +// emit sends an event to the processing channel. +func (e *AsyncEmitter) emit(event *Event) { + e.mu.RLock() + closed := e.closed + e.mu.RUnlock() + + if closed { + return + } + + select { + case e.eventCh <- event: + // Event queued successfully + default: + // Buffer full + if e.dropOnFull { + e.mu.Lock() + e.droppedCount++ + e.mu.Unlock() + } else { + // Synchronous fallback - deliver directly. + // WARNING: This CAN block the mediation path if sinks are slow. + // For strict RFC-011 §4.2 compliance, use DropOnFull=true. + e.deliverToSinks(event) + } + } +} + +// EmitDecision implements EventEmitter. +func (e *AsyncEmitter) EmitDecision(mctx *Context, result *Result, request Request) { + var eventType EventType + switch result.Decision { + case DecisionAllow: + eventType = e.allowEventType(request) + case DecisionDeny: + eventType = e.denyEventType(request) + case DecisionDelegate: + eventType = EventAuthorityDelegated + default: + return + } + + event := NewEvent(eventType, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + event.WithPayload("trust_level", mctx.TrustLevel) + if mctx.Envelope != nil { + event.WithPayload("envelope_hash", HashForEvent(mctx.Envelope.Raw)) + } + } + + event.WithPayload("decision", string(result.Decision)) + event.WithPayload("reason", result.Reason) + event.WithPayload("policy_ref", result.PolicyRef) + + if request != nil { + event.WithPayload("domain", request.Domain()) + event.WithPayload("capability", request.Capability()) + } + + e.emit(event) +} + +// EmitCapabilityCheck implements EventEmitter. +func (e *AsyncEmitter) EmitCapabilityCheck(mctx *Context, capability string, granted bool) { + var eventType EventType + if granted { + eventType = EventAuthorityGranted + } else { + eventType = EventAuthorityDenied + } + + event := NewEvent(eventType, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + } + + event.WithPayload("capability", capability) + event.WithPayload("granted", granted) + + e.emit(event) +} + +// EmitIdentityVerified emits an identity.verified event. +func (e *AsyncEmitter) EmitIdentityVerified(mctx *Context, badgeJTI string, trustLevel, ial int) { + event := NewEvent(EventIdentityVerified, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + } + + event.WithPayload("badge_jti", badgeJTI) + event.WithPayload("trust_level", trustLevel) + event.WithPayload("ial", ial) + + e.emit(event) +} + +// EmitIdentityInvalid emits an identity.invalid event. +func (e *AsyncEmitter) EmitIdentityInvalid(mctx *Context, badgeJTI, errorCode, reason string) { + event := NewEvent(EventIdentityInvalid, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + } + + event.WithPayload("badge_jti", badgeJTI) + event.WithPayload("error_code", errorCode) + event.WithPayload("reason", reason) + + e.emit(event) +} + +// EmitToolRequested emits a tool.requested event. +func (e *AsyncEmitter) EmitToolRequested(mctx *Context, toolName, serverDID string) { + event := NewEvent(EventToolRequested, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + if mctx.Envelope != nil { + event.WithPayload("envelope_hash", HashForEvent(mctx.Envelope.Raw)) + } + } + + event.WithPayload("tool_name", toolName) + event.WithPayload("server_did", serverDID) + + e.emit(event) +} + +// EmitToolExecuted emits a tool.executed event. +func (e *AsyncEmitter) EmitToolExecuted(mctx *Context, toolName, serverDID, outcome string, durationMs int64) { + event := NewEvent(EventToolExecuted, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + } + + event.WithPayload("tool_name", toolName) + event.WithPayload("server_did", serverDID) + event.WithPayload("outcome", outcome) + event.WithPayload("duration_ms", durationMs) + + e.emit(event) +} + +// EmitResourceNetwork emits a resource.network.* event. +func (e *AsyncEmitter) EmitResourceNetwork( + mctx *Context, + eventType EventType, + scheme, host string, + port int, + pathClassification, method, reason string, +) { + event := NewEvent(eventType, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + } + + event.WithPayload("scheme", scheme) + event.WithPayload("target_host", host) + event.WithPayload("target_port", port) + event.WithPayload("path_classification", pathClassification) + event.WithPayload("method", method) + if reason != "" { + event.WithPayload("reason", reason) + } + + e.emit(event) +} + +// EmitResourceFilesystem emits a resource.filesystem.* event. +func (e *AsyncEmitter) EmitResourceFilesystem( + mctx *Context, + eventType EventType, + pathClassification, pathHash, operation, reason string, +) { + event := NewEvent(eventType, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + } + + event.WithPayload("path_classification", pathClassification) + event.WithPayload("path_hash", pathHash) + event.WithPayload("operation", operation) + if reason != "" { + event.WithPayload("reason", reason) + } + + e.emit(event) +} + +// EmitResourceShell emits a resource.shell.* event. +func (e *AsyncEmitter) EmitResourceShell( + mctx *Context, + eventType EventType, + commandHash, commandClassification, reason string, +) { + event := NewEvent(eventType, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + } + + event.WithPayload("command_hash", commandHash) + event.WithPayload("command_classification", commandClassification) + if reason != "" { + event.WithPayload("reason", reason) + } + + e.emit(event) +} + +// EmitExecutionStarted emits an execution.started event. +func (e *AsyncEmitter) EmitExecutionStarted(mctx *Context) { + event := NewEvent(EventExecutionStarted, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + event.WithPayload("subject_did", mctx.SubjectDID) + if mctx.Envelope != nil { + event.WithPayload("envelope_hash", HashForEvent(mctx.Envelope.Raw)) + } + } + + e.emit(event) +} + +// EmitExecutionCompleted emits an execution.completed event. +func (e *AsyncEmitter) EmitExecutionCompleted(mctx *Context, outcome string, durationMs int64) { + event := NewEvent(EventExecutionCompleted, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + } + + event.WithPayload("outcome", outcome) + event.WithPayload("duration_ms", durationMs) + + e.emit(event) +} + +// EmitTrustRevocationChecked emits a trust.revocation.checked event. +func (e *AsyncEmitter) EmitTrustRevocationChecked(mctx *Context, jti string, revoked bool, cacheAgeSeconds int) { + event := NewEvent(EventTrustRevocationChecked, e.emitter) + if mctx != nil { + event.WithContext(mctx.TraceID, mctx.TxnID, mctx.HopID) + } + + event.WithPayload("jti", jti) + event.WithPayload("revoked", revoked) + event.WithPayload("cache_age_seconds", cacheAgeSeconds) + + e.emit(event) +} + +// allowEventType returns the appropriate allow event type for a request domain. +func (e *AsyncEmitter) allowEventType(request Request) EventType { + if request == nil { + return EventAuthorityGranted + } + switch request.Domain() { + case "tool": + return EventToolPermitted + case "file": + return EventResourceFilesystemPermitted + case "network": + return EventResourceNetworkPermitted + case "shell": + return EventResourceShellPermitted + default: + return EventAuthorityGranted + } +} + +// denyEventType returns the appropriate deny event type for a request domain. +func (e *AsyncEmitter) denyEventType(request Request) EventType { + if request == nil { + return EventAuthorityDenied + } + switch request.Domain() { + case "tool": + return EventToolDenied + case "file": + return EventResourceFilesystemDenied + case "network": + return EventResourceNetworkDenied + case "shell": + return EventResourceShellDenied + default: + return EventAuthorityDenied + } +} + +// AddSink adds a sink to receive events. +func (e *AsyncEmitter) AddSink(sink EventSink) { + e.mu.Lock() + defer e.mu.Unlock() + e.sinks = append(e.sinks, sink) +} + +// DroppedCount returns the number of events dropped due to buffer overflow. +func (e *AsyncEmitter) DroppedCount() uint64 { + e.mu.RLock() + defer e.mu.RUnlock() + return e.droppedCount +} + +// Flush drains buffered events and flushes all sinks. +func (e *AsyncEmitter) Flush(ctx context.Context) error { + e.mu.RLock() + sinks := e.sinks + e.mu.RUnlock() + + for _, sink := range sinks { + if err := sink.Flush(ctx); err != nil { + return err + } + } + return nil +} + +// Close stops the emitter and releases resources. +func (e *AsyncEmitter) Close() error { + e.mu.Lock() + if e.closed { + e.mu.Unlock() + return nil + } + e.closed = true + e.mu.Unlock() + + // Signal shutdown and wait for drain + close(e.doneCh) + e.wg.Wait() + + // Close all sinks + e.mu.RLock() + sinks := e.sinks + e.mu.RUnlock() + + for _, sink := range sinks { + _ = sink.Close() // Best-effort cleanup; errors logged by sinks + } + + return nil +} + +// ChannelSink is a simple sink that forwards events to a channel. +// Useful for testing and for piping events to goroutines. +type ChannelSink struct { + ch chan *Event + closed bool + mu sync.Mutex +} + +// NewChannelSink creates a sink that forwards events to a channel. +func NewChannelSink(bufferSize int) *ChannelSink { + return &ChannelSink{ + ch: make(chan *Event, bufferSize), + } +} + +// Receive implements EventSink. +func (s *ChannelSink) Receive(event *Event) { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return + } + select { + case s.ch <- event: + default: + // Drop if full + } +} + +// Flush implements EventSink. +func (s *ChannelSink) Flush(ctx context.Context) error { + return nil +} + +// Close implements EventSink. +func (s *ChannelSink) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if !s.closed { + s.closed = true + close(s.ch) + } + return nil +} + +// Events returns the event channel for reading. +func (s *ChannelSink) Events() <-chan *Event { + return s.ch +} + +// JSONSink writes events as newline-delimited JSON. +// Typically used with an io.Writer for file or network output. +type JSONSink struct { + encoder *json.Encoder + mu sync.Mutex +} + +// NewJSONSink creates a sink that writes JSON to the given writer. +func NewJSONSink(w interface{ Write([]byte) (int, error) }) *JSONSink { + return &JSONSink{ + encoder: json.NewEncoder(w), + } +} + +// Receive implements EventSink. +func (s *JSONSink) Receive(event *Event) { + s.mu.Lock() + defer s.mu.Unlock() + _ = s.encoder.Encode(event) // Errors are intentionally ignored; sink is best-effort +} + +// Flush implements EventSink. +func (s *JSONSink) Flush(ctx context.Context) error { + return nil +} + +// Close implements EventSink. +func (s *JSONSink) Close() error { + return nil +} + +// BatchSink collects events and flushes them in batches. +type BatchSink struct { + batch []*Event + batchSize int + flushFn func([]*Event) error + mu sync.Mutex + lastFlush time.Time +} + +// NewBatchSink creates a sink that batches events. +func NewBatchSink(batchSize int, flushFn func([]*Event) error) *BatchSink { + return &BatchSink{ + batch: make([]*Event, 0, batchSize), + batchSize: batchSize, + flushFn: flushFn, + lastFlush: time.Now(), + } +} + +// Receive implements EventSink. +func (s *BatchSink) Receive(event *Event) { + s.mu.Lock() + defer s.mu.Unlock() + + s.batch = append(s.batch, event) + if len(s.batch) >= s.batchSize { + s.flushLocked() + } +} + +// flushLocked flushes the batch (must hold mu). +func (s *BatchSink) flushLocked() { + if len(s.batch) == 0 { + return + } + batch := s.batch + s.batch = make([]*Event, 0, s.batchSize) + s.lastFlush = time.Now() + + // Release lock during flush callback + s.mu.Unlock() + _ = s.flushFn(batch) // Errors are logged by caller; sink continues operating + s.mu.Lock() +} + +// Flush implements EventSink. +func (s *BatchSink) Flush(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + s.flushLocked() + return nil +} + +// Close implements EventSink. +func (s *BatchSink) Close() error { + return s.Flush(context.Background()) +} diff --git a/pkg/mediation/events.go b/pkg/mediation/events.go new file mode 100644 index 0000000..4ec0acc --- /dev/null +++ b/pkg/mediation/events.go @@ -0,0 +1,314 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +// Package mediation provides RFC-011 runtime event types and emission. +// +// Events model authority transitions — the moments when trust decisions are made, +// authority changes hands, and enforcement actions occur. They are NOT telemetry; +// they are first-class runtime artifacts supporting audit, compliance, and provenance. +// +// Per RFC-011 §4.2, event emission is a local operation that MUST NOT block on +// network I/O. Events are buffered locally and forwarded asynchronously. +package mediation + +import ( + "crypto/sha256" + "encoding/hex" + "strings" + "time" + + "github.com/google/uuid" +) + +// EventType is a dot-delimited identifier categorizing the event. +// Event types follow the taxonomy defined in RFC-011 §5. +type EventType string + +// Identity events (RFC-011 §5.1) +const ( + EventIdentityVerified EventType = "identity.verified" + EventIdentityInvalid EventType = "identity.invalid" + EventIdentityExpired EventType = "identity.expired" + EventIdentityRevoked EventType = "identity.revoked" +) + +// Authority events (RFC-011 §5.2) +const ( + EventAuthorityRequested EventType = "authority.requested" + EventAuthorityGranted EventType = "authority.granted" + EventAuthorityDenied EventType = "authority.denied" + EventAuthorityNarrowed EventType = "authority.narrowed" + EventAuthorityDelegated EventType = "authority.delegated" + EventAuthorityExpired EventType = "authority.expired" +) + +// Runtime events (RFC-011 §5.3) +const ( + EventRuntimeAttached EventType = "runtime.attached" + EventRuntimeDetached EventType = "runtime.detached" + EventExecutionStarted EventType = "execution.started" + EventExecutionCompleted EventType = "execution.completed" + EventExecutionAborted EventType = "execution.aborted" +) + +// Tool events (RFC-011 §5.4) +const ( + EventToolRequested EventType = "tool.requested" + EventToolPermitted EventType = "tool.permitted" + EventToolDenied EventType = "tool.denied" + EventToolExecuted EventType = "tool.executed" +) + +// Resource events (RFC-011 §5.5) +const ( + EventResourceNetworkRequested EventType = "resource.network.requested" + EventResourceNetworkPermitted EventType = "resource.network.permitted" + EventResourceNetworkDenied EventType = "resource.network.denied" + EventResourceFilesystemRequested EventType = "resource.filesystem.requested" + EventResourceFilesystemPermitted EventType = "resource.filesystem.permitted" + EventResourceFilesystemDenied EventType = "resource.filesystem.denied" + EventResourceShellRequested EventType = "resource.shell.requested" + EventResourceShellPermitted EventType = "resource.shell.permitted" + EventResourceShellDenied EventType = "resource.shell.denied" +) + +// Trust events (RFC-011 §5.6) +const ( + EventTrustSignatureCreated EventType = "trust.signature.created" + EventTrustSignatureVerified EventType = "trust.signature.verified" + EventTrustSignatureInvalid EventType = "trust.signature.invalid" + EventTrustAttestationGenerated EventType = "trust.attestation.generated" + EventTrustRevocationChecked EventType = "trust.revocation.checked" +) + +// ComponentType identifies the type of emitting component. +type ComponentType string + +const ( + ComponentSDK ComponentType = "sdk" + ComponentSidecar ComponentType = "sidecar" + ComponentGateway ComponentType = "gateway" + ComponentPEP ComponentType = "pep" +) + +// Event is the RFC-011 event envelope. +// +// Per RFC-011 §6.1, this structure contains all metadata required for +// audit, compliance, and provenance reconstruction. +type Event struct { + // SchemaVersion is the event schema version. MUST be "1.0" per RFC-011. + SchemaVersion string `json:"schema_version"` + + // EventID is a unique event identifier (UUID v4 prefixed with "evt_"). + EventID string `json:"event_id"` + + // EventType categorizes the event per RFC-011 §5 taxonomy. + EventType EventType `json:"event_type"` + + // Timestamp is ISO 8601 with millisecond precision. + Timestamp string `json:"timestamp"` + + // Emitter identifies the component that generated the event. + Emitter EmitterInfo `json:"emitter"` + + // Context carries correlation identifiers for distributed tracing. + Context EventContext `json:"context,omitempty"` + + // Payload contains event-specific data per RFC-011 §5 tables. + Payload map[string]any `json:"payload"` + + // Signature is an optional JWS compact signature for integrity. + Signature string `json:"signature,omitempty"` +} + +// EmitterInfo identifies the component emitting the event. +type EmitterInfo struct { + // ComponentID is the identifier of the emitting component. + ComponentID string `json:"component_id"` + + // ComponentType is one of: sdk, sidecar, gateway, pep. + ComponentType ComponentType `json:"component_type"` + + // Version is the version of the emitting component. + Version string `json:"version,omitempty"` +} + +// EventContext carries correlation identifiers for distributed tracing. +type EventContext struct { + // TraceID links events to the same originator workflow (RFC-001 §3.2). + TraceID string `json:"trace_id,omitempty"` + + // TxnID links events to the same transaction (RFC-004). + TxnID string `json:"txn_id,omitempty"` + + // HopID links events to the same hop in a delegation chain (RFC-004). + HopID string `json:"hop_id,omitempty"` +} + +// NewEvent creates a new event with a fresh UUID and timestamp. +func NewEvent(eventType EventType, emitter EmitterInfo) *Event { + return &Event{ + SchemaVersion: "1.0", + EventID: "evt_" + uuid.New().String(), + EventType: eventType, + Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), + Emitter: emitter, + Payload: make(map[string]any), + } +} + +// WithContext adds correlation context to the event. +func (e *Event) WithContext(traceID, txnID, hopID string) *Event { + e.Context = EventContext{ + TraceID: traceID, + TxnID: txnID, + HopID: hopID, + } + return e +} + +// WithPayload adds a key-value pair to the payload. +func (e *Event) WithPayload(key string, value any) *Event { + e.Payload[key] = value + return e +} + +// IdentityPayload contains fields for identity events (RFC-011 §5.1). +type IdentityPayload struct { + BadgeJTI string `json:"badge_jti,omitempty"` + SubjectDID string `json:"subject_did,omitempty"` + TrustLevel int `json:"trust_level,omitempty"` + IAL int `json:"ial,omitempty"` + ErrorCode string `json:"error_code,omitempty"` + Reason string `json:"reason,omitempty"` + ExpiredAt string `json:"expired_at,omitempty"` + RevokedAt string `json:"revoked_at,omitempty"` +} + +// AuthorityPayload contains fields for authority events (RFC-011 §5.2). +type AuthorityPayload struct { + SubjectDID string `json:"subject_did,omitempty"` + RequestedCapabilities []string `json:"requested_capabilities,omitempty"` + EffectiveCapabilities []string `json:"effective_capabilities,omitempty"` + EnvelopeHash string `json:"envelope_hash,omitempty"` + PolicyVersion string `json:"policy_version,omitempty"` + Reason string `json:"reason,omitempty"` + ParentEnvelopeHash string `json:"parent_envelope_hash,omitempty"` + ChildEnvelopeHash string `json:"child_envelope_hash,omitempty"` + NarrowedCapabilities []string `json:"narrowed_capabilities,omitempty"` + IssuerDID string `json:"issuer_did,omitempty"` + DelegationDepth int `json:"delegation_depth,omitempty"` + ExpiredAt string `json:"expired_at,omitempty"` +} + +// RuntimePayload contains fields for runtime events (RFC-011 §5.3). +type RuntimePayload struct { + ComponentID string `json:"component_id,omitempty"` + AttachedAt string `json:"attached_at,omitempty"` + DetachedAt string `json:"detached_at,omitempty"` + EnforcementMode string `json:"enforcement_mode,omitempty"` + Reason string `json:"reason,omitempty"` + TxnID string `json:"txn_id,omitempty"` + HopID string `json:"hop_id,omitempty"` + SubjectDID string `json:"subject_did,omitempty"` + EnvelopeHash string `json:"envelope_hash,omitempty"` + Outcome string `json:"outcome,omitempty"` + DurationMs int64 `json:"duration_ms,omitempty"` + ErrorCode string `json:"error_code,omitempty"` +} + +// ToolPayload contains fields for tool events (RFC-011 §5.4). +type ToolPayload struct { + ToolName string `json:"tool_name,omitempty"` + ServerDID string `json:"server_did,omitempty"` + SubjectDID string `json:"subject_did,omitempty"` + EnvelopeHash string `json:"envelope_hash,omitempty"` + Reason string `json:"reason,omitempty"` + Outcome string `json:"outcome,omitempty"` + DurationMs int64 `json:"duration_ms,omitempty"` +} + +// ResourcePayload contains fields for resource events (RFC-011 §5.5). +// +// Per RFC-011 §5.5, resource payloads MUST use canonicalized representations, +// NOT raw values. Raw URLs, paths, and commands can contain secrets. +type ResourcePayload struct { + // Network fields + Scheme string `json:"scheme,omitempty"` + TargetHost string `json:"target_host,omitempty"` + TargetPort int `json:"target_port,omitempty"` + PathClassification string `json:"path_classification,omitempty"` + Method string `json:"method,omitempty"` + + // Filesystem fields + PathHash string `json:"path_hash,omitempty"` + Operation string `json:"operation,omitempty"` + + // Shell fields + CommandHash string `json:"command_hash,omitempty"` + CommandClassification string `json:"command_classification,omitempty"` + + // Common fields + SubjectDID string `json:"subject_did,omitempty"` + Reason string `json:"reason,omitempty"` +} + +// TrustPayload contains fields for trust events (RFC-011 §5.6). +type TrustPayload struct { + ArtifactType string `json:"artifact_type,omitempty"` + ArtifactHash string `json:"artifact_hash,omitempty"` + SignerDID string `json:"signer_did,omitempty"` + ErrorCode string `json:"error_code,omitempty"` + HopID string `json:"hop_id,omitempty"` + AttesterDID string `json:"attester_did,omitempty"` + AttestationHash string `json:"attestation_hash,omitempty"` + JTI string `json:"jti,omitempty"` + Revoked bool `json:"revoked,omitempty"` + CacheAgeSeconds int `json:"cache_age_seconds,omitempty"` +} + +// HashForEvent computes a SHA-256 hash of a value for event payloads. +// Used to canonicalize sensitive values like paths and commands per RFC-011 §5.5. +func HashForEvent(value string) string { + h := sha256.Sum256([]byte(value)) + return "sha256:" + hex.EncodeToString(h[:]) +} + +// ClassifyPath converts a filesystem path to a classification pattern. +// Example: "/home/user/documents/file.txt" → "/home/*/documents/*" +func ClassifyPath(path string) string { + parts := strings.Split(path, "/") + if len(parts) <= 2 { + return path + } + // Replace middle components with wildcards, preserve structure + classified := make([]string, len(parts)) + for i, part := range parts { + if i == 0 || i == 1 || i == len(parts)-1 { + // Keep root, first dir, and last component + if i == len(parts)-1 && strings.Contains(part, ".") { + classified[i] = "*" + part[strings.LastIndex(part, "."):] + } else { + classified[i] = part + } + } else { + classified[i] = "*" + } + } + return strings.Join(classified, "/") +} + +// ClassifyCommand extracts the command classification from a shell command. +// Example: "git commit -m 'message'" → "git" +func ClassifyCommand(cmd string) string { + parts := strings.Fields(cmd) + if len(parts) == 0 { + return "unknown" + } + // Extract basename of command + cmdPath := parts[0] + if idx := strings.LastIndex(cmdPath, "/"); idx >= 0 { + cmdPath = cmdPath[idx+1:] + } + return cmdPath +} diff --git a/pkg/mediation/events_test.go b/pkg/mediation/events_test.go new file mode 100644 index 0000000..9cc95e6 --- /dev/null +++ b/pkg/mediation/events_test.go @@ -0,0 +1,445 @@ +// Copyright (c) CapiscIO, Inc. +// Licensed under the MIT License. + +package mediation + +import ( + "bytes" + "context" + "encoding/json" + "sync" + "testing" + "time" +) + +func TestEventType_Constants(t *testing.T) { + // Verify RFC-011 event type format + tests := []struct { + eventType EventType + category string + }{ + {EventIdentityVerified, "identity"}, + {EventIdentityInvalid, "identity"}, + {EventAuthorityGranted, "authority"}, + {EventAuthorityDenied, "authority"}, + {EventToolRequested, "tool"}, + {EventToolPermitted, "tool"}, + {EventResourceNetworkRequested, "resource"}, + {EventResourceFilesystemPermitted, "resource"}, + {EventTrustRevocationChecked, "trust"}, + {EventExecutionStarted, "execution"}, + } + + for _, tt := range tests { + t.Run(string(tt.eventType), func(t *testing.T) { + // Event types must be dot-delimited + parts := splitEventType(string(tt.eventType)) + if len(parts) < 2 { + t.Errorf("event type %q should be dot-delimited", tt.eventType) + } + if parts[0] != tt.category { + t.Errorf("event type %q should start with %q", tt.eventType, tt.category) + } + }) + } +} + +func splitEventType(et string) []string { + var parts []string + part := "" + for _, c := range et { + if c == '.' { + parts = append(parts, part) + part = "" + } else { + part += string(c) + } + } + if part != "" { + parts = append(parts, part) + } + return parts +} + +func TestNewEvent(t *testing.T) { + emitter := EmitterInfo{ + ComponentID: "test-component", + ComponentType: ComponentPEP, + Version: "1.0.0", + } + + event := NewEvent(EventAuthorityGranted, emitter) + + // Check required fields + if event.SchemaVersion != "1.0" { + t.Errorf("schema_version = %q, want %q", event.SchemaVersion, "1.0") + } + if len(event.EventID) < 4 || event.EventID[:4] != "evt_" { + t.Errorf("event_id = %q, should start with 'evt_'", event.EventID) + } + if event.EventType != EventAuthorityGranted { + t.Errorf("event_type = %q, want %q", event.EventType, EventAuthorityGranted) + } + if event.Timestamp == "" { + t.Error("timestamp should not be empty") + } + if event.Emitter.ComponentID != "test-component" { + t.Errorf("emitter.component_id = %q, want %q", event.Emitter.ComponentID, "test-component") + } +} + +func TestEvent_WithContext(t *testing.T) { + event := NewEvent(EventToolPermitted, EmitterInfo{}) + event.WithContext("trace-123", "txn-456", "hop-001") + + if event.Context.TraceID != "trace-123" { + t.Errorf("trace_id = %q, want %q", event.Context.TraceID, "trace-123") + } + if event.Context.TxnID != "txn-456" { + t.Errorf("txn_id = %q, want %q", event.Context.TxnID, "txn-456") + } + if event.Context.HopID != "hop-001" { + t.Errorf("hop_id = %q, want %q", event.Context.HopID, "hop-001") + } +} + +func TestEvent_WithPayload(t *testing.T) { + event := NewEvent(EventToolDenied, EmitterInfo{}) + event.WithPayload("tool_name", "mcp:database/query") + event.WithPayload("reason", "insufficient trust level") + + if event.Payload["tool_name"] != "mcp:database/query" { + t.Errorf("payload[tool_name] = %v, want %q", event.Payload["tool_name"], "mcp:database/query") + } + if event.Payload["reason"] != "insufficient trust level" { + t.Errorf("payload[reason] = %v, want %q", event.Payload["reason"], "insufficient trust level") + } +} + +func TestEvent_JSONSerialization(t *testing.T) { + event := NewEvent(EventAuthorityDenied, EmitterInfo{ + ComponentID: "test-pep", + ComponentType: ComponentPEP, + Version: "2.4.1", + }) + event.WithContext("tr_abc", "txn_123", "hop_001") + event.WithPayload("subject_did", "did:web:example.com:agent") + event.WithPayload("reason", "trust level too low") + + // Serialize to JSON + data, err := json.Marshal(event) + if err != nil { + t.Fatalf("json.Marshal failed: %v", err) + } + + // Deserialize and verify + var parsed Event + if err := json.Unmarshal(data, &parsed); err != nil { + t.Fatalf("json.Unmarshal failed: %v", err) + } + + if parsed.SchemaVersion != "1.0" { + t.Errorf("parsed schema_version = %q, want %q", parsed.SchemaVersion, "1.0") + } + if parsed.EventType != EventAuthorityDenied { + t.Errorf("parsed event_type = %q, want %q", parsed.EventType, EventAuthorityDenied) + } + if parsed.Context.TraceID != "tr_abc" { + t.Errorf("parsed context.trace_id = %q, want %q", parsed.Context.TraceID, "tr_abc") + } +} + +func TestHashForEvent(t *testing.T) { + hash := HashForEvent("secret value") + + // Should be prefixed with sha256: + if len(hash) < 8 || hash[:7] != "sha256:" { + t.Errorf("hash = %q, should start with 'sha256:'", hash) + } + // SHA-256 produces 64 hex chars + if len(hash) != 7+64 { + t.Errorf("hash length = %d, want %d", len(hash), 7+64) + } + + // Same input produces same hash + hash2 := HashForEvent("secret value") + if hash != hash2 { + t.Error("same input should produce same hash") + } + + // Different input produces different hash + hash3 := HashForEvent("different value") + if hash == hash3 { + t.Error("different input should produce different hash") + } +} + +func TestClassifyPath(t *testing.T) { + tests := []struct { + path string + wantContains string + description string + }{ + {"/home/user/docs/file.txt", "*.txt", "should preserve extension"}, + {"/tmp/data", "/tmp/", "should preserve root"}, + {"/etc/config.yaml", "*.yaml", "should preserve extension"}, + {"relative/path.go", "relative/", "should handle short relative paths as-is"}, + } + + for _, tt := range tests { + t.Run(tt.path, func(t *testing.T) { + got := ClassifyPath(tt.path) + // Path should be anonymized but still contain expected patterns + if !containsPattern(got, tt.wantContains) { + t.Errorf("ClassifyPath(%q) = %q, want to contain %q", tt.path, got, tt.wantContains) + } + }) + } +} + +func containsPattern(s, pattern string) bool { + return len(s) >= len(pattern) && + (s == pattern || + len(s) > len(pattern) && (s[:len(pattern)] == pattern || s[len(s)-len(pattern):] == pattern) || + indexOfString(s, pattern) >= 0) +} + +func indexOfString(s, substr string) int { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return i + } + } + return -1 +} + +func TestClassifyCommand(t *testing.T) { + tests := []struct { + cmd string + expected string + }{ + {"git commit -m 'message'", "git"}, + {"/usr/bin/curl https://api.example.com", "curl"}, + {"ls -la /tmp", "ls"}, + {"", "unknown"}, + {" ", "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.cmd, func(t *testing.T) { + got := ClassifyCommand(tt.cmd) + if got != tt.expected { + t.Errorf("ClassifyCommand(%q) = %q, want %q", tt.cmd, got, tt.expected) + } + }) + } +} + +func TestAsyncEmitter_EmitDecision(t *testing.T) { + sink := NewChannelSink(10) + emitter := NewAsyncEmitter(AsyncEmitterConfig{ + ComponentID: "test-emitter", + ComponentType: ComponentSDK, + Version: "1.0.0", + Sinks: []EventSink{sink}, + }) + defer emitter.Close() + + mctx := &Context{ + TraceID: "trace-001", + TxnID: "txn-001", + SubjectDID: "did:web:example.com:agent", + TrustLevel: 2, + } + + result := AllowResult("policy:test:rule1", "tool allowed") + request := &ToolRequest{ToolName: "mcp:test/tool"} + + emitter.EmitDecision(mctx, result, request) + + // Wait for event + select { + case event := <-sink.Events(): + if event.EventType != EventToolPermitted { + t.Errorf("event_type = %q, want %q", event.EventType, EventToolPermitted) + } + if event.Context.TraceID != "trace-001" { + t.Errorf("context.trace_id = %q, want %q", event.Context.TraceID, "trace-001") + } + if event.Payload["subject_did"] != "did:web:example.com:agent" { + t.Errorf("payload.subject_did = %v, want %q", event.Payload["subject_did"], "did:web:example.com:agent") + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestAsyncEmitter_DenyDecision(t *testing.T) { + sink := NewChannelSink(10) + emitter := NewAsyncEmitter(AsyncEmitterConfig{ + ComponentID: "test-emitter", + ComponentType: ComponentPEP, + Sinks: []EventSink{sink}, + }) + defer emitter.Close() + + mctx := &Context{SubjectDID: "did:web:agent"} + result := DenyResult("builtin:blocked", "tool blocked") + request := &ToolRequest{ToolName: "mcp:dangerous/tool"} + + emitter.EmitDecision(mctx, result, request) + + select { + case event := <-sink.Events(): + if event.EventType != EventToolDenied { + t.Errorf("event_type = %q, want %q", event.EventType, EventToolDenied) + } + if event.Payload["reason"] != "tool blocked" { + t.Errorf("payload.reason = %v, want %q", event.Payload["reason"], "tool blocked") + } + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestAsyncEmitter_CloseDrainsPending(t *testing.T) { + var received []*Event + var mu sync.Mutex + + sink := &testSink{ + receiveFn: func(e *Event) { + mu.Lock() + received = append(received, e) + mu.Unlock() + }, + } + + emitter := NewAsyncEmitter(AsyncEmitterConfig{ + ComponentID: "drain-test", + ComponentType: ComponentSDK, + Sinks: []EventSink{sink}, + }) + + // Emit multiple events + for i := 0; i < 5; i++ { + emitter.EmitCapabilityCheck(nil, "cap", true) + } + + // Close should drain all events + emitter.Close() + + mu.Lock() + count := len(received) + mu.Unlock() + + if count != 5 { + t.Errorf("received %d events, want 5", count) + } +} + +func TestJSONSink(t *testing.T) { + var buf bytes.Buffer + sink := NewJSONSink(&buf) + + event := NewEvent(EventToolExecuted, EmitterInfo{ComponentID: "test"}) + event.WithPayload("tool_name", "test-tool") + + sink.Receive(event) + + // Parse the output + var parsed Event + if err := json.NewDecoder(&buf).Decode(&parsed); err != nil { + t.Fatalf("failed to decode JSON: %v", err) + } + + if parsed.EventType != EventToolExecuted { + t.Errorf("event_type = %q, want %q", parsed.EventType, EventToolExecuted) + } +} + +func TestBatchSink(t *testing.T) { + var batches [][]*Event + var mu sync.Mutex + + sink := NewBatchSink(3, func(batch []*Event) error { + mu.Lock() + batches = append(batches, batch) + mu.Unlock() + return nil + }) + + // Send 5 events - should trigger one batch of 3 + for i := 0; i < 5; i++ { + event := NewEvent(EventAuthorityGranted, EmitterInfo{}) + sink.Receive(event) + } + + mu.Lock() + batchCount := len(batches) + mu.Unlock() + + if batchCount != 1 { + t.Errorf("got %d batches, want 1", batchCount) + } + + // Flush remaining + sink.Flush(context.Background()) + + mu.Lock() + batchCount = len(batches) + mu.Unlock() + + if batchCount != 2 { + t.Errorf("after flush got %d batches, want 2", batchCount) + } +} + +func TestChannelSink_CloseDropsNew(t *testing.T) { + sink := NewChannelSink(1) + + // Send one event + event := NewEvent(EventIdentityVerified, EmitterInfo{}) + sink.Receive(event) + + // Close the sink + sink.Close() + + // New events should be silently dropped + event2 := NewEvent(EventIdentityVerified, EmitterInfo{}) + sink.Receive(event2) // Should not panic +} + +func TestAsyncEmitter_DropOnFull(t *testing.T) { + sink := NewChannelSink(1) + emitter := NewAsyncEmitter(AsyncEmitterConfig{ + ComponentID: "drop-test", + ComponentType: ComponentSDK, + BufferSize: 1, + DropOnFull: true, + Sinks: []EventSink{sink}, + }) + defer emitter.Close() + + // Fill the buffer by emitting rapidly + for i := 0; i < 100; i++ { + emitter.EmitCapabilityCheck(nil, "cap", true) + } + + // Some events should have been dropped + dropped := emitter.DroppedCount() + // Note: exact count depends on timing, just verify the mechanism works + _ = dropped // May or may not be > 0 depending on goroutine scheduling +} + +// testSink is a simple test sink. +type testSink struct { + receiveFn func(*Event) +} + +func (s *testSink) Receive(event *Event) { + if s.receiveFn != nil { + s.receiveFn(event) + } +} +func (s *testSink) Flush(ctx context.Context) error { return nil } +func (s *testSink) Close() error { return nil }