diff --git a/docs/docs/api/appkit-ui/genie/GenieChatMessageList.mdx b/docs/docs/api/appkit-ui/genie/GenieChatMessageList.mdx index be14d87a..f81df359 100644 --- a/docs/docs/api/appkit-ui/genie/GenieChatMessageList.mdx +++ b/docs/docs/api/appkit-ui/genie/GenieChatMessageList.mdx @@ -18,6 +18,8 @@ Scrollable message list that renders Genie chat messages with auto-scroll, skele | `messages` | `GenieMessageItem[]` | ✓ | - | Array of messages to display | | `status` | `enum` | ✓ | - | Current chat status (controls loading indicators and skeleton placeholders) | | `className` | `string` | | - | Additional CSS class for the scroll area | +| `hasPreviousPage` | `boolean` | | `false` | Whether a previous page of older messages exists | +| `onFetchPreviousPage` | `(() => void)` | | - | Callback to fetch the previous page of messages | diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx index 15993356..3261ff99 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx @@ -1,4 +1,4 @@ -import { useEffect, useRef } from "react"; +import { useEffect, useLayoutEffect, useRef } from "react"; import { cn } from "../lib/utils"; import { ScrollArea } from "../ui/scroll-area"; import { Skeleton } from "../ui/skeleton"; @@ -13,6 +13,10 @@ export interface GenieChatMessageListProps { status: GenieChatStatus; /** Additional CSS class for the scroll area */ className?: string; + /** Whether a previous page of older messages exists */ + hasPreviousPage?: boolean; + /** Callback to fetch the previous page of messages */ + onFetchPreviousPage?: () => void; } const STATUS_LABELS: Record = { @@ -26,17 +30,119 @@ function formatStatus(status: string): string { return STATUS_LABELS[status] ?? status.replace(/_/g, " ").toLowerCase(); } -function StreamingIndicator({ messages }: { messages: GenieMessageItem[] }) { - const last = messages[messages.length - 1]; - if (last?.role === "assistant" && last.id === "") { - return ( -
- - {formatStatus(last.status)} -
+function getViewport(scrollRef: React.RefObject) { + return scrollRef.current?.querySelector( + '[data-slot="scroll-area-viewport"]', + ); +} + +/** + * Manages scroll position: scrolls to bottom on append/initial load, + * preserves position when older messages are prepended. + */ +function useScrollManagement( + scrollRef: React.RefObject, + messages: GenieMessageItem[], + status: GenieChatStatus, +) { + const prevFirstMessageIdRef = useRef(null); + const prevScrollHeightRef = useRef(0); + const prevMessageCountRef = useRef(0); + + // Keep prevScrollHeightRef fresh when async content (images, embeds) + // changes the viewport height between renders. + useEffect(() => { + const viewport = getViewport(scrollRef); + if (!viewport) return; + + const observer = new ResizeObserver(() => { + prevScrollHeightRef.current = viewport.scrollHeight; + }); + observer.observe(viewport); + return () => observer.disconnect(); + }, [scrollRef]); + + // biome-ignore lint/correctness/useExhaustiveDependencies: react to message count AND status so prevScrollHeightRef stays accurate when the loading indicator appears/disappears + useLayoutEffect(() => { + const viewport = getViewport(scrollRef); + if (!viewport) return; + + const count = messages.length; + const countChanged = count !== prevMessageCountRef.current; + prevMessageCountRef.current = count; + + // Nothing to do if message count didn't change (e.g. status-only transition) + if (!countChanged) { + prevScrollHeightRef.current = viewport.scrollHeight; + return; + } + + const firstMessageId = messages[0]?.id ?? null; + const wasPrepend = + prevFirstMessageIdRef.current !== null && + firstMessageId !== prevFirstMessageIdRef.current; + + if (wasPrepend && prevScrollHeightRef.current > 0) { + // Older messages prepended — preserve scroll position + const delta = viewport.scrollHeight - prevScrollHeightRef.current; + viewport.scrollTop += delta; + } else { + // Messages appended or initial load — scroll to bottom + viewport.scrollTop = viewport.scrollHeight; + } + + prevFirstMessageIdRef.current = firstMessageId; + prevScrollHeightRef.current = viewport.scrollHeight; + }, [messages.length, status]); +} + +/** + * Observes a sentinel element at the top of the scroll area and triggers + * `onFetchPreviousPage` when the user scrolls to the top (only if content overflows). + * Returns a ref to attach to the sentinel element. + */ +function useLoadOlderOnScroll( + scrollRef: React.RefObject, + shouldObserve: boolean, + onFetchPreviousPage?: () => void, +) { + const sentinelRef = useRef(null); + const onFetchPreviousPageRef = useRef(onFetchPreviousPage); + onFetchPreviousPageRef.current = onFetchPreviousPage; + + useEffect(() => { + const sentinel = sentinelRef.current; + const viewport = getViewport(scrollRef); + if (!sentinel || !viewport || !shouldObserve) return; + + // The observer fires synchronously on observe() if the sentinel is + // already visible. We arm it on the next frame so that synchronous + // initial fire is ignored, but a real intersection (user genuinely + // at the top on a short conversation) triggers on subsequent frames. + let armed = false; + const frameId = requestAnimationFrame(() => { + armed = true; + }); + + const observer = new IntersectionObserver( + (entries) => { + if (!armed) return; + const isScrollable = viewport.scrollHeight > viewport.clientHeight; + if (entries[0]?.isIntersecting && isScrollable) { + onFetchPreviousPageRef.current?.(); + } + }, + { root: viewport, threshold: 0 }, ); - } - return null; + + observer.observe(sentinel); + return () => { + cancelAnimationFrame(frameId); + observer.disconnect(); + }; + }, [scrollRef, shouldObserve]); + + return sentinelRef; } /** Scrollable message list that renders Genie chat messages with auto-scroll, skeleton loaders, and a streaming indicator. */ @@ -44,23 +150,38 @@ export function GenieChatMessageList({ messages, status, className, + hasPreviousPage = false, + onFetchPreviousPage, }: GenieChatMessageListProps) { const scrollRef = useRef(null); - // Scroll only the ScrollArea viewport, not the page - // biome-ignore lint/correctness/useExhaustiveDependencies: intentional triggers for auto-scroll - useEffect(() => { - const viewport = scrollRef.current?.querySelector( - '[data-slot="scroll-area-viewport"]', - ); - if (viewport) { - viewport.scrollTop = viewport.scrollHeight; - } - }, [messages.length, status]); + const sentinelRef = useLoadOlderOnScroll( + scrollRef, + hasPreviousPage && status !== "loading-older", + onFetchPreviousPage, + ); + useScrollManagement(scrollRef, messages, status); + + const lastMessage = messages[messages.length - 1]; + const showStreamingIndicator = + status === "streaming" && + lastMessage?.role === "assistant" && + lastMessage.id === ""; return (
+ {hasPreviousPage &&
} + + {status === "loading-older" && ( +
+ + + Loading older messages... + +
+ )} + {status === "loading-history" && messages.length === 0 && (
@@ -69,15 +190,19 @@ export function GenieChatMessageList({
)} - {messages.map((msg) => { - if (msg.role === "assistant" && msg.id === "" && !msg.content) { - return null; - } - return ; - })} + {messages + .filter( + (msg) => msg.role !== "assistant" || msg.id !== "" || msg.content, + ) + .map((msg) => ( + + ))} - {status === "streaming" && messages.length > 0 && ( - + {showStreamingIndicator && ( +
+ + {formatStatus(lastMessage.status)} +
)} {messages.length === 0 && status === "idle" && ( diff --git a/packages/appkit-ui/src/react/genie/genie-chat.tsx b/packages/appkit-ui/src/react/genie/genie-chat.tsx index d3d2b3d6..32bb1139 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat.tsx @@ -12,7 +12,15 @@ export function GenieChat({ placeholder, className, }: GenieChatProps) { - const { messages, status, error, sendMessage, reset } = useGenieChat({ + const { + messages, + status, + error, + sendMessage, + reset, + hasPreviousPage, + fetchPreviousPage, + } = useGenieChat({ alias, basePath, }); @@ -32,7 +40,12 @@ export function GenieChat({
)} - + {error && (
diff --git a/packages/appkit-ui/src/react/genie/types.ts b/packages/appkit-ui/src/react/genie/types.ts index 917c1c95..74a2e0e9 100644 --- a/packages/appkit-ui/src/react/genie/types.ts +++ b/packages/appkit-ui/src/react/genie/types.ts @@ -9,6 +9,7 @@ export type { export type GenieChatStatus = | "idle" | "loading-history" + | "loading-older" | "streaming" | "error"; @@ -40,6 +41,12 @@ export interface UseGenieChatReturn { error: string | null; sendMessage: (content: string) => void; reset: () => void; + /** Whether a previous page of older messages exists */ + hasPreviousPage: boolean; + /** Whether a previous page is currently being fetched */ + isFetchingPreviousPage: boolean; + /** Fetch the previous page of older messages */ + fetchPreviousPage: () => void; } export interface GenieChatProps { diff --git a/packages/appkit-ui/src/react/genie/use-genie-chat.ts b/packages/appkit-ui/src/react/genie/use-genie-chat.ts index 3b2e62a6..a54082ae 100644 --- a/packages/appkit-ui/src/react/genie/use-genie-chat.ts +++ b/packages/appkit-ui/src/react/genie/use-genie-chat.ts @@ -73,6 +73,71 @@ function messageResultToItems(msg: GenieMessageResponse): GenieMessageItem[] { return [makeUserItem(msg, "-user"), makeAssistantItem(msg)]; } +/** + * Streams a conversation page via SSE. Collects message items and query + * results into a buffer and returns them when the stream completes. + */ +function fetchConversationPage( + basePath: string, + alias: string, + convId: string, + options: { + pageToken?: string; + signal?: AbortSignal; + onPaginationInfo?: (nextPageToken: string | null) => void; + onError?: (error: string) => void; + onConnectionError?: (err: unknown) => void; + }, +): Promise { + const params = new URLSearchParams({ + requestId: crypto.randomUUID(), + }); + if (options.pageToken) { + params.set("pageToken", options.pageToken); + } + + const items: GenieMessageItem[] = []; + return connectSSE({ + url: `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}?${params}`, + signal: options.signal, + onMessage: async (message) => { + try { + const event = JSON.parse(message.data) as GenieStreamEvent; + switch (event.type) { + case "message_result": + items.push(...messageResultToItems(event.message)); + break; + case "query_result": + for (let i = items.length - 1; i >= 0; i--) { + const item = items[i]; + if ( + item.attachments.some( + (a) => a.attachmentId === event.attachmentId, + ) + ) { + item.queryResults.set(event.attachmentId, event.data); + break; + } + } + break; + case "history_info": + options.onPaginationInfo?.(event.nextPageToken); + break; + case "error": + options.onError?.(event.error); + break; + } + } catch { + // Malformed SSE data + } + }, + onError: (err) => options.onConnectionError?.(err), + }).then(() => items); +} + +/** Minimum time (ms) to hold the loading-older state so scroll inertia settles before prepending messages. */ +const MIN_PREVIOUS_PAGE_LOAD_MS = 800; + /** * Manages the full Genie chat lifecycle: * SSE streaming, conversation persistence via URL, and history replay. @@ -94,16 +159,25 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { const [status, setStatus] = useState("idle"); const [conversationId, setConversationId] = useState(null); const [error, setError] = useState(null); + const [nextPageToken, setNextPageToken] = useState(null); + + const hasPreviousPage = nextPageToken !== null; + const isFetchingPreviousPage = status === "loading-older"; const abortControllerRef = useRef(null); + const paginationAbortRef = useRef(null); const conversationIdRef = useRef(null); + const nextPageTokenRef = useRef(null); + const isLoadingOlderRef = useRef(false); useEffect(() => { conversationIdRef.current = conversationId; - }, [conversationId]); + nextPageTokenRef.current = nextPageToken; + }, [conversationId, nextPageToken]); - const processEvent = useCallback( - (event: GenieStreamEvent, isHistory: boolean) => { + /** Process SSE events during live message streaming (sendMessage). */ + const processStreamEvent = useCallback( + (event: GenieStreamEvent) => { switch (event.type) { case "message_start": { setConversationId(event.conversationId); @@ -128,10 +202,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { const msg = event.message; const hasAttachments = (msg.attachments?.length ?? 0) > 0; - if (isHistory) { - const items = messageResultToItems(msg); - setMessages((prev) => [...prev, ...items]); - } else if (hasAttachments) { + if (hasAttachments) { // During streaming we already appended the user message locally, // so only handle assistant results. Messages without attachments // are the user-message echo from the API — skip those. @@ -149,21 +220,26 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { case "query_result": { setMessages((prev) => { - const updated = [...prev]; - for (let i = updated.length - 1; i >= 0; i--) { - const msg = updated[i]; + // Reverse scan — query results typically match recent messages + for (let i = prev.length - 1; i >= 0; i--) { + const msg = prev[i]; if ( msg.attachments.some( (a) => a.attachmentId === event.attachmentId, ) ) { - const queryResults = new Map(msg.queryResults); - queryResults.set(event.attachmentId, event.data); - updated[i] = { ...msg, queryResults }; - break; + const updated = prev.slice(); + updated[i] = { + ...msg, + queryResults: new Map(msg.queryResults).set( + event.attachmentId, + event.data, + ), + }; + return updated; } } - return updated; + return prev; }); break; } @@ -184,6 +260,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { if (!trimmed) return; abortControllerRef.current?.abort(); + paginationAbortRef.current?.abort(); setError(null); setStatus("streaming"); @@ -221,7 +298,7 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { signal: abortController.signal, onMessage: async (message) => { try { - processEvent(JSON.parse(message.data) as GenieStreamEvent, false); + processStreamEvent(JSON.parse(message.data) as GenieStreamEvent); } catch { // Malformed SSE data } @@ -247,56 +324,118 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { } }); }, - [alias, basePath, processEvent], + [alias, basePath, processStreamEvent], ); - const loadHistory = useCallback( - (convId: string) => { - abortControllerRef.current?.abort(); - setStatus("loading-history"); - setError(null); - setMessages([]); - setConversationId(convId); - + /** Creates an AbortController, stores it in the given ref, and fetches a conversation page. */ + const fetchPage = useCallback( + ( + controllerRef: { current: AbortController | null }, + convId: string, + options?: { pageToken?: string; errorMessage?: string }, + ) => { + controllerRef.current?.abort(); const abortController = new AbortController(); - abortControllerRef.current = abortController; + controllerRef.current = abortController; - const requestId = crypto.randomUUID(); - - connectSSE({ - url: `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}?requestId=${encodeURIComponent(requestId)}`, + const promise = fetchConversationPage(basePath, alias, convId, { + pageToken: options?.pageToken, signal: abortController.signal, - onMessage: async (message) => { - try { - processEvent(JSON.parse(message.data) as GenieStreamEvent, true); - } catch { - // Malformed SSE data - } + onPaginationInfo: setNextPageToken, + onError: (msg) => { + setError(msg); + setStatus("error"); }, - onError: (err) => { + onConnectionError: (err) => { if (abortController.signal.aborted) return; setError( err instanceof Error ? err.message - : "Failed to load conversation history.", + : (options?.errorMessage ?? "Failed to load messages."), ); setStatus("error"); }, - }).then(() => { + }); + + return { promise, abortController }; + }, + [alias, basePath], + ); + + const loadHistory = useCallback( + (convId: string) => { + paginationAbortRef.current?.abort(); + setStatus("loading-history"); + setError(null); + setMessages([]); + setConversationId(convId); + + const { promise, abortController } = fetchPage( + abortControllerRef, + convId, + { errorMessage: "Failed to load conversation history." }, + ); + promise.then((items) => { if (!abortController.signal.aborted) { + setMessages(items); setStatus((prev) => (prev === "error" ? "error" : "idle")); } }); }, - [alias, basePath, processEvent], + [fetchPage], ); + const fetchPreviousPage = useCallback(() => { + if ( + !nextPageTokenRef.current || + !conversationIdRef.current || + isLoadingOlderRef.current + ) + return; + + isLoadingOlderRef.current = true; + setStatus("loading-older"); + setError(null); + + const startTime = Date.now(); + const { promise, abortController } = fetchPage( + paginationAbortRef, + conversationIdRef.current, + { + pageToken: nextPageTokenRef.current, + errorMessage: "Failed to load older messages.", + }, + ); + promise + .then(async (items) => { + if (abortController.signal.aborted) return; + const elapsed = Date.now() - startTime; + if (elapsed < MIN_PREVIOUS_PAGE_LOAD_MS) { + await new Promise((r) => + setTimeout(r, MIN_PREVIOUS_PAGE_LOAD_MS - elapsed), + ); + } + if (abortController.signal.aborted) return; + if (items.length > 0) { + setMessages((prev) => [...items, ...prev]); + } + setStatus((current) => + current === "loading-older" ? "idle" : current, + ); + }) + .finally(() => { + isLoadingOlderRef.current = false; + }); + }, [fetchPage]); + const reset = useCallback(() => { abortControllerRef.current?.abort(); + paginationAbortRef.current?.abort(); setMessages([]); setConversationId(null); setError(null); setStatus("idle"); + setNextPageToken(null); if (persistInUrl) { removeUrlParam(urlParamName); } @@ -310,8 +449,19 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { } return () => { abortControllerRef.current?.abort(); + paginationAbortRef.current?.abort(); }; }, [persistInUrl, urlParamName, loadHistory]); - return { messages, status, conversationId, error, sendMessage, reset }; + return { + messages, + status, + conversationId, + error, + sendMessage, + reset, + hasPreviousPage, + isFetchingPreviousPage, + fetchPreviousPage, + }; } diff --git a/packages/appkit/src/connectors/genie/client.ts b/packages/appkit/src/connectors/genie/client.ts index 99636517..3028791d 100644 --- a/packages/appkit/src/connectors/genie/client.ts +++ b/packages/appkit/src/connectors/genie/client.ts @@ -111,28 +111,27 @@ export class GenieConnector { workspaceClient: WorkspaceClient, spaceId: string, conversationId: string, - options?: { maxMessages?: number }, - ): Promise { - const maxMessages = options?.maxMessages ?? this.config.maxMessages; - const allMessages: GenieMessage[] = []; - let pageToken: string | undefined; - - do { - const response = await workspaceClient.genie.listConversationMessages({ - space_id: spaceId, - conversation_id: conversationId, - page_size: genieConnectorDefaults.pageSize, - ...(pageToken ? { page_token: pageToken } : {}), - }); + options?: { pageSize?: number; pageToken?: string }, + ): Promise<{ + messages: GenieMessageResponse[]; + nextPageToken: string | null; + }> { + const pageSize = + options?.pageSize ?? genieConnectorDefaults.initialPageSize; - if (response.messages) { - allMessages.push(...response.messages); - } + const response = await workspaceClient.genie.listConversationMessages({ + space_id: spaceId, + conversation_id: conversationId, + page_size: pageSize, + ...(options?.pageToken ? { page_token: options.pageToken } : {}), + }); - pageToken = response.next_page_token; - } while (pageToken && allMessages.length < maxMessages); + const messages = (response.messages ?? []).reverse().map(toMessageResponse); - return allMessages.slice(0, maxMessages).reverse().map(toMessageResponse); + return { + messages, + nextPageToken: response.next_page_token ?? null, + }; } async getMessageAttachmentQueryResult( @@ -258,21 +257,35 @@ export class GenieConnector { workspaceClient: WorkspaceClient, spaceId: string, conversationId: string, - options?: { includeQueryResults?: boolean }, + options?: { + includeQueryResults?: boolean; + pageSize?: number; + pageToken?: string; + }, ): AsyncGenerator { const includeQueryResults = options?.includeQueryResults !== false; try { - const messageResponses = await this.listConversationMessages( - workspaceClient, - spaceId, - conversationId, - ); + const { messages: messageResponses, nextPageToken } = + await this.listConversationMessages( + workspaceClient, + spaceId, + conversationId, + { pageSize: options?.pageSize, pageToken: options?.pageToken }, + ); for (const messageResponse of messageResponses) { yield { type: "message_result", message: messageResponse }; } + yield { + type: "history_info", + conversationId, + spaceId, + nextPageToken, + loadedCount: messageResponses.length, + }; + if (includeQueryResults) { const queryAttachments: Array<{ messageId: string; @@ -367,15 +380,27 @@ export class GenieConnector { spaceId: string, conversationId: string, ): Promise { - const messages = await this.listConversationMessages( - workspaceClient, - spaceId, - conversationId, - ); + const allMessages: GenieMessageResponse[] = []; + let pageToken: string | undefined; + + do { + const { messages, nextPageToken } = await this.listConversationMessages( + workspaceClient, + spaceId, + conversationId, + { + pageSize: genieConnectorDefaults.pageSize, + pageToken, + }, + ); + allMessages.push(...messages); + pageToken = nextPageToken ?? undefined; + } while (pageToken && allMessages.length < this.config.maxMessages); + return { conversationId, spaceId, - messages, + messages: allMessages.slice(0, this.config.maxMessages), }; } } diff --git a/packages/appkit/src/connectors/genie/defaults.ts b/packages/appkit/src/connectors/genie/defaults.ts index a86172da..c53c0479 100644 --- a/packages/appkit/src/connectors/genie/defaults.ts +++ b/packages/appkit/src/connectors/genie/defaults.ts @@ -5,4 +5,6 @@ export const genieConnectorDefaults = { maxMessages: 200, /** Default page size for listConversationMessages. */ pageSize: 100, + /** Default page size for initial conversation load (lazy loading). */ + initialPageSize: 20, } as const; diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts index 1656a8ea..a3c301b4 100644 --- a/packages/appkit/src/plugins/genie/genie.ts +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -95,7 +95,9 @@ export class GeniePlugin extends Plugin { ); const timeout = this.config.timeout ?? 120_000; - const requestId = (req.query.requestId as string) || randomUUID(); + const requestId = + (typeof req.query.requestId === "string" && req.query.requestId) || + randomUUID(); const streamSettings: StreamExecutionSettings = { ...genieStreamDefaults, @@ -138,14 +140,19 @@ export class GeniePlugin extends Plugin { } const includeQueryResults = req.query.includeQueryResults !== "false"; - const requestId = (req.query.requestId as string) || randomUUID(); + const pageToken = + typeof req.query.pageToken === "string" ? req.query.pageToken : undefined; + const requestId = + (typeof req.query.requestId === "string" && req.query.requestId) || + randomUUID(); logger.debug( - "Fetching conversation %s from space %s (alias=%s, includeQueryResults=%s)", + "Fetching conversation %s from space %s (alias=%s, includeQueryResults=%s, pageToken=%s)", conversationId, spaceId, alias, includeQueryResults, + pageToken ?? "none", ); const streamSettings: StreamExecutionSettings = { @@ -165,7 +172,7 @@ export class GeniePlugin extends Plugin { workspaceClient, spaceId, conversationId, - { includeQueryResults }, + { includeQueryResults, pageToken }, ), streamSettings, ); diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index 58acfcfe..36d0d921 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -6,6 +6,7 @@ import { setupDatabricksEnv, } from "@tools/test-helpers"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { genieConnectorDefaults } from "../../../connectors/genie/defaults"; import { ServiceContext } from "../../../context/service-context"; import { Plugin } from "../../../plugin"; import { GeniePlugin, genie } from "../genie"; @@ -584,7 +585,7 @@ describe("Genie Plugin", () => { expect.objectContaining({ space_id: "test-space-id", conversation_id: "conv-123", - page_size: 100, + page_size: genieConnectorDefaults.initialPageSize, }), ); @@ -597,6 +598,9 @@ describe("Genie Plugin", () => { ).length; expect(messageResultCount).toBe(2); + // Should have history_info event + expect(allWritten).toContain("history_info"); + // Should contain message content expect(allWritten).toContain("What are the top customers?"); expect(allWritten).toContain("Here are the results"); @@ -707,34 +711,20 @@ describe("Genie Plugin", () => { expect(mockRes.end).toHaveBeenCalled(); }); - test("should paginate through all messages", async () => { - mockGenieService.listConversationMessages - .mockResolvedValueOnce({ - messages: [ - { - message_id: "msg-1", - conversation_id: "conv-123", - space_id: "test-space-id", - content: "Page 1 message", - status: "COMPLETED", - attachments: [], - }, - ], - next_page_token: "page-2-token", - }) - .mockResolvedValueOnce({ - messages: [ - { - message_id: "msg-2", - conversation_id: "conv-123", - space_id: "test-space-id", - content: "Page 2 message", - status: "COMPLETED", - attachments: [], - }, - ], - next_page_token: undefined, - }); + test("should fetch only one page and emit history_info with nextPageToken", async () => { + mockGenieService.listConversationMessages.mockResolvedValueOnce({ + messages: [ + { + message_id: "msg-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Most recent message", + status: "COMPLETED", + attachments: [], + }, + ], + next_page_token: "page-2-token", + }); const plugin = new GeniePlugin(config); const { router, getHandler } = createMockRouter(); @@ -752,36 +742,63 @@ describe("Genie Plugin", () => { await handler(mockReq, mockRes); + // Should only fetch one page (lazy loading) expect(mockGenieService.listConversationMessages).toHaveBeenCalledTimes( - 2, + 1, ); - // First call without page_token - expect(mockGenieService.listConversationMessages).toHaveBeenNthCalledWith( - 1, + expect(mockGenieService.listConversationMessages).toHaveBeenCalledWith( expect.objectContaining({ space_id: "test-space-id", conversation_id: "conv-123", - page_size: 100, + page_size: genieConnectorDefaults.initialPageSize, }), ); - // Second call with page_token - expect(mockGenieService.listConversationMessages).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ - space_id: "test-space-id", + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("Most recent message"); + // history_info should contain the nextPageToken + expect(allWritten).toContain("history_info"); + expect(allWritten).toContain("page-2-token"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should emit history_info with null nextPageToken when no more pages", async () => { + mockMessages([ + { + message_id: "msg-1", conversation_id: "conv-123", - page_size: 100, - page_token: "page-2-token", - }), + space_id: "test-space-id", + content: "Only message", + status: "COMPLETED", + attachments: [], + }, + ]); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", ); + const mockReq = createConversationRequest({ + query: { includeQueryResults: "false" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); const allWritten = writeCalls.join(""); - expect(allWritten).toContain("Page 1 message"); - expect(allWritten).toContain("Page 2 message"); + expect(allWritten).toContain("history_info"); + // nextPageToken should be null + expect(allWritten).toContain('"nextPageToken":null'); expect(mockRes.end).toHaveBeenCalled(); }); @@ -982,6 +999,95 @@ describe("Genie Plugin", () => { }); }); + describe("getConversation with pageToken", () => { + test("should pass pageToken through to streamConversation", async () => { + mockGenieService.listConversationMessages.mockResolvedValueOnce({ + messages: [ + { + message_id: "msg-old-1", + conversation_id: "conv-123", + space_id: "test-space-id", + content: "Older message", + status: "COMPLETED", + attachments: [], + }, + ], + next_page_token: "next-token-abc", + }); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createMockRequest({ + params: { alias: "myspace", conversationId: "conv-123" }, + query: { pageToken: "some-page-token" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockGenieService.listConversationMessages).toHaveBeenCalledWith( + expect.objectContaining({ + space_id: "test-space-id", + conversation_id: "conv-123", + page_token: "some-page-token", + }), + ); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("Older message"); + expect(allWritten).toContain("history_info"); + expect(allWritten).toContain("next-token-abc"); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("should yield error event when paginated request fails", async () => { + mockGenieService.listConversationMessages.mockRejectedValue( + new Error("Page token expired"), + ); + + const plugin = new GeniePlugin(config); + const { router, getHandler } = createMockRouter(); + + plugin.injectRoutes(router); + + const handler = getHandler( + "GET", + "/:alias/conversations/:conversationId", + ); + const mockReq = createMockRequest({ + params: { alias: "myspace", conversationId: "conv-123" }, + query: { pageToken: "expired-token" }, + headers: { + "x-forwarded-access-token": "user-token", + "x-forwarded-user": "user-1", + }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + const writeCalls = mockRes.write.mock.calls.map((call: any[]) => call[0]); + const allWritten = writeCalls.join(""); + + expect(allWritten).toContain("error"); + expect(allWritten).toContain("Page token expired"); + expect(mockRes.end).toHaveBeenCalled(); + }); + }); + describe("SSE reconnection streamId", () => { let executeStreamSpy: ReturnType; diff --git a/packages/shared/src/genie.ts b/packages/shared/src/genie.ts index b8f3970f..b8021ab1 100644 --- a/packages/shared/src/genie.ts +++ b/packages/shared/src/genie.ts @@ -14,7 +14,16 @@ export type GenieStreamEvent = statementId: string; data: unknown; } - | { type: "error"; error: string }; + | { type: "error"; error: string } + | { + type: "history_info"; + conversationId: string; + spaceId: string; + /** Opaque token to fetch the next (older) page. Null means no more pages. */ + nextPageToken: string | null; + /** Total messages returned in this initial load */ + loadedCount: number; + }; /** Cleaned response — subset of SDK GenieMessage */ export interface GenieMessageResponse {