Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/pkg/evaluator/fractional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func setupEvaluator(source string, flags []model.Flag) (*JSON, error) {
return nil, err
}
je := NewJSON(log, s)
je.store.Update(source, flags, model.Metadata{})
je.store.Update(source, flags, model.Metadata{}, false)
return je, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/pkg/evaluator/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (je *JSON) SetState(payload sync.DataSync) error {
return err
}

je.store.Update(payload.Source, definition.Flags, definition.Metadata)
je.store.Update(payload.Source, definition.Flags, definition.Metadata, payload.IncrementalUpdates)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/pkg/evaluator/semver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ func TestJSONEvaluator_semVerEvaluation(t *testing.T) {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags, model.Metadata{})
je.store.Update(source, tt.flags, model.Metadata{}, false)

value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

Expand Down
4 changes: 2 additions & 2 deletions core/pkg/evaluator/string_comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestJSONEvaluator_startsWithEvaluation(t *testing.T) {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags, model.Metadata{})
je.store.Update(source, tt.flags, model.Metadata{}, false)

value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

Expand Down Expand Up @@ -325,7 +325,7 @@ func TestJSONEvaluator_endsWithEvaluation(t *testing.T) {
t.Fatalf("NewStore failed: %v", err)
}
je := NewJSON(log, s)
je.store.Update(source, tt.flags, model.Metadata{})
je.store.Update(source, tt.flags, model.Metadata{}, false)

value, variant, reason, _, err := resolve[string](ctx, reqID, tt.flagKey, tt.context, je.evaluateVariant)

Expand Down
17 changes: 12 additions & 5 deletions core/pkg/store/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,17 @@ func (s *Selector) ToMetadata() model.Metadata {
}

func (s *Selector) ToLogString() string {
if s != nil && len(s.indexMap) == 1 {
for k, v := range s.indexMap {
return fmt.Sprintf("'%s=%s'", k, v)
}
if s == nil || len(s.indexMap) == 0 {
return "<none>"
}
return "<none>"
keys := make([]string, 0, len(s.indexMap))
for k := range s.indexMap {
keys = append(keys, k)
}
sort.Strings(keys)
parts := make([]string, 0, len(keys))
for _, k := range keys {
parts = append(parts, fmt.Sprintf("%s=%s", k, s.indexMap[k]))
}
return "'" + strings.Join(parts, ",") + "'"
}
44 changes: 40 additions & 4 deletions core/pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type IStore interface {
Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error)
GetAll(ctx context.Context, selector *Selector) ([]model.Flag, model.Metadata, error)
Watch(ctx context.Context, selector *Selector, watcher chan<- FlagQueryResult)
Update(source string, flags []model.Flag, metadata model.Metadata)
Update(source string, flags []model.Flag, metadata model.Metadata, incrementalUpdate bool)
}

var _ IStore = (*Store)(nil)
Expand Down Expand Up @@ -214,10 +214,16 @@ type flagIdentifier struct {
}

// Update the flag state with the provided flags.
// When incrementalUpdate is true, deletion is scoped to only the flagSetIds present in
// this payload (from metadata and flag-level overrides), allowing flags from other
// flagSetIds to accumulate across updates. When false, all flags for the source are
// replaced (the default full-snapshot behavior).
// EXPERIMENTAL: incrementalUpdate support may change or be removed in a future release.
func (s *Store) Update(
source string,
flags []model.Flag,
metadata model.Metadata,
incrementalUpdate bool,
) {
if source == "" {
panic("source cannot be empty")
Expand Down Expand Up @@ -254,9 +260,39 @@ func (s *Store) Update(
txn := s.db.Txn(true)
defer txn.Abort()

// get all flags for the source we are updating
selector := NewSelector(sourceIndex + "=" + source)
oldFlags, _, _ := s.GetAll(context.Background(), &selector)
// When incrementalUpdate is enabled, scope deletion to only the flagSetIds touched
// by this payload (metadata-level + flag-level overrides). This allows per-flagSetId
// updates (e.g., from per-project stream messages) to accumulate without deleting
// flags from unrelated flagSetIds. Otherwise, replace all flags for the source.
var oldFlags []model.Flag
if incrementalUpdate {
seenFlagSetIds := make(map[string]struct{})
if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" {
seenFlagSetIds[fsi] = struct{}{}
}
for id := range newFlags {
seenFlagSetIds[id.flagSetId] = struct{}{}
}
for fsi := range seenFlagSetIds {
sel := NewSelector(flagSetIdIndex+"="+fsi).WithIndex(sourceIndex, source)
indexId, constraints := sel.ToQuery()
it, err := txn.Get(flagsTable, indexId, constraints...)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to query flags for flagSetId %s: %v", fsi, err))
return
}
Comment thread
toddbaert marked this conversation as resolved.
oldFlags = append(oldFlags, s.collect(it)...)
}
} else {
sel := NewSelector(sourceIndex + "=" + source)
indexId, constraints := sel.ToQuery()
it, err := txn.Get(flagsTable, indexId, constraints...)
if err != nil {
s.logger.Error(fmt.Sprintf("unable to query flags for source %s: %v", source, err))
return
}
Comment thread
toddbaert marked this conversation as resolved.
oldFlags = s.collect(it)
}

for _, oldFlag := range oldFlags {
if _, ok := newFlags[flagIdentifier{flagSetId: oldFlag.FlagSetId, key: oldFlag.Key}]; !ok {
Expand Down
Loading
Loading