diff --git a/splitio/admin/views/dashboard/js.go b/splitio/admin/views/dashboard/js.go
index 434766dd..48344789 100644
--- a/splitio/admin/views/dashboard/js.go
+++ b/splitio/admin/views/dashboard/js.go
@@ -86,6 +86,23 @@ const mainScript = `
}
});
}
+
+ function resetFilterRuleBasedSegments(){
+ $("tr.ruleBasedItem").removeClass("filterDisplayNone");
+ $("#filterRuleBasedSegmentNameInput").val("");
+ }
+
+ function filterRuleBasedSegments(){
+ $("tr.ruleBasedItem").removeClass("filterDisplayNone");
+ var filter = $("#filterRuleBasedSegmentNameInput").val();
+ $("tr.ruleBasedItem").each(function() {
+ $this = $(this);
+ var ruleBasedName = $this.find("span.ruleBasedItemName").html();
+ if (ruleBasedName.indexOf(filter.trim()) == -1) {
+ $this.addClass("filterDisplayNone");
+ }
+ });
+ }
$(function () {
$('[data-toggle="tooltip"]').tooltip()
@@ -255,6 +272,42 @@ const mainScript = `
}
};
+ function formatRuleBasedSegment(ruleBasedSegment) {
+ var excludedSegments = Array.isArray(ruleBasedSegment.excludedSegments)
+ ? ruleBasedSegment.excludedSegments
+ : [];
+
+ var excludedSegmentsHtml = excludedSegments.length
+ ? excludedSegments.map(function(seg, i) {
+ var segName = seg && seg.name ? seg.name : 'Unnamed';
+ var segType = seg && seg.type ? seg.type : 'Unknown';
+ var separator = i < excludedSegments.length - 1 ? ', ' : '';
+ return '' + segName + ' (' + segType + ')' + separator;
+ }).join('')
+ : '—';
+
+ return (
+ '
' +
+ '| ' + ruleBasedSegment.name + ' | ' +
+ (ruleBasedSegment.active
+ ? 'ACTIVE | '
+ : 'ARCHIVED | ') +
+ '' + (ruleBasedSegment.excludedKeys || '') + ' | ' +
+ '' + excludedSegmentsHtml + ' | ' +
+ '' + (ruleBasedSegment.cn || '') + ' | ' +
+ '
\n'
+ );
+ }
+
+ function updateRuleBasedSegments(ruleBasedSegments) {
+ ruleBasedSegments.sort((a, b) => parseFloat(b.changeNumber) - parseFloat(a.changeNumber));
+ const formatted = ruleBasedSegments.map(formatRuleBasedSegment).join('\n');
+ if (document.getElementById('filterRuleBasedSegmentNameInput').value.length == 0) {
+ $('#rule_based_segment_rows tbody').empty();
+ $('#rule_based_segment_rows tbody').append(formatted);
+ }
+ };
+
function formatFlagSet(flagSet) {
return (
'' +
@@ -443,6 +496,7 @@ const mainScript = `
updateFeatureFlags(stats.featureFlags);
updateSegments(stats.segments);
updateLargeSegments(stats.largesegments);
+ updateRuleBasedSegments(stats.rulebasedsegments)
updateLogEntries(stats.loggedMessages);
updateFlagSets(stats.flagSets)
diff --git a/splitio/admin/views/dashboard/main.go b/splitio/admin/views/dashboard/main.go
index 876fc873..757a14dd 100644
--- a/splitio/admin/views/dashboard/main.go
+++ b/splitio/admin/views/dashboard/main.go
@@ -89,25 +89,26 @@ type RootObject struct {
// GlobalStats runtime stats used to render the dashboard
type GlobalStats struct {
- BackendTotalRequests int64 `json:"backendTotalRequests"`
- RequestsOk int64 `json:"requestsOk"`
- RequestsErrored int64 `json:"requestsErrored"`
- BackendRequestsOk int64 `json:"backendRequestsOk"`
- BackendRequestsErrored int64 `json:"backendRequestsErrored"`
- SdksTotalRequests int64 `json:"sdksTotalRequests"`
- LoggedErrors int64 `json:"loggedErrors"`
- LoggedMessages []string `json:"loggedMessages"`
- FeatureFlags []SplitSummary `json:"featureFlags"`
- Segments []SegmentSummary `json:"segments"`
- LargeSegments []LargeSegmentSummary `json:"largesegments"`
- Latencies []ChartJSData `json:"latencies"`
- BackendLatencies []ChartJSData `json:"backendLatencies"`
- ImpressionsQueueSize int64 `json:"impressionsQueueSize"`
- ImpressionsLambda float64 `json:"impressionsLambda"`
- EventsQueueSize int64 `json:"eventsQueueSize"`
- EventsLambda float64 `json:"eventsLambda"`
- Uptime int64 `json:"uptime"`
- FlagSets []FlagSetsSummary `json:"flagSets"`
+ BackendTotalRequests int64 `json:"backendTotalRequests"`
+ RequestsOk int64 `json:"requestsOk"`
+ RequestsErrored int64 `json:"requestsErrored"`
+ BackendRequestsOk int64 `json:"backendRequestsOk"`
+ BackendRequestsErrored int64 `json:"backendRequestsErrored"`
+ SdksTotalRequests int64 `json:"sdksTotalRequests"`
+ LoggedErrors int64 `json:"loggedErrors"`
+ LoggedMessages []string `json:"loggedMessages"`
+ FeatureFlags []SplitSummary `json:"featureFlags"`
+ Segments []SegmentSummary `json:"segments"`
+ LargeSegments []LargeSegmentSummary `json:"largesegments"`
+ RuleBasedSegments []RuleBasedSegmentSummary `json:"rulebasedsegments"`
+ Latencies []ChartJSData `json:"latencies"`
+ BackendLatencies []ChartJSData `json:"backendLatencies"`
+ ImpressionsQueueSize int64 `json:"impressionsQueueSize"`
+ ImpressionsLambda float64 `json:"impressionsLambda"`
+ EventsQueueSize int64 `json:"eventsQueueSize"`
+ EventsLambda float64 `json:"eventsLambda"`
+ Uptime int64 `json:"uptime"`
+ FlagSets []FlagSetsSummary `json:"flagSets"`
}
// SplitSummary encapsulates a minimalistic view of feature flag properties to be presented in the dashboard
@@ -137,6 +138,20 @@ type LargeSegmentSummary struct {
LastModified string `json:"cn"`
}
+type RuleBasedSegmentSummary struct {
+ Name string `json:"name"`
+ Active bool `json:"active"`
+ ExcludedKeys []string `json:"excludedKeys"`
+ ExcludedSegments []ExcludedSegments `json:"excludedSegments"`
+ LastModified string `json:"cn"`
+ ChangeNumber int64 `json:"changeNumber"`
+}
+
+type ExcludedSegments struct {
+ Name string `json:"name"`
+ Type string `json:"type"`
+}
+
// SegmentKeySummary encapsulates basic information associated to the key in proxy mode
// (fields other than name are empty when running as producer
type SegmentKeySummary struct {
diff --git a/splitio/commitversion.go b/splitio/commitversion.go
index 2b9ff1ac..ff47aa2b 100644
--- a/splitio/commitversion.go
+++ b/splitio/commitversion.go
@@ -5,4 +5,4 @@ This file is created automatically, please do not edit
*/
// CommitVersion is the version of the last commit previous to release
-const CommitVersion = "514a95c"
+const CommitVersion = "8fd26d7"
diff --git a/splitio/common/snapshot/snapshot.go b/splitio/common/snapshot/snapshot.go
index 71613ca3..20e4e15a 100644
--- a/splitio/common/snapshot/snapshot.go
+++ b/splitio/common/snapshot/snapshot.go
@@ -7,6 +7,7 @@ import (
"encoding/gob"
"errors"
"fmt"
+ "io"
"io/ioutil"
"os"
"path/filepath"
@@ -38,8 +39,9 @@ var ErrMetadataRead = errors.New("snapshot metadata cannot be decoded")
// Metadata represents the Snapshot metadata object
type Metadata struct {
- Version uint64
- Storage uint64
+ Version uint64
+ Storage uint64
+ SpecVersion string
}
// Snapshot represents a snapshot struct with metadata and data
@@ -71,7 +73,7 @@ func (s *Snapshot) Meta() Metadata {
func (s *Snapshot) Data() ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(s.data))
defer gz.Close()
- data, err := ioutil.ReadAll(gz)
+ data, err := io.ReadAll(gz)
if err != nil {
return nil, fmt.Errorf("error reading gzip data: %w", err)
}
@@ -80,11 +82,12 @@ func (s *Snapshot) Data() ([]byte, error) {
// Encode returns the bytes slice snapshot representation
// Snapshot Layout:
-// |metadata-size|metadata|data|
//
-// metadata-size: uint64 (8 bytes) specifies the amount of metadata bytes
-// metadata: Gob encoded of Metadata struct
-// data: Proxy data, byte slice. The Metadata have information about it, Storage, Gzipped and version.
+// |metadata-size|metadata|data|
+//
+// metadata-size: uint64 (8 bytes) specifies the amount of metadata bytes
+// metadata: Gob encoded of Metadata struct
+// data: Proxy data, byte slice. The Metadata have information about it, Storage, Gzipped and version.
func (s *Snapshot) Encode() ([]byte, error) {
metaBytes, err := metaToBytes(s.meta)
diff --git a/splitio/proxy/initialization.go b/splitio/proxy/initialization.go
index 60a129a4..f4628bd2 100644
--- a/splitio/proxy/initialization.go
+++ b/splitio/proxy/initialization.go
@@ -57,6 +57,10 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
return fmt.Errorf("error writing temporary snapshot file: %w", err)
}
+ if snap.Meta().SpecVersion != cfg.FlagSpecVersion {
+ return common.NewInitError(fmt.Errorf("snapshot spec version %s does not match config spec version %s", snap.Meta().SpecVersion, cfg.FlagSpecVersion), common.ExitErrorDB)
+ }
+
logger.Debug("Database created from snapshot at", dbpath)
}
@@ -84,7 +88,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
// Proxy storages already implement the observable interface, so no need to wrap them
splitStorage := storage.NewProxySplitStorage(dbInstance, logger, flagsets.NewFlagSetFilter(cfg.FlagSetsFilter), cfg.Initialization.Snapshot != "")
- ruleBasedStorage := storage.NewProxyRuleBasedSegmentsStorage(logger)
+ ruleBasedStorage := storage.NewProxyRuleBasedSegmentsStorage(dbInstance, logger, cfg.Initialization.Snapshot != "")
segmentStorage := storage.NewProxySegmentStorage(dbInstance, logger, cfg.Initialization.Snapshot != "")
largeSegmentStorage := inmemory.NewLargeSegmentsStorage()
@@ -208,10 +212,11 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
rtm := common.NewRuntime(false, syncManager, logger, "Split Proxy", nil, nil, appMonitor, servicesMonitor)
storages := adminCommon.Storages{
- SplitStorage: splitStorage,
- SegmentStorage: segmentStorage,
- LocalTelemetryStorage: localTelemetryStorage,
- LargeSegmentStorage: largeSegmentStorage,
+ SplitStorage: splitStorage,
+ SegmentStorage: segmentStorage,
+ LocalTelemetryStorage: localTelemetryStorage,
+ LargeSegmentStorage: largeSegmentStorage,
+ RuleBasedSegmentsStorage: ruleBasedStorage,
}
// --------------------------- ADMIN DASHBOARD ------------------------------
diff --git a/splitio/proxy/storage/persistent/common.go b/splitio/proxy/storage/persistent/common.go
new file mode 100644
index 00000000..fdcfa0da
--- /dev/null
+++ b/splitio/proxy/storage/persistent/common.go
@@ -0,0 +1,38 @@
+package persistent
+
+// ChangesItem represents an changesItem service response
+type ChangesItem struct {
+ ChangeNumber int64 `json:"changeNumber"`
+ Name string `json:"name"`
+ Status string `json:"status"`
+ JSON string
+}
+
+// ChangesItem Sortable list
+type ChangesItems struct {
+ items []ChangesItem
+}
+
+func NewChangesItems(size int) ChangesItems {
+ return ChangesItems{
+ items: make([]ChangesItem, 0, size),
+ }
+}
+
+func (c *ChangesItems) Append(item ChangesItem) {
+ c.items = append(c.items, item)
+}
+
+func (c *ChangesItems) Len() int {
+ return len(c.items)
+}
+
+func (c *ChangesItems) Less(i, j int) bool {
+ return c.items[i].ChangeNumber > c.items[j].ChangeNumber
+}
+
+func (c *ChangesItems) Swap(i, j int) {
+ c.items[i], c.items[j] = c.items[j], c.items[i]
+}
+
+//----------------------------------------------------
diff --git a/splitio/proxy/storage/persistent/common_test.go b/splitio/proxy/storage/persistent/common_test.go
new file mode 100644
index 00000000..a92904a6
--- /dev/null
+++ b/splitio/proxy/storage/persistent/common_test.go
@@ -0,0 +1,45 @@
+package persistent
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestChangesItem(t *testing.T) {
+ item := ChangesItem{
+ ChangeNumber: 123,
+ Name: "test_split",
+ Status: "ACTIVE",
+ JSON: `{"name":"test_split","status":"ACTIVE"}`,
+ }
+
+ items := NewChangesItems(2)
+ items.Append(item)
+ items.Append(ChangesItem{
+ ChangeNumber: 124,
+ Name: "another_split",
+ Status: "ARCHIVED",
+ JSON: `{"name":"another_split","status":"ARCHIVED"}`,
+ })
+
+ assert.Equal(t, 2, items.Len(), "Expected length to be 2")
+ assert.Equal(t, "test_split", items.items[0].Name)
+ assert.Equal(t, int64(123), items.items[0].ChangeNumber)
+ assert.Equal(t, "ACTIVE", items.items[0].Status)
+ assert.Equal(t, `{"name":"test_split","status":"ACTIVE"}`, items.items[0].JSON)
+ assert.Equal(t, "another_split", items.items[1].Name)
+ assert.Equal(t, int64(124), items.items[1].ChangeNumber)
+ assert.Equal(t, "ARCHIVED", items.items[1].Status)
+ assert.Equal(t, `{"name":"another_split","status":"ARCHIVED"}`, items.items[1].JSON)
+ assert.True(t, items.Less(1, 0), "Expected item at index 1 to be less than item at index 0")
+ items.Swap(0, 1)
+ assert.Equal(t, "another_split", items.items[0].Name)
+ assert.Equal(t, int64(124), items.items[0].ChangeNumber)
+ assert.Equal(t, "ARCHIVED", items.items[0].Status)
+ assert.Equal(t, `{"name":"another_split","status":"ARCHIVED"}`, items.items[0].JSON)
+ assert.Equal(t, "test_split", items.items[1].Name)
+ assert.Equal(t, int64(123), items.items[1].ChangeNumber)
+ assert.Equal(t, "ACTIVE", items.items[1].Status)
+ assert.Equal(t, `{"name":"test_split","status":"ACTIVE"}`, items.items[1].JSON)
+}
diff --git a/splitio/proxy/storage/persistent/rulebasedsegments.go b/splitio/proxy/storage/persistent/rulebasedsegments.go
new file mode 100644
index 00000000..8361a302
--- /dev/null
+++ b/splitio/proxy/storage/persistent/rulebasedsegments.go
@@ -0,0 +1,108 @@
+package persistent
+
+import (
+ "bytes"
+ "encoding/gob"
+ "encoding/json"
+ "sync"
+
+ "github.com/splitio/go-split-commons/v8/dtos"
+ "github.com/splitio/go-toolkit/v5/logging"
+)
+
+const ruleBasedSegmentsChangesCollectionName = "RULE_BASED_SEGMENTS_CHANGES_COLLECTION"
+
+// RBChangesCollection represents a collection of ChangesItem for rule-based segments
+type RBChangesCollection struct {
+ collection CollectionWrapper
+ changeNumber int64
+ mutex sync.RWMutex
+}
+
+// NewRBChangesCollection returns an instance of RBChangesCollection
+func NewRBChangesCollection(db DBWrapper, logger logging.LoggerInterface) *RBChangesCollection {
+ return &RBChangesCollection{
+ collection: &BoltDBCollectionWrapper{db: db, name: ruleBasedSegmentsChangesCollectionName, logger: logger},
+ changeNumber: 0,
+ }
+}
+
+// Update processes a set of rule based changes items + a changeNumber bump atomically
+func (c *RBChangesCollection) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, cn int64) {
+ items := NewChangesItems(len(toAdd) + len(toRemove))
+ process := func(rb *dtos.RuleBasedSegmentDTO) {
+ asJSON, err := json.Marshal(rb)
+ if err != nil {
+ // This should not happen unless the DTO class is broken
+ return
+ }
+ items.Append(ChangesItem{
+ ChangeNumber: rb.ChangeNumber,
+ Name: rb.Name,
+ Status: rb.Status,
+ JSON: string(asJSON),
+ })
+ }
+
+ for _, rb := range toAdd {
+ process(&rb)
+ }
+
+ for _, rb := range toRemove {
+ process(&rb)
+ }
+
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ for idx := range items.items {
+ err := c.collection.SaveAs([]byte(items.items[idx].Name), items.items[idx])
+ if err != nil {
+ // TODO(mredolatti): log
+ }
+ }
+ c.changeNumber = cn
+}
+
+// FetchAll return a ChangesItem
+func (c *RBChangesCollection) FetchAll() ([]dtos.RuleBasedSegmentDTO, error) {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ items, err := c.collection.FetchAll()
+ if err != nil {
+ return nil, err
+ }
+
+ toReturn := make([]dtos.RuleBasedSegmentDTO, 0)
+
+ var decodeBuffer bytes.Buffer
+ for _, v := range items {
+ var q ChangesItem
+ // resets buffer data
+ decodeBuffer.Reset()
+ decodeBuffer.Write(v)
+ dec := gob.NewDecoder(&decodeBuffer)
+
+ errq := dec.Decode(&q)
+ if errq != nil {
+ c.collection.Logger().Error("decode error:", errq, "|", string(v))
+ continue
+ }
+
+ var parsed dtos.RuleBasedSegmentDTO
+ err := json.Unmarshal([]byte(q.JSON), &parsed)
+ if err != nil {
+ c.collection.Logger().Error("error decoding feature flag fetched from db: ", err, "|", q.JSON)
+ continue
+ }
+ toReturn = append(toReturn, parsed)
+ }
+
+ return toReturn, nil
+}
+
+// ChangeNumber returns changeNumber
+func (c *RBChangesCollection) ChangeNumber() int64 {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ return c.changeNumber
+}
diff --git a/splitio/proxy/storage/persistent/rulebasedsegments_test.go b/splitio/proxy/storage/persistent/rulebasedsegments_test.go
new file mode 100644
index 00000000..512eba75
--- /dev/null
+++ b/splitio/proxy/storage/persistent/rulebasedsegments_test.go
@@ -0,0 +1,60 @@
+package persistent
+
+import (
+ "testing"
+
+ "github.com/splitio/go-split-commons/v8/dtos"
+ "github.com/splitio/go-toolkit/v5/logging"
+)
+
+func TestRBChangesCollection(t *testing.T) {
+ dbw, err := NewBoltWrapper(BoltInMemoryMode, nil)
+ if err != nil {
+ t.Error("error creating bolt wrapper: ", err)
+ }
+
+ logger := logging.NewLogger(nil)
+ rbChangesCollection := NewRBChangesCollection(dbw, logger)
+
+ rbChangesCollection.Update([]dtos.RuleBasedSegmentDTO{
+ {Name: "rb1", ChangeNumber: 1, Status: "ACTIVE"},
+ {Name: "rb2", ChangeNumber: 1, Status: "ACTIVE"},
+ }, nil, 1)
+
+ all, err := rbChangesCollection.FetchAll()
+ if err != nil {
+ t.Error("FetchAll should not return an error. Got: ", err)
+ }
+
+ if len(all) != 2 {
+ t.Error("invalid number of items fetched.")
+ return
+ }
+
+ if all[0].Name != "rb1" || all[1].Name != "rb2" {
+ t.Error("Invalid payload in fetched changes.")
+ }
+
+ if rbChangesCollection.ChangeNumber() != 1 {
+ t.Error("CN should be 1.")
+ }
+
+ rbChangesCollection.Update([]dtos.RuleBasedSegmentDTO{{Name: "rb1", ChangeNumber: 2, Status: "ARCHIVED"}}, nil, 2)
+ all, err = rbChangesCollection.FetchAll()
+ if err != nil {
+ t.Error("FetchAll should not return an error. Got: ", err)
+ }
+
+ if len(all) != 2 {
+ t.Error("invalid number of items fetched.")
+ return
+ }
+
+ if all[0].Name != "rb1" || all[0].Status != "ARCHIVED" {
+ t.Error("rb1 should be archived.")
+ }
+
+ if rbChangesCollection.ChangeNumber() != 2 {
+ t.Error("CN should be 2.")
+ }
+}
diff --git a/splitio/proxy/storage/persistent/splits.go b/splitio/proxy/storage/persistent/splits.go
index c927f676..73cad538 100644
--- a/splitio/proxy/storage/persistent/splits.go
+++ b/splitio/proxy/storage/persistent/splits.go
@@ -12,31 +12,6 @@ import (
const splitChangesCollectionName = "SPLIT_CHANGES_COLLECTION"
-// SplitChangesItem represents an SplitChanges service response
-type SplitChangesItem struct {
- ChangeNumber int64 `json:"changeNumber"`
- Name string `json:"name"`
- Status string `json:"status"`
- JSON string
-}
-
-// SplitsChangesItems Sortable list
-type SplitsChangesItems []SplitChangesItem
-
-func (slice SplitsChangesItems) Len() int {
- return len(slice)
-}
-
-func (slice SplitsChangesItems) Less(i, j int) bool {
- return slice[i].ChangeNumber > slice[j].ChangeNumber
-}
-
-func (slice SplitsChangesItems) Swap(i, j int) {
- slice[i], slice[j] = slice[j], slice[i]
-}
-
-//----------------------------------------------------
-
// SplitChangesCollection represents a collection of SplitChangesItem
type SplitChangesCollection struct {
collection CollectionWrapper
@@ -54,15 +29,14 @@ func NewSplitChangesCollection(db DBWrapper, logger logging.LoggerInterface) *Sp
// Update processes a set of feature flag changes items + a changeNumber bump atomically
func (c *SplitChangesCollection) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, cn int64) {
-
- items := make(SplitsChangesItems, 0, len(toAdd)+len(toRemove))
+ items := NewChangesItems(len(toAdd) + len(toRemove))
process := func(split *dtos.SplitDTO) {
asJSON, err := json.Marshal(split)
if err != nil {
// This should not happen unless the DTO class is broken
return
}
- items = append(items, SplitChangesItem{
+ items.Append(ChangesItem{
ChangeNumber: split.ChangeNumber,
Name: split.Name,
Status: split.Status,
@@ -80,8 +54,8 @@ func (c *SplitChangesCollection) Update(toAdd []dtos.SplitDTO, toRemove []dtos.S
c.mutex.Lock()
defer c.mutex.Unlock()
- for idx := range items {
- err := c.collection.SaveAs([]byte(items[idx].Name), items[idx])
+ for idx := range items.items {
+ err := c.collection.SaveAs([]byte(items.items[idx].Name), items.items[idx])
if err != nil {
// TODO(mredolatti): log
}
@@ -102,7 +76,7 @@ func (c *SplitChangesCollection) FetchAll() ([]dtos.SplitDTO, error) {
var decodeBuffer bytes.Buffer
for _, v := range items {
- var q SplitChangesItem
+ var q ChangesItem
// resets buffer data
decodeBuffer.Reset()
decodeBuffer.Write(v)
diff --git a/splitio/proxy/storage/rulebasedsegments.go b/splitio/proxy/storage/rulebasedsegments.go
index cc8ef03f..4f71d6f1 100644
--- a/splitio/proxy/storage/rulebasedsegments.go
+++ b/splitio/proxy/storage/rulebasedsegments.go
@@ -4,13 +4,15 @@ import (
"fmt"
"sync"
+ "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/optimized"
+ "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/persistent"
+
"github.com/splitio/go-split-commons/v8/dtos"
"github.com/splitio/go-split-commons/v8/engine/grammar/constants"
"github.com/splitio/go-split-commons/v8/storage"
"github.com/splitio/go-split-commons/v8/storage/inmemory/mutexmap"
"github.com/splitio/go-toolkit/v5/datastructures/set"
"github.com/splitio/go-toolkit/v5/logging"
- "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/optimized"
)
// ProxyRuleBasedSegmentsStorage defines the interface of a storage that can be used for serving payloads
@@ -22,6 +24,7 @@ type ProxyRuleBasedSegmentsStorage interface {
// ProxyRuleBasedSegmentsStorageImpl implements the ProxyRuleBasedSegmentsStorage interface and the RuleBasedSegmentProducer interface
type ProxyRuleBasedSegmentsStorageImpl struct {
snapshot mutexmap.RuleBasedSegmentsStorageImpl
+ db *persistent.RBChangesCollection
logger logging.LoggerInterface
oldestKnownCN int64
mtx sync.Mutex
@@ -30,16 +33,21 @@ type ProxyRuleBasedSegmentsStorageImpl struct {
// NewProxyRuleBasedSegmentsStorage instantiates a new proxy storage that wraps an in-memory snapshot of the last known
// flag configuration
-func NewProxyRuleBasedSegmentsStorage(logger logging.LoggerInterface) *ProxyRuleBasedSegmentsStorageImpl {
+func NewProxyRuleBasedSegmentsStorage(db persistent.DBWrapper, logger logging.LoggerInterface, restoreBackup bool) *ProxyRuleBasedSegmentsStorageImpl {
+ disk := persistent.NewRBChangesCollection(db, logger)
snapshot := mutexmap.NewRuleBasedSegmentsStorage()
historic := optimized.NewHistoricRBChanges(1000)
- var initialCN int64 = -1
+ var initialCN int64 = -1
+ if restoreBackup {
+ initialCN = snapshotFromDiskRB(snapshot, historic, disk, logger)
+ }
return &ProxyRuleBasedSegmentsStorageImpl{
snapshot: *snapshot,
+ db: disk,
+ historic: historic,
logger: logger,
oldestKnownCN: initialCN,
- historic: historic,
}
}
@@ -53,6 +61,36 @@ func (p *ProxyRuleBasedSegmentsStorageImpl) sinceIsTooOld(since int64) bool {
return since < p.oldestKnownCN
}
+func snapshotFromDiskRB(
+ dst *mutexmap.RuleBasedSegmentsStorageImpl,
+ historic optimized.HistoricChangesRB,
+ src *persistent.RBChangesCollection,
+ logger logging.LoggerInterface,
+) int64 {
+ all, err := src.FetchAll()
+ if err != nil {
+ logger.Error("error parsing feature flags from snapshot. No data will be available!: ", err)
+ return -1
+ }
+
+ var filtered []dtos.RuleBasedSegmentDTO
+ var cn = src.ChangeNumber()
+ for idx := range all {
+
+ // Make sure the CN matches is at least large as the payloads' max.
+ if thisCN := all[idx].ChangeNumber; thisCN > cn {
+ cn = thisCN
+ }
+ if all[idx].Status == constants.SplitStatusActive {
+ filtered = append(filtered, all[idx])
+ }
+ }
+
+ dst.Update(filtered, nil, cn)
+ historic.Update(filtered, nil, cn)
+ return cn
+}
+
func archivedRBDTOForView(view *optimized.RBView) dtos.RuleBasedSegmentDTO {
return dtos.RuleBasedSegmentDTO{
ChangeNumber: view.LastUpdated,
@@ -162,7 +200,7 @@ func (p *ProxyRuleBasedSegmentsStorageImpl) Update(toAdd []dtos.RuleBasedSegment
p.mtx.Lock()
p.snapshot.Update(toAdd, toRemove, changeNumber)
p.historic.Update(toAdd, toRemove, changeNumber)
- // p.db.Update(toAdd, toRemove, changeNumber)
+ p.db.Update(toAdd, toRemove, changeNumber)
p.mtx.Unlock()
return nil
}
diff --git a/splitio/proxy/storage/rulebasedsegments_test.go b/splitio/proxy/storage/rulebasedsegments_test.go
index 9ce4e95d..8be5a17b 100644
--- a/splitio/proxy/storage/rulebasedsegments_test.go
+++ b/splitio/proxy/storage/rulebasedsegments_test.go
@@ -3,101 +3,72 @@ package storage
import (
"testing"
+ "github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/persistent"
+
"github.com/splitio/go-split-commons/v8/dtos"
"github.com/splitio/go-toolkit/v5/logging"
+
"github.com/stretchr/testify/assert"
)
func TestRBSChangesSince(t *testing.T) {
logger := logging.NewLogger(nil)
- // Initialize storage with some test data
- pss := NewProxyRuleBasedSegmentsStorage(logger)
-
- // Test case 1: since == -1
- {
- initialRuleBaseds := []dtos.RuleBasedSegmentDTO{
- {Name: "rbs1", ChangeNumber: 10, Status: "ACTIVE", TrafficTypeName: "user"},
- {Name: "rbs2", ChangeNumber: 10, Status: "ACTIVE", TrafficTypeName: "user"},
- }
- pss.Update(initialRuleBaseds, nil, 10)
-
- changes, err := pss.ChangesSince(-1)
- assert.Nil(t, err)
- assert.Equal(t, int64(-1), changes.Since)
- assert.Equal(t, int64(10), changes.Till)
- assert.ElementsMatch(t, initialRuleBaseds, changes.RuleBasedSegments)
- }
+ dbw, err := persistent.NewBoltWrapper(persistent.BoltInMemoryMode, nil)
+ assert.Nil(t, err)
+ pss := NewProxyRuleBasedSegmentsStorage(dbw, logger, false)
- // Test case 2: Error when since is too old
- {
- // The storage was initialized with CN 10, so requesting CN 5 should fail
- changes, err := pss.ChangesSince(5)
- assert.Equal(t, ErrSinceParamTooOld, err)
- assert.Nil(t, changes)
+ // From -1
+ rbs := []dtos.RuleBasedSegmentDTO{
+ {Name: "rbs1", ChangeNumber: 10, Status: "ACTIVE", TrafficTypeName: "user"},
+ {Name: "rbs2", ChangeNumber: 10, Status: "ACTIVE", TrafficTypeName: "user"},
}
+ pss.Update(rbs, nil, 10)
+ changes, err := pss.ChangesSince(-1)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(-1), changes.Since)
+ assert.Equal(t, int64(10), changes.Till)
+ assert.ElementsMatch(t, rbs, changes.RuleBasedSegments)
- // Test case 3: Active and archived rule-based segment
- {
- // Add a new rule-based segment and archive an existing one
- toAdd := []dtos.RuleBasedSegmentDTO{{Name: "rbs3", ChangeNumber: 15, Status: "ACTIVE", TrafficTypeName: "user"}}
- toRemove := []dtos.RuleBasedSegmentDTO{
- {
- Name: "rbs2",
- ChangeNumber: 15,
- Status: "ARCHIVED",
- TrafficTypeName: "user",
- Conditions: []dtos.RuleBasedConditionDTO{},
- },
- }
+ changes, err = pss.ChangesSince(5)
+ assert.Equal(t, ErrSinceParamTooOld, err)
+ assert.Nil(t, changes)
- pss.Update(toAdd, toRemove, 15)
-
- changes, err := pss.ChangesSince(10)
- assert.Nil(t, err)
- assert.Equal(t, int64(10), changes.Since)
- assert.Equal(t, int64(15), changes.Till)
-
- // Should include both the new active rule-based segment and the archived one
- expectedRBSs := []dtos.RuleBasedSegmentDTO{
- {
- Name: "rbs2",
- ChangeNumber: 15,
- Status: "ARCHIVED",
- // Note: Archived segments have minimal fields set by archivedRBDTOForView
- },
- {
- Name: "rbs3",
- ChangeNumber: 15,
- Status: "ACTIVE",
- TrafficTypeName: "user",
- Excluded: dtos.ExcludedDTO{
- Keys: nil,
- Segments: nil,
- },
- Conditions: nil,
- },
- }
- assert.ElementsMatch(t, expectedRBSs, changes.RuleBasedSegments)
+ // Add a new rule-based segment and archive an existing one
+ toAdd := []dtos.RuleBasedSegmentDTO{{Name: "rbs3", ChangeNumber: 15, Status: "ACTIVE", TrafficTypeName: "user"}}
+ toRemove := []dtos.RuleBasedSegmentDTO{
+ {
+ Name: "rbs2",
+ ChangeNumber: 15,
+ Status: "ARCHIVED",
+ TrafficTypeName: "user",
+ Conditions: []dtos.RuleBasedConditionDTO{},
+ },
}
+ pss.Update(toAdd, toRemove, 15)
+ changes, err = pss.ChangesSince(10)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(10), changes.Since)
+ assert.Equal(t, int64(15), changes.Till)
- // Test case 4: Proper till calculation with multiple changes
- {
- // Add changes with different change numbers
- changes1 := []dtos.RuleBasedSegmentDTO{{Name: "rbs6", ChangeNumber: 25, Status: "ACTIVE", TrafficTypeName: "user"}}
- changes2 := []dtos.RuleBasedSegmentDTO{{Name: "rbs7", ChangeNumber: 30, Status: "ACTIVE", TrafficTypeName: "user"}}
-
- pss.Update(changes1, nil, 25)
- pss.Update(changes2, nil, 30)
+ // Should include both the new active rule-based segment and the archived one
+ expectedRBSs := []dtos.RuleBasedSegmentDTO{
+ {Name: "rbs2", ChangeNumber: 15, Status: "ARCHIVED"},
+ {Name: "rbs3", ChangeNumber: 15, Status: "ACTIVE", TrafficTypeName: "user"},
+ }
+ assert.ElementsMatch(t, expectedRBSs, changes.RuleBasedSegments)
- changes, err := pss.ChangesSince(20)
- assert.Nil(t, err)
- assert.Equal(t, int64(20), changes.Since)
- assert.Equal(t, int64(30), changes.Till)
- expectedChanges := []dtos.RuleBasedSegmentDTO{
- {Name: "rbs6", ChangeNumber: 25, Status: "ACTIVE", TrafficTypeName: "user"},
- {Name: "rbs7", ChangeNumber: 30, Status: "ACTIVE", TrafficTypeName: "user"},
- }
- assert.ElementsMatch(t, expectedChanges, changes.RuleBasedSegments)
+ changes1 := []dtos.RuleBasedSegmentDTO{{Name: "rbs6", ChangeNumber: 25, Status: "ACTIVE", TrafficTypeName: "user"}}
+ changes2 := []dtos.RuleBasedSegmentDTO{{Name: "rbs7", ChangeNumber: 30, Status: "ACTIVE", TrafficTypeName: "user"}}
+ pss.Update(changes1, nil, 25)
+ pss.Update(changes2, nil, 30)
+ changes, err = pss.ChangesSince(20)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(20), changes.Since)
+ assert.Equal(t, int64(30), changes.Till)
+ expectedChanges := []dtos.RuleBasedSegmentDTO{
+ {Name: "rbs6", ChangeNumber: 25, Status: "ACTIVE", TrafficTypeName: "user"},
+ {Name: "rbs7", ChangeNumber: 30, Status: "ACTIVE", TrafficTypeName: "user"},
}
+ assert.ElementsMatch(t, expectedChanges, changes.RuleBasedSegments)
}