Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions confidence-resolver/src/event_logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use crossbeam_queue::SegQueue;

use crate::proto::google::{Struct, Timestamp};

pub struct TrackedEvent {
pub event_definition: String,
pub payload: Struct,
pub event_time: Timestamp,
}

#[derive(Default)]
pub struct EventLogger {
events: SegQueue<TrackedEvent>,
}

impl EventLogger {
pub fn new() -> Self {
Self::default()
}

pub fn track(&self, event: TrackedEvent) {
self.events.push(event);
}

// TODO: Only drop events from memory after the provider confirms successful delivery.
// Currently events are drained unconditionally — if the HTTP/gRPC send fails, they are lost.
pub fn flush(&self) -> Vec<TrackedEvent> {
let mut result = Vec::new();
while let Some(event) = self.events.pop() {
result.push(event);
}
result
}
}

#[cfg(test)]
mod tests {
use super::*;

fn make_event(name: &str) -> TrackedEvent {
TrackedEvent {
event_definition: format!("eventDefinitions/{}", name),
payload: Struct::default(),
event_time: Timestamp {
seconds: 1234,
nanos: 0,
},
}
}

#[test]
fn track_and_flush_single_event() {
let logger = EventLogger::new();
logger.track(make_event("purchase"));
let events = logger.flush();
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_definition, "eventDefinitions/purchase");
}

#[test]
fn flush_drains_all_events() {
let logger = EventLogger::new();
logger.track(make_event("a"));
logger.track(make_event("b"));
logger.track(make_event("c"));
let events = logger.flush();
assert_eq!(events.len(), 3);
// second flush returns empty
assert!(logger.flush().is_empty());
}

#[test]
fn empty_flush_returns_empty_vec() {
let logger = EventLogger::new();
assert!(logger.flush().is_empty());
}

#[test]
fn concurrent_track_and_flush() {
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

let logger = Arc::new(EventLogger::new());
let done = Arc::new(AtomicBool::new(false));
let total_tracked = Arc::new(AtomicUsize::new(0));

let mut handles = Vec::new();
for _ in 0..3 {
let lg = logger.clone();
let done_cl = done.clone();
let tracked = total_tracked.clone();
handles.push(thread::spawn(move || {
let mut count = 0;
while !done_cl.load(Ordering::Relaxed) {
lg.track(make_event("concurrent"));
count += 1;
}
tracked.fetch_add(count, Ordering::Relaxed);
}));
}

let lg = logger.clone();
let mut total_flushed = 0;
for _ in 0..10 {
thread::sleep(std::time::Duration::from_millis(10));
total_flushed += lg.flush().len();
}
done.store(true, Ordering::Relaxed);
for h in handles {
h.join().unwrap();
}
total_flushed += logger.flush().len();

assert_eq!(total_flushed, total_tracked.load(Ordering::Relaxed));
}
}
1 change: 1 addition & 0 deletions confidence-resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use err::Fallible;
pub mod assign_logger;
mod bounded_set;
mod err;
pub mod event_logger;
pub mod flag_logger;
mod gzip;
pub mod proto;
Expand Down
11 changes: 11 additions & 0 deletions openfeature-provider/go/confidence/event_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package confidence

import (
"github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/wasm"
)

// EventSender sends tracked events to the Confidence events backend.
type EventSender interface {
Send(response *wasm.FlushEventsResponse, clientSecret string)
Shutdown()
}
2 changes: 1 addition & 1 deletion openfeature-provider/go/confidence/flag_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func setupFlagLogsUnitTest(t *testing.T) (*fl.CapturingFlagLogger, openfeature.I
resolverSupplier := wrapResolverSupplierWithMaterializations(func(ctx context.Context, logSink lr.LogSink) lr.LocalResolver {
return lr.NewLocalResolverWithPoolSize(ctx, logSink, 2)
}, unsupportedMatStore)
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, capturingLogger, unitTestClientSecret, logger)
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, capturingLogger, nil, unitTestClientSecret, logger)

// Set provider and wait for ready
err := openfeature.SetProviderAndWait(provider)
Expand Down
71 changes: 71 additions & 0 deletions openfeature-provider/go/confidence/grpc_event_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package confidence

import (
"context"
"log/slog"
"time"

events "github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/events"
"github.com/spotify/confidence-resolver/openfeature-provider/go/confidence/internal/proto/wasm"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"
)

// GrpcEventSender sends events to the Confidence events backend via gRPC.
type GrpcEventSender struct {
conn *grpc.ClientConn
client events.EventsServiceClient
logger *slog.Logger
}

var _ EventSender = (*GrpcEventSender)(nil)

// NewGrpcEventSender creates a new gRPC event sender connecting to the given target.
func NewGrpcEventSender(target string, logger *slog.Logger, opts ...grpc.DialOption) (*GrpcEventSender, error) {
if len(opts) == 0 {
opts = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
}
conn, err := grpc.NewClient(target, opts...)
if err != nil {
return nil, err
}
return &GrpcEventSender{
conn: conn,
client: events.NewEventsServiceClient(conn),
logger: logger,
}, nil
}

func (s *GrpcEventSender) Send(response *wasm.FlushEventsResponse, clientSecret string) {
req := &events.PublishEventsRequest{
ClientSecret: clientSecret,
SendTime: timestamppb.Now(),
}
for _, e := range response.Events {
req.Events = append(req.Events, &events.Event{
EventDefinition: e.EventDefinition,
Payload: e.Payload,
EventTime: e.EventTime,
})
}

go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
md := metadata.Pairs("authorization", "ClientSecret "+clientSecret)
ctx = metadata.NewOutgoingContext(ctx, md)
if _, err := s.client.PublishEvents(ctx, req); err != nil {
s.logger.Error("Failed to publish events", "error", err)
} else {
s.logger.Debug("Successfully published events", "count", len(req.Events))
}
}()
}

func (s *GrpcEventSender) Shutdown() {
if s.conn != nil {
s.conn.Close()
}
}
10 changes: 5 additions & 5 deletions openfeature-provider/go/confidence/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestIntegration_OpenFeatureResolveStickyFlagMatStoreReadAndWrite(t *testing
}, matStore)

// Create provider with test state
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil)))
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, "test-secret", slog.New(slog.NewTextHandler(os.Stderr, nil)))

client := openfeature.NewClient("integration-test")

Expand Down Expand Up @@ -337,7 +337,7 @@ func TestIntegration_OpenFeatureMaterializedSegmentCriterion(t *testing.T) {
}, matStore)

// Create provider with test state
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil)))
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil)))

client := openfeature.NewClient("integration-test-mat-seg")

Expand Down Expand Up @@ -399,7 +399,7 @@ func TestIntegration_OpenFeatureMaterializedSegmentCriterion(t *testing.T) {
}, matStore)

// Create provider with test state
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil)))
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil)))

client := openfeature.NewClient("integration-test-mat-seg-not-in")

Expand Down Expand Up @@ -440,7 +440,7 @@ func TestIntegration_OpenFeatureMaterializedSegmentCriterion(t *testing.T) {
}, matStore)

// Create provider with test state
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil)))
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, trackingLogger, nil, SECRET, slog.New(slog.NewTextHandler(os.Stderr, nil)))

client := openfeature.NewClient("integration-test-mat-seg-no-ctx")

Expand Down Expand Up @@ -698,6 +698,6 @@ func createProviderWithTestState(
}, matStore)
// Create provider with the client secret from test state
// The test state includes client secret: mkjJruAATQWjeY7foFIWfVAcBWnci2YF
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, logger, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))
provider := NewLocalResolverProvider(resolverSupplier, stateProvider, logger, nil, "mkjJruAATQWjeY7foFIWfVAcBWnci2YF", slog.New(slog.NewTextHandler(os.Stderr, nil)))
return provider, nil
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type LocalResolver interface {
ApplyFlags(*resolver.ApplyFlagsRequest) error
FlushAllLogs() error
FlushAssignLogs() error
TrackEvent(*wasm.Event) error
FlushEvents() (*wasm.FlushEventsResponse, error)
Close(context.Context) error
}

Expand Down
26 changes: 26 additions & 0 deletions openfeature-provider/go/confidence/internal/local_resolver/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,32 @@ func (s *PooledResolver) FlushAssignLogs() error {
})
}

// TrackEvent implements LocalResolver.
func (s *PooledResolver) TrackEvent(event *wasm.Event) error {
n := uint64(len(s.slots))
idx := s.rr.Add(1)
for !s.slots[idx%n].rw.TryRLock() {
idx = s.rr.Add(1)
}
slot := &s.slots[idx%n]
defer slot.rw.RUnlock()
return slot.lr.TrackEvent(event)
}

// FlushEvents implements LocalResolver.
func (s *PooledResolver) FlushEvents() (*wasm.FlushEventsResponse, error) {
combined := &wasm.FlushEventsResponse{}
err := s.maintenance(func(lr LocalResolver) error {
resp, err := lr.FlushEvents()
if err != nil {
return err
}
combined.Events = append(combined.Events, resp.Events...)
return nil
})
return combined, err
}

func (s *PooledResolver) Close(ctx context.Context) error {
return s.maintenance(func(lr LocalResolver) error {
return lr.Close(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ func (r *RecoveringResolver) FlushAssignLogs() (err error) {
return
}

func (r *RecoveringResolver) TrackEvent(event *wasm.Event) (err error) {
r.withRecover("TrackEvent", &err, func(lr LocalResolver) {
err = lr.TrackEvent(event)
})
return
}

func (r *RecoveringResolver) FlushEvents() (resp *wasm.FlushEventsResponse, err error) {
r.withRecover("FlushEvents", &err, func(lr LocalResolver) {
resp, err = lr.FlushEvents()
})
return
}

func (r *RecoveringResolver) Close(ctx context.Context) error {
// For Close, if we panic, don't recreate during shutdown; just surface error.
defer func() {
Expand Down
10 changes: 10 additions & 0 deletions openfeature-provider/go/confidence/internal/local_resolver/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ func (r *WasmResolver) FlushAssignLogs() error {
return err
}

func (r *WasmResolver) TrackEvent(event *wasm.Event) error {
return r.call("wasm_msg_guest_track_event", event, nil)
}

func (r *WasmResolver) FlushEvents() (*wasm.FlushEventsResponse, error) {
resp := &wasm.FlushEventsResponse{}
err := r.call("wasm_msg_guest_flush_events", nil, resp)
return resp, err
}

func (r *WasmResolver) Close(ctx context.Context) error {
// TODO we should call flush assigned until it doesn't flush any more
r.FlushAllLogs()
Expand Down
Loading
Loading