diff --git a/.changeset/streaming-ai-emit-defer.md b/.changeset/streaming-ai-emit-defer.md new file mode 100644 index 00000000..92738c21 --- /dev/null +++ b/.changeset/streaming-ai-emit-defer.md @@ -0,0 +1,9 @@ +--- +'evlog': minor +--- + +Defer wide-event emit for streaming HTTP responses (SSE, AI SDK UI streams, chunked bodies) until the response body finishes, so `createAILogger()` metadata is included on the same request event instead of triggering post-emit warnings. + +Applies to Next.js `withEvlog`, SvelteKit, Hono, React Router, oRPC, and Nitro/Nuxt integrations. Also merges late `ai` fields onto an emitted event before enrich/drain when metadata arrives in a narrow race window. + +Fixes #321 diff --git a/apps/docs/content/2.learn/2.wide-events.md b/apps/docs/content/2.learn/2.wide-events.md index 5e6db6d6..a3b5a915 100644 --- a/apps/docs/content/2.learn/2.wide-events.md +++ b/apps/docs/content/2.learn/2.wide-events.md @@ -188,6 +188,8 @@ The parent wide event may be emitted **before** the child event; they are two se **Not available yet:** Hono (no `useLogger` without `c.get('log')` + ALS) and Nitro/Nuxt `useLogger(event)` — use the post-emit warnings to catch mistakes; a different API may arrive later for event-scoped forks. +For AI SDK streaming responses, supported framework integrations (Next.js, Nitro/Nuxt, SvelteKit, Hono, React Router, oRPC) defer wide-event emit until the response body finishes, so `createAILogger(log)` metadata lands on the same request event automatically. + ```typescript [server/routes/checkout.post.ts] import { evlog, useLogger } from 'evlog/express' diff --git a/apps/docs/content/3.integrate/frameworks/02.nextjs.md b/apps/docs/content/3.integrate/frameworks/02.nextjs.md index 26adc686..1029f6cb 100644 --- a/apps/docs/content/3.integrate/frameworks/02.nextjs.md +++ b/apps/docs/content/3.integrate/frameworks/02.nextjs.md @@ -279,7 +279,7 @@ export const POST = withEvlog(async (request: Request) => { }) ``` -All fields are merged into a single wide event emitted when the handler completes: +All fields are merged into a single wide event emitted when the handler completes (or when a streaming response body finishes, so AI SDK metadata is included): ```bash [Output (Pretty)] 10:23:45.612 INFO [my-app] POST /api/checkout 200 in 145ms diff --git a/apps/docs/content/3.integrate/frameworks/04.nitro.md b/apps/docs/content/3.integrate/frameworks/04.nitro.md index ffc74497..5059b1c3 100644 --- a/apps/docs/content/3.integrate/frameworks/04.nitro.md +++ b/apps/docs/content/3.integrate/frameworks/04.nitro.md @@ -125,7 +125,7 @@ One request, one log line with all context: └─ requestId: a1b2c3d4-... ``` -Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). +Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. For AI SDK streaming responses, evlog defers wide-event emit until the response body finishes so `createAILogger(log)` metadata stays on the same request event. Post-emit warnings only apply when code calls `set()` after the wide event has actually emitted — for example in non-streaming handlers or background work. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). ## Error Handling diff --git a/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md b/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md index ef9d4d23..d39ad44d 100644 --- a/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md +++ b/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md @@ -19,6 +19,8 @@ links: Every pattern below uses the same `createAILogger(log)` setup. Wrap the model with `ai.wrap()` and the middleware accumulates tokens, tools, and timing on the wide event automatically. +On Next.js, Nuxt/Nitro, SvelteKit, Hono, React Router, and oRPC, evlog defers wide-event emit for streaming responses (for example `text/event-stream` and AI SDK UI streams) until the body finishes, so late `ai` metadata stays on the same request event. + ## streamText The most common pattern — streaming chat with full observability: diff --git a/packages/evlog/src/hono/index.ts b/packages/evlog/src/hono/index.ts index c3e1faf3..32f38ee9 100644 --- a/packages/evlog/src/hono/index.ts +++ b/packages/evlog/src/hono/index.ts @@ -2,6 +2,7 @@ import type { Context, MiddlewareHandler } from 'hono' import type { RequestLogger } from '../types' import { defineFrameworkIntegration } from '../shared/integration' import type { BaseEvlogOptions } from '../shared/middleware' +import { shouldDeferEmitForResponse } from '../shared/streamResponse' export type EvlogHonoOptions = BaseEvlogOptions @@ -54,13 +55,20 @@ const integration = defineFrameworkIntegration({ */ export function evlog(options: EvlogHonoOptions = {}): MiddlewareHandler { return async (c, next) => { - const { skipped, finish } = integration.start(c, options) + const { skipped, finish, finishResponse } = integration.start(c, options) if (skipped) { await next() return } try { await next() + if (shouldDeferEmitForResponse(c.res)) { + const response = new Response(c.res.body, { + status: c.res.status, + headers: c.res.headers, + }) + return finishResponse(response, { status: response.status }) + } await finish({ status: c.res.status }) } catch (error) { await finish({ error: error as Error }) diff --git a/packages/evlog/src/logger.ts b/packages/evlog/src/logger.ts index 7d99f9f8..4c86569d 100644 --- a/packages/evlog/src/logger.ts +++ b/packages/evlog/src/logger.ts @@ -45,6 +45,25 @@ function mergeInto(target: Record, source: Record() + +function isAiOnlyFieldUpdate(data: Record): boolean { + const keys = Object.keys(data) + return keys.length === 1 && keys[0] === 'ai' +} + +/** + * Mark a wide event as past the post-emit AI merge window so late `log.set({ ai })` + * calls warn again. Called by framework enrich/drain pipelines before drain runs. + * + * @internal Used by middleware and framework integrations. + */ +export function markWideEventDrainStarted(event: WideEvent | null): void { + if (!event) return + const state = pendingDrainState.get(event) + if (state) state.drainStarted = true +} + /** * @internal Wide-event field merge — exported for test mocks that mirror emit accumulation. */ @@ -777,6 +796,7 @@ export function createLogger>(initial let hasWarn = false let manualLevel: LogLevel | undefined let emitted = false + let pendingWideEvent: WideEvent | null = null function addLog(level: 'info' | 'warn', message: string): void { if (!Array.isArray(context.requestLogs)) { @@ -811,7 +831,18 @@ export function createLogger>(initial audit: auditMethod, set(data: FieldContext): void { if (emitted) { - const keys = Object.keys(data as Record) + const record = data as Record + const pendingState = pendingWideEvent ? pendingDrainState.get(pendingWideEvent) : undefined + if ( + pendingWideEvent + && pendingState + && !pendingState.drainStarted + && isAiOnlyFieldUpdate(record) + ) { + mergeInto(pendingWideEvent as Record, record) + return + } + const keys = Object.keys(record) warnPostEmit('log.set()', `Keys dropped: ${keys.length ? keys.join(', ') : '(empty)'}.`) return } @@ -924,6 +955,7 @@ export function createLogger>(initial if (!forceKeep && !shouldSample(level)) { emitted = true + pendingWideEvent = null return null } @@ -937,6 +969,11 @@ export function createLogger>(initial const wide = emitWideEvent(level, context, { deferDrain, ownsEvent: true, waitUntil }) emitted = true + pendingWideEvent = wide + if (wide) { + // Only enable the AI merge window when middleware defers drain until finish. + pendingDrainState.set(wide, { drainStarted: !deferDrain }) + } return wide }, diff --git a/packages/evlog/src/next/handler.ts b/packages/evlog/src/next/handler.ts index 5fd03c14..33134d16 100644 --- a/packages/evlog/src/next/handler.ts +++ b/packages/evlog/src/next/handler.ts @@ -1,8 +1,9 @@ import type { DrainContext, EnrichContext, TailSamplingContext, WideEvent } from '../types' -import { createRequestLogger, getGlobalDrain, initLogger, isEnabled, isLoggerLocked } from '../logger' +import { createRequestLogger, getGlobalDrain, initLogger, isEnabled, isLoggerLocked, markWideEventDrainStarted } from '../logger' import { attachForkToLogger } from '../shared/fork' import type { MiddlewareLoggerOptions } from '../shared/middleware' import { shouldLog, getServiceForPath } from '../shared/routes' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse' import { filterSafeHeaders } from '../utils' import { EvlogError } from '../error' import type { NextEvlogOptions } from './types' @@ -83,6 +84,8 @@ async function callEnrichAndDrain( } } + markWideEventDrainStarted(emittedEvent) + if (drain) { const drainCtx: DrainContext = { event: emittedEvent, @@ -112,6 +115,33 @@ async function callEnrichAndDrain( run().catch(() => {}) } +async function emitRequestEvent( + logger: ReturnType, + requestInfo: { method: string, path: string, requestId: string }, + headers: Record, + status: number, +): Promise { + let forceKeep = false + if (state.options.keep) { + try { + const tailCtx: TailSamplingContext = { + status, + path: requestInfo.path, + method: requestInfo.method, + context: logger.getContext(), + shouldKeep: false, + } + await state.options.keep(tailCtx) + forceKeep = tailCtx.shouldKeep ?? false + } catch (err) { + console.error('[evlog] keep callback failed:', err) + } + } + + const emittedEvent = logger.emit({ _forceKeep: forceKeep }) + await callEnrichAndDrain(emittedEvent, requestInfo, headers, status) +} + /** * Wrap a Next.js route handler or server action with evlog request-scoped logging. * @@ -197,6 +227,19 @@ export function createWithEvlog(options: NextEvlogOptions) { try { const result = await evlogStorage.run(logger, () => handler(...args)) + const requestInfo = { method, path, requestId } + + if (result instanceof Response && shouldDeferEmitForResponse(result)) { + const wrapped = bindStreamingResponseLifecycle(result, async (meta) => { + if (meta.error) { + logger.error(meta.error) + } + const finalStatus = meta.status ?? result.status + logger.set({ status: finalStatus }) + await emitRequestEvent(logger, requestInfo, headers, finalStatus) + }) + return wrapped as Awaited + } // Extract response status let { status } = { status: 200 } @@ -205,26 +248,7 @@ export function createWithEvlog(options: NextEvlogOptions) { } logger.set({ status }) - // Build tail sampling context and call keep callback - let forceKeep = false - if (state.options.keep) { - try { - const tailCtx: TailSamplingContext = { - status, - path, - method, - context: logger.getContext(), - shouldKeep: false, - } - await state.options.keep(tailCtx) - forceKeep = tailCtx.shouldKeep ?? false - } catch (err) { - console.error('[evlog] keep callback failed:', err) - } - } - - const emittedEvent = logger.emit({ _forceKeep: forceKeep }) - await callEnrichAndDrain(emittedEvent, { method, path, requestId }, headers, status) + await emitRequestEvent(logger, requestInfo, headers, status) return result as Awaited } catch (error) { @@ -235,26 +259,7 @@ export function createWithEvlog(options: NextEvlogOptions) { ?? 500 logger.set({ status: errorStatus }) - // Build tail sampling context and call keep callback - let forceKeep = false - if (state.options.keep) { - try { - const tailCtx: TailSamplingContext = { - status: errorStatus, - path, - method, - context: logger.getContext(), - shouldKeep: false, - } - await state.options.keep(tailCtx) - forceKeep = tailCtx.shouldKeep ?? false - } catch (err) { - console.error('[evlog] keep callback failed:', err) - } - } - - const emittedEvent = logger.emit({ _forceKeep: forceKeep }) - await callEnrichAndDrain(emittedEvent, { method, path, requestId }, headers, errorStatus) + await emitRequestEvent(logger, { method, path, requestId }, headers, errorStatus) // Return structured JSON response for EvlogErrors (like H3 does for Nuxt) if (isRequest && error instanceof EvlogError) { diff --git a/packages/evlog/src/nitro-v3/plugin.ts b/packages/evlog/src/nitro-v3/plugin.ts index 11148f35..87346c56 100644 --- a/packages/evlog/src/nitro-v3/plugin.ts +++ b/packages/evlog/src/nitro-v3/plugin.ts @@ -2,7 +2,7 @@ import { definePlugin } from 'nitro' import type { CaptureError } from 'nitro/types' import type { HTTPEvent } from 'nitro/h3' import { parseURL } from 'ufo' -import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled } from '../logger' +import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled, markWideEventDrainStarted } from '../logger' import { registerPrettyErrorSnippetReader } from '../shared/pretty-error' import { readCodeSnippetFromDisk } from '../shared/pretty-error-snippet.node' import { enrichErrorStackForDev } from '../shared/enrich-error-stack.node' @@ -10,6 +10,7 @@ import { shouldLog, getServiceForPath, extractErrorStatus } from '../nitro' import { extendDeferredDrain } from '../nitro/enrich-drain' import { normalizeRedactConfig } from '../redact' import { resolveEvlogConfigForNitroPlugin, setActiveNitroRuntime } from '../shared/nitroConfigBridge' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse' import type { EnrichContext, RequestLogger, TailSamplingContext, WideEvent } from '../types' import { filterSafeHeaders } from '../utils' @@ -146,6 +147,8 @@ async function callEnrichAndDrain( await runner.runEnrich(enrichCtx) } + markWideEventDrainStarted(emittedEvent) + await callDrainHook(hooks, emittedEvent, event, hookContext, options) } @@ -241,29 +244,45 @@ export default definePlugin(async (nitroApp) => { const log = ctx?.log as RequestLogger | undefined if (!log || !ctx) return - const { status } = res - log.set({ status }) + const emitSuccessResponse = async (responseStatus: number) => { + log.set({ status: responseStatus }) - const startTime = ctx._evlogStartTime as number | undefined - const durationMs = startTime ? Date.now() - startTime : undefined + const startTime = ctx._evlogStartTime as number | undefined + const durationMs = startTime ? Date.now() - startTime : undefined - const { pathname } = parseURL(event.req.url) + const { pathname } = parseURL(event.req.url) - const tailCtx: TailSamplingContext = { - status, - duration: durationMs, - path: pathname, - method: event.req.method, - context: log.getContext(), - shouldKeep: false, + const tailCtx: TailSamplingContext = { + status: responseStatus, + duration: durationMs, + path: pathname, + method: event.req.method, + context: log.getContext(), + shouldKeep: false, + } + + await hooks.callHook('evlog:emit:keep', tailCtx) + const runner = getGlobalPluginRunner() + if (runner.hasKeep) await runner.runKeep(tailCtx) + + const emittedEvent = log.emit({ _forceKeep: tailCtx.shouldKeep }) + await callEnrichAndDrain(hooks, emittedEvent, event, res) } - await hooks.callHook('evlog:emit:keep', tailCtx) - const runner = getGlobalPluginRunner() - if (runner.hasKeep) await runner.runKeep(tailCtx) + if (shouldDeferEmitForResponse(res)) { + const wrapped = bindStreamingResponseLifecycle(res, async (meta) => { + if (meta.error) { + log.error(meta.error) + } + await emitSuccessResponse(meta.status ?? res.status) + }) + if (wrapped !== res && 'res' in event) { + (event as { res: Response }).res = wrapped + } + return + } - const emittedEvent = log.emit({ _forceKeep: tailCtx.shouldKeep }) - await callEnrichAndDrain(hooks, emittedEvent, event, res) + await emitSuccessResponse(res.status) }) hooks.hook('error', async (error, { event }) => { diff --git a/packages/evlog/src/nitro/plugin.ts b/packages/evlog/src/nitro/plugin.ts index 61fd6cc7..3b115294 100644 --- a/packages/evlog/src/nitro/plugin.ts +++ b/packages/evlog/src/nitro/plugin.ts @@ -4,13 +4,14 @@ // (nitropack dev loads plugins outside the bundle via Worker threads). import { defineNitroPlugin } from 'nitropack/runtime/internal/plugin' import { getHeaders } from 'h3' -import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled } from '../logger' +import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled, markWideEventDrainStarted } from '../logger' import { registerPrettyErrorSnippetReader } from '../shared/pretty-error' import { readCodeSnippetFromDisk } from '../shared/pretty-error-snippet.node' import { enrichErrorStackForDev } from '../shared/enrich-error-stack.node' import { shouldLog, getServiceForPath, extractErrorStatus } from '../nitro' import { normalizeRedactConfig } from '../redact' import { resolveEvlogConfigForNitroPlugin, setActiveNitroRuntime } from '../shared/nitroConfigBridge' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse' import { startStreamServer, type StreamServerOptions } from '../stream' import type { RequestLogger, ServerEvent, TailSamplingContext } from '../types' import { filterSafeHeaders } from '../utils' @@ -40,6 +41,7 @@ function getResponseStatus(event: ServerEvent): number { return 200 } + export default defineNitroPlugin(async (nitroApp) => { setActiveNitroRuntime('v2') const evlogConfig = await resolveEvlogConfigForNitroPlugin() @@ -176,7 +178,9 @@ export default defineNitroPlugin(async (nitroApp) => { if (e.context._evlogEmitted || e.context._evlogEmitting || !e.context._evlogShouldEmit) return const requestLog = e.context.log as RequestLogger | undefined - if (requestLog) { + if (!requestLog) return + + const emitSuccessResponse = async () => { const status = getResponseStatus(e) requestLog.set({ status }) @@ -199,5 +203,17 @@ export default defineNitroPlugin(async (nitroApp) => { const emittedEvent = requestLog.emit({ _forceKeep: tailCtx.shouldKeep }) await callEnrichAndDrain(nitroApp, emittedEvent, e) } + + if (e.response && shouldDeferEmitForResponse(e.response)) { + e.response = bindStreamingResponseLifecycle(e.response, async (meta) => { + if (meta.error) { + requestLog.error(meta.error) + } + await emitSuccessResponse() + }) + return + } + + await emitSuccessResponse() }) }) diff --git a/packages/evlog/src/orpc/index.ts b/packages/evlog/src/orpc/index.ts index 42722b75..dac2d197 100644 --- a/packages/evlog/src/orpc/index.ts +++ b/packages/evlog/src/orpc/index.ts @@ -111,7 +111,7 @@ export function withEvlog( options: EvlogOrpcOptions = {}, ): THandler { const handle: THandler['handle'] = async (request, callOptions) => { - const { skipped, finish, runWith, logger } = integration.start({ request }, options) + const { skipped, finish, finishResponse, runWith, logger } = integration.start({ request }, options) const initialContext = (callOptions as { context?: Record } | undefined)?.context ?? {} const finalOptions = { @@ -125,8 +125,11 @@ export function withEvlog( try { const result = await runWith(() => handler.handle(request, finalOptions)) - const status = result.matched ? result.response.status : 404 - await finish({ status }) + if (result.matched) { + result.response = await finishResponse(result.response, { status: result.response.status }) + } else { + await finish({ status: 404 }) + } return result } catch (error) { await finish({ error: error as Error }) diff --git a/packages/evlog/src/react-router/index.ts b/packages/evlog/src/react-router/index.ts index bbb331e3..58471259 100644 --- a/packages/evlog/src/react-router/index.ts +++ b/packages/evlog/src/react-router/index.ts @@ -55,7 +55,7 @@ export function evlog(options: EvlogReactRouterOptions = {}) { headers: extractSafeHeaders(request.headers), ...options, } - const { logger, finish, skipped } = createMiddlewareLogger(middlewareOpts) + const { logger, finish, finishResponse, skipped } = createMiddlewareLogger(middlewareOpts) if (skipped) { return next() @@ -66,8 +66,7 @@ export function evlog(options: EvlogReactRouterOptions = {}) { try { const response = await storage.run(logger, () => next()) - await finish({ status: response.status }) - return response + return finishResponse(response, { status: response.status }) } catch (error) { await finish({ error: error as Error }) throw error diff --git a/packages/evlog/src/shared/index.ts b/packages/evlog/src/shared/index.ts index db3252ca..68ea8343 100644 --- a/packages/evlog/src/shared/index.ts +++ b/packages/evlog/src/shared/index.ts @@ -5,6 +5,7 @@ * @see https://evlog.dev/extend/custom-framework */ +export * from './streamResponse' export * from './compose' export * from './config' export * from './define' diff --git a/packages/evlog/src/shared/middleware.ts b/packages/evlog/src/shared/middleware.ts index cddd765d..183fdec4 100644 --- a/packages/evlog/src/shared/middleware.ts +++ b/packages/evlog/src/shared/middleware.ts @@ -1,10 +1,11 @@ import type { DrainContext, EnrichContext, RedactConfig, RequestLogger, RouteConfig, TailSamplingContext, WideEvent } from '../types' -import { createRequestLogger, getGlobalDrain, getGlobalPluginRunner, isEnabled, shouldKeep } from '../logger' +import { createRequestLogger, getGlobalDrain, getGlobalPluginRunner, isEnabled, markWideEventDrainStarted, shouldKeep } from '../logger' import { isGloballyRedacted, redactEvent, resolveRedactConfig } from '../redact' import { extractErrorStatus } from './errors' import type { EvlogPlugin, PluginRunner } from './plugin' import { createPluginRunner, getEmptyPluginRunner } from './plugin' import { shouldLog, getServiceForPath } from './routes' +import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from './streamResponse' /** * Base options shared by every framework integration. Re-exported via @@ -44,6 +45,11 @@ export interface MiddlewareLoggerOptions extends BaseEvlogOptions { export interface MiddlewareLoggerResult { logger: RequestLogger finish: (opts?: { status?: number; error?: Error }) => Promise + /** + * Finish request logging, deferring emit until a streaming response body completes. + * Returns the original response or a wrapped copy when the body is a stream. + */ + finishResponse: (response: Response, opts?: { status?: number }) => Promise skipped: boolean } @@ -62,6 +68,7 @@ const noopResult: MiddlewareLoggerResult = { }, }, finish: () => Promise.resolve(null), + finishResponse: (response) => Promise.resolve(response), skipped: true, } @@ -139,6 +146,8 @@ export async function runEnrichAndDrain( } } + markWideEventDrainStarted(emittedEvent) + const drain = options.drain ?? getGlobalDrain() const hasUserDrain = !!drain const hasPluginDrain = runner.hasDrain @@ -213,29 +222,51 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle }) } - const finish = async (opts?: { status?: number; error?: Error }): Promise => { + const finish = (opts?: { status?: number; error?: Error }): Promise => { + return performFinish(logger, opts) + } + + const finishResponse = async (response: Response, opts?: { status?: number }): Promise => { + const status = opts?.status ?? response.status + if (!shouldDeferEmitForResponse(response)) { + await performFinish(logger, { status }) + return response + } + + return bindStreamingResponseLifecycle(response, async (meta) => { + await performFinish(logger, { + status: meta.status ?? status, + error: meta.error, + }) + }) + } + + async function performFinish( + requestLogger: RequestLogger, + opts?: { status?: number; error?: Error }, + ): Promise { const { status, error } = opts ?? {} if (error) { - logger.error(error) + requestLogger.error(error) const errorStatus = extractErrorStatus(error) - logger.set({ status: errorStatus }) + requestLogger.set({ status: errorStatus }) } else if (status !== undefined) { - logger.set({ status }) + requestLogger.set({ status }) } const durationMs = Date.now() - startTime const resolvedStatus = error ? extractErrorStatus(error) - : status ?? (logger.getContext().status as number | undefined) + : status ?? (requestLogger.getContext().status as number | undefined) const tailCtx: TailSamplingContext = { status: resolvedStatus, duration: durationMs, path, method, - context: logger.getContext(), + context: requestLogger.getContext(), shouldKeep: false, } @@ -247,7 +278,7 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle } const forceKeep = tailCtx.shouldKeep || shouldKeep(tailCtx) - const emittedEvent = logger.emit({ _forceKeep: forceKeep }) + const emittedEvent = requestLogger.emit({ _forceKeep: forceKeep }) if ( emittedEvent @@ -258,7 +289,7 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle if (pluginRunner.hasRequestLifecycle) { pluginRunner.runOnRequestFinish({ - logger, + logger: requestLogger, request: requestInfo, headers: options.headers, event: emittedEvent, @@ -271,5 +302,5 @@ export function createMiddlewareLogger(options: MiddlewareLoggerOptions): Middle return emittedEvent } - return { logger, finish, skipped: false } + return { logger, finish, finishResponse, skipped: false } } diff --git a/packages/evlog/src/shared/streamResponse.ts b/packages/evlog/src/shared/streamResponse.ts new file mode 100644 index 00000000..900fbc17 --- /dev/null +++ b/packages/evlog/src/shared/streamResponse.ts @@ -0,0 +1,121 @@ +/** + * Metadata passed to streaming response completion callbacks. + * Reports the HTTP status and any error that occurred while reading the body. + */ +export interface StreamCompleteMeta { + /** Final HTTP status code for the response. */ + status: number + /** Present when the stream body failed before or during completion. */ + error?: Error +} + +/** + * Whether a {@link Response} carries a stream body that may outlive handler return. + */ +export function isStreamingResponse(response: Response): boolean { + return response.body !== null +} + +/** + * Whether framework integrations should defer wide-event emit until the response + * body finishes. Static string/JSON bodies are excluded even though they use a + * {@link ReadableStream} under the hood in the Fetch API. + */ +export function shouldDeferEmitForResponse(response: Response): boolean { + if (!response.body) return false + + const contentType = (response.headers.get('content-type') ?? '').toLowerCase() + + if (contentType.includes('text/event-stream')) return true + if (contentType.includes('application/x-ndjson')) return true + if (response.headers.has('x-vercel-ai-ui-message-stream')) return true + if (response.headers.get('transfer-encoding')?.toLowerCase().includes('chunked')) return true + + return false +} + +function toError(value: unknown): Error { + return value instanceof Error ? value : new Error(String(value)) +} + +function createObservedBody( + body: ReadableStream, + onDone: () => void | Promise, + onError: (error: unknown) => void | Promise, +): ReadableStream | null { + if (body.locked) { + void Promise.resolve(onError(new TypeError('stream is already locked'))).catch((err: unknown) => { + console.error('[evlog] stream error handling failed:', err) + }) + return null + } + + const reader = body.getReader() + let settled = false + + const settle = (fn: () => void | Promise) => { + if (settled) return + settled = true + void Promise.resolve(fn()).catch((err: unknown) => { + console.error('[evlog] stream completion handling failed:', err) + }) + } + + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read() + if (done) { + settle(onDone) + controller.close() + return + } + controller.enqueue(value) + } catch (err) { + settle(() => onError(err)) + controller.error(err) + } + }, + + async cancel(reason) { + try { + await reader.cancel(reason) + } finally { + settle(onDone) + } + }, + }) +} + +/** + * Observe a streaming {@link Response} body and invoke `onComplete` once when + * the body closes, errors, or is cancelled. Preserves status, headers, and chunks. + * + * Non-streaming responses invoke `onComplete` immediately. + * + * @internal Used by framework integrations to defer wide-event emit until streams finish. + */ +export function bindStreamingResponseLifecycle( + response: Response, + onComplete: (meta: StreamCompleteMeta) => void | Promise, +): Response { + if (!response.body) { + void Promise.resolve(onComplete({ status: response.status })).catch((err: unknown) => { + console.error('[evlog] stream completion handling failed:', err) + }) + return response + } + + const body = createObservedBody( + response.body, + () => onComplete({ status: response.status }), + (err) => onComplete({ status: response.status, error: toError(err) }), + ) + if (!body) return response + + return new Response(body, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }) +} diff --git a/packages/evlog/src/sveltekit/index.ts b/packages/evlog/src/sveltekit/index.ts index db09d2b4..505a9624 100644 --- a/packages/evlog/src/sveltekit/index.ts +++ b/packages/evlog/src/sveltekit/index.ts @@ -136,7 +136,7 @@ export function evlog(options: EvlogSvelteKitOptions = {}): SvelteKitHandle { headers: extractSafeHeaders(event.request.headers), ...options, } - const { logger, finish, skipped } = createMiddlewareLogger(middlewareOpts) + const { logger, finish, finishResponse, skipped } = createMiddlewareLogger(middlewareOpts) if (skipped) { return await resolve(event) @@ -164,8 +164,7 @@ export function evlog(options: EvlogSvelteKitOptions = {}): SvelteKitHandle { }) } - await finish({ status: response.status }) - return response + return finishResponse(response) } catch (error) { await finish({ error: error instanceof Error ? error : new Error(String(error)) }) diff --git a/packages/evlog/test/core/logger-request-logger.test.ts b/packages/evlog/test/core/logger-request-logger.test.ts index 7db9ebb7..862a4a32 100644 --- a/packages/evlog/test/core/logger-request-logger.test.ts +++ b/packages/evlog/test/core/logger-request-logger.test.ts @@ -516,6 +516,31 @@ describe('createRequestLogger', () => { warnSpy.mockRestore() }) + it('merges ai fields onto the emitted wide event before drain starts', () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const logger = createRequestLogger( + { method: 'POST', path: '/api/chat', requestId: 'r1' }, + { _deferDrain: true }, + ) + const emitted = logger.emit({ status: 200 }) + logger.set({ ai: { calls: 1, totalTokens: 42 } }) + expect(warnSpy).not.toHaveBeenCalled() + expect(emitted?.ai).toEqual({ calls: 1, totalTokens: 42 }) + logger.set({ action: 'chat' }) + expect(warnSpy).toHaveBeenCalled() + warnSpy.mockRestore() + }) + + it('does not merge ai fields after immediate drain when deferDrain is false', () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const logger = createRequestLogger({ method: 'POST', path: '/api/chat', requestId: 'r1' }) + logger.emit({ status: 200 }) + logger.set({ ai: { calls: 1, totalTokens: 42 } }) + expect(warnSpy).toHaveBeenCalled() + expect(String(warnSpy.mock.calls[0]?.[0])).toContain('Keys dropped: ai') + warnSpy.mockRestore() + }) + it('seals logger when emit returns null due to sampling', () => { const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) initLogger({ diff --git a/packages/evlog/test/frameworks/sveltekit.test.ts b/packages/evlog/test/frameworks/sveltekit.test.ts index 8b78878d..13e3b0a1 100644 --- a/packages/evlog/test/frameworks/sveltekit.test.ts +++ b/packages/evlog/test/frameworks/sveltekit.test.ts @@ -589,4 +589,45 @@ describe('evlog/sveltekit', () => { expect(result.why).toBe('Card declined') }) }) + + it('defers emit for streaming responses until the body completes (#321)', async () => { + const { drain } = createPipelineSpies() + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const handle = evlog({ drain }) + const event = createMockEvent('POST', '/api/chat') + + let closeStream!: () => void + const encoder = new TextEncoder() + const resolve = vi.fn(() => { + const log = useLogger() + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + closeStream = defined(close, 'close stream') + queueMicrotask(() => { + log.set({ ai: { calls: 1, totalTokens: 42 } }) + }) + return new Response(stream, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }) + }) + + const response = await handle({ event, resolve }) + expect(drain).not.toHaveBeenCalled() + + closeStream() + await expect(response.text()).resolves.toBe('hello world') + await waitForDrainCalls(drain) + + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false) + expect(findEventViaDrain(drain, e => e.path === '/api/chat')?.ai).toEqual({ calls: 1, totalTokens: 42 }) + }) }) diff --git a/packages/evlog/test/helpers/stream.ts b/packages/evlog/test/helpers/stream.ts new file mode 100644 index 00000000..91708ab0 --- /dev/null +++ b/packages/evlog/test/helpers/stream.ts @@ -0,0 +1,22 @@ +const encoder = new TextEncoder() + +/** ReadableStream that stays open until {@link createDeferredStream.close} is called. */ +export function createDeferredStream() { + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + return { + stream, + close: () => { + if (!close) throw new Error('close not initialized') + close() + }, + } +} diff --git a/packages/evlog/test/next/handler.test.ts b/packages/evlog/test/next/handler.test.ts index fdc7f388..62d78e8b 100644 --- a/packages/evlog/test/next/handler.test.ts +++ b/packages/evlog/test/next/handler.test.ts @@ -2,6 +2,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createWithEvlog } from '../../src/next/handler' import { evlogStorage } from '../../src/next/storage' import { initLogger } from '../../src/logger' +import { defined } from '../helpers/defined' +import { createDeferredStream } from '../helpers/stream' // Mock next/server to prevent import errors vi.mock('next/server', () => ({ @@ -366,4 +368,36 @@ describe('withEvlog', () => { expect(drainCtx.headers['x-custom']).toBe('safe-value') expect(drainCtx.headers['content-type']).toBe('application/json') }) + + it('defers emit for streaming responses until the body completes (#321)', async () => { + const drainMock = vi.fn() + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const withEvlog = createWithEvlog({ pretty: false, drain: drainMock }) + + let closeStream!: () => void + const handler = withEvlog((_request: Request) => { + const log = defined(evlogStorage.getStore(), 'request logger') + const { stream, close } = createDeferredStream() + closeStream = close + queueMicrotask(() => { + log.set({ ai: { calls: 1, totalTokens: 42 } }) + }) + return new Response(stream, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }) + }) + + const response = await handler(new Request('http://localhost/api/chat', { method: 'POST' })) + expect(drainMock).not.toHaveBeenCalled() + + closeStream() + await expect(response.text()).resolves.toBe('hello world') + await vi.waitFor(() => { + expect(drainMock).toHaveBeenCalledTimes(1) + }) + + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false) + expect(drainMock.mock.calls[0]?.[0]?.event?.ai).toEqual({ calls: 1, totalTokens: 42 }) + }) }) diff --git a/packages/evlog/test/nitro/plugin.test.ts b/packages/evlog/test/nitro/plugin.test.ts index 28eb07e0..bf016c1f 100644 --- a/packages/evlog/test/nitro/plugin.test.ts +++ b/packages/evlog/test/nitro/plugin.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { getHeaders } from 'h3' import type { DrainContext, RouteConfig, ServerEvent, WideEvent } from '../../src/types' import { defined } from '../helpers/defined' +import { createDeferredStream } from '../helpers/stream' import { filterSafeHeaders } from '../../src/utils' import { getServiceForPath, shouldLog } from '../../src/shared/routes' import { createRequestLogger, initLogger } from '../../src/logger' @@ -979,3 +980,50 @@ describe('nitro plugin - middleware compatibility (#210)', () => { expect(apiEvent.context._evlogShouldEmit).toBe(true) }) }) + +describe('nitro plugin - streaming emit defer (#321)', () => { + beforeEach(() => { + initLogger({ env: { service: 'test-app' }, pretty: false, silent: true, _suppressDrainWarning: true }) + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + it('defers afterResponse emit until a streaming body completes', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const event: ServerEvent = { method: 'POST', path: '/api/chat', context: {} } + event.context._evlogShouldEmit = true + event.context._evlogStartTime = Date.now() + event.context.log = createRequestLogger( + { method: 'POST', path: '/api/chat', requestId: 'req-1' }, + { _deferDrain: true }, + ) + + const { stream, close } = createDeferredStream() + event.response = new Response(stream, { + status: 200, + headers: { 'content-type': 'text/event-stream' }, + }) + + queueMicrotask(() => { + defined(event.context.log, 'request logger').set({ ai: { calls: 1, totalTokens: 42 } }) + }) + + const { bindStreamingResponseLifecycle } = await import('../../src/shared/streamResponse') + let emitCount = 0 + event.response = bindStreamingResponseLifecycle(event.response, () => { + emitCount++ + defined(event.context.log, 'request logger').emit({ status: 200 }) + }) + + expect(emitCount).toBe(0) + close() + await expect(event.response.text()).resolves.toBe('hello world') + await vi.waitFor(() => { + expect(emitCount).toBe(1) + }) + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('Keys dropped: ai'))).toBe(false) + expect(defined(event.context.log, 'request logger').getContext().ai).toEqual({ calls: 1, totalTokens: 42 }) + }) +}) diff --git a/packages/evlog/test/shared/streamResponse.test.ts b/packages/evlog/test/shared/streamResponse.test.ts new file mode 100644 index 00000000..2f0584ab --- /dev/null +++ b/packages/evlog/test/shared/streamResponse.test.ts @@ -0,0 +1,114 @@ +import { describe, expect, it, vi } from 'vitest' +import { bindStreamingResponseLifecycle, isStreamingResponse, shouldDeferEmitForResponse } from '../../src/shared/streamResponse' +import { defined } from '../helpers/defined' +import { createDeferredStream } from '../helpers/stream' + +const encoder = new TextEncoder() + +function createImmediateStream(chunks: string[]) { + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(encoder.encode(chunk)) + } + controller.close() + }, + }) +} + +describe('streamResponse', () => { + it('isStreamingResponse detects readable bodies', () => { + expect(isStreamingResponse(new Response(createImmediateStream(['a'])))).toBe(true) + expect(isStreamingResponse(new Response(null, { status: 204 }))).toBe(false) + }) + + it('shouldDeferEmitForResponse excludes static string bodies', () => { + expect(shouldDeferEmitForResponse(new Response('ok'))).toBe(false) + expect(shouldDeferEmitForResponse(new Response(createImmediateStream(['a']), { + headers: { 'content-type': 'text/event-stream' }, + }))).toBe(true) + expect(shouldDeferEmitForResponse(new Response(createImmediateStream(['a']), { + headers: { 'content-type': 'text/plain; charset=utf-8', 'x-vercel-ai-ui-message-stream': 'v1' }, + }))).toBe(true) + }) + + it('invokes onComplete when the body finishes', async () => { + const onComplete = vi.fn() + const source = createDeferredStream() + const response = bindStreamingResponseLifecycle( + new Response(source.stream, { status: 202 }), + onComplete, + ) + + const read = response.text() + source.close() + await expect(read).resolves.toBe('hello world') + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith({ status: 202 }) + }) + }) + + it('preserves status, headers, and chunks', async () => { + const onComplete = vi.fn() + const response = bindStreamingResponseLifecycle( + new Response(createImmediateStream(['a', 'b']), { + status: 207, + statusText: 'Multi-Status', + headers: { 'x-stream': 'yes' }, + }), + onComplete, + ) + + expect(response.status).toBe(207) + expect(response.statusText).toBe('Multi-Status') + expect(response.headers.get('x-stream')).toBe('yes') + await expect(response.text()).resolves.toBe('ab') + expect(onComplete).toHaveBeenCalledWith({ status: 207 }) + }) + + it('records stream errors on complete', async () => { + const onComplete = vi.fn() + const source = new ReadableStream({ + pull() { + throw new Error('stream exploded') + }, + }) + const response = bindStreamingResponseLifecycle(new Response(source, { status: 500 }), onComplete) + + await expect(response.text()).rejects.toThrow('stream exploded') + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith(expect.objectContaining({ + status: 500, + error: expect.objectContaining({ message: 'stream exploded' }), + })) + }) + }) + + it('returns the original response when the body is already locked', async () => { + const onComplete = vi.fn() + const original = new Response(createImmediateStream(['locked']), { status: 200 }) + const body = defined(original.body, 'response body') + const reader = body.getReader() + + try { + const response = bindStreamingResponseLifecycle(original, onComplete) + expect(response).toBe(original) + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith(expect.objectContaining({ + error: expect.objectContaining({ message: 'stream is already locked' }), + })) + }) + } finally { + reader.releaseLock() + } + }) + + it('calls onComplete immediately for responses without a body', async () => { + const onComplete = vi.fn() + const response = bindStreamingResponseLifecycle(new Response(null, { status: 204 }), onComplete) + expect(response.status).toBe(204) + await vi.waitFor(() => { + expect(onComplete).toHaveBeenCalledWith({ status: 204 }) + }) + }) +}) diff --git a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap index 9209ce33..4ea18414 100644 --- a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap +++ b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap @@ -220,6 +220,7 @@ exports[`public API surface > matches snapshot for all subpath exports 1`] = ` "OTEL_SEVERITY_NUMBER", "OTEL_SEVERITY_TEXT", "attachForkToLogger", + "bindStreamingResponseLifecycle", "composeDrains", "composeEnrichers", "composeKeep", @@ -244,11 +245,13 @@ exports[`public API surface > matches snapshot for all subpath exports 1`] = ` "getRuntimeConfig", "getServiceForPath", "httpPost", + "isStreamingResponse", "mergeEventField", "normalizeNumber", "resolveAdapterConfig", "resolveMiddlewarePluginRunner", "runEnrichAndDrain", + "shouldDeferEmitForResponse", "shouldLog", "toLoggerConfig", "toMiddlewareOptions",