scheduler: don't wait on input dependencies when cff.Predicate returns false #105
Open
Saijayavinoth wants to merge 2 commits into
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #105 +/- ##
=======================================
Coverage 67.94% 67.94%
=======================================
Files 32 32
Lines 2059 2084 +25
=======================================
+ Hits 1399 1416 +17
- Misses 636 640 +4
- Partials 24 28 +4
lint: fix .golangci.yml to pass on golangci-lint v1.64.x
Two changes, both surfacing because v1.64.x added strict `config verify` that older versions silently bypassed:
- Fix niliness → nilness (typo in govet enable list since e66de90). Older golangci-lint silently ignored the unknown name; v1.64.x rejects it during config verification.
- Exclude fmt.Fprint/Fprintf/Fprintln from errcheck. Once config verify passes, errcheck actually runs and catches conventional fmt-write callsites where the returned error is intentionally ignored. Standard exclusion in Go projects.
When cff.Predicate returns false, the consumer's task body is skipped
via its existing 'if !p<hash> { return nil }' gate — but today the
scheduler still waits for every declared input dependency before that
gate runs, so unrelated slow inputs impose their full latency.
Introduce PredicateJob and Scheduler.EnqueuePredicate so the scheduler
can recognize predicate evaluations. When a PredicateJob completes
with PredicateResult=false, walk its consumers in the done branch,
set remaining=0, and push to ready immediately, bypassing the wait on
unrelated data dependencies. Late-enqueue race handled by caching the
predicate's bool on ScheduledJob.predicateResult so consumers
enqueued after the predicate completed can also fast-dispatch.
Safety: the consumer wrapper's skip gate structurally precedes any
read of data-dep values, so early-dispatching it races no producer
write. Existing tests pass unchanged; consumer codegen is
byte-identical.
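A minimal sketch of why the gate ordering matters, assuming a wrapper shaped like the one described above. `runConsumer` and `demo` are hypothetical names, and a nil pointer stands in for an input the slow producer has not written yet; the generated code is not claimed to look like this.

```go
package main

import "fmt"

// runConsumer is a hypothetical stand-in for the generated consumer wrapper.
// The skip gate comes first, so a false predicate returns before the input
// pointer is ever dereferenced.
func runConsumer(pred bool, input *string, body func(string)) error {
	if !pred {
		return nil // skip gate: inputs are never read on this path
	}
	body(*input) // only reached when the predicate is true
	return nil
}

// demo reports whether the body ran for the given predicate and input.
func demo(pred bool, input *string) (ran bool, err error) {
	err = runConsumer(pred, input, func(string) { ran = true })
	return ran, err
}

func main() {
	// Input not produced yet (nil), predicate false: no dereference, no panic.
	ran, err := demo(false, nil)
	fmt.Println(ran, err) // false <nil>

	// Input available, predicate true: the body runs as usual.
	v := "ready"
	ran, err = demo(true, &v)
	fmt.Println(ran, err) // true <nil>
}
```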
Benchmark (AMD EPYC 7B13, 5ms slow dep):
- Before (early-dispatch disabled): 5,219,513 consumer_ns/op
- After (early-dispatch enabled): 45,364 consumer_ns/op
~115x reduction in consumer dispatch latency.
Closes #104
Force-pushed from ed87797 to 9ab9f6e
Author
@abhinav PTAL
Collaborator
Apologies, @Saijayavinoth, I don't work at Uber anymore. Someone from Uber can likely review.
Closes #104
Problem
When `cff.Predicate` returns false, the task body is skipped, but the scheduler still waits for every declared input dependency to finish first. An unrelated slow input imposes its full latency on the consumer, even though the predicate=false result is what decides the skip and the input value is never read. See linked issue for reproducer.
Change
- scheduler) `PredicateJob{Run func(ctx)(bool,error); Dependencies []*ScheduledJob}` + `Scheduler.EnqueuePredicate`. Re-exported as `cff.PredicateJob`.
- `predicateRun`; done-branch fast-dispatches consumers on `!result && err==nil`; enqueue handler does the same for consumers enqueued after the predicate completed (cached in `ScheduledJob.predicateResult`).
- `EnqueuePredicate(PredicateJob{})`; `Run` sig changes from `error` to `(bool, error)`. Consumer codegen byte-identical: its existing `if !p<hash> { return nil }` gate is untouched.
Safety
Fast-dispatch appears to race the slow producer's write to the consumer's input value. It does not:
- `if !p<hash> { return nil }; ... use(inputs...)`, so on predicate=false the function returns before any input is dereferenced.
- `consumer.remaining` past zero (to −1). The `if remaining == 0` push-to-ready check no longer matches, so the consumer is not re-dispatched. Producer errors still propagate to the flow's error path via the normal mechanism.
- `invalid`; fast-dispatch still pushes it to ready, and the worker's `j.invalid` check fires before any body code runs, surfacing the error as `errJobInvalid`, identical to pre-PR behavior under `ContinueOnError`.
- `predicateResult` on `ScheduledJob` in the done branch; enqueue handler reads it and fast-dispatches inline.
Verification
- `go test -race ./scheduler/...`: passes, including new `TestPredicateEarlyDispatch`, `TestPredicateTrueWaitsForDeps`, `TestPredicateError`.
- `make test`: 21/22 `internal/tests/` packages green. The one failure (`TestPanicRecovered`) is a pre-existing Go-version stack-format issue; reproduces with this change reverted.
- `TestBlockingInputs_PredicateShortCircuits`: asserts elapsed < slowDelay/10 for a 100ms slow dep.
- `BenchmarkPredicateFalseWithSlowDep` (`internal/tests/benchmark/`), 5ms slow dep, AMD EPYC 7B13:
Wall time bounded by `Wait()` draining the slow producer; `consumer_ns/op` is the metric of interest: ~159× reduction.
Out of scope (planned follow-up PR)
Fast-dispatch invalid consumers: same mechanism applied to consumers whose dep errored (`j.invalid` short-circuits without dereferencing inputs). Only meaningful under `ContinueOnError`; needs its own test coverage. Will open as a follow-up after this lands.
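The late-enqueue case mentioned under Safety (a consumer enqueued after its predicate has already completed) might be sketched as follows. This is an illustrative model only: `predicate`, `consumer`, and `enqueue` are hypothetical names, and the real enqueue handler may track dependencies differently.

```go
package main

import "fmt"

// predicate is a hypothetical record of a predicate job's cached outcome.
type predicate struct {
	done   bool // the predicate has finished running
	result bool // cached value; meaningful only once done is true
}

// consumer is a hypothetical, simplified consumer job.
type consumer struct {
	remaining int  // dependencies still outstanding
	ready     bool // pushed to the ready queue
}

// enqueue models the enqueue handler. pending is the number of dependencies
// that have not completed at enqueue time. If the predicate already finished
// with false, the cached result lets the consumer skip waiting on them.
func enqueue(c *consumer, p *predicate, pending int) {
	c.remaining = pending
	if p.done && !p.result {
		c.remaining = 0 // fast-dispatch inline using the cached result
	}
	c.ready = c.remaining == 0
}

func main() {
	// Predicate already false, one slow dep still pending: ready immediately.
	late := &consumer{}
	enqueue(late, &predicate{done: true, result: false}, 1)
	fmt.Println(late.ready) // true

	// Predicate already true, one slow dep still pending: must wait.
	waiting := &consumer{}
	enqueue(waiting, &predicate{done: true, result: true}, 1)
	fmt.Println(waiting.ready) // false
}
```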