Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 95 additions & 36 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"math"
"os"
"slices"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -77,6 +78,80 @@ func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error)
return tokens, nil
}

// updateDeadNodes performs an atomic read-modify-write of the predicate's HNSW
// dead-node list (the uids of deleted vectors, stored as a JSON array in a
// single posting at DataKey(deadAttr, 1)). mutate receives the current list and
// returns the new list plus whether it changed; the write is skipped when
// unchanged.
//
// The whole read-modify-write runs under the posting list's write lock. Several
// goroutines can share the same txn and touch this single posting concurrently —
// a multi-edge mutation applies its edges in parallel (worker/draft.go), and the
// index rebuilder streams with 16 goroutines — so without holding the lock
// across the read and the write, concurrent record/clear calls would clobber
// each other (last-writer-wins on the full JSON blob) and silently lose dead
// uids. Holding the lock serializes them, and the read sees the txn's own
// pending updates, so the list accumulates correctly.
func (txn *Txn) updateDeadNodes(ctx context.Context, deadAttr string,
mutate func(deadNodes []uint64) ([]uint64, bool)) error {
pl, err := txn.Get(x.DataKey(deadAttr, 1))
if err != nil {
return err
}
pl.Lock()
defer pl.Unlock()

var deadNodes []uint64
val, err := pl.ValueWithLockHeld(txn.StartTs)
if err != nil && err != ErrNoValue {
return err
}
if err == nil && val.Value != nil {
deadNodes, err = hnsw.ParseEdges(string(val.Value.([]byte)))
if err != nil {
return err
}
}

updated, changed := mutate(deadNodes)
if !changed {
return nil
}
b, err := json.Marshal(updated)
if err != nil {
return err
}
return pl.addMutationInternal(ctx, txn, &pb.DirectedEdge{
Entity: 1,
Attr: deadAttr,
Value: b,
ValueType: pb.Posting_ValType(0),
})
}

// recordDeadNode adds uid to the predicate's dead-node list. It is idempotent:
// a uid already recorded is left as-is, so repeated deletes (or a Raft re-apply)
// don't grow the list with duplicates.
func (txn *Txn) recordDeadNode(ctx context.Context, deadAttr string, uid uint64) error {
return txn.updateDeadNodes(ctx, deadAttr, func(deadNodes []uint64) ([]uint64, bool) {
if slices.Contains(deadNodes, uid) {
return deadNodes, false
}
return append(deadNodes, uid), true
})
}

// clearDeadNode removes uid from the predicate's dead-node list, marking a
// re-inserted vector live again. It is a no-op when the uid is not recorded.
func (txn *Txn) clearDeadNode(ctx context.Context, deadAttr string, uid uint64) error {
return txn.updateDeadNodes(ctx, deadAttr, func(deadNodes []uint64) ([]uint64, bool) {
if !slices.Contains(deadNodes, uid) {
return deadNodes, false
}
return slices.DeleteFunc(deadNodes, func(d uint64) bool { return d == uid }), true
})
}

// addIndexMutations adds mutation(s) for a single term, to maintain the index,
// but only for the given tokenizers.
// TODO - See if we need to pass op as argument as t should already have Op.
Expand Down Expand Up @@ -111,45 +186,21 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
return []*pb.DirectedEdge{}, err
}

if info.op == pb.DirectedEdge_DEL &&
len(data) > 0 && data[0].Tid == types.VFloatID {
// TODO look into better alternatives
// The issue here is that we will create dead nodes in the Vector Index
// assuming an HNSW index type. What we should do instead is invoke
// index.Remove(<key to dead index value>). However, we currently do
// not support this in VectorIndex code!!
// if a delete & dealing with vfloats, add this to dead node in persistent store.
// What we should do instead is invoke the factory.Remove(key) operation.
// A vfloat delete must record the uid in the index's dead-node list so
// that removeDeadNodes can strip edges pointing at the now-deleted vector.
//
// We use info.val (the value being deleted, supplied by the caller) rather
// than the re-read `data`: addMutationHelper has already applied the delete
// by this point, so AllValues(txn.StartTs) is empty and the node would
// never be recorded.
//
// TODO: replace this with an index.Remove(<dead key>) call once the
// VectorIndex API supports removal directly.
if info.op == pb.DirectedEdge_DEL && info.val.Tid == types.VFloatID {
deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead)
deadKey := x.DataKey(deadAttr, 1)
pl, err := txn.Get(deadKey)
if err != nil {
if err := txn.recordDeadNode(ctx, deadAttr, uid); err != nil {
return []*pb.DirectedEdge{}, err
}
var deadNodes []uint64
deadData, _ := pl.Value(txn.StartTs)
if deadData.Value == nil {
deadNodes = append(deadNodes, uid)
} else {
deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))
if err != nil {
return []*pb.DirectedEdge{}, err
}
deadNodes = append(deadNodes, uid)
}
deadNodesBytes, marshalErr := json.Marshal(deadNodes)
if marshalErr != nil {
return []*pb.DirectedEdge{}, marshalErr
}
edge := &pb.DirectedEdge{
Entity: 1,
Attr: deadAttr,
Value: deadNodesBytes,
ValueType: pb.Posting_ValType(0),
}
if err := pl.addMutation(ctx, txn, edge); err != nil {
return nil, err
}
}

// TODO: As stated earlier, we need to validate that it is okay to assume
Expand All @@ -170,6 +221,14 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
if err != nil {
return []*pb.DirectedEdge{}, err
}
// The uid is alive again: clear it from the dead-node list. Otherwise a
// deleted-then-reinserted (re-embedded) vector stays marked dead, gets
// stripped from every neighbour's edge list by removeDeadNodes, and ends
// up orphaned from the HNSW graph — present but unreachable in search.
deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead)
if err := txn.clearDeadNode(ctx, deadAttr, uid); err != nil {
return []*pb.DirectedEdge{}, err
}
pbEdges := []*pb.DirectedEdge{}
for _, e := range edges {
pbe := indexEdgeToPbEdge(e)
Expand Down
219 changes: 219 additions & 0 deletions posting/index_vector_dead_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package posting

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgraph/v25/protos/pb"
"github.com/dgraph-io/dgraph/v25/schema"
"github.com/dgraph-io/dgraph/v25/tok/hnsw"
"github.com/dgraph-io/dgraph/v25/tok/index"
"github.com/dgraph-io/dgraph/v25/types"
"github.com/dgraph-io/dgraph/v25/x"
)

// These tests cover the dead-node lifecycle of the HNSW vector index, which has
// two correctness bugs:
//
// (A) Deleting a vector is supposed to record its uid in the predicate's
// "__vector_dead" list so that removeDeadNodes can strip edges pointing at
// it. But addIndexMutations reads the value from AllValues(StartTs) *after*
// addMutationHelper has already applied the delete, so the read is empty
// and the dead node is never recorded. The old value is available as
// info.val but is ignored.
//
// (B) The dead list is append-only: re-inserting (re-embedding) a previously
// deleted uid never removes it from "__vector_dead". Once (A) is fixed and
// the list is populated, a re-inserted vector would be permanently treated
// as dead — stripped from every neighbour's edge list, leaving it with no
// inbound edges and unreachable from the HNSW entry point (it has an
// embedding but is missing from search results).
//
// Together these tests pin the full lifecycle: delete records, re-insert clears,
// and a deleted-then-reinserted vector stays searchable.

// parseVecSchema registers an HNSW-indexed float32vector predicate for tests.
func parseVecSchema(t *testing.T, pred string) {
require.NoError(t, schema.ParseBytes(
[]byte(pred+`: float32vector @index(hnsw(metric: "euclidean")) .`), 1))
}

// setVector SETs a float32 vector for uid through the full indexed-mutation path
// (AddMutationWithIndex -> addIndexMutations -> HNSW insert).
func setVector(t *testing.T, attr string, uid uint64, vec []float32, startTs, commitTs uint64) {
key := x.DataKey(attr, uid)
l, err := GetNoStore(key, startTs)
require.NoError(t, err)
edge := &pb.DirectedEdge{
Attr: attr,
Entity: uid,
Value: types.FloatArrayAsBytes(vec),
ValueType: pb.Posting_VFLOAT,
}
addMutation(t, l, edge, Set, startTs, commitTs, true)
}

// delVector DELs the vector for uid through the full indexed-mutation path.
func delVector(t *testing.T, attr string, uid uint64, vec []float32, startTs, commitTs uint64) {
key := x.DataKey(attr, uid)
l, err := GetNoStore(key, startTs)
require.NoError(t, err)
edge := &pb.DirectedEdge{
Attr: attr,
Entity: uid,
Value: types.FloatArrayAsBytes(vec),
ValueType: pb.Posting_VFLOAT,
}
addMutation(t, l, edge, Del, startTs, commitTs, true)
}

// readDeadList returns the uids recorded in the predicate's __vector_dead list.
func readDeadList(t *testing.T, attr string, readTs uint64) []uint64 {
deadKey := x.DataKey(hnsw.ConcatStrings(attr, hnsw.VecDead), 1)
l, err := GetNoStore(deadKey, readTs)
require.NoError(t, err)
val, err := l.Value(readTs)
if err == ErrNoValue {
return nil
}
require.NoError(t, err)
b, ok := val.Value.([]byte)
require.True(t, ok, "dead list value should be []byte")
dead, err := hnsw.ParseEdges(string(b))
require.NoError(t, err)
return dead
}

// TestDeletedVectorRecordedAsDead verifies bug (A): deleting a vector must record
// its uid in __vector_dead. The probe in this package confirms the delete itself
// applies (the value is gone), so a missing dead-list entry isolates the
// record-on-delete defect.
func TestDeletedVectorRecordedAsDead(t *testing.T) {
parseVecSchema(t, "vecdead")
attr := x.AttrInRootNamespace("vecdead")

setVector(t, attr, 1, []float32{0, 0}, 1, 2)
setVector(t, attr, 2, []float32{1, 0}, 3, 4)

delVector(t, attr, 2, []float32{1, 0}, 5, 6)

require.Contains(t, readDeadList(t, attr, 7), uint64(2),
"deleting a vector must record its uid in __vector_dead")
}

// TestReinsertedVectorClearedFromDeadList verifies bug (B): once a deleted uid is
// re-inserted, it must be removed from __vector_dead so it is no longer treated
// as dead.
func TestReinsertedVectorClearedFromDeadList(t *testing.T) {
parseVecSchema(t, "vecdead2")
attr := x.AttrInRootNamespace("vecdead2")

setVector(t, attr, 1, []float32{0, 0}, 11, 12)
setVector(t, attr, 2, []float32{1, 0}, 13, 14)

delVector(t, attr, 2, []float32{1, 0}, 15, 16)
require.Contains(t, readDeadList(t, attr, 17), uint64(2),
"precondition: delete should record the uid as dead")

setVector(t, attr, 2, []float32{1, 0}, 18, 19)
require.NotContains(t, readDeadList(t, attr, 20), uint64(2),
"re-inserting a vector must remove its uid from __vector_dead")
}

// TestReinsertedVectorIsSearchable re-verifies the lifecycle end-to-end from the
// user-facing angle: after delete + re-insert, a self-search by the node's own
// vector must return it. If the uid stays in the dead list it is stripped from
// neighbour edge lists, orphaned, and missing from search.
func TestReinsertedVectorIsSearchable(t *testing.T) {
parseVecSchema(t, "vecdead3")
attr := x.AttrInRootNamespace("vecdead3")
ctx := context.Background()

// Entry node first (uid 1), then a small cluster.
setVector(t, attr, 1, []float32{0, 0}, 31, 32)
setVector(t, attr, 2, []float32{10, 0}, 33, 34)
setVector(t, attr, 3, []float32{10, 1}, 35, 36)
setVector(t, attr, 4, []float32{11, 0}, 37, 38)

// Delete then re-insert uid 2.
delVector(t, attr, 2, []float32{10, 0}, 39, 40)
setVector(t, attr, 2, []float32{10, 0}, 41, 42)

readTs := uint64(50)
specs, err := schema.State().FactoryCreateSpec(ctx, attr)
require.NoError(t, err)
require.NotEmpty(t, specs)
indexer, err := specs[0].CreateIndex(attr)
require.NoError(t, err)

lc := NewLocalCache(readTs)
qc := hnsw.NewQueryCache(NewViLocalCache(lc), readTs)
res, err := indexer.SearchWithUid(ctx, qc, 2, 5, index.AcceptAll[float32])
require.NoError(t, err)
require.Contains(t, res, uint64(2),
"a deleted-then-reinserted vector must be reachable in HNSW search")
}

// TestDeadNodeConcurrentRecord guards the read-modify-write of the single
// dead-node posting. A multi-edge mutation applies its edges in parallel
// goroutines sharing one txn (worker/draft.go), all targeting the same
// __vector_dead posting, so an unsynchronized RMW would lose updates. Every
// concurrently-recorded uid must survive.
func TestDeadNodeConcurrentRecord(t *testing.T) {
parseVecSchema(t, "vecdeadc")
attr := x.AttrInRootNamespace("vecdeadc")
deadAttr := hnsw.ConcatStrings(attr, hnsw.VecDead)
ctx := context.Background()

txn := Oracle().RegisterStartTs(100)
const n = 64
var wg sync.WaitGroup
for i := 1; i <= n; i++ {
wg.Add(1)
go func(uid uint64) {
defer wg.Done()
require.NoError(t, txn.recordDeadNode(ctx, deadAttr, uid))
}(uint64(i))
}
wg.Wait()

pl, err := txn.Get(x.DataKey(deadAttr, 1))
require.NoError(t, err)
val, err := pl.Value(100)
require.NoError(t, err)
dead, err := hnsw.ParseEdges(string(val.Value.([]byte)))
require.NoError(t, err)
require.Len(t, dead, n, "concurrent recordDeadNode must not lose updates")
}

// TestDeadNodeSameTxnRecordThenClear verifies the read-modify-write sees the
// txn's own pending updates: recording then clearing the same uid within one
// transaction must leave it absent from the dead list.
func TestDeadNodeSameTxnRecordThenClear(t *testing.T) {
parseVecSchema(t, "vecdeads")
attr := x.AttrInRootNamespace("vecdeads")
deadAttr := hnsw.ConcatStrings(attr, hnsw.VecDead)
ctx := context.Background()

txn := Oracle().RegisterStartTs(200)
require.NoError(t, txn.recordDeadNode(ctx, deadAttr, 7))
require.NoError(t, txn.recordDeadNode(ctx, deadAttr, 8))
require.NoError(t, txn.clearDeadNode(ctx, deadAttr, 7))

pl, err := txn.Get(x.DataKey(deadAttr, 1))
require.NoError(t, err)
val, err := pl.Value(200)
require.NoError(t, err)
dead, err := hnsw.ParseEdges(string(val.Value.([]byte)))
require.NoError(t, err)
require.NotContains(t, dead, uint64(7), "clear must observe the in-txn record")
require.Contains(t, dead, uint64(8))
}
Loading