From 5c0fd1f69a1e731ae5729559311e6c935130de5f Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Sun, 7 Jun 2026 13:20:58 +0100 Subject: [PATCH 1/3] fix: defer wide-event emit for streaming AI responses --- .changeset/streaming-ai-emit-defer.md | 7 + apps/docs/content/2.learn/2.wide-events.md | 2 + .../3.integrate/frameworks/02.nextjs.md | 2 +- .../3.integrate/frameworks/04.nitro.md | 2 +- .../content/4.use-cases/2.ai-sdk/02.usage.md | 2 + packages/evlog/src/hono/index.ts | 10 +- packages/evlog/src/logger.ts | 38 ++++- packages/evlog/src/next/handler.ts | 86 ++++++------ packages/evlog/src/nitro-v3/plugin.ts | 55 +++++--- packages/evlog/src/nitro/plugin.ts | 21 ++- packages/evlog/src/orpc/index.ts | 9 +- packages/evlog/src/react-router/index.ts | 5 +- packages/evlog/src/shared/index.ts | 2 +- packages/evlog/src/shared/middleware.ts | 51 +++++-- packages/evlog/src/shared/streamResponse.ts | 115 ++++++++++++++++ packages/evlog/src/sveltekit/index.ts | 5 +- .../test/core/logger-request-logger.test.ts | 12 ++ .../evlog/test/frameworks/sveltekit.test.ts | 41 ++++++ packages/evlog/test/next/handler.test.ts | 52 +++++++ packages/evlog/test/nitro/plugin.test.ts | 66 +++++++++ .../evlog/test/shared/streamResponse.test.ts | 130 ++++++++++++++++++ .../__snapshots__/api-surface.test.ts.snap | 5 +- 22 files changed, 631 insertions(+), 87 deletions(-) create mode 100644 .changeset/streaming-ai-emit-defer.md create mode 100644 packages/evlog/src/shared/streamResponse.ts create mode 100644 packages/evlog/test/shared/streamResponse.test.ts diff --git a/.changeset/streaming-ai-emit-defer.md b/.changeset/streaming-ai-emit-defer.md new file mode 100644 index 00000000..02c8a02e --- /dev/null +++ b/.changeset/streaming-ai-emit-defer.md @@ -0,0 +1,7 @@ +--- +'evlog': minor +--- + +Defer wide-event emit for streaming HTTP responses (SSE, AI SDK UI streams, chunked bodies) until the response body finishes, so `createAILogger()` metadata is included on the same request event instead of triggering post-emit warnings. Applies to Next.js `withEvlog`, SvelteKit, Hono, React Router, oRPC, and Nitro/Nuxt integrations. Also merges late `ai` fields onto an emitted event before enrich/drain when metadata arrives in a narrow race window. + +Fixes #321 diff --git a/apps/docs/content/2.learn/2.wide-events.md b/apps/docs/content/2.learn/2.wide-events.md index f96e0a2e..e09fd3b9 100644 --- a/apps/docs/content/2.learn/2.wide-events.md +++ b/apps/docs/content/2.learn/2.wide-events.md @@ -188,6 +188,8 @@ The parent wide event may be emitted **before** the child event; they are two se **Not available yet:** Hono (no `useLogger` without `c.get('log')` + ALS) and Nitro/Nuxt `useLogger(event)` — use the post-emit warnings to catch mistakes; a different API may arrive later for event-scoped forks. +For AI SDK streaming responses, supported framework integrations defer wide-event emit until the response body finishes, so `createAILogger(log)` metadata lands on the same request event automatically. + ```typescript [server/routes/checkout.post.ts] import { evlog, useLogger } from 'evlog/express' diff --git a/apps/docs/content/3.integrate/frameworks/02.nextjs.md b/apps/docs/content/3.integrate/frameworks/02.nextjs.md index 4c4ddd1a..61c3cd9b 100644 --- a/apps/docs/content/3.integrate/frameworks/02.nextjs.md +++ b/apps/docs/content/3.integrate/frameworks/02.nextjs.md @@ -279,7 +279,7 @@ export const POST = withEvlog(async (request: Request) => { }) ``` -All fields are merged into a single wide event emitted when the handler completes: +All fields are merged into a single wide event emitted when the handler completes (or when a streaming response body finishes, so AI SDK metadata is included): ```bash [Output (Pretty)] 10:23:45.612 INFO [my-app] POST /api/checkout 200 in 145ms diff --git a/apps/docs/content/3.integrate/frameworks/04.nitro.md b/apps/docs/content/3.integrate/frameworks/04.nitro.md index ffc74497..d50d83e2 100644 --- a/apps/docs/content/3.integrate/frameworks/04.nitro.md +++ b/apps/docs/content/3.integrate/frameworks/04.nitro.md @@ -125,7 +125,7 @@ One request, one log line with all context: └─ requestId: a1b2c3d4-... ``` -Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). +Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. For AI SDK streaming responses, evlog defers wide-event emit until the response body finishes so `createAILogger(log)` metadata stays on the same request event. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). ## Error Handling diff --git a/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md b/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md index ef9d4d23..d39ad44d 100644 --- a/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md +++ b/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md @@ -19,6 +19,8 @@ links: Every pattern below uses the same `createAILogger(log)` setup. Wrap the model with `ai.wrap()` and the middleware accumulates tokens, tools, and timing on the wide event automatically. +On Next.js, Nuxt/Nitro, SvelteKit, Hono, React Router, and oRPC, evlog defers wide-event emit for streaming responses (for example `text/event-stream` and AI SDK UI streams) until the body finishes, so late `ai` metadata stays on the same request event. + ## streamText The most common pattern — streaming chat with full observability: diff --git a/packages/evlog/src/hono/index.ts b/packages/evlog/src/hono/index.ts index c3e1faf3..32f38ee9 100644 --- a/packages/evlog/src/hono/index.ts +++ b/packages/evlog/src/hono/index.ts @@ -2,6 +2,7 @@ import type { Context, MiddlewareHandler } from 'hono' import type { RequestLogger } from '../types' import { defineFrameworkIntegration } from '../shared/integration' import type { BaseEvlogOptions } from '../shared/middleware' +import { shouldDeferEmitForResponse } from '../shared/streamResponse' export type EvlogHonoOptions = BaseEvlogOptions @@ -54,13 +55,20 @@ const integration = defineFrameworkIntegration({ */ export function evlog(options: EvlogHonoOptions = {}): MiddlewareHandler { return async (c, next) => { - const { skipped, finish } = integration.start(c, options) + const { skipped, finish, finishResponse } = integration.start(c, options) if (skipped) { await next() return } try { await next() + if (shouldDeferEmitForResponse(c.res)) { + const response = new Response(c.res.body, { + status: c.res.status, + headers: c.res.headers, + }) + return finishResponse(response, { status: response.status }) + } await finish({ status: c.res.status }) } catch (error) { await finish({ error: error as Error }) diff --git a/packages/evlog/src/logger.ts b/packages/evlog/src/logger.ts index 00584a9b..5d1bd4b9 100644 --- a/packages/evlog/src/logger.ts +++ b/packages/evlog/src/logger.ts @@ -41,6 +41,25 @@ function mergeInto(target: Record, source: Record() + +function isAiOnlyFieldUpdate(data: Record): boolean { + const keys = Object.keys(data) + return keys.length === 1 && keys[0] === 'ai' +} + +/** + * Mark a wide event as past the post-emit AI merge window so late `log.set({ ai })` + * calls warn again. Called by framework enrich/drain pipelines before drain runs. + * + * @internal Used by middleware and framework integrations. + */ +export function markWideEventDrainStarted(event: WideEvent | null): void { + if (!event) return + const state = pendingDrainState.get(event) + if (state) state.drainStarted = true +} + /** * @internal Wide-event field merge — exported for test mocks that mirror emit accumulation. */ @@ -704,6 +723,7 @@ export function createLogger>(initial let hasWarn = false let manualLevel: LogLevel | undefined let emitted = false + let pendingWideEvent: WideEvent | null = null function addLog(level: 'info' | 'warn', message: string): void { if (!Array.isArray(context.requestLogs)) { @@ -738,7 +758,18 @@ export function createLogger>(initial audit: auditMethod, set(data: FieldContext): void { if (emitted) { - const keys = Object.keys(data as Record) + const record = data as Record + const pendingState = pendingWideEvent ? pendingDrainState.get(pendingWideEvent) : undefined + if ( + pendingWideEvent + && pendingState + && !pendingState.drainStarted + && isAiOnlyFieldUpdate(record) + ) { + mergeInto(pendingWideEvent as Record, record) + return + } + const keys = Object.keys(record) warnPostEmit('log.set()', `Keys dropped: ${keys.length ? keys.join(', ') : '(empty)'}.`) return } @@ -843,6 +874,7 @@ export function createLogger>(initial if (!forceKeep && !shouldSample(level)) { emitted = true + pendingWideEvent = null return null } @@ -856,6 +888,10 @@ export function createLogger>(initial const wide = emitWideEvent(level, context, { deferDrain, ownsEvent: true, waitUntil }) emitted = true + pendingWideEvent = wide + if (wide) { + pendingDrainState.set(wide, { drainStarted: false }) + } return wide }, diff --git a/packages/evlog/src/next/handler.ts b/packages/evlog/src/next/handler.ts index 5fd03c14..6baaf667 100644 --- a/packages/evlog/src/next/handler.ts +++ b/packages/evlog/src/next/handler.ts @@ -1,8 +1,9 @@ import type { DrainContext, EnrichContext, TailSamplingContext, WideEvent } from '../types' -import { createRequestLogger, getGlobalDrain, initLogger, isEnabled, isLoggerLocked } from '../logger' +import { createRequestLogger, getGlobalDrain, initLogger, isEnabled, isLoggerLocked, markWideEventDrainStarted } from '../logger' import { attachForkToLogger } from '../shared/fork' import type { MiddlewareLoggerOptions } from '../shared/middleware' import { shouldLog, getServiceForPath } from '../shared/routes' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse' import { filterSafeHeaders } from '../utils' import { EvlogError } from '../error' import type { NextEvlogOptions } from './types' @@ -83,6 +84,8 @@ async function callEnrichAndDrain( } } + markWideEventDrainStarted(emittedEvent) + if (drain) { const drainCtx: DrainContext = { event: emittedEvent, @@ -112,6 +115,33 @@ async function callEnrichAndDrain( run().catch(() => {}) } +async function emitRequestEvent( + logger: ReturnType, + requestInfo: { method: string, path: string, requestId: string }, + headers: Record, + status: number, +): Promise { + let forceKeep = false + if (state.options.keep) { + try { + const tailCtx: TailSamplingContext = { + status, + path: requestInfo.path, + method: requestInfo.method, + context: logger.getContext(), + shouldKeep: false, + } + await state.options.keep(tailCtx) + forceKeep = tailCtx.shouldKeep ?? false + } catch (err) { + console.error('[evlog] keep callback failed:', err) + } + } + + const emittedEvent = logger.emit({ _forceKeep: forceKeep }) + await callEnrichAndDrain(emittedEvent, requestInfo, headers, status) +} + /** * Wrap a Next.js route handler or server action with evlog request-scoped logging. * @@ -197,6 +227,18 @@ export function createWithEvlog(options: NextEvlogOptions) { try { const result = await evlogStorage.run(logger, () => handler(...args)) + const requestInfo = { method, path, requestId } + + if (result instanceof Response && shouldDeferEmitForResponse(result)) { + logger.set({ status: result.status }) + const wrapped = bindStreamingResponseLifecycle(result, async (meta) => { + if (meta.error) { + logger.error(meta.error) + } + await emitRequestEvent(logger, requestInfo, headers, meta.status ?? result.status) + }) + return wrapped as Awaited + } // Extract response status let { status } = { status: 200 } @@ -205,26 +247,7 @@ export function createWithEvlog(options: NextEvlogOptions) { } logger.set({ status }) - // Build tail sampling context and call keep callback - let forceKeep = false - if (state.options.keep) { - try { - const tailCtx: TailSamplingContext = { - status, - path, - method, - context: logger.getContext(), - shouldKeep: false, - } - await state.options.keep(tailCtx) - forceKeep = tailCtx.shouldKeep ?? false - } catch (err) { - console.error('[evlog] keep callback failed:', err) - } - } - - const emittedEvent = logger.emit({ _forceKeep: forceKeep }) - await callEnrichAndDrain(emittedEvent, { method, path, requestId }, headers, status) + await emitRequestEvent(logger, requestInfo, headers, status) return result as Awaited } catch (error) { @@ -235,26 +258,7 @@ export function createWithEvlog(options: NextEvlogOptions) { ?? 500 logger.set({ status: errorStatus }) - // Build tail sampling context and call keep callback - let forceKeep = false - if (state.options.keep) { - try { - const tailCtx: TailSamplingContext = { - status: errorStatus, - path, - method, - context: logger.getContext(), - shouldKeep: false, - } - await state.options.keep(tailCtx) - forceKeep = tailCtx.shouldKeep ?? false - } catch (err) { - console.error('[evlog] keep callback failed:', err) - } - } - - const emittedEvent = logger.emit({ _forceKeep: forceKeep }) - await callEnrichAndDrain(emittedEvent, { method, path, requestId }, headers, errorStatus) + await emitRequestEvent(logger, { method, path, requestId }, headers, errorStatus) // Return structured JSON response for EvlogErrors (like H3 does for Nuxt) if (isRequest && error instanceof EvlogError) { diff --git a/packages/evlog/src/nitro-v3/plugin.ts b/packages/evlog/src/nitro-v3/plugin.ts index c1558f70..061a3430 100644 --- a/packages/evlog/src/nitro-v3/plugin.ts +++ b/packages/evlog/src/nitro-v3/plugin.ts @@ -2,10 +2,11 @@ import { definePlugin } from 'nitro' import type { CaptureError } from 'nitro/types' import type { HTTPEvent } from 'nitro/h3' import { parseURL } from 'ufo' -import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled } from '../logger' +import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled, markWideEventDrainStarted } from '../logger' import { shouldLog, getServiceForPath, extractErrorStatus } from '../nitro' import { normalizeRedactConfig } from '../redact' import { resolveEvlogConfigForNitroPlugin, setActiveNitroRuntime } from '../shared/nitroConfigBridge' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse' import type { EnrichContext, RequestLogger, TailSamplingContext, WideEvent } from '../types' import { filterSafeHeaders } from '../utils' @@ -137,6 +138,8 @@ async function callEnrichAndDrain( await runner.runEnrich(enrichCtx) } + markWideEventDrainStarted(emittedEvent) + await callDrainHook(hooks, emittedEvent, event, hookContext) } @@ -229,29 +232,45 @@ export default definePlugin(async (nitroApp) => { const log = ctx?.log as RequestLogger | undefined if (!log || !ctx) return - const { status } = res - log.set({ status }) + const emitSuccessResponse = async (responseStatus: number) => { + log.set({ status: responseStatus }) - const startTime = ctx._evlogStartTime as number | undefined - const durationMs = startTime ? Date.now() - startTime : undefined + const startTime = ctx._evlogStartTime as number | undefined + const durationMs = startTime ? Date.now() - startTime : undefined - const { pathname } = parseURL(event.req.url) + const { pathname } = parseURL(event.req.url) - const tailCtx: TailSamplingContext = { - status, - duration: durationMs, - path: pathname, - method: event.req.method, - context: log.getContext(), - shouldKeep: false, + const tailCtx: TailSamplingContext = { + status: responseStatus, + duration: durationMs, + path: pathname, + method: event.req.method, + context: log.getContext(), + shouldKeep: false, + } + + await hooks.callHook('evlog:emit:keep', tailCtx) + const runner = getGlobalPluginRunner() + if (runner.hasKeep) await runner.runKeep(tailCtx) + + const emittedEvent = log.emit({ _forceKeep: tailCtx.shouldKeep }) + await callEnrichAndDrain(hooks, emittedEvent, event, res) } - await hooks.callHook('evlog:emit:keep', tailCtx) - const runner = getGlobalPluginRunner() - if (runner.hasKeep) await runner.runKeep(tailCtx) + if (shouldDeferEmitForResponse(res)) { + const wrapped = bindStreamingResponseLifecycle(res, async (meta) => { + if (meta.error) { + log.error(meta.error) + } + await emitSuccessResponse(meta.status ?? res.status) + }) + if (wrapped !== res && 'res' in event) { + (event as { res: Response }).res = wrapped + } + return + } - const emittedEvent = log.emit({ _forceKeep: tailCtx.shouldKeep }) - await callEnrichAndDrain(hooks, emittedEvent, event, res) + await emitSuccessResponse(res.status) }) hooks.hook('error', async (error, { event }) => { diff --git a/packages/evlog/src/nitro/plugin.ts b/packages/evlog/src/nitro/plugin.ts index 260fcaf1..1900f0d6 100644 --- a/packages/evlog/src/nitro/plugin.ts +++ b/packages/evlog/src/nitro/plugin.ts @@ -5,10 +5,11 @@ import type { NitroApp } from 'nitropack/types' // (nitropack dev loads plugins outside the bundle via Worker threads). import { defineNitroPlugin } from 'nitropack/runtime/internal/plugin' import { getHeaders } from 'h3' -import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled } from '../logger' +import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled, markWideEventDrainStarted } from '../logger' import { shouldLog, getServiceForPath, extractErrorStatus } from '../nitro' import { normalizeRedactConfig } from '../redact' import { resolveEvlogConfigForNitroPlugin, setActiveNitroRuntime } from '../shared/nitroConfigBridge' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse' import { startStreamServer, type StreamServerOptions } from '../stream' import type { EnrichContext, RequestLogger, ServerEvent, TailSamplingContext, WideEvent } from '../types' import { filterSafeHeaders } from '../utils' @@ -90,6 +91,8 @@ async function callEnrichAndDrain( await runner.runEnrich(enrichCtx) } + markWideEventDrainStarted(emittedEvent) + const drainCtx = { event: emittedEvent, request: hookContext.request, @@ -245,7 +248,9 @@ export default defineNitroPlugin(async (nitroApp) => { if (e.context._evlogEmitted || !e.context._evlogShouldEmit) return const requestLog = e.context.log as RequestLogger | undefined - if (requestLog) { + if (!requestLog) return + + const emitSuccessResponse = async () => { const status = getResponseStatus(e) requestLog.set({ status }) @@ -268,5 +273,17 @@ export default defineNitroPlugin(async (nitroApp) => { const emittedEvent = requestLog.emit({ _forceKeep: tailCtx.shouldKeep }) await callEnrichAndDrain(nitroApp, emittedEvent, e) } + + if (e.response && shouldDeferEmitForResponse(e.response)) { + e.response = bindStreamingResponseLifecycle(e.response, async (meta) => { + if (meta.error) { + requestLog.error(meta.error) + } + await emitSuccessResponse() + }) + return + } + + await emitSuccessResponse() }) }) diff --git a/packages/evlog/src/orpc/index.ts b/packages/evlog/src/orpc/index.ts index 42722b75..dac2d197 100644 --- a/packages/evlog/src/orpc/index.ts +++ b/packages/evlog/src/orpc/index.ts @@ -111,7 +111,7 @@ export function withEvlog( options: EvlogOrpcOptions = {}, ): THandler { const handle: THandler['handle'] = async (request, callOptions) => { - const { skipped, finish, runWith, logger } = integration.start({ request }, options) + const { skipped, finish, finishResponse, runWith, logger } = integration.start({ request }, options) const initialContext = (callOptions as { context?: Record } | undefined)?.context ?? {} const finalOptions = { @@ -125,8 +125,11 @@ export function withEvlog( try { const result = await runWith(() => handler.handle(request, finalOptions)) - const status = result.matched ? result.response.status : 404 - await finish({ status }) + if (result.matched) { + result.response = await finishResponse(result.response, { status: result.response.status }) + } else { + await finish({ status: 404 }) + } return result } catch (error) { await finish({ error: error as Error }) diff --git a/packages/evlog/src/react-router/index.ts b/packages/evlog/src/react-router/index.ts index bbb331e3..58471259 100644 --- a/packages/evlog/src/react-router/index.ts +++ b/packages/evlog/src/react-router/index.ts @@ -55,7 +55,7 @@ export function evlog(options: EvlogReactRouterOptions = {}) { headers: extractSafeHeaders(request.headers), ...options, } - const { logger, finish, skipped } = createMiddlewareLogger(middlewareOpts) + const { logger, finish, finishResponse, skipped } = createMiddlewareLogger(middlewareOpts) if (skipped) { return next() @@ -66,8 +66,7 @@ export function evlog(options: EvlogReactRouterOptions = {}) { try { const response = await storage.run(logger, () => next()) - await finish({ status: response.status }) - return response + return finishResponse(response, { status: response.status }) } catch (error) { await finish({ error: error as Error }) throw error diff --git a/packages/evlog/src/shared/index.ts b/packages/evlog/src/shared/index.ts index db3252ca..9529c36e 100644 --- a/packages/evlog/src/shared/index.ts +++ b/packages/evlog/src/shared/index.ts @@ -5,8 +5,8 @@ * @see https://evlog.dev/extend/custom-framework */ +export * from './streamResponse' export * from './compose' -export * from './config' export * from './define' export * from './drain' export * from './enricher' diff --git a/packages/evlog/src/shared/middleware.ts b/packages/evlog/src/shared/middleware.ts index cddd765d..183fdec4 100644 --- a/packages/evlog/src/shared/middleware.ts +++ b/packages/evlog/src/shared/middleware.ts @@ -1,10 +1,11 @@ import type { DrainContext, EnrichContext, RedactConfig, RequestLogger, RouteConfig, TailSamplingContext, WideEvent } from '../types' -import { createRequestLogger, getGlobalDrain, getGlobalPluginRunner, isEnabled, shouldKeep } from '../logger' +import { createRequestLogger, getGlobalDrain, getGlobalPluginRunner, isEnabled, markWideEventDrainStarted, shouldKeep } from '../logger' import { isGloballyRedacted, redactEvent, resolveRedactConfig } from '../redact' import { extractErrorStatus } from './errors' import type { EvlogPlugin, PluginRunner } from './plugin' import { createPluginRunner, getEmptyPluginRunner } from './plugin' import { shouldLog, getServiceForPath } from './routes' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from './streamResponse' /** * Base options shared by every framework integration. Re-exported via @@ -44,6 +45,11 @@ export interface MiddlewareLoggerOptions extends BaseEvlogOptions { export interface MiddlewareLoggerResult { logger: RequestLogger finish: (opts?: { status?: number; error?: Error }) => Promise + /** + * Finish request logging, deferring emit until a streaming response body completes. + * Returns the original response or a wrapped copy when the body is a stream. + */ + finishResponse: (response: Response, opts?: { status?: number }) => Promise skipped: boolean } @@ -62,6 +68,7 @@ const noopResult: MiddlewareLoggerResult = { }, }, finish: () => Promise.resolve(null), + finishResponse: (response) => Promise.resolve(response), skipped: true, } @@ -139,6 +146,8 @@ export async function runEnrichAndDrain( } } + markWideEventDrainStarted(emittedEvent) + const drain = options.drain ?? getGlobalDrain() const hasUserDrain = !!drain const hasPluginDrain = runner.hasDrain @@ -213,29 +222,51 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle }) } - const finish = async (opts?: { status?: number; error?: Error }): Promise => { + const finish = (opts?: { status?: number; error?: Error }): Promise => { + return performFinish(logger, opts) + } + + const finishResponse = async (response: Response, opts?: { status?: number }): Promise => { + const status = opts?.status ?? response.status + if (!shouldDeferEmitForResponse(response)) { + await performFinish(logger, { status }) + return response + } + + return bindStreamingResponseLifecycle(response, async (meta) => { + await performFinish(logger, { + status: meta.status ?? status, + error: meta.error, + }) + }) + } + + async function performFinish( + requestLogger: RequestLogger, + opts?: { status?: number; error?: Error }, + ): Promise { const { status, error } = opts ?? {} if (error) { - logger.error(error) + requestLogger.error(error) const errorStatus = extractErrorStatus(error) - logger.set({ status: errorStatus }) + requestLogger.set({ status: errorStatus }) } else if (status !== undefined) { - logger.set({ status }) + requestLogger.set({ status }) } const durationMs = Date.now() - startTime const resolvedStatus = error ? extractErrorStatus(error) - : status ?? (logger.getContext().status as number | undefined) + : status ?? (requestLogger.getContext().status as number | undefined) const tailCtx: TailSamplingContext = { status: resolvedStatus, duration: durationMs, path, method, - context: logger.getContext(), + context: requestLogger.getContext(), shouldKeep: false, } @@ -247,7 +278,7 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle } const forceKeep = tailCtx.shouldKeep || shouldKeep(tailCtx) - const emittedEvent = logger.emit({ _forceKeep: forceKeep }) + const emittedEvent = requestLogger.emit({ _forceKeep: forceKeep }) if ( emittedEvent @@ -258,7 +289,7 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle if (pluginRunner.hasRequestLifecycle) { pluginRunner.runOnRequestFinish({ - logger, + logger: requestLogger, request: requestInfo, headers: options.headers, event: emittedEvent, @@ -271,5 +302,5 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle return emittedEvent } - return { logger, finish, skipped: false } + return { logger, finish, finishResponse, skipped: false } } diff --git a/packages/evlog/src/shared/streamResponse.ts b/packages/evlog/src/shared/streamResponse.ts new file mode 100644 index 00000000..0ac35dad --- /dev/null +++ b/packages/evlog/src/shared/streamResponse.ts @@ -0,0 +1,115 @@ +export interface StreamCompleteMeta { + status: number + error?: Error +} + +/** + * Whether a {@link Response} carries a stream body that may outlive handler return. + */ +export function isStreamingResponse(response: Response): boolean { + return response.body !== null +} + +/** + * Whether framework integrations should defer wide-event emit until the response + * body finishes. Static string/JSON bodies are excluded even though they use a + * {@link ReadableStream} under the hood in the Fetch API. + */ +export function shouldDeferEmitForResponse(response: Response): boolean { + if (!response.body) return false + + const contentType = (response.headers.get('content-type') ?? '').toLowerCase() + + if (contentType.includes('text/event-stream')) return true + if (contentType.includes('application/x-ndjson')) return true + if (response.headers.has('x-vercel-ai-ui-message-stream')) return true + if (response.headers.get('transfer-encoding')?.toLowerCase().includes('chunked')) return true + + return false +} + +function toError(value: unknown): Error { + return value instanceof Error ? value : new Error(String(value)) +} + +function createObservedBody( + body: ReadableStream, + onDone: () => void | Promise, + onError: (error: unknown) => void | Promise, +): ReadableStream | null { + if (body.locked) { + void Promise.resolve(onError(new TypeError('stream is already locked'))).catch((err: unknown) => { + console.error('[evlog] stream error handling failed:', err) + }) + return null + } + + const reader = body.getReader() + let settled = false + + const settle = (fn: () => void | Promise) => { + if (settled) return + settled = true + void Promise.resolve(fn()).catch((err: unknown) => { + console.error('[evlog] stream completion handling failed:', err) + }) + } + + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read() + if (done) { + settle(onDone) + controller.close() + return + } + controller.enqueue(value) + } catch (err) { + settle(() => onError(err)) + controller.error(err) + } + }, + + async cancel(reason) { + try { + await reader.cancel(reason) + } finally { + settle(onDone) + } + }, + }) +} + +/** + * Observe a streaming {@link Response} body and invoke `onComplete` once when + * the body closes, errors, or is cancelled. Preserves status, headers, and chunks. + * + * Non-streaming responses invoke `onComplete` immediately. + * + * @internal Used by framework integrations to defer wide-event emit until streams finish. + */ +export function bindStreamingResponseLifecycle( + response: Response, + onComplete: (meta: StreamCompleteMeta) => void | Promise, +): Response { + if (!response.body) { + void Promise.resolve(onComplete({ status: response.status })).catch((err: unknown) => { + console.error('[evlog] stream completion handling failed:', err) + }) + return response + } + + const body = createObservedBody( + response.body, + () => onComplete({ status: response.status }), + (err) => onComplete({ status: response.status, error: toError(err) }), + ) + if (!body) return response + + return new Response(body, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }) +} diff --git a/packages/evlog/src/sveltekit/index.ts b/packages/evlog/src/sveltekit/index.ts index db09d2b4..505a9624 100644 --- a/packages/evlog/src/sveltekit/index.ts +++ b/packages/evlog/src/sveltekit/index.ts @@ -136,7 +136,7 @@ export function evlog(options: EvlogSvelteKitOptions = {}): SvelteKitHandle { headers: extractSafeHeaders(event.request.headers), ...options, } - const { logger, finish, skipped } = createMiddlewareLogger(middlewareOpts) + const { logger, finish, finishResponse, skipped } = createMiddlewareLogger(middlewareOpts) if (skipped) { return await resolve(event) @@ -164,8 +164,7 @@ export function evlog(options: EvlogSvelteKitOptions = {}): SvelteKitHandle { }) } - await finish({ status: response.status }) - return response + return finishResponse(response) } catch (error) { await finish({ error: error instanceof Error ? error : new Error(String(error)) }) diff --git a/packages/evlog/test/core/logger-request-logger.test.ts b/packages/evlog/test/core/logger-request-logger.test.ts index 7db9ebb7..7f2f6e06 100644 --- a/packages/evlog/test/core/logger-request-logger.test.ts +++ b/packages/evlog/test/core/logger-request-logger.test.ts @@ -516,6 +516,18 @@ describe('createRequestLogger', () => { warnSpy.mockRestore() }) + it('merges ai fields onto the emitted wide event before drain starts', () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const logger = createRequestLogger({ method: 'POST', path: '/api/chat', requestId: 'r1' }) + const emitted = logger.emit({ status: 200 }) + logger.set({ ai: { calls: 1, totalTokens: 42 } }) + expect(warnSpy).not.toHaveBeenCalled() + expect(emitted?.ai).toEqual({ calls: 1, totalTokens: 42 }) + logger.set({ action: 'chat' }) + expect(warnSpy).toHaveBeenCalled() + warnSpy.mockRestore() + }) + it('seals logger when emit returns null due to sampling', () => { const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) initLogger({ diff --git a/packages/evlog/test/frameworks/sveltekit.test.ts b/packages/evlog/test/frameworks/sveltekit.test.ts index 8b78878d..13e3b0a1 100644 --- a/packages/evlog/test/frameworks/sveltekit.test.ts +++ b/packages/evlog/test/frameworks/sveltekit.test.ts @@ -589,4 +589,45 @@ describe('evlog/sveltekit', () => { expect(result.why).toBe('Card declined') }) }) + + it('defers emit for streaming responses until the body completes (#321)', async () => { + const { drain } = createPipelineSpies() + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const handle = evlog({ drain }) + const event = createMockEvent('POST', '/api/chat') + + let closeStream!: () => void + const encoder = new TextEncoder() + const resolve = vi.fn(() => { + const log = useLogger() + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + closeStream = defined(close, 'close stream') + queueMicrotask(() => { + log.set({ ai: { calls: 1, totalTokens: 42 } }) + }) + return new Response(stream, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }) + }) + + const response = await handle({ event, resolve }) + expect(drain).not.toHaveBeenCalled() + + closeStream() + await expect(response.text()).resolves.toBe('hello world') + await waitForDrainCalls(drain) + + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false) + expect(findEventViaDrain(drain, e => e.path === '/api/chat')?.ai).toEqual({ calls: 1, totalTokens: 42 }) + }) }) diff --git a/packages/evlog/test/next/handler.test.ts b/packages/evlog/test/next/handler.test.ts index fdc7f388..9a4a6fdb 100644 --- a/packages/evlog/test/next/handler.test.ts +++ b/packages/evlog/test/next/handler.test.ts @@ -2,6 +2,26 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createWithEvlog } from '../../src/next/handler' import { evlogStorage } from '../../src/next/storage' import { initLogger } from '../../src/logger' +import { defined } from '../helpers/defined' + +const encoder = new TextEncoder() + +function createDeferredStream() { + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + return { + stream, + close: () => defined(close, 'close stream')(), + } +} // Mock next/server to prevent import errors vi.mock('next/server', () => ({ @@ -366,4 +386,36 @@ describe('withEvlog', () => { expect(drainCtx.headers['x-custom']).toBe('safe-value') expect(drainCtx.headers['content-type']).toBe('application/json') }) + + it('defers emit for streaming responses until the body completes (#321)', async () => { + const drainMock = vi.fn() + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const withEvlog = createWithEvlog({ pretty: false, drain: drainMock }) + + let closeStream!: () => void + const handler = withEvlog((_request: Request) => { + const log = defined(evlogStorage.getStore(), 'request logger') + const { stream, close } = createDeferredStream() + closeStream = close + queueMicrotask(() => { + log.set({ ai: { calls: 1, totalTokens: 42 } }) + }) + return new Response(stream, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }) + }) + + const response = await handler(new Request('http://localhost/api/chat', { method: 'POST' })) + expect(drainMock).not.toHaveBeenCalled() + + closeStream() + await expect(response.text()).resolves.toBe('hello world') + await vi.waitFor(() => { + expect(drainMock).toHaveBeenCalledTimes(1) + }) + + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false) + expect(drainMock.mock.calls[0]?.[0]?.event?.ai).toEqual({ calls: 1, totalTokens: 42 }) + }) }) diff --git a/packages/evlog/test/nitro/plugin.test.ts b/packages/evlog/test/nitro/plugin.test.ts index 74c5a6b1..a18c9622 100644 --- a/packages/evlog/test/nitro/plugin.test.ts +++ b/packages/evlog/test/nitro/plugin.test.ts @@ -974,3 +974,69 @@ describe('nitro plugin - middleware compatibility (#210)', () => { expect(apiEvent.context._evlogShouldEmit).toBe(true) }) }) + +describe('nitro plugin - streaming emit defer (#321)', () => { + const encoder = new TextEncoder() + + function createDeferredStream() { + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + return { + stream, + close: () => defined(close, 'close stream')(), + } + } + + beforeEach(() => { + initLogger({ env: { service: 'test-app' }, pretty: false, silent: true, _suppressDrainWarning: true }) + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + it('defers afterResponse emit until a streaming body completes', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const event: ServerEvent = { method: 'POST', path: '/api/chat', context: {} } + event.context._evlogShouldEmit = true + event.context._evlogStartTime = Date.now() + event.context.log = createRequestLogger( + { method: 'POST', path: '/api/chat', requestId: 'req-1' }, + { _deferDrain: true }, + ) + + const { stream, close } = createDeferredStream() + event.response = new Response(stream, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }) + + queueMicrotask(() => { + defined(event.context.log, 'request logger').set({ ai: { calls: 1, totalTokens: 42 } }) + }) + + const { bindStreamingResponseLifecycle } = await import('../../src/shared/streamResponse') + let emitCount = 0 + event.response = bindStreamingResponseLifecycle(event.response, () => { + emitCount++ + defined(event.context.log, 'request logger').emit({ status: 200 }) + }) + + expect(emitCount).toBe(0) + close() + await expect(event.response.text()).resolves.toBe('hello world') + await vi.waitFor(() => { + expect(emitCount).toBe(1) + }) + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false) + expect(defined(event.context.log, 'request logger').getContext().ai).toEqual({ calls: 1, totalTokens: 42 }) + }) +}) diff --git a/packages/evlog/test/shared/streamResponse.test.ts b/packages/evlog/test/shared/streamResponse.test.ts new file mode 100644 index 00000000..16cc03dc --- /dev/null +++ b/packages/evlog/test/shared/streamResponse.test.ts @@ -0,0 +1,130 @@ +import { describe, expect, it, vi } from 'vitest' +import { bindStreamingResponseLifecycle, isStreamingResponse, shouldDeferEmitForResponse } from '../../src/shared/streamResponse' +import { defined } from '../helpers/defined' + +const encoder = new TextEncoder() + +function createDeferredStream() { + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + return { + stream, + close: () => defined(close, 'close stream')(), + } +} + +function createImmediateStream(chunks: string[]) { + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(encoder.encode(chunk)) + } + controller.close() + }, + }) +} + +describe('streamResponse', () => { + it('isStreamingResponse detects readable bodies', () => { + expect(isStreamingResponse(new Response(createImmediateStream(['a'])))).toBe(true) + expect(isStreamingResponse(new Response(null, { status: 204 }))).toBe(false) + }) + + it('shouldDeferEmitForResponse excludes static string bodies', () => { + expect(shouldDeferEmitForResponse(new Response('ok'))).toBe(false) + expect(shouldDeferEmitForResponse(new Response(createImmediateStream(['a']), { + headers: { 'content-type': 'text/event-stream' }, + }))).toBe(true) + expect(shouldDeferEmitForResponse(new Response(createImmediateStream(['a']), { + headers: { 'content-type': 'text/plain; charset=utf-8', 'x-vercel-ai-ui-message-stream': 'v1' }, + }))).toBe(true) + }) + + it('invokes onComplete when the body finishes', async () => { + const onComplete = vi.fn() + const source = createDeferredStream() + const response = bindStreamingResponseLifecycle( + new Response(source.stream, { status: 202 }), + onComplete, + ) + + const read = response.text() + source.close() + await expect(read).resolves.toBe('hello world') + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith({ status: 202 }) + }) + }) + + it('preserves status, headers, and chunks', async () => { + const onComplete = vi.fn() + const response = bindStreamingResponseLifecycle( + new Response(createImmediateStream(['a', 'b']), { + status: 207, + statusText: 'Multi-Status', + headers: { 'x-stream': 'yes' }, + }), + onComplete, + ) + + expect(response.status).toBe(207) + expect(response.statusText).toBe('Multi-Status') + expect(response.headers.get('x-stream')).toBe('yes') + await expect(response.text()).resolves.toBe('ab') + expect(onComplete).toHaveBeenCalledWith({ status: 207 }) + }) + + it('records stream errors on complete', async () => { + const onComplete = vi.fn() + const source = new ReadableStream({ + pull() { + throw new Error('stream exploded') + }, + }) + const response = bindStreamingResponseLifecycle(new Response(source, { status: 500 }), onComplete) + + await expect(response.text()).rejects.toThrow('stream exploded') + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith(expect.objectContaining({ + status: 500, + error: expect.objectContaining({ message: 'stream exploded' }), + })) + }) + }) + + it('returns the original response when the body is already locked', async () => { + const onComplete = vi.fn() + const original = new Response(createImmediateStream(['locked']), { status: 200 }) + const body = defined(original.body, 'response body') + const reader = body.getReader() + + try { + const response = bindStreamingResponseLifecycle(original, onComplete) + expect(response).toBe(original) + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith(expect.objectContaining({ + error: expect.objectContaining({ message: 'stream is already locked' }), + })) + }) + } finally { + reader.releaseLock() + } + }) + + it('calls onComplete immediately for responses without a body', async () => { + const onComplete = vi.fn() + const response = bindStreamingResponseLifecycle(new Response(null, { status: 204 }), onComplete) + expect(response.status).toBe(204) + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith({ status: 204 }) + }) + }) +}) diff --git a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap index 9209ce33..81c71343 100644 --- a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap +++ b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap @@ -220,6 +220,7 @@ exports[`public API surface > matches snapshot for all subpath exports 1`] = ` "OTEL_SEVERITY_NUMBER", "OTEL_SEVERITY_TEXT", "attachForkToLogger", + "bindStreamingResponseLifecycle", "composeDrains", "composeEnrichers", "composeKeep", @@ -241,14 +242,14 @@ exports[`public API surface > matches snapshot for all subpath exports 1`] = ` "forkBackgroundLogger", "getEmptyPluginRunner", "getHeader", - "getRuntimeConfig", "getServiceForPath", "httpPost", + "isStreamingResponse", "mergeEventField", "normalizeNumber", - "resolveAdapterConfig", "resolveMiddlewarePluginRunner", "runEnrichAndDrain", + "shouldDeferEmitForResponse", "shouldLog", "toLoggerConfig", "toMiddlewareOptions", From 643fec259bc04ccf90255a6f0b467def40ec68f1 Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Sun, 7 Jun 2026 14:34:36 +0100 Subject: [PATCH 2/3] fix: address CodeRabbit review on streaming AI emit defer Co-authored-by: Hussain Arslan --- .changeset/streaming-ai-emit-defer.md | 4 +++- apps/docs/content/2.learn/2.wide-events.md | 2 +- .../3.integrate/frameworks/04.nitro.md | 2 +- packages/evlog/src/logger.ts | 3 ++- packages/evlog/src/next/handler.ts | 5 +++-- packages/evlog/src/shared/streamResponse.ts | 6 +++++ .../test/core/logger-request-logger.test.ts | 15 ++++++++++++- packages/evlog/test/helpers/stream.ts | 22 +++++++++++++++++++ packages/evlog/test/next/handler.test.ts | 20 +---------------- packages/evlog/test/nitro/plugin.test.ts | 20 +---------------- .../evlog/test/shared/streamResponse.test.ts | 18 +-------------- 11 files changed, 55 insertions(+), 62 deletions(-) create mode 100644 packages/evlog/test/helpers/stream.ts diff --git a/.changeset/streaming-ai-emit-defer.md b/.changeset/streaming-ai-emit-defer.md index 02c8a02e..92738c21 100644 --- a/.changeset/streaming-ai-emit-defer.md +++ b/.changeset/streaming-ai-emit-defer.md @@ -2,6 +2,8 @@ 'evlog': minor --- -Defer wide-event emit for streaming HTTP responses (SSE, AI SDK UI streams, chunked bodies) until the response body finishes, so `createAILogger()` metadata is included on the same request event instead of triggering post-emit warnings. Applies to Next.js `withEvlog`, SvelteKit, Hono, React Router, oRPC, and Nitro/Nuxt integrations. Also merges late `ai` fields onto an emitted event before enrich/drain when metadata arrives in a narrow race window. +Defer wide-event emit for streaming HTTP responses (SSE, AI SDK UI streams, chunked bodies) until the response body finishes, so `createAILogger()` metadata is included on the same request event instead of triggering post-emit warnings. + +Applies to Next.js `withEvlog`, SvelteKit, Hono, React Router, oRPC, and Nitro/Nuxt integrations. Also merges late `ai` fields onto an emitted event before enrich/drain when metadata arrives in a narrow race window. Fixes #321 diff --git a/apps/docs/content/2.learn/2.wide-events.md b/apps/docs/content/2.learn/2.wide-events.md index e09fd3b9..81847ebe 100644 --- a/apps/docs/content/2.learn/2.wide-events.md +++ b/apps/docs/content/2.learn/2.wide-events.md @@ -188,7 +188,7 @@ The parent wide event may be emitted **before** the child event; they are two se **Not available yet:** Hono (no `useLogger` without `c.get('log')` + ALS) and Nitro/Nuxt `useLogger(event)` — use the post-emit warnings to catch mistakes; a different API may arrive later for event-scoped forks. -For AI SDK streaming responses, supported framework integrations defer wide-event emit until the response body finishes, so `createAILogger(log)` metadata lands on the same request event automatically. +For AI SDK streaming responses, supported framework integrations (Next.js, Nitro/Nuxt, SvelteKit, Hono, React Router, oRPC) defer wide-event emit until the response body finishes, so `createAILogger(log)` metadata lands on the same request event automatically. ```typescript [server/routes/checkout.post.ts] import { evlog, useLogger } from 'evlog/express' diff --git a/apps/docs/content/3.integrate/frameworks/04.nitro.md b/apps/docs/content/3.integrate/frameworks/04.nitro.md index d50d83e2..5059b1c3 100644 --- a/apps/docs/content/3.integrate/frameworks/04.nitro.md +++ b/apps/docs/content/3.integrate/frameworks/04.nitro.md @@ -125,7 +125,7 @@ One request, one log line with all context: └─ requestId: a1b2c3d4-... ``` -Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. For AI SDK streaming responses, evlog defers wide-event emit until the response body finishes so `createAILogger(log)` metadata stays on the same request event. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). +Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. For AI SDK streaming responses, evlog defers wide-event emit until the response body finishes so `createAILogger(log)` metadata stays on the same request event. Post-emit warnings only apply when code calls `set()` after the wide event has actually emitted — for example in non-streaming handlers or background work. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). ## Error Handling diff --git a/packages/evlog/src/logger.ts b/packages/evlog/src/logger.ts index 5d1bd4b9..04d179a6 100644 --- a/packages/evlog/src/logger.ts +++ b/packages/evlog/src/logger.ts @@ -890,7 +890,8 @@ export function createLogger>(initial emitted = true pendingWideEvent = wide if (wide) { - pendingDrainState.set(wide, { drainStarted: false }) + // Only enable the AI merge window when middleware defers drain until finish. + pendingDrainState.set(wide, { drainStarted: !deferDrain }) } return wide }, diff --git a/packages/evlog/src/next/handler.ts b/packages/evlog/src/next/handler.ts index 6baaf667..33134d16 100644 --- a/packages/evlog/src/next/handler.ts +++ b/packages/evlog/src/next/handler.ts @@ -230,12 +230,13 @@ export function createWithEvlog(options: NextEvlogOptions) { const requestInfo = { method, path, requestId } if (result instanceof Response && shouldDeferEmitForResponse(result)) { - logger.set({ status: result.status }) const wrapped = bindStreamingResponseLifecycle(result, async (meta) => { if (meta.error) { logger.error(meta.error) } - await emitRequestEvent(logger, requestInfo, headers, meta.status ?? result.status) + const finalStatus = meta.status ?? result.status + logger.set({ status: finalStatus }) + await emitRequestEvent(logger, requestInfo, headers, finalStatus) }) return wrapped as Awaited } diff --git a/packages/evlog/src/shared/streamResponse.ts b/packages/evlog/src/shared/streamResponse.ts index 0ac35dad..900fbc17 100644 --- a/packages/evlog/src/shared/streamResponse.ts +++ b/packages/evlog/src/shared/streamResponse.ts @@ -1,5 +1,11 @@ +/** + * Metadata passed to streaming response completion callbacks. + * Reports the HTTP status and any error that occurred while reading the body. + */ export interface StreamCompleteMeta { + /** Final HTTP status code for the response. */ status: number + /** Present when the stream body failed before or during completion. */ error?: Error } diff --git a/packages/evlog/test/core/logger-request-logger.test.ts b/packages/evlog/test/core/logger-request-logger.test.ts index 7f2f6e06..862a4a32 100644 --- a/packages/evlog/test/core/logger-request-logger.test.ts +++ b/packages/evlog/test/core/logger-request-logger.test.ts @@ -518,7 +518,10 @@ describe('createRequestLogger', () => { it('merges ai fields onto the emitted wide event before drain starts', () => { const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) - const logger = createRequestLogger({ method: 'POST', path: '/api/chat', requestId: 'r1' }) + const logger = createRequestLogger( + { method: 'POST', path: '/api/chat', requestId: 'r1' }, + { _deferDrain: true }, + ) const emitted = logger.emit({ status: 200 }) logger.set({ ai: { calls: 1, totalTokens: 42 } }) expect(warnSpy).not.toHaveBeenCalled() @@ -528,6 +531,16 @@ describe('createRequestLogger', () => { warnSpy.mockRestore() }) + it('does not merge ai fields after immediate drain when deferDrain is false', () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const logger = createRequestLogger({ method: 'POST', path: '/api/chat', requestId: 'r1' }) + logger.emit({ status: 200 }) + logger.set({ ai: { calls: 1, totalTokens: 42 } }) + expect(warnSpy).toHaveBeenCalled() + expect(String(warnSpy.mock.calls[0]?.[0])).toContain('Keys dropped: ai') + warnSpy.mockRestore() + }) + it('seals logger when emit returns null due to sampling', () => { const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) initLogger({ diff --git a/packages/evlog/test/helpers/stream.ts b/packages/evlog/test/helpers/stream.ts new file mode 100644 index 00000000..91708ab0 --- /dev/null +++ b/packages/evlog/test/helpers/stream.ts @@ -0,0 +1,22 @@ +const encoder = new TextEncoder() + +/** ReadableStream that stays open until {@link createDeferredStream.close} is called. */ +export function createDeferredStream() { + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + return { + stream, + close: () => { + if (!close) throw new Error('close not initialized') + close() + }, + } +} diff --git a/packages/evlog/test/next/handler.test.ts b/packages/evlog/test/next/handler.test.ts index 9a4a6fdb..62d78e8b 100644 --- a/packages/evlog/test/next/handler.test.ts +++ b/packages/evlog/test/next/handler.test.ts @@ -3,25 +3,7 @@ import { createWithEvlog } from '../../src/next/handler' import { evlogStorage } from '../../src/next/storage' import { initLogger } from '../../src/logger' import { defined } from '../helpers/defined' - -const encoder = new TextEncoder() - -function createDeferredStream() { - let close: (() => void) | undefined - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('hello')) - close = () => { - controller.enqueue(encoder.encode(' world')) - controller.close() - } - }, - }) - return { - stream, - close: () => defined(close, 'close stream')(), - } -} +import { createDeferredStream } from '../helpers/stream' // Mock next/server to prevent import errors vi.mock('next/server', () => ({ diff --git a/packages/evlog/test/nitro/plugin.test.ts b/packages/evlog/test/nitro/plugin.test.ts index a18c9622..94c8c447 100644 --- a/packages/evlog/test/nitro/plugin.test.ts +++ b/packages/evlog/test/nitro/plugin.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { getHeaders } from 'h3' import type { DrainContext, RouteConfig, ServerEvent, WideEvent } from '../../src/types' import { defined } from '../helpers/defined' +import { createDeferredStream } from '../helpers/stream' import { filterSafeHeaders } from '../../src/utils' import { getServiceForPath, shouldLog } from '../../src/shared/routes' import { createRequestLogger, initLogger } from '../../src/logger' @@ -976,25 +977,6 @@ describe('nitro plugin - middleware compatibility (#210)', () => { }) describe('nitro plugin - streaming emit defer (#321)', () => { - const encoder = new TextEncoder() - - function createDeferredStream() { - let close: (() => void) | undefined - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('hello')) - close = () => { - controller.enqueue(encoder.encode(' world')) - controller.close() - } - }, - }) - return { - stream, - close: () => defined(close, 'close stream')(), - } - } - beforeEach(() => { initLogger({ env: { service: 'test-app' }, pretty: false, silent: true, _suppressDrainWarning: true }) }) diff --git a/packages/evlog/test/shared/streamResponse.test.ts b/packages/evlog/test/shared/streamResponse.test.ts index 16cc03dc..2f0584ab 100644 --- a/packages/evlog/test/shared/streamResponse.test.ts +++ b/packages/evlog/test/shared/streamResponse.test.ts @@ -1,26 +1,10 @@ import { describe, expect, it, vi } from 'vitest' import { bindStreamingResponseLifecycle, isStreamingResponse, shouldDeferEmitForResponse } from '../../src/shared/streamResponse' import { defined } from '../helpers/defined' +import { createDeferredStream } from '../helpers/stream' const encoder = new TextEncoder() -function createDeferredStream() { - let close: (() => void) | undefined - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode('hello')) - close = () => { - controller.enqueue(encoder.encode(' world')) - controller.close() - } - }, - }) - return { - stream, - close: () => defined(close, 'close stream')(), - } -} - function createImmediateStream(chunks: string[]) { return new ReadableStream({ start(controller) { From 736b71245aa62834c5b8585df09249061a4988b7 Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Sun, 7 Jun 2026 14:50:21 +0100 Subject: [PATCH 3/3] fix: restore toolkit config exports dropped in streaming PR Co-authored-by: Hussain Arslan --- packages/evlog/src/shared/index.ts | 1 + .../evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap | 2 ++ 2 files changed, 3 insertions(+) diff --git a/packages/evlog/src/shared/index.ts b/packages/evlog/src/shared/index.ts index 9529c36e..68ea8343 100644 --- a/packages/evlog/src/shared/index.ts +++ b/packages/evlog/src/shared/index.ts @@ -7,6 +7,7 @@ export * from './streamResponse' export * from './compose' +export * from './config' export * from './define' export * from './drain' export * from './enricher' diff --git a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap index 81c71343..4ea18414 100644 --- a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap +++ b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap @@ -242,11 +242,13 @@ exports[`public API surface > matches snapshot for all subpath exports 1`] = ` "forkBackgroundLogger", "getEmptyPluginRunner", "getHeader", + "getRuntimeConfig", "getServiceForPath", "httpPost", "isStreamingResponse", "mergeEventField", "normalizeNumber", + "resolveAdapterConfig", "resolveMiddlewarePluginRunner", "runEnrichAndDrain", "shouldDeferEmitForResponse",