feature: BackgroundTask.progressStream (Rx push) + ADR#589
Conversation
Follow-up to #588. - progressStream: Rx[P] on BackgroundTask — a push stream of progress updates that completes when the task finishes, for reactive consumers (live progress bars). Backed by an RxVar[Option[P]] used only as the notification channel; the poll snapshot stays on the AtomicReference (RxVar.get isn't cross-thread visibility-safe). Uses flatMap (not filter) to skip the initial replayed None — a false filter predicate emits OnCompletion downstream, which would prematurely end the stream; RxVar holds only the latest value, so an unconsumed stream doesn't accumulate. - JS: the body runs inline, so a later subscriber sees only the final value (not a live feed) — documented; use the poll snapshot on JS. - ADR adr/2026-06-20-background-task.md capturing the #588 design + the non-obvious decisions (Node worker_threads can't host a Scala closure -> JS inline, cooperative cancel + onCancel hook, catch-Throwable + finally-signal so await never hangs, hook draining on completion, the filter-vs-flatMap footgun). Linked from CLAUDE.md. Test: progressStream pushes reported progress and completes (JVM + Native). JVM 1651 / JS 1401 / Native 1413 pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a push-based progressStream: Rx[P] to BackgroundTask alongside its existing poll-based snapshot, allowing reactive consumers to observe progress updates. It also adds an Architecture Decision Record (ADR) detailing the cross-platform design choices of BackgroundTask and includes concurrency tests for the new stream on JVM and Native platforms. The feedback suggests wrapping progressVar.stop() in a try-catch block to guarantee that any exception thrown during stream completion does not prevent the completion gate from being signalled, which would otherwise cause await() to hang indefinitely.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
There was a problem hiding this comment.
Code Review
This pull request introduces a push-based Rx[P] progress stream (progressStream) to the cross-platform BackgroundTask utility, allowing reactive consumers to observe progress updates. The implementation uses RxVar as a notification channel while retaining AtomicReference as the thread-safe source of truth for polling. It also adds an Architecture Decision Record (ADR) detailing the design and cross-platform constraints of BackgroundTask, along with corresponding concurrency tests for JVM and Native platforms. No review comments were provided, so there is no additional feedback.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
A progressStream subscriber throwing during stream completion could propagate out of progressVar.stop() and skip gate.signal(), hanging await() forever. Complete the stream inside a try whose finally signals the gate. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Follow-up to #588 (
BackgroundTask, issue #552).What
progressStream: Rx[P]onBackgroundTask— a push stream of progress updates that completes when the task finishes, for reactive consumers (a live CLI progress bar / UI) that want to react rather than poll. The poll snapshot (progress) stays.adr/2026-06-20-background-task.mdcapturing feature: Add BackgroundTask — cross-platform cancellable, progress-pollable worker (#552) #588's non-obvious decisions, linked fromCLAUDE.md.Design notes (two real footguns avoided)
RxVar[Option[P]]used only as the notification channel;progressstays on theAtomicReferencebecauseRxVar.getreads a non-@volatilevar outside any lock — not safe for cross-thread polling.reportProgressupdates both.flatMap(notfilter) to skip the initial replayedNone: a falsefilterpredicate emitsOnCompletiondownstream (RxRunnerFilterOp), which would prematurely end the stream.flatMap { Some(p) => Rx.single(p); None => Rx.empty }skips theNoneinstead.RxVarholds only the latest value, so an unconsumed stream doesn't accumulate (no buffering leak, unlikeRx.queue).start(), so a later subscriber sees only the final value (not a live feed) — documented; use the poll snapshot on JS.Testing
progressStream pushes reported progress and completes(JVM + Native; not cross-platform since JS has nothing live to observe). Full suites: JVM 1651 / JS 1401 / Native 1413.Plan:
plans/2026-06-20-background-task-progress-stream.md.🤖 Generated with Claude Code