diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 0a20084e303..3832e277478 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -284,6 +284,29 @@ they form a Raft group and provide synchronous replication. "goroutine spin-up cost, so tiny mutations are slightly slower on it; "+ "bulk multi-predicate mutations are faster (crossover ~100 edges in "+ "benchmarks here)."). + Flag("mutations-pipeline-goroutines", + "Global budget of intra-predicate worker goroutines distributed "+ + "across the predicates in a single mutation batch, proportionally to "+ + "each predicate's edge count. A hot/dominant predicate is granted most "+ + "of the budget so its data-write and index passes parallelize; tiny "+ + "predicates get one worker each. 0 (or any value < 2) disables "+ + "intra-predicate parallelism — every predicate runs on a single "+ + "goroutine, exactly the legacy behavior. N>0 is an absolute budget. "+ + "-1 = auto (derive from GOMAXPROCS*fraction, capped by "+ + "edges/min-edges-per-worker). Default 30."). + Flag("mutations-pipeline-goroutines-fraction", + "AUTO mode only (mutations-pipeline-goroutines=-1): fraction of "+ + "GOMAXPROCS one mutation batch may use. The apply phase is serial "+ + "across transactions, so a single apply effectively owns the box; "+ + "default 1.0 uses all cores. Raise above 1.0 to oversubscribe "+ + "(benchmarks show the throughput peak near 2-3x cores) or lower it "+ + "to leave headroom for Badger compaction/queries/GC under heavy "+ + "concurrent read load. Default 1.0."). + Flag("mutations-pipeline-min-edges-per-worker", + "AUTO mode only (mutations-pipeline-goroutines=-1): minimum edges per "+ + "intra-predicate worker the derived budget targets "+ + "(workCap = totalEdges / this). Mirrors the 256-edge DivideAndRule "+ + "rule. Default 256."). String()) } @@ -807,6 +830,9 @@ func run() { enableDetailedMetrics := featureFlagsConf.GetBool("enable-detailed-metrics") x.WorkerConfig.SlowQueryLogThreshold = featureFlagsConf.GetDuration("log-slow-query-threshold") x.WorkerConfig.MutationsPipelineThreshold = int(featureFlagsConf.GetInt64("mutations-pipeline-threshold")) + x.WorkerConfig.MutationsPipelineGoroutines = int(featureFlagsConf.GetInt64("mutations-pipeline-goroutines")) + x.WorkerConfig.MutationsPipelineGoroutinesFraction = featureFlagsConf.GetFloat64("mutations-pipeline-goroutines-fraction") + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = int(featureFlagsConf.GetInt64("mutations-pipeline-min-edges-per-worker")) x.PrintVersion() glog.Infof("x.Config: %+v", x.Config) diff --git a/posting/index.go b/posting/index.go index cd92c238ad5..78d885ebb89 100644 --- a/posting/index.go +++ b/posting/index.go @@ -14,6 +14,7 @@ import ( "fmt" "math" "os" + "runtime" "sort" "strings" "sync" @@ -106,6 +107,12 @@ type PredicatePipeline struct { edges chan *pb.DirectedEdge wg *sync.WaitGroup errCh chan error + // workers is this predicate's grant from the global intra-predicate budget + // (see allocateWorkers). 0 means the budget is disabled/unset: data-write + // stays serial and InsertTokenizerIndexes uses the legacy numGo=10 — exactly + // the pre-budget behavior. A value k>=1 splits the merge-light passes + // (ProcessSingle's data-write, index tokenization) into k goroutines. + workers int } func (pp *PredicatePipeline) close() { @@ -237,7 +244,15 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline } } + // Fold the historically-fixed 10-way index tokenization into the global + // budget. When the budget is disabled (pipeline.workers == 0) this stays at + // the legacy numGo=10, so behavior is byte-identical to before. Index + // tokenization is a low-fan-in pass (key ), so parallelizing it + // is safe; the cache-locked merge below still serializes the final write. numGo := 10 + if pipeline.workers > 0 { + numGo = pipeline.workers + } wg.Add(numGo) chMap := make(map[int]chan uint64) @@ -397,22 +412,145 @@ func (mp *MutationPipeline) ProcessList(ctx context.Context, pipeline *Predicate dataKey := x.DataKey(pipeline.attr, 0) baseKey := string(dataKey[:len(dataKey)-8]) // Avoid repeated conversion - for uid, pl := range postings { + // writeListDataUid emits the forward data-write delta for a single source uid. + // The forward key is one-to-one (each source uid is written + // exactly once), so — exactly as ProcessSingle's writeDataUid — it is the safe + // beneficiary of intra-predicate parallelism: when this predicate is granted + // k>1 workers the source-uid space is partitioned into disjoint contiguous + // ranges and each range runs this on its own goroutine. scratchKey MUST be + // per-worker: the legacy loop reused a single dataKey buffer, which would be a + // data race across goroutines. store is the locked AddDelta on the serial path + // and the lock-free AddDeltaConcurrent wrapper on the parallel path; for a + // given distinct key both write identical bytes and return the same deduped + // list, so the result (data bytes and conflict-key set) is store-order-independent. + // Lang predicates ride the SAME path: all language postings for one entity fold + // into that entity's single data key (one-to-one by source + // entity), so the disjointness invariant holds for them too. + writeListDataUid := func(uid uint64, pl *pb.PostingList, scratchKey []byte, + store func(key string, input *pb.PostingList) (*pb.PostingList, error)) error { if len(pl.Postings) == 0 { - continue + return nil } - binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) - if newPl, err := mp.txn.AddDelta(baseKey+string(dataKey[len(dataKey)-8:]), pl, info.isUid, true); err != nil { + binary.BigEndian.PutUint64(scratchKey[len(scratchKey)-8:], uid) + key := baseKey + string(scratchKey[len(scratchKey)-8:]) + + newPl, err := store(key, pl) + if err != nil { return err - } else { - if !info.noConflict { - mp.txn.addConflictKeyWithUid(dataKey, newPl, info.hasUpsert, info.noConflict) + } + if !info.noConflict { + mp.txn.addConflictKeyWithUid(scratchKey, newPl, info.hasUpsert, info.noConflict) + } + return nil + } + + // Serial path (budget disabled or a one-worker grant): byte-for-byte the + // pre-budget loop, reusing the single dataKey scratch buffer and the locked + // AddDelta(..., info.isUid, true). + if pipeline.workers <= 1 { + serialStore := func(k string, in *pb.PostingList) (*pb.PostingList, error) { + return mp.txn.AddDelta(k, in, info.isUid, true) + } + for uid, pl := range postings { + if err := writeListDataUid(uid, pl, dataKey, serialStore); err != nil { + return err } } + return nil } - return nil + // Parallel path: split the disjoint forward keys across k + // goroutines. Each worker owns a contiguous, non-overlapping slice of the + // source-uid snapshot, so no two workers ever write the same data key + // (invariant I1) — that disjointness is what makes the lock-free + // AddDeltaConcurrent safe. ProcessReverse / the index / count passes have + // already run serially above; all workers join here (wg.Wait) before + // ProcessList returns, so the legacy MVCC ordering — every delta in place + // before the predicate goroutine closes — is preserved. + // + // concStore reproduces AddDelta(..., info.isUid, true)'s committed bytes + // without taking the global cache lock. Two behaviors must be matched exactly: + // + // - addToList=true: WITHIN one ProcessList call each source uid is written + // once, but a transaction may apply several mutation proposals on the SAME + // *Txn (worker/draft.go RegisterStartTs reuses it, and txn.Update between + // proposals does not clear txn.cache.deltas), so an EARLIER proposal may + // have left a delta for this key. It must be prepended, exactly as the + // serial AddDelta does — otherwise a later proposal silently drops the + // earlier list/lang postings (forward-list data loss and a forward/reverse + // mismatch, since ProcessReverse stays serial and does merge across + // proposals). + // - info.isUid sort/dedup, applied to the merged list. + // + // The prior delta is read from the sharded, per-key-locked deltas map ONLY — + // never (*Deltas).Get/GetBytes, which also read the plain-Go indexMap that is + // only safe under cache.Lock (another predicate's InsertTokenizerIndexes may be + // writing it concurrently). A data key never appears in indexMap, so for this + // key the sharded read returns byte-for-byte what serial AddDelta observes. The + // disjoint-partition guarantee (I1) makes this read-merge-write single-writer + // per key, so it is safe without the global lock. + concStore := func(k string, in *pb.PostingList) (*pb.PostingList, error) { + merged := new(pb.PostingList) + if prevBytes, ok := mp.txn.cache.deltas.deltas.Get(k); ok { + prev := &pb.PostingList{} + if err := proto.Unmarshal(prevBytes, prev); err != nil { + glog.Errorf("Error unmarshalling prior delta for key %x: %v", k, err) + return nil, err + } + merged.Postings = append(merged.Postings, prev.Postings...) + } + merged.Postings = append(merged.Postings, in.Postings...) + if info.isUid { + merged.Postings = SortAndDedupPostings(merged.Postings) + } + if err := mp.txn.AddDeltaConcurrent(k, merged); err != nil { + return nil, err + } + return merged, nil + } + + uidList := make([]uint64, 0, len(postings)) + for uid := range postings { + uidList = append(uidList, uid) + } + if len(uidList) == 0 { + return nil + } + k := pipeline.workers + if k > len(uidList) { + k = len(uidList) + } + width := (len(uidList) + k - 1) / k // ceil(len/k), DivideAndRule-style + + var wg sync.WaitGroup + var once sync.Once + var firstErr error + for w := 0; w < k; w++ { + start := w * width + if start >= len(uidList) { + break + } + end := start + width + if end > len(uidList) { + end = len(uidList) + } + wg.Add(1) + go func(sub []uint64) { + defer wg.Done() + // Per-worker scratch buffer: sharing dataKey across goroutines races. + localKey := x.DataKey(pipeline.attr, 0) + for _, uid := range sub { + if err := writeListDataUid(uid, postings[uid], localKey, + concStore); err != nil { + once.Do(func() { firstErr = err }) + return + } + } + }(uidList[start:end]) + } + wg.Wait() + return firstErr } func findSingleValueInPostingList(pb *pb.PostingList) *pb.Posting { @@ -884,24 +1022,34 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica // is removed, mirroring the legacy isSingleUidUpdate explicit DEL. stripSyntheticDel := info.index - for uid, pl := range postings { + // writeDataUid emits the data-write delta for a single source uid. The key + // is one-to-one with no fan-in, so it is the primary (and safest) + // beneficiary of intra-predicate parallelism: when this predicate is granted + // k>1 workers the uid space is partitioned into disjoint contiguous ranges + // and each range runs this on its own goroutine. scratchKey MUST be + // per-worker — the legacy loop reused a single dataKey buffer, which would be + // a data race across goroutines. store is AddDelta on the serial path and the + // lock-free AddDeltaConcurrent on the parallel path; for a given distinct key + // both write identical bytes, so the result is store-order-independent. + writeDataUid := func(uid uint64, pl *pb.PostingList, scratchKey []byte, + store func(string, *pb.PostingList) error) error { // An empty PostingList means nothing was accumulated for this uid (e.g. // a DEL whose value did not match the committed value, or a same-value // SET-after-handleOldDeleteForSingle no-op). Writing an empty delta - // would call AddDelta → setMutation on the cached List with empty bytes, - // which resets the mutable layer's currentEntries to empty for this - // txn's startTs. A subsequent read through the same cached List then - // surfaces an empty data list. CommitToDisk skips empty deltas at the - // badger level, but the in-memory side effect on the cached List has - // already corrupted any concurrent reader holding that List. Skip the - // no-op uid entirely — matches the legacy per-edge path which simply - // does nothing when a DEL value does not match committed. + // would call setMutation on the cached List with empty bytes, which + // resets the mutable layer's currentEntries to empty for this txn's + // startTs. A subsequent read through the same cached List then surfaces + // an empty data list. CommitToDisk skips empty deltas at the badger + // level, but the in-memory side effect on the cached List has already + // corrupted any concurrent reader holding that List. Skip the no-op uid + // entirely — matches the legacy per-edge path which simply does nothing + // when a DEL value does not match committed. if len(pl.Postings) == 0 { - continue + return nil } - binary.BigEndian.PutUint64(dataKey[len(dataKey)-8:], uid) - key := baseKey + string(dataKey[len(dataKey)-8:]) + binary.BigEndian.PutUint64(scratchKey[len(scratchKey)-8:], uid) + key := baseKey + string(scratchKey[len(scratchKey)-8:]) if !info.noConflict { mp.txn.addConflictKey(farm.Fingerprint64([]byte(key))) @@ -928,12 +1076,73 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica } } - if _, err := mp.txn.AddDelta(key, writePl, false, false); err != nil { - return err + return store(key, writePl) + } + + // Serial path (budget disabled or a one-worker grant): byte-for-byte the + // pre-budget loop, reusing the single dataKey scratch buffer and the locked + // AddDelta. + if pipeline.workers <= 1 { + for uid, pl := range postings { + if err := writeDataUid(uid, pl, dataKey, + func(k string, wpl *pb.PostingList) error { + _, err := mp.txn.AddDelta(k, wpl, false, false) + return err + }); err != nil { + return err + } } + return nil } - return nil + // Parallel path: split the disjoint keys across k goroutines. Each + // worker owns a contiguous, non-overlapping slice of the uid snapshot, so no + // two workers ever write the same data key (invariant I1) — that disjointness + // is what makes the lock-free AddDeltaConcurrent safe. All workers join here + // (via wg.Wait) before ProcessSingle returns, so the MVCC ordering of the + // legacy path — every delta in place before the predicate goroutine closes — + // is preserved. + uidList := make([]uint64, 0, len(postings)) + for uid := range postings { + uidList = append(uidList, uid) + } + if len(uidList) == 0 { + return nil + } + k := pipeline.workers + if k > len(uidList) { + k = len(uidList) + } + width := (len(uidList) + k - 1) / k // ceil(len/k), DivideAndRule-style + + var wg sync.WaitGroup + var once sync.Once + var firstErr error + for w := 0; w < k; w++ { + start := w * width + if start >= len(uidList) { + break + } + end := start + width + if end > len(uidList) { + end = len(uidList) + } + wg.Add(1) + go func(sub []uint64) { + defer wg.Done() + // Per-worker scratch buffer: sharing dataKey across goroutines races. + localKey := x.DataKey(pipeline.attr, 0) + for _, uid := range sub { + if err := writeDataUid(uid, postings[uid], localKey, + mp.txn.AddDeltaConcurrent); err != nil { + once.Do(func() { firstErr = err }) + return + } + } + }(uidList[start:end]) + } + wg.Wait() + return firstErr } func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { @@ -1142,6 +1351,167 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error { return nil } +// allocateWorkers distributes a fixed global goroutine budget across the +// predicates present in one mutation batch, proportionally to each predicate's +// edge count, so a hot/dominant predicate is granted most of the budget while +// tiny predicates get one worker each. The grant feeds back into each +// PredicatePipeline.workers and controls the within-predicate split of the +// merge-light passes (data-write, index tokenization). +// +// edgeCounts maps predicate attr -> number of (non-star-delete) edges for that +// predicate in the batch; budget is x.WorkerConfig.MutationsPipelineGoroutines. +// Returns nil when the budget is disabled (budget < 2) or there are no +// predicates. A nil result — and any attr missing from the result — is treated +// by callers as workers=0, i.e. the legacy one-goroutine-per-predicate path. +// +// Apportionment is Hamilton / largest-remainder with a floor of 1: +// - If P := len(edgeCounts) >= budget, the batch already saturates the budget +// with one worker each, so every predicate gets exactly 1. +// - Otherwise (P < budget), with E := sum(e_p): +// quota_p = budget * e_p / E (real-valued ideal share) +// workers_p = max(1, floor(quota_p)) (floor, but never below 1) +// deficit = budget - sum(workers_p) +// If deficit > 0, hand the leftover units out one at a time to the predicates +// with the largest fractional remainder (quota_p - floor(quota_p)). If +// deficit < 0 (the floor-of-1 pushed the sum over budget because many tiny +// predicates rounded up from 0), reclaim units one at a time from predicates +// with workers_p > 1, smallest remainder first. Ties broken by attr string +// (ascending) for determinism. +// +// Invariants: every workers_p >= 1; no predicate exceeds budget; and when +// P < budget, sum(workers_p) == budget. +func allocateWorkers(edgeCounts map[string]int, budget int) map[string]int { + P := len(edgeCounts) + if budget < 2 || P == 0 { + return nil + } + + // Deterministic predicate order so the remainder tie-break is stable. + attrs := make([]string, 0, P) + total := 0 + for attr, c := range edgeCounts { + attrs = append(attrs, attr) + total += c + } + sort.Strings(attrs) + + out := make(map[string]int, P) + + // Batch saturates (or oversaturates) the budget: one worker each. This + // documents the apportionment edge case where there are at least as many + // predicates as the budget — there is nothing left to split with. + if P >= budget || total == 0 { + for _, attr := range attrs { + out[attr] = 1 + } + return out + } + + type rem struct { + attr string + remainder float64 + } + rems := make([]rem, 0, P) + assigned := 0 + for _, attr := range attrs { + quota := float64(budget) * float64(edgeCounts[attr]) / float64(total) + w := int(math.Floor(quota)) + if w < 1 { + w = 1 + } + out[attr] = w + assigned += w + rems = append(rems, rem{attr: attr, remainder: quota - math.Floor(quota)}) + } + + deficit := budget - assigned + switch { + case deficit > 0: + // Distribute leftover units to the largest fractional remainders first. + // deficit < P here, so there are always enough predicates to receive them. + sort.SliceStable(rems, func(i, j int) bool { + if rems[i].remainder != rems[j].remainder { + return rems[i].remainder > rems[j].remainder + } + return rems[i].attr < rems[j].attr + }) + for i := 0; i < len(rems) && deficit > 0; i++ { + out[rems[i].attr]++ + deficit-- + } + case deficit < 0: + // The floor-of-1 oversubscribed the budget. Reclaim units from predicates + // with more than one worker, smallest remainder first, so the hot + // predicate keeps the most. Since P < budget, the excess above the + // per-predicate floor always exceeds the deficit, so this terminates with + // sum == budget. + sort.SliceStable(rems, func(i, j int) bool { + if rems[i].remainder != rems[j].remainder { + return rems[i].remainder < rems[j].remainder + } + return rems[i].attr < rems[j].attr + }) + for deficit < 0 { + progressed := false + for i := 0; i < len(rems) && deficit < 0; i++ { + if out[rems[i].attr] > 1 { + out[rems[i].attr]-- + deficit++ + progressed = true + } + } + if !progressed { + // Defensive: cannot happen when P < budget. Avoid an infinite loop. + break + } + } + } + + return out +} + +// mutationsPipelineGoroutinesAuto is the sentinel value of +// x.WorkerConfig.MutationsPipelineGoroutines that selects AUTO mode: instead of a +// fixed absolute budget, the per-batch budget is derived at runtime from the +// machine and the batch size by autoBudget. 0 still disables the budget and any +// N>0 is still an absolute budget. +const mutationsPipelineGoroutinesAuto = -1 + +// autoBudget derives the per-batch intra-predicate goroutine budget at runtime +// (AUTO mode, mutations-pipeline-goroutines=-1). It is a pure, deterministic +// function of its inputs — independent of the real GOMAXPROCS — so the formula +// can be unit-tested directly. +// +// machineCap = max(1, round(gomaxprocs * fraction)) +// workCap = max(1, totalEdges / minEdgesPerWorker) +// budget = min(machineCap, workCap) +// +// fraction is < 1 by default (0.5) on purpose: the apply phase is serial across +// transactions, so a single apply effectively owns the whole box, and we want to +// leave headroom for Badger compaction, concurrent queries, and the GC rather +// than pin every core to one mutation. workCap mirrors x.DivideAndRule's 256-edge +// rule — there is no point spinning up a worker per <256 edges. A computed budget +// of 1 (or anything < 2) feeds into allocateWorkers, which returns nil for +// budget < 2, so the caller transparently falls back to the legacy +// one-goroutine-per-predicate path. +func autoBudget(gomaxprocs, totalEdges int, fraction float64, minEdgesPerWorker int) int { + machineCap := int(math.Round(float64(gomaxprocs) * fraction)) + if machineCap < 1 { + machineCap = 1 + } + if minEdgesPerWorker < 1 { + minEdgesPerWorker = 1 // defensive: avoid divide-by-zero on misconfig + } + workCap := totalEdges / minEdgesPerWorker + if workCap < 1 { + workCap = 1 + } + if machineCap < workCap { + return machineCap + } + return workCap +} + // Process is the entry point for the mutation pipeline. It fans out one goroutine per // predicate so mutations for different predicates are parallelized, while all edges // for the same predicate are serialized through a single channel to a single goroutine. @@ -1172,6 +1542,35 @@ func (mp *MutationPipeline) Process(ctx context.Context, edges []*pb.DirectedEdg // build a star-delete won't generate the index Del entries for the prior // value, leaving stale uids in the index permanently. ctx = schema.GetWriteContext(ctx) + + // Pre-pass: count edges per predicate so the global goroutine budget can be + // apportioned proportionally before any predicate goroutine starts. Star-delete + // edges are excluded — they are handled inline below and never reach a + // predicate goroutine. allocateWorkers returns nil when the budget is disabled + // (budget < 2), in which case every pred.workers stays 0 and the pipeline + // behaves exactly as the legacy one-goroutine-per-predicate path. + edgeCounts := make(map[string]int) + totalEdges := 0 + for _, edge := range edges { + if edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star { + continue + } + edgeCounts[edge.Attr]++ + totalEdges++ + } + // Resolve the budget. 0 (or any value < 2) disables the pipeline budget; N>0 is + // an absolute budget; the -1 sentinel selects AUTO, where the budget is derived + // once per call from GOMAXPROCS and the batch size before apportionment. AUTO + // only chooses the integer fed into allocateWorkers — the apportionment and the + // whole downstream path are identical to a fixed budget of that value. + budget := x.WorkerConfig.MutationsPipelineGoroutines + if budget == mutationsPipelineGoroutinesAuto { + budget = autoBudget(runtime.GOMAXPROCS(0), totalEdges, + x.WorkerConfig.MutationsPipelineGoroutinesFraction, + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker) + } + workerAlloc := allocateWorkers(edgeCounts, budget) + predicates := map[string]*PredicatePipeline{} var wg sync.WaitGroup numWg := 0 @@ -1194,9 +1593,10 @@ sendLoop: pred, ok := predicates[edge.Attr] if !ok { pred = &PredicatePipeline{ - attr: edge.Attr, - edges: make(chan *pb.DirectedEdge, 1000), - wg: &wg, + attr: edge.Attr, + edges: make(chan *pb.DirectedEdge, 1000), + wg: &wg, + workers: workerAlloc[edge.Attr], // 0 when the budget is disabled } predicates[edge.Attr] = pred wg.Add(1) diff --git a/posting/oracle.go b/posting/oracle.go index 63526608146..e23137bed9b 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -128,6 +128,43 @@ func (txn *Txn) AddDelta(key string, input *pb.PostingList, doSortAndDedup bool, return pl, nil } +// AddDeltaConcurrent is the lock-free sibling of AddDelta for the merge-light +// data-write pass (ProcessSingle), which writes one-to-one keys with +// the addToList=false, doSortAndDedup=false shape. Because each such key is +// written by exactly one worker (disjoint uid sub-ranges within a predicate, and +// distinct key prefixes across predicates), there is no read-modify-write on the +// delta map for a given key, so the global txn.cache.Lock() that AddDelta takes +// can be skipped: +// +// - proto.Marshal runs outside any cache lock — this is the parallel win. +// - txn.cache.deltas.AddToDeltas stores into the sharded, per-key-locked delta +// map; distinct keys hit distinct shards, so concurrent stores are safe. +// - guarantee (c) of AddDelta is preserved: if a List for this key is already +// cached, the delta is pushed into it via setMutation (which self-locks the +// List) so any holder of that List observes the write. plists is read under +// cache.RLock to stay consistent with cache.Lock writers elsewhere. +// +// When no List is cached, skipping setMutation is correct: a later reader +// rebuilds the List via getInternal and re-applies this delta from lc.deltas. +// Callers needing addToList or sort/dedup must keep using the locked AddDelta. +func (txn *Txn) AddDeltaConcurrent(key string, input *pb.PostingList) error { + newPl, err := proto.Marshal(input) + if err != nil { + glog.Errorf("Error marshalling posting list: %v", err) + return err + } + + txn.cache.deltas.AddToDeltas(key, newPl) + + txn.cache.RLock() + list, listOk := txn.cache.plists[key] + txn.cache.RUnlock() + if listOk { + list.setMutation(txn.StartTs, newPl) + } + return nil +} + func (txn *Txn) LockCache() { txn.cache.Lock() } diff --git a/posting/pipeline_budget_test.go b/posting/pipeline_budget_test.go new file mode 100644 index 00000000000..c18240dd73d --- /dev/null +++ b/posting/pipeline_budget_test.go @@ -0,0 +1,838 @@ +/* + * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package posting + +// Tests and benchmark for the proportional intra-predicate goroutine budget +// (mutations-pipeline-goroutines). The budget lets a hot/dominant predicate use +// more than one goroutine for its merge-light passes (data-write, index +// tokenization) while staying byte-identical to the legacy one-goroutine path. + +import ( + "context" + "fmt" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgraph/v25/protos/pb" + "github.com/dgraph-io/dgraph/v25/schema" + "github.com/dgraph-io/dgraph/v25/x" +) + +// Timestamp bands for budget tests (kept clear of the existing pipeline_test.go +// bands documented there, which top out at 2599): +// Byte-identical, budget=0: 5000-5099 +// Byte-identical, budget=30: 5100-5199 +// Auto byte-identical: 5200-5299 +// ProcessList byte-identical: 5300-5399 +// ProcessList cross-proposal: 5400-5499 + +// buildSkewedBatch produces a deterministic, predicate-skewed mutation batch: +// one dominant scalar predicate (nameAttr) with nameCount edges, plus two small +// predicates (deptAttr scalar, friendAttr uid). Entities live in disjoint uid +// ranges per predicate. name values repeat across a small set so the index has +// multi-uid buckets; friend targets repeat so reverse lists have multiple +// sources. Reused by both the byte-identical test and the benchmark. +// cloneEdges returns a fresh copy of each edge struct. The pipeline mutates an +// edge's ValueId in place (setting the scalar-value marker for scalar postings), +// which is fine in production where every transaction gets fresh edges from the +// proto, but breaks tests/benchmarks that feed the same slice to Process twice. +func cloneEdges(in []*pb.DirectedEdge) []*pb.DirectedEdge { + out := make([]*pb.DirectedEdge, len(in)) + for i, e := range in { + c := *e + out[i] = &c + } + return out +} + +func buildSkewedBatch(nameAttr, deptAttr, friendAttr string, nameCount int) []*pb.DirectedEdge { + const ( + nameBase = uint64(1_000_000) + deptBase = uint64(2_000_000) + friendBase = uint64(3_000_000) + targetBase = uint64(4_000_000) + smallCount = 10 + nameDistinct = 50 + deptDistinct = 3 + friendTargets = 4 + ) + edges := make([]*pb.DirectedEdge, 0, nameCount+2*smallCount) + for i := 0; i < nameCount; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: nameBase + uint64(i), + Attr: nameAttr, + Value: []byte(fmt.Sprintf("name%d", i%nameDistinct)), + ValueType: pb.Posting_STRING, + Op: pb.DirectedEdge_SET, + }) + } + for i := 0; i < smallCount; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: deptBase + uint64(i), + Attr: deptAttr, + Value: []byte(fmt.Sprintf("dept%d", i%deptDistinct)), + ValueType: pb.Posting_STRING, + Op: pb.DirectedEdge_SET, + }) + } + for i := 0; i < smallCount; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: friendBase + uint64(i), + Attr: friendAttr, + ValueId: targetBase + uint64(i%friendTargets), + Op: pb.DirectedEdge_SET, + }) + } + return edges +} + +// buildLangEdges produces langs language postings for each of n distinct +// entities of a `string @lang` predicate. Each (entity, lang) pair is a distinct +// posting (Uid = fingerprint(lang)) but all fold into the entity's single +// forward data key — so the lang forward-write is one-to-one by +// source entity, the disjointness the ProcessList parallel split relies on. +func buildLangEdges(attr string, n, langs int) []*pb.DirectedEdge { + const base = uint64(5_000_000) + langCodes := []string{"en", "de", "fr", "es", "it"} + if langs > len(langCodes) { + langs = len(langCodes) + } + edges := make([]*pb.DirectedEdge, 0, n*langs) + for i := 0; i < n; i++ { + for l := 0; l < langs; l++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: base + uint64(i), + Attr: attr, + Value: []byte(fmt.Sprintf("bio%d-%s", i, langCodes[l])), + ValueType: pb.Posting_STRING, + Lang: langCodes[l], + Op: pb.DirectedEdge_SET, + }) + } + } + return edges +} + +func sortedUint64(in []uint64) []uint64 { + out := append([]uint64(nil), in...) + sort.Slice(out, func(i, j int) bool { return out[i] < out[j] }) + return out +} + +// runBudgetBatch sets the global budget, runs the batch through a fresh pipeline +// txn, snapshots the conflict-key set produced (before commit, which does not +// clear it), commits to Badger, and returns the snapshot. The budget is restored +// on return so other tests still see the disabled (zero-value) default. +func runBudgetBatch(t *testing.T, budget int, startTs, commitTs uint64, + edges []*pb.DirectedEdge) map[uint64]struct{} { + t.Helper() + old := x.WorkerConfig.MutationsPipelineGoroutines + x.WorkerConfig.MutationsPipelineGoroutines = budget + defer func() { x.WorkerConfig.MutationsPipelineGoroutines = old }() + + txn := Oracle().RegisterStartTs(startTs) + mp := NewMutationPipeline(txn) + require.NoError(t, mp.Process(context.Background(), edges)) + + txn.Lock() + conflicts := make(map[uint64]struct{}, len(txn.conflicts)) + for k := range txn.conflicts { + conflicts[k] = struct{}{} + } + txn.Unlock() + + commitPipelineTxn(t, txn, commitTs) + return conflicts +} + +// runTwoProposalBatch runs two mutation batches as two SEPARATE Process calls on +// the SAME registered transaction, modeling a multi-statement (CommitNow=false) +// transaction: worker/draft.go's RegisterStartTs reuses the *Txn for a start ts, +// and txn.Update between proposals does not clear txn.cache.deltas, so a list/lang +// predicate's forward deltas accumulate across proposals via AddDelta's +// addToList=true. The single-Process byte-identical tests never exercise this +// cross-proposal merge. Snapshots the conflict-key set, commits, restores the +// budget. +func runTwoProposalBatch(t *testing.T, budget int, startTs, commitTs uint64, + batch1, batch2 []*pb.DirectedEdge) map[uint64]struct{} { + t.Helper() + old := x.WorkerConfig.MutationsPipelineGoroutines + x.WorkerConfig.MutationsPipelineGoroutines = budget + defer func() { x.WorkerConfig.MutationsPipelineGoroutines = old }() + + txn := Oracle().RegisterStartTs(startTs) + + mp1 := NewMutationPipeline(txn) + require.NoError(t, mp1.Process(context.Background(), batch1)) + txn.Update() // mirrors draft.go's `defer txn.Update()` per applyMutations. + + mp2 := NewMutationPipeline(txn) + require.NoError(t, mp2.Process(context.Background(), batch2)) + + txn.Lock() + conflicts := make(map[uint64]struct{}, len(txn.conflicts)) + for k := range txn.conflicts { + conflicts[k] = struct{}{} + } + txn.Unlock() + + commitPipelineTxn(t, txn, commitTs) + return conflicts +} + +// TestProcessListCrossProposalByteIdentical is the regression test for the +// ProcessList parallel-write addToList bug: the parallel concStore must replicate +// AddDelta's addToList=true so a list/lang forward delta accumulated by an EARLIER +// proposal on the same txn is preserved by a LATER proposal. Two proposals SET the +// same source uids to a second target (and the same lang entities in a second +// language). The serial path appends across proposals ([A,B]); a parallel path +// that overwrote would drop A. Asserts the budget>1 result equals the serial +// baseline AND that the baseline itself carries both proposals' postings (so the +// test fails on the buggy overwrite, not merely on disagreement). Run under -race. +func TestProcessListCrossProposalByteIdentical(t *testing.T) { + const ( + linkSrc = 60 // distinct source uids touched in both proposals + bioEntities = 40 // distinct @lang entities touched in both proposals + ) + + linkAttr := x.AttrInRootNamespace("link") + bioAttr := x.AttrInRootNamespace("bio") + + const ( + srcBase = uint64(1_000_000) + bioBase = uint64(5_000_000) + targetA = uint64(9_000_001) + targetB = uint64(9_000_002) + ) + + buildLink := func(target uint64) []*pb.DirectedEdge { + edges := make([]*pb.DirectedEdge, 0, linkSrc) + for i := 0; i < linkSrc; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: srcBase + uint64(i), + Attr: linkAttr, + ValueId: target, + Op: pb.DirectedEdge_SET, + }) + } + return edges + } + buildBio := func(lang string) []*pb.DirectedEdge { + edges := make([]*pb.DirectedEdge, 0, bioEntities) + for i := 0; i < bioEntities; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: bioBase + uint64(i), + Attr: bioAttr, + Value: []byte(fmt.Sprintf("%s%d", lang, i)), + ValueType: pb.Posting_STRING, + Lang: lang, + Op: pb.DirectedEdge_SET, + }) + } + return edges + } + + batch1 := append(buildLink(targetA), buildBio("en")...) + batch2 := append(buildLink(targetB), buildBio("de")...) + + // Sanity: budget=30 grants BOTH ProcessList predicates more than one worker in + // each proposal (link=60, bio=40 per proposal), so the parallel concStore path + // is genuinely taken on the second proposal where the cross-proposal merge runs. + alloc := allocateWorkers(map[string]int{linkAttr: linkSrc, bioAttr: bioEntities}, 30) + require.Greater(t, alloc[linkAttr], 1, "test must exercise multi-worker [uid] predicate") + require.Greater(t, alloc[bioAttr], 1, "test must exercise multi-worker @lang predicate") + + schemaBytes := []byte(` + link: [uid] @reverse . + bio: string @lang . + `) + + langVals := func(t *testing.T, attr string, entity, readTs uint64) map[string]string { + t.Helper() + l, err := GetNoStore(x.DataKey(attr, entity), readTs) + require.NoError(t, err) + out := map[string]string{} + require.NoError(t, l.Iterate(readTs, 0, func(p *pb.Posting) error { + out[string(p.LangTag)] = string(p.Value) + return nil + })) + return out + } + + type result struct { + fwd map[uint64][]uint64 + rev map[uint64][]uint64 + lang map[uint64]map[string]string + } + + collect := func(readTs uint64) result { + r := result{ + fwd: map[uint64][]uint64{}, + rev: map[uint64][]uint64{}, + lang: map[uint64]map[string]string{}, + } + for i := 0; i < linkSrc; i++ { + src := srcBase + uint64(i) + r.fwd[src] = sortedUint64(forwardUids(t, linkAttr, src, readTs)) + } + for _, tgt := range []uint64{targetA, targetB} { + r.rev[tgt] = sortedUint64(reverseUids(t, linkAttr, tgt, readTs)) + } + for i := 0; i < bioEntities; i++ { + e := bioBase + uint64(i) + r.lang[e] = langVals(t, bioAttr, e, readTs) + } + return r + } + + // Run 1 — budget disabled (serial cross-proposal addToList merge). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts0 := runTwoProposalBatch(t, 0, 5400, 5401, cloneEdges(batch1), cloneEdges(batch2)) + res0 := collect(5402) + + // Run 2 — budget=30 (parallel concStore must reproduce the merge). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts30 := runTwoProposalBatch(t, 30, 5410, 5411, cloneEdges(batch1), cloneEdges(batch2)) + res30 := collect(5412) + + // The serial baseline must carry BOTH proposals' postings — otherwise the + // equality assertions below could pass on a shared (wrong) overwrite result. + wantFwd := sortedUint64([]uint64{targetA, targetB}) + for i := 0; i < linkSrc; i++ { + require.Equal(t, wantFwd, res0.fwd[srcBase+uint64(i)], + "serial forward list must append across proposals") + } + for i := 0; i < bioEntities; i++ { + require.Equal(t, map[string]string{ + "en": fmt.Sprintf("en%d", i), + "de": fmt.Sprintf("de%d", i), + }, res0.lang[bioBase+uint64(i)], "serial lang values must carry both proposals") + } + + require.Equal(t, res0.fwd, res30.fwd, "forward uid lists must match across budgets") + require.Equal(t, res0.rev, res30.rev, "reverse lists must match across budgets") + require.Equal(t, res0.lang, res30.lang, "lang values must match across budgets") + require.Equal(t, conflicts0, conflicts30, "conflict-key sets must be identical") +} + +// TestAllocateWorkers exercises the pure apportionment helper: floor of 1, the +// budget-disabled and saturated edge cases, the largest-remainder distribution, +// the floor-of-1 reclaim, and determinism — no cluster needed. +func TestAllocateWorkers(t *testing.T) { + // Disabled budget (< 2) and empty input return nil → callers treat every + // predicate as workers=0 (legacy path). + require.Nil(t, allocateWorkers(map[string]int{"a": 5, "b": 3}, 0)) + require.Nil(t, allocateWorkers(map[string]int{"a": 5, "b": 3}, 1)) + require.Nil(t, allocateWorkers(map[string]int{}, 30)) + + // P >= budget: one worker each, nothing left to split with. + got := allocateWorkers(map[string]int{"a": 100, "b": 50, "c": 1}, 2) + require.Equal(t, map[string]int{"a": 1, "b": 1, "c": 1}, got) + + // Even split: budget 4 across two equal predicates → 2 each. + require.Equal(t, map[string]int{"a": 2, "b": 2}, + allocateWorkers(map[string]int{"a": 1, "b": 1}, 4)) + + // Largest-remainder with deterministic tie-break: budget 3, two equal + // predicates → quota 1.5 each, the single leftover goes to the + // lexicographically smaller attr. + require.Equal(t, map[string]int{"a": 2, "b": 1}, + allocateWorkers(map[string]int{"a": 1, "b": 1}, 3)) + + // Skewed: one dominant predicate takes most of the budget; tiny ones get 1. + // name=300, dept=10, friend=10, total=320, budget=30: + // name quota 28.125 -> 28, dept/friend quota 0.9375 -> 1 each; sum == 30. + skewed := allocateWorkers(map[string]int{"name": 300, "dept": 10, "friend": 10}, 30) + require.Equal(t, map[string]int{"name": 28, "dept": 1, "friend": 1}, skewed) + + // Floor-of-1 reclaim: a huge dominant plus 25 single-edge predicates + // oversubscribes (28 ones bump the floors over budget); the overshoot is + // reclaimed from the only predicate with >1 worker (the dominant). + counts := map[string]int{"hot": 10000} + for i := 0; i < 25; i++ { + counts[fmt.Sprintf("tiny%02d", i)] = 1 + } + reclaim := allocateWorkers(counts, 30) + sum := 0 + for attr, w := range reclaim { + require.GreaterOrEqual(t, w, 1, "every predicate must get >= 1 worker (%s)", attr) + sum += w + } + require.Equal(t, 30, sum, "sum must equal budget when P < budget") + require.Equal(t, 5, reclaim["hot"], "dominant keeps the remaining budget after the floor") + for i := 0; i < 25; i++ { + require.Equal(t, 1, reclaim[fmt.Sprintf("tiny%02d", i)]) + } + + // Determinism: the same input yields the same output across runs. + require.Equal(t, skewed, + allocateWorkers(map[string]int{"friend": 10, "name": 300, "dept": 10}, 30)) +} + +// TestPipelineBudgetByteIdentical proves the proportional budget produces results +// byte-identical to the legacy one-goroutine-per-predicate path: the same skewed +// batch run with budget=0 and budget=30 must yield identical committed data +// values, index buckets, forward/reverse lists, AND identical conflict-key sets. +// The dominant `name` predicate is granted >1 worker so both the parallel +// data-write and the parallel index tokenization (numGo=k) are exercised. Run it +// under -race to also cover I1/I2/I3. +func TestPipelineBudgetByteIdentical(t *testing.T) { + const nameCount = 300 + + nameAttr := x.AttrInRootNamespace("name") + deptAttr := x.AttrInRootNamespace("dept") + friendAttr := x.AttrInRootNamespace("friend") + edges := buildSkewedBatch(nameAttr, deptAttr, friendAttr, nameCount) + + // Sanity: budget=30 actually grants the dominant predicate more than one + // worker, so the parallel path under test is genuinely taken. + alloc := allocateWorkers(map[string]int{nameAttr: nameCount, deptAttr: 10, friendAttr: 10}, 30) + require.Greater(t, alloc[nameAttr], 1, "test must exercise multi-worker dominant predicate") + + schemaBytes := []byte(` + name: string @index(exact) . + dept: string @index(exact) . + friend: uid @reverse . + `) + + type result struct { + nameVal map[uint64]string + nameIdx map[string][]uint64 + deptIdx map[string][]uint64 + fwd map[uint64][]uint64 + rev map[uint64][]uint64 + } + + collect := func(readTs uint64) result { + r := result{ + nameVal: map[uint64]string{}, + nameIdx: map[string][]uint64{}, + deptIdx: map[string][]uint64{}, + fwd: map[uint64][]uint64{}, + rev: map[uint64][]uint64{}, + } + for i := 0; i < nameCount; i++ { + e := uint64(1_000_000) + uint64(i) + r.nameVal[e] = scalarValue(t, nameAttr, e, readTs) + } + for v := 0; v < 50; v++ { + val := fmt.Sprintf("name%d", v) + r.nameIdx[val] = sortedUint64(indexUidsForVal(t, nameAttr, val, readTs)) + } + for v := 0; v < 3; v++ { + val := fmt.Sprintf("dept%d", v) + r.deptIdx[val] = sortedUint64(indexUidsForVal(t, deptAttr, val, readTs)) + } + for i := 0; i < 10; i++ { + src := uint64(3_000_000) + uint64(i) + r.fwd[src] = sortedUint64(forwardUids(t, friendAttr, src, readTs)) + } + for tgt := 0; tgt < 4; tgt++ { + target := uint64(4_000_000) + uint64(tgt) + r.rev[target] = sortedUint64(reverseUids(t, friendAttr, target, readTs)) + } + return r + } + + // Run 1 — budget disabled (legacy one-goroutine-per-predicate). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts0 := runBudgetBatch(t, 0, 5000, 5001, cloneEdges(edges)) + res0 := collect(5002) + + // Run 2 — budget=30 (dominant predicate parallelized). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts30 := runBudgetBatch(t, 30, 5100, 5101, cloneEdges(edges)) + res30 := collect(5102) + + require.Equal(t, res0.nameVal, res30.nameVal, "scalar data values must match") + require.Equal(t, res0.nameIdx, res30.nameIdx, "name index buckets must match") + require.Equal(t, res0.deptIdx, res30.deptIdx, "dept index buckets must match") + require.Equal(t, res0.fwd, res30.fwd, "forward uid lists must match") + require.Equal(t, res0.rev, res30.rev, "reverse lists must match") + require.Equal(t, conflicts0, conflicts30, "conflict-key sets must be identical") +} + +// TestProcessListBudgetByteIdentical is the ProcessList counterpart of +// TestPipelineBudgetByteIdentical. It proves the forward data-write split added to +// ProcessList is byte-identical to the serial path for the predicates that route +// THROUGH ProcessList — which the existing budget tests do NOT cover: their +// `friend: uid @reverse` is a scalar uid predicate (no @lang, not a list) so it +// goes to ProcessSingle. Here `link: [uid] @reverse` (info.isList → ProcessList, +// info.isUid → the sort/dedup concStore branch) is the dominant predicate, plus a +// small `bio: string @lang` predicate (su.Lang → ProcessList, non-uid branch) so +// the lang forward-write split is exercised too. The same batch run with budget=0 +// (serial) and budget=30 (both predicates parallelized) must yield identical +// forward uid lists, reverse lists, lang values, AND conflict-key sets. Run under +// -race to also cover the disjoint-key/per-worker-buffer invariants. +func TestProcessListBudgetByteIdentical(t *testing.T) { + const ( + linkCount = 300 // distinct source uids → 5 hot reverse targets + hotTargets = 5 + bioEntities = 40 + bioLangs = 3 // 40*3 = 120 lang postings + ) + + linkAttr := x.AttrInRootNamespace("link") + bioAttr := x.AttrInRootNamespace("bio") + + edges := append(buildReverseEdges(linkAttr, linkCount, hotTargets), + buildLangEdges(bioAttr, bioEntities, bioLangs)...) + + // Sanity: budget=30 grants BOTH predicates more than one worker, so the + // multi-worker ProcessList path under test is genuinely taken for the [uid] + // case AND the lang case. + alloc := allocateWorkers(map[string]int{linkAttr: linkCount, bioAttr: bioEntities * bioLangs}, 30) + require.Greater(t, alloc[linkAttr], 1, "test must exercise multi-worker [uid] @reverse predicate") + require.Greater(t, alloc[bioAttr], 1, "test must exercise multi-worker @lang predicate") + + schemaBytes := []byte(` + link: [uid] @reverse . + bio: string @lang . + `) + + // langVals reads every language posting for a @lang entity's forward data key + // as a langTag→value map, so both runs can be compared without depending on + // posting order. + langVals := func(t *testing.T, attr string, entity, readTs uint64) map[string]string { + t.Helper() + l, err := GetNoStore(x.DataKey(attr, entity), readTs) + require.NoError(t, err) + out := map[string]string{} + require.NoError(t, l.Iterate(readTs, 0, func(p *pb.Posting) error { + out[string(p.LangTag)] = string(p.Value) + return nil + })) + return out + } + + type result struct { + fwd map[uint64][]uint64 + rev map[uint64][]uint64 + lang map[uint64]map[string]string + } + + collect := func(readTs uint64) result { + r := result{ + fwd: map[uint64][]uint64{}, + rev: map[uint64][]uint64{}, + lang: map[uint64]map[string]string{}, + } + for i := 0; i < linkCount; i++ { + src := uint64(1_000_000) + uint64(i) + r.fwd[src] = sortedUint64(forwardUids(t, linkAttr, src, readTs)) + } + for tgt := 0; tgt < hotTargets; tgt++ { + target := uint64(9_000_000) + uint64(tgt) + r.rev[target] = sortedUint64(reverseUids(t, linkAttr, target, readTs)) + } + for i := 0; i < bioEntities; i++ { + e := uint64(5_000_000) + uint64(i) + r.lang[e] = langVals(t, bioAttr, e, readTs) + } + return r + } + + // Run 1 — budget disabled (serial ProcessList forward write). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts0 := runBudgetBatch(t, 0, 5300, 5301, cloneEdges(edges)) + res0 := collect(5302) + + // Run 2 — budget=30 (both ProcessList predicates parallelized). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts30 := runBudgetBatch(t, 30, 5310, 5311, cloneEdges(edges)) + res30 := collect(5312) + + require.Equal(t, res0.fwd, res30.fwd, "forward uid lists must match") + require.Equal(t, res0.rev, res30.rev, "reverse lists must match") + require.Equal(t, res0.lang, res30.lang, "lang values must match") + require.Equal(t, conflicts0, conflicts30, "conflict-key sets must be identical") +} + +// TestAutoBudget exercises the pure AUTO-mode derivation directly, independent of +// the real GOMAXPROCS: machineCap vs workCap interaction, rounding, the budget<2 +// fallback, the divide-by-zero guard on a misconfigured min-edges, and +// determinism — no cluster needed. +func TestAutoBudget(t *testing.T) { + // machineCap dominates: round(96*0.5)=48 < 1_000_000/256=3906. + require.Equal(t, 48, autoBudget(96, 1_000_000, 0.5, 256)) + + // workCap dominates: 2560/256=10 < round(96*0.5)=48. + require.Equal(t, 10, autoBudget(96, 2560, 0.5, 256)) + + // Rounding is half away from zero: round(3*0.5)=round(1.5)=2. + require.Equal(t, 2, autoBudget(3, 1_000_000, 0.5, 256)) + + // budget<2 fallback: round(1*0.5)=1 and 100/256 clamps to 1, so min==1 → the + // caller routes through allocateWorkers, which returns nil for budget<2 (legacy). + require.Equal(t, 1, autoBudget(1, 100, 0.5, 256)) + + // Divide-by-zero guard: a misconfigured minEdgesPerWorker of 0 must not panic + // and clamps to 1, so workCap==totalEdges and machineCap (48) wins here. + require.Equal(t, 48, autoBudget(96, 2560, 0.5, 0)) + + // Determinism: identical inputs yield identical outputs. + require.Equal(t, autoBudget(96, 2560, 0.5, 256), autoBudget(96, 2560, 0.5, 256)) +} + +// runAutoBudgetBatch is runBudgetBatch's AUTO-mode sibling: it enables AUTO +// (MutationsPipelineGoroutines=-1) with the given fraction / min-edges, runs the +// batch through a fresh pipeline txn, snapshots the conflict-key set, commits, and +// restores all three config fields on return. +func runAutoBudgetBatch(t *testing.T, fraction float64, minEdges int, + startTs, commitTs uint64, edges []*pb.DirectedEdge) map[uint64]struct{} { + t.Helper() + oldBudget := x.WorkerConfig.MutationsPipelineGoroutines + oldFraction := x.WorkerConfig.MutationsPipelineGoroutinesFraction + oldMinEdges := x.WorkerConfig.MutationsPipelineMinEdgesPerWorker + x.WorkerConfig.MutationsPipelineGoroutines = mutationsPipelineGoroutinesAuto + x.WorkerConfig.MutationsPipelineGoroutinesFraction = fraction + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = minEdges + defer func() { + x.WorkerConfig.MutationsPipelineGoroutines = oldBudget + x.WorkerConfig.MutationsPipelineGoroutinesFraction = oldFraction + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = oldMinEdges + }() + + txn := Oracle().RegisterStartTs(startTs) + mp := NewMutationPipeline(txn) + require.NoError(t, mp.Process(context.Background(), edges)) + + txn.Lock() + conflicts := make(map[uint64]struct{}, len(txn.conflicts)) + for k := range txn.conflicts { + conflicts[k] = struct{}{} + } + txn.Unlock() + + commitPipelineTxn(t, txn, commitTs) + return conflicts +} + +// TestPipelineBudgetAutoByteIdentical proves AUTO mode is byte-identical to a +// fixed budget of the same computed value (and to the disabled legacy path). AUTO +// only chooses the integer fed into allocateWorkers, so for a deterministically +// forced internal budget the downstream behavior must match exactly. The forcing +// is host-independent: fraction=100 makes machineCap=round(GOMAXPROCS*100) >= 100 +// on any host, so workCap=totalEdges/minEdges=320/32=10 always wins → AUTO derives +// budget=10. Asserts committed data values, index buckets, forward/reverse lists, +// AND conflict-key sets equal those of budget=0 and a fixed budget=10. Run under +// -race to also cover the parallel data-write/index paths. +func TestPipelineBudgetAutoByteIdentical(t *testing.T) { + const nameCount = 300 // total batch = 300 name + 10 dept + 10 friend = 320 edges + + nameAttr := x.AttrInRootNamespace("name") + deptAttr := x.AttrInRootNamespace("dept") + friendAttr := x.AttrInRootNamespace("friend") + edges := buildSkewedBatch(nameAttr, deptAttr, friendAttr, nameCount) + + // Sanity: the forced internal budget of 10 grants the dominant predicate more + // than one worker, so the multi-worker path is genuinely exercised. + alloc := allocateWorkers(map[string]int{nameAttr: nameCount, deptAttr: 10, friendAttr: 10}, 10) + require.Greater(t, alloc[nameAttr], 1, "test must exercise multi-worker dominant predicate") + // Sanity: AUTO with fraction=100 / minEdges=32 derives exactly 10 regardless of + // the host's GOMAXPROCS (machineCap >= 100, workCap = 320/32 = 10). + require.Equal(t, 10, autoBudget(1, 320, 100, 32)) + require.Equal(t, 10, autoBudget(256, 320, 100, 32)) + + schemaBytes := []byte(` + name: string @index(exact) . + dept: string @index(exact) . + friend: uid @reverse . + `) + + type result struct { + nameVal map[uint64]string + nameIdx map[string][]uint64 + deptIdx map[string][]uint64 + fwd map[uint64][]uint64 + rev map[uint64][]uint64 + } + + collect := func(readTs uint64) result { + r := result{ + nameVal: map[uint64]string{}, + nameIdx: map[string][]uint64{}, + deptIdx: map[string][]uint64{}, + fwd: map[uint64][]uint64{}, + rev: map[uint64][]uint64{}, + } + for i := 0; i < nameCount; i++ { + e := uint64(1_000_000) + uint64(i) + r.nameVal[e] = scalarValue(t, nameAttr, e, readTs) + } + for v := 0; v < 50; v++ { + val := fmt.Sprintf("name%d", v) + r.nameIdx[val] = sortedUint64(indexUidsForVal(t, nameAttr, val, readTs)) + } + for v := 0; v < 3; v++ { + val := fmt.Sprintf("dept%d", v) + r.deptIdx[val] = sortedUint64(indexUidsForVal(t, deptAttr, val, readTs)) + } + for i := 0; i < 10; i++ { + src := uint64(3_000_000) + uint64(i) + r.fwd[src] = sortedUint64(forwardUids(t, friendAttr, src, readTs)) + } + for tgt := 0; tgt < 4; tgt++ { + target := uint64(4_000_000) + uint64(tgt) + r.rev[target] = sortedUint64(reverseUids(t, friendAttr, target, readTs)) + } + return r + } + + // Run 1 — budget disabled (legacy one-goroutine-per-predicate). + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts0 := runBudgetBatch(t, 0, 5200, 5201, cloneEdges(edges)) + res0 := collect(5202) + + // Run 2 — fixed budget=10. + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflicts10 := runBudgetBatch(t, 10, 5210, 5211, cloneEdges(edges)) + res10 := collect(5212) + + // Run 3 — AUTO, forced to derive budget=10. + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes(schemaBytes, 1)) + conflictsAuto := runAutoBudgetBatch(t, 100, 32, 5220, 5221, cloneEdges(edges)) + resAuto := collect(5222) + + require.Equal(t, res0.nameVal, resAuto.nameVal, "scalar data values must match legacy") + require.Equal(t, res10.nameVal, resAuto.nameVal, "scalar data values must match fixed budget") + require.Equal(t, res10.nameIdx, resAuto.nameIdx, "name index buckets must match fixed budget") + require.Equal(t, res10.deptIdx, resAuto.deptIdx, "dept index buckets must match fixed budget") + require.Equal(t, res10.fwd, resAuto.fwd, "forward uid lists must match fixed budget") + require.Equal(t, res10.rev, resAuto.rev, "reverse lists must match fixed budget") + require.Equal(t, conflicts0, conflictsAuto, "conflict-key set must match legacy") + require.Equal(t, conflicts10, conflictsAuto, "conflict-key set must match fixed budget") +} + +// BenchmarkPipelineSkewedBatch measures Process throughput on a ~20,000-edge +// batch dominated by one non-indexed scalar predicate (~99% of edges), comparing +// the budget-off serial data-write against budget=30. The dominant predicate is +// non-indexed so the measurement isolates the data-write parallelization (the +// primary beneficiary). edges/s plus ns/op are reported per sub-benchmark. +func BenchmarkPipelineSkewedBatch(b *testing.B) { + require.NoError(b, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(b, schema.ParseBytes([]byte(` + name: string . + dept: string @index(exact) . + friend: uid @reverse . + `), 1)) + + nameAttr := x.AttrInRootNamespace("name") + deptAttr := x.AttrInRootNamespace("dept") + friendAttr := x.AttrInRootNamespace("friend") + edges := buildSkewedBatch(nameAttr, deptAttr, friendAttr, 19980) // ~20,000 total + + run := func(b *testing.B, budget int) { + old := x.WorkerConfig.MutationsPipelineGoroutines + x.WorkerConfig.MutationsPipelineGoroutines = budget + defer func() { x.WorkerConfig.MutationsPipelineGoroutines = old }() + + var ts uint64 = 100_000 + b.ResetTimer() + for i := 0; i < b.N; i++ { + ts += 10 + // Fresh, unregistered txn per iteration: a private cache, no oracle + // bookkeeping to leak across b.N. + txn := NewTxn(ts) + mp := NewMutationPipeline(txn) + if err := mp.Process(context.Background(), cloneEdges(edges)); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + edgesPerSec := float64(len(edges)) * float64(b.N) / b.Elapsed().Seconds() + b.ReportMetric(edgesPerSec, "edges/s") + } + + b.Run("budget=0", func(b *testing.B) { run(b, 0) }) + b.Run("budget=30", func(b *testing.B) { run(b, 30) }) +} + +// BenchmarkPipelineBudgetSweep sweeps the goroutine budget over the same +// ~20,000-edge skewed batch as BenchmarkPipelineSkewedBatch, plus AUTO mode, so +// the throughput plateau can be located on the real (e.g. 96-core) box and the +// AUTO fraction tuned. Each sub-benchmark reports edges/s alongside the default +// ns/op. cloneEdges hands each Process call a fresh slice (Process mutates edge +// ValueId in place). Run with: +// +// go test -run '^$' -bench '^BenchmarkPipelineBudgetSweep$' -benchmem ./posting/ +func BenchmarkPipelineBudgetSweep(b *testing.B) { + require.NoError(b, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(b, schema.ParseBytes([]byte(` + name: string . + dept: string @index(exact) . + friend: uid @reverse . + `), 1)) + + nameAttr := x.AttrInRootNamespace("name") + deptAttr := x.AttrInRootNamespace("dept") + friendAttr := x.AttrInRootNamespace("friend") + edges := buildSkewedBatch(nameAttr, deptAttr, friendAttr, 19980) // ~20,000 total + + // run sets the budget plus the two AUTO tunables (inert unless budget==-1) and + // restores all three on return. fraction/minEdges only matter for the auto case. + run := func(b *testing.B, budget int, fraction float64, minEdges int) { + oldBudget := x.WorkerConfig.MutationsPipelineGoroutines + oldFraction := x.WorkerConfig.MutationsPipelineGoroutinesFraction + oldMinEdges := x.WorkerConfig.MutationsPipelineMinEdgesPerWorker + x.WorkerConfig.MutationsPipelineGoroutines = budget + x.WorkerConfig.MutationsPipelineGoroutinesFraction = fraction + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = minEdges + defer func() { + x.WorkerConfig.MutationsPipelineGoroutines = oldBudget + x.WorkerConfig.MutationsPipelineGoroutinesFraction = oldFraction + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = oldMinEdges + }() + + var ts uint64 = 200_000 + b.ResetTimer() + for i := 0; i < b.N; i++ { + ts += 10 + // Fresh, unregistered txn per iteration: a private cache, no oracle + // bookkeeping to leak across b.N. + txn := NewTxn(ts) + mp := NewMutationPipeline(txn) + if err := mp.Process(context.Background(), cloneEdges(edges)); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + edgesPerSec := float64(len(edges)) * float64(b.N) / b.Elapsed().Seconds() + b.ReportMetric(edgesPerSec, "edges/s") + } + + for _, budget := range []int{0, 8, 16, 24, 32, 48, 64, 96} { + budget := budget + b.Run(fmt.Sprintf("budget=%d", budget), func(b *testing.B) { run(b, budget, 0.5, 256) }) + } + // AUTO uses the production default fraction (1.0); runtime.GOMAXPROCS(0) drives + // machineCap = round(GOMAXPROCS * 1.0), capped by edges/minEdgesPerWorker. + b.Run("budget=auto", func(b *testing.B) { + run(b, mutationsPipelineGoroutinesAuto, 1.0, 256) + }) +} diff --git a/posting/pipeline_revbench_test.go b/posting/pipeline_revbench_test.go new file mode 100644 index 00000000000..78430513fcc --- /dev/null +++ b/posting/pipeline_revbench_test.go @@ -0,0 +1,138 @@ +/* + * Reverse-heavy benchmark: measures whether the auto/budget hybrid speedup + * reaches @reverse [uid] predicates (it routes to ProcessList, which is serial). + * Self-contained; package-internal (white-box). + */ + +package posting + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgraph/v25/protos/pb" + "github.com/dgraph-io/dgraph/v25/schema" + "github.com/dgraph-io/dgraph/v25/x" +) + +// buildReverseEdges builds `n` forward [uid] edges from distinct sources to a +// small set of `hotTargets` target uids — modeling the "many sources, 2-5 hot +// target nodes" reverse fan-in of the production workload. +func buildReverseEdges(attr string, n, hotTargets int) []*pb.DirectedEdge { + const srcBase = uint64(1_000_000) + const tgtBase = uint64(9_000_000) + edges := make([]*pb.DirectedEdge, 0, n) + for i := 0; i < n; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: srcBase + uint64(i), + Attr: attr, + ValueId: tgtBase + uint64(i%hotTargets), // few hot targets + Op: pb.DirectedEdge_SET, + }) + } + return edges +} + +// buildScalarIndexedEdges builds `n` scalar string edges (distinct entities, +// `distinct` distinct values) — these route to ProcessSingle and DO benefit +// from the intra-predicate split. +func buildScalarIndexedEdges(attr string, n, distinct int) []*pb.DirectedEdge { + const base = uint64(2_000_000) + edges := make([]*pb.DirectedEdge, 0, n) + for i := 0; i < n; i++ { + edges = append(edges, &pb.DirectedEdge{ + Entity: base + uint64(i), + Attr: attr, + Value: []byte(fmt.Sprintf("v%d", i%distinct)), + ValueType: pb.Posting_STRING, + Op: pb.DirectedEdge_SET, + }) + } + return edges +} + +func runBudget(b *testing.B, edges []*pb.DirectedEdge, budget int, fraction float64, minEdges int) { + ob := x.WorkerConfig.MutationsPipelineGoroutines + of := x.WorkerConfig.MutationsPipelineGoroutinesFraction + om := x.WorkerConfig.MutationsPipelineMinEdgesPerWorker + x.WorkerConfig.MutationsPipelineGoroutines = budget + x.WorkerConfig.MutationsPipelineGoroutinesFraction = fraction + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = minEdges + defer func() { + x.WorkerConfig.MutationsPipelineGoroutines = ob + x.WorkerConfig.MutationsPipelineGoroutinesFraction = of + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = om + }() + var ts uint64 = 300_000 + b.ResetTimer() + for i := 0; i < b.N; i++ { + ts += 10 + txn := NewTxn(ts) + mp := NewMutationPipeline(txn) + if err := mp.Process(context.Background(), cloneEdges(edges)); err != nil { + b.Fatal(err) + } + } + b.StopTimer() + b.ReportMetric(float64(len(edges))*float64(b.N)/b.Elapsed().Seconds(), "edges/s") +} + +// BenchmarkReverseDominant: ~20k edges, a SINGLE dominant `[uid] @reverse` +// predicate (5 hot targets). This is the "is the speedup OBE for @reverse" +// case — expect budget=auto ~= budget=0 (ProcessList + ProcessReverse serial). +func BenchmarkReverseDominant(b *testing.B) { + require.NoError(b, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(b, schema.ParseBytes([]byte(`link: [uid] @reverse .`), 1)) + link := x.AttrInRootNamespace("link") + edges := buildReverseEdges(link, 19980, 5) + + for _, bud := range []int{0, 8, 32} { + bud := bud + b.Run(fmt.Sprintf("budget=%d", bud), func(b *testing.B) { runBudget(b, edges, bud, 1.0, 256) }) + } + b.Run("budget=auto", func(b *testing.B) { runBudget(b, edges, mutationsPipelineGoroutinesAuto, 1.0, 256) }) +} + +// BenchmarkReverseFiftyFifty: ~20k edges, 50% on a `[uid] @reverse` predicate +// (no speedup) and 50% on a scalar `@index` predicate (full speedup) — the +// "50% of incoming statements have an @reverse index" assumption. Shows the +// diluted, real-workload speedup. +func BenchmarkReverseFiftyFifty(b *testing.B) { + require.NoError(b, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(b, schema.ParseBytes([]byte(` + sname: string @index(exact) . + link: [uid] @reverse . + `), 1)) + sname := x.AttrInRootNamespace("sname") + link := x.AttrInRootNamespace("link") + edges := append(buildScalarIndexedEdges(sname, 10000, 50), + buildReverseEdges(link, 10000, 5)...) + + for _, bud := range []int{0, 8, 32} { + bud := bud + b.Run(fmt.Sprintf("budget=%d", bud), func(b *testing.B) { runBudget(b, edges, bud, 1.0, 256) }) + } + b.Run("budget=auto", func(b *testing.B) { runBudget(b, edges, mutationsPipelineGoroutinesAuto, 1.0, 256) }) +} + +// BenchmarkScalarDominant: control — ~20k edges, dominant scalar `@index` +// predicate (matches the original sweep's best case) to contrast against the +// reverse-dominant number. +func BenchmarkScalarDominant(b *testing.B) { + require.NoError(b, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(b, schema.ParseBytes([]byte(`sname: string @index(exact) .`), 1)) + sname := x.AttrInRootNamespace("sname") + edges := buildScalarIndexedEdges(sname, 19980, 50) + + for _, bud := range []int{0, 8, 32} { + bud := bud + b.Run(fmt.Sprintf("budget=%d", bud), func(b *testing.B) { runBudget(b, edges, bud, 1.0, 256) }) + } + b.Run("budget=auto", func(b *testing.B) { runBudget(b, edges, mutationsPipelineGoroutinesAuto, 1.0, 256) }) +} diff --git a/posting/pipeline_schema_matrix_test.go b/posting/pipeline_schema_matrix_test.go new file mode 100644 index 00000000000..115ac790400 --- /dev/null +++ b/posting/pipeline_schema_matrix_test.go @@ -0,0 +1,351 @@ +/* + * Schema/index-type matrix: confirms the intra-predicate goroutine budget + * (proportional, auto, and the ProcessList parallel forward write) produces + * byte-identical committed state and identical conflict-key sets vs. the legacy + * one-goroutine-per-predicate path, across a wide range of Dgraph value types + * and index tokenizers. + * + * The comparison is fully generic: for every predicate it scans ALL committed + * Badger keys (data, every index token, reverse, count) at a read timestamp and + * dumps each posting list canonically, then asserts the budget>1 dump equals the + * budget=0 dump. This covers any tokenizer without hand-coding token readers. + */ + +package posting + +import ( + "context" + "encoding/hex" + "fmt" + "sort" + "strings" + "testing" + + "github.com/dgraph-io/badger/v4" + "github.com/stretchr/testify/require" + + "github.com/dgraph-io/dgraph/v25/protos/pb" + "github.com/dgraph-io/dgraph/v25/schema" + "github.com/dgraph-io/dgraph/v25/x" +) + +// schemaCase is one row of the matrix: a schema, the root-namespace predicate +// attrs it declares, and a builder for a representative mutation batch. +type schemaCase struct { + name string + schema string + attrs []string + edges func(attrs []string) []*pb.DirectedEdge +} + +// strEdge builds a scalar edge carrying a textual value; ValidateAndConvert +// converts it to the schema's scalar type (the RDF ingest path). +func strEdge(attr string, entity uint64, val string) *pb.DirectedEdge { + return &pb.DirectedEdge{ + Entity: entity, Attr: attr, + Value: []byte(val), ValueType: pb.Posting_STRING, Op: pb.DirectedEdge_SET, + } +} + +// uidEdge builds a uid edge (forward target in ValueId). +func uidEdge(attr string, entity, target uint64) *pb.DirectedEdge { + return &pb.DirectedEdge{ + Entity: entity, Attr: attr, ValueId: target, Op: pb.DirectedEdge_SET, + } +} + +// snapshotPredicates scans every committed key under each predicate's prefix at +// readTs and returns hex(key) -> canonical posting-list dump. Two runs that +// commit identical logical state produce identical maps. +func snapshotPredicates(t *testing.T, attrs []string, readTs uint64) map[string]string { + t.Helper() + out := map[string]string{} + txn := pstore.NewTransactionAt(readTs, false) + defer txn.Discard() + for _, attr := range attrs { + prefix := x.PredicatePrefix(attr) + iopt := badger.DefaultIteratorOptions + iopt.AllVersions = false + iopt.PrefetchValues = false + iopt.Prefix = prefix + it := txn.NewIterator(iopt) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + key := it.Item().KeyCopy(nil) + l, err := GetNoStore(key, readTs) + require.NoError(t, err) + var postings []string + err = l.Iterate(readTs, 0, func(p *pb.Posting) error { + postings = append(postings, fmt.Sprintf( + "u=%d|v=%s|pt=%d|lang=%s|facets=%v", + p.Uid, hex.EncodeToString(p.Value), p.PostingType, + hex.EncodeToString(p.LangTag), p.Facets)) + return nil + }) + require.NoError(t, err) + // postings already in uid order from Iterate; make deterministic anyway. + sort.Strings(postings) + out[hex.EncodeToString(key)] = fmt.Sprintf("%v", postings) + } + it.Close() + } + return out +} + +// keyKinds parses the snapshot's keys and counts how many are data / index / +// reverse / count. Used to prove the comparison actually covers the secondary +// key types a schema declares (so the test can't pass by comparing data only). +func keyKinds(t *testing.T, snap map[string]string) (data, index, reverse, count int) { + t.Helper() + for hk := range snap { + raw, err := hex.DecodeString(hk) + require.NoError(t, err) + pk, err := x.Parse(raw) + require.NoError(t, err) + switch { + case pk.IsCountOrCountRev(): + count++ + case pk.IsReverse(): + reverse++ + case pk.IsIndex(): + index++ + case pk.IsData(): + data++ + } + } + return +} + +// runMatrixBatch applies edges through a fresh pipeline txn at the given budget +// (and auto tunables), snapshots the conflict-key set, commits, and restores the +// config. Mirrors runBudgetBatch but also sets the auto fraction/min-edges. +func runMatrixBatch(t *testing.T, budget int, fraction float64, minEdges int, + startTs, commitTs uint64, edges []*pb.DirectedEdge) map[uint64]struct{} { + t.Helper() + ob := x.WorkerConfig.MutationsPipelineGoroutines + of := x.WorkerConfig.MutationsPipelineGoroutinesFraction + om := x.WorkerConfig.MutationsPipelineMinEdgesPerWorker + x.WorkerConfig.MutationsPipelineGoroutines = budget + x.WorkerConfig.MutationsPipelineGoroutinesFraction = fraction + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = minEdges + defer func() { + x.WorkerConfig.MutationsPipelineGoroutines = ob + x.WorkerConfig.MutationsPipelineGoroutinesFraction = of + x.WorkerConfig.MutationsPipelineMinEdgesPerWorker = om + }() + + txn := Oracle().RegisterStartTs(startTs) + mp := NewMutationPipeline(txn) + require.NoError(t, mp.Process(context.Background(), edges)) + + txn.Lock() + conflicts := make(map[uint64]struct{}, len(txn.conflicts)) + for k := range txn.conflicts { + conflicts[k] = struct{}{} + } + txn.Unlock() + + commitPipelineTxn(t, txn, commitTs) + return conflicts +} + +// matrixCases is the schema/index-type matrix. +func matrixCases() []schemaCase { + const n = 600 // edges per scalar predicate; > any tested budget so the split fires + scalar := func(valOf func(i int) string) func([]string) []*pb.DirectedEdge { + return func(attrs []string) []*pb.DirectedEdge { + e := make([]*pb.DirectedEdge, 0, n) + for i := 0; i < n; i++ { + e = append(e, strEdge(attrs[0], uint64(1_000_000+i), valOf(i))) + } + return e + } + } + return []schemaCase{ + { + name: "string_multi_index", // exact+hash+term+fulltext+trigram on one pred + schema: `p: string @index(exact, hash, term, fulltext, trigram) .`, + attrs: []string{"p"}, + edges: scalar(func(i int) string { return fmt.Sprintf("the quick brown fox %d", i%40) }), + }, + { + name: "int_count", + schema: `p: int @index(int) @count .`, + attrs: []string{"p"}, + edges: scalar(func(i int) string { return fmt.Sprintf("%d", i%50) }), + }, + { + name: "float_index", + schema: `p: float @index(float) .`, + attrs: []string{"p"}, + edges: scalar(func(i int) string { return fmt.Sprintf("%d.25", i%50) }), + }, + { + name: "datetime_index", + schema: `p: dateTime @index(day) .`, + attrs: []string{"p"}, + edges: scalar(func(i int) string { return fmt.Sprintf("2021-%02d-15T10:30:00Z", (i%12)+1) }), + }, + { + name: "bool_index", + schema: `p: bool @index(bool) .`, + attrs: []string{"p"}, + edges: scalar(func(i int) string { return fmt.Sprintf("%t", i%2 == 0) }), + }, + { + name: "geo_index", + schema: `p: geo @index(geo) .`, + attrs: []string{"p"}, + edges: scalar(func(i int) string { + return fmt.Sprintf(`{"type":"Point","coordinates":[%d.0,%d.0]}`, i%7, (i*2)%7) + }), + }, + { + name: "list_uid_reverse_count", + schema: `p: [uid] @reverse @count .`, + attrs: []string{"p"}, + edges: func(attrs []string) []*pb.DirectedEdge { + e := make([]*pb.DirectedEdge, 0, n) + for i := 0; i < n; i++ { // many sources -> 5 hot reverse targets + e = append(e, uidEdge(attrs[0], uint64(1_000_000+i), uint64(9_000_000+i%5))) + } + return e + }, + }, + { + name: "uid_reverse_singular", + schema: `p: uid @reverse .`, + attrs: []string{"p"}, + edges: func(attrs []string) []*pb.DirectedEdge { + e := make([]*pb.DirectedEdge, 0, n) + for i := 0; i < n; i++ { + e = append(e, uidEdge(attrs[0], uint64(1_000_000+i), uint64(9_000_000+i%5))) + } + return e + }, + }, + { + name: "list_string_index", + schema: `p: [string] @index(exact, term) .`, + attrs: []string{"p"}, + edges: func(attrs []string) []*pb.DirectedEdge { + e := make([]*pb.DirectedEdge, 0, 2*n) + for i := 0; i < n; i++ { // 2 values per entity -> list postings + shared tokens + ent := uint64(1_000_000 + i) + e = append(e, strEdge(attrs[0], ent, fmt.Sprintf("tag%d", i%30))) + e = append(e, strEdge(attrs[0], ent, fmt.Sprintf("tag%d", (i+1)%30))) + } + return e + }, + }, + { + name: "upsert_and_noconflict", + schema: "up: string @index(exact) @upsert .\n nc: string @index(hash) @noconflict .", + attrs: []string{"up", "nc"}, + edges: func(attrs []string) []*pb.DirectedEdge { + e := make([]*pb.DirectedEdge, 0, 2*n) + for i := 0; i < n; i++ { + e = append(e, strEdge(attrs[0], uint64(1_000_000+i), fmt.Sprintf("u%d", i%50))) + e = append(e, strEdge(attrs[1], uint64(1_000_000+i), fmt.Sprintf("c%d", i%50))) + } + return e + }, + }, + { + name: "lang_fulltext", + schema: `p: string @index(fulltext) @lang .`, + attrs: []string{"p"}, + edges: func(attrs []string) []*pb.DirectedEdge { + // buildLangEdges (existing helper): n entities across several langs. + return buildLangEdges(attrs[0], n, 4) + }, + }, + { + name: "mixed_hot_predicate", // skewed multi-predicate, dominant scalar + reverse + index + schema: "name: string @index(exact) .\n friend: [uid] @reverse .\n age: int @index(int) .", + attrs: []string{"name", "friend", "age"}, + edges: func(attrs []string) []*pb.DirectedEdge { + e := make([]*pb.DirectedEdge, 0, n+200) + for i := 0; i < n; i++ { // dominant: name + e = append(e, strEdge(attrs[0], uint64(1_000_000+i), fmt.Sprintf("name%d", i%50))) + } + for i := 0; i < 100; i++ { + e = append(e, uidEdge(attrs[1], uint64(2_000_000+i), uint64(9_000_000+i%5))) + e = append(e, strEdge(attrs[2], uint64(3_000_000+i), fmt.Sprintf("%d", 20+i%40))) + } + return e + }, + }, + } +} + +// TestSchemaMatrixByteIdentical is the headline robustness test: for every +// schema/index combination, the proportional budget (fixed 8, fixed 32, and +// auto) must produce byte-identical committed state and identical conflict keys +// vs. the legacy (budget=0) path. Run under -race to also cover the concurrent +// data-write / index-tokenization / ProcessList paths. +func TestSchemaMatrixByteIdentical(t *testing.T) { + type budgetCfg struct { + name string + budget int + fraction float64 + minEdges int + } + budgets := []budgetCfg{ + {"fixed8", 8, 1.0, 256}, + {"fixed32", 32, 1.0, 256}, + {"auto", mutationsPipelineGoroutinesAuto, 1.0, 64}, + } + + var ts uint64 = 1_000_000 + next := func() (uint64, uint64, uint64) { ts += 100; return ts, ts + 1, ts + 2 } + + reset := func(t *testing.T, schemaText string) { + require.NoError(t, pstore.DropAll()) + MemLayerInstance.clear() + require.NoError(t, schema.ParseBytes([]byte(schemaText), 1)) + } + + for _, sc := range matrixCases() { + attrs := make([]string, len(sc.attrs)) + for i, a := range sc.attrs { + attrs[i] = x.AttrInRootNamespace(a) + } + for _, bc := range budgets { + sc, bc, attrs := sc, bc, attrs + t.Run(sc.name+"/"+bc.name, func(t *testing.T) { + // Baseline: legacy one-goroutine-per-predicate (budget 0). + reset(t, sc.schema) + s0, c0, r0 := next() + conf0 := runMatrixBatch(t, 0, 1.0, 256, s0, c0, sc.edges(attrs)) + base := snapshotPredicates(t, attrs, r0) + require.NotEmpty(t, base, "baseline wrote no keys for %s", sc.name) + + // Harden: prove the snapshot actually covers the secondary key + // types this schema declares, so the equality check is meaningful + // (not just comparing data keys). + data, index, reverse, count := keyKinds(t, base) + require.Positive(t, data, "expected data keys (%s)", sc.name) + if strings.Contains(sc.schema, "@index") { + require.Positive(t, index, "expected index keys (%s)", sc.name) + } + if strings.Contains(sc.schema, "@reverse") { + require.Positive(t, reverse, "expected reverse keys (%s)", sc.name) + } + if strings.Contains(sc.schema, "@count") { + require.Positive(t, count, "expected count keys (%s)", sc.name) + } + + // Candidate: the budget under test. + reset(t, sc.schema) + s1, c1, r1 := next() + confN := runMatrixBatch(t, bc.budget, bc.fraction, bc.minEdges, s1, c1, sc.edges(attrs)) + cand := snapshotPredicates(t, attrs, r1) + + require.Equal(t, base, cand, + "committed state must be byte-identical (%s, %s)", sc.name, bc.name) + require.Equal(t, conf0, confN, + "conflict-key set must be identical (%s, %s)", sc.name, bc.name) + }) + } + } +} diff --git a/worker/server_state.go b/worker/server_state.go index 66bdb3f8e4d..ae8c660a560 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -43,7 +43,9 @@ const ( `lambda-url=;` CacheDefaults = `size-mb=4096; percentage=40,40,20; remove-on-update=false` FeatureFlagsDefaults = `normalize-compatibility-mode=; enable-detailed-metrics=false; ` + - `log-slow-query-threshold=0; mutations-pipeline-threshold=1` + `log-slow-query-threshold=0; mutations-pipeline-threshold=1; ` + + `mutations-pipeline-goroutines=30; mutations-pipeline-goroutines-fraction=1.0; ` + + `mutations-pipeline-min-edges-per-worker=256` ) // ServerState holds the state of the Dgraph server. diff --git a/x/config.go b/x/config.go index 4613db0841f..ab14cab7212 100644 --- a/x/config.go +++ b/x/config.go @@ -151,6 +151,38 @@ type WorkerOptions struct { // Plumbed via the "feature-flags" superflag as // "mutations-pipeline-threshold". MutationsPipelineThreshold int + // MutationsPipelineGoroutines is the global budget of *intra-predicate* worker + // goroutines distributed across the predicates in a single mutation batch, + // proportionally to each predicate's edge count. A hot/dominant predicate is + // granted most of the budget so its data-write and index passes parallelize; + // tiny predicates get one worker each. The Go zero value 0 (and any value < 2, + // except the -1 sentinel) disables intra-predicate parallelism entirely: every + // predicate runs on a single goroutine exactly as the legacy path did, which + // keeps unit tests (which never set this) byte-for-byte identical to before. + // N>0 is an absolute, fixed budget of N. The special value -1 selects AUTO + // mode, where the budget is derived at runtime from GOMAXPROCS and the batch + // size (see posting.autoBudget) using MutationsPipelineGoroutinesFraction and + // MutationsPipelineMinEdgesPerWorker. Plumbed via the "feature-flags" superflag + // as "mutations-pipeline-goroutines"; production default is 30. + MutationsPipelineGoroutines int + // MutationsPipelineGoroutinesFraction is the fraction of GOMAXPROCS that AUTO + // mode (MutationsPipelineGoroutines == -1) may use for one mutation batch. The + // apply phase is serial across transactions — a single apply effectively owns + // the box — so the default is 1.0 (use all cores). A budget-sweep benchmark on + // an 8-core box showed fraction=0.5 leaves ~25% throughput on the table vs + // fraction>=1.0, with the peak near 2-3x cores; raise above 1.0 to oversubscribe + // or lower it to leave headroom for Badger compaction/queries/GC under heavy + // concurrent read load. Inert unless MutationsPipelineGoroutines == -1. Plumbed + // via the "feature-flags" superflag as "mutations-pipeline-goroutines-fraction"; + // default 1.0. + MutationsPipelineGoroutinesFraction float64 + // MutationsPipelineMinEdgesPerWorker is the minimum number of edges per + // intra-predicate worker that AUTO mode targets when deriving the budget + // (workCap = totalEdges / MutationsPipelineMinEdgesPerWorker). It mirrors + // x.DivideAndRule's 256-edge rule. Inert unless MutationsPipelineGoroutines == + // -1. Plumbed via the "feature-flags" superflag as + // "mutations-pipeline-min-edges-per-worker"; default 256. + MutationsPipelineMinEdgesPerWorker int } // WorkerConfig stores the global instance of the worker package's options.