fix(memory): bound ingestion queue to prevent OOM under runaway producers#2451
fix(memory): bound ingestion queue to prevent OOM under runaway producers#2451M3gA-Mind wants to merge 6 commits into
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughSwitch ingestion from an unbounded to a bounded Tokio mpsc channel (DEFAULT_QUEUE_CAPACITY). IngestionQueue stores a bounded sender and capacity; submit uses try_send to surface Full vs Closed; worker startup accepts capacity. Tests exercise full, closed, and recovery scenarios. Also add German i18n entries for MCP Server UI. ChangesBounded ingestion queue with backpressure
German i18n chunk
Sequence Diagram(s)sequenceDiagram
participant Producer
participant IngestionQueue
participant IngestionState
participant IngestionWorker
Producer->>IngestionQueue: submit(IngestionJob) via try_send
IngestionQueue->>IngestionState: enqueue() (increment)
IngestionQueue-->>Producer: return true (accepted) / false (Full or Closed)
IngestionWorker->>IngestionQueue: receive job from bounded mpsc::Receiver
IngestionWorker->>IngestionState: acquire() (process job)
IngestionWorker-->>IngestionState: release() (decrement handled by enqueue/worker logic)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/openhuman/memory/ingestion/queue.rs (1)
137-140: ⚡ Quick winUse
debug!for this worker lifecycle log.
background worker startedis diagnostic state-transition logging, soinfo!is noisier than the repo convention for this class of event.As per coding guidelines,
src/**/*.rs: Uselogortracingcrate atdebugortracelevel for Rust diagnostic logs.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/ingestion/queue.rs` around lines 137 - 140, Change the lifecycle log from info to debug: replace the log::info! invocation that emits "[memory:ingestion_queue] background worker started (capacity={})" with a debug! call so this diagnostic state-transition uses debug level; ensure the debug macro from the log/tracing crate is in scope (or import it) and keep the same message and capacity argument, targeting the background worker startup code in the ingestion queue (the line that currently calls log::info! with capacity).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/memory/ingestion/queue.rs`:
- Around line 44-49: The backpressure log currently hardcodes the default
capacity constant; change the queue handle to store the actual capacity used
when created (add a capacity field to IngestionQueue or the public handle
returned by start_worker_with_capacity), set that field inside
start_worker_with_capacity when you create the mpsc channel, and update every
TrySendError::Full log path (including the send path and the other occurrences
mentioned) to log self.capacity (or handle.capacity) instead of the
DEFAULT_QUEUE_CAPACITY constant so the message reflects the real configured
queue size.
- Around line 237-271: Add a new test that exercises the recovery path after
transient backpressure: create an IngestionState and an mpsc channel with
capacity >1, fill the channel slot directly (e.g.,
tx.try_send(make_dummy_job("filler"))), construct the IngestionQueue via
IngestionQueue::from_parts(tx, state.clone()), assert that queue.submit(...)
initially returns false due to Full, then drain the channel (receive the
buffered item from the receiver) and call queue.submit(...) again and assert it
now returns true and that state.snapshot().queue_depth reflects the enqueued
job; use make_dummy_job to create jobs and mirror the style of the existing
submit_when_full_returns_false test to prove recovery works.
---
Nitpick comments:
In `@src/openhuman/memory/ingestion/queue.rs`:
- Around line 137-140: Change the lifecycle log from info to debug: replace the
log::info! invocation that emits "[memory:ingestion_queue] background worker
started (capacity={})" with a debug! call so this diagnostic state-transition
uses debug level; ensure the debug macro from the log/tracing crate is in scope
(or import it) and keep the same message and capacity argument, targeting the
background worker startup code in the ingestion queue (the line that currently
calls log::info! with capacity).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 25cb53a9-5666-4d71-a2b7-506169cc0971
📒 Files selected for processing (2)
src/openhuman/memory/ingestion/mod.rssrc/openhuman/memory/ingestion/queue.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/openhuman/memory/ingestion/queue.rs (1)
250-274: ⚡ Quick winRemove unused variables and misleading comment.
The
txand_rxcreated on lines 252-253 are never used for testing; the comment on line 254 says "Fill the channel slot" but the subsequent code creates different channels (tx2for the Closed test,tx3for the Full test). The unusedtxis only dropped on line 273 to silence a warning, which makes the test harder to follow.♻️ Simplify test by removing unused setup
#[tokio::test] async fn submit_when_full_returns_false() { let state = IngestionState::new(); - let (tx, _rx) = mpsc::channel::<IngestionJob>(1); - // Fill the channel slot with a dummy send so it's full. let (tx2, rx2) = mpsc::channel::<IngestionJob>(1); drop(rx2); // closed channel — worker gone // Test the Closed branch via from_parts with a closed receiver. let queue = IngestionQueue::from_parts(tx2, state.clone(), 1); assert!(!queue.submit(make_dummy_job("orphan"))); assert_eq!(state.snapshot().queue_depth, 0); // Test the Full branch: capacity-1 channel, fill it, then try another. let state2 = IngestionState::new(); let (tx3, _rx3) = mpsc::channel::<IngestionJob>(1); // Send one item to fill it (bypassing submit to avoid incrementing state). tx3.try_send(make_dummy_job("filler")).ok(); let queue2 = IngestionQueue::from_parts(tx3, state2.clone(), 1); assert!(!queue2.submit(make_dummy_job("overflow"))); // Depth should be 0 — enqueue was rolled back. assert_eq!(state2.snapshot().queue_depth, 0); - - drop(tx); // silence unused warning }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/ingestion/queue.rs` around lines 250 - 274, The test submit_when_full_returns_false contains unused variables tx and _rx and a misleading comment; remove the unused channel creation (tx, _rx) and the final drop(tx) no-op, and update the comment that currently reads "Fill the channel slot" to accurately describe the two separate setups (closed receiver via tx2/rx2 and pre-filled channel via tx3) so readers can follow the Closed and Full branches tested using IngestionQueue::from_parts and the IngestionQueue::submit assertions against state and state2.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/openhuman/memory/ingestion/queue.rs`:
- Around line 250-274: The test submit_when_full_returns_false contains unused
variables tx and _rx and a misleading comment; remove the unused channel
creation (tx, _rx) and the final drop(tx) no-op, and update the comment that
currently reads "Fill the channel slot" to accurately describe the two separate
setups (closed receiver via tx2/rx2 and pre-filled channel via tx3) so readers
can follow the Closed and Full branches tested using IngestionQueue::from_parts
and the IngestionQueue::submit assertions against state and state2.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 894f5d0e-c39c-4dbd-b65e-06a085c368d0
📒 Files selected for processing (1)
src/openhuman/memory/ingestion/queue.rs
|
The |
graycyrus
left a comment
There was a problem hiding this comment.
Solid fix — bounded channel at 512 with proper try_send backpressure is exactly right for this. Tests cover all three paths (full, closed, recovery). Clean PR, one minor nit below.
| File | Change |
|---|---|
queue.rs |
unbounded_channel → channel(512), send → try_send with Full/Closed arms, capacity field, 3 tests |
mod.rs |
Re-export DEFAULT_QUEUE_CAPACITY |
| mod tests { | ||
| use super::*; | ||
| use tokio::sync::mpsc; | ||
|
|
There was a problem hiding this comment.
[minor] Dead channel — (tx, _rx) is created here but never used (just drop(tx) at the end to silence the warning). The comment on the next line ("Fill the channel slot…") is a leftover from an earlier draft and doesn't match the code. Safe to remove both the allocation and the stale comment.
|
Addressed the nit in two commits:
Test compiles cleanly locally ( |
|
The Root cause: commit This PR only touches The fix needs to come from the author of |
|
All three CI failures on this PR share the same root cause (none introduced by this PR):
Root cause: commit Every PR whose CI merge commit includes that commit will fail these checks until the translations are added to the locale chunk files. This PR's changes are purely in The fix belongs in |
…cers Replaces the unbounded mpsc channel in IngestionQueue with a bounded one (DEFAULT_QUEUE_CAPACITY = 512). try_send now distinguishes Full (shed job, log warn) from Closed (worker gone), and start_worker_with_capacity allows tests to exercise backpressure deterministically. Closes tinyhumansai#2442
- Store actual `capacity` field on `IngestionQueue` so the Full-path warn log reflects the real configured size instead of `DEFAULT_QUEUE_CAPACITY` - Demote worker-start lifecycle log from `info!` to `debug!` per repo convention for diagnostic state-transition events - Update `from_parts` (test helper) to accept a `capacity` argument - Add `submit_recovers_after_backpressure` test: fills channel, asserts submit returns false, drains one slot, asserts subsequent submit returns true and queue_depth increments correctly
…l test Removes the unused `(tx, _rx)` binding and its stale comment that was left over from an earlier draft of the test. The two test cases (closed channel and full channel) are fully expressed by `tx2/rx2` and `tx3/_rx3`.
Commit 82883f5 removed the dead (tx, _rx) allocation but missed the accompanying drop(tx) at end of the test body, causing a compile error.
Commit 4e5eaa7 added 18 settings.mcpServer.* / settings.developerMenu.mcpServer.* keys to en.ts and en-5.ts but omitted the German locale (de-5.ts). Every other locale already has these keys. Adding English-identical placeholder strings so the i18n coverage and locale-completeness unit tests pass while proper translations are pending.
19459fd to
6dedd4c
Compare
|
Follow-up: fixed the i18n failures directly rather than waiting for a separate PR. What I found: Fix (commit
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/openhuman/memory/ingestion/queue.rs (1)
250-270: 💤 Low valueConsider splitting test or renaming for clarity.
submit_when_full_returns_falseactually tests both theClosedpath (lines 253-259) and theFullpath (lines 261-269), but the name only describes the Full scenario. Sincesubmit_when_worker_gone_returns_falsealready covers the Closed case in isolation, the first half of this test is redundant.You could either remove lines 252-259 (Closed is tested elsewhere) or rename this test to reflect both scenarios—but the coverage is correct as-is.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/ingestion/queue.rs` around lines 250 - 270, The test submit_when_full_returns_false mixes two cases (Closed and Full); either remove the redundant Closed portion (the block using rx2/drop and IngestionQueue::from_parts(tx2, state.clone(), 1)) since submit_when_worker_gone_returns_false already covers the Closed path, or rename the test to something like submit_when_closed_or_full_returns_false to reflect both branches; update assertions/comments accordingly and keep the second block that fills the channel (tx3.try_send(...), IngestionQueue::from_parts(tx3, state2.clone(), 1), assert!(!queue2.submit(...)), assert_eq!(state2.snapshot().queue_depth, 0)).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/openhuman/memory/ingestion/queue.rs`:
- Around line 250-270: The test submit_when_full_returns_false mixes two cases
(Closed and Full); either remove the redundant Closed portion (the block using
rx2/drop and IngestionQueue::from_parts(tx2, state.clone(), 1)) since
submit_when_worker_gone_returns_false already covers the Closed path, or rename
the test to something like submit_when_closed_or_full_returns_false to reflect
both branches; update assertions/comments accordingly and keep the second block
that fills the channel (tx3.try_send(...), IngestionQueue::from_parts(tx3,
state2.clone(), 1), assert!(!queue2.submit(...)),
assert_eq!(state2.snapshot().queue_depth, 0)).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 90108797-f45d-42af-9cf1-144608e6e021
📒 Files selected for processing (3)
app/src/lib/i18n/chunks/de-5.tssrc/openhuman/memory/ingestion/mod.rssrc/openhuman/memory/ingestion/queue.rs
✅ Files skipped from review due to trivial changes (1)
- src/openhuman/memory/ingestion/mod.rs
subconscious.providerUnavailableTitle and subconscious.providerSettings were present in all other locale chunk-3 files but absent from de-3.ts. Adding English-identical placeholder strings to unblock i18n coverage.
|
One more batch of missing German keys found and fixed (commit
These belonged in |
Summary
mpsc::unbounded_channelwithmpsc::channel(512)inIngestionQueue— caps peak memory at a bounded upper limit under any producer burst.submit()now usestry_send()and distinguishesTrySendError::Full(shed job, log warn, returnfalse) fromTrySendError::Closed(worker gone, existing behaviour).start_worker_with_capacityfor deterministic backpressure tests;start_worker_with_statedelegates to it with the default 512 cap.DEFAULT_QUEUE_CAPACITYfromingestion::modso callers can reference the constant.Problem
start_worker_with_stateusedmpsc::unbounded_channel, allowing a misbehaving skill or sync loop callingput_docin a tight loop to enqueue indefinitely while the single-threaded worker drains one job at a time (seconds-to-minutes each). At typical doc sizes the channel would consume unbounded memory before the worker caught up — an OOM kill with no user-visible warning.Solution
Bounded channel at capacity 512 absorbs realistic bursts (bulk Notion/Slack backfill, ~200-doc skill syncs) while hard-capping memory pressure. When full,
submit()logs a warning and returnsfalse; both existing call sites inclient.rsalready ignore the return value, so producer behaviour under normal load is unchanged.Submission Checklist
submit_when_full_returns_falseandsubmit_when_worker_gone_returns_falseinqueue.rscargo testpasses cleanCloses #2442in RelatedImpact
Related
DomainEvent::MemoryIngestionEnqueueDroppedfor observability surfacing (out of scope for this fix).AI Authored PR Metadata
Linear Issue
Commit & Branch
Validation Run
pnpm --filter openhuman-app format:checkpnpm typecheck— N/A (Rust-only change)cargo test -p openhuman queue— 2 new tests passcargo fmt --all -- --checkpasses,cargo clippy -p openhumancleanValidation Blocked
command:N/Aerror:N/Aimpact:N/ABehavior Changes
Parity Contract
start_worker_with_statesignature unchanged; bothput_doc/store_skill_synccall sites unaffectedClosedarm matches previoussenderror handlingDuplicate / Superseded PR Handling
Summary by CodeRabbit
Refactor
Tests
Localization