diff --git a/.changeset/nitro-ai-streams.md b/.changeset/nitro-ai-streams.md new file mode 100644 index 00000000..52418197 --- /dev/null +++ b/.changeset/nitro-ai-streams.md @@ -0,0 +1,5 @@ +--- +'evlog': minor +--- + +Add `createNitroAIStreamLogger()` from `evlog/ai/nitro` for Nuxt/Nitro AI SDK streaming responses. The helper records stream metadata on a correlated child event and sends it through the normal Nitro enrich/drain hooks, avoiding post-emit warnings when the parent request event has already completed. diff --git a/apps/docs/content/2.learn/2.wide-events.md b/apps/docs/content/2.learn/2.wide-events.md index f96e0a2e..e5d4e163 100644 --- a/apps/docs/content/2.learn/2.wide-events.md +++ b/apps/docs/content/2.learn/2.wide-events.md @@ -186,7 +186,7 @@ For intentional background work that should produce **its own** wide event, use The parent wide event may be emitted **before** the child event; they are two separate events ordered by time. -**Not available yet:** Hono (no `useLogger` without `c.get('log')` + ALS) and Nitro/Nuxt `useLogger(event)` — use the post-emit warnings to catch mistakes; a different API may arrive later for event-scoped forks. +**Not available yet:** Hono (no `useLogger` without `c.get('log')` + ALS) and Nitro/Nuxt `useLogger(event)`. For Nuxt/Nitro AI streams, use `createNitroAIStreamLogger()` from `evlog/ai/nitro` to emit a correlated child event after the response body finishes. ```typescript [server/routes/checkout.post.ts] import { evlog, useLogger } from 'evlog/express' diff --git a/apps/docs/content/3.integrate/frameworks/04.nitro.md b/apps/docs/content/3.integrate/frameworks/04.nitro.md index ffc74497..df34de25 100644 --- a/apps/docs/content/3.integrate/frameworks/04.nitro.md +++ b/apps/docs/content/3.integrate/frameworks/04.nitro.md @@ -125,7 +125,7 @@ One request, one log line with all context: └─ requestId: a1b2c3d4-... ``` -Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). +Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. For AI SDK streaming responses, use `createNitroAIStreamLogger(event)` from `evlog/ai/nitro`; it emits a correlated child event after the response body finishes and routes it through the same Nitro enrich/drain hooks. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work). ## Error Handling diff --git a/apps/docs/content/4.use-cases/2.ai-sdk/01.overview.md b/apps/docs/content/4.use-cases/2.ai-sdk/01.overview.md index de410b0b..60a44d36 100644 --- a/apps/docs/content/4.use-cases/2.ai-sdk/01.overview.md +++ b/apps/docs/content/4.use-cases/2.ai-sdk/01.overview.md @@ -42,10 +42,10 @@ actions: Add AI observability to my app with evlog. - Install the AI SDK: pnpm add ai -- Import createAILogger from 'evlog/ai' -- Create an AI logger with createAILogger(log) where log is your request logger +- Import createAILogger from 'evlog/ai' for awaited calls, or createNitroAIStreamLogger from 'evlog/ai/nitro' for Nuxt streaming responses +- Create an AI logger with createAILogger(log) or use the ai returned by createNitroAIStreamLogger(event) - Wrap your model with ai.wrap('anthropic/claude-sonnet-4.6') and pass it to generateText, streamText, etc. -- Token usage, tool calls, streaming metrics, and errors are captured automatically into the wide event +- Token usage, tool calls, streaming metrics, and errors are captured automatically into the request event or a correlated child stream event - For deeper observability (tool execution timing, total generation wall time), add createEvlogIntegration(ai) to experimental_telemetry.integrations - For embedding calls, use ai.captureEmbed({ usage, model, dimensions, count }) after embed() or embedMany() - For cost estimation, pass a cost map: createAILogger(log, { cost: { 'claude-sonnet-4.6': { input: 3, output: 15 } } }) @@ -94,23 +94,29 @@ export default defineEventHandler(async (event) => { ``` ```typescript [After] -import { useLogger } from 'evlog' -import { createAILogger } from 'evlog/ai' +import { consumeStream } from 'ai' +import { createEvlogIntegration } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' export default defineEventHandler(async (event) => { - const log = useLogger(event) - const ai = createAILogger(log) + const { ai, wrapResponse } = createNitroAIStreamLogger(event) const result = streamText({ model: ai.wrap('anthropic/claude-sonnet-4.6'), messages, + experimental_telemetry: { + isEnabled: true, + integrations: [createEvlogIntegration(ai)], + }, }) - return result.toTextStreamResponse() + return wrapResponse(result.toUIMessageStreamResponse({ + consumeSseStream: consumeStream, + })) }) ``` :: -Your wide event now includes: +Your request wide event gets a correlated child event with: ```json [Wide Event] { @@ -118,6 +124,8 @@ Your wide event now includes: "path": "/api/chat", "status": 200, "duration": "4.5s", + "operation": "ai-stream", + "_parentRequestId": "req_...", "ai": { "calls": 1, "model": "claude-sonnet-4.6", @@ -148,6 +156,8 @@ Your wide event now includes: The middleware intercepts calls at the provider level. It does not touch your callbacks, prompts, or responses. Captured data flows through the normal evlog pipeline (sampling, enrichers, drains) and lands in Axiom, Better Stack, or wherever you drain to. +For Nuxt/Nitro streaming responses, `createNitroAIStreamLogger(event)` creates the same `AILogger` on a child stream event so late stream metadata is not written to an already-emitted request event. + ## Where to next ::card-group @@ -194,11 +204,9 @@ The middleware intercepts calls at the provider level. It does not touch your ca ::code-group ```typescript [Nuxt] -import { useLogger } from 'evlog' -import { createAILogger } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' -const log = useLogger(event) -const ai = createAILogger(log) +const { ai } = createNitroAIStreamLogger(event) ``` ```typescript [Next.js] diff --git a/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md b/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md index ef9d4d23..b8ab85a5 100644 --- a/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md +++ b/apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md @@ -17,19 +17,19 @@ links: variant: subtle --- -Every pattern below uses the same `createAILogger(log)` setup. Wrap the model with `ai.wrap()` and the middleware accumulates tokens, tools, and timing on the wide event automatically. +Awaited calls use `createAILogger(log)` directly. Nuxt/Nitro streaming responses use `createNitroAIStreamLogger(event)` so AI metadata lands on a correlated child event after the response body finishes. ## streamText The most common pattern — streaming chat with full observability: ```typescript [server/api/chat.post.ts] -import { streamText } from 'ai' -import { createAILogger } from 'evlog/ai' +import { consumeStream, streamText } from 'ai' +import { createEvlogIntegration } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' export default defineEventHandler(async (event) => { - const log = useLogger(event) - const ai = createAILogger(log) + const { ai, log, wrapResponse } = createNitroAIStreamLogger(event) const { messages } = await readBody(event) log.set({ action: 'chat', messagesCount: messages.length }) @@ -37,16 +37,22 @@ export default defineEventHandler(async (event) => { const result = streamText({ model: ai.wrap('anthropic/claude-sonnet-4.6'), messages, + experimental_telemetry: { + isEnabled: true, + integrations: [createEvlogIntegration(ai)], + }, onFinish: ({ text }) => { saveConversation(text) }, }) - return result.toTextStreamResponse() + return wrapResponse(result.toUIMessageStreamResponse({ + consumeSseStream: consumeStream, + })) }) ``` -The middleware never touches your `onFinish` callback — your code runs as usual. +The middleware never touches your `onFinish` callback — your code runs as usual. The child event uses `operation: 'ai-stream'` and `_parentRequestId` to link back to the request event. ## generateText @@ -75,14 +81,15 @@ The middleware fires for each step automatically. Steps, tool calls, and tokens ```typescript [server/api/agent.post.ts] import { ToolLoopAgent, createAgentUIStreamResponse, stepCountIs } from 'ai' -import { useLogger } from 'evlog' -import { createAILogger } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' export default defineEventHandler(async (event) => { - const log = useLogger(event) const { messages } = await readBody(event) - const ai = createAILogger(log, { - toolInputs: { maxLength: 500 }, + const { ai, wrapResponse } = createNitroAIStreamLogger(event, { + fields: { messagesCount: messages.length }, + ai: { + toolInputs: { maxLength: 500 }, + }, }) const agent = new ToolLoopAgent({ @@ -91,13 +98,15 @@ export default defineEventHandler(async (event) => { stopWhen: stepCountIs(5), }) - return createAgentUIStreamResponse({ + return wrapResponse(createAgentUIStreamResponse({ agent, uiMessages: messages, - }) + })) }) ``` +For non-streaming agent calls, bind `createAILogger` to `useLogger(event)` directly. + Wide event after a 3-step agent run: ```json [Wide Event] diff --git a/apps/docs/content/4.use-cases/2.ai-sdk/04.metadata.md b/apps/docs/content/4.use-cases/2.ai-sdk/04.metadata.md index 10e781f5..04fec20b 100644 --- a/apps/docs/content/4.use-cases/2.ai-sdk/04.metadata.md +++ b/apps/docs/content/4.use-cases/2.ai-sdk/04.metadata.md @@ -87,13 +87,11 @@ Each invocation receives a fresh snapshot. Returns an unsubscribe function. Subs ```typescript [server/api/agent.post.ts] import { ToolLoopAgent, createAgentUIStreamResponse, stepCountIs } from 'ai' -import { useLogger } from 'evlog' -import { createAILogger } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' export default defineEventHandler(async (event) => { - const log = useLogger(event) const { messages } = await readBody(event) - const ai = createAILogger(log) + const { ai, wrapResponse } = createNitroAIStreamLogger(event) ai.onUpdate((metadata) => { pushToClient(event, { @@ -110,7 +108,7 @@ export default defineEventHandler(async (event) => { stopWhen: stepCountIs(5), }) - return createAgentUIStreamResponse({ agent, uiMessages: messages }) + return wrapResponse(createAgentUIStreamResponse({ agent, uiMessages: messages })) }) ``` diff --git a/apps/nuxthub-playground/server/api/chat.post.ts b/apps/nuxthub-playground/server/api/chat.post.ts index 2d847a5c..1b19088c 100644 --- a/apps/nuxthub-playground/server/api/chat.post.ts +++ b/apps/nuxthub-playground/server/api/chat.post.ts @@ -1,5 +1,6 @@ import { ToolLoopAgent, createAgentUIStreamResponse, stepCountIs } from 'ai' -import { createAILogger, createEvlogIntegration } from 'evlog/ai' +import { createEvlogIntegration } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' import { queryEvents } from '../tools/query-events' const systemPrompt = `You are a helpful assistant that analyzes application logs stored in a SQLite database. @@ -58,20 +59,19 @@ Use json_extract() for querying JSON columns. Examples: - Only use SELECT queries — never modify data.` export default defineEventHandler(async (event) => { - const logger = useLogger(event) const { messages } = await readBody(event) - - logger.set({ action: 'chat', messagesCount: messages.length }) - - const ai = createAILogger(logger, { - toolInputs: true, - cost: { - 'gemini-3-flash': { input: 0.1, output: 0.4 }, + const { ai, log, wrapResponse } = createNitroAIStreamLogger(event, { + fields: { action: 'chat', messagesCount: messages.length }, + ai: { + toolInputs: true, + cost: { + 'gemini-3-flash': { input: 0.1, output: 0.4 }, + }, }, }) ai.onUpdate((metadata) => { - logger.set({ + log.set({ aiLive: { step: metadata.calls, totalTokens: metadata.totalTokens, @@ -92,7 +92,7 @@ export default defineEventHandler(async (event) => { integrations: [createEvlogIntegration(ai)], }, }) - return createAgentUIStreamResponse({ + return wrapResponse(createAgentUIStreamResponse({ agent, uiMessages: messages, messageMetadata: ({ part }) => { @@ -107,12 +107,12 @@ export default defineEventHandler(async (event) => { } }, onFinish: () => { - logger.set({ + log.set({ aiFinalMetadata: ai.getMetadata(), aiFinalCost: ai.getEstimatedCost(), }) }, - }) + })) } catch (error) { throw createError({ statusCode: 500, diff --git a/packages/evlog/README.md b/packages/evlog/README.md index 91e4c9ae..92734dd0 100644 --- a/packages/evlog/README.md +++ b/packages/evlog/README.md @@ -797,23 +797,31 @@ See [the Audit Logs guide](https://evlog.dev/use-cases/audit/overview) for compl Capture token usage, tool calls, model info, and streaming metrics from the [Vercel AI SDK](https://ai-sdk.dev) into wide events. Requires `ai >= 6.0.0`. ```typescript -import { streamText } from 'ai' -import { createAILogger } from 'evlog/ai' +import { consumeStream, streamText } from 'ai' +import { createEvlogIntegration } from 'evlog/ai' +import { createNitroAIStreamLogger } from 'evlog/ai/nitro' export default defineEventHandler(async (event) => { - const log = useLogger(event) - const ai = createAILogger(log) + const { ai, wrapResponse } = createNitroAIStreamLogger(event) const result = streamText({ model: ai.wrap('anthropic/claude-sonnet-4.6'), // string or model object messages, + experimental_telemetry: { + isEnabled: true, + integrations: [createEvlogIntegration(ai)], + }, onFinish: ({ text }) => saveConversation(text), // no conflict }) - return result.toTextStreamResponse() + return wrapResponse(result.toUIMessageStreamResponse({ + consumeSseStream: consumeStream, + })) }) ``` +For Nuxt/Nitro streaming responses, `createNitroAIStreamLogger(event)` emits a correlated child event with `operation: 'ai-stream'` and `_parentRequestId`. For awaited calls such as `generateText`, bind `createAILogger` to the request logger directly. + The middleware captures: `inputTokens`, `outputTokens`, `cacheReadTokens`, `reasoningTokens`, `model`, `provider`, `finishReason`, `toolCalls`, `steps`, `msToFirstChunk`, `msToFinish`, `tokensPerSecond`. For embeddings: `ai.captureEmbed({ usage })`. @@ -1255,7 +1263,7 @@ The framework emits **one wide event per HTTP request** when the response finish | Express, Fastify, NestJS, SvelteKit, React Router, Elysia | Yes | | Next.js `withEvlog` | Yes | | Hono (`c.get('log')` only) | Not yet | -| Nitro / Nuxt `useLogger(event)` | Not yet — use post-emit warnings; see [Wide events](https://evlog.dev/learn/wide-events) | +| Nitro / Nuxt `useLogger(event)` | Not yet — use `createNitroAIStreamLogger()` for AI streams | ```typescript import { evlog, useLogger } from 'evlog/express' diff --git a/packages/evlog/package.json b/packages/evlog/package.json index 1d905317..94560b61 100644 --- a/packages/evlog/package.json +++ b/packages/evlog/package.json @@ -211,6 +211,11 @@ "import": "./dist/ai/index.mjs", "default": "./dist/ai/index.mjs" }, + "./ai/nitro": { + "types": "./dist/ai/nitro.d.mts", + "import": "./dist/ai/nitro.mjs", + "default": "./dist/ai/nitro.mjs" + }, "./better-auth": { "types": "./dist/better-auth/index.d.mts", "import": "./dist/better-auth/index.mjs", @@ -326,6 +331,9 @@ "ai": [ "./dist/ai/index.d.mts" ], + "ai/nitro": [ + "./dist/ai/nitro.d.mts" + ], "better-auth": [ "./dist/better-auth/index.d.mts" ] diff --git a/packages/evlog/src/ai/nitro.ts b/packages/evlog/src/ai/nitro.ts new file mode 100644 index 00000000..2829220a --- /dev/null +++ b/packages/evlog/src/ai/nitro.ts @@ -0,0 +1,313 @@ +import { getHeaders } from 'h3' +import type { NitroApp } from 'nitropack/types' +import { createRequestLogger, getGlobalPluginRunner } from '../logger' +import { useLogger } from '../runtime/server/useLogger' +import type { DrainContext, EnrichContext, FieldContext, RequestLogger, ServerEvent, WideEvent } from '../types' +import { filterSafeHeaders } from '../utils' +import type { AILogger, AILoggerOptions } from './index' +import { createAILogger } from './index' + +export type NitroAIStreamFields> = FieldContext & { + _forceKeep?: boolean +} + +export interface NitroAIStreamLoggerOptions> { + /** + * Operation label stored on the child wide event. + * @default 'ai-stream' + */ + operation?: string + /** + * Initial fields added to the child wide event. + */ + fields?: FieldContext + /** + * Options passed to `createAILogger()`. + */ + ai?: AILoggerOptions +} + +export interface NitroAIStreamLogger> { + /** AI SDK logger bound to the child stream event. */ + ai: AILogger + /** Child request logger for custom stream metadata. */ + log: RequestLogger + /** + * Emit the child AI stream event through Nitro enrich/drain hooks. + * + * Idempotent: later calls return `null` without warning. + */ + emit: (fields?: NitroAIStreamFields) => Promise + /** + * Return a new response with the same status, status text, headers, and + * streamed body. The child event emits when the body closes or errors. + */ + wrapResponse: (response: Response, fields?: NitroAIStreamFields) => Response +} + +interface ResponseMeta { + status?: number + headers?: Record +} + +interface WaitUntilHost { + waitUntil?: (promise: Promise) => void +} + +function getSafeRequestHeaders(event: ServerEvent): Record { + const headers = getHeaders(event as Parameters[0]) + return filterSafeHeaders(headers) +} + +function getSafeResponseHeaders(response: Response): Record | undefined { + const headers: Record = {} + response.headers.forEach((value, key) => { + headers[key] = value + }) + if (Object.keys(headers).length === 0) return undefined + return filterSafeHeaders(headers) +} + +function resolveWaitUntil(event: ServerEvent): WaitUntilHost | undefined { + return event.context.cloudflare?.context ?? event.context +} + +function toError(value: unknown): Error { + return value instanceof Error ? value : new Error(String(value)) +} + +function splitEmitFields(fields?: NitroAIStreamFields): { + context?: FieldContext + forceKeep: boolean +} { + if (!fields) return { forceKeep: false } + const { _forceKeep, ...context } = fields + return { + context: context as FieldContext, + forceKeep: _forceKeep === true, + } +} + +async function enrichAndDrainNitroEvent( + event: ServerEvent, + emittedEvent: WideEvent, + meta: ResponseMeta, +): Promise { + const nitroApp = await getNitroApp() + const runner = getGlobalPluginRunner() + const request = { + method: event.method, + path: event.path, + requestId: typeof emittedEvent.requestId === 'string' ? emittedEvent.requestId : undefined, + } + const headers = getSafeRequestHeaders(event) + const hookContext = { + request, + headers, + response: { + status: meta.status, + headers: meta.headers, + }, + } + const enrichCtx: EnrichContext = { event: emittedEvent, ...hookContext } + + try { + await nitroApp.hooks.callHook('evlog:enrich', enrichCtx) + } catch (err) { + console.error('[evlog] enrich failed:', err) + } + if (runner.hasEnrich) { + await runner.runEnrich(enrichCtx) + } + + const drainCtx: DrainContext = { + event: emittedEvent, + request, + headers, + } + const drainTasks: Array> = [] + + try { + drainTasks.push( + nitroApp.hooks.callHook('evlog:drain', drainCtx).catch((err: unknown) => { + console.error('[evlog] drain failed:', err) + }), + ) + } catch (err) { + console.error('[evlog] drain failed:', err) + } + if (runner.hasDrain) { + drainTasks.push(runner.runDrain(drainCtx)) + } + + if (drainTasks.length === 0) return + const drainPromise = Promise.all(drainTasks) + const waitUntil = resolveWaitUntil(event) + if (typeof waitUntil?.waitUntil === 'function') { + waitUntil.waitUntil(drainPromise) + } else { + await drainPromise + } +} + +async function getNitroApp(): Promise { + const { useNitroApp } = await import('nitropack/runtime') + return useNitroApp() +} + +function createObservedBody( + body: ReadableStream, + onDone: () => Promise, + onError: (error: unknown) => Promise, +): ReadableStream | null { + if (body.locked) { + void onError(new TypeError('stream is already locked')).catch((err: unknown) => { + console.error('[evlog] stream error handling failed:', err) + }) + return null + } + + const reader = body.getReader() + + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read() + if (done) { + await onDone() + controller.close() + return + } + controller.enqueue(value) + } catch (err) { + await onError(err) + controller.error(err) + } + }, + + async cancel(reason) { + try { + await reader.cancel(reason) + } finally { + await onDone() + } + }, + }) +} + +/** + * Create a child AI stream logger for Nuxt/Nitro streaming responses. + * + * Nitro emits the parent request wide event when the handler returns its + * streaming `Response`, before the AI SDK finishes the body. This helper keeps + * AI metadata on a correlated child event and sends it through the same Nitro + * enrich/drain hooks as normal request logs. + * + * @example + * ```ts + * import { streamText, consumeStream } from 'ai' + * import { createEvlogIntegration } from 'evlog/ai' + * import { createNitroAIStreamLogger } from 'evlog/ai/nitro' + * + * export default defineEventHandler(async (event) => { + * const { ai, wrapResponse } = createNitroAIStreamLogger(event) + * + * const result = streamText({ + * model: ai.wrap('anthropic/claude-sonnet-4.6'), + * messages, + * experimental_telemetry: { + * isEnabled: true, + * integrations: [createEvlogIntegration(ai)], + * }, + * }) + * + * return wrapResponse(result.toUIMessageStreamResponse({ + * consumeSseStream: consumeStream, + * })) + * }) + * ``` + */ +export function createNitroAIStreamLogger>( + event: ServerEvent, + options: NitroAIStreamLoggerOptions = {}, +): NitroAIStreamLogger { + const parent = useLogger(event) + const parentContext = parent.getContext() + const parentRequestId = typeof parentContext.requestId === 'string' ? parentContext.requestId : undefined + const parentService = typeof parentContext.service === 'string' ? parentContext.service : undefined + const waitUntil = resolveWaitUntil(event) + const log = createRequestLogger({ + method: event.method, + path: event.path, + requestId: crypto.randomUUID(), + waitUntil: typeof waitUntil?.waitUntil === 'function' + ? waitUntil.waitUntil.bind(waitUntil) + : undefined, + }, { _deferDrain: true }) + const operation = options.operation ?? 'ai-stream' + let emitted = false + + log.set({ + ...(parentService ? { service: parentService } : {}), + ...options.fields, + operation, + ...(parentRequestId ? { _parentRequestId: parentRequestId } : {}), + } as FieldContext) + + async function emit(fields?: NitroAIStreamFields, meta: ResponseMeta = {}): Promise { + if (emitted) return null + emitted = true + const { context, forceKeep } = splitEmitFields(fields) + + if (context) { + log.set(context) + } + + const status = typeof fields?.status === 'number' ? fields.status : meta.status + const emittedEvent = log.emit({ + ...(typeof status === 'number' ? { status } : {}), + ...(forceKeep ? { _forceKeep: true } : {}), + } as FieldContext & { _forceKeep?: boolean }) + if (!emittedEvent) return null + + await enrichAndDrainNitroEvent(event, emittedEvent, { + ...meta, + status: typeof status === 'number' ? status : meta.status, + }) + return emittedEvent + } + + function wrapResponse(response: Response, fields?: NitroAIStreamFields): Response { + const meta = { + status: response.status, + headers: getSafeResponseHeaders(response), + } + if (!response.body) { + void emit(fields, meta) + return response + } + + const body = createObservedBody( + response.body, + () => emit(fields, meta).then(() => undefined), + (err) => { + log.error(toError(err)) + return emit(fields, meta).then(() => undefined) + }, + ) + if (!body) return response + + return new Response(body, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }) + } + + return { + ai: createAILogger(log, options.ai), + log, + emit, + wrapResponse, + } +} diff --git a/packages/evlog/src/logger.ts b/packages/evlog/src/logger.ts index 3c60ed0c..2aacbaf9 100644 --- a/packages/evlog/src/logger.ts +++ b/packages/evlog/src/logger.ts @@ -18,7 +18,7 @@ function isoNow(): string { /** Shown after post-emit warnings so users can fix fire-and-forget / ALS continuations. */ const POST_EMIT_FORK_HINT = - 'For intentional background work tied to this request, use log.fork(\'label\', fn) when your integration supports it (see https://evlog.dev).' + 'For intentional background work tied to this request, use log.fork(\'label\', fn) when your integration supports it. For Nuxt/Nitro AI streams, use createNitroAIStreamLogger() from evlog/ai/nitro (see https://evlog.dev).' function warnPostEmit(method: string, detail: string): void { console.warn( diff --git a/packages/evlog/test/ai/nitro.test.ts b/packages/evlog/test/ai/nitro.test.ts new file mode 100644 index 00000000..13adba35 --- /dev/null +++ b/packages/evlog/test/ai/nitro.test.ts @@ -0,0 +1,285 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { getHeaders } from 'h3' +import { createNitroAIStreamLogger } from '../../src/ai/nitro' +import { createRequestLogger, initLogger } from '../../src/logger' +import type { DrainContext, EnrichContext, ServerEvent, WideEvent } from '../../src/types' +import { defined } from '../helpers/defined' + +const nitroRuntime = vi.hoisted(() => ({ + app: { + hooks: { + callHook: vi.fn(), + }, + }, +})) + +vi.mock('h3', () => ({ + getHeaders: vi.fn(), +})) + +vi.mock('nitropack/runtime', () => ({ + useNitroApp: () => nitroRuntime.app, +})) + +const encoder = new TextEncoder() + +function createWaitUntil() { + const promises: Array> = [] + const waitUntil = vi.fn((promise: Promise) => { + promises.push(promise) + }) + return { promises, waitUntil } +} + +function createEvent(waitUntil?: (promise: Promise) => void): { + event: ServerEvent + parent: ReturnType +} { + const parent = createRequestLogger({ + method: 'POST', + path: '/api/chat', + requestId: 'parent-req', + }, { _deferDrain: true }) + parent.set({ service: 'chat-api' }) + + return { + parent, + event: { + method: 'POST', + path: '/api/chat', + context: { + log: parent, + ...(waitUntil + ? { cloudflare: { context: { waitUntil } } } + : {}), + }, + }, + } +} + +function createDeferredStream() { + let close: (() => void) | undefined + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + close = () => { + controller.enqueue(encoder.encode(' world')) + controller.close() + } + }, + }) + + return { + stream, + close: () => defined(close, 'close stream')(), + } +} + +function createStream(chunks: string[]): ReadableStream { + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(encoder.encode(chunk)) + } + controller.close() + }, + }) +} + +function collectNitroEvents() { + const order: string[] = [] + const drained: WideEvent[] = [] + const drainContexts: DrainContext[] = [] + + nitroRuntime.app.hooks.callHook.mockImplementation((name: string, ctx: EnrichContext | DrainContext) => { + order.push(name) + if (name === 'evlog:enrich') { + ctx.event.enriched = true + } + if (name === 'evlog:drain') { + drained.push(structuredClone(ctx.event)) + drainContexts.push({ + event: structuredClone(ctx.event), + request: ctx.request, + headers: ctx.headers, + }) + } + return Promise.resolve() + }) + + return { drained, drainContexts, order } +} + +describe('createNitroAIStreamLogger', () => { + beforeEach(() => { + initLogger({ + env: { service: 'test', environment: 'test' }, + pretty: false, + silent: true, + _suppressDrainWarning: true, + }) + vi.mocked(getHeaders).mockReturnValue({ + authorization: 'Bearer secret', + 'user-agent': 'vitest', + }) + nitroRuntime.app.hooks.callHook.mockReset() + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + it('emits a correlated child AI event after the parent request has emitted', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + const { promises, waitUntil } = createWaitUntil() + const { event, parent } = createEvent(waitUntil) + const { drained, drainContexts, order } = collectNitroEvents() + const source = createDeferredStream() + const stream = createNitroAIStreamLogger(event, { + fields: { chatId: 'chat_1', userId: 'user_1' }, + }) + + const response = stream.wrapResponse(new Response(source.stream, { + status: 202, + headers: { + 'content-type': 'text/plain', + 'x-stream': 'yes', + }, + })) + + parent.emit({ status: 202 }) + stream.ai.captureEmbed({ + usage: { tokens: 42 }, + model: 'text-embedding-3-small', + }) + + const text = response.text() + source.close() + + await expect(text).resolves.toBe('hello world') + await Promise.all(promises) + + expect(warnSpy.mock.calls.some(([message]) => String(message).includes('log.set() called after the wide event was emitted'))).toBe(false) + expect(waitUntil).toHaveBeenCalledTimes(1) + expect(order).toEqual(['evlog:enrich', 'evlog:drain']) + expect(drained).toHaveLength(1) + expect(defined(drainContexts[0], 'drain context').headers).toEqual({ + 'user-agent': 'vitest', + }) + + const child = defined(drained[0], 'child event') + expect(child.operation).toBe('ai-stream') + expect(child._parentRequestId).toBe('parent-req') + expect(child.requestId).not.toBe('parent-req') + expect(child.service).toBe('chat-api') + expect(child.chatId).toBe('chat_1') + expect(child.userId).toBe('user_1') + expect(child.enriched).toBe(true) + expect(child.ai).toMatchObject({ + calls: 1, + inputTokens: 42, + outputTokens: 0, + totalTokens: 42, + embedding: { + model: 'text-embedding-3-small', + tokens: 42, + }, + }) + }) + + it('preserves response status, headers, and streamed body chunks', async () => { + const { event } = createEvent() + collectNitroEvents() + const stream = createNitroAIStreamLogger(event) + + const response = stream.wrapResponse(new Response(createStream(['a', 'b']), { + status: 207, + statusText: 'Multi-Status', + headers: { + 'content-type': 'text/plain', + 'x-stream': 'yes', + }, + })) + + expect(response.status).toBe(207) + expect(response.statusText).toBe('Multi-Status') + expect(response.headers.get('x-stream')).toBe('yes') + await expect(response.text()).resolves.toBe('ab') + }) + + it('records stream errors on the child event and drains once', async () => { + const { promises, waitUntil } = createWaitUntil() + const { event } = createEvent(waitUntil) + const { drained } = collectNitroEvents() + const error = new Error('stream exploded') + const source = new ReadableStream({ + pull() { + throw error + }, + }) + const stream = createNitroAIStreamLogger(event) + const response = stream.wrapResponse(new Response(source, { status: 200 })) + + await expect(response.text()).rejects.toThrow('stream exploded') + await Promise.all(promises) + + expect(drained).toHaveLength(1) + const child = defined(drained[0], 'child event') + expect(child.level).toBe('error') + expect(child.error).toMatchObject({ message: 'stream exploded' }) + const drainCalls = nitroRuntime.app.hooks.callHook.mock.calls.filter(([name]) => name === 'evlog:drain') + expect(drainCalls).toHaveLength(1) + }) + + it('records an error and returns the original response when the body is already locked', async () => { + const { promises, waitUntil } = createWaitUntil() + const { event } = createEvent(waitUntil) + const { drained } = collectNitroEvents() + const original = new Response(createStream(['locked']), { status: 200 }) + const body = defined(original.body, 'response body') + const reader = body.getReader() + const stream = createNitroAIStreamLogger(event) + + try { + const response = stream.wrapResponse(original) + + expect(response).toBe(original) + await vi.waitFor(() => { + expect(promises).toHaveLength(1) + }) + await Promise.all(promises) + + expect(drained).toHaveLength(1) + expect(defined(drained[0], 'child event').error).toMatchObject({ + message: 'stream is already locked', + }) + } finally { + reader.releaseLock() + } + }) + + it('handles locked body error handler rejections', async () => { + const waitUntilError = new Error('waitUntil failed') + const waitUntil = vi.fn((_promise: Promise) => { + throw waitUntilError + }) + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}) + const { event } = createEvent(waitUntil) + collectNitroEvents() + const original = new Response(createStream(['locked']), { status: 200 }) + const body = defined(original.body, 'response body') + const reader = body.getReader() + const stream = createNitroAIStreamLogger(event) + + try { + const response = stream.wrapResponse(original) + + expect(response).toBe(original) + await vi.waitFor(() => { + expect(errorSpy).toHaveBeenCalledWith('[evlog] stream error handling failed:', waitUntilError) + }) + } finally { + reader.releaseLock() + } + }) +}) diff --git a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap index 9209ce33..aa1fa0d4 100644 --- a/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap +++ b/packages/evlog/test/toolkit/__snapshots__/api-surface.test.ts.snap @@ -44,6 +44,9 @@ exports[`public API surface > matches snapshot for all subpath exports 1`] = ` "createAIMiddleware", "createEvlogIntegration", ], + "./ai/nitro": [ + "createNitroAIStreamLogger", + ], "./axiom": [ "createAxiomDrain", "sendBatchToAxiom", diff --git a/packages/evlog/tsdown.config.ts b/packages/evlog/tsdown.config.ts index 1b7744a0..b8ffc6a1 100644 --- a/packages/evlog/tsdown.config.ts +++ b/packages/evlog/tsdown.config.ts @@ -54,6 +54,7 @@ export default defineConfig({ 'client': 'src/client.ts', 'toolkit': 'src/shared/index.ts', 'ai/index': 'src/ai/index.ts', + 'ai/nitro': 'src/ai/nitro.ts', 'better-auth/index': 'src/better-auth/index.ts', }, format: 'esm',