From d1aba09fd5382b22589766acc5d3c0e590f91d84 Mon Sep 17 00:00:00 2001 From: Andreas Bergmeier Date: Mon, 11 May 2026 19:39:44 +0200 Subject: [PATCH] Add support for Valkey as Online Store. Signed-off-by: Andreas Bergmeier --- go.mod | 15 +- go.sum | 16 ++ go/internal/feast/onlinestore/onlinestore.go | 3 + .../feast/onlinestore/redisonlinestore.go | 4 +- .../onlinestore/redisonlinestore_test.go | 8 +- .../feast/onlinestore/valkeyonlinestore.go | 264 ++++++++++++++++++ .../onlinestore/valkeyonlinestore_test.go | 156 +++++++++++ 7 files changed, 452 insertions(+), 14 deletions(-) create mode 100644 go/internal/feast/onlinestore/valkeyonlinestore.go create mode 100644 go/internal/feast/onlinestore/valkeyonlinestore_test.go diff --git a/go.mod b/go.mod index b76ac71d1e1..0186420becc 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 - golang.org/x/sync v0.18.0 + golang.org/x/sync v0.19.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba google.golang.org/grpc v1.76.0 google.golang.org/protobuf v1.36.10 @@ -105,6 +105,7 @@ require ( github.com/prometheus/procfs v0.16.1 // indirect github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/valkey-io/valkey-go v1.0.74 github.com/zeebo/errs v1.4.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect @@ -115,16 +116,16 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect - golang.org/x/crypto v0.45.0 // indirect + golang.org/x/crypto v0.46.0 // indirect golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect - golang.org/x/mod v0.29.0 // indirect - golang.org/x/net v0.47.0 // indirect + golang.org/x/mod v0.30.0 // indirect + golang.org/x/net v0.48.0 // indirect golang.org/x/oauth2 v0.33.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect - golang.org/x/text v0.31.0 // indirect + golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 // indirect + golang.org/x/text v0.32.0 // indirect golang.org/x/time v0.14.0 // indirect - golang.org/x/tools v0.38.0 // indirect + golang.org/x/tools v0.39.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/api v0.256.0 // indirect google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9 // indirect diff --git a/go.sum b/go.sum index b43b860c04f..e6a9958ea54 100644 --- a/go.sum +++ b/go.sum @@ -248,6 +248,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/valkey-io/valkey-go v1.0.74 h1:NqtBHzjybz+is+c71hsyZP7hoE5lwCHQX026me0Vb08= +github.com/valkey-io/valkey-go v1.0.74/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= @@ -287,19 +289,27 @@ go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= +golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk= +golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/oauth2 v0.33.0 h1:4Q+qn+E5z8gPRJfmRy7C2gGG3T4jIprK6aSYgTXGRpo= golang.org/x/oauth2 v0.33.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -310,12 +320,18 @@ golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 h1:LvzTn0GQhWuvKH/kVRS3R3bVAsdQWI7hvfLHGgh9+lU= golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8/go.mod h1:Pi4ztBfryZoJEkyFTI5/Ocsu2jXyDr6iSdgJiYE/uwE= +golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 h1:E2/AqCUMZGgd73TQkxUMcMla25GB9i/5HOdLr+uH7Vo= +golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54/go.mod h1:hKdjCMrbv9skySur+Nek8Hd0uJ0GuxJIoIX2payrIdQ= golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ= +golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index 837a3f31d82..4e22099ac6b 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -63,6 +63,9 @@ func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { } else if onlineStoreType == "redis" { onlineStore, err := NewRedisOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err + } else if onlineStoreType == "valkey" { + onlineStore, err := NewValkeyOnlineStore(config.Project, config, config.OnlineStore) + return onlineStore, err } else if onlineStoreType == "dynamodb" { onlineStore, err := NewDynamodbOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err diff --git a/go/internal/feast/onlinestore/redisonlinestore.go b/go/internal/feast/onlinestore/redisonlinestore.go index e39b3505710..0543fdb5d99 100644 --- a/go/internal/feast/onlinestore/redisonlinestore.go +++ b/go/internal/feast/onlinestore/redisonlinestore.go @@ -175,7 +175,7 @@ func (r *RedisOnlineStore) buildFeatureViewIndices(featureViewNames []string, fe return featureViewIndices, indicesFeatureView, index } -func (r *RedisOnlineStore) buildRedisHashSetKeys(featureViewNames []string, featureNames []string, indicesFeatureView map[int]string, index int) ([]string, []string) { +func buildRedisHashSetKeys(featureViewNames []string, featureNames []string, indicesFeatureView map[int]string, index int) ([]string, []string) { featureCount := len(featureNames) var hsetKeys = make([]string, index) h := murmur3.New32() @@ -218,7 +218,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E featureCount := len(featureNames) featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices(featureViewNames, featureNames) - hsetKeys, featureNamesWithTimeStamps := r.buildRedisHashSetKeys(featureViewNames, featureNames, indicesFeatureView, index) + hsetKeys, featureNamesWithTimeStamps := buildRedisHashSetKeys(featureViewNames, featureNames, indicesFeatureView, index) redisKeys, redisKeyToEntityIndex, err := r.buildRedisKeys(entityKeys) if err != nil { return nil, err diff --git a/go/internal/feast/onlinestore/redisonlinestore_test.go b/go/internal/feast/onlinestore/redisonlinestore_test.go index 466f8a763f1..9c3a353969c 100644 --- a/go/internal/feast/onlinestore/redisonlinestore_test.go +++ b/go/internal/feast/onlinestore/redisonlinestore_test.go @@ -107,16 +107,14 @@ func TestBuildFeatureViewIndices(t *testing.T) { } func TestBuildHsetKeys(t *testing.T) { - r := &RedisOnlineStore{} - t.Run("test with empty featureViewNames and featureNames", func(t *testing.T) { - hsetKeys, featureNames := r.buildRedisHashSetKeys([]string{}, []string{}, map[int]string{}, 0) + hsetKeys, featureNames := buildRedisHashSetKeys([]string{}, []string{}, map[int]string{}, 0) assert.Equal(t, 0, len(hsetKeys)) assert.Equal(t, 0, len(featureNames)) }) t.Run("test with non-empty featureViewNames and featureNames", func(t *testing.T) { - hsetKeys, featureNames := r.buildRedisHashSetKeys([]string{"view1", "view2"}, []string{"feature1", "feature2"}, map[int]string{2: "view1", 3: "view2"}, 4) + hsetKeys, featureNames := buildRedisHashSetKeys([]string{"view1", "view2"}, []string{"feature1", "feature2"}, map[int]string{2: "view1", 3: "view2"}, 4) assert.Equal(t, 4, len(hsetKeys)) assert.Equal(t, 4, len(featureNames)) assert.Equal(t, "_ts:view1", hsetKeys[2]) @@ -126,7 +124,7 @@ func TestBuildHsetKeys(t *testing.T) { }) t.Run("test with more featureViewNames than featureNames", func(t *testing.T) { - hsetKeys, featureNames := r.buildRedisHashSetKeys([]string{"view1", "view2", "view3"}, []string{"feature1", "feature2", "feature3"}, map[int]string{3: "view1", 4: "view2", 5: "view3"}, 6) + hsetKeys, featureNames := buildRedisHashSetKeys([]string{"view1", "view2", "view3"}, []string{"feature1", "feature2", "feature3"}, map[int]string{3: "view1", 4: "view2", 5: "view3"}, 6) assert.Equal(t, 6, len(hsetKeys)) assert.Equal(t, 6, len(featureNames)) assert.Equal(t, "_ts:view1", hsetKeys[3]) diff --git a/go/internal/feast/onlinestore/valkeyonlinestore.go b/go/internal/feast/onlinestore/valkeyonlinestore.go new file mode 100644 index 00000000000..9cb0d319a92 --- /dev/null +++ b/go/internal/feast/onlinestore/valkeyonlinestore.go @@ -0,0 +1,264 @@ +package onlinestore + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "strconv" + "strings" + + "github.com/feast-dev/feast/go/internal/feast/registry" + valkey "github.com/valkey-io/valkey-go" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/rs/zerolog/log" +) + +type ValkeyOnlineStore struct { + + // Feast project name + project string + + // Valkey connection type, either a single node server (redisNode) or a cluster (redisCluster) + t redisType + + // Valkey client (handles both standalone and cluster modes) + client valkey.Client + + // Parsed connection options (stored for introspection/testing) + opts valkey.ClientOption + + config *registry.RepoConfig +} + +func NewValkeyOnlineStore(project string, config *registry.RepoConfig, onlineStoreConfig map[string]interface{}) (*ValkeyOnlineStore, error) { + store := ValkeyOnlineStore{ + project: project, + config: config, + } + + var address []string + var username string + var password string + var tlsConfig *tls.Config + var db int // Default to 0 + + valkeyStoreType, err := getRedisType(onlineStoreConfig) + if err != nil { + return nil, err + } + store.t = valkeyStoreType + + // Parse connection_string and write it into conf.address, conf.password, and conf.ssl + valkeyConnJson, ok := onlineStoreConfig["connection_string"] + if !ok { + // Default to "localhost:6379" + valkeyConnJson = "localhost:6379" + } + if valkeyConnStr, ok := valkeyConnJson.(string); !ok { + return nil, fmt.Errorf("failed to convert connection_string to string: %+v", valkeyConnJson) + } else { + parts := strings.Split(valkeyConnStr, ",") + for _, part := range parts { + if strings.Contains(part, ":") { + address = append(address, part) + } else if strings.Contains(part, "=") { + kv := strings.SplitN(part, "=", 2) + if kv[0] == "password" { + password = kv[1] + } else if kv[0] == "ssl" { + result, err := strconv.ParseBool(kv[1]) + if err != nil { + return nil, err + } else if result { + tlsConfig = &tls.Config{} + } + } else if kv[0] == "db" { + db, err = strconv.Atoi(kv[1]) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("unrecognized option in connection_string: %s. Must be one of 'password', 'ssl'", kv[0]) + } + } else { + return nil, fmt.Errorf("unable to parse a part of connection_string: %s. Must contain either ':' (addresses) or '=' (options", part) + } + } + } + + clientOption := valkey.ClientOption{ + InitAddress: address, + Username: username, + Password: password, + TLSConfig: tlsConfig, + } + + if valkeyStoreType == redisNode { + log.Info().Msgf("Using Valkey: %s", address[0]) + clientOption.InitAddress = address[:1] + clientOption.SelectDB = db + } else { + log.Info().Msgf("Using Valkey Cluster: %s", address) + clientOption.ReplicaOnly = true + } + + store.opts = clientOption + + client, err := valkey.NewClient(clientOption) + if err != nil { + return nil, err + } + store.client = client + + return &store, nil +} + +func (r *ValkeyOnlineStore) buildFeatureViewIndices(featureViewNames []string, featureNames []string) (map[string]int, map[int]string, int) { + featureViewIndices := make(map[string]int) + indicesFeatureView := make(map[int]string) + index := len(featureNames) + for _, featureViewName := range featureViewNames { + if _, ok := featureViewIndices[featureViewName]; !ok { + featureViewIndices[featureViewName] = index + indicesFeatureView[index] = featureViewName + index += 1 + } + } + return featureViewIndices, indicesFeatureView, index +} + +func (r *ValkeyOnlineStore) buildValkeyKeys(entityKeys []*types.EntityKey) ([]*[]byte, map[string]int, error) { + valkeyKeys := make([]*[]byte, len(entityKeys)) + valkeyKeyToEntityIndex := make(map[string]int) + for i := 0; i < len(entityKeys); i++ { + var key, err = buildValkeyKey(r.project, entityKeys[i], r.config.EntityKeySerializationVersion) + if err != nil { + return nil, nil, err + } + valkeyKeys[i] = key + valkeyKeyToEntityIndex[string(*key)] = i + } + return valkeyKeys, valkeyKeyToEntityIndex, nil +} + +func (r *ValkeyOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) { + ctx, span := tracer.Start(ctx, "valkey.OnlineRead") + defer span.End() + + featureCount := len(featureNames) + featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices(featureViewNames, featureNames) + hsetKeys, featureNamesWithTimeStamps := buildRedisHashSetKeys(featureViewNames, featureNames, indicesFeatureView, index) + valkeyKeys, valkeyKeyToEntityIndex, err := r.buildValkeyKeys(entityKeys) + if err != nil { + return nil, err + } + + results := make([][]FeatureData, len(entityKeys)) + + // Build pipelined HMGET commands in key order so results can be correlated by index. + cmds := make([]valkey.Completed, len(valkeyKeys)) + keyOrder := make([]string, len(valkeyKeys)) + for i, valkeyKey := range valkeyKeys { + keyString := string(*valkeyKey) + keyOrder[i] = keyString + cmds[i] = r.client.B().Hmget().Key(keyString).Field(hsetKeys...).Build() + } + + responses := r.client.DoMulti(ctx, cmds...) + + for i, resp := range responses { + keyString := keyOrder[i] + entityIndex := valkeyKeyToEntityIndex[keyString] + + results[entityIndex] = make([]FeatureData, featureCount) + + arr, err := resp.ToArray() + if err != nil { + return nil, err + } + + var timeStamp timestamppb.Timestamp + resContainsNonNil := false + + for featureIndex, msg := range arr { + if featureIndex == featureCount { + break + } + + if msg.IsNil() { + // TODO (Ly): Can there be nil result within each feature or they will all be returned as string proto of types.Value_NullVal proto? + featureName := featureNamesWithTimeStamps[featureIndex] + featureViewName := featureViewNames[featureIndex] + timeStampIndex := featureViewIndices[featureViewName] + tsMsg := arr[timeStampIndex] + if !tsMsg.IsNil() { + timeStampString, err := tsMsg.ToString() + if err != nil { + return nil, errors.New("error parsing value from valkey") + } + if err := proto.Unmarshal([]byte(timeStampString), &timeStamp); err != nil { + return nil, errors.New("error converting parsed valkey value to timestamppb.Timestamp") + } + } + + results[entityIndex][featureIndex] = FeatureData{ + Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName}, + Timestamp: timestamppb.Timestamp{Seconds: timeStamp.Seconds, Nanos: timeStamp.Nanos}, + Value: types.Value{Val: &types.Value_NullVal{NullVal: types.Null_NULL}}, + } + } else { + valueString, err := msg.ToString() + if err != nil { + return nil, errors.New("error parsing Value from valkey") + } + resContainsNonNil = true + var value types.Value + if err := proto.Unmarshal([]byte(valueString), &value); err != nil { + return nil, errors.New("error converting parsed valkey Value to types.Value") + } + featureName := featureNamesWithTimeStamps[featureIndex] + featureViewName := featureViewNames[featureIndex] + timeStampIndex := featureViewIndices[featureViewName] + tsMsg := arr[timeStampIndex] + if !tsMsg.IsNil() { + timeStampString, err := tsMsg.ToString() + if err != nil { + return nil, errors.New("error parsing Value from valkey") + } + if err := proto.Unmarshal([]byte(timeStampString), &timeStamp); err != nil { + return nil, errors.New("error converting parsed valkey Value to timestamppb.Timestamp") + } + } + results[entityIndex][featureIndex] = FeatureData{ + Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: featureName}, + Timestamp: timestamppb.Timestamp{Seconds: timeStamp.Seconds, Nanos: timeStamp.Nanos}, + Value: types.Value{Val: value.Val}, + } + } + } + + if !resContainsNonNil { + results[entityIndex] = nil + } + } + + return results, nil +} + +func (r *ValkeyOnlineStore) Destruct() { + r.client.Close() +} + +func buildValkeyKey(project string, entityKey *types.EntityKey, entityKeySerializationVersion int64) (*[]byte, error) { + serKey, err := serializeEntityKey(entityKey, entityKeySerializationVersion) + if err != nil { + return nil, err + } + fullKey := append(*serKey, []byte(project)...) + return &fullKey, nil +} diff --git a/go/internal/feast/onlinestore/valkeyonlinestore_test.go b/go/internal/feast/onlinestore/valkeyonlinestore_test.go new file mode 100644 index 00000000000..058427a1994 --- /dev/null +++ b/go/internal/feast/onlinestore/valkeyonlinestore_test.go @@ -0,0 +1,156 @@ +package onlinestore + +import ( + "testing" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/protos/feast/types" + + "github.com/stretchr/testify/assert" +) + +func TestNewValkeyOnlineStore(t *testing.T) { + var config = map[string]interface{}{ + "connection_string": "localhost:6379", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 3, + } + store, err := NewValkeyOnlineStore("test", rc, config) + assert.Nil(t, err) + assert.Equal(t, "localhost:6379", store.opts.InitAddress[0]) + assert.Equal(t, "", store.opts.Password) + assert.Equal(t, 0, store.opts.SelectDB) + assert.Nil(t, store.opts.TLSConfig) +} + +func TestNewValkeyOnlineStoreWithPassword(t *testing.T) { + var config = map[string]interface{}{ + "connection_string": "localhost:6379,password=secret", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 3, + } + store, err := NewValkeyOnlineStore("test", rc, config) + assert.Nil(t, err) + assert.Equal(t, "localhost:6379", store.opts.InitAddress[0]) + assert.Equal(t, "secret", store.opts.Password) +} + +func TestNewValkeyOnlineStoreWithDB(t *testing.T) { + var config = map[string]interface{}{ + "connection_string": "localhost:6379,db=1", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 3, + } + store, err := NewValkeyOnlineStore("test", rc, config) + assert.Nil(t, err) + assert.Equal(t, "localhost:6379", store.opts.InitAddress[0]) + assert.Equal(t, 1, store.opts.SelectDB) +} + +func TestNewValkeyOnlineStoreWithSsl(t *testing.T) { + var config = map[string]interface{}{ + "connection_string": "localhost:6379,ssl=true", + } + rc := ®istry.RepoConfig{ + OnlineStore: config, + EntityKeySerializationVersion: 3, + } + store, err := NewValkeyOnlineStore("test", rc, config) + assert.Nil(t, err) + assert.Equal(t, "localhost:6379", store.opts.InitAddress[0]) + assert.NotNil(t, store.opts.TLSConfig) +} + +func TestValkeyBuildFeatureViewIndices(t *testing.T) { + r := &ValkeyOnlineStore{} + + t.Run("test with empty featureViewNames and featureNames", func(t *testing.T) { + featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{}, []string{}) + assert.Equal(t, 0, len(featureViewIndices)) + assert.Equal(t, 0, len(indicesFeatureView)) + assert.Equal(t, 0, index) + }) + + t.Run("test with non-empty featureNames and empty featureViewNames", func(t *testing.T) { + featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{}, []string{"feature1", "feature2"}) + assert.Equal(t, 0, len(featureViewIndices)) + assert.Equal(t, 0, len(indicesFeatureView)) + assert.Equal(t, 2, index) + }) + + t.Run("test with non-empty featureViewNames and featureNames", func(t *testing.T) { + featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{"view1", "view2"}, []string{"feature1", "feature2"}) + assert.Equal(t, 2, len(featureViewIndices)) + assert.Equal(t, 2, len(indicesFeatureView)) + assert.Equal(t, 4, index) + assert.Equal(t, "view1", indicesFeatureView[2]) + assert.Equal(t, "view2", indicesFeatureView[3]) + }) + + t.Run("test with duplicate featureViewNames", func(t *testing.T) { + featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices([]string{"view1", "view1"}, []string{"feature1", "feature2"}) + assert.Equal(t, 1, len(featureViewIndices)) + assert.Equal(t, 1, len(indicesFeatureView)) + assert.Equal(t, 3, index) + assert.Equal(t, "view1", indicesFeatureView[2]) + }) +} + +func TestValkeyBuildValkeyKeys(t *testing.T) { + r := &ValkeyOnlineStore{ + project: "test_project", + config: ®istry.RepoConfig{ + EntityKeySerializationVersion: 3, + }, + } + + entity_key1 := types.EntityKey{ + JoinKeys: []string{"driver_id"}, + EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1005}}}, + } + + entity_key2 := types.EntityKey{ + JoinKeys: []string{"driver_id"}, + EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}}}, + } + + error_entity_key1 := types.EntityKey{ + JoinKeys: []string{"driver_id", "vehicle_id"}, + EntityValues: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1005}}}, + } + + t.Run("test with empty entityKeys", func(t *testing.T) { + valkeyKeys, valkeyKeyToEntityIndex, err := r.buildValkeyKeys([]*types.EntityKey{}) + assert.Nil(t, err) + assert.Equal(t, 0, len(valkeyKeys)) + assert.Equal(t, 0, len(valkeyKeyToEntityIndex)) + }) + + t.Run("test with single entityKey", func(t *testing.T) { + entityKeys := []*types.EntityKey{&entity_key1} + valkeyKeys, valkeyKeyToEntityIndex, err := r.buildValkeyKeys(entityKeys) + assert.Nil(t, err) + assert.Equal(t, 1, len(valkeyKeys)) + assert.Equal(t, 1, len(valkeyKeyToEntityIndex)) + }) + + t.Run("test with multiple entityKeys", func(t *testing.T) { + entityKeys := []*types.EntityKey{&entity_key1, &entity_key2} + valkeyKeys, valkeyKeyToEntityIndex, err := r.buildValkeyKeys(entityKeys) + assert.Nil(t, err) + assert.Equal(t, 2, len(valkeyKeys)) + assert.Equal(t, 2, len(valkeyKeyToEntityIndex)) + }) + + t.Run("test with error in buildValkeyKey", func(t *testing.T) { + entityKeys := []*types.EntityKey{&error_entity_key1} + _, _, err := r.buildValkeyKeys(entityKeys) + assert.NotNil(t, err) + }) +}