feat: add queue size warning logs for high parallelism detection #1054
base: main
Conversation
- Add logging to applyHeadQueue in crdt-clock.ts with warnings when size > 5
- Add logging to writeQueue in write-queue.ts with warnings when size > 10
- Include debug logs for all queue operations with size and context
- Add comprehensive tests for queue logging functionality
- Tests cover normal operations, high concurrency, and error handling

Helps identify when applications hit the database with excessive parallelism, which can cause performance degradation. Warnings help developers optimize their access patterns before performance issues occur.

Resolves #1053
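For orientation, here is a minimal, self-contained sketch of the logging pattern this PR describes. The builder calls (`Warn().Uint().Msg()`) mirror the diffs quoted in the review below; the helper name, the `Logger` import path, the `Debug()` call, and the debug message text are assumptions for illustration, not the PR's exact code.

```ts
import { Logger } from "@adviser/cement";

// Illustrative threshold; the PR uses 10 for the write queue.
const WRITE_QUEUE_WARN_THRESHOLD = 10;

// Hypothetical helper sketching the queue-size check. The Warn() field names
// come from the review diff; wrapping the check in a standalone function is an
// assumption made for this sketch only.
function logWriteQueueSize(logger: Logger, queueSize: number, bulkTaskCount: number): void {
  // Debug-level visibility on every enqueue (message text is illustrative).
  logger.Debug().Uint("writeQueueSize", queueSize).Msg("write tasks enqueued");
  if (queueSize > WRITE_QUEUE_WARN_THRESHOLD) {
    logger
      .Warn()
      .Uint("writeQueueSize", queueSize)
      .Uint("bulkTaskCount", bulkTaskCount)
      .Msg("High writeQueue size - potential high parallelism");
  }
}
```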
Walkthrough
Adds debug and warning logs to the CRDT apply-head and write queues to report queue sizes (warn thresholds: >5 and >10). Introduces runtime tests exercising logging paths, concurrency, ordering, and error propagation with mocked workers/DB. No public API changes or algorithm modifications.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~18 minutes
Actionable comments posted: 3
🔭 Outside diff range comments (1)
core/base/crdt-clock.ts (1)
171-179: Async validation is not awaited; blocks may not be validated before use.
validateBlocks maps to async functions but never awaits them. This means the subsequent logic may proceed before the validations complete, causing race conditions and misleading errors later. Apply this fix to await validation:
```diff
 async function validateBlocks(logger: Logger, newHead: ClockHead, blockstore?: BaseBlockstore) {
   if (!blockstore) throw logger.Error().Msg("missing blockstore");
-  newHead.map(async (cid) => {
-    const got = await blockstore.get(cid);
-    if (!got) {
-      throw logger.Error().Str("cid", cid.toString()).Msg("int_applyHead missing block").AsError();
-    }
-  });
+  for (const cid of newHead) {
+    const got = await blockstore.get(cid);
+    if (!got) {
+      throw logger.Error().Str("cid", cid.toString()).Msg("int_applyHead missing block").AsError();
+    }
+  }
 }
```
🧹 Nitpick comments (11)
core/base/crdt-clock.ts (3)
118-123: Good, targeted queue-size observability; consider throttling to prevent warn log floods. The added warn/debug logs align with the PR objective and provide immediate visibility. Under heavy load, this can emit a warn per call. Consider throttling (e.g., once per N seconds while the threshold is exceeded) to avoid excessive logs in stress scenarios and production; a sketch follows the list below.
Also consider:
- Unifying field keys across modules (e.g., use "queueSize" in both apply/write queues) to simplify log querying.
- Extracting thresholds as constants to prepare for the planned “configurable thresholds” enhancement.
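A minimal throttling sketch along these lines, assuming the @adviser/cement builder API shown in the diffs of this review; the constant name, field key, and warnEveryMs parameter are illustrative, not existing code:

```ts
import { Logger } from "@adviser/cement";

// Hypothetical throttled warning: emit at most one warn per interval while the
// queue stays above the threshold, instead of one warn per call.
const APPLY_HEAD_WARN_THRESHOLD = 5;
let lastWarnAt = 0;

function warnOnLargeApplyHeadQueue(logger: Logger, queueSize: number, warnEveryMs = 5000): void {
  if (queueSize <= APPLY_HEAD_WARN_THRESHOLD) return;
  const now = Date.now();
  if (now - lastWarnAt < warnEveryMs) return; // suppress repeated warnings within the window
  lastWarnAt = now;
  logger.Warn().Uint("queueSize", queueSize).Msg("High applyHead queue size - potential high parallelism");
}
```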
167-169: Avoid in-place sort to prevent subtle side-effects on shared arrays.
sortClockHead mutates the given array in place. Since this.head, newHead, and prevHead can be shared references, this can lead to hard-to-trace behavior. Prefer a non-mutating sort.

```diff
-function sortClockHead(clockHead: ClockHead) {
-  return clockHead.sort((a, b) => a.toString().localeCompare(b.toString()));
-}
+function sortClockHead(clockHead: ClockHead) {
+  return [...clockHead].sort((a, b) => a.toString().localeCompare(b.toString()));
+}
```
181-183: Use a more robust equality check for ClockHeads.
Stringifying arrays can hide ordering issues or fail if element toString formats differ. Since you already sort heads, comparing by element-wise equality is clearer and safer.

```diff
-function compareClockHeads(head1: ClockHead, head2: ClockHead) {
-  return head1.toString() === head2.toString();
-}
+function compareClockHeads(head1: ClockHead, head2: ClockHead) {
+  if (head1.length !== head2.length) return false;
+  for (let i = 0; i < head1.length; i++) {
+    if (head1[i].toString() !== head2[i].toString()) return false;
+  }
+  return true;
+}
```

core/base/write-queue.ts (1)
74-79: Queue-size logging is on point; consider a constant or option for the threshold. The warn on >10 and debug log per enqueue meet the PR goal. To ease future configurability:
- Hoist the threshold to a module-level const or to opts (e.g., opts.warnThresholdWriteQueue with a default).
- Optionally throttle warnings to avoid log storms under prolonged overload.
Example minimal refactor:
```diff
+const WRITE_QUEUE_WARN_THRESHOLD = 10;
 ...
     this.queue.push({ tasks, resolve, reject });
     const queueSize = this.queue.length;
-    if (queueSize > 10) {
+    if (queueSize > WRITE_QUEUE_WARN_THRESHOLD) {
       this.logger.Warn().Uint("writeQueueSize", queueSize).Uint("bulkTaskCount", tasks.length).Msg("High writeQueue size - potential high parallelism");
     }
```

core/tests/runtime/queue-logging.test.ts (3)
18-31: Remove unused mock logger setup.
mockLogger is created but never used. This adds noise and can confuse future readers about the intent.

```diff
-    // Mock logger to capture log calls
-    mockLogger = {
-      warn: vi.fn().mockReturnValue({
-        Uint: vi.fn().mockReturnThis(),
-        Bool: vi.fn().mockReturnThis(),
-        Msg: vi.fn().mockReturnThis()
-      }),
-      debug: vi.fn().mockReturnValue({
-        Uint: vi.fn().mockReturnThis(),
-        Bool: vi.fn().mockReturnThis(),
-        Msg: vi.fn().mockReturnThis()
-      })
-    };
+    // Intentionally not mocking internal logger here; functional verification only
```
38-47: Test name implies log verification, but asserts only functional completion. That’s fine for end-to-end sanity. Consider adding a focused test that injects a mock logger into a lower-level queue (like you did for applyHeadQueue) to assert warn/debug emission at thresholds.
85-113: Good concurrency coverage; consider asserting DB invariants more strictly. Optionally assert:
- No duplicates (by id) after mixed operations.
- All results have a valid clock/head shape.
This tightens validation that logging has zero functional side-effects under load.
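For example, a duplicate-id check might look like the sketch below. It assumes the test already holds the allDocs result in a variable (here called allDocsResult, a hypothetical name) and that each row exposes a key; adjust to the actual row shape.

```ts
// Hypothetical assertion sketch: no duplicate document ids after mixed operations.
const ids = allDocsResult.rows.map((row) => row.key);
expect(new Set(ids).size).toBe(ids.length);
```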
core/tests/runtime/write-queue-logging.test.ts (2)
20-38: Remove unused local logger stub or wire it through ensureLogger.
The local logger object is created but never used by the queue since writeQueue constructs its own logger via ensureLogger(sthis, ...). To reduce confusion, remove it. Alternatively, enhance mockSuperThis to include logger and ensure ensureLogger picks it up if that’s supported.
153-174: Assert processing order deterministically.
You intend to verify FIFO ordering with chunkSize: 1, but the test only asserts membership, not order. Strengthen the check:

```diff
-    // Should have processed all tasks
-    expect(calls).toHaveLength(3);
-    expect(calls).toContain("first");
-    expect(calls).toContain("second");
-    expect(calls).toContain("third");
+    expect(calls).toEqual(["first", "second", "third"]);
```

core/tests/runtime/apply-head-queue-logging.test.ts (2)
3-3: Remove unused import.
ensureLogger is imported but not used.

```diff
-import { ensureLogger } from "@fireproof/core-runtime";
```
39-68: Minor: test doesn’t truly validate queue size tracking.
The test drains the generator but doesn’t assert on intermediate sizes or warn thresholds. If feasible, add assertions against queue.size() before and after pushing and after one worker tick, or assert that logger.Warn is called when enqueuing beyond the threshold.
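A hedged sketch of such an assertion, assuming the test's mocked logger exposes Warn as a spy (the Error spy is used the same way later in this file), that queue.size() exists as referenced above, and that the task shape matches the other tests here:

```ts
// Hypothetical: push enough tasks to cross the >5 threshold, then check the warning fired.
for (let i = 0; i < 7; i++) {
  queue.push({ newHead: [] as ClockHead, prevHead: [] as ClockHead });
}
expect(queue.size()).toBeGreaterThan(5);
expect(logger.Warn).toHaveBeenCalled();
```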
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- core/base/crdt-clock.ts (1 hunks)
- core/base/write-queue.ts (1 hunks)
- core/tests/runtime/apply-head-queue-logging.test.ts (1 hunks)
- core/tests/runtime/queue-logging.test.ts (1 hunks)
- core/tests/runtime/write-queue-logging.test.ts (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
core/tests/runtime/write-queue-logging.test.ts (2)
- core/types/base/types.ts (3): SuperThis (137-148), DocUpdate (238-243), DocTypes (209-209)
- core/base/write-queue.ts (1): writeQueue (92-98)

core/tests/runtime/queue-logging.test.ts (3)
- core/types/base/types.ts (1): Database (593-626)
- core/base/ledger.ts (1): fireproof (343-345)
- core/base/crdt.ts (1): allDocs (230-237)

core/tests/runtime/apply-head-queue-logging.test.ts (3)
- core/base/ledger.ts (1): logger (116-118)
- vendor/p-limit/index.js (2): queue (6-6), generator (57-60)
- core/types/base/types.ts (3): DocTypes (209-209), ClockHead (201-201), DocUpdate (238-243)
| it("should sort tasks with updates first", async () => { | ||
| const taskWithoutUpdates = { | ||
| newHead: [] as ClockHead, | ||
| prevHead: [] as ClockHead, | ||
| }; | ||
|
|
||
| const taskWithUpdates = { | ||
| newHead: [] as ClockHead, | ||
| prevHead: [] as ClockHead, | ||
| updates: [{ id: "test", value: { test: "data" } }] as DocUpdate<DocTypes>[] | ||
| }; | ||
|
|
||
| // Add task without updates first | ||
| const gen1 = queue.push(taskWithoutUpdates); | ||
|
|
||
| // Add task with updates second | ||
| const gen2 = queue.push(taskWithUpdates); | ||
|
|
||
| // Process both | ||
| await Promise.all([ | ||
| (async () => { | ||
| let result = await gen1.next(); | ||
| while (!result.done) { | ||
| result = await gen1.next(); | ||
| } | ||
| })(), | ||
| (async () => { | ||
| let result = await gen2.next(); | ||
| while (!result.done) { | ||
| result = await gen2.next(); | ||
| } | ||
| })() | ||
| ]); | ||
|
|
||
| // Both workers should have been called | ||
| expect(mockWorker).toHaveBeenCalledTimes(2); | ||
| }); |
🛠️ Refactor suggestion
The “sort tasks with updates first” test doesn’t validate order.
You currently assert only that both tasks run. To verify prioritization, assert call order using the third argument (localUpdates):
```diff
-  // Both workers should have been called
-  expect(mockWorker).toHaveBeenCalledTimes(2);
+  // Both workers should have been called
+  expect(mockWorker).toHaveBeenCalledTimes(2);
+  // Validate that the call with localUpdates=true happened before the one with localUpdates=false
+  const calls = mockWorker.mock.calls;
+  expect(calls[0][2]).toBe(true);
+  expect(calls[1][2]).toBe(false);
```

Note: If you keep processing both generators concurrently, ordering may be nondeterministic. For a deterministic assertion, process the queue serially.
🤖 Prompt for AI Agents
In core/tests/runtime/apply-head-queue-logging.test.ts around lines 112-148, the
test only asserts both tasks ran but not their execution order; update the test
to process the queue serially (not concurrently) so ordering is deterministic,
consume the first generator to completion before starting the second, and then
assert mockWorker was called twice with the call order verifying the third
argument (localUpdates) — the first call should have the updates present and the
second call should have no updates.
| it("should handle worker errors gracefully", async () => { | ||
| // Make worker throw an error | ||
| mockWorker.mockRejectedValueOnce(new Error("Worker error")); | ||
|
|
||
| const task = { | ||
| newHead: [] as ClockHead, | ||
| prevHead: [] as ClockHead, | ||
| updates: [{ id: "test", value: { test: "data" } }] as DocUpdate<DocTypes>[] | ||
| }; | ||
|
|
||
| const generator = queue.push(task); | ||
|
|
||
| // Should handle the error without crashing | ||
| await expect(async () => { | ||
| let result = await generator.next(); | ||
| while (!result.done) { | ||
| result = await generator.next(); | ||
| } | ||
| }).rejects.toThrow(); | ||
|
|
||
| // Error should have been logged | ||
| expect(logger.Error).toHaveBeenCalled(); | ||
| }); |
Incorrect usage of rejects with an async function; assert the promise instead.
await expect(async () => { ... }).rejects.toThrow() passes a function to expect, which is for sync .toThrow. For rejected promises, pass the promise itself.
Apply this correction:
```diff
-  await expect(async () => {
-    let result = await generator.next();
-    while (!result.done) {
-      result = await generator.next();
-    }
-  }).rejects.toThrow();
+  const drain = (async () => {
+    let result = await generator.next();
+    while (!result.done) {
+      result = await generator.next();
+    }
+  })();
+  await expect(drain).rejects.toThrow();
```
🤖 Prompt for AI Agents
In core/tests/runtime/apply-head-queue-logging.test.ts around lines 150-172, the
test incorrectly passes an async function to expect(...).rejects; change it to
assert the actual promise: invoke the async generator body immediately to
produce a promise (e.g. const run = (async () => { let result = await
generator.next(); while (!result.done) { result = await generator.next(); }
})();), then use await expect(run).rejects.toThrow(); after that assert
expect(logger.Error).toHaveBeenCalled(); to keep the logging assertion.
| it("should handle worker errors", async () => { | ||
| // Make worker throw an error | ||
| mockWorker.mockRejectedValueOnce(new Error("Worker failed")); | ||
|
|
||
| const queue = writeQueue(mockSuperThis, mockWorker, { chunkSize: 32 }); | ||
|
|
||
| const task: DocUpdate<DocTypes> = { id: "test", value: { test: "data" } }; | ||
|
|
||
| // Should propagate the error | ||
| await expect(queue.push(task)).rejects.toThrow("Worker failed"); | ||
| }); |
🛠️ Refactor suggestion
Error message assertion may be brittle; assert rejection type instead.
writeQueue wraps worker errors via the logger builder (.Msg("Error processing task").AsError()), which may alter the message. Asserting the exact message "Worker failed" can make the test flaky.
Use a type-based rejection assertion:
```diff
-  await expect(queue.push(task)).rejects.toThrow("Worker failed");
+  await expect(queue.push(task)).rejects.toBeInstanceOf(Error);
```
🤖 Prompt for AI Agents
In core/tests/runtime/write-queue-logging.test.ts around lines 123 to 133, the
test asserts the exact error message "Worker failed", which is brittle because
the logger wraps/changes the message; change the assertion to check the
rejection type instead (e.g., use await
expect(queue.push(task)).rejects.toBeInstanceOf(Error) or
.rejects.toThrow(Error)) so the test verifies an Error was thrown without
depending on the exact message.
mabels left a comment
There is no way that I will accept that.
The Cement Logger has a Debugging feature and we planned not to use any mocks at all.
In fireproofing, which should not be necessary, there are multiple layers to make better software without logs.
I will pull a queue implementation (did a lot before) into @adviser/cement that has this feature built into it. To fix my ResolveOnce reset problem, I need that anyway.
Summary
Implements warning logs to detect when applications hit the database with excessive parallelism, helping identify performance bottlenecks before they become critical issues.
Changes
ApplyHeadQueue Monitoring
- Warn when the applyHead queue in crdt-clock.ts grows beyond 5 pending tasks; debug logs record each queue operation with size and context.
WriteQueue Monitoring
- Warn when the write queue in write-queue.ts grows beyond 10 pending tasks; debug logs record each enqueue with queue size and bulk task count.
Comprehensive Test Suite
- New runtime tests cover normal operation, high concurrency, ordering, and error handling.
Benefits
- Surfaces excessive database parallelism early so access patterns can be adjusted before performance degrades.
Testing
The feature can be tested by creating high-concurrency scenarios:
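A sketch of such a scenario is shown below. The package entry point, document shape, and database name are assumptions for illustration; the intent is simply to issue enough concurrent writes to push the write queue past the >10 warn threshold introduced by this PR.

```ts
import { fireproof } from "@fireproof/core";

// Hypothetical stress scenario: fire many writes at once so the write queue
// climbs past the warn threshold and the new warning is emitted.
async function main() {
  const db = fireproof("parallelism-demo");
  await Promise.all(
    Array.from({ length: 50 }, (_, i) => db.put({ _id: `doc-${i}`, value: i })),
  );
}

main().catch(console.error);
```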
Resolves
Closes #1053
Future Enhancements
- Make the warn thresholds configurable (noted in the review above as a planned enhancement).