Skip to content
2 changes: 1 addition & 1 deletion go-sdk/pkg/interaction-store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewClientV1(config *Config, timing func(name string, value time.Duration, t

return &ClientV1{
grpcClient: conn,
client: pb.NewInteractionStoreTimeSeriesServiceClient(conn.Conn),
client: pb.NewInteractionStoreTimeSeriesServiceClient(conn),
adapter: Adapter{},
callerId: config.CallerId,
}
Expand Down
1 change: 1 addition & 0 deletions horizon/internal/skye/handler/skye.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ func (s *skyeConfig) ApproveVariantRequest(requestID int, approval ApprovalReque
// return fmt.Errorf("admin must provide caching_configuration during variant approval")
// }
// For RT Partition, collect all RT partitions for all models across all entities

rtPartitions := make(map[int]bool)
entities, err := s.EtcdConfig.GetEntities()
if err != nil {
Expand Down
44 changes: 39 additions & 5 deletions interaction-store/internal/compression/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,23 @@ import (
"github.com/klauspost/compress/zstd"
)

const (
// decodeBufPoolCap: initial pooled buffer size (256KB). Grown buffers are put back for reuse.
decodeBufPoolCap = 256 * 1024
)

var (
encoder *ZStdEncoder

decoder *ZStdDecoder
mut sync.Mutex

mut sync.Mutex
// decodeBufPool reuses buffers for Zstd DecodeAll to reduce allocations and GC.
decodeBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 0, decodeBufPoolCap)
return &b
},
}
)

type ZStdEncoder struct {
Expand Down Expand Up @@ -72,9 +83,32 @@ func NewZStdDecoder() (*ZStdDecoder, error) {
}

func (d *ZStdDecoder) Decode(cdata []byte) (data []byte, err error) {
data, err = d.decoder.DecodeAll(cdata, make([]byte, 0, len(cdata)*3))
bufPtr := decodeBufPool.Get()
var buf []byte
if p, ok := bufPtr.(*[]byte); ok {
buf = *p
}
decoded, err := d.decoder.DecodeAll(cdata, buf[:0])
if err != nil {
return
if p, ok := bufPtr.(*[]byte); ok {
*p = (*p)[:0]
decodeBufPool.Put(bufPtr)
}
return nil, err
}
// Caller keeps the returned slice; copy so we can reuse the pool buffer.
result := make([]byte, len(decoded))
copy(result, decoded)
if p, ok := bufPtr.(*[]byte); ok {
if cap(decoded) <= decodeBufPoolCap {
*p = (*p)[:0]
decodeBufPool.Put(bufPtr)
} else {
// Put back the grown buffer so the pool accumulates larger buffers for reuse.
ptr := new([]byte)
*ptr = decoded[:0]
decodeBufPool.Put(ptr)
}
Comment thread
shubhamk-meesho marked this conversation as resolved.
}
return
return result, nil
}
40 changes: 40 additions & 0 deletions interaction-store/internal/compression/zstd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -74,3 +75,42 @@ func TestZStdDecoder_Decode(t *testing.T) {
t.Errorf("Invalid decoded data length: %d", len(ddata))
}
}

// TestGetEncoder_ZSTD verifies that GetEncoder returns a working ZSTD encoder
// that produces non-empty output for a small payload.
func TestGetEncoder_ZSTD(t *testing.T) {
	encoder, err := GetEncoder(TypeZSTD)
	assert.NoError(t, err)
	assert.NotNil(t, encoder)

	var encoded []byte
	encoder.Encode([]byte("test"), &encoded)
	assert.NotEmpty(t, encoded)
}

// TestGetEncoder_Unsupported verifies that GetEncoder rejects both the
// explicit TypeNone and an arbitrary unknown compression type.
func TestGetEncoder_Unsupported(t *testing.T) {
	encoder, err := GetEncoder(TypeNone)
	assert.Nil(t, encoder)
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "unsupported compression type")

	encoder, err = GetEncoder(Type(99))
	assert.Nil(t, encoder)
	assert.Error(t, err)
}

// TestGetDecoder_ZSTD verifies that GetDecoder returns a decoder that
// round-trips data produced by the ZSTD encoder.
func TestGetDecoder_ZSTD(t *testing.T) {
	dec, err := GetDecoder(TypeZSTD)
	assert.NoError(t, err)
	assert.NotNil(t, dec)

	// Check the encoder construction error instead of discarding it with _,
	// so a failure reports clearly rather than as a nil-pointer panic below.
	enc, err := NewZStdEncoder()
	assert.NoError(t, err)
	if enc == nil {
		t.Fatal("NewZStdEncoder returned nil encoder")
	}

	var compressed []byte
	enc.Encode([]byte("test"), &compressed)

	out, err := dec.Decode(compressed)
	assert.NoError(t, err)
	assert.Equal(t, []byte("test"), out)
}
Comment thread
shubhamk-meesho marked this conversation as resolved.

// TestGetDecoder_Unsupported verifies that GetDecoder rejects compression
// types it does not support.
func TestGetDecoder_Unsupported(t *testing.T) {
	decoder, err := GetDecoder(TypeNone)
	assert.Nil(t, decoder)
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "unsupported compression type")
}
25 changes: 25 additions & 0 deletions interaction-store/internal/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package constants

// Week-column and rolling window constants for interaction-store (Scylla schema: week_0..week_23).
const (
	// WeekColumnPrefix is the prefix for week column names in Scylla (e.g. week_0, week_1).
	WeekColumnPrefix = "week_"

	// TotalWeeks is the number of weeks in the rolling window (week_0..week_23).
	TotalWeeks = 24

	// MaxWeekIndex is the maximum week index. Derived from TotalWeeks so the
	// two constants cannot drift apart if the window size ever changes.
	MaxWeekIndex = TotalWeeks - 1

	// WeeksPerBucket is the number of weeks per Scylla bucket (bucket0: 0-7, bucket1: 8-15, bucket2: 16-23).
	WeeksPerBucket = 8

	// MaxRetrieveLimit is the maximum number of events returned per retrieve request.
	MaxRetrieveLimit = 2000

	// MaxOrderEventsPerWeek is the maximum order events retained per week column.
	MaxOrderEventsPerWeek = 500

	// MaxClickEventsPerWeek is the maximum click events retained per week column.
	MaxClickEventsPerWeek = 500
)
147 changes: 74 additions & 73 deletions interaction-store/internal/data/scylla/scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/Meesho/BharatMLStack/interaction-store/pkg/metric"
"github.com/Meesho/BharatMLStack/interaction-store/pkg/scylla"
"github.com/cespare/xxhash/v2"
"github.com/gocql/gocql"
"github.com/rs/zerolog/log"
)
Expand All @@ -21,7 +20,8 @@ var (
)

const (
envPrefix = "STORAGE_SCYLLA"
envPrefix = "STORAGE_SCYLLA"
cacheKeySep = "_"
)

type Scylla struct {
Expand Down Expand Up @@ -51,16 +51,14 @@ func InitScyllaDb() Database {

func (s *Scylla) RetrieveInteractions(tableName string, userId string, columns []string) (map[string]interface{}, error) {
t1 := time.Now()
metric.Incr("scylla_db_interactions_retrieve_count", []string{metric.TagAsString("table_name", tableName)})
preparedQuery := createRetrieveInteractionsPreparedQuery(s.session, s.keyspace, tableName, columns)
response := executeRetrieveInteractions(preparedQuery, userId)
response := executeRetrieveInteractions(preparedQuery, userId, columns)
metric.Timing("scylla_db_interactions_retrieve_latency", time.Since(t1), []string{metric.TagAsString("table_name", tableName)})
return response, nil
Comment thread
shubhamk-meesho marked this conversation as resolved.
}

func (s *Scylla) UpdateInteractions(tableName string, userId string, columns map[string]interface{}) error {
t1 := time.Now()
metric.Incr("scylla_db_interactions_update_count", []string{metric.TagAsString("table_name", tableName)})
preparedQuery, sortedColumns := createUpdateInteractionsPreparedQuery(s.session, s.keyspace, tableName, userId, columns)
err := executeUpdateInteractions(preparedQuery, sortedColumns, userId, columns)
if err != nil {
Expand All @@ -73,16 +71,14 @@ func (s *Scylla) UpdateInteractions(tableName string, userId string, columns map

// RetrieveMetadata fetches the requested columns for userId from the given
// metadata table and emits a latency metric for the call. Query failures are
// logged downstream and surface as an empty result, so the returned error is
// always nil.
func (s *Scylla) RetrieveMetadata(metadataTableName string, userId string, columns []string) (map[string]interface{}, error) {
	start := time.Now()
	query := createRetrieveMetadataPreparedQuery(s.session, s.keyspace, metadataTableName, columns)
	result := executeRetrieveMetadata(query, userId, columns)
	metric.Timing("scylla_db_retrieve_metadata_latency", time.Since(start), []string{metric.TagAsString("metadata_table_name", metadataTableName)})
	return result, nil
}

func (s *Scylla) UpdateMetadata(metadataTableName string, userId string, columns map[string]interface{}) error {
t1 := time.Now()
metric.Incr("scylla_db_update_metadata_count", []string{metric.TagAsString("metadata_table_name", metadataTableName)})
preparedQuery, sortedColumns := createUpdateMetadataPreparedQuery(s.session, s.keyspace, metadataTableName, columns)
err := executeUpdateMetadata(preparedQuery, sortedColumns, userId, columns)
if err != nil {
Expand All @@ -93,17 +89,33 @@ func (s *Scylla) UpdateMetadata(metadataTableName string, userId string, columns
return nil
}

func executeRetrieveInteractions(preparedQuery *gocql.Query, userId string) map[string]interface{} {
func executeRetrieveInteractions(preparedQuery *gocql.Query, userId string, columns []string) map[string]interface{} {
preparedQuery.Bind(userId).Consistency(gocql.One)
res, err := preparedQuery.Iter().SliceMap()
if err != nil {
log.Error().Msgf("error executing cql query %v: %v", preparedQuery, err)
return nil
iter := preparedQuery.Iter()

scanDest := make([]*[]byte, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range columns {
scanDest[i] = new([]byte)
scanArgs[i] = scanDest[i]
}
if len(res) == 0 {

if !iter.Scan(scanArgs...) {
if err := iter.Close(); err != nil {
log.Error().Msgf("error executing cql query %v: %v", preparedQuery, err)
}
return make(map[string]interface{})
}
return res[0]

result := make(map[string]interface{}, len(columns))
for i, col := range columns {
result[col] = *scanDest[i]
}

if err := iter.Close(); err != nil {
log.Error().Msgf("error closing iterator for query %v: %v", preparedQuery, err)
}
return result
}

func executeUpdateInteractions(preparedQuery *gocql.Query, sortedColumns []string, userId string, columns map[string]interface{}) error {
Expand All @@ -121,17 +133,34 @@ func executeUpdateInteractions(preparedQuery *gocql.Query, sortedColumns []strin
return nil
}

func executeRetrieveMetadata(preparedQuery *gocql.Query, userId string) map[string]interface{} {
// executeRetrieveMetadata scans metadata table columns as int (schema: week_0..week_23 are int).
func executeRetrieveMetadata(preparedQuery *gocql.Query, userId string, columns []string) map[string]interface{} {
preparedQuery.Bind(userId).Consistency(gocql.One)
res, err := preparedQuery.Iter().SliceMap()
if err != nil {
log.Error().Msgf("error executing cql query %v: %v", preparedQuery, err)
return nil
iter := preparedQuery.Iter()

scanDest := make([]*int, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range columns {
scanDest[i] = new(int)
scanArgs[i] = scanDest[i]
}
if len(res) == 0 {

if !iter.Scan(scanArgs...) {
if err := iter.Close(); err != nil {
log.Error().Msgf("error executing cql query %v: %v", preparedQuery, err)
}
return make(map[string]interface{})
}
return res[0]

result := make(map[string]interface{}, len(columns))
for i, col := range columns {
result[col] = *scanDest[i]
}

if err := iter.Close(); err != nil {
log.Error().Msgf("error closing iterator for query %v: %v", preparedQuery, err)
}
return result
}

func executeUpdateMetadata(preparedQuery *gocql.Query, sortedColumns []string, userId string, columns map[string]interface{}) error {
Expand All @@ -149,61 +178,33 @@ func executeUpdateMetadata(preparedQuery *gocql.Query, sortedColumns []string, u
return nil
}

// unorderedHashXXH64 computes a hash that's independent of column order
func unorderedHashXXH64(values []string) uint64 {
var h uint64 = 0
for _, v := range values {
h += xxhash.Sum64String(v)
}
return h
}

// getRetrieveQueryCacheKey builds the cache key for a retrieve query.
// Callers pass columns already sorted, so the key is deterministic.
//
// "|" is used as the separator because it cannot appear in a CQL identifier,
// whereas "_" is common in keyspace/table names and would make distinct pairs
// such as ("a_b","c") and ("a","b_c") collide on the same key.
func getRetrieveQueryCacheKey(keyspace, tableName string, columns []string) string {
	var b strings.Builder
	// keyspace + tableName exact; ~10 chars per column name (heuristic); +16 for separators and the "retrieve" suffix.
	b.Grow(len(keyspace) + len(tableName) + len(columns)*10 + 16)
	b.WriteString(keyspace)
	b.WriteByte('|')
	b.WriteString(tableName)
	b.WriteByte('|')
	b.WriteString(strings.Join(columns, ","))
	b.WriteByte('|')
	b.WriteString("retrieve")
	return b.String()
}

// getUpdateQueryCacheKey builds the cache key for an update query.
// Callers pass columns already sorted, so the key is deterministic.
//
// "|" is used as the separator because it cannot appear in a CQL identifier,
// whereas "_" is common in keyspace/table names and would make distinct pairs
// such as ("a_b","c") and ("a","b_c") collide on the same key.
func getUpdateQueryCacheKey(keyspace, tableName string, columns []string) string {
	var b strings.Builder
	// keyspace + tableName exact; ~10 chars per column name (heuristic); +12 for separators and the "update" suffix.
	b.Grow(len(keyspace) + len(tableName) + len(columns)*10 + 12)
	b.WriteString(keyspace)
	b.WriteByte('|')
	b.WriteString(tableName)
	b.WriteByte('|')
	b.WriteString(strings.Join(columns, ","))
	b.WriteByte('|')
	b.WriteString("update")
	return b.String()
}

func createRetrieveInteractionsPreparedQuery(session *gocql.Session, keyspace string, tableName string, columns []string) *gocql.Query {
Expand Down
Loading