From f577ab3419f900383f979f09a7524a6fbfa309e2 Mon Sep 17 00:00:00 2001 From: Shaun Patterson Date: Fri, 19 Jun 2026 11:07:23 -0400 Subject: [PATCH] fix(hnsw): correct the vector dead-node lifecycle (record on delete, clear on re-insert) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The HNSW vector index keeps a per-predicate "dead-node" list (a uid array at DataKey(__vector_dead, 1)) of deleted vectors, which removeDeadNodes uses to strip edges pointing at them. Two correctness bugs: (A) Deletes were never recorded. addIndexMutations read the deleted value from pl.AllValues(txn.StartTs), but addMutationHelper has already applied the delete by then, so the read is empty (len(data)==0) and the dead node is never added. The value being deleted is already passed in as info.val and was ignored. Net effect: the dead list stays empty and removeDeadNodes is a no-op, so edges to deleted vectors are never cleaned up. (B) The list was append-only: re-inserting (re-embedding) a previously deleted uid never cleared it. Once (A) is fixed, a re-inserted vector would stay marked dead, be stripped from every neighbour's edge list, and end up orphaned — present but unreachable in search. This matters in practice: a SET that overwrites an existing vector runs addIndexMutations with op=DEL (old value) then op=SET (new value) in one mutation, so without (B) every in-place re-embed would orphan its node. Fix: - Record on delete using info.val (the value actually being deleted). - Clear on (re-)insert so a revived uid is no longer treated as dead. - Both go through updateDeadNodes, an atomic read-modify-write under the posting's write lock. A multi-edge mutation applies edges in parallel goroutines sharing one txn (worker/draft.go) and the index rebuilder streams with 16 goroutines, all touching this single posting; the previous unlocked delete-path RMW could lose updates under that concurrency. 
The locked RMW also reads the txn's own pending writes, so a delete+re-insert within one mutation nets out correctly. - recordDeadNode is idempotent (no duplicate growth on repeated deletes / Raft re-apply). Tests: delete records the uid; re-insert clears it; a deleted-then-reinserted vector stays searchable; concurrent records don't lose updates (-race); record-then-clear within one txn nets out. Co-Authored-By: Claude Opus 4.8 (1M context) --- posting/index.go | 131 +++++++++++++----- posting/index_vector_dead_test.go | 219 ++++++++++++++++++++++++++++++ 2 files changed, 314 insertions(+), 36 deletions(-) create mode 100644 posting/index_vector_dead_test.go diff --git a/posting/index.go b/posting/index.go index ae6c3352a44..604e89f9ba8 100644 --- a/posting/index.go +++ b/posting/index.go @@ -14,6 +14,7 @@ import ( "fmt" "math" "os" + "slices" "strings" "sync/atomic" "time" @@ -77,6 +78,80 @@ func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error) return tokens, nil } +// updateDeadNodes performs an atomic read-modify-write of the predicate's HNSW +// dead-node list (the uids of deleted vectors, stored as a JSON array in a +// single posting at DataKey(deadAttr, 1)). mutate receives the current list and +// returns the new list plus whether it changed; the write is skipped when +// unchanged. +// +// The whole read-modify-write runs under the posting list's write lock. Several +// goroutines can share the same txn and touch this single posting concurrently — +// a multi-edge mutation applies its edges in parallel (worker/draft.go), and the +// index rebuilder streams with 16 goroutines — so without holding the lock +// across the read and the write, concurrent record/clear calls would clobber +// each other (last-writer-wins on the full JSON blob) and silently lose dead +// uids. Holding the lock serializes them, and the read sees the txn's own +// pending updates, so the list accumulates correctly. 
+func (txn *Txn) updateDeadNodes(ctx context.Context, deadAttr string,
+	mutate func(deadNodes []uint64) ([]uint64, bool)) error {
+	deadList, err := txn.Get(x.DataKey(deadAttr, 1))
+	if err != nil {
+		return err
+	}
+	deadList.Lock() // held across both the read and the write below
+	defer deadList.Unlock()
+
+	var current []uint64
+	stored, err := deadList.ValueWithLockHeld(txn.StartTs)
+	switch {
+	case err == ErrNoValue: // no stored list: mutate starts from nil
+	case err != nil:
+		return err
+	case stored.Value != nil:
+		if current, err = hnsw.ParseEdges(string(stored.Value.([]byte))); err != nil {
+			return err
+		}
+	}
+
+	next, changed := mutate(current)
+	if !changed {
+		return nil
+	}
+	encoded, err := json.Marshal(next)
+	if err != nil {
+		return err
+	}
+	return deadList.addMutationInternal(ctx, txn, &pb.DirectedEdge{
+		Entity:    1,
+		Attr:      deadAttr,
+		Value:     encoded,
+		ValueType: pb.Posting_ValType(0),
+	})
+}
+
+// recordDeadNode marks uid as dead for the predicate. It is idempotent: a uid
+// already on the list is not appended again (repeated deletes, Raft re-apply).
+func (txn *Txn) recordDeadNode(ctx context.Context, deadAttr string, uid uint64) error {
+	appendIfMissing := func(deadNodes []uint64) ([]uint64, bool) {
+		if slices.Contains(deadNodes, uid) {
+			return deadNodes, false
+		}
+		return append(deadNodes, uid), true
+	}
+	return txn.updateDeadNodes(ctx, deadAttr, appendIfMissing)
+}
+
+// clearDeadNode drops uid from the predicate's dead-node list so a re-inserted
+// vector counts as live again; the list is left alone when uid is not on it.
+func (txn *Txn) clearDeadNode(ctx context.Context, deadAttr string, uid uint64) error {
+	dropUid := func(deadNodes []uint64) ([]uint64, bool) {
+		before := len(deadNodes)
+		kept := slices.DeleteFunc(deadNodes, func(d uint64) bool { return d == uid })
+		return kept, len(kept) != before
+	}
+	return txn.updateDeadNodes(ctx, deadAttr, dropUid)
+}
+
 // addIndexMutations adds mutation(s) for a single term, to maintain the index,
 // but only for the given tokenizers.
 // TODO - See if we need to pass op as argument as t should already have Op.
@@ -111,45 +186,21 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) return []*pb.DirectedEdge{}, err } - if info.op == pb.DirectedEdge_DEL && - len(data) > 0 && data[0].Tid == types.VFloatID { - // TODO look into better alternatives - // The issue here is that we will create dead nodes in the Vector Index - // assuming an HNSW index type. What we should do instead is invoke - // index.Remove(). However, we currently do - // not support this in VectorIndex code!! - // if a delete & dealing with vfloats, add this to dead node in persistent store. - // What we should do instead is invoke the factory.Remove(key) operation. + // A vfloat delete must record the uid in the index's dead-node list so + // that removeDeadNodes can strip edges pointing at the now-deleted vector. + // + // We use info.val (the value being deleted, supplied by the caller) rather + // than the re-read `data`: addMutationHelper has already applied the delete + // by this point, so AllValues(txn.StartTs) is empty and the node would + // never be recorded. + // + // TODO: replace this with an index.Remove() call once the + // VectorIndex API supports removal directly. 
+ if info.op == pb.DirectedEdge_DEL && info.val.Tid == types.VFloatID { deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead) - deadKey := x.DataKey(deadAttr, 1) - pl, err := txn.Get(deadKey) - if err != nil { + if err := txn.recordDeadNode(ctx, deadAttr, uid); err != nil { return []*pb.DirectedEdge{}, err } - var deadNodes []uint64 - deadData, _ := pl.Value(txn.StartTs) - if deadData.Value == nil { - deadNodes = append(deadNodes, uid) - } else { - deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte))) - if err != nil { - return []*pb.DirectedEdge{}, err - } - deadNodes = append(deadNodes, uid) - } - deadNodesBytes, marshalErr := json.Marshal(deadNodes) - if marshalErr != nil { - return []*pb.DirectedEdge{}, marshalErr - } - edge := &pb.DirectedEdge{ - Entity: 1, - Attr: deadAttr, - Value: deadNodesBytes, - ValueType: pb.Posting_ValType(0), - } - if err := pl.addMutation(ctx, txn, edge); err != nil { - return nil, err - } } // TODO: As stated earlier, we need to validate that it is okay to assume @@ -170,6 +221,14 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) if err != nil { return []*pb.DirectedEdge{}, err } + // The uid is alive again: clear it from the dead-node list. Otherwise a + // deleted-then-reinserted (re-embedded) vector stays marked dead, gets + // stripped from every neighbour's edge list by removeDeadNodes, and ends + // up orphaned from the HNSW graph — present but unreachable in search. 
+ deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead) + if err := txn.clearDeadNode(ctx, deadAttr, uid); err != nil { + return []*pb.DirectedEdge{}, err + } pbEdges := []*pb.DirectedEdge{} for _, e := range edges { pbe := indexEdgeToPbEdge(e) diff --git a/posting/index_vector_dead_test.go b/posting/index_vector_dead_test.go new file mode 100644 index 00000000000..7c9da173460 --- /dev/null +++ b/posting/index_vector_dead_test.go @@ -0,0 +1,219 @@ +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package posting + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgraph/v25/protos/pb" + "github.com/dgraph-io/dgraph/v25/schema" + "github.com/dgraph-io/dgraph/v25/tok/hnsw" + "github.com/dgraph-io/dgraph/v25/tok/index" + "github.com/dgraph-io/dgraph/v25/types" + "github.com/dgraph-io/dgraph/v25/x" +) + +// These tests cover the dead-node lifecycle of the HNSW vector index, which has +// two correctness bugs: +// +// (A) Deleting a vector is supposed to record its uid in the predicate's +// "__vector_dead" list so that removeDeadNodes can strip edges pointing at +// it. But addIndexMutations reads the value from AllValues(StartTs) *after* +// addMutationHelper has already applied the delete, so the read is empty +// and the dead node is never recorded. The old value is available as +// info.val but is ignored. +// +// (B) The dead list is append-only: re-inserting (re-embedding) a previously +// deleted uid never removes it from "__vector_dead". Once (A) is fixed and +// the list is populated, a re-inserted vector would be permanently treated +// as dead — stripped from every neighbour's edge list, leaving it with no +// inbound edges and unreachable from the HNSW entry point (it has an +// embedding but is missing from search results). 
+//
+// Together these tests pin the full lifecycle: delete records, re-insert clears,
+// and a deleted-then-reinserted vector stays searchable.
+
+// parseVecSchema registers an HNSW-indexed float32vector predicate for tests.
+func parseVecSchema(t *testing.T, pred string) {
+	t.Helper() // report a schema failure at the calling test's line
+	require.NoError(t, schema.ParseBytes([]byte(pred+`: float32vector @index(hnsw(metric: "euclidean")) .`), 1))
+}
+
+// setVector SETs a float32 vector for uid through the full indexed-mutation path
+// (AddMutationWithIndex -> addIndexMutations -> HNSW insert).
+func setVector(t *testing.T, attr string, uid uint64, vec []float32, startTs, commitTs uint64) {
+	t.Helper() // report failures at the calling test's line
+	l, err := GetNoStore(x.DataKey(attr, uid), startTs)
+	require.NoError(t, err)
+	edge := &pb.DirectedEdge{
+		Attr:      attr,
+		Entity:    uid,
+		Value:     types.FloatArrayAsBytes(vec),
+		ValueType: pb.Posting_VFLOAT,
+	}
+	addMutation(t, l, edge, Set, startTs, commitTs, true)
+}
+
+// delVector DELs the vector for uid through the full indexed-mutation path.
+func delVector(t *testing.T, attr string, uid uint64, vec []float32, startTs, commitTs uint64) {
+	t.Helper() // report failures at the calling test's line
+	l, err := GetNoStore(x.DataKey(attr, uid), startTs)
+	require.NoError(t, err)
+	edge := &pb.DirectedEdge{
+		Attr:      attr,
+		Entity:    uid,
+		Value:     types.FloatArrayAsBytes(vec),
+		ValueType: pb.Posting_VFLOAT,
+	}
+	addMutation(t, l, edge, Del, startTs, commitTs, true)
+}
+
+// readDeadList returns the uids recorded in the predicate's __vector_dead list.
+func readDeadList(t *testing.T, attr string, readTs uint64) []uint64 {
+	t.Helper() // report failures at the calling test's line
+	l, err := GetNoStore(x.DataKey(hnsw.ConcatStrings(attr, hnsw.VecDead), 1), readTs)
+	require.NoError(t, err)
+	val, err := l.Value(readTs)
+	if err == ErrNoValue {
+		return nil // no value at readTs: treated as an empty list
+	}
+	require.NoError(t, err)
+	b, ok := val.Value.([]byte)
+	require.True(t, ok, "dead list value should be []byte")
+	dead, err := hnsw.ParseEdges(string(b))
+	require.NoError(t, err)
+	return dead
+}
+
+// TestDeletedVectorRecordedAsDead verifies bug (A): deleting a vector must record
+// its uid in __vector_dead. The probe in this package confirms the delete itself
+// applies (the value is gone), so a missing dead-list entry isolates the
+// record-on-delete defect.
+func TestDeletedVectorRecordedAsDead(t *testing.T) {
+	parseVecSchema(t, "vecdead")
+	attr := x.AttrInRootNamespace("vecdead")
+
+	setVector(t, attr, 1, []float32{0, 0}, 1, 2)
+	setVector(t, attr, 2, []float32{1, 0}, 3, 4)
+
+	delVector(t, attr, 2, []float32{1, 0}, 5, 6)
+
+	require.Contains(t, readDeadList(t, attr, 7), uint64(2),
+		"deleting a vector must record its uid in __vector_dead")
+}
+
+// TestReinsertedVectorClearedFromDeadList verifies bug (B): once a deleted uid is
+// re-inserted, it must be removed from __vector_dead so it is no longer treated
+// as dead.
+func TestReinsertedVectorClearedFromDeadList(t *testing.T) {
+	parseVecSchema(t, "vecdead2")
+	attr := x.AttrInRootNamespace("vecdead2")
+
+	setVector(t, attr, 1, []float32{0, 0}, 11, 12)
+	setVector(t, attr, 2, []float32{1, 0}, 13, 14)
+
+	delVector(t, attr, 2, []float32{1, 0}, 15, 16)
+	require.Contains(t, readDeadList(t, attr, 17), uint64(2),
+		"precondition: delete should record the uid as dead")
+
+	setVector(t, attr, 2, []float32{1, 0}, 18, 19)
+	require.NotContains(t, readDeadList(t, attr, 20), uint64(2),
+		"re-inserting a vector must remove its uid from __vector_dead")
+}
+
+// TestReinsertedVectorIsSearchable re-verifies the lifecycle end-to-end from the
+// user-facing angle: after delete + re-insert, a self-search by the node's own
+// vector must return it. If the uid stays in the dead list it is stripped from
+// neighbour edge lists, orphaned, and missing from search.
+func TestReinsertedVectorIsSearchable(t *testing.T) {
+	parseVecSchema(t, "vecdead3")
+	attr := x.AttrInRootNamespace("vecdead3")
+	ctx := context.Background()
+
+	// Entry node first (uid 1), then a small cluster.
+	setVector(t, attr, 1, []float32{0, 0}, 31, 32)
+	setVector(t, attr, 2, []float32{10, 0}, 33, 34)
+	setVector(t, attr, 3, []float32{10, 1}, 35, 36)
+	setVector(t, attr, 4, []float32{11, 0}, 37, 38)
+
+	// Delete then re-insert uid 2.
+	delVector(t, attr, 2, []float32{10, 0}, 39, 40)
+	setVector(t, attr, 2, []float32{10, 0}, 41, 42)
+
+	readTs := uint64(50)
+	specs, err := schema.State().FactoryCreateSpec(ctx, attr)
+	require.NoError(t, err)
+	require.NotEmpty(t, specs)
+	indexer, err := specs[0].CreateIndex(attr)
+	require.NoError(t, err)
+
+	lc := NewLocalCache(readTs)
+	qc := hnsw.NewQueryCache(NewViLocalCache(lc), readTs)
+	res, err := indexer.SearchWithUid(ctx, qc, 2, 5, index.AcceptAll[float32])
+	require.NoError(t, err)
+	require.Contains(t, res, uint64(2),
+		"a deleted-then-reinserted vector must be reachable in HNSW search")
+}
+
+// TestDeadNodeConcurrentRecord guards the read-modify-write of the shared
+// __vector_dead posting: goroutines sharing one txn (as in worker/draft.go)
+// record uids concurrently and none may be lost. Workers report failures with
+// t.Errorf because require's FailNow must run on the test goroutine.
+func TestDeadNodeConcurrentRecord(t *testing.T) {
+	parseVecSchema(t, "vecdeadc")
+	attr := x.AttrInRootNamespace("vecdeadc")
+	deadAttr := hnsw.ConcatStrings(attr, hnsw.VecDead)
+	ctx := context.Background()
+	txn := Oracle().RegisterStartTs(100)
+	const n = 64
+	var wg sync.WaitGroup
+	for i := 1; i <= n; i++ {
+		wg.Add(1)
+		go func(uid uint64) {
+			defer wg.Done()
+			if err := txn.recordDeadNode(ctx, deadAttr, uid); err != nil {
+				t.Errorf("recordDeadNode(%d): %v", uid, err)
+			}
+		}(uint64(i))
+	}
+	wg.Wait()
+
+	pl, err := txn.Get(x.DataKey(deadAttr, 1))
+	require.NoError(t, err)
+	val, err := pl.Value(100)
+	require.NoError(t, err)
+	dead, err := hnsw.ParseEdges(string(val.Value.([]byte)))
+	require.NoError(t, err)
+	require.Len(t, dead, n, "concurrent recordDeadNode must not lose updates")
+}
+
+// TestDeadNodeSameTxnRecordThenClear verifies the read-modify-write sees the
+// txn's own pending updates: recording then clearing the same uid within one
+// transaction must leave it absent from the dead list.
+func TestDeadNodeSameTxnRecordThenClear(t *testing.T) {
+	parseVecSchema(t, "vecdeads")
+	ctx := context.Background()
+	deadAttr := hnsw.ConcatStrings(x.AttrInRootNamespace("vecdeads"), hnsw.VecDead)
+	txn := Oracle().RegisterStartTs(200)
+
+	require.NoError(t, txn.recordDeadNode(ctx, deadAttr, 7))
+	require.NoError(t, txn.recordDeadNode(ctx, deadAttr, 8))
+	require.NoError(t, txn.clearDeadNode(ctx, deadAttr, 7))
+
+	// Read the list back through the same txn that recorded and cleared.
+	deadList, err := txn.Get(x.DataKey(deadAttr, 1))
+	require.NoError(t, err)
+	stored, err := deadList.Value(200)
+	require.NoError(t, err)
+	remaining, err := hnsw.ParseEdges(string(stored.Value.([]byte)))
+	require.NoError(t, err)
+	require.NotContains(t, remaining, uint64(7), "clear must observe the in-txn record")
+	require.Contains(t, remaining, uint64(8))
+}