Skip to content

Commit c1fefab

Browse files
committed
feat: cross-replica, OFREP bulk evaluation caching
- sends per-selector ETag for caching - ensures that ETags will be consistent across replicas - skips evaluation entirely if ETag matches Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 33774c7 commit c1fefab

10 files changed

Lines changed: 756 additions & 22 deletions

File tree

core/pkg/store/store.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,18 @@ func (s *Store) GetAll(ctx context.Context, selector *Selector) ([]model.Flag, m
206206
return flags, queryMeta, nil
207207
}
208208

209+
// watchSelector returns a channel that will be closed when the flags matching the given selector are modified.
210+
func (s *Store) WatchSelector(selector *Selector) <-chan struct{} {
211+
it, err := s.selectOrAll(selector)
212+
if err != nil {
213+
// return a closed channel on error
214+
ch := make(chan struct{})
215+
close(ch)
216+
return ch
217+
}
218+
return it.WatchCh()
219+
}
220+
209221
type flagIdentifier struct {
210222
flagSetId string
211223
key string

docs/reference/flagd-ofrep.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,10 @@ To evaluate all flags currently configured at flagd, use OFREP bulk evaluation r
2323
curl -X POST 'http://localhost:8016/ofrep/v1/evaluate/flags'
2424
```
2525

26+
## Evaluation Caching
27+
28+
The bulk evaluation endpoint caches responses per selector to avoid redundant evaluations. Clients can use the `If-None-Match` header with a previously received `ETag` to check if the cache is still valid. When the ETag matches, flagd returns the cached response without re-evaluating.
29+
30+
**Important**: The ETag only corresponds the flag configuration version, not the evaluation context. Clients must not send a cached ETag when their evaluation context has changed, otherwise they may receive stale results.
31+
2632
See the [cheat sheet](./cheat-sheet.md#ofrep-api-http) for more OFREP examples including context-sensitive evaluation and selectors.

flagd/cmd/start.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
managementPortFlagName = "management-port"
2424
metricsExporter = "metrics-exporter"
2525
ofrepPortFlagName = "ofrep-port"
26+
ofrepCacheCapacityFlagName = "ofrep-cache-capacity"
2627
otelCollectorURI = "otel-collector-uri"
2728
otelCertPathFlagName = "otel-cert-path"
2829
otelKeyPathFlagName = "otel-key-path"
@@ -52,6 +53,8 @@ func init() {
5253
flags.Int32P(portFlagName, "p", 8013, "Port to listen on")
5354
flags.Int32P(syncPortFlagName, "g", 8015, "gRPC Sync port")
5455
flags.Int32P(ofrepPortFlagName, "r", 8016, "ofrep service port")
56+
flags.Int32(ofrepCacheCapacityFlagName, 100,
57+
"Max number of selectors to cache for OFREP bulk evaluation ETags (0 = unlimited)")
5558

5659
flags.StringP(socketPathFlagName, "d", "", "Flagd unix socket path. "+
5760
"With grpc the evaluations service will become available on this address. "+
@@ -113,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
113116
_ = viper.BindPFlag(syncPortFlagName, flags.Lookup(syncPortFlagName))
114117
_ = viper.BindPFlag(syncSocketPathFlagName, flags.Lookup(syncSocketPathFlagName))
115118
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
119+
_ = viper.BindPFlag(ofrepCacheCapacityFlagName, flags.Lookup(ofrepCacheCapacityFlagName))
116120
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
117121
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
118122
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
@@ -177,6 +181,7 @@ var startCmd = &cobra.Command{
177181
MetricExporter: viper.GetString(metricsExporter),
178182
ManagementPort: viper.GetUint16(managementPortFlagName),
179183
OfrepServicePort: viper.GetUint16(ofrepPortFlagName),
184+
OfrepCacheCapacity: viper.GetInt(ofrepCacheCapacityFlagName),
180185
OtelCollectorURI: viper.GetString(otelCollectorURI),
181186
OtelCertPath: viper.GetString(otelCertPathFlagName),
182187
OtelKeyPath: viper.GetString(otelKeyPathFlagName),

flagd/pkg/runtime/from_config.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Config struct {
2727
MetricExporter string
2828
ManagementPort uint16
2929
OfrepServicePort uint16
30+
OfrepCacheCapacity int
3031
OtelCollectorURI string
3132
OtelCertPath string
3233
OtelKeyPath string
@@ -104,10 +105,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
104105
recorder)
105106

106107
// ofrep service
107-
ofrepService, err := ofrep.NewOfrepService(jsonEvaluator, config.CORS, ofrep.SvcConfiguration{
108-
Logger: logger.WithFields(zap.String("component", "OFREPService")),
109-
Port: config.OfrepServicePort,
110-
ServiceName: svcName,
108+
ofrepService, err := ofrep.NewOfrepService(jsonEvaluator, store, config.CORS, ofrep.SvcConfiguration{
109+
Logger: logger.WithFields(zap.String("component", "OFREPService")),
110+
Port: config.OfrepServicePort,
111+
CacheCapacity: config.OfrepCacheCapacity,
112+
ServiceName: svcName,
111113
MetricsRecorder: recorder,
112114
},
113115
config.ContextValues,

flagd/pkg/service/flag-evaluation/ofrep/handler.go

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,26 @@ import (
1919
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
2020
"go.opentelemetry.io/otel"
2121
"go.opentelemetry.io/otel/trace"
22+
"go.uber.org/zap"
2223
)
2324

25+
// ISelectorVersionTracker defines the interface for selector version tracking.
26+
// it enables ETag-based cache validation for bulk evaluations.
27+
type ISelectorVersionTracker interface {
28+
// ETag returns the current ETag for a selector, or empty if not tracked
29+
ETag(selectorExpression string) string
30+
// Track starts tracking a selector and returns its initial ETag
31+
Track(selectorExpression string) string
32+
}
33+
2434
const (
25-
key = "key"
26-
singleEvaluation = "/ofrep/v1/evaluate/flags/{key}"
27-
bulkEvaluation = "/ofrep/v1/evaluate/{path:flags\\/|flags}"
35+
key = "key"
36+
singleEvaluation = "/ofrep/v1/evaluate/flags/{key}"
37+
bulkEvaluation = "/ofrep/v1/evaluate/{path:flags\\/|flags}"
38+
headerETag = "ETag"
39+
headerIfNoneMatch = "If-None-Match"
40+
headerContentType = "Content-Type"
41+
contentTypeJSON = "application/json"
2842
)
2943

3044
type handler struct {
@@ -33,6 +47,7 @@ type handler struct {
3347
contextValues map[string]any
3448
headerToContextKeyMappings map[string]string
3549
tracer trace.Tracer
50+
versionTracker ISelectorVersionTracker
3651
}
3752

3853
func NewOfrepHandler(
@@ -42,13 +57,15 @@ func NewOfrepHandler(
4257
headerToContextKeyMappings map[string]string,
4358
metricsRecorder telemetry.IMetricsRecorder,
4459
serviceName string,
60+
versionTracker ISelectorVersionTracker,
4561
) http.Handler {
4662
h := handler{
4763
Logger: logger,
4864
evaluator: evaluator,
4965
contextValues: contextValues,
5066
headerToContextKeyMappings: headerToContextKeyMappings,
5167
tracer: otel.Tracer("flagd.ofrep.v1"),
68+
versionTracker: versionTracker,
5269
}
5370

5471
router := mux.NewRouter()
@@ -117,6 +134,19 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {
117134

118135
evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
119136
selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)
137+
138+
// check if client's ETag matches current version - return 304 if unchanged
139+
ifNoneMatch := r.Header.Get(headerIfNoneMatch)
140+
if h.versionTracker != nil && ifNoneMatch != "" {
141+
currentETag := h.versionTracker.ETag(selectorExpression)
142+
if currentETag != "" && ifNoneMatch == currentETag {
143+
h.Logger.Debug(fmt.Sprintf("ETag match for selector '%s', returning 304", selectorExpression))
144+
w.Header().Add(headerETag, currentETag)
145+
w.WriteHeader(http.StatusNotModified)
146+
return
147+
}
148+
}
149+
120150
selector := store.NewSelector(selectorExpression)
121151
ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)
122152

@@ -128,7 +158,36 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {
128158
fmt.Sprintf("Bulk evaluation failed. Tracking ID: %s", requestID))
129159
h.writeJSONToResponse(http.StatusInternalServerError, res, w)
130160
} else {
131-
h.writeJSONToResponse(http.StatusOK, ofrep.BulkEvaluationResponseFrom(evaluations, metadata), w)
161+
response := ofrep.BulkEvaluationResponseFrom(evaluations, metadata)
162+
h.writeBulkEvaluationResponse(w, r, selectorExpression, response)
163+
}
164+
}
165+
166+
// writes the bulk evaluation response with ETag support
167+
func (h *handler) writeBulkEvaluationResponse(w http.ResponseWriter, _ *http.Request, selectorExpression string, response ofrep.BulkEvaluationResponse) {
168+
// marshal the response
169+
body, err := json.Marshal(response)
170+
if err != nil {
171+
h.Logger.Warn("error marshalling response", zap.Error(err))
172+
w.WriteHeader(http.StatusInternalServerError)
173+
return
174+
}
175+
176+
// track this selector and get ETag for response
177+
var eTag string
178+
if h.versionTracker != nil {
179+
eTag = h.versionTracker.Track(selectorExpression)
180+
}
181+
182+
// write response with ETag
183+
w.Header().Add(headerContentType, contentTypeJSON)
184+
if eTag != "" {
185+
w.Header().Add(headerETag, eTag)
186+
}
187+
w.WriteHeader(http.StatusOK)
188+
_, err = w.Write(body)
189+
if err != nil {
190+
h.Logger.Warn("error while writing response", zap.Error(err))
132191
}
133192
}
134193

@@ -137,16 +196,16 @@ func (h *handler) writeJSONToResponse(status int, payload interface{}, w http.Re
137196
marshal, err := json.Marshal(payload)
138197
if err != nil {
139198
// always a 500
140-
h.Logger.Warn(fmt.Sprintf("error marshelling the response: %v", err))
199+
h.Logger.Warn("error marshalling the response", zap.Error(err))
141200
w.WriteHeader(http.StatusInternalServerError)
142201
return
143202
}
144203

145-
w.Header().Add("Content-Type", "application/json")
204+
w.Header().Add(headerContentType, contentTypeJSON)
146205
w.WriteHeader(status)
147206
_, err = w.Write(marshal)
148207
if err != nil {
149-
h.Logger.Warn(fmt.Sprintf("error while writing response: %v", err))
208+
h.Logger.Warn("error while writing response", zap.Error(err))
150209
}
151210
}
152211

0 commit comments

Comments
 (0)