diff --git a/.changeset/spicy-laws-fold.md b/.changeset/spicy-laws-fold.md new file mode 100644 index 000000000..0f722de7a --- /dev/null +++ b/.changeset/spicy-laws-fold.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": minor +--- + +Make PPR work with the cache interceptor diff --git a/packages/open-next/src/core/requestHandler.ts b/packages/open-next/src/core/requestHandler.ts index b3857b0b9..f99b5bfa9 100644 --- a/packages/open-next/src/core/requestHandler.ts +++ b/packages/open-next/src/core/requestHandler.ts @@ -10,6 +10,8 @@ import type { } from "types/open-next"; import { runWithOpenNextRequestContext } from "utils/promise"; +import { Writable } from "node:stream"; +import { finished } from "node:stream/promises"; import { NextConfig } from "config/index"; import type { OpenNextHandlerOptions } from "types/overrides"; import { debug, error } from "../adapters/logger"; @@ -91,10 +93,7 @@ export async function openNextHandler( }); //#endOverride - const headers = - "type" in routingResult - ? routingResult.headers - : routingResult.internalEvent.headers; + const headers = getHeaders(routingResult); const overwrittenResponseHeaders: Record = {}; @@ -205,10 +204,42 @@ export async function openNextHandler( const req = new IncomingMessage(reqProps); const res = createServerResponse( routingResult, - overwrittenResponseHeaders, + routingResult.initialResponse + ? routingResult.initialResponse.headers + : overwrittenResponseHeaders, options?.streamCreator, ); + if (routingResult.initialResponse) { + res.statusCode = routingResult.initialResponse.statusCode; + res.flushHeaders(); + for await (const chunk of routingResult.initialResponse.body) { + res.write(chunk); + } + + //We create a special response for the PPR resume request + const pprRes = createServerResponse( + routingResult, + overwrittenResponseHeaders, + { + writeHeaders: () => { + return new Writable({ + write(chunk, encoding, callback) { + res.write(chunk, encoding, callback); + }, + }); + }, + }, + ); + await adapterHandler(req, pprRes, routingResult, { + waitUntil: options?.waitUntil, + }); + await finished(pprRes); + res.end(); + + return convertRes(res); + } + //#override useAdapterHandler await adapterHandler(req, res, routingResult, { waitUntil: options?.waitUntil, @@ -239,6 +270,14 @@ export async function openNextHandler( ); } +function getHeaders(routingResult: RoutingResult | InternalResult) { + if ("type" in routingResult) { + return routingResult.headers; + } else { + return routingResult.internalEvent.headers; + } +} + async function processRequest( req: IncomingMessage, res: OpenNextNodeResponse, diff --git a/packages/open-next/src/core/routing/adapterHandler.ts b/packages/open-next/src/core/routing/adapterHandler.ts index c6c0125aa..02978ce3d 100644 --- a/packages/open-next/src/core/routing/adapterHandler.ts +++ b/packages/open-next/src/core/routing/adapterHandler.ts @@ -2,6 +2,7 @@ import type { IncomingMessage } from "node:http"; import { finished } from "node:stream/promises"; import type { OpenNextNodeResponse } from "http/index"; import type { ResolvedRoute, RoutingResult, WaitUntil } from "types/open-next"; +import { debug, error } from "../../adapters/logger"; /** * This function loads the necessary routes, and invoke the expected handler. @@ -25,17 +26,17 @@ export async function adapterHandler( } try { - console.log("## adapterHandler trying route", route, req.url); + debug("## adapterHandler trying route", route, req.url); const result = await module.handler(req, res, { waitUntil: options.waitUntil, }); await finished(res); // Not sure this one is necessary. - console.log("## adapterHandler route succeeded", route); + debug("## adapterHandler route succeeded", route); resolved = true; return result; //If it doesn't throw, we are done } catch (e) { - console.log("## adapterHandler route failed", route, e); + error("## adapterHandler route failed", route, e); // I'll have to run some more tests, but in theory, we should not have anything special to do here, and we should return the 500 page here. } } diff --git a/packages/open-next/src/core/routing/cacheInterceptor.ts b/packages/open-next/src/core/routing/cacheInterceptor.ts index 08d308749..13d4d50d6 100644 --- a/packages/open-next/src/core/routing/cacheInterceptor.ts +++ b/packages/open-next/src/core/routing/cacheInterceptor.ts @@ -5,6 +5,7 @@ import type { InternalEvent, InternalResult, MiddlewareEvent, + PartialResult, } from "types/open-next"; import type { CacheValue } from "types/overrides"; import { emptyReadableStream, toReadableStream } from "utils/stream"; @@ -134,17 +135,60 @@ function getBodyForAppRouter( } } +function createPprPartialResult( + event: MiddlewareEvent, + localizedPath: string, + cachedValue: CacheValue<"cache">, + responseBody: string | (() => ReturnType), + contentType: string, +): PartialResult { + if (cachedValue.type !== "app") { + throw new Error("createPprPartialResult called with non-app cache value"); + } + + return { + resumeRequest: { + ...event, + method: "POST", + url: `http://${event.headers.host}${NextConfig.basePath || ""}${ + localizedPath || "/" + }`, + headers: { + ...event.headers, + "next-resume": "1", + }, + rawPath: localizedPath, + body: Buffer.from(cachedValue.meta?.postponed || "", "utf-8"), + }, + result: { + type: "core", + statusCode: event.rewriteStatusCode ?? cachedValue.meta?.status ?? 200, + body: + typeof responseBody === "string" + ? toReadableStream(responseBody) + : responseBody(), + isBase64Encoded: false, + headers: { + "content-type": contentType, + "x-opennext-ppr": "1", + ...cachedValue.meta?.headers, + vary: VARY_HEADER, + }, + }, + }; +} + async function generateResult( event: MiddlewareEvent, localizedPath: string, cachedValue: CacheValue<"cache">, lastModified?: number, -): Promise { +): Promise { debug("Returning result from experimental cache"); - let body = ""; let type = "application/octet-stream"; let isDataRequest = false; let additionalHeaders = {}; + let body: string; if (cachedValue.type === "app") { isDataRequest = Boolean(event.headers.rsc); if (isDataRequest) { @@ -152,19 +196,46 @@ async function generateResult( getBodyForAppRouter(event, cachedValue); body = appRouterBody; additionalHeaders = appHeaders; + if (cachedValue.meta?.postponed) { + debug("App router postponed request detected", localizedPath); + return createPprPartialResult( + event, + localizedPath, + cachedValue, + () => emptyReadableStream(), + "text/x-component", + ); + } + debug("App router data request detected", localizedPath, body); } else { - body = cachedValue.html; + if (cachedValue.meta?.postponed) { + debug("Postponed request detected", localizedPath); + return createPprPartialResult( + event, + localizedPath, + cachedValue, + cachedValue.html, + "text/html; charset=utf-8", + ); + } else { + body = cachedValue.html; + } } type = isDataRequest ? "text/x-component" : "text/html; charset=utf-8"; } else if (cachedValue.type === "page") { isDataRequest = Boolean(event.query.__nextDataReq); - body = isDataRequest ? JSON.stringify(cachedValue.json) : cachedValue.html; + if (isDataRequest) { + body = JSON.stringify(cachedValue.json); + } else { + body = cachedValue.html; + } type = isDataRequest ? "application/json" : "text/html; charset=utf-8"; } else { throw new Error( "generateResult called with unsupported cache value type, only 'app' and 'page' are supported", ); } + // close(); const cacheControl = await computeCacheControl( localizedPath, body, @@ -180,7 +251,7 @@ async function generateResult( // `NextResponse.rewrite(url, { status: xxx}) // The rewrite status code should take precedence over the cached one statusCode: event.rewriteStatusCode ?? cachedValue.meta?.status ?? 200, - body: toReadableStream(body, false), + body: toReadableStream(body, isBinaryContentType(type)), isBase64Encoded: false, headers: { ...cacheControl, @@ -227,13 +298,20 @@ function decodePathParams(pathname: string): string { export async function cacheInterceptor( event: MiddlewareEvent, -): Promise { +): Promise { if ( Boolean(event.headers["next-action"]) || - Boolean(event.headers["x-prerender-revalidate"]) + Boolean(event.headers["x-prerender-revalidate"]) || + Boolean(event.headers["next-resume"]) || + event.method !== "GET" ) return event; + // if(Boolean(event.headers.rsc) && !(Boolean(event.headers["next-router-prefetch"]) || Boolean(event.headers[NEXT_SEGMENT_PREFETCH_HEADER]))) { + // // Let the handler deal with RSC requests with no prefetch header as they are SSR requests + // return event; + // } + // Check for Next.js preview mode cookies const cookies = event.headers.cookie || ""; const hasPreviewData = @@ -258,19 +336,35 @@ export async function cacheInterceptor( debug("Checking cache for", localizedPath, PrerenderManifest); - const isISR = - Object.keys(PrerenderManifest?.routes ?? {}).includes( - localizedPath ?? "/", - ) || - Object.values(PrerenderManifest?.dynamicRoutes ?? {}).some((dr) => - new RegExp(dr.routeRegex).test(localizedPath), - ); + const isDynamicISR = Object.values( + PrerenderManifest?.dynamicRoutes ?? {}, + ).some((dr) => { + const regex = new RegExp(dr.routeRegex); + return regex.test(localizedPath); + }); + + const isStaticRoute = Object.keys(PrerenderManifest?.routes ?? {}).includes( + localizedPath || "/", + ); + + const isISR = isStaticRoute || isDynamicISR; debug("isISR", isISR); if (isISR) { try { - const cachedData = await globalThis.incrementalCache.get( - localizedPath ?? "/index", - ); + let pathToUse = localizedPath; + // For PPR, we need to check the fallback value to get the correct cache key + // We don't want to override a static route though + if (isDynamicISR && !isStaticRoute) { + pathToUse = Object.entries(PrerenderManifest?.dynamicRoutes ?? {}).find( + ([, dr]) => { + const regex = new RegExp(dr.routeRegex); + return regex.test(localizedPath); + }, + )?.[1].fallback! as string; + } else if (localizedPath === "") { + pathToUse = "/index"; + } + const cachedData = await globalThis.incrementalCache.get(pathToUse); debug("cached data in interceptor", cachedData); if (!cachedData?.value) { diff --git a/packages/open-next/src/core/routingHandler.ts b/packages/open-next/src/core/routingHandler.ts index 951be07c5..9b429d48a 100644 --- a/packages/open-next/src/core/routingHandler.ts +++ b/packages/open-next/src/core/routingHandler.ts @@ -8,6 +8,7 @@ import { import type { InternalEvent, InternalResult, + PartialResult, ResolvedRoute, RoutingResult, } from "types/open-next"; @@ -235,26 +236,43 @@ export default async function routingHandler( }; } + const resolvedRoutes: ResolvedRoute[] = [ + ...foundStaticRoute, + ...foundDynamicRoute, + ]; + if ( globalThis.openNextConfig.dangerous?.enableCacheInterception && !isInternalResult(eventOrResult) ) { debug("Cache interception enabled"); - eventOrResult = await cacheInterceptor(eventOrResult); - if (isInternalResult(eventOrResult)) { - applyMiddlewareHeaders(eventOrResult, headers); - return eventOrResult; + const cacheInterceptionResult = await cacheInterceptor(eventOrResult); + if (isInternalResult(cacheInterceptionResult)) { + applyMiddlewareHeaders(cacheInterceptionResult, headers); + return cacheInterceptionResult; + } else if (isPartialResult(cacheInterceptionResult)) { + // We need to apply the headers to both the result (the streamed response) and the resume request + applyMiddlewareHeaders(cacheInterceptionResult.result, headers); + applyMiddlewareHeaders(cacheInterceptionResult.resumeRequest, headers); + return { + internalEvent: cacheInterceptionResult.resumeRequest, + isExternalRewrite: false, + origin: false, + isISR: false, + resolvedRoutes, + initialURL: event.url, + locale: NextConfig.i18n + ? detectLocale(eventOrResult, NextConfig.i18n) + : undefined, + rewriteStatusCode: middlewareEventOrResult.rewriteStatusCode, + initialResponse: cacheInterceptionResult.result, + }; } } // We apply the headers from the middleware response last applyMiddlewareHeaders(eventOrResult, headers); - const resolvedRoutes: ResolvedRoute[] = [ - ...foundStaticRoute, - ...foundDynamicRoute, - ]; - debug("resolvedRoutes", resolvedRoutes); return { @@ -301,8 +319,18 @@ export default async function routingHandler( * @param eventOrResult * @returns Whether the event is an instance of `InternalResult` */ -function isInternalResult( - eventOrResult: InternalEvent | InternalResult, +export function isInternalResult( + eventOrResult: InternalEvent | InternalResult | PartialResult, ): eventOrResult is InternalResult { return eventOrResult != null && "statusCode" in eventOrResult; } + +/** + * @param eventOrResult + * @returns Whether the event is an instance of `PartialResult` (i.e. for PPR responses) + */ +export function isPartialResult( + eventOrResult: InternalEvent | InternalResult | PartialResult, +): eventOrResult is PartialResult { + return eventOrResult != null && "resumeRequest" in eventOrResult; +} diff --git a/packages/open-next/src/types/open-next.ts b/packages/open-next/src/types/open-next.ts index 9241cd50c..ec709ff0d 100644 --- a/packages/open-next/src/types/open-next.ts +++ b/packages/open-next/src/types/open-next.ts @@ -38,6 +38,24 @@ export type MiddlewareEvent = InternalEvent & { rewriteStatusCode?: number; }; +/** + * This event is returned by the cache interceptor and the routing handler. + * It is then handled by either the external middleware or the classic request handler. + * This is designed for PPR support inside the cache interceptor. + */ +export type PartialResult = { + /** + * Resume request that will be forwarded to the handler + */ + resumeRequest: InternalEvent; + /** + * The result that was generated so far by the cache interceptor + * It contains the first part of the body that we'll need to forward to the client immediately + * As well as the headers and status code + */ + result: InternalResult; +}; + export type InternalResult = { statusCode: number; headers: Record; @@ -192,6 +210,13 @@ export interface RoutingResult { resolvedRoutes: ResolvedRoute[]; // The status code applied to a middleware rewrite rewriteStatusCode?: number; + + /** + * This is the response generated when using PPR in the cache interceptor. + * It contains the initial part of the response that should be sent to the client immediately. + * Can only be present when using cache interception and no external middleware. + */ + initialResponse?: InternalResult; } export interface MiddlewareResult