Sync internal develop to public develop #346
Walkthrough
The pull request refactors interaction-store retrieval and persistence handlers to use centralized constants for temporal calculations, introduces buffer pooling for compression, implements parallel data fetching with errgroup, enhances validation with time-range and limit constraints, and improves error context across multiple services.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant RetrieveHandler as Retrieve Handler
participant DB as Database (Scylla)
participant Deserializer
participant Merger
Client->>RetrieveHandler: Retrieve(start, end, limit)
activate RetrieveHandler
RetrieveHandler->>RetrieveHandler: validateTimeRange()
RetrieveHandler->>RetrieveHandler: buildOrderedWeeks(start, end)
RetrieveHandler->>RetrieveHandler: buildTableToFieldsMapping(orderedWeeks)
par Parallel Data Fetch
RetrieveHandler->>DB: fetchDataInParallel(tables, columns)
activate DB
DB-->>RetrieveHandler: weekToData map
deactivate DB
end
RetrieveHandler->>Deserializer: deserializeWeeks(weekToData)
activate Deserializer
Deserializer-->>RetrieveHandler: weekToEvents map
deactivate Deserializer
RetrieveHandler->>Merger: mergeFilterAndLimit(orderedWeeks, weekToEvents, start, end, limit)
activate Merger
Merger-->>RetrieveHandler: filtered events slice
deactivate Merger
RetrieveHandler-->>Client: events
deactivate RetrieveHandler
🚥 Pre-merge checks | ✅ 1 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
horizon/internal/skye/handler/skye.go (2)
605-619: ⚠️ Potential issue | 🟠 Major
Do not silently drop filters missing in etcd at approval time.
On Lines 605-619, missing filters are skipped instead of failing approval. This can register a variant with incomplete criteria versus the request payload.
💡 Proposed fix
 criteria := []skyeEtcd.Criteria{}
 for _, filter := range payload.FilterConfiguration.Criteria {
-    if _, ok := filtersFromEtcd[filter.ColumnName]; ok {
-        filterValue := filtersFromEtcd[filter.ColumnName].FilterValue
-        defaultValue := filtersFromEtcd[filter.ColumnName].DefaultValue
-        if filter.Condition == enums.FilterCondition(enums.NOT_EQUALS) {
-            filterValue = filtersFromEtcd[filter.ColumnName].DefaultValue
-            defaultValue = filtersFromEtcd[filter.ColumnName].FilterValue
-
-        }
-        criteria = append(criteria, skyeEtcd.Criteria{
-            ColumnName: filter.ColumnName,
-            FilterValue: filterValue,
-            DefaultValue: defaultValue,
-        })
-    }
+    filterCfg, ok := filtersFromEtcd[filter.ColumnName]
+    if !ok {
+        return fmt.Errorf("filter '%s' no longer exists in etcd for entity '%s'", filter.ColumnName, payload.Entity)
+    }
+    filterValue := filterCfg.FilterValue
+    defaultValue := filterCfg.DefaultValue
+    if filter.Condition == enums.FilterCondition(enums.NOT_EQUALS) {
+        filterValue = filterCfg.DefaultValue
+        defaultValue = filterCfg.FilterValue
+    }
+    criteria = append(criteria, skyeEtcd.Criteria{
+        ColumnName: filter.ColumnName,
+        FilterValue: filterValue,
+        DefaultValue: defaultValue,
+    })
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@horizon/internal/skye/handler/skye.go` around lines 605 - 619, The loop that builds criteria skips filters not present in filtersFromEtcd, which allows registering incomplete variants; instead, when iterating payload.FilterConfiguration.Criteria in the handler (where filtersFromEtcd and criteria []skyeEtcd.Criteria are used), detect any missing filter key (i.e., if _, ok := filtersFromEtcd[filter.ColumnName]; !ok) and immediately return/fail the approval with a clear error indicating the missing filter name from the request payload; do this in the same function that currently appends skyeEtcd.Criteria so callers cannot proceed with incomplete criteria.
569-631: ⚠️ Potential issue | 🟠 Major
RT partition allocation is race-prone under concurrent approvals.
Lines 569-631 perform non-atomic read/select/register: two concurrent approvals can observe the same free partition (1-256) and attempt to register it simultaneously.
RegisterVariant in the etcd layer (etcd.go:270-330) writes the RTPartition directly via CreateNodes() with no transactional conflict checks or uniqueness guards, allowing duplicate allocations.
To fix: either (1) atomically reserve the partition before returning from the approval handler, or (2) add a pre-write uniqueness check in RegisterVariant using etcd transactions with conflict detection.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@horizon/internal/skye/handler/skye.go` around lines 569 - 631, The current allocation flow (GetEntities -> build availablePartitions -> pick availablePartitions[rnd] in the handler) is race-prone; either atomically reserve the chosen RT partition before returning from the approval handler or make RegisterVariant (and its CreateNodes call) enforce uniqueness with an etcd transaction; specifically, update the handler logic around availablePartitions/rnd to perform an atomic "reserve" write (e.g., create a reservation key) before calling EtcdConfig.RegisterVariant, or modify EtcdConfig.RegisterVariant / CreateNodes to use an etcd Txn that CAS-checks that the RTPartition key/path does not exist (fail the txn if already present) and return an error so the caller can retry with a new partition.
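The reserve-then-register idea can be sketched without etcd. The snippet below is only an illustration: `partitionStore`, `Reserve`, `allocate`, and `ErrPartitionTaken` are hypothetical names, and the mutex-guarded map stands in for what would be a single etcd transaction (create the key only if it does not exist yet) in a real fix.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrPartitionTaken is a hypothetical error returned when a reservation loses the race.
var ErrPartitionTaken = errors.New("rt partition already reserved")

// partitionStore is an in-memory stand-in for the etcd keyspace. In a real
// fix, the check-and-write inside Reserve would be one etcd transaction.
type partitionStore struct {
	mu    sync.Mutex
	owner map[int]string // partition -> variant that reserved it
}

func newPartitionStore() *partitionStore {
	return &partitionStore{owner: make(map[int]string)}
}

// Reserve records the partition for the variant only if nobody owns it yet.
// The check and the write happen under one lock, so they cannot interleave.
func (s *partitionStore) Reserve(partition int, variant string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, taken := s.owner[partition]; taken {
		return ErrPartitionTaken
	}
	s.owner[partition] = variant
	return nil
}

// allocate walks the candidate partitions and moves on to the next one
// whenever a reservation is lost to a concurrent approval.
func allocate(s *partitionStore, candidates []int, variant string) (int, error) {
	for _, p := range candidates {
		err := s.Reserve(p, variant)
		if err == nil {
			return p, nil
		}
		if !errors.Is(err, ErrPartitionTaken) {
			return 0, err
		}
	}
	return 0, errors.New("no free rt partition")
}

func main() {
	s := newPartitionStore()
	results := make([]int, 2)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p, _ := allocate(s, []int{1, 2, 3}, fmt.Sprintf("variant-%d", i))
			results[i] = p
		}(i)
	}
	wg.Wait()
	fmt.Println("distinct partitions:", results[0] != results[1])
}
```

The point of the sketch is that the caller never picks a partition from a stale snapshot and registers it unconditionally; it either wins the reservation or retries with another candidate.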
🧹 Nitpick comments (10)
online-feature-store/internal/config/etcd.go (1)
275-276: Centralize -1 sentinel as a named constant for maintainability.
All current callers properly validate the sequence == -1 sentinel after checking for errors, so there is no immediate risk of misuse. However, the magic number -1 appears in multiple call sites (features.go:178, retrieve.go:829, 843, 850) without a centralized definition, making the codebase harder to maintain.
Suggested refactor
+const SequenceNotFound = -1
 ...
-    return -1, nil
+    return SequenceNotFound, nil
Then update call sites to use SequenceNotFound instead of hardcoded -1.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@online-feature-store/internal/config/etcd.go` around lines 275 - 276, Introduce a named constant (e.g., SequenceNotFound = -1) near the top of the package in online-feature-store/internal/config/etcd.go, replace the literal -1 return in the function that currently logs "feature %s not found..." with SequenceNotFound, and update all call sites that check for sequence == -1 (features.go and retrieve.go at the noted locations) to compare against SequenceNotFound instead of the magic number; ensure the constant is exported or package-visible as appropriate for the callers.
horizon/internal/skye/handler/skye.go (1)
557-559: Update stale RTPartition approval comment.
Line 557 says admin must provide RTPartition, but this method now auto-assigns it. Please align the comment with actual behavior to avoid operator confusion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@horizon/internal/skye/handler/skye.go` around lines 557 - 559, Update the outdated comment near the approval check in skye.go: remove or reword the mention that "Admin must provide RTPartition" and instead state that RTPartition is auto-assigned by this method while Admin must provide VectorDBConfig fields (approval.AdminVectorDBConfig.ReadHost, approval.AdminVectorDBConfig.WriteHost, approval.AdminVectorDBConfig.Port) and the RateLimiter; adjust the comment above the conditional that returns fmt.Errorf("admin must provide vector_db_config during variant approval") to reflect current behavior so operators are not misled.
online-feature-store/internal/handler/feature/retrieve.go (1)
671-671: Fix misleading error text for GetFeatureGroup failure.
Line 671 reports “active schema” even though this branch is from GetFeatureGroup. This makes on-call debugging noisy.
Downstream impact: update the matching assertion in online-feature-store/internal/handler/feature/retrieve_test.go Line 652.
✏️ Proposed fix
-    return fmt.Errorf("failed to get active schema for feature group %s, in entity %s: %w", fg.FeatureGroupLabel, retrieveData.EntityLabel, err)
+    return fmt.Errorf("failed to get feature group %s, in entity %s: %w", fg.FeatureGroupLabel, retrieveData.EntityLabel, err)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@online-feature-store/internal/handler/feature/retrieve.go` at line 671, The error message in the GetFeatureGroup failure branch incorrectly refers to "active schema"; update the fmt.Errorf call in retrieve.go (the error returned inside the GetFeatureGroup handling branch) to mention "failed to get feature group" (including fg.FeatureGroupLabel and retrieveData.EntityLabel and wrap err) so the message accurately reflects GetFeatureGroup failure, and then update the matching assertion in retrieve_test.go (the test that asserts the error text around line 652) to expect the new message text.
interaction-store/internal/compression/zstd.go (1)
86-113: Redundant type assertions on every branch add fragility.
decodeBufPool.New always returns *[]byte, so all three if p, ok := bufPtr.(*[]byte); ok guards (lines 88, 93, 102) unconditionally succeed. If any ever failed, the pool buffer would be silently leaked without a Put. Asserting once at the top and using a direct (panicking) form makes the invariant explicit and eliminates the silent-leak risk.
♻️ Proposed simplification
 func (d *ZStdDecoder) Decode(cdata []byte) (data []byte, err error) {
-    bufPtr := decodeBufPool.Get()
-    var buf []byte
-    if p, ok := bufPtr.(*[]byte); ok {
-        buf = *p
-    }
-    decoded, err := d.decoder.DecodeAll(cdata, buf[:0])
+    bufPtr := decodeBufPool.Get().(*[]byte)
+    buf := *bufPtr
+    decoded, err := d.decoder.DecodeAll(cdata, buf[:0])
     if err != nil {
-        if p, ok := bufPtr.(*[]byte); ok {
-            *p = (*p)[:0]
-            decodeBufPool.Put(bufPtr)
-        }
+        *bufPtr = (*bufPtr)[:0]
+        decodeBufPool.Put(bufPtr)
         return nil, err
     }
     result := make([]byte, len(decoded))
     copy(result, decoded)
-    if p, ok := bufPtr.(*[]byte); ok {
-        if cap(decoded) <= decodeBufPoolCap {
-            *p = (*p)[:0]
-            decodeBufPool.Put(bufPtr)
-        } else {
-            ptr := new([]byte)
-            *ptr = decoded[:0]
-            decodeBufPool.Put(ptr)
-        }
+    if cap(decoded) <= decodeBufPoolCap {
+        *bufPtr = (*bufPtr)[:0]
+        decodeBufPool.Put(bufPtr)
+    } else {
+        *bufPtr = (*bufPtr)[:0]
+        decodeBufPool.Put(bufPtr)
+        ptr := new([]byte)
+        *ptr = decoded[:0]
+        decodeBufPool.Put(ptr)
     }
     return result, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/compression/zstd.go` around lines 86 - 113, The code repeatedly and redundantly type-asserts bufPtr to *([]byte) (three times) even though decodeBufPool.New always returns *([]byte); change this by performing a single explicit type assertion immediately after bufPtr := decodeBufPool.Get() (e.g., p := bufPtr.(*[]byte) with a direct assertion so it panics if the invariant is violated), then use p for all later operations (setting buf, resetting slices, and decodeBufPool.Put) and remove the three guarded if blocks; keep the existing logic around capacity check using decodeBufPoolCap and the copy of decoded so behavior remains identical while eliminating silent-leak risk.
interaction-store/internal/constants/constants.go (1)
11-13: Derive MaxWeekIndex from TotalWeeks to avoid drift.
This keeps the boundary definition self-consistent if window size changes later.
♻️ Suggested change
-    // MaxWeekIndex is the maximum week index (TotalWeeks - 1).
-    MaxWeekIndex = 23
+    // MaxWeekIndex is the maximum week index (TotalWeeks - 1).
+    MaxWeekIndex = TotalWeeks - 1
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/constants/constants.go` around lines 11 - 13, MaxWeekIndex is hardcoded to 23; change it to be derived from TotalWeeks to prevent drift by computing MaxWeekIndex as TotalWeeks - 1 (use the existing TotalWeeks constant) so boundary values stay consistent; update the constant definition for MaxWeekIndex in interaction-store/internal/constants/constants.go to reference TotalWeeks instead of a magic number.
interaction-store/internal/handler/interaction.go (1)
126-149: Remove redundant condition checks in the parallel path.
Line 126 already guarantees both types are requested, so the inner if wantClick and if wantOrder are unnecessary.
♻️ Suggested simplification
 if wantClick && wantOrder {
     var clickProto []*timeseries.ClickEvent
     var orderProto []*timeseries.OrderEvent
     g := new(errgroup.Group)
-    if wantClick {
-        g.Go(func() error {
-            events, err := h.clickRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
-            if err != nil {
-                return err
-            }
-            clickProto = h.convertClickEventsToProto(events)
-            return nil
-        })
-    }
-    if wantOrder {
-        g.Go(func() error {
-            events, err := h.orderRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
-            if err != nil {
-                return err
-            }
-            orderProto = h.convertOrderEventsToProto(events)
-            return nil
-        })
-    }
+    g.Go(func() error {
+        events, err := h.clickRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
+        if err != nil {
+            return err
+        }
+        clickProto = h.convertClickEventsToProto(events)
+        return nil
+    })
+    g.Go(func() error {
+        events, err := h.orderRetrieveHandler.Retrieve(req.UserId, req.StartTimestamp, req.EndTimestamp, req.Limit)
+        if err != nil {
+            return err
+        }
+        orderProto = h.convertOrderEventsToProto(events)
+        return nil
+    })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/handler/interaction.go` around lines 126 - 149, The inner conditional checks inside the parallel branch are redundant because the outer if already ensures both wantClick and wantOrder are true; remove the inner if wantClick and if wantOrder and always spawn two goroutines on the errgroup (use g.Go for the click path calling h.clickRetrieveHandler.Retrieve(...), then set clickProto via h.convertClickEventsToProto(...), and a second g.Go for the order path calling h.orderRetrieveHandler.Retrieve(...) and h.convertOrderEventsToProto(...)). Keep existing error handling and return behavior inside each goroutine and leave the errgroup usage (g := new(errgroup.Group)) unchanged.
interaction-store/internal/utils/utils_test.go (1)
10-16: Strengthen deterministic assertions to exact expected values.
These checks are currently too permissive and may miss regressions.
♻️ Suggested test hardening
 func TestWeekFromTimestampMs(t *testing.T) {
     // Jan 15, 2024 is in ISO week 3
     ts := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC).UnixMilli()
     week := WeekFromTimestampMs(ts)
-    assert.GreaterOrEqual(t, week, 1)
-    assert.LessOrEqual(t, week, 53)
+    assert.Equal(t, 3, week)
 }
@@
 func TestTimestampDiffInWeeks_25Weeks(t *testing.T) {
     start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
     end := time.Date(2024, 6, 24, 0, 0, 0, 0, time.UTC).UnixMilli() // ~25 weeks
     diff := TimestampDiffInWeeks(start, end)
-    assert.GreaterOrEqual(t, diff, 24)
+    assert.Equal(t, 25, diff)
 }
Also applies to: 49-54
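The exact values in the suggested assertions can be checked with the standard library alone. The two helpers below are hypothetical stand-ins, assuming that WeekFromTimestampMs returns the ISO 8601 week in UTC and that TimestampDiffInWeeks counts whole 7-day weeks; the real utils may be defined differently, in which case the expected values would need to be re-derived.

```go
package main

import (
	"fmt"
	"time"
)

// weekFromTimestampMs is a hypothetical stand-in for utils.WeekFromTimestampMs,
// assumed here to return the ISO 8601 week number in UTC.
func weekFromTimestampMs(ms int64) int {
	_, week := time.UnixMilli(ms).UTC().ISOWeek()
	return week
}

// timestampDiffInWeeks is a hypothetical stand-in for utils.TimestampDiffInWeeks,
// assumed here to count whole 7-day weeks between two millisecond timestamps.
func timestampDiffInWeeks(startMs, endMs int64) int {
	const weekMs = 7 * 24 * 60 * 60 * 1000
	return int((endMs - startMs) / weekMs)
}

func main() {
	ts := time.Date(2024, 1, 15, 12, 0, 0, 0, time.UTC).UnixMilli()
	fmt.Println(weekFromTimestampMs(ts)) // 3: Jan 1, 2024 is a Monday, so Jan 15 opens ISO week 3

	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
	end := time.Date(2024, 6, 24, 0, 0, 0, 0, time.UTC).UnixMilli()
	fmt.Println(timestampDiffInWeeks(start, end)) // 25: exactly 175 days in a leap year
}
```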
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/utils/utils_test.go` around lines 10 - 16, The test TestWeekFromTimestampMs uses permissive range assertions; change it to assert the exact ISO week value for the given timestamp by replacing assert.GreaterOrEqual/assert.LessOrEqual with assert.Equal(t, 3, WeekFromTimestampMs(ts)) (since Jan 15, 2024 is ISO week 3) and apply the same hardening to the related test at the other location (lines 49-54), TestTimestampDiffInWeeks_25Weeks, by asserting assert.Equal(t, 25, diff) on the result of TimestampDiffInWeeks, so both tests assert exact expected values.
interaction-store/internal/handler/retrieve/click.go (1)
237-263: break assumes events are pre-sorted descending; document the invariant.
mergeFilterAndLimit uses break (Line 253) when ts < startTimestampMs, which is an optimization that relies on each week's events being stored in descending timestamp order. If this invariant is ever violated (e.g., a bug in persist or a data migration), events would silently be dropped from results.
Consider adding a brief comment at the function/loop level documenting this precondition.
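To make the risk concrete, here is a small sketch of the early-break pattern. The `event` type, `filterWindow`, and `isDescending` are hypothetical names for illustration, not the handler's actual code; the doc comment shows the kind of precondition note the review asks for.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical, minimal event carrying only a timestamp in ms.
type event struct{ ts int64 }

// isDescending reports whether the events are ordered newest-first.
func isDescending(events []event) bool {
	return sort.SliceIsSorted(events, func(i, j int) bool { return events[i].ts > events[j].ts })
}

// filterWindow keeps events with start <= ts <= end, up to limit entries.
//
// PRECONDITION: events must be sorted by timestamp in descending order.
// The early break below is only correct under that ordering; on unsorted
// input, in-window events after the first too-old event are silently dropped.
func filterWindow(events []event, start, end int64, limit int) []event {
	if limit <= 0 {
		return nil
	}
	out := make([]event, 0, limit)
	for _, e := range events {
		if e.ts > end {
			continue // newer than the window; older events may still match
		}
		if e.ts < start {
			break // everything after this is older still, given the precondition
		}
		out = append(out, e)
		if len(out) == limit {
			break
		}
	}
	return out
}

func main() {
	sorted := []event{{50}, {40}, {30}, {20}, {10}}
	unsorted := []event{{50}, {10}, {40}, {30}}
	fmt.Println(isDescending(sorted), len(filterWindow(sorted, 20, 45, 10)))
	fmt.Println(isDescending(unsorted), len(filterWindow(unsorted, 20, 45, 10)))
}
```

On the unsorted slice the loop stops at the event with timestamp 10 and never sees 40 or 30, which is the silent drop the comment describes. A check like `isDescending` could guard tests or migrations if the cost is acceptable.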
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/handler/retrieve/click.go` around lines 237 - 263, Document the descending-order invariant used by mergeFilterAndLimit: state clearly in the function comment (or immediately above the inner loop `for i := range events` where the `break` on `ts < startTimestampMs` occurs) that each week's `weekToEvents[...]` slice is expected to be pre-sorted in descending timestamp order and that the `break` relies on that precondition; include a short note suggesting that violating this invariant may silently drop older events and that callers/persistence/migrations must preserve the order.
interaction-store/internal/handler/retrieve/order.go (1)
61-257: Significant code duplication with click.go; consider generics or a shared helper.
buildOrderedWeeks, buildTableToFieldsMapping, fetchDataInParallel, deserializeWeeks, and mergeFilterAndLimit are structurally identical between order.go and click.go, differing only in the event type (FlattenedOrderEvent vs ClickEvent) and the timestamp accessor (OrderedAt vs ClickedAt).
With Go generics, you could extract a shared retrieval pipeline parameterized by event type and a timestamp-extraction function. This would eliminate ~200 lines of near-identical code and ensure bug fixes or improvements are applied uniformly.
Not a blocker for this PR, but worth tracking as a follow-up.
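As a rough idea of what the follow-up could look like, the sketch below parameterizes only the final merge step by event type and timestamp accessor. The `clickEvent` and `orderEvent` structs are simplified placeholders and the function signature is an assumption, not the handlers' real one.

```go
package main

import "fmt"

// Placeholder event types; the real ones carry many more fields.
type clickEvent struct{ ClickedAt int64 }
type orderEvent struct{ OrderedAt int64 }

// mergeFilterAndLimit walks weeks newest-first and keeps events whose timestamp
// falls in [start, end]. tsOf extracts the timestamp for the concrete event
// type. Each week's slice is assumed to be sorted newest-first; a limit of 0
// or less means "no limit" in this sketch.
func mergeFilterAndLimit[E any](
	orderedWeeks []int,
	weekToEvents map[int][]E,
	tsOf func(E) int64,
	start, end int64,
	limit int,
) []E {
	var out []E
	for _, week := range orderedWeeks {
		for _, e := range weekToEvents[week] {
			ts := tsOf(e)
			if ts > end {
				continue
			}
			if ts < start {
				break // relies on the newest-first ordering within a week
			}
			out = append(out, e)
			if limit > 0 && len(out) >= limit {
				return out
			}
		}
	}
	return out
}

func main() {
	clicks := map[int][]clickEvent{1: {{30}, {20}}, 0: {{10}, {5}}}
	orders := map[int][]orderEvent{1: {{28}}, 0: {{12}}}
	weeks := []int{1, 0}

	c := mergeFilterAndLimit(weeks, clicks, func(e clickEvent) int64 { return e.ClickedAt }, 8, 25, 0)
	o := mergeFilterAndLimit(weeks, orders, func(e orderEvent) int64 { return e.OrderedAt }, 8, 25, 1)
	fmt.Println(len(c), len(o))
}
```

Passing the accessor as a function keeps the event structs free of a shared interface; an interface with a `Timestamp()` method would be an equally reasonable choice.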
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/handler/retrieve/order.go` around lines 61 - 257, The two files duplicate the retrieval pipeline (buildOrderedWeeks, buildTableToFieldsMapping, fetchDataInParallel, deserializeWeeks, mergeFilterAndLimit) for Order vs Click differing only by event type and timestamp accessor; refactor by extracting a generic/shared retrieval pipeline (e.g., a parameterized function or generic type) that accepts the event type parameter (FlattenedOrderEvent / ClickEvent), a timestamp-extractor function (OrderedAt / ClickedAt), and the interaction-type constants used in DeserializePSDB (enum.InteractionTypeOrder / enum.InteractionTypeClick), then replace OrderRetrieveHandler and ClickRetrieveHandler implementations to call that shared pipeline so the duplicated logic is centralized and type-specific behavior is passed in as parameters.
interaction-store/internal/handler/retrieve/retrieve.go (1)
11-16: Error message hard-codes "24 weeks" instead of using the constant.
If constants.TotalWeeks is ever changed, this error message will become stale and misleading. Consider using fmt.Errorf with constants.TotalWeeks or at least adding a comment linking it to the constant.
-    ErrTimeRangeExceeded = errors.New("startTimestampMs and endTimestampMs cannot be more than 24 weeks apart")
+    ErrTimeRangeExceeded = fmt.Errorf("startTimestampMs and endTimestampMs cannot be more than %d weeks apart", constants.TotalWeeks)
Note: this would change ErrTimeRangeExceeded from a sentinel error to a formatted error. If errors.Is comparison is needed (which the tests do use), keep it as a sentinel but add a comment documenting the coupling to constants.TotalWeeks.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@interaction-store/internal/handler/retrieve/retrieve.go` around lines 11 - 16, ErrTimeRangeExceeded currently hard-codes "24 weeks" but is coupled to constants.TotalWeeks; keep ErrTimeRangeExceeded as a sentinel (since tests use errors.Is) and add a comment directly above ErrTimeRangeExceeded documenting the coupling to constants.TotalWeeks and instructing maintainers to update the error text if constants.TotalWeeks changes (alternatively, if you decide to make the message dynamic, replace ErrTimeRangeExceeded with a fmt.Errorf(created at runtime) and update any tests that rely on errors.Is accordingly).
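One detail worth checking before choosing between the two options: errors.Is compares error values, so a formatted error that is built once at package level still behaves like a sentinel. Only an error created anew on every call would stop matching. The sketch below illustrates this; `totalWeeks`, `validateTimeRange`, and `isRangeError` are hypothetical stand-ins and not the handler's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// totalWeeks stands in for constants.TotalWeeks.
const totalWeeks = 24

// ErrTimeRangeExceeded is built once, at package initialization, so every
// caller sees the same error value and errors.Is keeps matching it.
var ErrTimeRangeExceeded = fmt.Errorf(
	"startTimestampMs and endTimestampMs cannot be more than %d weeks apart", totalWeeks)

// validateTimeRange is a hypothetical validator that wraps the shared value.
func validateTimeRange(startMs, endMs int64) error {
	const weekMs = 7 * 24 * 60 * 60 * 1000
	if endMs-startMs > totalWeeks*weekMs {
		return fmt.Errorf("validate time range: %w", ErrTimeRangeExceeded)
	}
	return nil
}

// isRangeError shows the comparison a test would typically make.
func isRangeError(err error) bool {
	return errors.Is(err, ErrTimeRangeExceeded)
}

func main() {
	const weekMs = 7 * 24 * 60 * 60 * 1000
	err := validateTimeRange(0, 25*weekMs)
	fmt.Println(isRangeError(err))
	fmt.Println(err)
}
```

With this shape the message follows the constant and existing errors.Is assertions should continue to pass, provided the error is not re-created inside the function that returns it.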
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@interaction-store/internal/compression/zstd_test.go`:
- Around line 99-109: TestGetDecoder_ZSTD ignores the error returned by
NewZStdEncoder(), which can make enc nil and cause a panic when calling
enc.Encode; update the test to capture and check the error from NewZStdEncoder()
(e.g., call enc, err := NewZStdEncoder() and assert.NoError(t, err) or t.Fatalf
on err) before using enc.Encode so the test fails cleanly instead of panicking.
In `@interaction-store/internal/compression/zstd.go`:
- Around line 106-111: The code path that grows the decoded buffer only puts the
new slice into decodeBufPool and drops the original bufPtr from Get(), causing
an unbalanced Get/Put; update the else branch in the function handling decoded
and bufPtr so you Put back both the original bufPtr and the newly created grown
slice when cap(decoded) > decodeBufPoolCap (use decodeBufPool.Put(bufPtr) and
also Put the new ptr for decoded[:0]), ensuring decodeBufPool receives the
original buffer reference (bufPtr) as well as the grown buffer to keep Get/Put
balanced.
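One possible way to keep Get and Put balanced is to reuse the pointer obtained from the pool for whichever backing array the decoder ends up with. This is a sketch of an alternative, not the fix proposed above: `bufPool`, `poolCap`, and `decodeAll` are hypothetical stand-ins (the latter for the zstd decoder's DecodeAll), and unlike the original code it pools grown buffers unconditionally.

```go
package main

import (
	"fmt"
	"sync"
)

// poolCap is an assumed default capacity, standing in for decodeBufPoolCap.
const poolCap = 64

// bufPool hands out *[]byte so that Put does not allocate a new interface value
// for the slice header on every call.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 0, poolCap)
		return &b
	},
}

// decodeAll is a stand-in for a decoder's DecodeAll: it appends the output to
// dst and may return a slice with a larger backing array.
func decodeAll(src, dst []byte) ([]byte, error) {
	return append(dst, src...), nil
}

// decode performs exactly one Get and one Put on every path.
func decode(src []byte) ([]byte, error) {
	bufPtr := bufPool.Get().(*[]byte) // single assertion; panics if the pool invariant breaks
	decoded, err := decodeAll(src, (*bufPtr)[:0])
	if err != nil {
		*bufPtr = (*bufPtr)[:0]
		bufPool.Put(bufPtr)
		return nil, err
	}
	// Copy out so the caller never aliases pooled memory.
	result := make([]byte, len(decoded))
	copy(result, decoded)
	// Store whichever backing array we ended up with (original or grown) in the
	// same pointer and hand that one pointer back to the pool.
	*bufPtr = decoded[:0]
	bufPool.Put(bufPtr)
	return result, nil
}

func main() {
	out, err := decode([]byte("hello"))
	fmt.Println(string(out), err)
}
```

If very large buffers should not stay pooled, a capacity ceiling could be added before the final Put; that policy is left out here to keep the sketch short.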
In `@interaction-store/internal/data/scylla/scylla.go`:
- Around line 52-57: The helpers executeRetrieveInteractions and
executeRetrieveMetadata currently return only map[string]interface{} and swallow
iterator errors; change their signatures to return (map[string]interface{},
error), update their internals to return any iter.Scan() or iter.Close() errors
(instead of only logging), and then update callers such as
Scylla.RetrieveInteractions and Scylla.RetrieveMetadata to accept the (map,
error) response and propagate non-nil errors upward (i.e., return nil, err)
rather than always returning (response, nil); ensure all call sites in this file
that use these helpers are adjusted accordingly so database read failures are
surfaced to callers.
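The change in signature can be illustrated with a fake iterator. In this sketch `rowIter` is a hypothetical, minimal interface loosely modelled on the MapScan/Close shape of a gocql iterator, and `executeRetrieve` is a placeholder for the two helpers; none of these names are taken from scylla.go.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a made-up failure used by the fake iterator below.
var errTimeout = errors.New("read timeout")

// rowIter is a hypothetical, minimal view of a query iterator.
type rowIter interface {
	MapScan(row map[string]interface{}) bool
	Close() error
}

// fakeIter serves canned rows and reports closeErr from Close.
type fakeIter struct {
	rows     []map[string]interface{}
	closeErr error
}

func (f *fakeIter) MapScan(row map[string]interface{}) bool {
	if len(f.rows) == 0 {
		return false
	}
	for k, v := range f.rows[0] {
		row[k] = v
	}
	f.rows = f.rows[1:]
	return true
}

func (f *fakeIter) Close() error { return f.closeErr }

// executeRetrieve reads one row and returns the iterator error to the caller
// instead of only logging it.
func executeRetrieve(it rowIter) (map[string]interface{}, error) {
	row := make(map[string]interface{})
	it.MapScan(row)
	if err := it.Close(); err != nil {
		return nil, fmt.Errorf("retrieve interactions: %w", err)
	}
	return row, nil
}

func main() {
	ok := &fakeIter{rows: []map[string]interface{}{{"week_3": []byte("data")}}}
	row, err := executeRetrieve(ok)
	fmt.Println(len(row), err)

	bad := &fakeIter{closeErr: errTimeout}
	row, err = executeRetrieve(bad)
	fmt.Println(row == nil, err)
}
```

A caller can then distinguish "no data for this user" (empty map, nil error) from "the read failed" (nil map, non-nil error), which is not possible when the helper always returns a map.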
In `@interaction-store/internal/handler/retrieve/click.go`:
- Around line 61-93: The buildOrderedWeeks function can emit duplicate week
indices when lowerBound <= upperBound but diffWeeks > constants.TotalWeeks;
change buildOrderedWeeks to defensively handle that case by treating diffWeeks >
constants.TotalWeeks as a full-cycle request: inside buildOrderedWeeks (function
name) add a guard that if diffWeeks > constants.TotalWeeks (or specifically when
lowerBound <= upperBound && diffWeeks > constants.TotalWeeks) then allocate
weeks with capacity constants.TotalWeeks and append every week index once from
constants.MaxWeekIndex down to 0 (same output as the existing full-cycle
branch), and add a brief comment referencing this defensive behavior so callers
understand why the full-cycle branch is used.
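A minimal sketch of such a guard follows. It assumes week indices live on a ring of 24 slots and that weeks are returned newest-first; the function body is an illustration built on those assumptions, not the actual buildOrderedWeeks implementation, and the lowercase constants stand in for constants.TotalWeeks and constants.MaxWeekIndex.

```go
package main

import "fmt"

const (
	totalWeeks   = 24             // stands in for constants.TotalWeeks
	maxWeekIndex = totalWeeks - 1 // stands in for constants.MaxWeekIndex
)

// buildOrderedWeeks returns week indices newest-first, walking backwards from
// upperBound to lowerBound and wrapping around at 0.
func buildOrderedWeeks(lowerBound, upperBound, diffWeeks int) []int {
	weeks := make([]int, 0, totalWeeks)
	if diffWeeks > totalWeeks {
		// Defensive guard: the span covers the whole ring, so emit every
		// index exactly once instead of trusting the bounds.
		for w := maxWeekIndex; w >= 0; w-- {
			weeks = append(weeks, w)
		}
		return weeks
	}
	// The iteration count is capped at totalWeeks so that out-of-range bounds
	// can never produce duplicates or an endless loop.
	for i, w := 0, upperBound; i < totalWeeks; i, w = i+1, (w-1+totalWeeks)%totalWeeks {
		weeks = append(weeks, w)
		if w == lowerBound {
			break
		}
	}
	return weeks
}

func main() {
	fmt.Println(buildOrderedWeeks(2, 5, 3))       // plain range
	fmt.Println(buildOrderedWeeks(22, 1, 3))      // range that wraps past index 0
	fmt.Println(len(buildOrderedWeeks(2, 5, 30))) // oversized span falls back to the full cycle
}
```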
In `@online-feature-store/internal/handler/feature/features.go`:
- Around line 173-181: The code calls
blocks.HelperVectorFeatureStringToConcatenatedString(encodedValue,
int(stringLengths[sequence])) after getting sequence from GetSequenceNo; add a
full bounds check for sequence before indexing stringLengths (ensure sequence >=
0 && sequence < len(stringLengths)) and return a clear error if out of range to
avoid panics due to stale schema/config drift; update the block around
GetSequenceNo, the sequence variable check, and the call to
HelperVectorFeatureStringToConcatenatedString accordingly.
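The bounds check could look roughly like the sketch below. `stringLengthFor` is a hypothetical helper, and the element type of `stringLengths` is assumed to be `uint16` for illustration only; the `-1` sentinel mirrors the value that GetSequenceNo is described as returning when a feature is not found.

```go
package main

import "fmt"

// sequenceNotFound mirrors the -1 sentinel described for GetSequenceNo.
const sequenceNotFound = -1

// stringLengthFor validates sequence before indexing, so a stale schema or
// config drift produces an error instead of an index-out-of-range panic.
func stringLengthFor(stringLengths []uint16, sequence int) (int, error) {
	if sequence == sequenceNotFound {
		return 0, fmt.Errorf("feature not found in active schema")
	}
	if sequence < 0 || sequence >= len(stringLengths) {
		return 0, fmt.Errorf("sequence %d out of range for %d string lengths", sequence, len(stringLengths))
	}
	return int(stringLengths[sequence]), nil
}

func main() {
	lengths := []uint16{8, 16}
	fmt.Println(stringLengthFor(lengths, 1))
	fmt.Println(stringLengthFor(lengths, 2))
}
```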
In `@online-feature-store/internal/handler/feature/retrieve.go`:
- Line 96: The log call currently prints the full query object via
log.Error().Err(err).Msgf("Error while pre-processing request for query: %v",
query); change this to avoid logging sensitive payloads by removing the full
query from the formatted message and instead log only minimal, non-sensitive
structured fields (e.g., query ID, safe status codes, or a keyed hash of the
user key). Replace the Msgf call with a structured log that attaches Err(err)
plus explicit safe fields (for example .Str("query_id", query.GetId()) or
.Str("key_hash", hashFn(query.GetKey()))) and ensure any user identifiers are
redacted or hashed before logging.
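A keyed hash for log fields can be built from the standard library. The sketch below uses HMAC-SHA256 and the standard `log` package rather than the structured logger from the original call; `keyHash`, the secret, and the field name are assumptions for illustration.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"log"
)

// keyHash returns a short, keyed digest of a user key. The same key always maps
// to the same value, so log lines stay correlatable, but the raw key cannot be
// read back from the log without the secret.
func keyHash(secret []byte, userKey string) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(userKey))
	return hex.EncodeToString(mac.Sum(nil))[:16]
}

func main() {
	secret := []byte("demo-secret") // assumption: loaded from configuration in practice
	err := errors.New("entity label missing")
	// Log only the hashed identifier and the error, never the full query payload.
	log.Printf("error while pre-processing request: key_hash=%s err=%v", keyHash(secret, "user-123"), err)
}
```

Truncating the digest to 16 hex characters keeps log lines compact; whether that is enough to avoid collisions depends on the number of distinct keys.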
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (24)
- go-sdk/pkg/interaction-store/client.go
- horizon/internal/skye/handler/skye.go
- interaction-store/internal/compression/zstd.go
- interaction-store/internal/compression/zstd_test.go
- interaction-store/internal/constants/constants.go
- interaction-store/internal/data/scylla/scylla.go
- interaction-store/internal/data/scylla/scylla_test.go
- interaction-store/internal/handler/interaction.go
- interaction-store/internal/handler/persist/click.go
- interaction-store/internal/handler/persist/click_test.go
- interaction-store/internal/handler/persist/order.go
- interaction-store/internal/handler/persist/order_test.go
- interaction-store/internal/handler/retrieve/click.go
- interaction-store/internal/handler/retrieve/click_test.go
- interaction-store/internal/handler/retrieve/order.go
- interaction-store/internal/handler/retrieve/order_test.go
- interaction-store/internal/handler/retrieve/retrieve.go
- interaction-store/internal/handler/retrieve/retrieve_test.go
- interaction-store/internal/utils/utils_test.go
- online-feature-store/internal/config/etcd.go
- online-feature-store/internal/handler/feature/features.go
- online-feature-store/internal/handler/feature/retrieve.go
- online-feature-store/internal/handler/feature/retrieve_test.go
- online-feature-store/internal/server/http/server.go
💤 Files with no reviewable changes (1)
- online-feature-store/internal/server/http/server.go
Is this PR a bug fix? A bug fix corrects existing broken functionality. Examples of bug fixes: fixing crashes, correcting logic errors, fixing broken UI. Examples of NOT bug fixes: new features, refactoring, cleanup, docs, tests, performance improvements. Respond with JSON only: Important:
{
"confidence": 0.15,
"reasoning": "This PR is a sync/merge operation containing primarily refactoring (constant extraction, cache key restructuring), performance improvements (buffer pooling, parallel fetching), new features (RT partition selection, concurrent retrieval), test additions, and enhanced error messages. None of the changes clearly fix crashes, correct logic errors, or repair broken functionality. The work is focused on code organization, optimization, and feature enhancement rather than correcting existing broken behavior."
}
Summary by CodeRabbit
New Features
Improvements
Bug Fixes