Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions testservice/flag_change_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package main

import (
"context"
"sync"

"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/testservice/servicedef"
)

// listenerEntry holds the cancellation handle for one registered listener goroutine.
type listenerEntry struct {
cancel context.CancelFunc
}

// listenerRegistry manages all active flag change listener registrations for a single
// SDK client entity. It is safe to use from multiple goroutines.
type listenerRegistry struct {
mu sync.Mutex
entries map[string]*listenerEntry // keyed by listenerId
tracker interfaces.FlagTracker
}

func newListenerRegistry(tracker interfaces.FlagTracker) *listenerRegistry {
return &listenerRegistry{
entries: make(map[string]*listenerEntry),
tracker: tracker,
}
}

// registerFlagChangeListener subscribes to general flag configuration changes. If flagKey
// is non-empty, only events for that specific flag are forwarded to the callback URI.
func (r *listenerRegistry) registerFlagChangeListener(listenerID, flagKey, callbackURI string) {
ch := r.tracker.AddFlagChangeListener()
ctx, cancel := context.WithCancel(context.Background())

r.mu.Lock()
r.entries[listenerID] = &listenerEntry{cancel: cancel}
r.mu.Unlock()

svc := callbackService{baseURL: callbackURI}
go func() {
defer r.tracker.RemoveFlagChangeListener(ch)
for {
select {
case <-ctx.Done():
return
case event, ok := <-ch:
if !ok {
return
}
if flagKey != "" && event.Key != flagKey {
continue
}
_ = svc.post("", servicedef.ListenerNotification{
ListenerID: listenerID,
FlagKey: event.Key,
}, nil)
}
}
}()
}

// registerFlagValueChangeListener subscribes to value changes for a specific flag and
// evaluation context. The callback is invoked only when the evaluated value actually
// changes; configuration changes that leave the value unchanged are suppressed by the SDK.
func (r *listenerRegistry) registerFlagValueChangeListener(
listenerID, flagKey string,
evalCtx ldcontext.Context,
defaultValue ldvalue.Value,
callbackURI string,
) {
ch := r.tracker.AddFlagValueChangeListener(flagKey, evalCtx, defaultValue)
ctx, cancel := context.WithCancel(context.Background())

r.mu.Lock()
r.entries[listenerID] = &listenerEntry{cancel: cancel}
r.mu.Unlock()

svc := callbackService{baseURL: callbackURI}
go func() {
defer r.tracker.RemoveFlagValueChangeListener(ch)
for {
select {
case <-ctx.Done():
return
case event, ok := <-ch:
if !ok {
return
}
oldVal := event.OldValue
newVal := event.NewValue
_ = svc.post("", servicedef.ListenerNotification{
ListenerID: listenerID,
FlagKey: event.Key,
OldValue: &oldVal,
NewValue: &newVal,
}, nil)
}
}
}()
}

// unregister stops the listener goroutine for the given ID and removes it from the
// registry. Returns false if no listener with that ID was found.
func (r *listenerRegistry) unregister(listenerID string) bool {
r.mu.Lock()
entry, ok := r.entries[listenerID]
if ok {
delete(r.entries, listenerID)
}
r.mu.Unlock()

if ok {
entry.cancel()
}
return ok
}

// closeAll stops all active listener goroutines. Called when the SDK client entity closes.
func (r *listenerRegistry) closeAll() {
r.mu.Lock()
entries := r.entries
r.entries = make(map[string]*listenerEntry)
r.mu.Unlock()

for _, entry := range entries {
entry.cancel()
}
}
21 changes: 19 additions & 2 deletions testservice/sdk_client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
const defaultStartWaitTime = 5 * time.Second

type SDKClientEntity struct {
sdk *ld.LDClient
logger *log.Logger
sdk *ld.LDClient
logger *log.Logger
listeners *listenerRegistry
}

func NewSDKClientEntity(params servicedef.CreateInstanceParams) (*SDKClientEntity, error) {
Expand Down Expand Up @@ -71,11 +72,13 @@ func NewSDKClientEntity(params servicedef.CreateInstanceParams) (*SDKClientEntit
return nil, err
}
c.sdk = sdk
c.listeners = newListenerRegistry(sdk.GetFlagTracker())

return c, nil
}

func (c *SDKClientEntity) Close() {
c.listeners.closeAll()
_ = c.sdk.Close()
c.logger.Println("Test ended")
c.logger.SetOutput(io.Discard)
Expand Down Expand Up @@ -130,6 +133,20 @@ func (c *SDKClientEntity) DoCommand(params servicedef.CommandParams) (interface{
return servicedef.MigrationVariationResponse{Result: string(stage)}, nil
case servicedef.CommandMigrationOperation:
return c.migrationOperation(*params.MigrationOperation)
case servicedef.CommandRegisterFlagChangeListener:
p := params.RegisterFlagChangeListener
c.listeners.registerFlagChangeListener(p.ListenerID, p.FlagKey, p.CallbackURI)
return nil, nil
case servicedef.CommandRegisterFlagValueChangeListener:
p := params.RegisterFlagValueChangeListener
c.listeners.registerFlagValueChangeListener(p.ListenerID, p.FlagKey, p.Context, p.DefaultValue, p.CallbackURI)
return nil, nil
case servicedef.CommandUnregisterListener:
p := params.UnregisterListener
if !c.listeners.unregister(p.ListenerID) {
return nil, BadRequestError{Message: fmt.Sprintf("no listener with id %q", p.ListenerID)}
}
return nil, nil
default:
return nil, BadRequestError{Message: fmt.Sprintf("unknown command %q", params.Command)}
}
Expand Down
1 change: 1 addition & 0 deletions testservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var capabilities = []string{
servicedef.CapabilityPersistentDataStoreRedis,
servicedef.CapabilityPersistentDataStoreConsul,
servicedef.CapabilityPersistentDataStoreDynamoDB,
servicedef.CapabilityFlagChangeListeners,
}

// gets the specified environment variable, or the default if not set
Expand Down
87 changes: 64 additions & 23 deletions testservice/servicedef/command_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import (
)

const (
CommandEvaluateFlag = "evaluate"
CommandEvaluateAllFlags = "evaluateAll"
CommandIdentifyEvent = "identifyEvent"
CommandCustomEvent = "customEvent"
CommandAliasEvent = "aliasEvent"
CommandFlushEvents = "flushEvents"
CommandGetBigSegmentStoreStatus = "getBigSegmentStoreStatus"
CommandContextBuild = "contextBuild"
CommandContextConvert = "contextConvert"
CommandSecureModeHash = "secureModeHash"
CommandMigrationVariation = "migrationVariation"
CommandMigrationOperation = "migrationOperation"
CommandEvaluateFlag = "evaluate"
CommandEvaluateAllFlags = "evaluateAll"
CommandIdentifyEvent = "identifyEvent"
CommandCustomEvent = "customEvent"
CommandAliasEvent = "aliasEvent"
CommandFlushEvents = "flushEvents"
CommandGetBigSegmentStoreStatus = "getBigSegmentStoreStatus"
CommandContextBuild = "contextBuild"
CommandContextConvert = "contextConvert"
CommandSecureModeHash = "secureModeHash"
CommandMigrationVariation = "migrationVariation"
CommandMigrationOperation = "migrationOperation"
CommandRegisterFlagChangeListener = "registerFlagChangeListener"
CommandRegisterFlagValueChangeListener = "registerFlagValueChangeListener"
CommandUnregisterListener = "unregisterListener"
)

type ValueType string
Expand All @@ -33,16 +36,19 @@ const (
)

type CommandParams struct {
Command string `json:"command"`
Evaluate *EvaluateFlagParams `json:"evaluate,omitempty"`
EvaluateAll *EvaluateAllFlagsParams `json:"evaluateAll,omitempty"`
CustomEvent *CustomEventParams `json:"customEvent,omitempty"`
IdentifyEvent *IdentifyEventParams `json:"identifyEvent,omitempty"`
ContextBuild *ContextBuildParams `json:"contextBuild,omitempty"`
ContextConvert *ContextConvertParams `json:"contextConvert,omitempty"`
SecureModeHash *SecureModeHashParams `json:"secureModeHash,omitempty"`
MigrationVariation *MigrationVariationParams `json:"migrationVariation,omitempty"`
MigrationOperation *MigrationOperationParams `json:"migrationOperation,omitempty"`
Command string `json:"command"`
Evaluate *EvaluateFlagParams `json:"evaluate,omitempty"`
EvaluateAll *EvaluateAllFlagsParams `json:"evaluateAll,omitempty"`
CustomEvent *CustomEventParams `json:"customEvent,omitempty"`
IdentifyEvent *IdentifyEventParams `json:"identifyEvent,omitempty"`
ContextBuild *ContextBuildParams `json:"contextBuild,omitempty"`
ContextConvert *ContextConvertParams `json:"contextConvert,omitempty"`
SecureModeHash *SecureModeHashParams `json:"secureModeHash,omitempty"`
MigrationVariation *MigrationVariationParams `json:"migrationVariation,omitempty"`
MigrationOperation *MigrationOperationParams `json:"migrationOperation,omitempty"`
RegisterFlagChangeListener *RegisterFlagChangeListenerParams `json:"registerFlagChangeListener,omitempty"` //nolint:lll
RegisterFlagValueChangeListener *RegisterFlagValueChangeListenerParams `json:"registerFlagValueChangeListener,omitempty"` //nolint:lll
UnregisterListener *UnregisterListenerParams `json:"unregisterListener,omitempty"`
}

type EvaluateFlagParams struct {
Expand Down Expand Up @@ -180,5 +186,40 @@ type HookExecutionEvaluationPayload struct {

type HookExecutionTrackPayload struct {
TrackSeriesContext TrackSeriesContext `json:"trackSeriesContext,omitempty"`
Stage HookStage `json:"stage,omitempty"`
Stage HookStage `json:"stage,omitempty"`
}

// RegisterFlagChangeListenerParams defines parameters for registering a general flag change listener.
// FlagKey may be empty to listen for changes to any flag, or non-empty to filter to a specific flag.
type RegisterFlagChangeListenerParams struct {
ListenerID string `json:"listenerId"`
FlagKey string `json:"flagKey"`
CallbackURI string `json:"callbackUri"`
}

// RegisterFlagValueChangeListenerParams defines parameters for registering a flag value change listener.
// The listener fires when the evaluated value of FlagKey changes for the given Context.
type RegisterFlagValueChangeListenerParams struct {
ListenerID string `json:"listenerId"`
FlagKey string `json:"flagKey"`
Context ldcontext.Context `json:"context"`
DefaultValue ldvalue.Value `json:"defaultValue"`
CallbackURI string `json:"callbackUri"`
}

// UnregisterListenerParams defines parameters for unregistering a previously registered listener.
// Works for both flag change and flag value change listeners.
type UnregisterListenerParams struct {
ListenerID string `json:"listenerId"`
}

// ListenerNotification is the JSON payload POSTed by the test service to a callback URI when a
// listener fires. OldValue and NewValue are only present for value-change notifications
// (registerFlagValueChangeListener); they are nil for general flag-change notifications
// (registerFlagChangeListener).
type ListenerNotification struct {
ListenerID string `json:"listenerId"`
FlagKey string `json:"flagKey"`
OldValue *ldvalue.Value `json:"oldValue,omitempty"`
NewValue *ldvalue.Value `json:"newValue,omitempty"`
}
1 change: 1 addition & 0 deletions testservice/servicedef/service_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
CapabilityPersistentDataStoreRedis = "persistent-data-store-redis"
CapabilityPersistentDataStoreConsul = "persistent-data-store-consul"
CapabilityPersistentDataStoreDynamoDB = "persistent-data-store-dynamodb"
CapabilityFlagChangeListeners = "flag-change-listeners"
)

type StatusRep struct {
Expand Down