From 5dafda74c93de9e326c35025b2a01cd70998c229 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Wed, 5 Nov 2025 12:43:53 -0300 Subject: [PATCH 1/4] moved common logic of changesItem --- splitio/proxy/storage/persistent/common.go | 38 ++++++++++++++++ .../proxy/storage/persistent/common_test.go | 45 +++++++++++++++++++ .../storage/persistent/rulebasedsegments.go | 3 ++ splitio/proxy/storage/persistent/splits.go | 36 +++------------ 4 files changed, 91 insertions(+), 31 deletions(-) create mode 100644 splitio/proxy/storage/persistent/common.go create mode 100644 splitio/proxy/storage/persistent/common_test.go create mode 100644 splitio/proxy/storage/persistent/rulebasedsegments.go diff --git a/splitio/proxy/storage/persistent/common.go b/splitio/proxy/storage/persistent/common.go new file mode 100644 index 00000000..fdcfa0da --- /dev/null +++ b/splitio/proxy/storage/persistent/common.go @@ -0,0 +1,38 @@ +package persistent + +// ChangesItem represents an changesItem service response +type ChangesItem struct { + ChangeNumber int64 `json:"changeNumber"` + Name string `json:"name"` + Status string `json:"status"` + JSON string +} + +// ChangesItem Sortable list +type ChangesItems struct { + items []ChangesItem +} + +func NewChangesItems(size int) ChangesItems { + return ChangesItems{ + items: make([]ChangesItem, 0, size), + } +} + +func (c *ChangesItems) Append(item ChangesItem) { + c.items = append(c.items, item) +} + +func (c *ChangesItems) Len() int { + return len(c.items) +} + +func (c *ChangesItems) Less(i, j int) bool { + return c.items[i].ChangeNumber > c.items[j].ChangeNumber +} + +func (c *ChangesItems) Swap(i, j int) { + c.items[i], c.items[j] = c.items[j], c.items[i] +} + +//---------------------------------------------------- diff --git a/splitio/proxy/storage/persistent/common_test.go b/splitio/proxy/storage/persistent/common_test.go new file mode 100644 index 00000000..a92904a6 --- /dev/null +++ b/splitio/proxy/storage/persistent/common_test.go @@ -0,0 +1,45 @@ +package persistent + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChangesItem(t *testing.T) { + item := ChangesItem{ + ChangeNumber: 123, + Name: "test_split", + Status: "ACTIVE", + JSON: `{"name":"test_split","status":"ACTIVE"}`, + } + + items := NewChangesItems(2) + items.Append(item) + items.Append(ChangesItem{ + ChangeNumber: 124, + Name: "another_split", + Status: "ARCHIVED", + JSON: `{"name":"another_split","status":"ARCHIVED"}`, + }) + + assert.Equal(t, 2, items.Len(), "Expected length to be 2") + assert.Equal(t, "test_split", items.items[0].Name) + assert.Equal(t, int64(123), items.items[0].ChangeNumber) + assert.Equal(t, "ACTIVE", items.items[0].Status) + assert.Equal(t, `{"name":"test_split","status":"ACTIVE"}`, items.items[0].JSON) + assert.Equal(t, "another_split", items.items[1].Name) + assert.Equal(t, int64(124), items.items[1].ChangeNumber) + assert.Equal(t, "ARCHIVED", items.items[1].Status) + assert.Equal(t, `{"name":"another_split","status":"ARCHIVED"}`, items.items[1].JSON) + assert.True(t, items.Less(1, 0), "Expected item at index 1 to be less than item at index 0") + items.Swap(0, 1) + assert.Equal(t, "another_split", items.items[0].Name) + assert.Equal(t, int64(124), items.items[0].ChangeNumber) + assert.Equal(t, "ARCHIVED", items.items[0].Status) + assert.Equal(t, `{"name":"another_split","status":"ARCHIVED"}`, items.items[0].JSON) + assert.Equal(t, "test_split", items.items[1].Name) + assert.Equal(t, int64(123), items.items[1].ChangeNumber) + assert.Equal(t, "ACTIVE", items.items[1].Status) + assert.Equal(t, `{"name":"test_split","status":"ACTIVE"}`, items.items[1].JSON) +} diff --git a/splitio/proxy/storage/persistent/rulebasedsegments.go b/splitio/proxy/storage/persistent/rulebasedsegments.go new file mode 100644 index 00000000..e21960b4 --- /dev/null +++ b/splitio/proxy/storage/persistent/rulebasedsegments.go @@ -0,0 +1,3 @@ +package persistent + +const ruleBasedSegmentsChangesCollectionName = "RULE_BASED_SEGMENTS_CHANGES_COLLECTION" diff --git a/splitio/proxy/storage/persistent/splits.go b/splitio/proxy/storage/persistent/splits.go index c927f676..73cad538 100644 --- a/splitio/proxy/storage/persistent/splits.go +++ b/splitio/proxy/storage/persistent/splits.go @@ -12,31 +12,6 @@ import ( const splitChangesCollectionName = "SPLIT_CHANGES_COLLECTION" -// SplitChangesItem represents an SplitChanges service response -type SplitChangesItem struct { - ChangeNumber int64 `json:"changeNumber"` - Name string `json:"name"` - Status string `json:"status"` - JSON string -} - -// SplitsChangesItems Sortable list -type SplitsChangesItems []SplitChangesItem - -func (slice SplitsChangesItems) Len() int { - return len(slice) -} - -func (slice SplitsChangesItems) Less(i, j int) bool { - return slice[i].ChangeNumber > slice[j].ChangeNumber -} - -func (slice SplitsChangesItems) Swap(i, j int) { - slice[i], slice[j] = slice[j], slice[i] -} - -//---------------------------------------------------- - // SplitChangesCollection represents a collection of SplitChangesItem type SplitChangesCollection struct { collection CollectionWrapper @@ -54,15 +29,14 @@ func NewSplitChangesCollection(db DBWrapper, logger logging.LoggerInterface) *Sp // Update processes a set of feature flag changes items + a changeNumber bump atomically func (c *SplitChangesCollection) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, cn int64) { - - items := make(SplitsChangesItems, 0, len(toAdd)+len(toRemove)) + items := NewChangesItems(len(toAdd) + len(toRemove)) process := func(split *dtos.SplitDTO) { asJSON, err := json.Marshal(split) if err != nil { // This should not happen unless the DTO class is broken return } - items = append(items, SplitChangesItem{ + items.Append(ChangesItem{ ChangeNumber: split.ChangeNumber, Name: split.Name, Status: split.Status, @@ -80,8 +54,8 @@ func (c *SplitChangesCollection) Update(toAdd []dtos.SplitDTO, toRemove []dtos.S c.mutex.Lock() defer c.mutex.Unlock() - for idx := range items { - err := c.collection.SaveAs([]byte(items[idx].Name), items[idx]) + for idx := range items.items { + err := c.collection.SaveAs([]byte(items.items[idx].Name), items.items[idx]) if err != nil { // TODO(mredolatti): log } @@ -102,7 +76,7 @@ func (c *SplitChangesCollection) FetchAll() ([]dtos.SplitDTO, error) { var decodeBuffer bytes.Buffer for _, v := range items { - var q SplitChangesItem + var q ChangesItem // resets buffer data decodeBuffer.Reset() decodeBuffer.Write(v) From 35fdde31c1dcffffbd774874ba6a044f0be97049 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Wed, 5 Nov 2025 15:48:35 -0300 Subject: [PATCH 2/4] added snapshot --- splitio/admin/admin.go | 2 +- splitio/admin/controllers/dashboard.go | 1 + splitio/admin/controllers/helpers.go | 19 ++++ splitio/admin/controllers/helpers_test.go | 24 ++++ splitio/admin/controllers/snapshot.go | 11 +- splitio/admin/controllers/snapshot_test.go | 54 +++------ splitio/admin/views/dashboard/main.go | 9 ++ splitio/commitversion.go | 2 +- splitio/common/snapshot/snapshot.go | 17 +-- splitio/proxy/initialization.go | 11 +- .../storage/persistent/rulebasedsegments.go | 105 ++++++++++++++++++ .../persistent/rulebasedsegments_test.go | 60 ++++++++++ splitio/proxy/storage/rulebasedsegments.go | 48 +++++++- 13 files changed, 300 insertions(+), 63 deletions(-) create mode 100644 splitio/admin/controllers/helpers_test.go create mode 100644 splitio/proxy/storage/persistent/rulebasedsegments_test.go diff --git a/splitio/admin/admin.go b/splitio/admin/admin.go index 3d0da679..47c9f87c 100644 --- a/splitio/admin/admin.go +++ b/splitio/admin/admin.go @@ -96,7 +96,7 @@ func NewServer(options *Options) (*AdminServer, error) { observabilityController.Register(admin) if options.Snapshotter != nil { - snapshotController := controllers.NewSnapshotController(options.Logger, options.Snapshotter) + snapshotController := controllers.NewSnapshotController(options.Logger, options.Snapshotter, options.FlagSpecVersion) snapshotController.Register(admin) } diff --git a/splitio/admin/controllers/dashboard.go b/splitio/admin/controllers/dashboard.go index 49a40bbe..7222b06a 100644 --- a/splitio/admin/controllers/dashboard.go +++ b/splitio/admin/controllers/dashboard.go @@ -166,5 +166,6 @@ func (c *DashboardController) gatherStats() *dashboard.GlobalStats { LoggedMessages: errorMessages, Uptime: int64(c.runtime.Uptime().Seconds()), FlagSets: getFlagSetsInfo(c.storages.SplitStorage), + RuleBasedSegments: bundleRBInfo(c.storages.RuleBasedSegmentsStorage), } } diff --git a/splitio/admin/controllers/helpers.go b/splitio/admin/controllers/helpers.go index 4aa784f3..abc2aa36 100644 --- a/splitio/admin/controllers/helpers.go +++ b/splitio/admin/controllers/helpers.go @@ -316,3 +316,22 @@ func getProxyRequestCount(metrics storage.TelemetryRuntimeConsumer) (ok int64, e return okCount, errorCount } + +func bundleRBInfo(rbStorage storage.RuleBasedSegmentsStorage) []dashboard.RBSummary { + all := rbStorage.All() + summaries := make([]dashboard.RBSummary, 0, len(all)) + for _, segment := range all { + excludedSegments := make([]string, 0, len(segment.Excluded.Segments)) + for _, seg := range segment.Excluded.Segments { + excludedSegments = append(excludedSegments, seg.Name) + } + summaries = append(summaries, dashboard.RBSummary{ + Name: segment.Name, + ChangeNumber: segment.ChangeNumber, + Active: segment.Status == "ACTIVE", + ExcludedKeys: segment.Excluded.Keys, + ExcludedSegments: excludedSegments, + }) + } + return summaries +} diff --git a/splitio/admin/controllers/helpers_test.go b/splitio/admin/controllers/helpers_test.go new file mode 100644 index 00000000..a7d90280 --- /dev/null +++ b/splitio/admin/controllers/helpers_test.go @@ -0,0 +1,24 @@ +package controllers + +import ( + "testing" + + "github.com/splitio/go-split-commons/v8/dtos" + "github.com/splitio/go-split-commons/v8/storage/mocks" + "github.com/splitio/split-synchronizer/v5/splitio/admin/views/dashboard" + "github.com/stretchr/testify/assert" +) + +func TestBundleRBInfo(t *testing.T) { + rb := &mocks.MockRuleBasedSegmentStorage{} + rb.On("All").Return([]dtos.RuleBasedSegmentDTO{ + {Name: "rb1", ChangeNumber: 1, Status: "ACTIVE", Excluded: dtos.ExcludedDTO{Keys: []string{"one"}}}, + {Name: "rb2", ChangeNumber: 2, Status: "ARCHIVED"}, + }, nil) + result := bundleRBInfo(rb) + assert.Len(t, result, 2) + assert.ElementsMatch(t, result, []dashboard.RBSummary{ + {Name: "rb1", ChangeNumber: 1, Active: true, ExcludedKeys: []string{"one"}, ExcludedSegments: []string{}}, + {Name: "rb2", ChangeNumber: 2, Active: false, ExcludedKeys: nil, ExcludedSegments: []string{}}, + }) +} diff --git a/splitio/admin/controllers/snapshot.go b/splitio/admin/controllers/snapshot.go index 2bc933af..657907b3 100644 --- a/splitio/admin/controllers/snapshot.go +++ b/splitio/admin/controllers/snapshot.go @@ -14,13 +14,14 @@ import ( // SnapshotController bundles endpoints associated to snapshot management type SnapshotController struct { - logger logging.LoggerInterface - db storage.Snapshotter + logger logging.LoggerInterface + db storage.Snapshotter + version string } // NewSnapshotController constructs a new snapshot controller -func NewSnapshotController(logger logging.LoggerInterface, db storage.Snapshotter) *SnapshotController { - return &SnapshotController{logger: logger, db: db} +func NewSnapshotController(logger logging.LoggerInterface, db storage.Snapshotter, version string) *SnapshotController { + return &SnapshotController{logger: logger, db: db, version: version} } // Register mounts the endpoints int he provided router @@ -38,7 +39,7 @@ func (c *SnapshotController) downloadSnapshot(ctx *gin.Context) { return } - s, err := snapshot.New(snapshot.Metadata{Version: 1, Storage: snapshot.StorageBoltDB}, b) + s, err := snapshot.New(snapshot.Metadata{Version: 1, Storage: snapshot.StorageBoltDB, SpecVersion: c.version}, b) if err != nil { c.logger.Error("error building snapshot: ", err) ctx.JSON(http.StatusInternalServerError, gin.H{"error": "error building snapshot"}) diff --git a/splitio/admin/controllers/snapshot_test.go b/splitio/admin/controllers/snapshot_test.go index 28d0e69c..77394a7e 100644 --- a/splitio/admin/controllers/snapshot_test.go +++ b/splitio/admin/controllers/snapshot_test.go @@ -2,7 +2,7 @@ package controllers import ( "bytes" - "io/ioutil" + "io" "net/http" "net/http/httptest" "testing" @@ -13,31 +13,23 @@ import ( "github.com/splitio/go-toolkit/v5/logging" "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" ) func TestDownloadProxySnapshot(t *testing.T) { // Read DB snapshot for test path := "../../../test/snapshot/proxy.snapshot" snap, err := snapshot.DecodeFromFile(path) - if err != nil { - t.Error(err) - return - } + assert.Nil(t, err) tmpDataFile, err := snap.WriteDataToTmpFile() - if err != nil { - t.Error(err) - return - } + assert.Nil(t, err) // loading snapshot from disk dbInstance, err := persistent.NewBoltWrapper(tmpDataFile, nil) - if err != nil { - t.Error(err) - return - } + assert.Nil(t, err) - ctrl := NewSnapshotController(logging.NewLogger(nil), dbInstance) + ctrl := NewSnapshotController(logging.NewLogger(nil), dbInstance, "1.3") resp := httptest.NewRecorder() ctx, router := gin.CreateTestContext(resp) @@ -46,35 +38,19 @@ func TestDownloadProxySnapshot(t *testing.T) { ctx.Request, _ = http.NewRequest(http.MethodGet, "/snapshot", nil) router.ServeHTTP(resp, ctx.Request) - responseBody, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Error(err) - return - } + responseBody, err := io.ReadAll(resp.Body) + assert.Nil(t, err) snapRes, err := snapshot.Decode(responseBody) - if err != nil { - t.Error(err) - return - } + assert.Nil(t, err) - if snapRes.Meta().Version != 1 { - t.Error("Invalid Metadata version") - } - - if snapRes.Meta().Storage != 1 { - t.Error("Invalid Metadata storage") - } + assert.Equal(t, uint64(1), snapRes.Meta().Version) + assert.Equal(t, uint64(1), snapRes.Meta().Storage) + assert.Equal(t, "1.3", snapRes.Meta().SpecVersion) dat, err := snap.Data() - if err != nil { - t.Error(err) - } + assert.Nil(t, err) resData, err := snapRes.Data() - if err != nil { - t.Error(err) - } - if bytes.Compare(dat, resData) != 0 { - t.Error("loaded snapshot is different to downloaded") - } + assert.Nil(t, err) + assert.Equal(t, 0, bytes.Compare(dat, resData)) } diff --git a/splitio/admin/views/dashboard/main.go b/splitio/admin/views/dashboard/main.go index 876fc873..6149a30c 100644 --- a/splitio/admin/views/dashboard/main.go +++ b/splitio/admin/views/dashboard/main.go @@ -108,6 +108,7 @@ type GlobalStats struct { EventsLambda float64 `json:"eventsLambda"` Uptime int64 `json:"uptime"` FlagSets []FlagSetsSummary `json:"flagSets"` + RuleBasedSegments []RBSummary `json:"ruleBasedSegments"` } // SplitSummary encapsulates a minimalistic view of feature flag properties to be presented in the dashboard @@ -151,6 +152,14 @@ type FlagSetsSummary struct { FeatureFlags string `json:"featureFlags"` } +type RBSummary struct { + Name string `json:"name"` + ChangeNumber int64 `json:"cn"` + Active bool `json:"active"` + ExcludedKeys []string `json:"excludedKeys"` + ExcludedSegments []string `json:"excludedSegments"` +} + // RGBA bundles input to CSS's rgba function type RGBA struct { Red int32 diff --git a/splitio/commitversion.go b/splitio/commitversion.go index e0846185..a80119f6 100644 --- a/splitio/commitversion.go +++ b/splitio/commitversion.go @@ -5,4 +5,4 @@ This file is created automatically, please do not edit */ // CommitVersion is the version of the last commit previous to release -const CommitVersion = "1fbc498" +const CommitVersion = "5dafda7" diff --git a/splitio/common/snapshot/snapshot.go b/splitio/common/snapshot/snapshot.go index 71613ca3..20e4e15a 100644 --- a/splitio/common/snapshot/snapshot.go +++ b/splitio/common/snapshot/snapshot.go @@ -7,6 +7,7 @@ import ( "encoding/gob" "errors" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -38,8 +39,9 @@ var ErrMetadataRead = errors.New("snapshot metadata cannot be decoded") // Metadata represents the Snapshot metadata object type Metadata struct { - Version uint64 - Storage uint64 + Version uint64 + Storage uint64 + SpecVersion string } // Snapshot represents a snapshot struct with metadata and data @@ -71,7 +73,7 @@ func (s *Snapshot) Meta() Metadata { func (s *Snapshot) Data() ([]byte, error) { gz, err := gzip.NewReader(bytes.NewBuffer(s.data)) defer gz.Close() - data, err := ioutil.ReadAll(gz) + data, err := io.ReadAll(gz) if err != nil { return nil, fmt.Errorf("error reading gzip data: %w", err) } @@ -80,11 +82,12 @@ func (s *Snapshot) Data() ([]byte, error) { // Encode returns the bytes slice snapshot representation // Snapshot Layout: -// |metadata-size|metadata|data| // -// metadata-size: uint64 (8 bytes) specifies the amount of metadata bytes -// metadata: Gob encoded of Metadata struct -// data: Proxy data, byte slice. The Metadata have information about it, Storage, Gzipped and version. +// |metadata-size|metadata|data| +// +// metadata-size: uint64 (8 bytes) specifies the amount of metadata bytes +// metadata: Gob encoded of Metadata struct +// data: Proxy data, byte slice. The Metadata have information about it, Storage, Gzipped and version. func (s *Snapshot) Encode() ([]byte, error) { metaBytes, err := metaToBytes(s.meta) diff --git a/splitio/proxy/initialization.go b/splitio/proxy/initialization.go index 60a129a4..82bc2b5e 100644 --- a/splitio/proxy/initialization.go +++ b/splitio/proxy/initialization.go @@ -84,7 +84,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { // Proxy storages already implement the observable interface, so no need to wrap them splitStorage := storage.NewProxySplitStorage(dbInstance, logger, flagsets.NewFlagSetFilter(cfg.FlagSetsFilter), cfg.Initialization.Snapshot != "") - ruleBasedStorage := storage.NewProxyRuleBasedSegmentsStorage(logger) + ruleBasedStorage := storage.NewProxyRuleBasedSegmentsStorage(dbInstance, logger, cfg.Initialization.Snapshot != "") segmentStorage := storage.NewProxySegmentStorage(dbInstance, logger, cfg.Initialization.Snapshot != "") largeSegmentStorage := inmemory.NewLargeSegmentsStorage() @@ -208,10 +208,11 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { rtm := common.NewRuntime(false, syncManager, logger, "Split Proxy", nil, nil, appMonitor, servicesMonitor) storages := adminCommon.Storages{ - SplitStorage: splitStorage, - SegmentStorage: segmentStorage, - LocalTelemetryStorage: localTelemetryStorage, - LargeSegmentStorage: largeSegmentStorage, + SplitStorage: splitStorage, + SegmentStorage: segmentStorage, + LocalTelemetryStorage: localTelemetryStorage, + LargeSegmentStorage: largeSegmentStorage, + RuleBasedSegmentsStorage: ruleBasedStorage, } // --------------------------- ADMIN DASHBOARD ------------------------------ diff --git a/splitio/proxy/storage/persistent/rulebasedsegments.go b/splitio/proxy/storage/persistent/rulebasedsegments.go index e21960b4..8361a302 100644 --- a/splitio/proxy/storage/persistent/rulebasedsegments.go +++ b/splitio/proxy/storage/persistent/rulebasedsegments.go @@ -1,3 +1,108 @@ package persistent +import ( + "bytes" + "encoding/gob" + "encoding/json" + "sync" + + "github.com/splitio/go-split-commons/v8/dtos" + "github.com/splitio/go-toolkit/v5/logging" +) + const ruleBasedSegmentsChangesCollectionName = "RULE_BASED_SEGMENTS_CHANGES_COLLECTION" + +// RBChangesCollection represents a collection of ChangesItem for rule-based segments +type RBChangesCollection struct { + collection CollectionWrapper + changeNumber int64 + mutex sync.RWMutex +} + +// NewRBChangesCollection returns an instance of RBChangesCollection +func NewRBChangesCollection(db DBWrapper, logger logging.LoggerInterface) *RBChangesCollection { + return &RBChangesCollection{ + collection: &BoltDBCollectionWrapper{db: db, name: ruleBasedSegmentsChangesCollectionName, logger: logger}, + changeNumber: 0, + } +} + +// Update processes a set of rule based changes items + a changeNumber bump atomically +func (c *RBChangesCollection) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, cn int64) { + items := NewChangesItems(len(toAdd) + len(toRemove)) + process := func(rb *dtos.RuleBasedSegmentDTO) { + asJSON, err := json.Marshal(rb) + if err != nil { + // This should not happen unless the DTO class is broken + return + } + items.Append(ChangesItem{ + ChangeNumber: rb.ChangeNumber, + Name: rb.Name, + Status: rb.Status, + JSON: string(asJSON), + }) + } + + for _, rb := range toAdd { + process(&rb) + } + + for _, rb := range toRemove { + process(&rb) + } + + c.mutex.Lock() + defer c.mutex.Unlock() + for idx := range items.items { + err := c.collection.SaveAs([]byte(items.items[idx].Name), items.items[idx]) + if err != nil { + // TODO(mredolatti): log + } + } + c.changeNumber = cn +} + +// FetchAll return a ChangesItem +func (c *RBChangesCollection) FetchAll() ([]dtos.RuleBasedSegmentDTO, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + items, err := c.collection.FetchAll() + if err != nil { + return nil, err + } + + toReturn := make([]dtos.RuleBasedSegmentDTO, 0) + + var decodeBuffer bytes.Buffer + for _, v := range items { + var q ChangesItem + // resets buffer data + decodeBuffer.Reset() + decodeBuffer.Write(v) + dec := gob.NewDecoder(&decodeBuffer) + + errq := dec.Decode(&q) + if errq != nil { + c.collection.Logger().Error("decode error:", errq, "|", string(v)) + continue + } + + var parsed dtos.RuleBasedSegmentDTO + err := json.Unmarshal([]byte(q.JSON), &parsed) + if err != nil { + c.collection.Logger().Error("error decoding feature flag fetched from db: ", err, "|", q.JSON) + continue + } + toReturn = append(toReturn, parsed) + } + + return toReturn, nil +} + +// ChangeNumber returns changeNumber +func (c *RBChangesCollection) ChangeNumber() int64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.changeNumber +} diff --git a/splitio/proxy/storage/persistent/rulebasedsegments_test.go b/splitio/proxy/storage/persistent/rulebasedsegments_test.go new file mode 100644 index 00000000..512eba75 --- /dev/null +++ b/splitio/proxy/storage/persistent/rulebasedsegments_test.go @@ -0,0 +1,60 @@ +package persistent + +import ( + "testing" + + "github.com/splitio/go-split-commons/v8/dtos" + "github.com/splitio/go-toolkit/v5/logging" +) + +func TestRBChangesCollection(t *testing.T) { + dbw, err := NewBoltWrapper(BoltInMemoryMode, nil) + if err != nil { + t.Error("error creating bolt wrapper: ", err) + } + + logger := logging.NewLogger(nil) + rbChangesCollection := NewRBChangesCollection(dbw, logger) + + rbChangesCollection.Update([]dtos.RuleBasedSegmentDTO{ + {Name: "rb1", ChangeNumber: 1, Status: "ACTIVE"}, + {Name: "rb2", ChangeNumber: 1, Status: "ACTIVE"}, + }, nil, 1) + + all, err := rbChangesCollection.FetchAll() + if err != nil { + t.Error("FetchAll should not return an error. Got: ", err) + } + + if len(all) != 2 { + t.Error("invalid number of items fetched.") + return + } + + if all[0].Name != "rb1" || all[1].Name != "rb2" { + t.Error("Invalid payload in fetched changes.") + } + + if rbChangesCollection.ChangeNumber() != 1 { + t.Error("CN should be 1.") + } + + rbChangesCollection.Update([]dtos.RuleBasedSegmentDTO{{Name: "rb1", ChangeNumber: 2, Status: "ARCHIVED"}}, nil, 2) + all, err = rbChangesCollection.FetchAll() + if err != nil { + t.Error("FetchAll should not return an error. Got: ", err) + } + + if len(all) != 2 { + t.Error("invalid number of items fetched.") + return + } + + if all[0].Name != "rb1" || all[0].Status != "ARCHIVED" { + t.Error("rb1 should be archived.") + } + + if rbChangesCollection.ChangeNumber() != 2 { + t.Error("CN should be 2.") + } +} diff --git a/splitio/proxy/storage/rulebasedsegments.go b/splitio/proxy/storage/rulebasedsegments.go index e28d3333..db62f3ee 100644 --- a/splitio/proxy/storage/rulebasedsegments.go +++ b/splitio/proxy/storage/rulebasedsegments.go @@ -4,13 +4,15 @@ import ( "fmt" "sync" + "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/optimized" + "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/persistent" + "github.com/splitio/go-split-commons/v8/dtos" "github.com/splitio/go-split-commons/v8/engine/grammar/constants" "github.com/splitio/go-split-commons/v8/storage" "github.com/splitio/go-split-commons/v8/storage/inmemory/mutexmap" "github.com/splitio/go-toolkit/v5/datastructures/set" "github.com/splitio/go-toolkit/v5/logging" - "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/optimized" ) // ProxyRuleBasedSegmentsStorage defines the interface of a storage that can be used for serving payloads @@ -22,6 +24,7 @@ type ProxyRuleBasedSegmentsStorage interface { // ProxyRuleBasedSegmentsStorageImpl implements the ProxyRuleBasedSegmentsStorage interface and the RuleBasedSegmentProducer interface type ProxyRuleBasedSegmentsStorageImpl struct { snapshot mutexmap.RuleBasedSegmentsStorageImpl + db *persistent.RBChangesCollection logger logging.LoggerInterface oldestKnownCN int64 mtx sync.Mutex @@ -30,16 +33,21 @@ type ProxyRuleBasedSegmentsStorageImpl struct { // NewProxyRuleBasedSegmentsStorage instantiates a new proxy storage that wraps an in-memory snapshot of the last known // flag configuration -func NewProxyRuleBasedSegmentsStorage(logger logging.LoggerInterface) *ProxyRuleBasedSegmentsStorageImpl { +func NewProxyRuleBasedSegmentsStorage(db persistent.DBWrapper, logger logging.LoggerInterface, restoreBackup bool) *ProxyRuleBasedSegmentsStorageImpl { + disk := persistent.NewRBChangesCollection(db, logger) snapshot := mutexmap.NewRuleBasedSegmentsStorage() historic := optimized.NewHistoricRBChanges(1000) - var initialCN int64 = -1 + var initialCN int64 = -1 + if restoreBackup { + initialCN = snapshotFromDiskRB(snapshot, historic, disk, logger) + } return &ProxyRuleBasedSegmentsStorageImpl{ snapshot: *snapshot, + db: disk, + historic: historic, logger: logger, oldestKnownCN: initialCN, - historic: historic, } } @@ -53,6 +61,36 @@ func (p *ProxyRuleBasedSegmentsStorageImpl) sinceIsTooOld(since int64) bool { return since < p.oldestKnownCN } +func snapshotFromDiskRB( + dst *mutexmap.RuleBasedSegmentsStorageImpl, + historic optimized.HistoricChangesRB, + src *persistent.RBChangesCollection, + logger logging.LoggerInterface, +) int64 { + all, err := src.FetchAll() + if err != nil { + logger.Error("error parsing feature flags from snapshot. No data will be available!: ", err) + return -1 + } + + var filtered []dtos.RuleBasedSegmentDTO + var cn = src.ChangeNumber() + for idx := range all { + + // Make sure the CN matches is at least large as the payloads' max. + if thisCN := all[idx].ChangeNumber; thisCN > cn { + cn = thisCN + } + if all[idx].Status == constants.SplitStatusActive { + filtered = append(filtered, all[idx]) + } + } + + dst.Update(filtered, nil, cn) + historic.Update(filtered, nil, cn) + return cn +} + func archivedRBDTOForView(view *optimized.RBView) dtos.RuleBasedSegmentDTO { return dtos.RuleBasedSegmentDTO{ ChangeNumber: view.LastUpdated, @@ -168,7 +206,7 @@ func (p *ProxyRuleBasedSegmentsStorageImpl) Update(toAdd []dtos.RuleBasedSegment p.mtx.Lock() p.snapshot.Update(toAdd, toRemove, changeNumber) p.historic.Update(toAdd, toRemove, changeNumber) - // p.db.Update(toAdd, toRemove, changeNumber) + p.db.Update(toAdd, toRemove, changeNumber) p.mtx.Unlock() return nil } From 4f19e348a5179456515bf7537ba40296a4b1b465 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Wed, 5 Nov 2025 15:56:43 -0300 Subject: [PATCH 3/4] added flag validation into snapshot --- splitio/commitversion.go | 2 +- splitio/proxy/initialization.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/splitio/commitversion.go b/splitio/commitversion.go index a80119f6..d1705cd1 100644 --- a/splitio/commitversion.go +++ b/splitio/commitversion.go @@ -5,4 +5,4 @@ This file is created automatically, please do not edit */ // CommitVersion is the version of the last commit previous to release -const CommitVersion = "5dafda7" +const CommitVersion = "35fdde3" diff --git a/splitio/proxy/initialization.go b/splitio/proxy/initialization.go index 82bc2b5e..f4628bd2 100644 --- a/splitio/proxy/initialization.go +++ b/splitio/proxy/initialization.go @@ -57,6 +57,10 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { return fmt.Errorf("error writing temporary snapshot file: %w", err) } + if snap.Meta().SpecVersion != cfg.FlagSpecVersion { + return common.NewInitError(fmt.Errorf("snapshot spec version %s does not match config spec version %s", snap.Meta().SpecVersion, cfg.FlagSpecVersion), common.ExitErrorDB) + } + logger.Debug("Database created from snapshot at", dbpath) } From 88290d5bf4302bcc33d0d2b990e390fc7d3e7c22 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Thu, 6 Nov 2025 13:02:15 -0300 Subject: [PATCH 4/4] Added rule-based segments in the dashboard --- splitio/admin/controllers/dashboard.go | 1 + splitio/admin/controllers/helpers.go | 39 +++++++++++++ .../admin/views/dashboard/datainspector.go | 57 +++++++++++++++++++ splitio/admin/views/dashboard/js.go | 54 ++++++++++++++++++ splitio/admin/views/dashboard/main.go | 53 ++++++++++------- splitio/commitversion.go | 2 +- splitio/proxy/initialization.go | 9 +-- 7 files changed, 191 insertions(+), 24 deletions(-) diff --git a/splitio/admin/controllers/dashboard.go b/splitio/admin/controllers/dashboard.go index 49a40bbe..a51ee444 100644 --- a/splitio/admin/controllers/dashboard.go +++ b/splitio/admin/controllers/dashboard.go @@ -150,6 +150,7 @@ func (c *DashboardController) gatherStats() *dashboard.GlobalStats { FeatureFlags: bundleSplitInfo(c.storages.SplitStorage), Segments: bundleSegmentInfo(c.storages.SplitStorage, c.storages.SegmentStorage), LargeSegments: bundleLargeSegmentInfo(c.storages.SplitStorage, c.storages.LargeSegmentStorage), + RuleBasedSegments: bundleRuleBasedInfo(c.storages.SplitStorage, c.storages.RuleBasedSegmentsStorage), Latencies: bundleProxyLatencies(c.storages.LocalTelemetryStorage), BackendLatencies: bundleLocalSyncLatencies(c.storages.LocalTelemetryStorage), ImpressionsQueueSize: getImpressionSize(c.storages.ImpressionStorage), diff --git a/splitio/admin/controllers/helpers.go b/splitio/admin/controllers/helpers.go index 4aa784f3..b2288b28 100644 --- a/splitio/admin/controllers/helpers.go +++ b/splitio/admin/controllers/helpers.go @@ -108,6 +108,45 @@ func bundleSegmentInfo(splitStorage storage.SplitStorage, segmentStorage storage return summaries } +func bundleRuleBasedInfo(splitStorage storage.SplitStorage, ruleBasedSegmentStorage storage.RuleBasedSegmentStorageConsumer) []dashboard.RuleBasedSegmentSummary { + + names := splitStorage.RuleBasedSegmentNames() + summaries := make([]dashboard.RuleBasedSegmentSummary, 0, names.Size()) + + for _, name := range names.List() { + strName, ok := name.(string) + if !ok { + continue + } + ruleBased, err := ruleBasedSegmentStorage.GetRuleBasedSegmentByName(strName) + + if err != nil { + continue + } + excluededSegments := make([]dashboard.ExcluededSegments, 0, len(ruleBased.Excluded.Segments)) + for _, excludedSegment := range ruleBased.Excluded.Segments { + excluededSegments = append(excluededSegments, dashboard.ExcluededSegments{ + Name: excludedSegment.Name, + Type: excludedSegment.Type, + }) + } + + if ruleBased.Excluded.Keys == nil { + ruleBased.Excluded.Keys = make([]string, 0) + } + + summaries = append(summaries, dashboard.RuleBasedSegmentSummary{ + Name: ruleBased.Name, + Active: ruleBased.Status == "ACTIVE", + ExcludedKeys: ruleBased.Excluded.Keys, + ExcluededSegments: excluededSegments, + LastModified: time.Unix(0, ruleBased.ChangeNumber*int64(time.Millisecond)).UTC().Format(time.UnixDate), + ChangeNumber: ruleBased.ChangeNumber, + }) + } + return summaries +} + func bundleSegmentKeysInfo(name string, segmentStorage storage.SegmentStorageConsumer) []dashboard.SegmentKeySummary { keys := segmentStorage.Keys(name) diff --git a/splitio/admin/views/dashboard/datainspector.go b/splitio/admin/views/dashboard/datainspector.go index f4f63577..3af58c63 100644 --- a/splitio/admin/views/dashboard/datainspector.go +++ b/splitio/admin/views/dashboard/datainspector.go @@ -38,6 +38,19 @@ const dataInspector = ` {{end}} +
  • + + +  Rule-based Segments + +