diff --git a/confidence-resolver/src/event_logger.rs b/confidence-resolver/src/event_logger.rs new file mode 100644 index 00000000..deee4240 --- /dev/null +++ b/confidence-resolver/src/event_logger.rs @@ -0,0 +1,117 @@ +use crossbeam_queue::SegQueue; + +use crate::proto::google::{Struct, Timestamp}; + +pub struct TrackedEvent { + pub event_definition: String, + pub payload: Struct, + pub event_time: Timestamp, +} + +#[derive(Default)] +pub struct EventLogger { + events: SegQueue, +} + +impl EventLogger { + pub fn new() -> Self { + Self::default() + } + + pub fn track(&self, event: TrackedEvent) { + self.events.push(event); + } + + // TODO: Only drop events from memory after the provider confirms successful delivery. + // Currently events are drained unconditionally — if the HTTP/gRPC send fails, they are lost. + pub fn flush(&self) -> Vec { + let mut result = Vec::new(); + while let Some(event) = self.events.pop() { + result.push(event); + } + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_event(name: &str) -> TrackedEvent { + TrackedEvent { + event_definition: format!("eventDefinitions/{}", name), + payload: Struct::default(), + event_time: Timestamp { + seconds: 1234, + nanos: 0, + }, + } + } + + #[test] + fn track_and_flush_single_event() { + let logger = EventLogger::new(); + logger.track(make_event("purchase")); + let events = logger.flush(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_definition, "eventDefinitions/purchase"); + } + + #[test] + fn flush_drains_all_events() { + let logger = EventLogger::new(); + logger.track(make_event("a")); + logger.track(make_event("b")); + logger.track(make_event("c")); + let events = logger.flush(); + assert_eq!(events.len(), 3); + // second flush returns empty + assert!(logger.flush().is_empty()); + } + + #[test] + fn empty_flush_returns_empty_vec() { + let logger = EventLogger::new(); + assert!(logger.flush().is_empty()); + } + + #[test] + fn concurrent_track_and_flush() { + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + use std::sync::Arc; + use std::thread; + + let logger = Arc::new(EventLogger::new()); + let done = Arc::new(AtomicBool::new(false)); + let total_tracked = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..3 { + let lg = logger.clone(); + let done_cl = done.clone(); + let tracked = total_tracked.clone(); + handles.push(thread::spawn(move || { + let mut count = 0; + while !done_cl.load(Ordering::Relaxed) { + lg.track(make_event("concurrent")); + count += 1; + } + tracked.fetch_add(count, Ordering::Relaxed); + })); + } + + let lg = logger.clone(); + let mut total_flushed = 0; + for _ in 0..10 { + thread::sleep(std::time::Duration::from_millis(10)); + total_flushed += lg.flush().len(); + } + done.store(true, Ordering::Relaxed); + for h in handles { + h.join().unwrap(); + } + total_flushed += logger.flush().len(); + + assert_eq!(total_flushed, total_tracked.load(Ordering::Relaxed)); + } +} diff --git a/confidence-resolver/src/lib.rs b/confidence-resolver/src/lib.rs index 38cbe5a8..72c861a4 100644 --- a/confidence-resolver/src/lib.rs +++ b/confidence-resolver/src/lib.rs @@ -44,6 +44,7 @@ use err::Fallible; pub mod assign_logger; mod bounded_set; mod err; +pub mod event_logger; pub mod flag_logger; mod gzip; pub mod proto; diff --git a/openfeature-provider/go/confidence/event_sender.go b/openfeature-provider/go/confidence/event_sender.go new file mode 100644 index 00000000..beff8d84 --- /dev/null +++ b/openfeature-provider/go/confidence/event_sender.go @@ -0,0 +1,11 @@ +package confidence + +import ( + "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/wasm" +) + +// EventSender sends tracked events to the Confidence events backend. +type EventSender interface { + Send(response *wasm.FlushEventsResponse, clientSecret string) + Shutdown() +} diff --git a/openfeature-provider/go/confidence/flag_logs_test.go b/openfeature-provider/go/confidence/flag_logs_test.go index 8d3dfddb..9106b2ef 100644 --- a/openfeature-provider/go/confidence/flag_logs_test.go +++ b/openfeature-provider/go/confidence/flag_logs_test.go @@ -52,7 +52,7 @@ func setupFlagLogsUnitTest(t *testing.T) (*fl.CapturingFlagLogger, openfeature.I resolverSupplier := wrapResolverSupplierWithMaterializations(func(ctx context.Context, logSink lr.LogSink) lr.LocalResolver { return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2) }, unsupportedMatStore) - provider := NewLocalResolverProvider(resolverSupplier, stateProvider, capturingLogger, unitTestClientSecret, logger) + provider := NewLocalResolverProvider(resolverSupplier, stateProvider, capturingLogger, nil, unitTestClientSecret, logger) // Set provider and wait for ready err := openfeature.SetProviderAndWait(provider) diff --git a/openfeature-provider/go/confidence/grpc_event_sender.go b/openfeature-provider/go/confidence/grpc_event_sender.go new file mode 100644 index 00000000..c7eadc64 --- /dev/null +++ b/openfeature-provider/go/confidence/grpc_event_sender.go @@ -0,0 +1,71 @@ +package confidence + +import ( + "context" + "log/slog" + "time" + + events "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/events" + "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/wasm" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// GrpcEventSender sends events to the Confidence events backend via gRPC. +type GrpcEventSender struct { + conn *grpc.ClientConn + client events.EventsServiceClient + logger *slog.Logger +} + +var _ EventSender = (*GrpcEventSender)(nil) + +// NewGrpcEventSender creates a new gRPC event sender connecting to the given target. +func NewGrpcEventSender(target string, logger *slog.Logger, opts ...grpc.DialOption) (*GrpcEventSender, error) { + if len(opts) == 0 { + opts = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } + conn, err := grpc.NewClient(target, opts...) + if err != nil { + return nil, err + } + return &GrpcEventSender{ + conn: conn, + client: events.NewEventsServiceClient(conn), + logger: logger, + }, nil +} + +func (s *GrpcEventSender) Send(response *wasm.FlushEventsResponse, clientSecret string) { + req := &events.PublishEventsRequest{ + ClientSecret: clientSecret, + SendTime: timestamppb.Now(), + } + for _, e := range response.Events { + req.Events = append(req.Events, &events.Event{ + EventDefinition: e.EventDefinition, + Payload: e.Payload, + EventTime: e.EventTime, + }) + } + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + md := metadata.Pairs("authorization", "ClientSecret "+clientSecret) + ctx = metadata.NewOutgoingContext(ctx, md) + if _, err := s.client.PublishEvents(ctx, req); err != nil { + s.logger.Error("Failed to publish events", "error", err) + } else { + s.logger.Debug("Successfully published events", "count", len(req.Events)) + } + }() +} + +func (s *GrpcEventSender) Shutdown() { + if s.conn != nil { + s.conn.Close() + } +} diff --git a/openfeature-provider/go/confidence/integration_test.go b/openfeature-provider/go/confidence/integration_test.go index 5064aad0..befd4376 100644 --- a/openfeature-provider/go/confidence/integration_test.go +++ b/openfeature-provider/go/confidence/integration_test.go @@ -206,7 +206,7 @@ func TestIntegration_OpenFeatureResolveStickyFlagMatStoreReadAndWrite(t *testing }, matStore) // Create provider with test state - provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil))) + provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil))) client := openfeature.NewClient("integration-test") @@ -337,7 +337,7 @@ func TestIntegration_OpenFeatureMaterializedSegmentCriterion(t *testing.T) { }, matStore) // Create provider with test state - provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil))) + provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil))) client := openfeature.NewClient("integration-test-mat-seg") @@ -399,7 +399,7 @@ func TestIntegration_OpenFeatureMaterializedSegmentCriterion(t *testing.T) { }, matStore) // Create provider with test state - provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil))) + provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil))) client := openfeature.NewClient("integration-test-mat-seg-not-in") @@ -440,7 +440,7 @@ func TestIntegration_OpenFeatureMaterializedSegmentCriterion(t *testing.T) { }, matStore) // Create provider with test state - provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil))) + provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil))) client := openfeature.NewClient("integration-test-mat-seg-no-ctx") @@ -698,6 +698,6 @@ func createProviderWithTestState( }, matStore) // Create provider with the client secret from test state // The test state includes client secret: mkjJruAATQWjeY7foFIWfVAcBWnci2YF - provider := NewLocalResolverProvider(resolverSupplier, stateProvider, logger, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil))) + provider := NewLocalResolverProvider(resolverSupplier, stateProvider, logger, nil, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil))) return provider, nil } diff --git a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm index d682a469..b56b0fd4 100755 Binary files a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm and b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm differ diff --git a/openfeature-provider/go/confidence/internal/local_resolver/local_resolver.go b/openfeature-provider/go/confidence/internal/local_resolver/local_resolver.go index f347cfd1..0a54233b 100644 --- a/openfeature-provider/go/confidence/internal/local_resolver/local_resolver.go +++ b/openfeature-provider/go/confidence/internal/local_resolver/local_resolver.go @@ -23,6 +23,8 @@ type LocalResolver interface { ApplyFlags(*resolver.ApplyFlagsRequest) error FlushAllLogs() error FlushAssignLogs() error + TrackEvent(*wasm.Event) error + FlushEvents() (*wasm.FlushEventsResponse, error) Close(context.Context) error } diff --git a/openfeature-provider/go/confidence/internal/local_resolver/pool.go b/openfeature-provider/go/confidence/internal/local_resolver/pool.go index ed7be424..af49b085 100644 --- a/openfeature-provider/go/confidence/internal/local_resolver/pool.go +++ b/openfeature-provider/go/confidence/internal/local_resolver/pool.go @@ -109,6 +109,32 @@ func (s *PooledResolver) FlushAssignLogs() error { }) } +// TrackEvent implements LocalResolver. +func (s *PooledResolver) TrackEvent(event *wasm.Event) error { + n := uint64(len(s.slots)) + idx := s.rr.Add(1) + for !s.slots[idx%n].rw.TryRLock() { + idx = s.rr.Add(1) + } + slot := &s.slots[idx%n] + defer slot.rw.RUnlock() + return slot.lr.TrackEvent(event) +} + +// FlushEvents implements LocalResolver. +func (s *PooledResolver) FlushEvents() (*wasm.FlushEventsResponse, error) { + combined := &wasm.FlushEventsResponse{} + err := s.maintenance(func(lr LocalResolver) error { + resp, err := lr.FlushEvents() + if err != nil { + return err + } + combined.Events = append(combined.Events, resp.Events...) + return nil + }) + return combined, err +} + func (s *PooledResolver) Close(ctx context.Context) error { return s.maintenance(func(lr LocalResolver) error { return lr.Close(ctx) diff --git a/openfeature-provider/go/confidence/internal/local_resolver/recover.go b/openfeature-provider/go/confidence/internal/local_resolver/recover.go index 85c277a2..f7fd8b04 100644 --- a/openfeature-provider/go/confidence/internal/local_resolver/recover.go +++ b/openfeature-provider/go/confidence/internal/local_resolver/recover.go @@ -125,6 +125,20 @@ func (r *RecoveringResolver) FlushAssignLogs() (err error) { return } +func (r *RecoveringResolver) TrackEvent(event *wasm.Event) (err error) { + r.withRecover("TrackEvent", &err, func(lr LocalResolver) { + err = lr.TrackEvent(event) + }) + return +} + +func (r *RecoveringResolver) FlushEvents() (resp *wasm.FlushEventsResponse, err error) { + r.withRecover("FlushEvents", &err, func(lr LocalResolver) { + resp, err = lr.FlushEvents() + }) + return +} + func (r *RecoveringResolver) Close(ctx context.Context) error { // For Close, if we panic, don't recreate during shutdown; just surface error. defer func() { diff --git a/openfeature-provider/go/confidence/internal/local_resolver/wasm.go b/openfeature-provider/go/confidence/internal/local_resolver/wasm.go index 452fa3d4..be40bd64 100644 --- a/openfeature-provider/go/confidence/internal/local_resolver/wasm.go +++ b/openfeature-provider/go/confidence/internal/local_resolver/wasm.go @@ -72,6 +72,16 @@ func (r *WasmResolver) FlushAssignLogs() error { return err } +func (r *WasmResolver) TrackEvent(event *wasm.Event) error { + return r.call("wasm_msg_guest_track_event", event, nil) +} + +func (r *WasmResolver) FlushEvents() (*wasm.FlushEventsResponse, error) { + resp := &wasm.FlushEventsResponse{} + err := r.call("wasm_msg_guest_flush_events", nil, resp) + return resp, err +} + func (r *WasmResolver) Close(ctx context.Context) error { // TODO we should call flush assigned until it doesn't flush any more r.FlushAllLogs() diff --git a/openfeature-provider/go/confidence/internal/proto/events/api.pb.go b/openfeature-provider/go/confidence/internal/proto/events/api.pb.go new file mode 100644 index 00000000..57766dc2 --- /dev/null +++ b/openfeature-provider/go/confidence/internal/proto/events/api.pb.go @@ -0,0 +1,200 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: confidence/events/v1/api.proto + +package events + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PublishEventsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + ClientSecret string `protobuf:"bytes,1,opt,name=client_secret,json=clientSecret,proto3" json:"client_secret,omitempty"` + Events []*Event `protobuf:"bytes,2,rep,name=events,proto3" json:"events,omitempty"` + SendTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=send_time,json=sendTime,proto3" json:"send_time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishEventsRequest) Reset() { + *x = PublishEventsRequest{} + mi := &file_confidence_events_v1_api_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishEventsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishEventsRequest) ProtoMessage() {} + +func (x *PublishEventsRequest) ProtoReflect() protoreflect.Message { + mi := &file_confidence_events_v1_api_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishEventsRequest.ProtoReflect.Descriptor instead. +func (*PublishEventsRequest) Descriptor() ([]byte, []int) { + return file_confidence_events_v1_api_proto_rawDescGZIP(), []int{0} +} + +func (x *PublishEventsRequest) GetClientSecret() string { + if x != nil { + return x.ClientSecret + } + return "" +} + +func (x *PublishEventsRequest) GetEvents() []*Event { + if x != nil { + return x.Events + } + return nil +} + +func (x *PublishEventsRequest) GetSendTime() *timestamppb.Timestamp { + if x != nil { + return x.SendTime + } + return nil +} + +type PublishEventsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Errors []*EventError `protobuf:"bytes,1,rep,name=errors,proto3" json:"errors,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishEventsResponse) Reset() { + *x = PublishEventsResponse{} + mi := &file_confidence_events_v1_api_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishEventsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishEventsResponse) ProtoMessage() {} + +func (x *PublishEventsResponse) ProtoReflect() protoreflect.Message { + mi := &file_confidence_events_v1_api_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishEventsResponse.ProtoReflect.Descriptor instead. +func (*PublishEventsResponse) Descriptor() ([]byte, []int) { + return file_confidence_events_v1_api_proto_rawDescGZIP(), []int{1} +} + +func (x *PublishEventsResponse) GetErrors() []*EventError { + if x != nil { + return x.Errors + } + return nil +} + +var File_confidence_events_v1_api_proto protoreflect.FileDescriptor + +const file_confidence_events_v1_api_proto_rawDesc = "" + + "\n" + + "\x1econfidence/events/v1/api.proto\x12\x14confidence.events.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a confidence/events/v1/types.proto\"\xa9\x01\n" + + "\x14PublishEventsRequest\x12#\n" + + "\rclient_secret\x18\x01 \x01(\tR\fclientSecret\x123\n" + + "\x06events\x18\x02 \x03(\v2\x1b.confidence.events.v1.EventR\x06events\x127\n" + + "\tsend_time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\bsendTime\"Q\n" + + "\x15PublishEventsResponse\x128\n" + + "\x06errors\x18\x01 \x03(\v2 .confidence.events.v1.EventErrorR\x06errors2{\n" + + "\rEventsService\x12j\n" + + "\rPublishEvents\x12*.confidence.events.v1.PublishEventsRequest\x1a+.confidence.events.v1.PublishEventsResponse\"\x00B\x93\x01\n" + + "$com.spotify.confidence.sdk.events.v1B\bApiProtoP\x01Z_github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/eventsb\x06proto3" + +var ( + file_confidence_events_v1_api_proto_rawDescOnce sync.Once + file_confidence_events_v1_api_proto_rawDescData []byte +) + +func file_confidence_events_v1_api_proto_rawDescGZIP() []byte { + file_confidence_events_v1_api_proto_rawDescOnce.Do(func() { + file_confidence_events_v1_api_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_confidence_events_v1_api_proto_rawDesc), len(file_confidence_events_v1_api_proto_rawDesc))) + }) + return file_confidence_events_v1_api_proto_rawDescData +} + +var file_confidence_events_v1_api_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_confidence_events_v1_api_proto_goTypes = []any{ + (*PublishEventsRequest)(nil), // 0: confidence.events.v1.PublishEventsRequest + (*PublishEventsResponse)(nil), // 1: confidence.events.v1.PublishEventsResponse + (*Event)(nil), // 2: confidence.events.v1.Event + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp + (*EventError)(nil), // 4: confidence.events.v1.EventError +} +var file_confidence_events_v1_api_proto_depIdxs = []int32{ + 2, // 0: confidence.events.v1.PublishEventsRequest.events:type_name -> confidence.events.v1.Event + 3, // 1: confidence.events.v1.PublishEventsRequest.send_time:type_name -> google.protobuf.Timestamp + 4, // 2: confidence.events.v1.PublishEventsResponse.errors:type_name -> confidence.events.v1.EventError + 0, // 3: confidence.events.v1.EventsService.PublishEvents:input_type -> confidence.events.v1.PublishEventsRequest + 1, // 4: confidence.events.v1.EventsService.PublishEvents:output_type -> confidence.events.v1.PublishEventsResponse + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_confidence_events_v1_api_proto_init() } +func file_confidence_events_v1_api_proto_init() { + if File_confidence_events_v1_api_proto != nil { + return + } + file_confidence_events_v1_types_proto_init() + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_confidence_events_v1_api_proto_rawDesc), len(file_confidence_events_v1_api_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_confidence_events_v1_api_proto_goTypes, + DependencyIndexes: file_confidence_events_v1_api_proto_depIdxs, + MessageInfos: file_confidence_events_v1_api_proto_msgTypes, + }.Build() + File_confidence_events_v1_api_proto = out.File + file_confidence_events_v1_api_proto_goTypes = nil + file_confidence_events_v1_api_proto_depIdxs = nil +} diff --git a/openfeature-provider/go/confidence/internal/proto/events/api_grpc.pb.go b/openfeature-provider/go/confidence/internal/proto/events/api_grpc.pb.go new file mode 100644 index 00000000..ec217cac --- /dev/null +++ b/openfeature-provider/go/confidence/internal/proto/events/api_grpc.pb.go @@ -0,0 +1,121 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 +// source: confidence/events/v1/api.proto + +package events + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + EventsService_PublishEvents_FullMethodName = "/confidence.events.v1.EventsService/PublishEvents" +) + +// EventsServiceClient is the client API for EventsService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type EventsServiceClient interface { + PublishEvents(ctx context.Context, in *PublishEventsRequest, opts ...grpc.CallOption) (*PublishEventsResponse, error) +} + +type eventsServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewEventsServiceClient(cc grpc.ClientConnInterface) EventsServiceClient { + return &eventsServiceClient{cc} +} + +func (c *eventsServiceClient) PublishEvents(ctx context.Context, in *PublishEventsRequest, opts ...grpc.CallOption) (*PublishEventsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(PublishEventsResponse) + err := c.cc.Invoke(ctx, EventsService_PublishEvents_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// EventsServiceServer is the server API for EventsService service. +// All implementations must embed UnimplementedEventsServiceServer +// for forward compatibility. +type EventsServiceServer interface { + PublishEvents(context.Context, *PublishEventsRequest) (*PublishEventsResponse, error) + mustEmbedUnimplementedEventsServiceServer() +} + +// UnimplementedEventsServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedEventsServiceServer struct{} + +func (UnimplementedEventsServiceServer) PublishEvents(context.Context, *PublishEventsRequest) (*PublishEventsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PublishEvents not implemented") +} +func (UnimplementedEventsServiceServer) mustEmbedUnimplementedEventsServiceServer() {} +func (UnimplementedEventsServiceServer) testEmbeddedByValue() {} + +// UnsafeEventsServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EventsServiceServer will +// result in compilation errors. +type UnsafeEventsServiceServer interface { + mustEmbedUnimplementedEventsServiceServer() +} + +func RegisterEventsServiceServer(s grpc.ServiceRegistrar, srv EventsServiceServer) { + // If the following call pancis, it indicates UnimplementedEventsServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&EventsService_ServiceDesc, srv) +} + +func _EventsService_PublishEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishEventsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventsServiceServer).PublishEvents(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EventsService_PublishEvents_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventsServiceServer).PublishEvents(ctx, req.(*PublishEventsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// EventsService_ServiceDesc is the grpc.ServiceDesc for EventsService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var EventsService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "confidence.events.v1.EventsService", + HandlerType: (*EventsServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "PublishEvents", + Handler: _EventsService_PublishEvents_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "confidence/events/v1/api.proto", +} diff --git a/openfeature-provider/go/confidence/internal/proto/events/types.pb.go b/openfeature-provider/go/confidence/internal/proto/events/types.pb.go new file mode 100644 index 00000000..1d38e8a3 --- /dev/null +++ b/openfeature-provider/go/confidence/internal/proto/events/types.pb.go @@ -0,0 +1,272 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: confidence/events/v1/types.proto + +package events + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + structpb "google.golang.org/protobuf/types/known/structpb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type EventError_Reason int32 + +const ( + EventError_REASON_UNSPECIFIED EventError_Reason = 0 + EventError_EVENT_DEFINITION_NOT_FOUND EventError_Reason = 1 + EventError_EVENT_SCHEMA_VALIDATION_FAILED EventError_Reason = 2 +) + +// Enum value maps for EventError_Reason. +var ( + EventError_Reason_name = map[int32]string{ + 0: "REASON_UNSPECIFIED", + 1: "EVENT_DEFINITION_NOT_FOUND", + 2: "EVENT_SCHEMA_VALIDATION_FAILED", + } + EventError_Reason_value = map[string]int32{ + "REASON_UNSPECIFIED": 0, + "EVENT_DEFINITION_NOT_FOUND": 1, + "EVENT_SCHEMA_VALIDATION_FAILED": 2, + } +) + +func (x EventError_Reason) Enum() *EventError_Reason { + p := new(EventError_Reason) + *p = x + return p +} + +func (x EventError_Reason) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (EventError_Reason) Descriptor() protoreflect.EnumDescriptor { + return file_confidence_events_v1_types_proto_enumTypes[0].Descriptor() +} + +func (EventError_Reason) Type() protoreflect.EnumType { + return &file_confidence_events_v1_types_proto_enumTypes[0] +} + +func (x EventError_Reason) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use EventError_Reason.Descriptor instead. +func (EventError_Reason) EnumDescriptor() ([]byte, []int) { + return file_confidence_events_v1_types_proto_rawDescGZIP(), []int{1, 0} +} + +type Event struct { + state protoimpl.MessageState `protogen:"open.v1"` + EventDefinition string `protobuf:"bytes,1,opt,name=event_definition,json=eventDefinition,proto3" json:"event_definition,omitempty"` + Payload *structpb.Struct `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Event) Reset() { + *x = Event{} + mi := &file_confidence_events_v1_types_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Event) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event) ProtoMessage() {} + +func (x *Event) ProtoReflect() protoreflect.Message { + mi := &file_confidence_events_v1_types_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event.ProtoReflect.Descriptor instead. +func (*Event) Descriptor() ([]byte, []int) { + return file_confidence_events_v1_types_proto_rawDescGZIP(), []int{0} +} + +func (x *Event) GetEventDefinition() string { + if x != nil { + return x.EventDefinition + } + return "" +} + +func (x *Event) GetPayload() *structpb.Struct { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Event) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +type EventError struct { + state protoimpl.MessageState `protogen:"open.v1"` + Index int32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` + Reason EventError_Reason `protobuf:"varint,2,opt,name=reason,proto3,enum=confidence.events.v1.EventError_Reason" json:"reason,omitempty"` + Message string `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EventError) Reset() { + *x = EventError{} + mi := &file_confidence_events_v1_types_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EventError) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EventError) ProtoMessage() {} + +func (x *EventError) ProtoReflect() protoreflect.Message { + mi := &file_confidence_events_v1_types_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EventError.ProtoReflect.Descriptor instead. +func (*EventError) Descriptor() ([]byte, []int) { + return file_confidence_events_v1_types_proto_rawDescGZIP(), []int{1} +} + +func (x *EventError) GetIndex() int32 { + if x != nil { + return x.Index + } + return 0 +} + +func (x *EventError) GetReason() EventError_Reason { + if x != nil { + return x.Reason + } + return EventError_REASON_UNSPECIFIED +} + +func (x *EventError) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_confidence_events_v1_types_proto protoreflect.FileDescriptor + +const file_confidence_events_v1_types_proto_rawDesc = "" + + "\n" + + " confidence/events/v1/types.proto\x12\x14confidence.events.v1\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xa0\x01\n" + + "\x05Event\x12)\n" + + "\x10event_definition\x18\x01 \x01(\tR\x0feventDefinition\x121\n" + + "\apayload\x18\x02 \x01(\v2\x17.google.protobuf.StructR\apayload\x129\n" + + "\n" + + "event_time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\teventTime\"\xe3\x01\n" + + "\n" + + "EventError\x12\x14\n" + + "\x05index\x18\x01 \x01(\x05R\x05index\x12?\n" + + "\x06reason\x18\x02 \x01(\x0e2'.confidence.events.v1.EventError.ReasonR\x06reason\x12\x18\n" + + "\amessage\x18\x03 \x01(\tR\amessage\"d\n" + + "\x06Reason\x12\x16\n" + + "\x12REASON_UNSPECIFIED\x10\x00\x12\x1e\n" + + "\x1aEVENT_DEFINITION_NOT_FOUND\x10\x01\x12\"\n" + + "\x1eEVENT_SCHEMA_VALIDATION_FAILED\x10\x02B\x95\x01\n" + + "$com.spotify.confidence.sdk.events.v1B\n" + + "TypesProtoP\x01Z_github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/eventsb\x06proto3" + +var ( + file_confidence_events_v1_types_proto_rawDescOnce sync.Once + file_confidence_events_v1_types_proto_rawDescData []byte +) + +func file_confidence_events_v1_types_proto_rawDescGZIP() []byte { + file_confidence_events_v1_types_proto_rawDescOnce.Do(func() { + file_confidence_events_v1_types_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_confidence_events_v1_types_proto_rawDesc), len(file_confidence_events_v1_types_proto_rawDesc))) + }) + return file_confidence_events_v1_types_proto_rawDescData +} + +var file_confidence_events_v1_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_confidence_events_v1_types_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_confidence_events_v1_types_proto_goTypes = []any{ + (EventError_Reason)(0), // 0: confidence.events.v1.EventError.Reason + (*Event)(nil), // 1: confidence.events.v1.Event + (*EventError)(nil), // 2: confidence.events.v1.EventError + (*structpb.Struct)(nil), // 3: google.protobuf.Struct + (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp +} +var file_confidence_events_v1_types_proto_depIdxs = []int32{ + 3, // 0: confidence.events.v1.Event.payload:type_name -> google.protobuf.Struct + 4, // 1: confidence.events.v1.Event.event_time:type_name -> google.protobuf.Timestamp + 0, // 2: confidence.events.v1.EventError.reason:type_name -> confidence.events.v1.EventError.Reason + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_confidence_events_v1_types_proto_init() } +func file_confidence_events_v1_types_proto_init() { + if File_confidence_events_v1_types_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_confidence_events_v1_types_proto_rawDesc), len(file_confidence_events_v1_types_proto_rawDesc)), + NumEnums: 1, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_confidence_events_v1_types_proto_goTypes, + DependencyIndexes: file_confidence_events_v1_types_proto_depIdxs, + EnumInfos: file_confidence_events_v1_types_proto_enumTypes, + MessageInfos: file_confidence_events_v1_types_proto_msgTypes, + }.Build() + File_confidence_events_v1_types_proto = out.File + file_confidence_events_v1_types_proto_goTypes = nil + file_confidence_events_v1_types_proto_depIdxs = nil +} diff --git a/openfeature-provider/go/confidence/internal/proto/wasm/messages.pb.go b/openfeature-provider/go/confidence/internal/proto/wasm/messages.pb.go index 25ddc2d4..34a5caf9 100644 --- a/openfeature-provider/go/confidence/internal/proto/wasm/messages.pb.go +++ b/openfeature-provider/go/confidence/internal/proto/wasm/messages.pb.go @@ -10,6 +10,8 @@ import ( resolver "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/resolver" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + structpb "google.golang.org/protobuf/types/known/structpb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -244,11 +246,115 @@ func (*Response_Data) isResponse_Result() {} func (*Response_Error) isResponse_Result() {} +type Event struct { + state protoimpl.MessageState `protogen:"open.v1"` + EventDefinition string `protobuf:"bytes,1,opt,name=event_definition,json=eventDefinition,proto3" json:"event_definition,omitempty"` + Payload *structpb.Struct `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Event) Reset() { + *x = Event{} + mi := &file_confidence_wasm_messages_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Event) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event) ProtoMessage() {} + +func (x *Event) ProtoReflect() protoreflect.Message { + mi := &file_confidence_wasm_messages_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event.ProtoReflect.Descriptor instead. +func (*Event) Descriptor() ([]byte, []int) { + return file_confidence_wasm_messages_proto_rawDescGZIP(), []int{4} +} + +func (x *Event) GetEventDefinition() string { + if x != nil { + return x.EventDefinition + } + return "" +} + +func (x *Event) GetPayload() *structpb.Struct { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Event) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +type FlushEventsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Events []*Event `protobuf:"bytes,2,rep,name=events,proto3" json:"events,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FlushEventsResponse) Reset() { + *x = FlushEventsResponse{} + mi := &file_confidence_wasm_messages_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FlushEventsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FlushEventsResponse) ProtoMessage() {} + +func (x *FlushEventsResponse) ProtoReflect() protoreflect.Message { + mi := &file_confidence_wasm_messages_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FlushEventsResponse.ProtoReflect.Descriptor instead. +func (*FlushEventsResponse) Descriptor() ([]byte, []int) { + return file_confidence_wasm_messages_proto_rawDescGZIP(), []int{5} +} + +func (x *FlushEventsResponse) GetEvents() []*Event { + if x != nil { + return x.Events + } + return nil +} + var File_confidence_wasm_messages_proto protoreflect.FileDescriptor const file_confidence_wasm_messages_proto_rawDesc = "" + "\n" + - "\x1econfidence/wasm/messages.proto\x12\x0fconfidence.wasm\x1a(confidence/flags/resolver/v1/types.proto\"\x06\n" + + "\x1econfidence/wasm/messages.proto\x12\x0fconfidence.wasm\x1a(confidence/flags/resolver/v1/types.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x06\n" + "\x04Void\"\x83\x01\n" + "\x17SetResolverStateRequest\x12\x14\n" + "\x05state\x18\x01 \x01(\fR\x05state\x12\x1d\n" + @@ -260,7 +366,14 @@ const file_confidence_wasm_messages_proto_rawDesc = "" + "\bResponse\x12\x14\n" + "\x04data\x18\x01 \x01(\fH\x00R\x04data\x12\x16\n" + "\x05error\x18\x02 \x01(\tH\x00R\x05errorB\b\n" + - "\x06resultB\x8c\x01\n" + + "\x06result\"\xa0\x01\n" + + "\x05Event\x12)\n" + + "\x10event_definition\x18\x01 \x01(\tR\x0feventDefinition\x121\n" + + "\apayload\x18\x02 \x01(\v2\x17.google.protobuf.StructR\apayload\x129\n" + + "\n" + + "event_time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\teventTime\"E\n" + + "\x13FlushEventsResponse\x12.\n" + + "\x06events\x18\x02 \x03(\v2\x16.confidence.wasm.EventR\x06eventsB\x8c\x01\n" + "\x1fcom.spotify.confidence.sdk.wasmB\bMessagesP\x00Z]github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/wasmb\x06proto3" var ( @@ -275,21 +388,28 @@ func file_confidence_wasm_messages_proto_rawDescGZIP() []byte { return file_confidence_wasm_messages_proto_rawDescData } -var file_confidence_wasm_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_confidence_wasm_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_confidence_wasm_messages_proto_goTypes = []any{ (*Void)(nil), // 0: confidence.wasm.Void (*SetResolverStateRequest)(nil), // 1: confidence.wasm.SetResolverStateRequest (*Request)(nil), // 2: confidence.wasm.Request (*Response)(nil), // 3: confidence.wasm.Response - (*resolver.Sdk)(nil), // 4: confidence.flags.resolver.v1.Sdk + (*Event)(nil), // 4: confidence.wasm.Event + (*FlushEventsResponse)(nil), // 5: confidence.wasm.FlushEventsResponse + (*resolver.Sdk)(nil), // 6: confidence.flags.resolver.v1.Sdk + (*structpb.Struct)(nil), // 7: google.protobuf.Struct + (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp } var file_confidence_wasm_messages_proto_depIdxs = []int32{ - 4, // 0: confidence.wasm.SetResolverStateRequest.sdk:type_name -> confidence.flags.resolver.v1.Sdk - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 6, // 0: confidence.wasm.SetResolverStateRequest.sdk:type_name -> confidence.flags.resolver.v1.Sdk + 7, // 1: confidence.wasm.Event.payload:type_name -> google.protobuf.Struct + 8, // 2: confidence.wasm.Event.event_time:type_name -> google.protobuf.Timestamp + 4, // 3: confidence.wasm.FlushEventsResponse.events:type_name -> confidence.wasm.Event + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_confidence_wasm_messages_proto_init() } @@ -307,7 +427,7 @@ func file_confidence_wasm_messages_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_confidence_wasm_messages_proto_rawDesc), len(file_confidence_wasm_messages_proto_rawDesc)), NumEnums: 0, - NumMessages: 4, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/openfeature-provider/go/confidence/internal/testutil/helpers.go b/openfeature-provider/go/confidence/internal/testutil/helpers.go index 52ac8044..e039dd79 100644 --- a/openfeature-provider/go/confidence/internal/testutil/helpers.go +++ b/openfeature-provider/go/confidence/internal/testutil/helpers.go @@ -429,6 +429,10 @@ func (m *MockedLocalResolver) ResolveProcess(*wasm.ResolveProcessRequest) (*wasm } func (m MockedLocalResolver) SetResolverState(*wasm.SetResolverStateRequest) error { return nil } func (m MockedLocalResolver) ApplyFlags(*resolver.ApplyFlagsRequest) error { return nil } +func (m MockedLocalResolver) TrackEvent(*wasm.Event) error { return nil } +func (m MockedLocalResolver) FlushEvents() (*wasm.FlushEventsResponse, error) { + return &wasm.FlushEventsResponse{}, nil +} func MustJSONToProto(jsonString string) *structpb.Value { var v structpb.Value diff --git a/openfeature-provider/go/confidence/materialization.go b/openfeature-provider/go/confidence/materialization.go index 4b2e1ebd..90b1acf7 100644 --- a/openfeature-provider/go/confidence/materialization.go +++ b/openfeature-provider/go/confidence/materialization.go @@ -185,6 +185,14 @@ func (m *materializationSupportedResolver) SetResolverState(request *wasm.SetRes return m.current.SetResolverState(request) } +func (m *materializationSupportedResolver) TrackEvent(event *wasm.Event) error { + return m.current.TrackEvent(event) +} + +func (m *materializationSupportedResolver) FlushEvents() (*wasm.FlushEventsResponse, error) { + return m.current.FlushEvents() +} + func (m *materializationSupportedResolver) Close(ctx context.Context) error { return m.current.Close(ctx) } diff --git a/openfeature-provider/go/confidence/provider.go b/openfeature-provider/go/confidence/provider.go index 659c6a65..489f78e1 100644 --- a/openfeature-provider/go/confidence/provider.go +++ b/openfeature-provider/go/confidence/provider.go @@ -16,6 +16,7 @@ import ( resolvertypes "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/resolver" "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/wasm" "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -54,6 +55,7 @@ type LocalResolverProvider struct { resolver lr.LocalResolver stateProvider StateProvider flagLogger FlagLogger + eventSender EventSender clientSecret string logger *slog.Logger cancelFunc context.CancelFunc @@ -67,6 +69,7 @@ type LocalResolverProvider struct { var ( _ openfeature.FeatureProvider = (*LocalResolverProvider)(nil) _ openfeature.StateHandler = (*LocalResolverProvider)(nil) + _ openfeature.Tracker = (*LocalResolverProvider)(nil) ) // NewLocalResolverProvider creates a new LocalResolverProvider @@ -74,6 +77,7 @@ func NewLocalResolverProvider( resolverSupplier LocalResolverSupplier, stateProvider StateProvider, flagLogger FlagLogger, + eventSender EventSender, clientSecret string, logger *slog.Logger, opts ...Option, @@ -105,6 +109,7 @@ func NewLocalResolverProvider( resolverSupplier: resolverSupplier, stateProvider: stateProvider, flagLogger: flagLogger, + eventSender: eventSender, clientSecret: clientSecret, logger: logger, statePollInterval: statePollInterval, @@ -363,6 +368,33 @@ func (p *LocalResolverProvider) ApplyFlags( return p.resolver.ApplyFlags(request) } +// Track implements the openfeature.Tracker interface for recording business events. +func (p *LocalResolverProvider) Track( + ctx context.Context, + trackingEventName string, + evalCtx openfeature.EvaluationContext, + details openfeature.TrackingEventDetails, +) { + if p.resolver == nil { + return + } + + processedCtx := processTargetingKey(evalCtx.Attributes()) + protoCtx, err := flattenedContextToProto(processedCtx) + if err != nil { + p.logger.Error("Failed to convert context for track event", "error", err) + return + } + + if err := p.resolver.TrackEvent(&wasm.Event{ + EventDefinition: "eventDefinitions/" + trackingEventName, + Payload: protoCtx, + EventTime: timestamppb.Now(), + }); err != nil { + p.logger.Error("Failed to track event", "event", trackingEventName, "error", err) + } +} + // Hooks returns provider hooks (none for this implementation) func (p *LocalResolverProvider) Hooks() []openfeature.Hook { return []openfeature.Hook{} @@ -470,6 +502,14 @@ func (p *LocalResolverProvider) Shutdown() { } } + // Shutdown event sender + if p.eventSender != nil { + p.eventSender.Shutdown() + if p.logger != nil { + p.logger.Debug("Shut down event sender") + } + } + if p.logger != nil { p.logger.Info("Provider has been shut down") } @@ -538,6 +578,11 @@ func (p *LocalResolverProvider) startScheduledTasks(parentCtx context.Context) { if err := p.resolver.FlushAllLogs(); err != nil { p.logger.Error("Failed to flush all logs", "error", err) } + if resp, err := p.resolver.FlushEvents(); err != nil { + p.logger.Error("Failed to flush events", "error", err) + } else if len(resp.GetEvents()) > 0 && p.eventSender != nil { + p.eventSender.Send(resp, p.clientSecret) + } case <-assignTicker.C: if err := p.resolver.FlushAssignLogs(); err != nil { p.logger.Error("Failed to flush assign logs", "error", err) diff --git a/openfeature-provider/go/confidence/provider_builder.go b/openfeature-provider/go/confidence/provider_builder.go index 7760f5df..8dc0c59e 100644 --- a/openfeature-provider/go/confidence/provider_builder.go +++ b/openfeature-provider/go/confidence/provider_builder.go @@ -82,12 +82,18 @@ func NewProvider(ctx context.Context, config ProviderConfig) (*LocalResolverProv materializationStore = newRemoteMaterializationStore(resolverv1.NewInternalFlagLoggerServiceClient(conn), config.ClientSecret) } + // Create event sender using the same gRPC connection + eventSender, err := NewGrpcEventSender(target, logger, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create event sender: %w", err) + } + resolverSupplier := func(ctx context.Context, logSink lr.LogSink) lr.LocalResolver { return lr.NewLocalResolverWithPoolSize(ctx, logSink, config.ResolverPoolSize) } resolverSupplierWithMaterialization := wrapResolverSupplierWithMaterializations(resolverSupplier, materializationStore) providerOpts := buildProviderOptions(config.StatePollInterval, config.LogPollInterval) - provider := NewLocalResolverProvider(resolverSupplierWithMaterialization, stateProvider, flagLogger, config.ClientSecret, logger, providerOpts...) + provider := NewLocalResolverProvider(resolverSupplierWithMaterialization, stateProvider, flagLogger, eventSender, config.ClientSecret, logger, providerOpts...) return provider, nil } @@ -116,7 +122,7 @@ func NewProviderForTest(ctx context.Context, config ProviderTestConfig) (*LocalR } resolverSupplierWithMaterialization := wrapResolverSupplierWithMaterializations(resolverSupplier, materializationStore) providerOpts := buildProviderOptions(config.StatePollInterval, config.LogPollInterval) - provider := NewLocalResolverProvider(resolverSupplierWithMaterialization, config.StateProvider, config.FlagLogger, config.ClientSecret, logger, providerOpts...) + provider := NewLocalResolverProvider(resolverSupplierWithMaterialization, config.StateProvider, config.FlagLogger, nil, config.ClientSecret, logger, providerOpts...) return provider, nil } diff --git a/openfeature-provider/go/confidence/provider_resolve_test.go b/openfeature-provider/go/confidence/provider_resolve_test.go index 6541f7ea..80529e81 100644 --- a/openfeature-provider/go/confidence/provider_resolve_test.go +++ b/openfeature-provider/go/confidence/provider_resolve_test.go @@ -43,7 +43,7 @@ func TestLocalResolverProvider_ReturnsDefaultOnError(t *testing.T) { return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2) }, unsupportedMatStore) // Use different client secret that won't match - openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil)))) + openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, nil, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil)))) client := openfeature.NewClient("test-client") evalCtx := openfeature.NewTargetlessEvaluationContext(map[string]interface{}{ @@ -90,7 +90,7 @@ func TestLocalResolverProvider_ReturnsCorrectValue(t *testing.T) { return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2) }, unsupportedMatStore) // Use the correct client secret from test data - openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))) + openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, nil, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))) client := openfeature.NewClient("test-client") evalCtx := openfeature.NewTargetlessEvaluationContext(map[string]interface{}{ @@ -173,7 +173,7 @@ func TestLocalResolverProvider_PathNotFound(t *testing.T) { return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2) }, unsupportedMatStore) // Use the correct client secret from test data - openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))) + openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, nil, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))) client := openfeature.NewClient("test-client") evalCtx := openfeature.NewTargetlessEvaluationContext(map[string]interface{}{ @@ -240,7 +240,7 @@ func TestLocalResolverProvider_MissingMaterializations(t *testing.T) { resolverSupplier := wrapResolverSupplierWithMaterializations(func(ctx context.Context, logSink lr.LogSink) lr.LocalResolver { return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2) }, unsupportedMatStore) - openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))) + openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, nil, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))) client := openfeature.NewClient("test-client") evalCtx := openfeature.NewTargetlessEvaluationContext(map[string]interface{}{ @@ -281,7 +281,7 @@ func TestLocalResolverProvider_MissingMaterializations(t *testing.T) { resolverSupplier := wrapResolverSupplierWithMaterializations(func(ctx context.Context, logSink lr.LogSink) lr.LocalResolver { return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2) }, unsupportedMatStore) - openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil)))) + openfeature.SetProviderAndWait(NewLocalResolverProvider(resolverSupplier, stateProvider, mockFlagLogger, nil, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil)))) client := openfeature.NewClient("test-client") evalCtx := openfeature.NewTargetlessEvaluationContext(map[string]interface{}{ diff --git a/openfeature-provider/go/confidence/provider_test.go b/openfeature-provider/go/confidence/provider_test.go index 26777983..2fe870d9 100644 --- a/openfeature-provider/go/confidence/provider_test.go +++ b/openfeature-provider/go/confidence/provider_test.go @@ -15,7 +15,7 @@ import ( ) func TestNewLocalResolverProvider(t *testing.T) { - provider := NewLocalResolverProvider(nil, nil, nil, "test-secret", nil) + provider := NewLocalResolverProvider(nil, nil, nil, nil, "test-secret", nil) if provider == nil { t.Fatal("Expected provider to be created, got nil") @@ -26,7 +26,7 @@ func TestNewLocalResolverProvider(t *testing.T) { } func TestLocalResolverProvider_Metadata(t *testing.T) { - provider := NewLocalResolverProvider(nil, nil, nil, "secret", nil) + provider := NewLocalResolverProvider(nil, nil, nil, nil, "secret", nil) metadata := provider.Metadata() if metadata.Name != "confidence-sdk-go-local" { @@ -35,7 +35,7 @@ func TestLocalResolverProvider_Metadata(t *testing.T) { } func TestLocalResolverProvider_Hooks(t *testing.T) { - provider := NewLocalResolverProvider(nil, nil, nil, "secret", nil) + provider := NewLocalResolverProvider(nil, nil, nil, nil, "secret", nil) hooks := provider.Hooks() if hooks == nil { @@ -429,7 +429,7 @@ func TestFlattenedContextToProto_InvalidValue(t *testing.T) { } func TestLocalResolverProvider_Shutdown(t *testing.T) { - provider := NewLocalResolverProvider(nil, nil, nil, "secret", nil) + provider := NewLocalResolverProvider(nil, nil, nil, nil, "secret", nil) provider.Shutdown() // Verify the method can be called without panicking even with nil components @@ -437,7 +437,7 @@ func TestLocalResolverProvider_Shutdown(t *testing.T) { } func TestLocalResolverProvider_ShutdownWithCancelFunc(t *testing.T) { - provider := NewLocalResolverProvider(nil, nil, nil, "secret", nil) + provider := NewLocalResolverProvider(nil, nil, nil, nil, "secret", nil) // Simulate Init having been called by setting cancelFunc cancelCalled := false @@ -503,12 +503,18 @@ func (m *mockResolverAPIForInit) ApplyFlags(request *resolver.ApplyFlagsRequest) return nil } +func (m *mockResolverAPIForInit) TrackEvent(event *wasm.Event) error { return nil } +func (m *mockResolverAPIForInit) FlushEvents() (*wasm.FlushEventsResponse, error) { + return &wasm.FlushEventsResponse{}, nil +} + // TestLocalResolverProvider_Init_NilStateProvider verifies Init fails when stateProvider is nil func TestLocalResolverProvider_Init_NilStateProvider(t *testing.T) { provider := NewLocalResolverProvider( mockResolverSupplier, nil, // nil state provider &tu.MockFlagLogger{}, + nil, "secret", nil, ) @@ -528,6 +534,7 @@ func TestLocalResolverProvider_Init_NilResolverAPI(t *testing.T) { nil, // nil resolver API &tu.StateProviderMock{}, &tu.MockFlagLogger{}, + nil, "secret", nil, ) @@ -547,6 +554,7 @@ func TestLocalResolverProvider_Init_NilFlagLogger(t *testing.T) { mockResolverSupplier, &tu.StateProviderMock{}, nil, // nil flag logger + nil, "secret", nil, ) @@ -574,6 +582,7 @@ func TestLocalResolverProvider_Init_StateProviderError(t *testing.T) { mockResolverSupplier, mockStateProvider, mockFlagLogger, + nil, "secret", nil, ) @@ -604,6 +613,7 @@ func TestLocalResolverProvider_Init_EmptyAccountID(t *testing.T) { func(ctx context.Context, ls lr.LogSink) lr.LocalResolver { return mockResolverAPI }, mockStateProvider, mockFlagLogger, + nil, "secret", nil, ) @@ -641,6 +651,7 @@ func TestLocalResolverProvider_Init_UpdateStateError(t *testing.T) { mockResolverSupplier, mockStateProvider, mockFlagLogger, + nil, "secret", nil, ) @@ -685,6 +696,7 @@ func TestLocalResolverProvider_Init_Success(t *testing.T) { mockResolverSupplier, mockStateProvider, mockFlagLogger, + nil, "secret", nil, ) diff --git a/openfeature-provider/go/scripts/generate_proto.sh b/openfeature-provider/go/scripts/generate_proto.sh index 58a5de19..2bf88a21 100755 --- a/openfeature-provider/go/scripts/generate_proto.sh +++ b/openfeature-provider/go/scripts/generate_proto.sh @@ -35,6 +35,7 @@ mkdir -p confidence/internal/proto/resolverinternal mkdir -p confidence/internal/proto/admin mkdir -p confidence/internal/proto/types mkdir -p confidence/internal/proto/wasm +mkdir -p confidence/internal/proto/events protoc --proto_path=../proto \ --go_out=confidence/internal/proto \ @@ -48,7 +49,9 @@ protoc --proto_path=../proto \ confidence/flags/resolver/v1/internal_api.proto \ confidence/flags/admin/v1/resolver.proto \ confidence/wasm/wasm_api.proto \ - confidence/wasm/messages.proto + confidence/wasm/messages.proto \ + confidence/events/v1/types.proto \ + confidence/events/v1/api.proto echo "Protobuf generation complete!" echo "Generated files:" diff --git a/openfeature-provider/java/pom.xml b/openfeature-provider/java/pom.xml index 5d1240ad..76de3d55 100644 --- a/openfeature-provider/java/pom.xml +++ b/openfeature-provider/java/pom.xml @@ -246,6 +246,7 @@ confidence/flags/resolver/v1/**/*.proto confidence/flags/types/v1/**/*.proto confidence/flags/admin/v1/**/*.proto + confidence/events/v1/**/*.proto confidence/wasm/*.proto diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/GrpcEventSender.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/GrpcEventSender.java new file mode 100644 index 00000000..e7aedcd2 --- /dev/null +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/GrpcEventSender.java @@ -0,0 +1,120 @@ +package com.spotify.confidence.sdk; + +import static com.spotify.confidence.sdk.GrpcUtil.createConfidenceChannel; + +import com.google.protobuf.Timestamp; +import com.spotify.confidence.sdk.events.v1.EventsServiceGrpc; +import com.spotify.confidence.sdk.events.v1.PublishEventsRequest; +import com.spotify.confidence.sdk.wasm.Messages; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class GrpcEventSender implements Consumer { + private static final Logger logger = LoggerFactory.getLogger(GrpcEventSender.class); + private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10); + + private final String clientSecret; + private final EventsServiceGrpc.EventsServiceBlockingStub stub; + private final ExecutorService executorService; + private final Duration shutdownTimeout; + private final ManagedChannel channel; + + GrpcEventSender(String clientSecret, ChannelFactory channelFactory) { + this.clientSecret = clientSecret; + this.channel = createConfidenceChannel(channelFactory); + this.stub = addAuthInterceptor(EventsServiceGrpc.newBlockingStub(channel), clientSecret); + this.executorService = Executors.newCachedThreadPool(); + this.shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; + } + + @Override + public void accept(Messages.FlushEventsResponse response) { + final PublishEventsRequest.Builder builder = + PublishEventsRequest.newBuilder().setClientSecret(clientSecret); + + for (Messages.Event wasmEvent : response.getEventsList()) { + builder.addEvents( + com.spotify.confidence.sdk.events.v1.Event.newBuilder() + .setEventDefinition(wasmEvent.getEventDefinition()) + .setPayload(wasmEvent.getPayload()) + .setEventTime(wasmEvent.getEventTime()) + .build()); + } + + java.time.Instant now = java.time.Instant.now(); + builder.setSendTime( + Timestamp.newBuilder().setSeconds(now.getEpochSecond()).setNanos(now.getNano()).build()); + + final PublishEventsRequest request = builder.build(); + + executorService.submit( + () -> { + try { + stub.publishEvents(request); + logger.debug("Successfully published {} events", response.getEventsCount()); + } catch (Exception e) { + logger.error("Failed to publish events", e); + } + }); + } + + void shutdown() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + logger.warn("Event sender executor did not terminate gracefully"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for event sender shutdown", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + if (channel != null) { + channel.shutdown(); + try { + if (!channel.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + channel.shutdownNow(); + } + } catch (InterruptedException e) { + channel.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + private static EventsServiceGrpc.EventsServiceBlockingStub addAuthInterceptor( + EventsServiceGrpc.EventsServiceBlockingStub stub, String clientSecret) { + return stub.withInterceptors( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + Metadata.Key authKey = + Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER); + headers.put(authKey, "ClientSecret " + clientSecret); + super.start(responseListener, headers); + } + }; + } + }); + } +} diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/LocalResolver.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/LocalResolver.java index bc65c064..f9a71ff4 100644 --- a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/LocalResolver.java +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/LocalResolver.java @@ -4,6 +4,7 @@ import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessRequest; import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessResponse; import com.spotify.confidence.sdk.flags.resolver.v1.Sdk; +import com.spotify.confidence.sdk.wasm.Messages; import java.util.concurrent.CompletionStage; /** Common interface for the compositional local resolver layers. */ @@ -40,6 +41,16 @@ interface LocalResolver { /** Flushes pending assignment logs only. */ void flushAssignLogs(); + /** + * Tracks a business event for experimentation analytics. + * + * @param event the event to track + */ + void trackEvent(Messages.Event event); + + /** Flushes pending tracked events. */ + void flushEvents(); + /** Closes the resolver and releases resources. */ void close(); } diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/MaterializingResolver.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/MaterializingResolver.java index f8ee5c64..14bc4f5d 100644 --- a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/MaterializingResolver.java +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/MaterializingResolver.java @@ -5,6 +5,7 @@ import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessRequest; import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessResponse; import com.spotify.confidence.sdk.flags.resolver.v1.Sdk; +import com.spotify.confidence.sdk.wasm.Messages; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -181,6 +182,16 @@ public void flushAssignLogs() { delegate.flushAssignLogs(); } + @Override + public void trackEvent(Messages.Event event) { + delegate.trackEvent(event); + } + + @Override + public void flushEvents() { + delegate.flushEvents(); + } + @Override public void close() { delegate.close(); diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/OpenFeatureLocalResolveProvider.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/OpenFeatureLocalResolveProvider.java index 983a0e8e..f5047440 100644 --- a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/OpenFeatureLocalResolveProvider.java +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/OpenFeatureLocalResolveProvider.java @@ -3,6 +3,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Struct; +import com.google.protobuf.Timestamp; import com.spotify.confidence.sdk.flags.resolver.v1.ApplyFlagsRequest; import com.spotify.confidence.sdk.flags.resolver.v1.ResolveFlagsRequest; import com.spotify.confidence.sdk.flags.resolver.v1.ResolveFlagsResponse; @@ -10,6 +11,7 @@ import com.spotify.confidence.sdk.flags.resolver.v1.ResolvedFlag; import com.spotify.confidence.sdk.flags.resolver.v1.Sdk; import com.spotify.confidence.sdk.flags.resolver.v1.SdkId; +import com.spotify.confidence.sdk.wasm.Messages; import dev.openfeature.sdk.*; import dev.openfeature.sdk.exceptions.FlagNotFoundError; import dev.openfeature.sdk.exceptions.GeneralError; @@ -17,10 +19,12 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import org.slf4j.Logger; @@ -52,6 +56,7 @@ public class OpenFeatureLocalResolveProvider implements FeatureProvider { org.slf4j.LoggerFactory.getLogger(OpenFeatureLocalResolveProvider.class); private final LocalResolver resolver; private final WasmFlagLogger flagLogger; + private final GrpcEventSender eventSender; private final MaterializationStore materializationStore; private static final Duration ASSIGN_LOG_FLUSH_INTERVAL = Duration.ofMillis(100); private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofSeconds(15); @@ -138,11 +143,15 @@ public OpenFeatureLocalResolveProvider( this.stateProvider = new FlagsAdminStateFetcher(clientSecret, config.getHttpClientFactory()); final var wasmFlagLogger = new GrpcWasmFlagLogger(clientSecret, config.getChannelFactory()); this.flagLogger = wasmFlagLogger; + this.eventSender = new GrpcEventSender(clientSecret, config.getChannelFactory()); final int numInstances = PooledResolver.getNumInstances(config.getResolverPoolSize()); + final Consumer eventSinkRef = this.eventSender; final LocalResolver inner = new PooledResolver( numInstances, - () -> new RecoveringResolver(() -> new WasmLocalResolver(flagLogger::write))); + () -> + new RecoveringResolver( + () -> new WasmLocalResolver(flagLogger::write, eventSinkRef))); this.resolver = new MaterializingResolver(inner, materializationStore); } @@ -160,16 +169,29 @@ public OpenFeatureLocalResolveProvider( String clientSecret, MaterializationStore materializationStore, WasmFlagLogger wasmFlagLogger) { + this(accountStateProvider, clientSecret, materializationStore, wasmFlagLogger, resp -> {}); + } + + @VisibleForTesting + public OpenFeatureLocalResolveProvider( + AccountStateProvider accountStateProvider, + String clientSecret, + MaterializationStore materializationStore, + WasmFlagLogger wasmFlagLogger, + Consumer eventSink) { this.clientSecret = clientSecret; this.materializationStore = materializationStore; this.stateProvider = accountStateProvider; this.flagLogger = wasmFlagLogger; + this.eventSender = null; final int numInstances = PooledResolver.getNumInstances(LocalProviderConfig.DEFAULT_RESOLVER_POOL_SIZE); final LocalResolver inner = new PooledResolver( numInstances, - () -> new RecoveringResolver(() -> new WasmLocalResolver(wasmFlagLogger::write))); + () -> + new RecoveringResolver( + () -> new WasmLocalResolver(wasmFlagLogger::write, eventSink))); this.resolver = new MaterializingResolver(inner, materializationStore); } @@ -234,9 +256,10 @@ private void scheduleStateRefresh( this.state.set(ProviderState.READY); log.info("Provider recovered and is now READY"); } else { - // State refresh + full log flush + // State refresh + full log flush + event flush resolver.setResolverState(resolverStateProtobuf.get(), accountIdRef.get(), SDK); resolver.flushAllLogs(); + resolver.flushEvents(); } } @@ -349,6 +372,11 @@ public void shutdown() { // flagLogger.shutdown() waits for pending async writes to complete this.flagLogger.shutdown(); + // eventSender.shutdown() waits for pending event publishes to complete + if (this.eventSender != null) { + this.eventSender.shutdown(); + } + FeatureProvider.super.shutdown(); } @@ -491,6 +519,37 @@ void applyFlags(ApplyFlagsRequest request) { resolver.applyFlags(request); } + @Override + public void track(String trackingEventName, EvaluationContext ctx, TrackingEventDetails details) { + if (!initialized) { + return; + } + Struct contextStruct = OpenFeatureUtils.convertToProto(ctx); + Struct.Builder payloadBuilder = contextStruct.toBuilder(); + if (details != null) { + details + .getValue() + .ifPresent( + value -> + payloadBuilder.putFields( + "value", + com.google.protobuf.Value.newBuilder() + .setNumberValue(value.doubleValue()) + .build())); + } + Instant now = Instant.now(); + resolver.trackEvent( + Messages.Event.newBuilder() + .setEventDefinition("eventDefinitions/" + trackingEventName) + .setPayload(payloadBuilder.build()) + .setEventTime( + Timestamp.newBuilder() + .setSeconds(now.getEpochSecond()) + .setNanos(now.getNano()) + .build()) + .build()); + } + private static void handleStatusRuntimeException(StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) { log.error("Deadline exceeded when calling provider backend", e); diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/PooledResolver.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/PooledResolver.java index 08e527c0..0cfaab8a 100644 --- a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/PooledResolver.java +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/PooledResolver.java @@ -4,6 +4,7 @@ import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessRequest; import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessResponse; import com.spotify.confidence.sdk.flags.resolver.v1.Sdk; +import com.spotify.confidence.sdk.wasm.Messages; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicLong; @@ -76,6 +77,16 @@ public void flushAssignLogs() { maintenance(LocalResolver::flushAssignLogs); } + @Override + public void trackEvent(Messages.Event event) { + withReadSlotVoid(lr -> lr.trackEvent(event)); + } + + @Override + public void flushEvents() { + maintenance(LocalResolver::flushEvents); + } + @Override public void close() { maintenance(LocalResolver::close); diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/RecoveringResolver.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/RecoveringResolver.java index 5112e52e..df28d996 100644 --- a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/RecoveringResolver.java +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/RecoveringResolver.java @@ -5,6 +5,7 @@ import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessRequest; import com.spotify.confidence.sdk.flags.resolver.v1.ResolveProcessResponse; import com.spotify.confidence.sdk.flags.resolver.v1.Sdk; +import com.spotify.confidence.sdk.wasm.Messages; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -122,6 +123,25 @@ public void flushAssignLogs() { } } + @Override + public void trackEvent(Messages.Event event) { + try { + current.get().trackEvent(event); + } catch (ChicoryException e) { + handleFailure("trackEvent", e); + throw e; + } + } + + @Override + public void flushEvents() { + try { + current.get().flushEvents(); + } catch (ChicoryException e) { + handleFailure("flushEvents", e); + } + } + @Override public void close() { // During close, do NOT recreate on failure — we are shutting down. diff --git a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/WasmLocalResolver.java b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/WasmLocalResolver.java index 9d9c2f79..73768d80 100644 --- a/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/WasmLocalResolver.java +++ b/openfeature-provider/java/src/main/java/com/spotify/confidence/sdk/WasmLocalResolver.java @@ -43,16 +43,22 @@ class WasmLocalResolver implements LocalResolver { private final ExportFunction wasmMsgFree; private final Consumer logSink; + private final Consumer eventSink; + // api private final ExportFunction wasmMsgGuestSetResolverState; private final ExportFunction wasmMsgBoundedFlushLogs; private final ExportFunction wasmMsgBoundedFlushAssign; private final ExportFunction wasmMsgGuestApplyFlags; private final ExportFunction wasmMsgGuestResolveProcess; + private final ExportFunction wasmMsgGuestTrackEvent; + private final ExportFunction wasmMsgGuestFlushEvents; private final ReentrantLock lock = new ReentrantLock(); - public WasmLocalResolver(Consumer logSink) { + public WasmLocalResolver( + Consumer logSink, Consumer eventSink) { this.logSink = logSink; + this.eventSink = eventSink; instance = Instance.builder(ConfidenceResolverModule.load()) .withImportValues( @@ -78,6 +84,8 @@ public WasmLocalResolver(Consumer logSink) { wasmMsgBoundedFlushAssign = instance.export("wasm_msg_guest_bounded_flush_assign"); wasmMsgGuestApplyFlags = instance.export("wasm_msg_guest_apply_flags"); wasmMsgGuestResolveProcess = instance.export("wasm_msg_guest_resolve_flags"); + wasmMsgGuestTrackEvent = instance.export("wasm_msg_guest_track_event"); + wasmMsgGuestFlushEvents = instance.export("wasm_msg_guest_flush_events"); } private Message log(LogMessage message) { @@ -188,6 +196,40 @@ public void flushAssignLogs() { } } + @Override + public void trackEvent(Messages.Event event) { + lock.lock(); + try { + if (closed) { + return; + } + final int reqPtr = transferRequest(event); + final int respPtr = (int) wasmMsgGuestTrackEvent.apply(reqPtr)[0]; + consumeResponse(respPtr, Messages.Void::parseFrom); + } finally { + lock.unlock(); + } + } + + @Override + public void flushEvents() { + lock.lock(); + try { + if (closed) { + return; + } + final var voidRequest = Messages.Void.getDefaultInstance(); + final var reqPtr = transferRequest(voidRequest); + final var respPtr = (int) wasmMsgGuestFlushEvents.apply(reqPtr)[0]; + final var response = consumeResponse(respPtr, Messages.FlushEventsResponse::parseFrom); + if (response.getEventsCount() > 0) { + eventSink.accept(response); + } + } finally { + lock.unlock(); + } + } + @Override public void close() { lock.lock(); @@ -205,6 +247,15 @@ public void close() { logSink.accept(assignRequest); } + // Flush pending events + final var eventReqPtr = transferRequest(voidRequest); + final var eventRespPtr = (int) wasmMsgGuestFlushEvents.apply(eventReqPtr)[0]; + final var eventResponse = + consumeResponse(eventRespPtr, Messages.FlushEventsResponse::parseFrom); + if (eventResponse.getEventsCount() > 0) { + eventSink.accept(eventResponse); + } + // Final flush of resolve logs (also drains any remaining assigns) final var reqPtr = transferRequest(voidRequest); final var respPtr = (int) wasmMsgBoundedFlushLogs.apply(reqPtr)[0]; diff --git a/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ChannelFactoryTest.java b/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ChannelFactoryTest.java index 3d64852b..cd30e50d 100644 --- a/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ChannelFactoryTest.java +++ b/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ChannelFactoryTest.java @@ -39,9 +39,9 @@ public ManagedChannel create(String target, List interceptors new OpenFeatureLocalResolveProvider(new LocalProviderConfig(customFactory), "clientsecret"); assertEquals( - 1, + 2, factoryCallCount.get(), - "ChannelFactory should have been called once for flag logger, but was called " + "ChannelFactory should have been called twice (flag logger + event sender), but was called " + factoryCallCount.get() + " times"); @@ -50,7 +50,8 @@ public ManagedChannel create(String target, List interceptors assertTrue( targetsReceived.get(0).contains("grpc") || targetsReceived.get(0).contains("edge"), "Target should be a gRPC endpoint, got: " + targetsReceived.get(0)); - assertEquals(1, interceptorCounts.size(), "Interceptors should have been called"); + assertEquals( + 2, interceptorCounts.size(), "Interceptors should have been called for each channel"); } @Test diff --git a/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ResolveTest.java b/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ResolveTest.java index 4894f63c..5da06abb 100644 --- a/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ResolveTest.java +++ b/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/ResolveTest.java @@ -34,7 +34,7 @@ class ResolveTest { private final LocalResolver resolver; public ResolveTest() { - resolver = new WasmLocalResolver(request -> {}); + resolver = new WasmLocalResolver(request -> {}, response -> {}); } @BeforeEach diff --git a/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/WasmResolveApiFlushCloseRaceTest.java b/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/WasmResolveApiFlushCloseRaceTest.java index be488bbb..48564c91 100644 --- a/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/WasmResolveApiFlushCloseRaceTest.java +++ b/openfeature-provider/java/src/test/java/com/spotify/confidence/sdk/WasmResolveApiFlushCloseRaceTest.java @@ -44,7 +44,7 @@ void concurrentFlushAndCloseShouldNotLoseAssignments() throws Exception { for (int i = 0; i < iterations; i++) { final var logger = new CapturingWasmFlagLogger(); - final var resolver = new WasmLocalResolver(logger::write); + final var resolver = new WasmLocalResolver(logger::write, response -> {}); resolver.setResolverState(resolverState, accountId, null); // Resolve a flag to create a flag assignment in the WASM buffer diff --git a/openfeature-provider/js/Makefile b/openfeature-provider/js/Makefile index 40c853cc..5fa55de8 100644 --- a/openfeature-provider/js/Makefile +++ b/openfeature-provider/js/Makefile @@ -7,8 +7,8 @@ ROOT := $(realpath $(CURDIR)/../..) INSTALL_STAMP := .install.stamp BUILD_STAMP := .build.stamp GEN_DIR := src/proto -GEN_TS := $(GEN_DIR)/test-only.ts $(GEN_DIR)/confidence/wasm/messages.ts $(GEN_DIR)/confidence/wasm/wasm_api.ts $(GEN_DIR)/confidence/flags/resolver/v1/types.ts $(GEN_DIR)/confidence/flags/resolver/v1/api.ts $(GEN_DIR)/confidence/flags/resolver/v1/internal_api.ts $(GEN_DIR)/confidence/flags/types/v1/types.ts -PROTO_SRC := proto/test-only.proto $(ROOT)/openfeature-provider/proto/confidence/wasm/messages.proto $(ROOT)/openfeature-provider/proto/confidence/wasm/wasm_api.proto $(ROOT)/openfeature-provider/proto/confidence/flags/resolver/v1/types.proto $(ROOT)/openfeature-provider/proto/confidence/flags/resolver/v1/api.proto $(ROOT)/openfeature-provider/proto/confidence/flags/resolver/v1/internal_api.proto $(ROOT)/openfeature-provider/proto/confidence/flags/types/v1/types.proto +GEN_TS := $(GEN_DIR)/test-only.ts $(GEN_DIR)/confidence/wasm/messages.ts $(GEN_DIR)/confidence/wasm/wasm_api.ts $(GEN_DIR)/confidence/flags/resolver/v1/types.ts $(GEN_DIR)/confidence/flags/resolver/v1/api.ts $(GEN_DIR)/confidence/flags/resolver/v1/internal_api.ts $(GEN_DIR)/confidence/flags/types/v1/types.ts $(GEN_DIR)/confidence/events/v1/api.ts $(GEN_DIR)/confidence/events/v1/types.ts +PROTO_SRC := proto/test-only.proto $(ROOT)/openfeature-provider/proto/confidence/wasm/messages.proto $(ROOT)/openfeature-provider/proto/confidence/wasm/wasm_api.proto $(ROOT)/openfeature-provider/proto/confidence/flags/resolver/v1/types.proto $(ROOT)/openfeature-provider/proto/confidence/flags/resolver/v1/api.proto $(ROOT)/openfeature-provider/proto/confidence/flags/resolver/v1/internal_api.proto $(ROOT)/openfeature-provider/proto/confidence/flags/types/v1/types.proto $(ROOT)/openfeature-provider/proto/confidence/events/v1/api.proto $(ROOT)/openfeature-provider/proto/confidence/events/v1/types.proto SRC := $(shell find src -name '*.ts') CONFIG := package.json yarn.lock tsconfig.json tsdown.config.ts vitest.config.ts WASM_ARTIFACT := $(ROOT)/wasm/confidence_resolver.wasm diff --git a/openfeature-provider/js/package.json b/openfeature-provider/js/package.json index 0c8fd4f0..df634406 100644 --- a/openfeature-provider/js/package.json +++ b/openfeature-provider/js/package.json @@ -41,7 +41,7 @@ "format:check": "prettier --config prettier.config.cjs -c .", "test": "vitest", "typecheck": "tsc --noEmit", - "proto:gen": "rm -rf src/proto && mkdir -p src/proto && protoc --plugin=node_modules/.bin/protoc-gen-ts_proto --ts_proto_opt useOptionals=messages --ts_proto_opt esModuleInterop=true --ts_proto_out src/proto -Iproto -I../../openfeature-provider/proto test-only.proto ../../openfeature-provider/proto/confidence/wasm/messages.proto ../../openfeature-provider/proto/confidence/wasm/wasm_api.proto ../../openfeature-provider/proto/confidence/flags/resolver/v1/types.proto ../../openfeature-provider/proto/confidence/flags/resolver/v1/api.proto ../../openfeature-provider/proto/confidence/flags/resolver/v1/internal_api.proto ../../openfeature-provider/proto/confidence/flags/types/v1/types.proto" + "proto:gen": "rm -rf src/proto && mkdir -p src/proto && protoc --plugin=node_modules/.bin/protoc-gen-ts_proto --ts_proto_opt useOptionals=messages --ts_proto_opt esModuleInterop=true --ts_proto_out src/proto -Iproto -I../../openfeature-provider/proto test-only.proto ../../openfeature-provider/proto/confidence/wasm/messages.proto ../../openfeature-provider/proto/confidence/wasm/wasm_api.proto ../../openfeature-provider/proto/confidence/flags/resolver/v1/types.proto ../../openfeature-provider/proto/confidence/flags/resolver/v1/api.proto ../../openfeature-provider/proto/confidence/flags/resolver/v1/internal_api.proto ../../openfeature-provider/proto/confidence/flags/types/v1/types.proto ../../openfeature-provider/proto/confidence/events/v1/api.proto ../../openfeature-provider/proto/confidence/events/v1/types.proto" }, "dependencies": { "@bufbuild/protobuf": "^2.9.0" diff --git a/openfeature-provider/js/src/ConfidenceServerProviderLocal.test.ts b/openfeature-provider/js/src/ConfidenceServerProviderLocal.test.ts index 0ad118b9..7e5a1773 100644 --- a/openfeature-provider/js/src/ConfidenceServerProviderLocal.test.ts +++ b/openfeature-provider/js/src/ConfidenceServerProviderLocal.test.ts @@ -23,6 +23,8 @@ const mockedWasmResolver: MockedObject = { flushLogs: vi.fn().mockReturnValue(new Uint8Array(100)), flushAssigned: vi.fn().mockReturnValue(new Uint8Array(50)), applyFlags: vi.fn(), + trackEvent: vi.fn(), + flushEvents: vi.fn().mockReturnValue({ events: [], sendTime: undefined }), }; let provider: ConfidenceServerProviderLocal; diff --git a/openfeature-provider/js/src/ConfidenceServerProviderLocal.ts b/openfeature-provider/js/src/ConfidenceServerProviderLocal.ts index 03fc8f43..6371077a 100644 --- a/openfeature-provider/js/src/ConfidenceServerProviderLocal.ts +++ b/openfeature-provider/js/src/ConfidenceServerProviderLocal.ts @@ -1,4 +1,11 @@ -import type { EvaluationContext, JsonValue, Provider, ProviderMetadata, ProviderStatus } from '@openfeature/server-sdk'; +import type { + EvaluationContext, + JsonValue, + Provider, + ProviderMetadata, + ProviderStatus, + TrackingEventDetails, +} from '@openfeature/server-sdk'; import { ResolveFlagsResponse } from './proto/confidence/flags/resolver/v1/api'; import { ResolveProcessRequest, ResolveProcessResponse } from './proto/confidence/wasm/wasm_api'; import { SdkId } from './proto/confidence/flags/resolver/v1/types'; @@ -16,6 +23,7 @@ import { readResultsToMaterializationRecords, } from './materialization'; import { SetResolverStateRequest } from './proto/confidence/wasm/messages'; +import { PublishEventsRequest } from './proto/confidence/events/v1/api'; import FlagBundleType, * as FlagBundle from './flag-bundle'; import { ErrorCode, ResolutionDetails } from './types'; @@ -108,6 +116,10 @@ export class ConfidenceServerProviderLocal implements Provider { ], }), ], + 'https://events.confidence.dev/*': [ + withRetry({ maxAttempts: 5, baseInterval: 500 }), + withTimeout(5 * TimeUnit.SECOND), + ], '*': [ withResponse(url => { throw new Error(`Unknown route ${url}`); @@ -161,6 +173,19 @@ export class ConfidenceServerProviderLocal implements Provider { this.main.abort(); } + track(trackingEventName: string, context: EvaluationContext, details: TrackingEventDetails): void { + const payload = { + ...ConfidenceServerProviderLocal.convertEvaluationContext(context), + ...(details ?? {}), + }; + const now = new Date(); + this.resolver.trackEvent({ + eventDefinition: `eventDefinitions/${trackingEventName}`, + payload, + eventTime: now, + }); + } + async resolve(context: EvaluationContext, flagNames: string[], apply = false): Promise { const resolveRequest = { flags: flagNames.map(name => `flags/${name}`), @@ -271,6 +296,10 @@ export class ConfidenceServerProviderLocal implements Provider { if (writeFlagLogRequest.length > 0) { await this.sendFlagLogs(writeFlagLogRequest, signal); } + const { events } = this.resolver.flushEvents(); + if (events.length > 0) { + await this.sendEvents(events, signal); + } } private async flushAssigned(): Promise { @@ -301,6 +330,33 @@ export class ConfidenceServerProviderLocal implements Provider { } } + private async sendEvents( + events: { eventDefinition: string; payload?: { [key: string]: any }; eventTime?: Date }[], + signal = this.main.signal, + ): Promise { + try { + const body = PublishEventsRequest.encode({ + clientSecret: this.options.flagClientSecret, + events, + sendTime: new Date(), + }).finish(); + const response = await this.fetch('https://events.confidence.dev/v1/events:publish', { + method: 'post', + signal, + headers: { + 'Content-Type': 'application/x-protobuf', + }, + body: body as Uint8Array, + }); + if (!response.ok) { + logger.error(`Failed to publish events: ${response.status} ${response.statusText} - ${await response.text()}`); + } + } catch (err) { + logger.warn('Failed to send events', err); + throw err; + } + } + private async readMaterializations( readOps: MaterializationStore.ReadOp[], ): Promise { diff --git a/openfeature-provider/js/src/LocalResolver.ts b/openfeature-provider/js/src/LocalResolver.ts index 9827c89d..5e83d3ee 100644 --- a/openfeature-provider/js/src/LocalResolver.ts +++ b/openfeature-provider/js/src/LocalResolver.ts @@ -1,5 +1,5 @@ import type { ResolveProcessRequest, ResolveProcessResponse } from './proto/confidence/wasm/wasm_api'; -import type { SetResolverStateRequest } from './proto/confidence/wasm/messages'; +import type { Event, FlushEventsResponse, SetResolverStateRequest } from './proto/confidence/wasm/messages'; import type { ApplyFlagsRequest } from './proto/confidence/flags/resolver/v1/api'; export interface LocalResolver { @@ -8,4 +8,6 @@ export interface LocalResolver { flushLogs(): Uint8Array; flushAssigned(): Uint8Array; applyFlags(request: ApplyFlagsRequest): void; + trackEvent(event: Event): void; + flushEvents(): FlushEventsResponse; } diff --git a/openfeature-provider/js/src/WasmResolver.ts b/openfeature-provider/js/src/WasmResolver.ts index 7708879e..055bd93e 100644 --- a/openfeature-provider/js/src/WasmResolver.ts +++ b/openfeature-provider/js/src/WasmResolver.ts @@ -1,5 +1,12 @@ import { BinaryWriter } from '@bufbuild/protobuf/wire'; -import { Request, Response, Void, SetResolverStateRequest } from './proto/confidence/wasm/messages'; +import { + Request, + Response, + Void, + SetResolverStateRequest, + FlushEventsResponse, +} from './proto/confidence/wasm/messages'; +import { Event } from './proto/confidence/wasm/messages'; import { Timestamp } from './proto/google/protobuf/timestamp'; import { ResolveProcessRequest, ResolveProcessResponse } from './proto/confidence/wasm/wasm_api'; import { ApplyFlagsRequest } from './proto/confidence/flags/resolver/v1/api'; @@ -21,6 +28,8 @@ const EXPORT_FN_NAMES = [ 'wasm_msg_guest_bounded_flush_logs', 'wasm_msg_guest_bounded_flush_assign', 'wasm_msg_guest_apply_flags', + 'wasm_msg_guest_track_event', + 'wasm_msg_guest_flush_events', ] as const; type EXPORT_FN_NAMES = (typeof EXPORT_FN_NAMES)[number]; @@ -96,6 +105,21 @@ export class UnsafeWasmResolver implements LocalResolver { this.consumeResponse(resPtr, Void); } + trackEvent(event: Event): void { + const reqPtr = this.transferRequest(event, Event); + const resPtr = this.exports.wasm_msg_guest_track_event(reqPtr); + this.consumeResponse(resPtr, Void); + } + + flushEvents(): FlushEventsResponse { + const resPtr = this.exports.wasm_msg_guest_flush_events(0); + const { data, error } = this.consume(resPtr, Response); + if (error) { + throw new Error(error); + } + return FlushEventsResponse.decode(data!); + } + private transferRequest(value: T, codec: Codec): number { const data = codec.encode(value).finish(); return this.transfer({ data }, Request); @@ -219,4 +243,26 @@ export class WasmResolver implements LocalResolver { throw error; } } + + trackEvent(event: Event): void { + try { + this.delegate.trackEvent(event); + } catch (error: unknown) { + if (error instanceof WebAssembly.RuntimeError) { + this.reloadInstance(error); + } + throw error; + } + } + + flushEvents(): FlushEventsResponse { + try { + return this.delegate.flushEvents(); + } catch (error: unknown) { + if (error instanceof WebAssembly.RuntimeError) { + this.reloadInstance(error); + } + throw error; + } + } } diff --git a/openfeature-provider/proto/confidence/events/v1/api.proto b/openfeature-provider/proto/confidence/events/v1/api.proto new file mode 100644 index 00000000..c20aafa6 --- /dev/null +++ b/openfeature-provider/proto/confidence/events/v1/api.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; +package confidence.events.v1; + +import "google/protobuf/timestamp.proto"; +import "confidence/events/v1/types.proto"; + +option java_package = "com.spotify.confidence.sdk.events.v1"; +option java_multiple_files = true; +option java_outer_classname = "ApiProto"; +option go_package = "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/events"; + +service EventsService { + rpc PublishEvents(PublishEventsRequest) returns (PublishEventsResponse) {} +} + +message PublishEventsRequest { + string client_secret = 1; + repeated Event events = 2; + google.protobuf.Timestamp send_time = 3; +} + +message PublishEventsResponse { + repeated EventError errors = 1; +} diff --git a/openfeature-provider/proto/confidence/events/v1/types.proto b/openfeature-provider/proto/confidence/events/v1/types.proto new file mode 100644 index 00000000..cfb8c718 --- /dev/null +++ b/openfeature-provider/proto/confidence/events/v1/types.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; +package confidence.events.v1; + +import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; + +option java_package = "com.spotify.confidence.sdk.events.v1"; +option java_multiple_files = true; +option java_outer_classname = "TypesProto"; +option go_package = "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/events"; + +message Event { + string event_definition = 1; + google.protobuf.Struct payload = 2; + google.protobuf.Timestamp event_time = 3; +} + +message EventError { + int32 index = 1; + Reason reason = 2; + string message = 3; + + enum Reason { + REASON_UNSPECIFIED = 0; + EVENT_DEFINITION_NOT_FOUND = 1; + EVENT_SCHEMA_VALIDATION_FAILED = 2; + } +} diff --git a/openfeature-provider/proto/confidence/wasm/messages.proto b/openfeature-provider/proto/confidence/wasm/messages.proto index 52153dca..bdef56fb 100644 --- a/openfeature-provider/proto/confidence/wasm/messages.proto +++ b/openfeature-provider/proto/confidence/wasm/messages.proto @@ -3,6 +3,8 @@ syntax = "proto3"; package confidence.wasm; import "confidence/flags/resolver/v1/types.proto"; +import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; option java_package = "com.spotify.confidence.sdk.wasm"; option java_multiple_files = false; @@ -27,3 +29,13 @@ message Response { string error = 2; } } + +message Event { + string event_definition = 1; + google.protobuf.Struct payload = 2; + google.protobuf.Timestamp event_time = 3; +} + +message FlushEventsResponse { + repeated Event events = 2; +} diff --git a/wasm/proto/messages.proto b/wasm/proto/messages.proto index 6bc493e8..bf1f4653 100644 --- a/wasm/proto/messages.proto +++ b/wasm/proto/messages.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package rust_guest; import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; import "types.proto"; option java_package = "com.spotify.confidence.wasm"; @@ -38,3 +39,13 @@ message Response { string error = 2; } } + +message Event { + string event_definition = 1; + google.protobuf.Struct payload = 2; + google.protobuf.Timestamp event_time = 3; +} + +message FlushEventsResponse { + repeated Event events = 2; +} diff --git a/wasm/rust-guest/src/lib.rs b/wasm/rust-guest/src/lib.rs index c361e0c9..85ca2071 100644 --- a/wasm/rust-guest/src/lib.rs +++ b/wasm/rust-guest/src/lib.rs @@ -4,6 +4,7 @@ use std::sync::LazyLock; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use confidence_resolver::assign_logger::AssignLogger; +use confidence_resolver::event_logger::{EventLogger, TrackedEvent}; use confidence_resolver::proto::confidence::flags::resolver::v1::resolve_process_response; use confidence_resolver::telemetry::{Telemetry, TelemetrySnapshot}; use prost::Message; @@ -56,6 +57,7 @@ const ENCRYPTION_KEY: Bytes = Bytes::from_static(&[0; 16]); static RESOLVER_STATE: ArcSwapOption = ArcSwapOption::const_empty(); static RESOLVE_LOGGER: LazyLock> = LazyLock::new(ResolveLogger::new); static ASSIGN_LOGGER: LazyLock = LazyLock::new(AssignLogger::new); +static EVENT_LOGGER: LazyLock = LazyLock::new(EventLogger::new); static TELEMETRY: LazyLock = LazyLock::new(|| { Telemetry::with_memory_provider(|| (core::arch::wasm32::memory_size::<0>() * 65536) as u64) }); @@ -199,6 +201,27 @@ wasm_msg_guest! { Ok(ASSIGN_LOGGER.checkpoint_with_limit(LOG_TARGET_BYTES, true)) } + fn track_event(event: proto::Event) -> WasmResult { + EVENT_LOGGER.track(TrackedEvent { + event_definition: event.event_definition, + payload: event.payload.unwrap_or_default(), + event_time: event.event_time.unwrap_or_default(), + }); + Ok(VOID) + } + + fn flush_events(_request: Void) -> WasmResult { + let events = EVENT_LOGGER.flush(); + let pb_events = events.into_iter().map(|e| proto::Event { + event_definition: e.event_definition, + payload: Some(e.payload), + event_time: Some(e.event_time), + }).collect(); + Ok(proto::FlushEventsResponse { + events: pb_events, + }) + } + fn apply_flags(request: ApplyFlagsRequest) -> WasmResult { let resolver_state = get_resolver_state()?; // Use empty evaluation context - the real one is extracted from the resolve token