diff --git a/posting/index.go b/posting/index.go
index ae6c3352a44..f5deef2ba76 100644
--- a/posting/index.go
+++ b/posting/index.go
@@ -14,6 +14,7 @@ import (
 	"fmt"
 	"math"
 	"os"
+	"slices"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -77,6 +78,17 @@ func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error)
 	return tokens, nil
 }
 
+// addDeadNode appends uid to the HNSW dead-node list unless it is already
+// present, reporting whether the list changed. The dead set is persisted as a
+// single posting that is rewritten in full on every vector delete, so skipping
+// already-recorded uids keeps repeated deletes from bloating it with duplicates.
+func addDeadNode(deadNodes []uint64, uid uint64) ([]uint64, bool) {
+	if slices.Contains(deadNodes, uid) {
+		return deadNodes, false
+	}
+	return append(deadNodes, uid), true
+}
+
 // addIndexMutations adds mutation(s) for a single term, to maintain the index,
 // but only for the given tokenizers.
 // TODO - See if we need to pass op as argument as t should already have Op.
@@ -128,27 +140,31 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
 			}
 			var deadNodes []uint64
 			deadData, _ := pl.Value(txn.StartTs)
-			if deadData.Value == nil {
-				deadNodes = append(deadNodes, uid)
-			} else {
+			if deadData.Value != nil {
 				deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))
 				if err != nil {
 					return []*pb.DirectedEdge{}, err
 				}
-				deadNodes = append(deadNodes, uid)
-			}
-			deadNodesBytes, marshalErr := json.Marshal(deadNodes)
-			if marshalErr != nil {
-				return []*pb.DirectedEdge{}, marshalErr
-			}
-			edge := &pb.DirectedEdge{
-				Entity:    1,
-				Attr:      deadAttr,
-				Value:     deadNodesBytes,
-				ValueType: pb.Posting_ValType(0),
 			}
-			if err := pl.addMutation(ctx, txn, edge); err != nil {
-				return nil, err
+			// The dead set is a single growing posting. Skip uids already
+			// recorded as dead so repeated deletes (delete/reinsert churn, or a
+			// Raft re-apply of the same mutation) don't append duplicates and
+			// re-marshal the whole blob with no effect.
+			updated, changed := addDeadNode(deadNodes, uid)
+			if changed {
+				deadNodesBytes, marshalErr := json.Marshal(updated)
+				if marshalErr != nil {
+					return []*pb.DirectedEdge{}, marshalErr
+				}
+				edge := &pb.DirectedEdge{
+					Entity:    1,
+					Attr:      deadAttr,
+					Value:     deadNodesBytes,
+					ValueType: pb.Posting_ValType(0),
+				}
+				if err := pl.addMutation(ctx, txn, edge); err != nil {
+					return nil, err
+				}
 			}
 		}
 
diff --git a/posting/index_test.go b/posting/index_test.go
index 3f75c26fb10..15ab302c118 100644
--- a/posting/index_test.go
+++ b/posting/index_test.go
@@ -615,3 +615,20 @@ func TestNeedsListTypeRebuild(t *testing.T) {
 	require.False(t, rebuild)
 	require.Error(t, err)
 }
+
+func TestAddDeadNode(t *testing.T) {
+	// First delete records the uid.
+	out, changed := addDeadNode(nil, 5)
+	require.True(t, changed)
+	require.Equal(t, []uint64{5}, out)
+
+	// A distinct uid is appended.
+	out, changed = addDeadNode([]uint64{5}, 7)
+	require.True(t, changed)
+	require.Equal(t, []uint64{5, 7}, out)
+
+	// A uid already recorded as dead is a no-op: no change, no growth.
+	out, changed = addDeadNode([]uint64{5, 7}, 5)
+	require.False(t, changed)
+	require.Equal(t, []uint64{5, 7}, out)
+}