Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ coverage/
exports*
debug/
logs/*.txt

# AI tooling
.claude/
.wolf/
CLAUDE.md
7 changes: 7 additions & 0 deletions src/scraper/api-version.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
 * Default value for the `?version=X.Y` query parameter that Perplexity uses to
 * version its REST endpoints (e.g. `/rest/thread/list_ask_threads?version=2.18`).
 *
 * Both library discovery and conversation extraction hit `/rest/` endpoints, so
 * they share this constant to keep the version in one place rather than
 * duplicating the literal.
 */
export const DEFAULT_API_VERSION = '2.18'
243 changes: 140 additions & 103 deletions src/scraper/conversation-extractor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { z } from 'zod'
import { type Page, type BrowserContext, type Response } from '@playwright/test'
import { logger } from '../utils/logger.js'
import { waitStrategy } from '../utils/wait-strategy.js'
import { ApiDiagnosticsWriter } from '../utils/api-diagnostics.js'
import { type Config } from '../utils/config.js'
import { ApiDiagnosticsWriter } from '../utils/api-diagnostics.js'
import { DEFAULT_API_VERSION } from './api-version.js'

export interface ConversationMessage {
role: 'user' | 'assistant'
Expand Down Expand Up @@ -44,11 +45,25 @@ export class ConversationExtractor {
blocks: z.array(ConversationExtractor.BlockSchema).optional(),
})

/**
* Validates the top-level shape of the `/rest/thread/{id}` HTTP response,
* confirmed against a live 2026 response. The endpoint returns either a bare
* array of entries or an object wrapping them; pagination is signalled by the
* top-level `has_next_page` / `next_cursor` pair. Fields are optional and the
* object is non-strict, so unknown/new keys don't reject an otherwise-valid
* response — shape drift is surfaced via diagnostics, and `EntrySchema`
* remains the per-entry fallback downstream.
*/
private static readonly ApiResponseSchema = z.union([
z.array(ConversationExtractor.EntrySchema),
z.object({
entries: z.array(ConversationExtractor.EntrySchema),
background_entries: z.array(z.unknown()).optional(),
has_next_page: z.boolean().optional(),
next_cursor: z.string().nullable().optional(),
status: z.string().optional(),
thread_metadata: z.unknown().optional(),
// Legacy/list-style pagination — kept for backward compatibility.
collection_info: z
.object({
has_next_page: z.boolean().optional(),
Expand Down Expand Up @@ -106,12 +121,6 @@ export class ConversationExtractor {
}
}

// Upper bound for the adaptive timeout; also the initial value of `currentTimeoutMs`.
private static readonly TIMEOUT_MAX_MS = 30_000
// Lower bound: `reduceTimeout()` never lowers the timeout below this.
private static readonly TIMEOUT_MIN_MS = 8_000
// Amount subtracted from the timeout by each `reduceTimeout()` call.
private static readonly TIMEOUT_STEP_DOWN_MS = 3_000
// Amount added back to the timeout by each `recoverTimeout()` call.
private static readonly TIMEOUT_STEP_UP_MS = 1_000

// Current adaptive timeout in milliseconds, kept within [TIMEOUT_MIN_MS, TIMEOUT_MAX_MS].
private currentTimeoutMs = ConversationExtractor.TIMEOUT_MAX_MS
private readonly diagnostics: ApiDiagnosticsWriter

constructor(
Expand All @@ -121,21 +130,6 @@ export class ConversationExtractor {
this.diagnostics = new ApiDiagnosticsWriter(config)
}

/**
 * Shortens the adaptive timeout by one step-down increment, never going below
 * the configured minimum, and logs the resulting value at debug level.
 */
reduceTimeout(): void {
  const floorMs = ConversationExtractor.TIMEOUT_MIN_MS
  const loweredMs = this.currentTimeoutMs - ConversationExtractor.TIMEOUT_STEP_DOWN_MS

  // Clamp to the floor so repeated reductions cannot shrink the timeout indefinitely.
  this.currentTimeoutMs = Math.max(floorMs, loweredMs)

  logger.debug(`[extractor] timeout reduced to ${this.currentTimeoutMs}ms`)
}

/**
 * Lengthens the adaptive timeout by one step-up increment, never exceeding the
 * configured maximum.
 */
recoverTimeout(): void {
  const ceilingMs = ConversationExtractor.TIMEOUT_MAX_MS
  const raisedMs = this.currentTimeoutMs + ConversationExtractor.TIMEOUT_STEP_UP_MS

  // Clamp to the ceiling so recovery cannot overshoot the maximum.
  this.currentTimeoutMs = Math.min(ceilingMs, raisedMs)
}

async extract(conversationUrl: string): Promise<ExtractedConversation> {
await this.ensureContextIsAlive()

Expand All @@ -147,16 +141,12 @@ export class ConversationExtractor {
throw new ConversationExtractor.ExtractionError(`Failed to create new page: ${errorMessage}`)
}

const apiResponsePromise = this.captureConversationApiResponse(conversationPage)

try {
await this.navigateToConversationUrl(conversationPage, conversationUrl)
await waitStrategy(this.config).afterScroll(conversationPage)

const capturedApiData = await apiResponsePromise
if (!capturedApiData) {
throw new ConversationExtractor.NoDataError('API response timeout or not found')
}
const conversationId = this.extractIdFromUrl(conversationUrl)
const capturedApiData = await this.fetchThreadData(conversationPage, conversationId)

const extractedConversation = this.parseConversationData(capturedApiData, conversationUrl)
if (!extractedConversation) {
Expand All @@ -176,6 +166,122 @@ export class ConversationExtractor {
}
}

/**
 * Safety cap on cursor pagination (~99 entries/page, so ~5k entries).
 * `fetchThreadData` stops requesting further pages once this many pages have
 * been fetched and logs a warning that the export may be truncated.
 */
private static readonly MAX_PAGES = 50

/**
 * Fetches the complete thread payload, following cursor pagination when the
 * first response reports more pages.
 *
 * If the first response is not paginated (the common case) it is returned
 * unchanged. Otherwise the `entries` and `background_entries` of every page are
 * concatenated in the order the API returned them, and a single object is
 * returned that carries the first page's top-level metadata with pagination
 * marked as finished.
 *
 * Pagination stops when any of the following holds:
 * - a page does not report `has_next_page === true` with a string `next_cursor`;
 * - a page body is neither an array nor an object (e.g. JSON `null`);
 * - the API hands back a cursor that was already requested (would loop and
 *   duplicate entries);
 * - `MAX_PAGES` pages have been fetched (a warning is logged).
 *
 * @param page - Browser page used to issue the authenticated requests.
 * @param threadId - Identifier of the thread as used in the REST URL.
 * @returns The raw first-page payload, or the merged multi-page object.
 * @throws Whatever `fetchThreadPage` throws for a failed page request.
 */
private async fetchThreadData(page: Page, threadId: string): Promise<unknown> {
  // Narrows an arbitrary JSON value to a non-array object, or null otherwise.
  const asRecord = (value: unknown): Record<string, unknown> | null =>
    typeof value === 'object' && value !== null && !Array.isArray(value)
      ? (value as Record<string, unknown>)
      : null

  const firstParsed = await this.fetchThreadPage(page, threadId, null)
  const firstObj = asRecord(firstParsed)

  const firstCursor = firstObj?.next_cursor
  let cursor: string | null = typeof firstCursor === 'string' ? firstCursor : null

  // Fast path: the whole thread fit in one response (the common case).
  if (firstObj === null || firstObj.has_next_page !== true || !cursor) {
    return firstParsed
  }

  // Long thread: walk the cursor pages and accumulate entries in API order.
  const entries: unknown[] = Array.isArray(firstObj.entries) ? [...firstObj.entries] : []
  const backgroundEntries: unknown[] = Array.isArray(firstObj.background_entries)
    ? [...firstObj.background_entries]
    : []
  // Cursors already requested; a repeat means the API is looping.
  const seenCursors = new Set<string>([cursor])
  let pageCount = 1

  while (cursor && pageCount < ConversationExtractor.MAX_PAGES) {
    const parsed = await this.fetchThreadPage(page, threadId, cursor)

    // A body that is neither an array nor an object (e.g. JSON `null`) is treated
    // as an empty final page instead of being dereferenced; fetchThreadPage has
    // already reported the unexpected shape.
    const obj: Record<string, unknown> = Array.isArray(parsed)
      ? { entries: parsed as unknown[] }
      : (asRecord(parsed) ?? {})

    if (Array.isArray(obj.entries)) entries.push(...obj.entries)
    if (Array.isArray(obj.background_entries)) backgroundEntries.push(...obj.background_entries)
    pageCount++

    const rawNextCursor = obj.next_cursor
    const nextCursor =
      obj.has_next_page === true && typeof rawNextCursor === 'string' ? rawNextCursor : null

    if (nextCursor !== null && seenCursors.has(nextCursor)) {
      // Re-requesting a cursor would only re-download and duplicate entries.
      logger.warn(
        `[extractor] thread ${threadId}: API returned an already-used pagination cursor; ` +
          'stopping pagination'
      )
      cursor = null
    } else {
      if (nextCursor !== null) seenCursors.add(nextCursor)
      cursor = nextCursor
    }
  }

  // A cursor still pending here means the loop ended because of the page cap.
  if (cursor) {
    logger.warn(
      `[extractor] thread ${threadId}: stopped at the ${ConversationExtractor.MAX_PAGES}-page ` +
        'cap; export may be truncated'
    )
  }
  logger.debug(
    `[extractor] thread ${threadId}: assembled ${entries.length} entries from ${pageCount} pages`
  )

  // Preserve the first page's top-level metadata, but with the full entry set.
  return {
    ...firstObj,
    entries,
    background_entries: backgroundEntries,
    has_next_page: false,
    next_cursor: null,
  }
}

/**
 * Requests one page of the thread REST endpoint from inside the browser page
 * (so the request carries the page's credentials) and returns the decoded JSON.
 *
 * The response shape is checked against `ApiResponseSchema`; a mismatch is
 * logged and recorded via the diagnostics writer but does not fail the call —
 * the decoded payload is returned either way.
 *
 * @param page - Browser page in which the `fetch` is executed.
 * @param threadId - Identifier of the thread as used in the REST URL.
 * @param cursor - Pagination cursor, or `null` for the first page.
 * @returns The decoded JSON body of the response.
 * @throws ConversationExtractor.AuthError on HTTP 401 or 403.
 * @throws ConversationExtractor.NotFoundError on HTTP 404.
 * @throws ConversationExtractor.ServerError on HTTP 5xx.
 * @throws ConversationExtractor.ExtractionError on any other non-200 status or
 *   when the body is not valid JSON.
 */
private async fetchThreadPage(
  page: Page,
  threadId: string,
  cursor: string | null
): Promise<unknown> {
  const firstPageUrl = `https://www.perplexity.ai/rest/thread/${threadId}?version=${DEFAULT_API_VERSION}&source=default`
  const apiUrl = cursor ? `${firstPageUrl}&cursor=${encodeURIComponent(cursor)}` : firstPageUrl

  // Runs in the browser context; only the status code and raw text cross back.
  const { status, body } = await page.evaluate(async (url: string) => {
    const response = await fetch(url, { credentials: 'include' })
    const text = await response.text()
    return { status: response.status, body: text }
  }, apiUrl)

  const pagedSuffix = cursor ? ' (paged)' : ''
  logger.debug(`[extractor] thread ${threadId}: HTTP ${status}${pagedSuffix}`)

  // Map every non-200 status onto the matching error class.
  if (status !== 200) {
    if (status === 401 || status === 403) {
      throw new ConversationExtractor.AuthError(`Authentication required (${status})`)
    }
    if (status === 404) {
      throw new ConversationExtractor.NotFoundError('Thread not found (404)')
    }
    if (status >= 500) {
      throw new ConversationExtractor.ServerError(`Server error (${status})`)
    }
    throw new ConversationExtractor.ExtractionError(`Thread API returned HTTP ${status}`)
  }

  let payload: unknown
  try {
    payload = JSON.parse(body)
  } catch {
    throw new ConversationExtractor.ExtractionError(`Thread API: invalid JSON for ${threadId}`)
  }

  // Shape drift is reported, not fatal: the payload is returned regardless.
  const validation = ConversationExtractor.ApiResponseSchema.safeParse(payload)
  if (validation.success) {
    return payload
  }

  logger.warn(`[extractor] thread ${threadId}: unexpected response shape (see api-diagnostics)`)
  const zodErrorPaths = validation.error.issues.map((issue) => issue.path.join('.'))
  // Fire-and-forget: a failure to write the diagnostic is deliberately ignored.
  this.diagnostics
    .writeFailure({ url: apiUrl, errorType: 'zod_error', zodErrorPaths })
    .catch(() => {})

  return payload
}

private async ensureContextIsAlive(): Promise<void> {
if (!this.context) {
throw new ConversationExtractor.ExtractionError('Browser context is missing')
Expand All @@ -187,78 +293,6 @@ export class ConversationExtractor {
}
}

/**
 * Listens to the page's network responses and collects thread-API entries.
 *
 * The returned promise never rejects. It resolves with `{ entries }` once a
 * matching response reports no further page, or when the timer
 * (`this.currentTimeoutMs`) fires with at least one entry collected. If the
 * timer fires with nothing collected it resolves with `null`.
 *
 * NOTE(review): the `response` listener is not detached in this method; once
 * resolved, later events are ignored through the `isRequestResolved` flag.
 */
private captureConversationApiResponse(page: Page): Promise<unknown> {
// Entries gathered across all matching responses seen so far.
const accumulatedEntries: unknown[] = []
// Set once the promise has been resolved, by either the timer or a response.
let isRequestResolved = false

return new Promise((resolve) => {
// Fallback: resolve with whatever has been collected when the timeout elapses.
const timeoutId = setTimeout(() => {
if (!isRequestResolved) {
if (accumulatedEntries.length > 0) {
logger.info(
`API response timeout – resolving with ${accumulatedEntries.length} accumulated entries`
)
resolve({ entries: accumulatedEntries })
} else {
logger.warn('API response timeout – resolving with null')
resolve(null)
}
isRequestResolved = true
}
}, this.currentTimeoutMs)

page.on('response', async (response: Response) => {
if (isRequestResolved) return

// Only `/rest/thread/` responses are of interest, excluding the list endpoints.
const responseUrl = response.url()
const isThreadApiRequest = responseUrl.includes('/rest/thread/')
const isListRequest =
responseUrl.includes('list_ask_threads') ||
responseUrl.includes('list_recent') ||
responseUrl.includes('list_pinned')

if (!isThreadApiRequest || isListRequest) return
if (page.isClosed()) return

try {
const jsonResponse = await response.json()
// Re-check after the await: the timer may have resolved the promise meanwhile.
if (isRequestResolved) return

const parseResult = ConversationExtractor.ApiResponseSchema.safeParse(jsonResponse)

if (!parseResult.success) {
// Schema mismatch: record a diagnostic (write errors are ignored) and keep waiting.
this.diagnostics
.writeFailure({
url: response.url(),
errorType: 'zod_error',
zodErrorPaths: parseResult.error.issues.map((issue) => issue.path.join('.')),
})
.catch(() => {})
} else {
// The schema accepts either a bare array of entries or an object with `entries`.
const responseData = parseResult.data
const currentEntries = Array.isArray(responseData) ? responseData : responseData.entries
accumulatedEntries.push(...currentEntries)

// Only the object form can signal another page, via `collection_info.has_next_page`.
const hasNextPage =
!Array.isArray(responseData) && responseData.collection_info?.has_next_page === true

if (!hasNextPage) {
clearTimeout(timeoutId)
isRequestResolved = true
resolve({ entries: accumulatedEntries })
} else {
logger.info(
`Captured paginated response, ${accumulatedEntries.length} entries so far...`
)
}
}
} catch (_error) {
// Silent catch for JSON parse errors from non-JSON responses
}
})
})
}

private async navigateToConversationUrl(page: Page, url: string): Promise<void> {
const NAVIGATION_TIMEOUT_MS = 30000
const navigationResponse = await page.goto(url, {
Expand Down Expand Up @@ -369,8 +403,8 @@ export class ConversationExtractor {
if (dataObject && Array.isArray(dataObject.entries)) return dataObject.entries as unknown[]
if (dataObject && (dataObject.query_str || dataObject.blocks)) return [data]

logger.warn(`Unknown API response shape for ${url}`)
this.diagnostics.writeFailure({ url, errorType: 'unknown_shape' }).catch(() => {})

return []
}

Expand All @@ -397,9 +431,12 @@ export class ConversationExtractor {
}

let answer = ''
const seenAnswers = new Set<string>()
for (const block of entry.blocks ?? []) {
if (block.markdown_block?.answer) {
answer += block.markdown_block.answer + '\n\n'
const blockAnswer = block.markdown_block?.answer
if (blockAnswer && !seenAnswers.has(blockAnswer)) {
seenAnswers.add(blockAnswer)
answer += blockAnswer + '\n\n'
}
}

Expand Down
Loading