feat(sync): request-driven pre_confirmed polling #3694
Claude finished @RafaelGranza's task in 5m 2s
PR Review
Reviewed against
Important / worth a closer look
Nits
Looks good
Inline comments posted on the specific lines.
| preLatestCacheSize = 10 | ||
| // Per-fetch cap so a hung feeder can't hold the running guard for the | ||
| // feeder client's full retry budget (~20s). TODO: consider exposing as flag. | ||
| preConfirmedFetchTimeout = 2 * time.Second |
Hardcoded for now. Worth exposing as a flag, or fine to keep at 2s?
| numCallsPreConfirmed atomic.Uint32 | ||
| numCallsPending atomic.Uint32 |
This is to avoid race conditions during tests
| preLatestCacheSize = 10 | ||
| // Per-fetch cap so a hung feeder can't hold the running guard for the | ||
| // feeder client's full retry budget (~20s). TODO: consider exposing as flag. | ||
| preConfirmedFetchTimeout = 2 * time.Second |
Question — 2s timeout vs single feeder attempt.
The feeder client (clients/feeder/feeder.go) sets HTTP timeouts that grow under load (mediumGrowThreshold / IncreaseTimeout). With preConfirmedPollInterval defaulting to 500ms, the cap of 2s is fine for the steady-state case, but a single HTTP attempt that's already paying an increased timeout (say >1s) plus the backoff before the retry will not complete inside 2s. The comment frames this as "can't hold the running guard for the feeder client's full retry budget (~20s)" — but in practice the cap also cancels any retry path, which means under a slow feeder we'll silently never recover within a single fetchPreConfirmed call. The next trigger/tick will restart the attempt from scratch.
That may be the intent (you'd rather retry-from-scratch on the next 500ms tick than wait), but worth being explicit in the comment, and worth confirming this is acceptable on a saturated/sluggish feeder. Two thoughts:
- Consider tying the timeout to max(preConfirmedPollInterval*N, floor) so the cap scales with the configured cadence; a node configured with --preconfirmed-poll-interval=5s probably doesn't want a 2s cap.
- Optionally surface as a flag (the existing TODO hints at this).
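A minimal sketch of the scaling idea, assuming a hypothetical helper named scaledFetchTimeout. The multiplier and the 2s floor below are illustrative values, not ones taken from the PR:

```go
package main

import (
	"fmt"
	"time"
)

// scaledFetchTimeout is a hypothetical helper (not part of the PR). It returns
// pollInterval*n, but never less than floor, so the per-fetch cap follows the
// configured polling cadence.
func scaledFetchTimeout(pollInterval time.Duration, n int, floor time.Duration) time.Duration {
	scaled := pollInterval * time.Duration(n)
	if scaled < floor {
		return floor
	}
	return scaled
}

func main() {
	floor := 2 * time.Second // assumed floor, matching today's hardcoded cap

	// Default cadence: 500ms*2 = 1s is below the floor, so the floor applies.
	fmt.Println(scaledFetchTimeout(500*time.Millisecond, 2, floor)) // 2s

	// Slow cadence: 5s*2 = 10s exceeds the floor, so the cap scales up.
	fmt.Println(scaledFetchTimeout(5*time.Second, 2, floor)) // 10s
}
```

With this shape the default configuration keeps the current 2s behavior, and only nodes with a longer poll interval get a longer cap.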
IMO, this is NOT a problem, since the ticker will keep trying if the request failed.
| defer func() { | ||
| select { | ||
| case <-s.preConfirmedTrigger: | ||
| default: | ||
| } | ||
| s.preConfirmedFetching.Store(false) | ||
| }() |
Subtle: the drain-then-release order can silently swallow a freshness request.
The ordering here is drain trigger → set fetching=false. Consider the trigger storm in the new test (line 460-464): all 100 requests during the fetch are dropped at requestPreConfirmedRefresh because preConfirmedFetching==true. That's the documented behavior. But the buffered slot drained here represents an additional request that arrived in the tiny window between the polling goroutine reading from preConfirmedTrigger and fetchPreConfirmed setting preConfirmedFetching=true. That request was sent after the trigger this fetch was started for, so the fetch result is not necessarily fresh enough to satisfy it.
In practice the next RPC call re-triggers, so it self-heals — but worth a one-line note in the comment that the drain assumes "any trigger queued during the fetch was for data the fetch already covers." Today the comment says "a refresh arriving during this call doesn't fire a back-to-back fetch", which is true but slightly misleading about whose request gets dropped.
A subtle hardening alternative: flip the order — clear the flag first, then drain. That makes the drain semantically "drop triggers that would race with the next iteration's own read", and any request that arrived in the post-drain window queues a fresh fetch on the next iteration. The cost is one possibly-redundant fetch immediately afterwards, but in exchange you never silently lose a request. Up to you whether the current trade-off is worth keeping — just call it out.
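To make the swallowed request concrete, here is a simplified, single-goroutine sketch of the guard-plus-drain pattern as it is described in this thread. The names refresher, request, and fetch are hypothetical stand-ins, and the sketch does not reproduce the concurrent window itself; it only simulates a request that was queued before the guard was set:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refresher is a hypothetical stand-in for the synchronizer's trigger state.
type refresher struct {
	trigger  chan struct{} // 1-slot buffer: at most one pending request
	fetching atomic.Bool   // running guard
}

func newRefresher() *refresher {
	return &refresher{trigger: make(chan struct{}, 1)}
}

// request reports whether the refresh request was queued. It is dropped when
// a fetch is already running or when a request is already pending.
func (r *refresher) request() bool {
	if r.fetching.Load() {
		return false
	}
	select {
	case r.trigger <- struct{}{}:
		return true
	default:
		return false
	}
}

// fetch runs work under the guard, then drains the trigger before releasing
// the guard (the drain-then-release order discussed above).
func (r *refresher) fetch(work func()) {
	r.fetching.Store(true)
	defer func() {
		select {
		case <-r.trigger:
		default:
		}
		r.fetching.Store(false)
	}()
	work()
}

func main() {
	r := newRefresher()

	// Simulates a request that lands before the guard is set.
	early := r.request()

	dropped := 0
	r.fetch(func() {
		// Trigger storm during the fetch: every request hits the guard.
		for i := 0; i < 100; i++ {
			if !r.request() {
				dropped++
			}
		}
	})

	pending := len(r.trigger)
	fmt.Println(early, dropped, pending) // true 100 0
}
```

The early request was accepted, yet nothing is pending after the fetch: the drain discarded it. Whether that is acceptable depends on whether the fetch result is considered fresh enough for that request.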
I'll apply changes
| // Every read is a vote that the cached pre_confirmed should be refreshed. | ||
| // The polling goroutine picks this up on its next iteration and any subscribers | ||
| // of preConfirmedDataFeed (e.g. WebSocket clients) benefit from the same fetch. | ||
| s.requestPreConfirmedRefresh() |
Doc accuracy: the triggering caller does not benefit from the triggered fetch.
This call is fire-and-forget — the fetch is async, so this PreConfirmed() call returns the currently stored s.preConfirmed.Load() below. The fresh data is observed only by:
- the next PreConfirmed() caller (after the fetch updates s.preConfirmed), or
- subscribers of preConfirmedDataFeed.
The PR description ("Each call to PreConfirmed() now triggers a fresh pre_confirmed fetch") reads as if the caller gets the fresh result. They don't. Worth tightening the inline comment here to say something like:
Note: this is async; the current call returns the cached value. Fresh data is observed by the next caller and by preConfirmedDataFeed subscribers.
Also: requestPreConfirmedRefresh() runs even when preConfirmedPollInterval == 0 (polling disabled). It's harmless (channel fills to 1, subsequent sends drop), but slightly wasteful. Consider gating with if s.preConfirmedPollInterval > 0.
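A small sketch of both points, under the assumption that the read path looks roughly like "request a refresh, then return whatever is cached". The cache type and its fields are hypothetical and only mirror the names used in this thread:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cache is a hypothetical stand-in for the synchronizer's pre_confirmed state.
type cache struct {
	pollInterval time.Duration // 0 means polling is disabled
	value        atomic.Pointer[string]
	trigger      chan struct{} // 1-slot refresh request channel
}

// requestRefresh queues a refresh without blocking.
func (c *cache) requestRefresh() {
	select {
	case c.trigger <- struct{}{}:
	default:
	}
}

// read returns the currently cached value. The refresh is only requested,
// never awaited, and it is skipped entirely when polling is disabled.
func (c *cache) read() string {
	if c.pollInterval > 0 {
		c.requestRefresh()
	}
	if v := c.value.Load(); v != nil {
		return *v
	}
	return ""
}

func main() {
	c := &cache{pollInterval: 500 * time.Millisecond, trigger: make(chan struct{}, 1)}
	old := "block-1"
	c.value.Store(&old)

	first := c.read() // returns the cached value and queues a refresh

	// Simulate the poller consuming the trigger and storing fresh data.
	<-c.trigger
	fresh := "block-2"
	c.value.Store(&fresh)

	second := c.read()         // only this later call observes the fresh value
	fmt.Println(first, second) // block-1 block-2

	disabled := &cache{trigger: make(chan struct{}, 1)} // pollInterval == 0
	disabled.read()
	fmt.Println(len(disabled.trigger)) // 0
}
```

The first caller pays for the refresh but sees the old value; the gate on pollInterval keeps the channel untouched when nothing will ever read from it.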
IMO, if the client is dictating the request frequency, this implies it requests frequently.
So returning the fresher info to the triggering call is not necessary, since a fetch will be re-triggered anyway, by the ticker or by the next request.
| time.Sleep(50 * time.Millisecond) | ||
| require.Equal(t, uint32(1), mockDS.numCallsPreConfirmed.Load(), | ||
| "trigger storm during fetch should not produce additional fetches") |
Potentially flaky on slow CI.
The time.Sleep(50ms) followed by a strict require.Equal(t, uint32(1), ...) assumes the polling goroutine has had time to (a) deliver out, (b) loop back to the select, and (c) not be woken by anything. On a heavily loaded CI runner 50ms can be tight, and any spurious wake (e.g. the ticker — even at time.Hour it shouldn't fire, but other timers in the runtime could affect scheduling) would flip the count.
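For reference, the polling style of assertion checks the counter repeatedly over a window and fails as soon as it moves, instead of sampling it once after a fixed sleep. A stdlib-only sketch of that idea follows; never is a hypothetical helper loosely modeled on what testify's require.Never does, not its actual implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// never reports whether cond stayed false for the whole waitFor window,
// checking it every tick. It returns false as soon as cond becomes true.
func never(cond func() bool, waitFor, tick time.Duration) bool {
	deadline := time.Now().Add(waitFor)
	for time.Now().Before(deadline) {
		if cond() {
			return false
		}
		time.Sleep(tick)
	}
	return !cond()
}

func main() {
	var calls atomic.Uint32
	calls.Store(1) // stands in for the single fetch that already happened

	// The count must stay at 1 for the whole 200ms window.
	ok := never(func() bool { return calls.Load() != 1 },
		200*time.Millisecond, 10*time.Millisecond)
	fmt.Println(ok) // true
}
```

This still depends on wall-clock time, so it widens the observation window rather than removing the timing dependency.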
Two cleaner options:
require.Never(t, func() bool {
    return mockDS.numCallsPreConfirmed.Load() != 1
}, 200*time.Millisecond, 10*time.Millisecond,
    "trigger storm during fetch should not produce additional fetches")
or use testing/synctest (already imported in this file) to make the test deterministic. The first is the smallest change.
| return | ||
|
|
||
| case <-s.preConfirmedTrigger: | ||
| case <-ticker.C: |
Nit: ticker tick buffered during a fetch causes a "free" second fetch.
With default preConfirmedPollInterval=500ms and a fetch that takes most of the 2s timeout (slow feeder, hung connection, etc.), at least one ticker.C will fire and buffer in time.Ticker's 1-slot buffer. The instant fetchPreConfirmed returns and the loop re-enters the select, the buffered tick fires immediately and starts another fetch. That fetch is fine in isolation, but it makes the "1 fetch per interval" intent slightly misleading under slow-feeder conditions.
If you care, drain the ticker in the same defer as the trigger inside fetchPreConfirmed:
defer func() {
    select { case <-s.preConfirmedTrigger: default: }
    select { case <-ticker.C: default: } // would need ticker passed in
    s.preConfirmedFetching.Store(false)
}()
Not a blocker: this PR doesn't make the situation worse than the pre-PR behavior, but it is a natural companion if you're already adding the trigger drain.
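A small sketch of the stale-tick effect with shortened, illustrative durations. staleTicks is a hypothetical helper: it lets a ticker run unread for a while (standing in for a slow fetch) and then does two non-blocking receives. It relies on the documented behavior that time.Ticker drops ticks for slow receivers rather than queueing them:

```go
package main

import (
	"fmt"
	"time"
)

// tryRecv does a non-blocking receive and reports whether a tick was waiting.
func tryRecv(c <-chan time.Time) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}

// staleTicks leaves a ticker unread for busy (a stand-in for a slow fetch)
// and then reports the result of two back-to-back non-blocking receives.
func staleTicks(period, busy time.Duration) (first, second bool) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	time.Sleep(busy) // several periods elapse while nobody reads ticker.C

	first = tryRecv(ticker.C)  // a tick left over from the busy period
	second = tryRecv(ticker.C) // missed ticks are dropped, not queued
	return first, second
}

func main() {
	first, second := staleTicks(100*time.Millisecond, 250*time.Millisecond)
	fmt.Println(first, second)
}
```

On a typical run this should report one waiting tick followed by none, which is the single "free" fetch described above; a non-blocking drain like tryRecv before releasing the guard would consume it.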
I'm inclined not to care about it.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## thiagodeev/feeder-preconfirmed-optimization #3694 +/- ##
===============================================================================
+ Coverage 76.51% 76.52% +0.01%
===============================================================================
Files 402 402
Lines 36952 36936 -16
===============================================================================
- Hits 28273 28266 -7
+ Misses 6697 6686 -11
- Partials 1982 1984 +2
Claude finished @RafaelGranza's task in 3m 13s
PR Review (incremental, only the new commit since prior review)
Only one new commit since the prior review:
New finding (this push)
Nit
Re: author responses on prior findings
Looks good
| func TestPreConfirmedUpdateFrequency(t *testing.T) { | ||
| testDB := memory.New() | ||
| bc := blockchain.New(testDB, &networks.Sepolia, | ||
| blockchain.WithNewState(statetestutils.UseNewState())) | ||
| client := feeder.NewTestClient(t, &networks.Sepolia) | ||
| gw := adaptfeeder.New(client) | ||
| dataSource := NewFeederGatewayDataSource(bc, gw) | ||
|
|
||
| head0, err := gw.BlockByNumber(t.Context(), 0) | ||
| require.NoError(t, err) | ||
| su0, err := gw.StateUpdate(t.Context(), 0) | ||
| require.NoError(t, err) | ||
| require.NoError(t, bc.Store(head0, &core.BlockCommitments{}, su0, | ||
| map[felt.Felt]core.ClassDefinition{})) | ||
|
|
||
| const ( | ||
| tickerInterval = 500 * time.Millisecond // default --preconfirmed-poll-interval | ||
| runDuration = 5 * time.Second | ||
| triggerRate = 50 * time.Millisecond // simulated RPC traffic at 20 Hz | ||
| ) | ||
|
|
||
| type scenario struct { | ||
| name string | ||
| triggerInterval time.Duration // 0 means no triggers (ticker-only baseline) | ||
| } | ||
| scenarios := []scenario{ | ||
| {"baseline_ticker_only", 0}, | ||
| {"request_driven_20Hz", triggerRate}, | ||
| } | ||
|
|
||
| // Run each scenario across a few realistic feeder latencies. | ||
| latencies := []time.Duration{ | ||
| 50 * time.Millisecond, | ||
| 100 * time.Millisecond, | ||
| 200 * time.Millisecond, | ||
| } | ||
|
|
||
| for _, fetchLatency := range latencies { | ||
| for _, sc := range scenarios { | ||
| name := fmt.Sprintf("%s/fetch=%s", sc.name, fetchLatency) | ||
| t.Run(name, func(t *testing.T) { | ||
| synctest.Test(t, func(t *testing.T) { | ||
| mockDS := &MockDataSource{ | ||
| DataSource: dataSource, | ||
| PreConfirmedFunc: func( | ||
| ctx context.Context, | ||
| number uint64, | ||
| _ string, _ uint64, _ uint, | ||
| ) (pending.PreConfirmedUpdate, error) { | ||
| select { | ||
| case <-time.After(fetchLatency): | ||
| case <-ctx.Done(): | ||
| return pending.PreConfirmedUpdate{}, ctx.Err() | ||
| } | ||
| preConf := makeTestPreConfirmed(number) | ||
| preConf.BlockIdentifier = "mock" | ||
| return pending.PreConfirmedUpdate{ | ||
| Mode: pending.PreConfirmedFull, | ||
| BlockIdentifier: preConf.BlockIdentifier, | ||
| FullBlock: &preConf, | ||
| }, nil | ||
| }, | ||
| } | ||
| syn := New(bc, mockDS, log.NewNopZapLogger(), 0, tickerInterval, false, testDB) | ||
| syn.highestBlockHeader.Store(head0.Header) | ||
|
|
||
| var target atomic.Uint64 | ||
| target.Store(1) | ||
| out := make(chan *pending.PreConfirmedUpdate, 1024) | ||
|
|
||
| ctx, cancel := context.WithCancel(t.Context()) | ||
|
|
||
| go syn.pollPreConfirmed(ctx, &target, out) | ||
|
|
||
| var emitted atomic.Uint32 | ||
| if sc.triggerInterval > 0 { | ||
| go func() { | ||
| ticker := time.NewTicker(sc.triggerInterval) | ||
| defer ticker.Stop() | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-ticker.C: | ||
| emitted.Add(1) | ||
| syn.requestPreConfirmedRefresh() | ||
| } | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| var intervals []time.Duration | ||
| var lastUpdate time.Time | ||
| recorderDone := make(chan struct{}) | ||
| go func() { | ||
| defer close(recorderDone) | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-out: | ||
| now := time.Now() | ||
| if !lastUpdate.IsZero() { | ||
| intervals = append(intervals, now.Sub(lastUpdate)) | ||
| } | ||
| lastUpdate = now | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| time.Sleep(runDuration) | ||
| cancel() | ||
| <-recorderDone | ||
|
|
||
| fetches := mockDS.numCallsPreConfirmed.Load() | ||
| trigs := emitted.Load() | ||
| var dropped uint32 | ||
| if trigs >= fetches { | ||
| dropped = trigs - fetches | ||
| } | ||
| updates := uint32(len(intervals) + 1) | ||
| if lastUpdate.IsZero() { | ||
| updates = 0 | ||
| } | ||
| t.Logf( | ||
| "%s fetch=%s: updates=%d (%.1f/s) fetches=%d "+ | ||
| "triggers_emitted=%d triggers_dropped=%d "+ | ||
| "mean_interval=%v p99_interval=%v", | ||
| sc.name, fetchLatency, | ||
| updates, float64(updates)/runDuration.Seconds(), | ||
| fetches, trigs, dropped, | ||
| meanDuration(intervals), p99Duration(intervals), | ||
| ) | ||
| }) | ||
| }) | ||
| } | ||
| } | ||
| } |
Nit: this is a Test* that asserts nothing — consider adding bounds so it doubles as a regression test.
The function reads as a benchmark probe (logs metrics via t.Logf, no require.* calls), but it lives under Test* rather than Benchmark* so it runs on every go test ./sync/... invocation. Two consequences:
- No regression coverage. If pollPreConfirmed ever stopped honoring the running guard and started firing one fetch per trigger, this test would silently log a 20× increase in fetches and still pass. Cheap addition for the request-driven scenario:
  if sc.triggerInterval > 0 {
      // With fetch latency >= triggerInterval, the running guard must drop
      // most triggers; fetches should be bounded by ceil(runDuration/fetchLatency).
      maxFetches := uint32(runDuration/fetchLatency) + 2
      require.LessOrEqual(t, fetches, maxFetches,
          "running guard regressed: expected at most %d fetches, got %d",
          maxFetches, fetches)
  }
- Minor noise in CI. Six subtests of 5s virtual time each: fast under synctest (well under 1s wall-clock), but the docstring should probably set expectations (or rename to Benchmark* and call from a custom harness).
Also a small metrics-accuracy nit: after cancel(), the trigger-emitting goroutine can fire emitted.Add(1) one more time before observing ctx.Done(), while the main proceeds to emitted.Load() only after <-recorderDone. Off by ≤1 — atomics make this safe, just slightly imprecise in the printed triggers_emitted. A synctest.Wait() after cancel() would pin it.
Not blocking — file is a diagnostic, not load-bearing.
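For a sense of scale, the suggested bound can be worked out for the three latencies used in the test. This sketch only evaluates the formula from the comment above; maxFetches as a standalone function is a hypothetical wrapper around it:

```go
package main

import (
	"fmt"
	"time"
)

// maxFetches evaluates the bound suggested above: fetches are serialized by
// the running guard, so at most runDuration/fetchLatency can complete, plus a
// slack of 2 for boundary effects.
func maxFetches(runDuration, fetchLatency time.Duration) uint32 {
	return uint32(runDuration/fetchLatency) + 2
}

func main() {
	runDuration := 5 * time.Second
	latencies := []time.Duration{
		50 * time.Millisecond,
		100 * time.Millisecond,
		200 * time.Millisecond,
	}
	for _, l := range latencies {
		fmt.Printf("fetch=%s maxFetches=%d\n", l, maxFetches(runDuration, l))
	}
	// Output:
	// fetch=50ms maxFetches=102
	// fetch=100ms maxFetches=52
	// fetch=200ms maxFetches=27
}
```

At 20 Hz over 5s the trigger goroutine emits about 100 triggers, so the bound is only tight enough to catch a one-fetch-per-trigger regression for the 100ms and 200ms latencies; at 50ms it is roughly equal to the trigger count.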
Claude finished @RafaelGranza's task in 3m 3s
PR Review (incremental, only the new commit since prior review)
Only one new commit since the prior review:
New findings
None. The synctest rewrite is sound and directly addresses the flake nit from the prior review. Tracing the scenario through the bubble:
Two minor things to note (not new; context for the rewrite):
Re: prior-review items still open
Looks good
Summary
- Each call to PreConfirmed() now triggers a fresh pre_confirmed fetch instead of waiting for the next ticker.
- The --preconfirmed-poll-interval ticker is now a fallback when there is no RPC traffic to drive fetches.
Benchmark
Numbers come from TestPreConfirmedUpdateFrequency: a 5-second simulation where one RPC call hits PreConfirmed() every 50ms (mimicking one or more clients dictating a higher request frequency).
Run:
go test -v -run TestPreConfirmedUpdateFrequency ./sync/
The running guard caught 35-77% of triggers as duplicates, so the request-driven path did not hammer the feeder.