internal/datastore/persistent_data_store_wrapper.go (237 changes: 129 additions & 108 deletions)
@@ -3,6 +3,7 @@ package datastore
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
@@ -17,11 +18,17 @@
)

// persistentDataStoreWrapper is the implementation of DataStore that we use for all persistent data stores.
//
// The cache is held behind an atomic.Pointer so that DropCache can swap it out
// for nil without blocking readers. Each method that touches the cache calls
// Load once at the top and works with that local snapshot for the rest of the
// call -- so a concurrent drop can never observe a half-modified cache, and
// readers never block on a mutex.
type persistentDataStoreWrapper struct {
core subsystems.PersistentDataStore
dataStoreUpdates subsystems.DataStoreUpdateSink
statusPoller *dataStoreStatusPoller
cache *cache.Cache
cache atomic.Pointer[cache.Cache]
cacheTTL time.Duration
requests singleflight.Group
loggers ldlog.Loggers
@@ -39,26 +46,24 @@ func NewPersistentDataStoreWrapper(
cacheTTL time.Duration,
loggers ldlog.Loggers,
) subsystems.DataStore {
var myCache *cache.Cache
if cacheTTL != 0 {
myCache = cache.New(cacheTTL, 5*time.Minute)
// Note that the documented behavior of go-cache is that if cacheTTL is negative, the
// cache never expires. That is consistent with we've defined the parameter.
}

w := &persistentDataStoreWrapper{
core: core,
dataStoreUpdates: dataStoreUpdates,
cache: myCache,
cacheTTL: cacheTTL,
loggers: loggers,
}

if cacheTTL != 0 {
// Note that the documented behavior of go-cache is that if cacheTTL is negative, the
// cache never expires. That is consistent with how we've defined the parameter.
w.cache.Store(cache.New(cacheTTL, 5*time.Minute))
}

w.statusPoller = newDataStoreStatusPoller(
true,
w.pollAvailabilityAfterOutage,
dataStoreUpdates.UpdateStatus,
myCache == nil || cacheTTL > 0, // needsRefresh=true unless we're in infinite cache mode
cacheTTL >= 0, // needsRefresh=true unless we're in infinite cache mode (cacheTTL < 0)
loggers,
)

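The comment above leans on go-cache's documented handling of a negative default TTL: entries stored with DefaultExpiration are then kept until explicitly deleted or flushed. A small standalone sketch of just that behavior, assuming nothing beyond the public go-cache API:

```go
package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
)

func main() {
	// A negative default TTL (cache.NoExpiration is -1) means items stored with
	// DefaultExpiration never expire; they stay until deleted or flushed.
	c := cache.New(-1*time.Second, 5*time.Minute)
	c.Set("flag", "on", cache.DefaultExpiration)

	time.Sleep(10 * time.Millisecond)
	v, ok := c.Get("flag")
	fmt.Println(v, ok) // prints: on true
}
```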
@@ -67,10 +72,11 @@

func (w *persistentDataStoreWrapper) Init(allData []st.Collection) error {
err := w.initCore(allData)
if w.cache != nil {
w.cache.Flush()
c := w.cache.Load()
if c != nil {
c.Flush()
}
if err != nil && !w.hasInfiniteCache() {
if err != nil && (c == nil || w.cacheTTL >= 0) {
// If the underlying store failed to do the update, and we've got an expiring cache, then:
// 1) We shouldn't update the cache, and
// 2) We shouldn't be considered initialized.
@@ -80,9 +86,9 @@ func (w *persistentDataStoreWrapper) Init(allData []st.Collection) error {
}
// However, if the cache TTL is infinite, then it makes sense to update the cache regardless of the
// initialization result of the underlying store.
if w.cache != nil {
if c != nil {
for _, coll := range allData {
w.cacheItems(coll.Kind, coll.Items)
cacheCollection(c, coll.Kind, coll.Items)
}
}
w.initLock.Lock()
@@ -92,25 +98,29 @@ func (w *persistentDataStoreWrapper) Init(allData []st.Collection) error {
}

func (w *persistentDataStoreWrapper) Get(kind st.DataKind, key string) (st.ItemDescriptor, error) {
if w.cache == nil {
c := w.cache.Load()
if c == nil {
item, err := w.getAndDeserializeItem(kind, key)
w.processError(err)
return item, err
}
cacheKey := dataStoreCacheKey(kind, key)
if data, present := w.cache.Get(cacheKey); present {
if data, present := c.Get(cacheKey); present {
if item, ok := data.(st.ItemDescriptor); ok {
return item, nil
}
}
// Item was not cached or cached value was not valid. Use singleflight to ensure that we'll only
// do this core query once even if multiple goroutines are requesting it
// do this core query once even if multiple goroutines are requesting it.
reqKey := fmt.Sprintf("get:%s:%s", kind.GetName(), key)
itemIntf, err, _ := w.requests.Do(reqKey, func() (interface{}, error) {
item, err := w.getAndDeserializeItem(kind, key)
w.processError(err)
if err == nil {
w.cache.Set(cacheKey, item, cache.DefaultExpiration)
// Re-load in case the cache was dropped while we were waiting on the core.
if c := w.cache.Load(); c != nil {
c.Set(cacheKey, item, cache.DefaultExpiration)
}
return item, nil
}
return nil, err
@@ -127,26 +137,28 @@ func (w *persistentDataStoreWrapper) Get(kind st.DataKind, key string) (st.ItemD
}

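The cache-miss path in Get relies on singleflight to collapse concurrent misses for the same key into one store query. A minimal standalone sketch of that pattern, with a hypothetical loadFromStore function standing in for the wrapper's core query:

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

// loadFromStore stands in for the wrapper's getAndDeserializeItem call.
func loadFromStore(key string) (string, error) {
	fmt.Println("store queried for", key)
	return "value-for-" + key, nil
}

func main() {
	var requests singleflight.Group
	var wg sync.WaitGroup

	// Ten goroutines miss the cache at once; callers that arrive while the
	// first Do for this request key is in flight share its result instead of
	// issuing their own store query.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, shared := requests.Do("get:features:my-flag", func() (interface{}, error) {
				return loadFromStore("my-flag")
			})
			if err == nil {
				fmt.Println(v, "shared:", shared)
			}
		}()
	}
	wg.Wait()
}
```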
func (w *persistentDataStoreWrapper) GetAll(kind st.DataKind) ([]st.KeyedItemDescriptor, error) {
if w.cache == nil {
c := w.cache.Load()
if c == nil {
items, err := w.getAllAndDeserialize(kind)
w.processError(err)
return items, err
}
// Check whether we have a cache item for the entire data set
cacheKey := dataStoreAllItemsCacheKey(kind)
if data, present := w.cache.Get(cacheKey); present {
if data, present := c.Get(cacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
return items, nil
}
}
// Data set was not cached or cached value was not valid. Use singleflight to ensure that we'll only
// do this core query once even if multiple goroutines are requesting it
// do this core query once even if multiple goroutines are requesting it.
reqKey := fmt.Sprintf("all:%s", kind.GetName())
itemsIntf, err, _ := w.requests.Do(reqKey, func() (interface{}, error) {
items, err := w.getAllAndDeserialize(kind)
w.processError(err)
if err == nil {
w.cache.Set(cacheKey, items, cache.DefaultExpiration)
if c := w.cache.Load(); c != nil {
c.Set(cacheKey, items, cache.DefaultExpiration)
}
return items, nil
}
return nil, err
@@ -170,55 +182,57 @@ func (w *persistentDataStoreWrapper) Upsert(
serializedItem := w.serialize(kind, newItem)
updated, err := w.core.Upsert(kind, key, serializedItem)
w.processError(err)
// Normally, if the underlying store failed to do the update, we do not want to update the cache -
// the idea being that it's better to stay in a consistent state of having old data than to act
// like we have new data but then suddenly fall back to old data when the cache expires. However,
// if the cache TTL is infinite, then it makes sense to update the cache always.
if err != nil {
if !w.hasInfiniteCache() {
return updated, err
}

c := w.cache.Load()
infinite := w.cacheTTL < 0

// Normally, if the underlying store failed to do the update, we do not want to update the cache:
// it's better to stay in a consistent state of having old data than to act like we have new data
// but then suddenly fall back to old data when the cache expires. The exception is infinite-TTL
// mode, where we keep the cache in sync regardless so it can repopulate the store after a recovered
// outage.
if err != nil && (c == nil || !infinite) {
return updated, err
}
if w.cache != nil {
cacheKey := dataStoreCacheKey(kind, key)
allCacheKey := dataStoreAllItemsCacheKey(kind)
if err == nil {
if updated {
w.cache.Set(cacheKey, newItem, cache.DefaultExpiration)
// If the cache has a finite TTL, then we should remove the "all items" cache entry to force
// a reread the next time All is called. However, if it's an infinite TTL, we need to just
// update the item within the existing "all items" entry (since we want things to still work
// even if the underlying store is unavailable).
if w.hasInfiniteCache() {
if data, present := w.cache.Get(allCacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
w.cache.Set(allCacheKey, updateSingleItem(items, key, newItem), cache.DefaultExpiration)
}
if c == nil {
return updated, err
}

cacheKey := dataStoreCacheKey(kind, key)
allCacheKey := dataStoreAllItemsCacheKey(kind)

if err == nil {
if updated {
c.Set(cacheKey, newItem, cache.DefaultExpiration)
// Finite TTL: drop the "all items" entry to force a reread next time GetAll is called.
// Infinite TTL: update the entry in place so things still work if the store is unavailable.
if infinite {
if data, present := c.Get(allCacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
c.Set(allCacheKey, updateSingleItem(items, key, newItem), cache.DefaultExpiration)
}
} else {
w.cache.Delete(allCacheKey)
}
} else {
// there was a concurrent modification elsewhere - update the cache to get the new state
w.cache.Delete(cacheKey)
w.cache.Delete(allCacheKey)
_, _ = w.Get(kind, key) // doing this query repopulates the cache
c.Delete(allCacheKey)
}
} else {
// The underlying store returned an error. If the cache has an infinite TTL, then we should go
// ahead and update the cache so that it always has the latest data; we may be able to use the
// cached data to repopulate the store later if it starts working again.
if w.hasInfiniteCache() {
w.cache.Set(cacheKey, newItem, cache.DefaultExpiration)
cachedItems := []st.KeyedItemDescriptor{}
if data, present := w.cache.Get(allCacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
cachedItems = items
}
}
w.cache.Set(allCacheKey, updateSingleItem(cachedItems, key, newItem), cache.DefaultExpiration)
// Concurrent modification elsewhere -- drop our cached values and refetch.
c.Delete(cacheKey)
c.Delete(allCacheKey)
_, _ = w.Get(kind, key) // doing this query repopulates the cache
}
} else {
// err != nil and infinite cache mode (we already returned for the !infinite case).
// Update the cache so it always has the latest data; we may be able to use it to repopulate
// the store later if it starts working again.
c.Set(cacheKey, newItem, cache.DefaultExpiration)
cachedItems := []st.KeyedItemDescriptor{}
if data, present := c.Get(allCacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
cachedItems = items
}
}
c.Set(allCacheKey, updateSingleItem(cachedItems, key, newItem), cache.DefaultExpiration)
}
return updated, err
}
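updateSingleItem is referenced in Upsert but its body is not part of this diff. A purely hypothetical sketch of what a helper with that shape plausibly does (replace the entry with the matching key in the cached "all items" slice, appending if absent), using the file's st alias for the store types package; this is an assumption, not the actual implementation:

```go
// Hypothetical sketch only: the real updateSingleItem is not shown in this diff.
// It would return a copy of the cached "all items" slice with one entry replaced
// (or appended), so the infinite-TTL cache can stay current without a reread.
func updateSingleItemSketch(
	items []st.KeyedItemDescriptor,
	key string,
	newItem st.ItemDescriptor,
) []st.KeyedItemDescriptor {
	updated := make([]st.KeyedItemDescriptor, 0, len(items)+1)
	found := false
	for _, item := range items {
		if item.Key == key {
			updated = append(updated, st.KeyedItemDescriptor{Key: key, Item: newItem})
			found = true
		} else {
			updated = append(updated, item)
		}
	}
	if !found {
		updated = append(updated, st.KeyedItemDescriptor{Key: key, Item: newItem})
	}
	return updated
}
```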
@@ -231,22 +245,23 @@ func (w *persistentDataStoreWrapper) IsInitialized() bool {
return true
}

if w.cache != nil {
if _, found := w.cache.Get(initCheckedKey); found {
c := w.cache.Load()
if c != nil {
if _, found := c.Get(initCheckedKey); found {
return false
}
}

newValue := w.core.IsInitialized()
if newValue {
w.initLock.Lock()
defer w.initLock.Unlock()
w.inited = true
if w.cache != nil {
w.cache.Delete(initCheckedKey)
w.initLock.Unlock()
if c != nil {
c.Delete(initCheckedKey)
}
} else if w.cache != nil {
w.cache.Set(initCheckedKey, "", cache.DefaultExpiration)
} else if c != nil {
c.Set(initCheckedKey, "", cache.DefaultExpiration)
}
return newValue
}
@@ -256,46 +271,55 @@ func (w *persistentDataStoreWrapper) IsStatusMonitoringEnabled() bool {
}

func (w *persistentDataStoreWrapper) Close() error {
w.DropCache()
w.statusPoller.Close()
return w.core.Close()
}

// DropCache flushes and releases the in-memory cache. Called once the FDv2
// in-memory store has been initialized and is the source of truth. Safe to
// call multiple times.
func (w *persistentDataStoreWrapper) DropCache() {
if c := w.cache.Swap(nil); c != nil {
c.Flush()
w.loggers.Debug("Persistent store cache dropped; in-memory store is now active")
}
}

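The DropCache/Load interaction described in the type's doc comment is easiest to see in isolation. A minimal sketch of the same snapshot-and-swap pattern, with a trivial read path in place of the wrapper's Get logic:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/patrickmn/go-cache"
)

type wrapper struct {
	cache atomic.Pointer[cache.Cache]
}

// get takes one snapshot of the cache pointer and uses it for the whole call,
// so a concurrent drop can never leave it looking at a half-modified cache.
func (w *wrapper) get(key string) (interface{}, bool) {
	c := w.cache.Load()
	if c == nil {
		return nil, false // cache already dropped; the caller would hit the core store
	}
	return c.Get(key)
}

// drop swaps the pointer to nil without blocking readers, then flushes the old
// cache; readers that already took a snapshot simply see an empty cache.
func (w *wrapper) drop() {
	if c := w.cache.Swap(nil); c != nil {
		c.Flush()
	}
}

func main() {
	w := &wrapper{}
	w.cache.Store(cache.New(cache.NoExpiration, 5*time.Minute))
	if c := w.cache.Load(); c != nil {
		c.Set("flag", "on", cache.DefaultExpiration)
	}
	fmt.Println(w.get("flag")) // on true
	w.drop()
	fmt.Println(w.get("flag")) // <nil> false
}
```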
func (w *persistentDataStoreWrapper) pollAvailabilityAfterOutage() bool {
if !w.core.IsStoreAvailable() {
return false
}
if w.hasInfiniteCache() {
// If we're in infinite cache mode, then we can assume the cache has a full set of current
// flag data (since presumably the data source has still been running) and we can just
// write the contents of the cache to the underlying data store.
kinds := datakinds.AllDataKinds()
allData := make([]st.Collection, 0, len(kinds))
for _, kind := range kinds {
allCacheKey := dataStoreAllItemsCacheKey(kind)
if data, present := w.cache.Get(allCacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
allData = append(allData, st.Collection{Kind: kind, Items: items})
}
c := w.cache.Load()
if c == nil || w.cacheTTL >= 0 {
// Either we have no cache or the cache is finite-TTL. In either case there's nothing
// useful to write back to the store from the cache.
return true
}
// Infinite-cache mode: assume the cache has a full set of current flag data (since the
// data source has been running) and write the contents back to the underlying data store.
kinds := datakinds.AllDataKinds()
allData := make([]st.Collection, 0, len(kinds))
for _, kind := range kinds {
allCacheKey := dataStoreAllItemsCacheKey(kind)
if data, present := c.Get(allCacheKey); present {
if items, ok := data.([]st.KeyedItemDescriptor); ok {
allData = append(allData, st.Collection{Kind: kind, Items: items})
}
}
err := w.initCore(allData)
if err != nil {
// We failed to write the cached data to the underlying store. In this case,
// w.initCore() has already put us back into the failed state. The only further
// thing we can do is to log a note about what just happened.
w.loggers.Errorf("Tried to write cached data to persistent store after a store outage, but failed: %s", err)
} else {
w.loggers.Warn("Successfully updated persistent store from cached data")
// Note that w.inited should have already been set when InitInternal was originally called -
// in infinite cache mode, we set it even if the database update failed.
}
}
err := w.initCore(allData)
if err != nil {
// initCore has already put us back into the failed state. Just log a note.
w.loggers.Errorf("Tried to write cached data to persistent store after a store outage, but failed: %s", err)
} else {
w.loggers.Warn("Successfully updated persistent store from cached data")
// Note that w.inited should have already been set when InitInternal was originally called -
// in infinite cache mode, we set it even if the database update failed.
}
return true
}

func (w *persistentDataStoreWrapper) hasInfiniteCache() bool {
return w.cache != nil && w.cacheTTL < 0
}
func dataStoreCacheKey(kind st.DataKind, key string) string {
return kind.GetName() + ":" + key
}
@@ -346,17 +370,14 @@ func (w *persistentDataStoreWrapper) getAllAndDeserialize(
return nil, err
}

func (w *persistentDataStoreWrapper) cacheItems(
kind st.DataKind,
items []st.KeyedItemDescriptor,
) {
if w.cache != nil {
copyOfItems := slices.Clone(items)
w.cache.Set(dataStoreAllItemsCacheKey(kind), copyOfItems, cache.DefaultExpiration)
// cacheCollection writes a kind's items into the given cache snapshot.
// Caller is responsible for handling a nil cache.
func cacheCollection(c *cache.Cache, kind st.DataKind, items []st.KeyedItemDescriptor) {
copyOfItems := slices.Clone(items)
c.Set(dataStoreAllItemsCacheKey(kind), copyOfItems, cache.DefaultExpiration)

for _, item := range items {
w.cache.Set(dataStoreCacheKey(kind, item.Key), item.Item, cache.DefaultExpiration)
}
for _, item := range items {
c.Set(dataStoreCacheKey(kind, item.Key), item.Item, cache.DefaultExpiration)
}
}
