Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@ func (s *Store) GetAll(ctx context.Context, selector *Selector) ([]model.Flag, m
return flags, queryMeta, nil
}

// watchSelector returns a channel that will be closed when the flags matching the given selector are modified.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we "hook in" to memdb's listener for the selector, and use it to invalidate the cache.

func (s *Store) WatchSelector(selector *Selector) <-chan struct{} {
it, err := s.selectOrAll(selector)
if err != nil {
// return a closed channel on error
ch := make(chan struct{})
close(ch)
return ch
}
return it.WatchCh()
}

type flagIdentifier struct {
flagSetId string
key string
Expand Down
1 change: 1 addition & 0 deletions docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ flagd start [flags]
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)
-t, --metrics-exporter string Set the metrics exporter. Default(if unset) is Prometheus. Can be override to otel - OpenTelemetry metric exporter. Overriding to otel require otelCollectorURI to be present
--ofrep-cache-capacity int32 Max number of selectors to cache for OFREP bulk evaluation ETags (0 = unlimited) (default 100)
-r, --ofrep-port int32 ofrep service port (default 8016)
-A, --otel-ca-path string tls certificate authority path to use with OpenTelemetry collector
-D, --otel-cert-path string tls certificate path to use with OpenTelemetry collector
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/flagd-ofrep.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ To evaluate all flags currently configured at flagd, use OFREP bulk evaluation r
curl -X POST 'http://localhost:8016/ofrep/v1/evaluate/flags'
```

## Evaluation Caching

The bulk evaluation endpoint caches responses per selector to avoid redundant evaluations. Clients can use the `If-None-Match` header with a previously received `ETag` to check if the cache is still valid. When the ETag matches, flagd returns the cached response without re-evaluating.

**Important**: The ETag only corresponds the flag configuration version, not the evaluation context. Clients must not send a cached ETag when their evaluation context has changed, otherwise they may receive stale results.

See the [cheat sheet](./cheat-sheet.md#ofrep-api-http) for more OFREP examples including context-sensitive evaluation and selectors.
5 changes: 5 additions & 0 deletions flagd/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
managementPortFlagName = "management-port"
metricsExporter = "metrics-exporter"
ofrepPortFlagName = "ofrep-port"
ofrepCacheCapacityFlagName = "ofrep-cache-capacity"
otelCollectorURI = "otel-collector-uri"
otelCertPathFlagName = "otel-cert-path"
otelKeyPathFlagName = "otel-key-path"
Expand Down Expand Up @@ -52,6 +53,8 @@ func init() {
flags.Int32P(portFlagName, "p", 8013, "Port to listen on")
flags.Int32P(syncPortFlagName, "g", 8015, "gRPC Sync port")
flags.Int32P(ofrepPortFlagName, "r", 8016, "ofrep service port")
flags.Int32(ofrepCacheCapacityFlagName, 100,
"Max number of selectors to cache for OFREP bulk evaluation ETags (0 = unlimited)")

flags.StringP(socketPathFlagName, "d", "", "Flagd unix socket path. "+
"With grpc the evaluations service will become available on this address. "+
Expand Down Expand Up @@ -113,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
_ = viper.BindPFlag(syncPortFlagName, flags.Lookup(syncPortFlagName))
_ = viper.BindPFlag(syncSocketPathFlagName, flags.Lookup(syncSocketPathFlagName))
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
_ = viper.BindPFlag(ofrepCacheCapacityFlagName, flags.Lookup(ofrepCacheCapacityFlagName))
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
Expand Down Expand Up @@ -177,6 +181,7 @@ var startCmd = &cobra.Command{
MetricExporter: viper.GetString(metricsExporter),
ManagementPort: viper.GetUint16(managementPortFlagName),
OfrepServicePort: viper.GetUint16(ofrepPortFlagName),
OfrepCacheCapacity: viper.GetInt(ofrepCacheCapacityFlagName),
OtelCollectorURI: viper.GetString(otelCollectorURI),
OtelCertPath: viper.GetString(otelCertPathFlagName),
OtelKeyPath: viper.GetString(otelKeyPathFlagName),
Expand Down
10 changes: 6 additions & 4 deletions flagd/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
MetricExporter string
ManagementPort uint16
OfrepServicePort uint16
OfrepCacheCapacity int
OtelCollectorURI string
OtelCertPath string
OtelKeyPath string
Expand Down Expand Up @@ -104,10 +105,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
recorder)

// ofrep service
ofrepService, err := ofrep.NewOfrepService(jsonEvaluator, config.CORS, ofrep.SvcConfiguration{
Logger: logger.WithFields(zap.String("component", "OFREPService")),
Port: config.OfrepServicePort,
ServiceName: svcName,
ofrepService, err := ofrep.NewOfrepService(jsonEvaluator, store, config.CORS, ofrep.SvcConfiguration{
Logger: logger.WithFields(zap.String("component", "OFREPService")),
Port: config.OfrepServicePort,
CacheCapacity: config.OfrepCacheCapacity,
ServiceName: svcName,
MetricsRecorder: recorder,
},
config.ContextValues,
Expand Down
73 changes: 66 additions & 7 deletions flagd/pkg/service/flag-evaluation/ofrep/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,26 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

// ISelectorVersionTracker defines the interface for selector version tracking.
// it enables ETag-based cache validation for bulk evaluations.
type ISelectorVersionTracker interface {
// ETag returns the current ETag for a selector, or empty if not tracked
ETag(selectorExpression string) string
// Track starts tracking a selector and returns its initial ETag
Track(selectorExpression string) string
}

const (
key = "key"
singleEvaluation = "/ofrep/v1/evaluate/flags/{key}"
bulkEvaluation = "/ofrep/v1/evaluate/{path:flags\\/|flags}"
key = "key"
singleEvaluation = "/ofrep/v1/evaluate/flags/{key}"
bulkEvaluation = "/ofrep/v1/evaluate/{path:flags\\/|flags}"
headerETag = "ETag"
headerIfNoneMatch = "If-None-Match"
headerContentType = "Content-Type"
contentTypeJSON = "application/json"
)

type handler struct {
Expand All @@ -33,6 +47,7 @@ type handler struct {
contextValues map[string]any
headerToContextKeyMappings map[string]string
tracer trace.Tracer
versionTracker ISelectorVersionTracker
}

func NewOfrepHandler(
Expand All @@ -42,13 +57,15 @@ func NewOfrepHandler(
headerToContextKeyMappings map[string]string,
metricsRecorder telemetry.IMetricsRecorder,
serviceName string,
versionTracker ISelectorVersionTracker,
) http.Handler {
h := handler{
Logger: logger,
evaluator: evaluator,
contextValues: contextValues,
headerToContextKeyMappings: headerToContextKeyMappings,
tracer: otel.Tracer("flagd.ofrep.v1"),
versionTracker: versionTracker,
}

router := mux.NewRouter()
Expand Down Expand Up @@ -117,6 +134,19 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {

evaluationContext := flagdContext(h.Logger, requestID, request, h.contextValues, r.Header, h.headerToContextKeyMappings)
selectorExpression := r.Header.Get(service.FLAGD_SELECTOR_HEADER)

// check if client's ETag matches current version - return 304 if unchanged
ifNoneMatch := r.Header.Get(headerIfNoneMatch)
if h.versionTracker != nil && ifNoneMatch != "" {
currentETag := h.versionTracker.ETag(selectorExpression)
if currentETag != "" && ifNoneMatch == currentETag {
h.Logger.Debug("ETag match, returning 304", zap.String("selector", selectorExpression))
w.Header().Add(headerETag, currentETag)
w.WriteHeader(http.StatusNotModified)
return
}
}

selector := store.NewSelector(selectorExpression)
ctx := context.WithValue(r.Context(), store.SelectorContextKey{}, selector)

Expand All @@ -128,7 +158,36 @@ func (h *handler) HandleBulkEvaluation(w http.ResponseWriter, r *http.Request) {
fmt.Sprintf("Bulk evaluation failed. Tracking ID: %s", requestID))
h.writeJSONToResponse(http.StatusInternalServerError, res, w)
} else {
h.writeJSONToResponse(http.StatusOK, ofrep.BulkEvaluationResponseFrom(evaluations, metadata), w)
response := ofrep.BulkEvaluationResponseFrom(evaluations, metadata)
h.writeBulkEvaluationResponse(w, r, selectorExpression, response)
}
}

// writes the bulk evaluation response with ETag support
func (h *handler) writeBulkEvaluationResponse(w http.ResponseWriter, _ *http.Request, selectorExpression string, response ofrep.BulkEvaluationResponse) {
// marshal the response
body, err := json.Marshal(response)
if err != nil {
h.Logger.Warn("error marshalling response", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

// track this selector and get ETag for response
var eTag string
if h.versionTracker != nil {
eTag = h.versionTracker.Track(selectorExpression)
}

// write response with ETag
w.Header().Add(headerContentType, contentTypeJSON)
if eTag != "" {
w.Header().Add(headerETag, eTag)
}
w.WriteHeader(http.StatusOK)
_, err = w.Write(body)
if err != nil {
h.Logger.Warn("error while writing response", zap.Error(err))
}
}

Expand All @@ -137,16 +196,16 @@ func (h *handler) writeJSONToResponse(status int, payload interface{}, w http.Re
marshal, err := json.Marshal(payload)
if err != nil {
// always a 500
h.Logger.Warn(fmt.Sprintf("error marshelling the response: %v", err))
h.Logger.Warn("error marshalling the response", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Add("Content-Type", "application/json")
w.Header().Add(headerContentType, contentTypeJSON)
w.WriteHeader(status)
_, err = w.Write(marshal)
if err != nil {
h.Logger.Warn(fmt.Sprintf("error while writing response: %v", err))
h.Logger.Warn("error while writing response", zap.Error(err))
}
}

Expand Down
Loading
Loading