Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/streaming-ai-emit-defer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'evlog': minor
---

Defer wide-event emit for streaming HTTP responses (SSE, AI SDK UI streams, chunked bodies) until the response body finishes, so `createAILogger()` metadata is included on the same request event instead of triggering post-emit warnings.

Applies to Next.js `withEvlog`, SvelteKit, Hono, React Router, oRPC, and Nitro/Nuxt integrations. Also merges late `ai` fields onto an emitted event before enrich/drain when metadata arrives in a narrow race window.

Fixes #321
2 changes: 2 additions & 0 deletions apps/docs/content/2.learn/2.wide-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ The parent wide event may be emitted **before** the child event; they are two se

**Not available yet:** Hono (no `useLogger` without `c.get('log')` + ALS) and Nitro/Nuxt `useLogger(event)` — use the post-emit warnings to catch mistakes; a different API may arrive later for event-scoped forks.

For AI SDK streaming responses, supported framework integrations (Next.js, Nitro/Nuxt, SvelteKit, Hono, React Router, oRPC) defer wide-event emit until the response body finishes, so `createAILogger(log)` metadata lands on the same request event automatically.

```typescript [server/routes/checkout.post.ts]
import { evlog, useLogger } from 'evlog/express'

Expand Down
2 changes: 1 addition & 1 deletion apps/docs/content/3.integrate/frameworks/02.nextjs.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ export const POST = withEvlog(async (request: Request) => {
})
```

All fields are merged into a single wide event emitted when the handler completes:
All fields are merged into a single wide event emitted when the handler completes (or when a streaming response body finishes, so AI SDK metadata is included):

```bash [Output (Pretty)]
10:23:45.612 INFO [my-app] POST /api/checkout 200 in 145ms
Expand Down
2 changes: 1 addition & 1 deletion apps/docs/content/3.integrate/frameworks/04.nitro.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ One request, one log line with all context:
└─ requestId: a1b2c3d4-...
```

Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. Post-emit warnings still apply if code calls `set()` after the wide event was emitted. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work).
Nitro uses **`useLogger(event)`** (event-bound scope), not `AsyncLocalStorage`, so **`log.fork()` is not available** here yet. For AI SDK streaming responses, evlog defers wide-event emit until the response body finishes so `createAILogger(log)` metadata stays on the same request event. Post-emit warnings only apply when code calls `set()` after the wide event has actually emitted — for example in non-streaming handlers or background work. See [Wide events — After emit](/learn/wide-events#after-emit-sealing-and-background-work).

## Error Handling

Expand Down
2 changes: 2 additions & 0 deletions apps/docs/content/4.use-cases/2.ai-sdk/02.usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ links:

Every pattern below uses the same `createAILogger(log)` setup. Wrap the model with `ai.wrap()` and the middleware accumulates tokens, tools, and timing on the wide event automatically.

On Next.js, Nuxt/Nitro, SvelteKit, Hono, React Router, and oRPC, evlog defers wide-event emit for streaming responses (for example `text/event-stream` and AI SDK UI streams) until the body finishes, so late `ai` metadata stays on the same request event.

## streamText

The most common pattern — streaming chat with full observability:
Expand Down
10 changes: 9 additions & 1 deletion packages/evlog/src/hono/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Context, MiddlewareHandler } from 'hono'
import type { RequestLogger } from '../types'
import { defineFrameworkIntegration } from '../shared/integration'
import type { BaseEvlogOptions } from '../shared/middleware'
import { shouldDeferEmitForResponse } from '../shared/streamResponse'

export type EvlogHonoOptions = BaseEvlogOptions

Expand Down Expand Up @@ -54,13 +55,20 @@ const integration = defineFrameworkIntegration<Context>({
*/
export function evlog(options: EvlogHonoOptions = {}): MiddlewareHandler {
return async (c, next) => {
const { skipped, finish } = integration.start(c, options)
const { skipped, finish, finishResponse } = integration.start(c, options)
if (skipped) {
await next()
return
}
try {
await next()
if (shouldDeferEmitForResponse(c.res)) {
const response = new Response(c.res.body, {
status: c.res.status,
headers: c.res.headers,
})
return finishResponse(response, { status: response.status })
}
await finish({ status: c.res.status })
} catch (error) {
await finish({ error: error as Error })
Expand Down
39 changes: 38 additions & 1 deletion packages/evlog/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ function mergeInto(target: Record<string, unknown>, source: Record<string, unkno
}
}

const pendingDrainState = new WeakMap<WideEvent, { drainStarted: boolean }>()

function isAiOnlyFieldUpdate(data: Record<string, unknown>): boolean {
const keys = Object.keys(data)
return keys.length === 1 && keys[0] === 'ai'
}

/**
* Mark a wide event as past the post-emit AI merge window so late `log.set({ ai })`
* calls warn again. Called by framework enrich/drain pipelines before drain runs.
*
* @internal Used by middleware and framework integrations.
*/
export function markWideEventDrainStarted(event: WideEvent | null): void {
if (!event) return
const state = pendingDrainState.get(event)
if (state) state.drainStarted = true
}

/**
* @internal Wide-event field merge — exported for test mocks that mirror emit accumulation.
*/
Expand Down Expand Up @@ -777,6 +796,7 @@ export function createLogger<T extends object = Record<string, unknown>>(initial
let hasWarn = false
let manualLevel: LogLevel | undefined
let emitted = false
let pendingWideEvent: WideEvent | null = null

function addLog(level: 'info' | 'warn', message: string): void {
if (!Array.isArray(context.requestLogs)) {
Expand Down Expand Up @@ -811,7 +831,18 @@ export function createLogger<T extends object = Record<string, unknown>>(initial
audit: auditMethod,
set(data: FieldContext<T>): void {
if (emitted) {
const keys = Object.keys(data as Record<string, unknown>)
const record = data as Record<string, unknown>
const pendingState = pendingWideEvent ? pendingDrainState.get(pendingWideEvent) : undefined
if (
pendingWideEvent
&& pendingState
&& !pendingState.drainStarted
&& isAiOnlyFieldUpdate(record)
) {
mergeInto(pendingWideEvent as Record<string, unknown>, record)
return
}
const keys = Object.keys(record)
warnPostEmit('log.set()', `Keys dropped: ${keys.length ? keys.join(', ') : '(empty)'}.`)
return
}
Expand Down Expand Up @@ -924,6 +955,7 @@ export function createLogger<T extends object = Record<string, unknown>>(initial

if (!forceKeep && !shouldSample(level)) {
emitted = true
pendingWideEvent = null
return null
}

Expand All @@ -937,6 +969,11 @@ export function createLogger<T extends object = Record<string, unknown>>(initial

const wide = emitWideEvent(level, context, { deferDrain, ownsEvent: true, waitUntil })
emitted = true
pendingWideEvent = wide
if (wide) {
// Only enable the AI merge window when middleware defers drain until finish.
pendingDrainState.set(wide, { drainStarted: !deferDrain })
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return wide
},

Expand Down
87 changes: 46 additions & 41 deletions packages/evlog/src/next/handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { DrainContext, EnrichContext, TailSamplingContext, WideEvent } from '../types'
import { createRequestLogger, getGlobalDrain, initLogger, isEnabled, isLoggerLocked } from '../logger'
import { createRequestLogger, getGlobalDrain, initLogger, isEnabled, isLoggerLocked, markWideEventDrainStarted } from '../logger'
import { attachForkToLogger } from '../shared/fork'
import type { MiddlewareLoggerOptions } from '../shared/middleware'
import { shouldLog, getServiceForPath } from '../shared/routes'
import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse'
import { filterSafeHeaders } from '../utils'
import { EvlogError } from '../error'
import type { NextEvlogOptions } from './types'
Expand Down Expand Up @@ -83,6 +84,8 @@ async function callEnrichAndDrain(
}
}

markWideEventDrainStarted(emittedEvent)

if (drain) {
const drainCtx: DrainContext = {
event: emittedEvent,
Expand Down Expand Up @@ -112,6 +115,33 @@ async function callEnrichAndDrain(
run().catch(() => {})
}

async function emitRequestEvent(
logger: ReturnType<typeof createRequestLogger>,
requestInfo: { method: string, path: string, requestId: string },
headers: Record<string, string>,
status: number,
): Promise<void> {
let forceKeep = false
if (state.options.keep) {
try {
const tailCtx: TailSamplingContext = {
status,
path: requestInfo.path,
method: requestInfo.method,
context: logger.getContext(),
shouldKeep: false,
}
await state.options.keep(tailCtx)
forceKeep = tailCtx.shouldKeep ?? false
} catch (err) {
console.error('[evlog] keep callback failed:', err)
}
}

const emittedEvent = logger.emit({ _forceKeep: forceKeep })
await callEnrichAndDrain(emittedEvent, requestInfo, headers, status)
}

/**
* Wrap a Next.js route handler or server action with evlog request-scoped logging.
*
Expand Down Expand Up @@ -197,6 +227,19 @@ export function createWithEvlog(options: NextEvlogOptions) {

try {
const result = await evlogStorage.run(logger, () => handler(...args))
const requestInfo = { method, path, requestId }

if (result instanceof Response && shouldDeferEmitForResponse(result)) {
const wrapped = bindStreamingResponseLifecycle(result, async (meta) => {
if (meta.error) {
logger.error(meta.error)
}
const finalStatus = meta.status ?? result.status
logger.set({ status: finalStatus })
await emitRequestEvent(logger, requestInfo, headers, finalStatus)
})
return wrapped as Awaited<TReturn>
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Extract response status
let { status } = { status: 200 }
Expand All @@ -205,26 +248,7 @@ export function createWithEvlog(options: NextEvlogOptions) {
}
logger.set({ status })

// Build tail sampling context and call keep callback
let forceKeep = false
if (state.options.keep) {
try {
const tailCtx: TailSamplingContext = {
status,
path,
method,
context: logger.getContext(),
shouldKeep: false,
}
await state.options.keep(tailCtx)
forceKeep = tailCtx.shouldKeep ?? false
} catch (err) {
console.error('[evlog] keep callback failed:', err)
}
}

const emittedEvent = logger.emit({ _forceKeep: forceKeep })
await callEnrichAndDrain(emittedEvent, { method, path, requestId }, headers, status)
await emitRequestEvent(logger, requestInfo, headers, status)

return result as Awaited<TReturn>
} catch (error) {
Expand All @@ -235,26 +259,7 @@ export function createWithEvlog(options: NextEvlogOptions) {
?? 500
logger.set({ status: errorStatus })

// Build tail sampling context and call keep callback
let forceKeep = false
if (state.options.keep) {
try {
const tailCtx: TailSamplingContext = {
status: errorStatus,
path,
method,
context: logger.getContext(),
shouldKeep: false,
}
await state.options.keep(tailCtx)
forceKeep = tailCtx.shouldKeep ?? false
} catch (err) {
console.error('[evlog] keep callback failed:', err)
}
}

const emittedEvent = logger.emit({ _forceKeep: forceKeep })
await callEnrichAndDrain(emittedEvent, { method, path, requestId }, headers, errorStatus)
await emitRequestEvent(logger, { method, path, requestId }, headers, errorStatus)

// Return structured JSON response for EvlogErrors (like H3 does for Nuxt)
if (isRequest && error instanceof EvlogError) {
Expand Down
55 changes: 37 additions & 18 deletions packages/evlog/src/nitro-v3/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import type { CaptureError } from 'nitro/types'
import type { HTTPEvent } from 'nitro/h3'
import { parseURL } from 'ufo'
import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled } from '../logger'
import { createRequestLogger, getGlobalPluginRunner, initLogger, isEnabled, markWideEventDrainStarted } from '../logger'
import { registerPrettyErrorSnippetReader } from '../shared/pretty-error'
import { readCodeSnippetFromDisk } from '../shared/pretty-error-snippet.node'
import { enrichErrorStackForDev } from '../shared/enrich-error-stack.node'
import { shouldLog, getServiceForPath, extractErrorStatus } from '../nitro'
import { extendDeferredDrain } from '../nitro/enrich-drain'
import { normalizeRedactConfig } from '../redact'
import { resolveEvlogConfigForNitroPlugin, setActiveNitroRuntime } from '../shared/nitroConfigBridge'
import { bindStreamingResponseLifecycle, shouldDeferEmitForResponse } from '../shared/streamResponse'
import type { EnrichContext, RequestLogger, TailSamplingContext, WideEvent } from '../types'
import { filterSafeHeaders } from '../utils'

Expand Down Expand Up @@ -70,7 +71,7 @@
}
}

async function callDrainHook(

Check warning on line 74 in packages/evlog/src/nitro-v3/plugin.ts

View workflow job for this annotation

GitHub Actions / autofix

Async function 'callDrainHook' has too many parameters (5). Maximum allowed is 4

Check warning on line 74 in packages/evlog/src/nitro-v3/plugin.ts

View workflow job for this annotation

GitHub Actions / lint

Async function 'callDrainHook' has too many parameters (5). Maximum allowed is 4
hooks: Hooks,
emittedEvent: WideEvent | null,
event: HTTPEvent,
Expand Down Expand Up @@ -123,7 +124,7 @@
}
}

async function callEnrichAndDrain(

Check warning on line 127 in packages/evlog/src/nitro-v3/plugin.ts

View workflow job for this annotation

GitHub Actions / autofix

Async function 'callEnrichAndDrain' has too many parameters (5). Maximum allowed is 4

Check warning on line 127 in packages/evlog/src/nitro-v3/plugin.ts

View workflow job for this annotation

GitHub Actions / lint

Async function 'callEnrichAndDrain' has too many parameters (5). Maximum allowed is 4
hooks: Hooks,
emittedEvent: WideEvent | null,
event: HTTPEvent,
Expand All @@ -146,6 +147,8 @@
await runner.runEnrich(enrichCtx)
}

markWideEventDrainStarted(emittedEvent)

await callDrainHook(hooks, emittedEvent, event, hookContext, options)
}

Expand Down Expand Up @@ -241,29 +244,45 @@
const log = ctx?.log as RequestLogger | undefined
if (!log || !ctx) return

const { status } = res
log.set({ status })
const emitSuccessResponse = async (responseStatus: number) => {
log.set({ status: responseStatus })

const startTime = ctx._evlogStartTime as number | undefined
const durationMs = startTime ? Date.now() - startTime : undefined
const startTime = ctx._evlogStartTime as number | undefined
const durationMs = startTime ? Date.now() - startTime : undefined

const { pathname } = parseURL(event.req.url)
const { pathname } = parseURL(event.req.url)

const tailCtx: TailSamplingContext = {
status,
duration: durationMs,
path: pathname,
method: event.req.method,
context: log.getContext(),
shouldKeep: false,
const tailCtx: TailSamplingContext = {
status: responseStatus,
duration: durationMs,
path: pathname,
method: event.req.method,
context: log.getContext(),
shouldKeep: false,
}

await hooks.callHook('evlog:emit:keep', tailCtx)
const runner = getGlobalPluginRunner()
if (runner.hasKeep) await runner.runKeep(tailCtx)

const emittedEvent = log.emit({ _forceKeep: tailCtx.shouldKeep })
await callEnrichAndDrain(hooks, emittedEvent, event, res)
}

await hooks.callHook('evlog:emit:keep', tailCtx)
const runner = getGlobalPluginRunner()
if (runner.hasKeep) await runner.runKeep(tailCtx)
if (shouldDeferEmitForResponse(res)) {
const wrapped = bindStreamingResponseLifecycle(res, async (meta) => {
if (meta.error) {
log.error(meta.error)
}
await emitSuccessResponse(meta.status ?? res.status)
})
if (wrapped !== res && 'res' in event) {
(event as { res: Response }).res = wrapped
}
return
}

const emittedEvent = log.emit({ _forceKeep: tailCtx.shouldKeep })
await callEnrichAndDrain(hooks, emittedEvent, event, res)
await emitSuccessResponse(res.status)
})

hooks.hook('error', async (error, { event }) => {
Expand Down
Loading
Loading