Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"math"
"os"
"slices"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -77,6 +78,17 @@ func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error)
return tokens, nil
}

// addDeadNode appends uid to the HNSW dead-node list unless it is already
// present, reporting whether the list changed. The dead set is persisted as a
// single posting that is rewritten in full on every vector delete, so skipping
// already-recorded uids keeps repeated deletes from bloating it with duplicates.
func addDeadNode(deadNodes []uint64, uid uint64) ([]uint64, bool) {
if slices.Contains(deadNodes, uid) {
return deadNodes, false
}
return append(deadNodes, uid), true
}

// addIndexMutations adds mutation(s) for a single term, to maintain the index,
// but only for the given tokenizers.
// TODO - See if we need to pass op as argument as t should already have Op.
Expand Down Expand Up @@ -128,27 +140,31 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
}
var deadNodes []uint64
deadData, _ := pl.Value(txn.StartTs)
if deadData.Value == nil {
deadNodes = append(deadNodes, uid)
} else {
if deadData.Value != nil {
deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))
if err != nil {
return []*pb.DirectedEdge{}, err
}
deadNodes = append(deadNodes, uid)
}
deadNodesBytes, marshalErr := json.Marshal(deadNodes)
if marshalErr != nil {
return []*pb.DirectedEdge{}, marshalErr
}
edge := &pb.DirectedEdge{
Entity: 1,
Attr: deadAttr,
Value: deadNodesBytes,
ValueType: pb.Posting_ValType(0),
}
if err := pl.addMutation(ctx, txn, edge); err != nil {
return nil, err
// The dead set is a single growing posting. Skip uids already
// recorded as dead so repeated deletes (delete/reinsert churn, or a
// Raft re-apply of the same mutation) don't append duplicates and
// re-marshal the whole blob with no effect.
updated, changed := addDeadNode(deadNodes, uid)
if changed {
deadNodesBytes, marshalErr := json.Marshal(updated)
if marshalErr != nil {
return []*pb.DirectedEdge{}, marshalErr
}
edge := &pb.DirectedEdge{
Entity: 1,
Attr: deadAttr,
Value: deadNodesBytes,
ValueType: pb.Posting_ValType(0),
}
if err := pl.addMutation(ctx, txn, edge); err != nil {
return nil, err
}
}
}

Expand Down
17 changes: 17 additions & 0 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,20 @@ func TestNeedsListTypeRebuild(t *testing.T) {
require.False(t, rebuild)
require.Error(t, err)
}

func TestAddDeadNode(t *testing.T) {
// First delete records the uid.
out, changed := addDeadNode(nil, 5)
require.True(t, changed)
require.Equal(t, []uint64{5}, out)

// A distinct uid is appended.
out, changed = addDeadNode([]uint64{5}, 7)
require.True(t, changed)
require.Equal(t, []uint64{5, 7}, out)

// A uid already recorded as dead is a no-op: no change, no growth.
out, changed = addDeadNode([]uint64{5, 7}, 5)
require.False(t, changed)
require.Equal(t, []uint64{5, 7}, out)
}
Loading