release: promote recursive RLM trace observability#96
Conversation
Adds recursive-tree observability for rlm_query child runs. Includes bounded child invocation, usage splits, Langfuse spans, fail-closed recursion controls, and process-group abort cleanup.
📝 WalkthroughWalkthroughThis PR introduces recursive RLM child process tracing and observability via Langfuse integration. It adds a ChangesRecursive RLM tracing with Langfuse observability
Possibly related PRs
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
ESLint skipped: no ESLint configuration detected in root package.json. To enable, add Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a minimal Langfuse ingestion recorder (LangfuseTraceRecorder) to enable recursive RLM tree observability without external SDK dependencies. It adds tracking for recursive child process execution, including child start/end logging, usage breakdown (root vs. child vs. total splits), and budget limits enforcement across recursion. Feedback highlights two issues in src/llm.ts: first, when the maximum recursion depth is reached, the early return does not trigger the onChildStart and onChildEnd callbacks, leading to incomplete Langfuse traces; second, if a child process fails to spawn, both error and close events can fire, potentially causing duplicate callback invocations and double promise resolution, which can be resolved using a guard flag.
| if (options.maxDepth !== undefined && currentDepth >= options.maxDepth) { | ||
| const error = `Error: max recursive rlm_query depth ${options.maxDepth} reached`; | ||
| const result: RlmChildResult = { answer: error }; | ||
| options.logger?.childStart({ | ||
| child_correlation_id: correlationId, | ||
| prompt_preview: prompt.slice(0, 200), | ||
| depth, | ||
| }); | ||
| options.logger?.childEnd({ | ||
| child_correlation_id: correlationId, | ||
| child_run_id: null, | ||
| input_tokens: 0, | ||
| output_tokens: 0, | ||
| cost: 0, | ||
| llm_calls: 0, | ||
| time_ms: 0, | ||
| is_error: true, | ||
| error_message: error, | ||
| }); | ||
| resolve(result); | ||
| return; | ||
| } |
There was a problem hiding this comment.
When the maximum recursion depth is reached, the early return block logs the event to the local logger but does not invoke the onChildStart and onChildEnd callbacks. This prevents the aborted child run from being recorded as a span in Langfuse, leading to incomplete or missing traces for aborted recursive calls.
Invoking these callbacks ensures that the aborted run is correctly represented in Langfuse as a failed span with the appropriate error message.
if (options.maxDepth !== undefined && currentDepth >= options.maxDepth) {
const error = `Error: max recursive rlm_query depth ${options.maxDepth} reached`;
const result: RlmChildResult = { answer: error };
options.logger?.childStart({
child_correlation_id: correlationId,
prompt_preview: prompt.slice(0, 200),
depth,
});
const spanId = options.onChildStart?.({ correlationId, prompt, depth });
options.logger?.childEnd({
child_correlation_id: correlationId,
child_run_id: null,
input_tokens: 0,
output_tokens: 0,
cost: 0,
llm_calls: 0,
time_ms: 0,
is_error: true,
error_message: error,
});
options.onChildEnd?.({ spanId, result, durationMs: 0, isError: true, errorMessage: error });
resolve(result);
return;
}| child.on("close", (code) => { | ||
| const durationMs = Date.now() - startMs; | ||
| if (code !== 0) { | ||
| resolve(`Error: child rlmx exited with code ${code}. ${stderr}`.trim()); | ||
| const errorMessage = `Error: child rlmx exited with code ${code}. ${stderr}`.trim(); | ||
| const result: RlmChildResult = { answer: errorMessage }; | ||
| options.logger?.childEnd({ | ||
| child_correlation_id: correlationId, | ||
| child_run_id: null, | ||
| input_tokens: 0, | ||
| output_tokens: 0, | ||
| cost: 0, | ||
| llm_calls: 0, | ||
| time_ms: durationMs, | ||
| is_error: true, | ||
| error_message: errorMessage, | ||
| }); | ||
| options.onChildEnd?.({ spanId, result, durationMs, isError: true, errorMessage }); | ||
| resolve(result); | ||
| return; | ||
| } | ||
| try { | ||
| const result = JSON.parse(stdout); | ||
| resolve(result.answer ?? stdout); | ||
| } catch { | ||
| resolve(stdout.trim() || `Error: empty response from child rlmx`); | ||
| } | ||
| const result = parseRlmChildOutput(stdout); | ||
| options.logger?.childEnd({ | ||
| child_correlation_id: correlationId, | ||
| child_run_id: result.runId ?? null, | ||
| input_tokens: result.usage?.inputTokens ?? 0, | ||
| output_tokens: result.usage?.outputTokens ?? 0, | ||
| cost: result.usage?.totalCost ?? 0, | ||
| llm_calls: result.usage?.llmCalls ?? 0, | ||
| time_ms: durationMs, | ||
| }); | ||
| options.onChildEnd?.({ spanId, result, durationMs }); | ||
| resolve(result); | ||
| }); | ||
|
|
||
| child.on("error", (err) => { | ||
| resolve(`Error: failed to spawn child rlmx: ${err.message}`); | ||
| const durationMs = Date.now() - startMs; | ||
| const errorMessage = `Error: failed to spawn child rlmx: ${err.message}`; | ||
| const result: RlmChildResult = { answer: errorMessage }; | ||
| options.logger?.childEnd({ | ||
| child_correlation_id: correlationId, | ||
| child_run_id: null, | ||
| input_tokens: 0, | ||
| output_tokens: 0, | ||
| cost: 0, | ||
| llm_calls: 0, | ||
| time_ms: durationMs, | ||
| is_error: true, | ||
| error_message: errorMessage, | ||
| }); | ||
| options.onChildEnd?.({ spanId, result, durationMs, isError: true, errorMessage }); | ||
| resolve(result); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
If the child process fails to spawn, Node.js can emit both the error and close events. Without a guard, this can result in onChildEnd being called twice and the promise being resolved twice, which may lead to duplicate span updates or errors in Langfuse.
Introducing a resolved flag ensures that the completion logic and callbacks are executed exactly once.
let resolved = false;
child.on("close", (code) => {
if (resolved) return;
resolved = true;
const durationMs = Date.now() - startMs;
if (code !== 0) {
const errorMessage = `Error: child rlmx exited with code ${code}. ${stderr}`.trim();
const result: RlmChildResult = { answer: errorMessage };
options.logger?.childEnd({
child_correlation_id: correlationId,
child_run_id: null,
input_tokens: 0,
output_tokens: 0,
cost: 0,
llm_calls: 0,
time_ms: durationMs,
is_error: true,
error_message: errorMessage,
});
options.onChildEnd?.({ spanId, result, durationMs, isError: true, errorMessage });
resolve(result);
return;
}
const result = parseRlmChildOutput(stdout);
options.logger?.childEnd({
child_correlation_id: correlationId,
child_run_id: result.runId ?? null,
input_tokens: result.usage?.inputTokens ?? 0,
output_tokens: result.usage?.outputTokens ?? 0,
cost: result.usage?.totalCost ?? 0,
llm_calls: result.usage?.llmCalls ?? 0,
time_ms: durationMs,
});
options.onChildEnd?.({ spanId, result, durationMs });
resolve(result);
});
child.on("error", (err) => {
if (resolved) return;
resolved = true;
const durationMs = Date.now() - startMs;
const errorMessage = `Error: failed to spawn child rlmx: ${err.message}`;
const result: RlmChildResult = { answer: errorMessage };
options.logger?.childEnd({
child_correlation_id: correlationId,
child_run_id: null,
input_tokens: 0,
output_tokens: 0,
cost: 0,
llm_calls: 0,
time_ms: durationMs,
is_error: true,
error_message: errorMessage,
});
options.onChildEnd?.({ spanId, result, durationMs, isError: true, errorMessage });
resolve(result);
});There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4d6912c1ff
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| const batch = prompts.slice(i, i + MAX_CONCURRENT); | ||
| const batchResults = await Promise.all( | ||
| batch.map((p) => rlmQuery(p, cwd, signal)) | ||
| batch.map((p) => rlmQuery(p, cwd, signal, options)) |
There was a problem hiding this comment.
Enforce shared budgets across batched children
When a REPL request uses rlm_query_batched, every prompt is spawned with the same options object, including the maxCost/maxTokens values that rlmLoop computed once before the request, and the parent budget is only updated after all batched children finish. In a batch of multiple recursive prompts, each child can therefore consume the full remaining global budget, so --max-cost or --max-tokens can be exceeded by the batch size before the parent notices. Please allocate/update remaining budget per child or stop scheduling additional children once the aggregate child usage reaches the parent limit.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/langfuse.ts (1)
120-147: 💤 Low valueEvents are discarded before confirming successful ingestion.
Line 122 splices (removes) all events from the queue before the fetch completes. If the POST fails or times out, those events are permanently lost with no opportunity to retry or restore them.
Consider either:
- Splice after successful response, or
- Restore events to the queue on failure if retry is desired.
If the current "fire and forget on failure" behavior is intentional for bounded observability, a brief comment clarifying this design choice would help future readers.
♻️ Option: Restore events on failure
async flush(): Promise<void> { if (!this.enabled || this.queue.length === 0) return; const batch = this.queue.splice(0, this.queue.length); const auth = Buffer.from(`${this.publicKey}:${this.secretKey}`).toString("base64"); const abortController = new AbortController(); const timeoutHandle = setTimeout(() => abortController.abort(), this.flushTimeoutMs); try { const res = await this.fetchImpl(`${this.host}/api/public/ingestion`, { method: "POST", headers: { "content-type": "application/json", authorization: `Basic ${auth}`, }, body: JSON.stringify({ batch }), signal: abortController.signal, }); if (!res.ok) { + this.queue.unshift(...batch); // Restore for potential retry throw new Error(`Langfuse ingestion failed: ${res.status} ${await res.text().catch(() => "")}`.trim()); } } catch (err) { + this.queue.unshift(...batch); // Restore on any failure if (err instanceof Error && err.name === "AbortError") { throw new Error(`Langfuse ingestion timed out after ${this.flushTimeoutMs}ms`); } throw err; } finally { clearTimeout(timeoutHandle); } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/langfuse.ts` around lines 120 - 147, The flush() implementation currently removes events from this.queue immediately (const batch = this.queue.splice(...)) before the network request, which causes permanent loss if the POST fails or times out; change the logic to only remove (splice) the items after a successful response, or if you prefer to attempt the request then restore on failure, keep a copy of the batch (e.g., const batch = this.queue.slice(...)) and on error reinsert the events back into this.queue (e.g., unshift/concat) so they can be retried; update the error/timeout handling around fetchImpl and flushTimeoutMs accordingly, or if dropping on failure is intentional add a clear comment in flush() explaining the design choice.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/llm.ts`:
- Around line 464-468: The current logic adds an abort listener to the incoming
AbortSignal but doesn't check signal.aborted before spawning the child, allowing
the child to run if the signal was already aborted; update the rlmQuery (or
wherever the child is spawned) to first check if signal && signal.aborted and,
if so, call terminateChildTree (or otherwise short-circuit/throw) instead of
proceeding to spawn the child, otherwise continue to add the once listener as
now—this ensures pre-spawn aborts are handled.
---
Nitpick comments:
In `@src/langfuse.ts`:
- Around line 120-147: The flush() implementation currently removes events from
this.queue immediately (const batch = this.queue.splice(...)) before the network
request, which causes permanent loss if the POST fails or times out; change the
logic to only remove (splice) the items after a successful response, or if you
prefer to attempt the request then restore on failure, keep a copy of the batch
(e.g., const batch = this.queue.slice(...)) and on error reinsert the events
back into this.queue (e.g., unshift/concat) so they can be retried; update the
error/timeout handling around fetchImpl and flushTimeoutMs accordingly, or if
dropping on failure is intentional add a clear comment in flush() explaining the
design choice.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 24e0e306-7622-4259-9fd4-ef9856e46cb0
⛔ Files ignored due to path filters (15)
dist/src/index.d.tsis excluded by!**/dist/**dist/src/index.jsis excluded by!**/dist/**dist/src/langfuse.d.tsis excluded by!**/dist/**dist/src/langfuse.jsis excluded by!**/dist/**dist/src/llm.d.tsis excluded by!**/dist/**dist/src/llm.jsis excluded by!**/dist/**dist/src/logger.d.tsis excluded by!**/dist/**dist/src/logger.jsis excluded by!**/dist/**dist/src/output.d.tsis excluded by!**/dist/**dist/src/output.jsis excluded by!**/dist/**dist/src/rlm.jsis excluded by!**/dist/**dist/src/version.d.tsis excluded by!**/dist/**dist/src/version.jsis excluded by!**/dist/**dist/tests/recursive-trace.test.d.tsis excluded by!**/dist/**dist/tests/recursive-trace.test.jsis excluded by!**/dist/**
📒 Files selected for processing (9)
package.jsonsrc/index.tssrc/langfuse.tssrc/llm.tssrc/logger.tssrc/output.tssrc/rlm.tssrc/version.tstests/recursive-trace.test.ts
| if (signal) { | ||
| signal.addEventListener("abort", () => child.kill("SIGTERM"), { | ||
| signal.addEventListener("abort", terminateChildTree, { | ||
| once: true, | ||
| }); | ||
| } |
There was a problem hiding this comment.
Missing pre-spawn abort check allows child to run after parent aborts.
If the signal is already aborted when rlmQuery is called, the abort event listener won't fire (since the event already happened), and the child process will spawn and run to completion.
Proposed fix: check signal.aborted before spawning
const spanId = options.onChildStart?.({ correlationId, prompt, depth });
const startMs = Date.now();
+
+ if (signal?.aborted) {
+ const durationMs = Date.now() - startMs;
+ const errorMessage = "Error: child rlmx aborted before spawn";
+ const result: RlmChildResult = { answer: errorMessage };
+ options.logger?.childEnd({
+ child_correlation_id: correlationId,
+ child_run_id: null,
+ input_tokens: 0,
+ output_tokens: 0,
+ cost: 0,
+ llm_calls: 0,
+ time_ms: durationMs,
+ is_error: true,
+ error_message: errorMessage,
+ });
+ options.onChildEnd?.({ spanId, result, durationMs, isError: true, errorMessage });
+ resolve(result);
+ return;
+ }
+
const child = spawn(🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/llm.ts` around lines 464 - 468, The current logic adds an abort listener
to the incoming AbortSignal but doesn't check signal.aborted before spawning the
child, allowing the child to run if the signal was already aborted; update the
rlmQuery (or wherever the child is spawned) to first check if signal &&
signal.aborted and, if so, call terminateChildTree (or otherwise
short-circuit/throw) instead of proceeding to spawn the child, otherwise
continue to add the once listener as now—this ensures pre-spawn aborts are
handled.
Summary
Promote dev to main for the recursive RLM child-run observability release.
Included
Proof
Summary by CodeRabbit
New Features
Tests