From 02efe182192b89ebc5708c9ac66f92b9b5f94717 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 29 Jul 2025 18:21:30 -0400 Subject: [PATCH 1/8] update dev dependencies --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 243db5ce..ddf78d6d 100644 --- a/package.json +++ b/package.json @@ -67,8 +67,8 @@ "@storybook/react-vite": "9.0.18", "@testing-library/react": "16.3.0", "@types/node": "24.1.0", - "@types/react": "19.1.8", - "@types/react-dom": "19.1.6", + "@types/react": "19.1.9", + "@types/react-dom": "19.1.7", "@vitejs/plugin-react": "4.7.0", "@vitest/coverage-v8": "3.2.4", "eslint": "9.32.0", From d1d943fcb7a1315021bbeccb27f0264d2b265b03 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 29 Jul 2025 20:32:40 -0400 Subject: [PATCH 2/8] upgrade hightable --- package.json | 2 +- src/components/AvroView/AvroView.tsx | 4 +- src/components/Cell/Cell.tsx | 16 +- src/components/CellPanel/CellPanel.tsx | 18 +- src/components/File/File.tsx | 11 +- src/components/ImageView/ImageView.tsx | 4 +- src/components/JsonView/JsonView.tsx | 4 +- src/components/MarkdownView/MarkdownView.tsx | 4 +- src/components/ParquetView/ParquetView.tsx | 9 +- src/components/TextView/TextView.tsx | 4 +- src/components/Viewer/Viewer.tsx | 2 +- src/lib/index.ts | 2 +- src/lib/tableProvider.ts | 277 ++++++++----------- src/lib/utils.ts | 9 + src/lib/workers/parquetWorker.ts | 67 +---- src/lib/workers/parquetWorkerClient.ts | 58 +--- src/lib/workers/types.ts | 51 +--- 17 files changed, 199 insertions(+), 343 deletions(-) diff --git a/package.json b/package.json index ddf78d6d..a80e73e0 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet" }, "dependencies": { - "hightable": "0.17.2", + "hightable": "0.18.1", "hyparquet": "1.17.1", "hyparquet-compressors": "1.1.1", "icebird": "0.3.0", diff --git a/src/components/AvroView/AvroView.tsx b/src/components/AvroView/AvroView.tsx index 71ea3d7f..03b53892 100644 --- a/src/components/AvroView/AvroView.tsx +++ b/src/components/AvroView/AvroView.tsx @@ -8,7 +8,7 @@ import styles from '../Json/Json.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } /** @@ -43,7 +43,7 @@ export default function AvroView({ source, setError }: ViewerProps) { setContent({ fileSize }) setJson(json) } catch (error) { - setError(error as Error) + setError(error) } finally { setIsLoading(false) } diff --git a/src/components/Cell/Cell.tsx b/src/components/Cell/Cell.tsx index 0f31285e..ae5e9608 100644 --- a/src/components/Cell/Cell.tsx +++ b/src/components/Cell/Cell.tsx @@ -39,21 +39,17 @@ export default function CellView({ source, row, col }: CellProps) { const metadata = await parquetMetadataAsync(asyncBuffer) setProgress(0.75) const df = parquetDataFrame(from, metadata) - const asyncRows = df.rows({ start: row, end: row + 1 }) - if (asyncRows.length > 1 || !(0 in asyncRows)) { - throw new Error(`Expected 1 row, got ${asyncRows.length}`) - } - const asyncRow = asyncRows[0] - // Await cell data + const columnName = df.header[col] if (columnName === undefined) { throw new Error(`Column name missing at index col=${col}`) } - const asyncCell = asyncRow.cells[columnName] - if (asyncCell === undefined) { - throw new Error(`Cell missing at column ${columnName}`) + await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) + const cell = df.getCell({ row, column: columnName }) + if (cell === undefined) { + throw new Error(`Cell at row=${row}, col=${col} is undefined`) } - const text = await asyncCell.then(stringify) + const text = stringify(cell.value) setText(text) setError(undefined) } catch (error) { diff --git a/src/components/CellPanel/CellPanel.tsx b/src/components/CellPanel/CellPanel.tsx index a91006d4..0108468e 100644 --- a/src/components/CellPanel/CellPanel.tsx +++ b/src/components/CellPanel/CellPanel.tsx @@ -4,9 +4,9 @@ import { useConfig } from '../../hooks/useConfig.js' import { cn } from '../../lib/utils.js' import ContentWrapper from '../ContentWrapper/ContentWrapper.js' import Json from '../Json/Json.js' +import jsonStyles from '../Json/Json.module.css' import SlideCloseButton from '../SlideCloseButton/SlideCloseButton.js' import styles from '../TextView/TextView.module.css' -import jsonStyles from '../Json/Json.module.css' interface ViewerProps { df: DataFrame @@ -29,21 +29,17 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose async function loadCellData() { try { setProgress(0.5) - const asyncRows = df.rows({ start: row, end: row + 1 }) - if (asyncRows.length > 1 || !(0 in asyncRows)) { - throw new Error(`Expected 1 row, got ${asyncRows.length}`) - } - const asyncRow = asyncRows[0] - // Await cell data + const columnName = df.header[col] if (columnName === undefined) { throw new Error(`Column name missing at index col=${col}`) } - const asyncCell = asyncRow.cells[columnName] - if (asyncCell === undefined) { - throw new Error(`Cell missing at column ${columnName}`) + await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) + const cell = df.getCell({ row, column: columnName }) + if (cell === undefined) { + throw new Error(`Cell at row=${row}, col=${col} is undefined`) } - const value: unknown = await asyncCell + const value: unknown = await cell.value if (value instanceof Object && !(value instanceof Date)) { setContent( diff --git a/src/components/File/File.tsx b/src/components/File/File.tsx index d8c10f03..6078e03e 100644 --- a/src/components/File/File.tsx +++ b/src/components/File/File.tsx @@ -1,5 +1,6 @@ -import { useState } from 'react' +import { useCallback, useState } from 'react' import type { FileSource } from '../../lib/sources/types.js' +import { toError } from '../../lib/utils.js' import Breadcrumb from '../Breadcrumb/Breadcrumb.js' import Layout from '../Layout/Layout.js' import Viewer from '../Viewer/Viewer.js' @@ -13,10 +14,14 @@ interface FileProps { */ export default function File({ source }: FileProps) { const [progress, setProgress] = useState() - const [error, setError] = useState() + const [error, setError] = useState() + + const setErrorWrapper = useCallback((error: unknown) => { + setError(toError(error)) + }, [setError]) return - + } diff --git a/src/components/ImageView/ImageView.tsx b/src/components/ImageView/ImageView.tsx index 0e81295a..2f49303e 100644 --- a/src/components/ImageView/ImageView.tsx +++ b/src/components/ImageView/ImageView.tsx @@ -7,7 +7,7 @@ import styles from './ImageView.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } interface Content { @@ -45,7 +45,7 @@ export default function ImageView({ source, setError }: ViewerProps) { setError(undefined) } catch (error) { setContent(undefined) - setError(error as Error) + setError(error) } finally { setIsLoading(false) } diff --git a/src/components/JsonView/JsonView.tsx b/src/components/JsonView/JsonView.tsx index eed17d18..bff8752c 100644 --- a/src/components/JsonView/JsonView.tsx +++ b/src/components/JsonView/JsonView.tsx @@ -8,7 +8,7 @@ import styles from '../Json/Json.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } const largeFileSize = 8_000_000 // 8 mb @@ -48,7 +48,7 @@ export default function JsonView({ source, setError }: ViewerProps) { setJson(JSON.parse(text)) } catch (error) { // TODO: show plain text in error case - setError(error as Error) + setError(error) } finally { setIsLoading(false) } diff --git a/src/components/MarkdownView/MarkdownView.tsx b/src/components/MarkdownView/MarkdownView.tsx index 83b388c9..b7dc5c14 100644 --- a/src/components/MarkdownView/MarkdownView.tsx +++ b/src/components/MarkdownView/MarkdownView.tsx @@ -8,7 +8,7 @@ import styles from './MarkdownView.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } /** @@ -37,7 +37,7 @@ export default function MarkdownView({ source, setError }: ViewerProps) { setError(undefined) setContent({ text, fileSize }) } catch (error) { - setError(error as Error) + setError(error) setContent(undefined) } finally { setIsLoading(false) diff --git a/src/components/ParquetView/ParquetView.tsx b/src/components/ParquetView/ParquetView.tsx index f7b76579..6c6a2460 100644 --- a/src/components/ParquetView/ParquetView.tsx +++ b/src/components/ParquetView/ParquetView.tsx @@ -1,4 +1,4 @@ -import HighTable, { DataFrame, rowCache } from 'hightable' +import HighTable, { DataFrame } from 'hightable' import 'hightable/src/HighTable.css' import { asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet' import React, { useCallback, useEffect, useState } from 'react' @@ -15,7 +15,7 @@ import styles from './ParquetView.module.css' interface ViewerProps { source: FileSource setProgress: (progress: number | undefined) => void - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } interface Content extends ContentSize { @@ -41,12 +41,11 @@ export default function ParquetView({ source, setProgress, setError }: ViewerPro const from = { url: resolveUrl, byteLength: asyncBuffer.byteLength, requestInit } setProgress(0.66) const metadata = await parquetMetadataAsync(asyncBuffer) - let dataframe = parquetDataFrame(from, metadata) - dataframe = rowCache(dataframe) + const dataframe = parquetDataFrame(from, metadata) const fileSize = asyncBuffer.byteLength setContent({ dataframe, fileSize }) } catch (error) { - setError(error as Error) + setError(error) } finally { setIsLoading(false) setProgress(1) diff --git a/src/components/TextView/TextView.tsx b/src/components/TextView/TextView.tsx index ace7dd98..c1e5abb1 100644 --- a/src/components/TextView/TextView.tsx +++ b/src/components/TextView/TextView.tsx @@ -7,7 +7,7 @@ import styles from './TextView.module.css' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void } /** @@ -36,7 +36,7 @@ export default function TextView({ source, setError }: ViewerProps) { setError(undefined) setContent({ text, fileSize }) } catch (error) { - setError(error as Error) + setError(error) setContent(undefined) } finally { setIsLoading(false) diff --git a/src/components/Viewer/Viewer.tsx b/src/components/Viewer/Viewer.tsx index f2d1c62e..47388fe2 100644 --- a/src/components/Viewer/Viewer.tsx +++ b/src/components/Viewer/Viewer.tsx @@ -9,7 +9,7 @@ import TextView from '../TextView/TextView.js' interface ViewerProps { source: FileSource - setError: (error: Error | undefined) => void + setError: (error: unknown) => void setProgress: (progress: number | undefined) => void } diff --git a/src/lib/index.ts b/src/lib/index.ts index 046303f9..f34152a7 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -3,4 +3,4 @@ export * from './sources/index.js' export { parquetDataFrame } from './tableProvider.js' export { asyncBufferFrom, cn, contentTypes, formatFileSize, getFileDate, getFileDateShort, imageTypes, parseFileSize } from './utils.js' export { parquetQueryWorker } from './workers/parquetWorkerClient.js' -export type { AsyncBufferFrom, Cells } from './workers/types.js' +export type { AsyncBufferFrom } from './workers/types.js' diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 596c8a7a..0c792251 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -1,37 +1,14 @@ -import { DataFrame, OrderBy, ResolvableRow, resolvableRow } from 'hightable' +import { DataFrame, DataFrameEvents, ResolvedValue, UnsortableDataFrame, createEventTarget, sortableDataFrame } from 'hightable' +import type { ColumnData } from 'hyparquet' import { FileMetaData, parquetSchema } from 'hyparquet' -import { parquetColumnRanksWorker, parquetQueryWorker } from './workers/parquetWorkerClient.js' +import { parquetQueryWorker } from './workers/parquetWorkerClient.js' import type { AsyncBufferFrom } from './workers/types.d.ts' -/* - * sortIndex[0] gives the index of the first row in the sorted table - */ -export function computeSortIndex(orderByRanks: { direction: 'ascending' | 'descending', ranks: number[] }[]): number[] { - if (!(0 in orderByRanks)) { - throw new Error('orderByRanks should have at least one element') - } - const numRows = orderByRanks[0].ranks.length - return Array - .from({ length: numRows }, (_, i) => i) - .sort((a, b) => { - for (const { direction, ranks } of orderByRanks) { - const rankA = ranks[a] - const rankB = ranks[b] - if (rankA === undefined || rankB === undefined) { - throw new Error('Invalid ranks') - } - const value = direction === 'ascending' ? 1 : -1 - if (rankA < rankB) return -value - if (rankA > rankB) return value - } - return 0 - }) -} - interface VirtualRowGroup { groupStart: number groupEnd: number - fetching: boolean + fetching: Map + fetched: Map } /** @@ -40,9 +17,9 @@ interface VirtualRowGroup { export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): DataFrame { const { children } = parquetSchema(metadata) const header = children.map(child => child.element.name) - const sortCache = new Map>() - const columnRanksCache = new Map>() - const data = new Array(Number(metadata.num_rows)) + const eventTarget = createEventTarget() + + const cellCache = new Map[]>(header.map(name => [name, []])) // virtual row groups are up to 1000 rows within row group boundaries const groups: VirtualRowGroup[] = [] @@ -52,155 +29,129 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): for (let j = 0; j < rg.num_rows; j += 1000) { const groupSize = Math.min(1000, Number(rg.num_rows) - j) const groupEnd = groupStart + groupSize - groups.push({ groupStart, groupEnd, fetching: false }) + groups.push({ + groupStart, + groupEnd, + fetching: new Map(header.map(name => [name, false])), + fetched: new Map(header.map(name => [name, false])), + }) groupStart = groupEnd } } - function fetchVirtualRowGroup(virtualGroupIndex: number) { + async function fetchVirtualRowGroup({ virtualGroupIndex, columns, onCached }: { + virtualGroupIndex: number, columns: string[], onCached?: () => void + }): Promise { const group = groups[virtualGroupIndex] - if (group && !group.fetching) { - group.fetching = true - const { groupStart, groupEnd } = group - // Initialize with resolvable promises - for (let i = groupStart; i < groupEnd; i++) { - data[i] = resolvableRow(header) - data[i]?.index.resolve(i) - } - parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd }) - .then(groupData => { - for (let rowIndex = groupStart; rowIndex < groupEnd; rowIndex++) { - const dataRow = data[rowIndex] - if (dataRow === undefined) { - throw new Error(`Missing data row for index ${rowIndex}`) - } - const row = groupData[rowIndex - groupStart] - if (row === undefined) { - throw new Error(`Missing row in groupData for index ${rowIndex}`) - } - for (const [key, value] of Object.entries(row)) { - const cell = dataRow.cells[key] - if (cell === undefined) { - throw new Error(`Missing column in dataRow for column ${key}`) - } - cell.resolve(value) - } - } - }) - .catch((error: unknown) => { - const reason = `Error fetching rows ${groupStart}-${groupEnd}: ${error}` - // reject the index of the first row (it's enough to trigger the error bar) - data[groupStart]?.index.reject(reason) - }) + if (!group) { + throw new Error(`Virtual row group ${virtualGroupIndex} not found`) } - } + const { groupStart, groupEnd, fetching, fetched } = group + const columnsToFetch = columns.filter(column => !fetching.get(column) && !fetched.get(column)) - function getColumnRanks(column: string): Promise { - let columnRanks = columnRanksCache.get(column) - if (!columnRanks) { - columnRanks = parquetColumnRanksWorker({ from, metadata, column }) - columnRanksCache.set(column, columnRanks) + function onChunk(chunk: ColumnData): void { + const { columnName, columnData, rowStart } = chunk + const cachedColumn = cellCache.get(columnName) + if (!cachedColumn) { + throw new Error(`Column "${columnName}" not found in header`) + } + let row = rowStart + for (const value of columnData) { + cachedColumn[row] ??= { value } + onCached?.() + row++ + } } - return columnRanks - } - function getSortIndex(orderBy: OrderBy): Promise { - const orderByKey = JSON.stringify(orderBy) - let sortIndex = sortCache.get(orderByKey) - if (!sortIndex) { - const orderByRanksPromise = Promise.all( - orderBy.map(({ column, direction }) => getColumnRanks(column).then(ranks => ({ direction, ranks }))) - ) - sortIndex = orderByRanksPromise.then(orderByRanks => computeSortIndex(orderByRanks)) - sortCache.set(orderByKey, sortIndex) + if (columnsToFetch.length === 0) { + // nothing to fetch + return } - return sortIndex + + columnsToFetch.forEach(column => { + fetching.set(column, true) + }) + + // TODO(SL): pass AbortSignal to the worker? + await parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk }) + + columnsToFetch.forEach(column => { + fetching.set(column, false) + fetched.set(column, true) + }) + } - return { + const numRows = Number(metadata.num_rows) + + const unsortableDataFrame: UnsortableDataFrame = { header, - numRows: Number(metadata.num_rows), - rows({ start, end, orderBy }) { - if (orderBy?.length) { - const numRows = end - start - const wrapped = new Array(numRows).fill(null).map(() => resolvableRow(header)) - - getSortIndex(orderBy).then(indices => { - // Compute row groups to fetch - for (const index of indices.slice(start, end)) { - const groupIndex = groups.findIndex(({ groupEnd }) => index < groupEnd) - fetchVirtualRowGroup(groupIndex) - } - - // Re-assemble data in sorted order into wrapped - for (let i = start; i < end; i++) { - const index = indices[i] - if (index === undefined) { - throw new Error(`index ${i} not found in indices`) - } - const row = data[index] - if (row === undefined) { - throw new Error('Row not fetched') - } - const { cells } = row - const wrappedRow = wrapped[i - start] - if (wrappedRow === undefined) { - throw new Error(`Wrapped row missing at index ${i - start}`) - } - wrappedRow.index.resolve(index) - for (const key of header) { - const cell = cells[key] - if (cell) { - // TODO(SL): should we remove this check? It makes sense only if header change - // but if so, I guess we will have more issues - cell - .then((value: unknown) => { - const wrappedCell = wrappedRow.cells[key] - if (wrappedCell === undefined) { - throw new Error(`Wrapped cell not found for column ${key}`) - } - wrappedCell.resolve(value) - }) - .catch((error: unknown) => { - console.error('Error resolving sorted row', error) - }) - } - } - } - }).catch((error: unknown) => { - console.error('Error fetching sort index or resolving sorted rows', error) - // Reject at least one promise to trigger the error bar - wrapped[0]?.index.reject(`Error fetching sort index or resolving sorted rows: ${error}`) - }) - - return wrapped - } else { - groups.forEach(({ groupStart, groupEnd }, i) => { - if (groupStart < end && groupEnd > start) { - fetchVirtualRowGroup(i) - } - }) - const wrapped = data.slice(start, end) - if (wrapped.some(row => row === undefined)) { - throw new Error('Row not fetched') - } - return wrapped as ResolvableRow[] - } + numRows, + eventTarget, + getRowNumber({ row }) { + validateRow({ row, data: { numRows } }) + return { value: row } + }, + getCell({ row, column }) { + validateRow({ row, data: { numRows } }) + validateColumn({ column, data: { header } }) + return cellCache.get(column)?.[row] }, - sortable: true, - getColumn({ column, start, end }) { - if (!header.includes(column)) { - return Promise.reject(new Error(`Column "${column}" not found in header`)) + fetch: async ({ rowStart, rowEnd, columns, signal }) => { + validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }) + checkSignal(signal) + + if (!columns || columns.length === 0) { + return + } + + const promises: Promise[] = [] + + function onCached() { + eventTarget.dispatchEvent(new CustomEvent('resolve')) } - return parquetQueryWorker({ - from, - metadata, - rowStart: start, - rowEnd: end, - }).then(rows => { - return rows.map(row => row[column]) + groups.forEach(({ groupStart, groupEnd }, i) => { + if (groupStart < rowEnd && groupEnd > rowStart) { + promises.push( + fetchVirtualRowGroup({ + virtualGroupIndex: i, + columns, + onCached, + }).then(() => { + checkSignal(signal) + }) + ) + } }) + + await Promise.all(promises) }, } + + return sortableDataFrame(unsortableDataFrame) +} + +function validateFetchParams({ rowStart, rowEnd, columns, data: { numRows, header } }: {rowStart: number, rowEnd: number, columns?: string[], data: Pick}): void { + if (rowStart < 0 || rowEnd > numRows || !Number.isInteger(rowStart) || !Number.isInteger(rowEnd) || rowStart > rowEnd) { + throw new Error(`Invalid row range: ${rowStart} - ${rowEnd}, numRows: ${numRows}`) + } + if (columns?.some(column => !header.includes(column))) { + throw new Error(`Invalid columns: ${columns.join(', ')}. Available columns: ${header.join(', ')}`) + } +} +function validateRow({ row, data: { numRows } }: {row: number, data: Pick}): void { + if (row < 0 || row >= numRows || !Number.isInteger(row)) { + throw new Error(`Invalid row index: ${row}, numRows: ${numRows}`) + } +} +function validateColumn({ column, data: { header } }: {column: string, data: Pick}): void { + if (!header.includes(column)) { + throw new Error(`Invalid column: ${column}. Available columns: ${header.join(', ')}`) + } +} +function checkSignal(signal?: AbortSignal): void { + if (signal?.aborted) { + throw new DOMException('The operation was aborted.', 'AbortError') + } } diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 6940bd16..a9552319 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -1,3 +1,4 @@ +import { stringify } from 'hightable' import { AsyncBuffer, asyncBufferFromUrl, cachedAsyncBuffer } from 'hyparquet' import { AsyncBufferFrom } from './workers/types.js' @@ -97,3 +98,11 @@ export const contentTypes: Record = { } export const imageTypes = ['.png', '.jpg', '.jpeg', '.gif', '.svg', '.tiff', '.webp'] + +export function toError(error: unknown): Error | undefined{ + if (error === undefined || error instanceof Error) { + return error + } else { + return new Error(stringify(error)) + } +} diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 50036aaf..6b654e1b 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,69 +1,30 @@ -import { ColumnData, parquetQuery } from 'hyparquet' +import type { ColumnData } from 'hyparquet' +import { parquetRead } from 'hyparquet' import { compressors } from 'hyparquet-compressors' -import { parquetReadColumn } from 'hyparquet/src/read.js' import { asyncBufferFrom } from '../utils.js' -import type { ChunkMessage, ClientMessage, ColumnRanksMessage, ErrorMessage, ResultMessage } from './types.js' +import type { ChunkMessage, ClientMessage, ErrorMessage, ResultMessage } from './types.js' function postChunkMessage ({ chunk, queryId }: ChunkMessage) { self.postMessage({ chunk, queryId }) } -function postResultMessage ({ result, queryId }: ResultMessage) { - self.postMessage({ result, queryId }) +function postResultMessage ({ queryId }: ResultMessage) { + self.postMessage({ queryId }) } function postErrorMessage ({ error, queryId }: ErrorMessage) { self.postMessage({ error, queryId }) } -function postColumnRanksMessage ({ columnRanks, queryId }: ColumnRanksMessage) { - self.postMessage({ columnRanks, queryId }) -} self.onmessage = async ({ data }: { data: ClientMessage }) => { - const { metadata, from, kind, queryId } = data + const { rowStart, rowEnd, columns, metadata, from, queryId } = data const file = await asyncBufferFrom(from) - if (kind === 'columnRanks') { - const { column } = data - // return the column ranks in ascending order - // we can get the descending order replacing the rank with numRows - rank - 1. It's not exactly the rank of - // the descending order, because the rank is the first, not the last, of the ties. But it's enough for the - // purpose of sorting. - - try { - const sortColumn: unknown[] = Array.from(await parquetReadColumn({ file, metadata, columns: [column], compressors })) - const valuesWithIndex = sortColumn.map((value, index) => ({ value, index })) - const sortedValuesWithIndex = valuesWithIndex.sort(({ value: a }, { value: b }) => compare(a, b)) - const columnRanks = sortedValuesWithIndex.reduce((accumulator, currentValue, rank) => { - const { lastValue, lastRank, ranks } = accumulator - const { value, index } = currentValue - if (value === lastValue) { - ranks[index] = lastRank - return { ranks, lastValue, lastRank } - } else { - ranks[index] = rank - return { ranks, lastValue: value, lastRank: rank } - } - }, { - ranks: Array(sortColumn.length).fill(-1) as number[], - lastValue: undefined as unknown, - lastRank: 0, - }).ranks - postColumnRanksMessage({ columnRanks, queryId }) - } catch (error) { - postErrorMessage({ error: error as Error, queryId }) - } - } else { - const { rowStart, rowEnd, columns, orderBy, filter, chunks } = data - const onChunk = chunks ? (chunk: ColumnData) => { postChunkMessage({ chunk, queryId }) } : undefined - try { - const result = await parquetQuery({ metadata, file, rowStart, rowEnd, columns, orderBy, filter, compressors, onChunk }) - postResultMessage({ result, queryId }) - } catch (error) { - postErrorMessage({ error: error as Error, queryId }) - } + try { + await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onChunk }) + postResultMessage({ queryId }) + } catch (error) { + postErrorMessage({ error: error as Error, queryId }) } -} -function compare(a: T, b: T): number { - if (a < b) return -1 - if (a > b) return 1 - return 1 // TODO: how to handle nulls? + function onChunk(chunk: ColumnData) { + postChunkMessage({ chunk, queryId }) + } } diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index be661c60..c2968512 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -2,29 +2,22 @@ import ParquetWorker from './parquetWorker?worker&inline' /// ^ the worker is bundled with the main thread code (inline) which is easier for users to import /// (no need to copy the worker file to the right place) import type { ColumnData } from 'hyparquet' -import type { Cells, ColumnRanksClientMessage, ColumnRanksWorkerMessage, ColumnRanksWorkerOptions, QueryClientMessage, QueryWorkerMessage, QueryWorkerOptions } from './types.js' +import type { ClientMessage, WorkerMessage, WorkerOptions } from './types.js' let worker: Worker | undefined let nextQueryId = 0 -interface RowsQueryAgent { - kind: 'query' - resolve: (value: Cells[]) => void +interface QueryAgent { + resolve: () => void reject: (error: Error) => void - onChunk?: (chunk: ColumnData) => void + onChunk: (chunk: ColumnData) => void } -interface ColumnRanksQueryAgent { - kind: 'columnRanks' - resolve: (value: number[]) => void - reject: (error: Error) => void -} -type QueryAgent = RowsQueryAgent | ColumnRanksQueryAgent const pending = new Map() function getWorker() { if (!worker) { worker = new ParquetWorker() - worker.onmessage = ({ data }: { data: QueryWorkerMessage | ColumnRanksWorkerMessage }) => { + worker.onmessage = ({ data }: { data: WorkerMessage }) => { const pendingQueryAgent = pending.get(data.queryId) if (!pendingQueryAgent) { console.warn( @@ -33,27 +26,13 @@ function getWorker() { return } - if (pendingQueryAgent.kind === 'query') { - const { onChunk, resolve, reject } = pendingQueryAgent - if ('error' in data) { - reject(data.error) - } else if ('result' in data) { - resolve(data.result) - } else if ('chunk' in data) { - onChunk?.(data.chunk) - } else { - reject(new Error('Unexpected message from worker')) - } - return - } - - const { resolve, reject } = pendingQueryAgent + const { onChunk, resolve, reject } = pendingQueryAgent if ('error' in data) { reject(data.error) - } else if ('columnRanks' in data) { - resolve(data.columnRanks) + } else if ('chunk' in data) { + onChunk(data.chunk) } else { - reject(new Error('Unexpected message from worker')) + resolve() } } } @@ -66,26 +45,13 @@ function getWorker() { * Instead of taking an AsyncBuffer, it takes a AsyncBufferFrom, because it needs * to be serialized to the worker. */ -export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, orderBy, filter, onChunk }: QueryWorkerOptions): Promise { +export function parquetQueryWorker({ metadata, from, rowStart, rowEnd, columns, onChunk }: WorkerOptions): Promise { // TODO(SL) Support passing columns? return new Promise((resolve, reject) => { const queryId = nextQueryId++ - pending.set(queryId, { kind: 'query', resolve, reject, onChunk }) - const worker = getWorker() - - // If caller provided an onChunk callback, worker will send chunks as they are parsed - const chunks = onChunk !== undefined - const message: QueryClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns, orderBy, filter, chunks, kind: 'query' } - worker.postMessage(message) - }) -} - -export function parquetColumnRanksWorker({ metadata, from, column }: ColumnRanksWorkerOptions): Promise { - return new Promise((resolve, reject) => { - const queryId = nextQueryId++ - pending.set(queryId, { kind: 'columnRanks', resolve, reject }) + pending.set(queryId, { resolve, reject, onChunk }) const worker = getWorker() - const message: ColumnRanksClientMessage = { queryId, metadata, from, column, kind: 'columnRanks' } + const message: ClientMessage = { queryId, metadata, from, rowStart, rowEnd, columns } worker.postMessage(message) }) } diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index f7a5a2d4..be883077 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -1,5 +1,4 @@ import type { ColumnData, FileMetaData } from 'hyparquet' -import type { ParquetQueryFilter } from 'hyparquet/src/types.js' // Serializable constructors for AsyncBuffers interface AsyncBufferFromFile { @@ -12,51 +11,25 @@ interface AsyncBufferFromUrl { requestInit?: RequestInit } export type AsyncBufferFrom = AsyncBufferFromFile | AsyncBufferFromUrl -// Cells is defined in hightable, but uses any, not unknown -export type Cells = Record ; -export interface CommonWorkerOptions { - metadata: FileMetaData, - from: AsyncBufferFrom -} -interface Message { +export interface ResultMessage { queryId: number } -export interface ErrorMessage extends Message { +export interface ErrorMessage extends ResultMessage { error: Error } +export interface ChunkMessage extends ResultMessage { + chunk: ColumnData +} +export type WorkerMessage = ChunkMessage | ResultMessage | ErrorMessage -/* Query worker */ -export interface QueryWorkerOptions extends CommonWorkerOptions { +export interface WorkerOptions { + metadata: FileMetaData, + from: AsyncBufferFrom rowStart?: number, rowEnd?: number, columns?: string[], - orderBy?: string, - filter?: ParquetQueryFilter, - onChunk?: (chunk: ColumnData) => void -} -export interface QueryClientMessage extends QueryWorkerOptions, Message { - kind: 'query', - chunks?: boolean + onChunk: (chunk: ColumnData) => void + // TODO(SL): support onPage too? } -export interface ChunkMessage extends Message { - chunk: ColumnData -} -export interface ResultMessage extends Message { - result: Cells[] -} -export type QueryWorkerMessage = ChunkMessage | ResultMessage | ErrorMessage - -/* ColumnRanks worker */ -export interface ColumnRanksWorkerOptions extends CommonWorkerOptions { - column: string -} -export interface ColumnRanksClientMessage extends ColumnRanksWorkerOptions, Message { - kind: 'columnRanks' -} -export interface ColumnRanksMessage extends Message { - columnRanks: number[] -} -export type ColumnRanksWorkerMessage = ColumnRanksMessage | ErrorMessage - -export type ClientMessage = QueryClientMessage | ColumnRanksClientMessage +export type ClientMessage = Omit & ResultMessage From d651830fc8b76ac09448ce70a0a9943ae70b69bc Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 29 Jul 2025 21:49:12 -0400 Subject: [PATCH 3/8] refactor --- src/lib/tableProvider.ts | 37 ++++++++++++++++--------------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 0c792251..37025574 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -39,8 +39,8 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): } } - async function fetchVirtualRowGroup({ virtualGroupIndex, columns, onCached }: { - virtualGroupIndex: number, columns: string[], onCached?: () => void + async function fetchVirtualRowGroup({ virtualGroupIndex, columns }: { + virtualGroupIndex: number, columns: string[] }): Promise { const group = groups[virtualGroupIndex] if (!group) { @@ -49,20 +49,6 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): const { groupStart, groupEnd, fetching, fetched } = group const columnsToFetch = columns.filter(column => !fetching.get(column) && !fetched.get(column)) - function onChunk(chunk: ColumnData): void { - const { columnName, columnData, rowStart } = chunk - const cachedColumn = cellCache.get(columnName) - if (!cachedColumn) { - throw new Error(`Column "${columnName}" not found in header`) - } - let row = rowStart - for (const value of columnData) { - cachedColumn[row] ??= { value } - onCached?.() - row++ - } - } - if (columnsToFetch.length === 0) { // nothing to fetch return @@ -82,6 +68,20 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): } + function onChunk(chunk: ColumnData): void { + const { columnName, columnData, rowStart } = chunk + const cachedColumn = cellCache.get(columnName) + if (!cachedColumn) { + throw new Error(`Column "${columnName}" not found in header`) + } + let row = rowStart + for (const value of columnData) { + cachedColumn[row] ??= { value } + row++ + } + eventTarget.dispatchEvent(new CustomEvent('resolve')) + } + const numRows = Number(metadata.num_rows) const unsortableDataFrame: UnsortableDataFrame = { @@ -107,17 +107,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): const promises: Promise[] = [] - function onCached() { - eventTarget.dispatchEvent(new CustomEvent('resolve')) - } - groups.forEach(({ groupStart, groupEnd }, i) => { if (groupStart < rowEnd && groupEnd > rowStart) { promises.push( fetchVirtualRowGroup({ virtualGroupIndex: i, columns, - onCached, }).then(() => { checkSignal(signal) }) From 47e5c46b3a954cbc349a6dd50e31a84b635895aa Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Tue, 29 Jul 2025 22:04:27 -0400 Subject: [PATCH 4/8] refactor + use onPage --- src/lib/tableProvider.ts | 13 +++++-------- src/lib/workers/parquetWorker.ts | 2 +- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 37025574..1ba541dd 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -39,13 +39,9 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): } } - async function fetchVirtualRowGroup({ virtualGroupIndex, columns }: { - virtualGroupIndex: number, columns: string[] + async function fetchVirtualRowGroup({ group, columns }: { + group: VirtualRowGroup, columns: string[] }): Promise { - const group = groups[virtualGroupIndex] - if (!group) { - throw new Error(`Virtual row group ${virtualGroupIndex} not found`) - } const { groupStart, groupEnd, fetching, fetched } = group const columnsToFetch = columns.filter(column => !fetching.get(column) && !fetched.get(column)) @@ -107,11 +103,12 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): const promises: Promise[] = [] - groups.forEach(({ groupStart, groupEnd }, i) => { + groups.forEach((group) => { + const { groupStart, groupEnd } = group if (groupStart < rowEnd && groupEnd > rowStart) { promises.push( fetchVirtualRowGroup({ - virtualGroupIndex: i, + group, columns, }).then(() => { checkSignal(signal) diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 6b654e1b..3a84164b 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -18,7 +18,7 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => { const { rowStart, rowEnd, columns, metadata, from, queryId } = data const file = await asyncBufferFrom(from) try { - await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onChunk }) + await parquetRead({ metadata, file, rowStart, rowEnd, columns, compressors, onPage: onChunk }) postResultMessage({ queryId }) } catch (error) { postErrorMessage({ error: error as Error, queryId }) From efd2b3de376ce2dc34588ebf73620b63e6cc8c62 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Wed, 30 Jul 2025 13:02:43 -0400 Subject: [PATCH 5/8] show a warning message instead of an error when the cell has not loaded yet --- src/components/Cell/Cell.tsx | 7 +++---- src/components/CellPanel/CellPanel.tsx | 10 +++++++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/components/Cell/Cell.tsx b/src/components/Cell/Cell.tsx index ae5e9608..2b7f7438 100644 --- a/src/components/Cell/Cell.tsx +++ b/src/components/Cell/Cell.tsx @@ -15,6 +15,8 @@ interface CellProps { col: number } +const UNLOADED_CELL_PLACEHOLDER = '' + /** * Cell viewer displays a single cell from a table. */ @@ -46,10 +48,7 @@ export default function CellView({ source, row, col }: CellProps) { } await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) const cell = df.getCell({ row, column: columnName }) - if (cell === undefined) { - throw new Error(`Cell at row=${row}, col=${col} is undefined`) - } - const text = stringify(cell.value) + const text = cell === undefined ? UNLOADED_CELL_PLACEHOLDER : stringify(cell.value) setText(text) setError(undefined) } catch (error) { diff --git a/src/components/CellPanel/CellPanel.tsx b/src/components/CellPanel/CellPanel.tsx index 0108468e..9fa798c3 100644 --- a/src/components/CellPanel/CellPanel.tsx +++ b/src/components/CellPanel/CellPanel.tsx @@ -17,6 +17,8 @@ interface ViewerProps { onClose: () => void } +const UNLOADED_CELL_PLACEHOLDER = '' + /** * Cell viewer displays a single cell from a table. */ @@ -37,7 +39,13 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) const cell = df.getCell({ row, column: columnName }) if (cell === undefined) { - throw new Error(`Cell at row=${row}, col=${col} is undefined`) + setContent( + + {/* TODO(SL) maybe change the style to highlight it's not real content */} + {UNLOADED_CELL_PLACEHOLDER} + + ) + return } const value: unknown = await cell.value if (value instanceof Object && !(value instanceof Date)) { From 91e45c5a1c33171faa2298d53038a0f9807fad71 Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Wed, 30 Jul 2025 13:03:07 -0400 Subject: [PATCH 6/8] fix logic in groups fetching --- src/lib/tableProvider.ts | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/lib/tableProvider.ts b/src/lib/tableProvider.ts index 1ba541dd..6e39f9cc 100644 --- a/src/lib/tableProvider.ts +++ b/src/lib/tableProvider.ts @@ -4,11 +4,18 @@ import { FileMetaData, parquetSchema } from 'hyparquet' import { parquetQueryWorker } from './workers/parquetWorkerClient.js' import type { AsyncBufferFrom } from './workers/types.d.ts' +type GroupStatus = { + kind: 'unfetched' +} | { + kind: 'fetching' + promise: Promise +} | { + kind: 'fetched' +} interface VirtualRowGroup { groupStart: number groupEnd: number - fetching: Map - fetched: Map + state: Map } /** @@ -32,8 +39,7 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): groups.push({ groupStart, groupEnd, - fetching: new Map(header.map(name => [name, false])), - fetched: new Map(header.map(name => [name, false])), + state: new Map(header.map(name => [name, { kind: 'unfetched' }])), }) groupStart = groupEnd } @@ -42,24 +48,22 @@ export function parquetDataFrame(from: AsyncBufferFrom, metadata: FileMetaData): async function fetchVirtualRowGroup({ group, columns }: { group: VirtualRowGroup, columns: string[] }): Promise { - const { groupStart, groupEnd, fetching, fetched } = group - const columnsToFetch = columns.filter(column => !fetching.get(column) && !fetched.get(column)) - - if (columnsToFetch.length === 0) { - // nothing to fetch - return - } - - columnsToFetch.forEach(column => { - fetching.set(column, true) - }) + const { groupStart, groupEnd, state } = group + const columnsToFetch = columns.filter(column => state.get(column)?.kind === 'unfetched') + const promises = [...group.state.values()].filter((status): status is { kind: 'fetching', promise: Promise } => status.kind === 'fetching').map(status => status.promise) // TODO(SL): pass AbortSignal to the worker? - await parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk }) + if (columnsToFetch.length > 0) { + const commonPromise = parquetQueryWorker({ from, metadata, rowStart: groupStart, rowEnd: groupEnd, columns: columnsToFetch, onChunk }) + columnsToFetch.forEach(column => { + state.set(column, { kind: 'fetching', promise: commonPromise }) + }) + promises.push(commonPromise) + } + await Promise.all(promises) columnsToFetch.forEach(column => { - fetching.set(column, false) - fetched.set(column, true) + state.set(column, { kind: 'fetched' }) }) } From cb859c7c088fe0bc6a189ba5b7b88c52bfbe4cca Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Wed, 30 Jul 2025 13:20:31 -0400 Subject: [PATCH 7/8] fix the content of the cell panel --- src/components/CellPanel/CellPanel.tsx | 62 +++++++++++++++----------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/src/components/CellPanel/CellPanel.tsx b/src/components/CellPanel/CellPanel.tsx index 9fa798c3..6c568e50 100644 --- a/src/components/CellPanel/CellPanel.tsx +++ b/src/components/CellPanel/CellPanel.tsx @@ -1,5 +1,6 @@ -import { DataFrame, stringify } from 'hightable' -import { ReactNode, useEffect, useState } from 'react' +import type { DataFrame, ResolvedValue } from 'hightable' +import { stringify } from 'hightable' +import { ReactNode, useCallback, useEffect, useState } from 'react' import { useConfig } from '../../hooks/useConfig.js' import { cn } from '../../lib/utils.js' import ContentWrapper from '../ContentWrapper/ContentWrapper.js' @@ -13,7 +14,7 @@ interface ViewerProps { row: number col: number setProgress: (progress: number) => void - setError: (error: Error) => void + setError: (error: unknown) => void onClose: () => void } @@ -26,6 +27,31 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose const [content, setContent] = useState() const { customClass } = useConfig() + const fillContent = useCallback((cell: ResolvedValue | undefined) => { + let content: ReactNode + if (cell === undefined) { + content = + + {UNLOADED_CELL_PLACEHOLDER} + + } else { + const { value } = cell + if (value instanceof Object && !(value instanceof Date)) { + content = + + + + } else { + content = + + {stringify(value)} + + } + } + setContent(content) + setError(undefined) + }, [customClass?.textView, customClass?.jsonView, setError]) + // Load cell data useEffect(() => { async function loadCellData() { @@ -36,31 +62,17 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose if (columnName === undefined) { throw new Error(`Column name missing at index col=${col}`) } - await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) - const cell = df.getCell({ row, column: columnName }) + let cell = df.getCell({ row, column: columnName }) if (cell === undefined) { - setContent( - - {/* TODO(SL) maybe change the style to highlight it's not real content */} - {UNLOADED_CELL_PLACEHOLDER} - - ) + fillContent(undefined) return } - const value: unknown = await cell.value - if (value instanceof Object && !(value instanceof Date)) { - setContent( - - - - ) - } else { - setContent( - - {stringify(value)} - - ) + await df.fetch({ rowStart: row, rowEnd: row + 1, columns: [columnName] }) + cell = df.getCell({ row, column: columnName }) + if (cell === undefined) { + throw new Error(`Cell at row=${row}, column=${columnName} is undefined`) } + fillContent(cell) } catch (error) { setError(error as Error) } finally { @@ -69,7 +81,7 @@ export default function CellPanel({ df, row, col, setProgress, setError, onClose } void loadCellData() - }, [df, col, row, setProgress, setError, customClass]) + }, [df, col, row, setProgress, setError, fillContent]) const headers = <> From afb929885effd7ee62c676833bc19e0cfc9489bf Mon Sep 17 00:00:00 2001 From: Sylvain Lesage Date: Wed, 30 Jul 2025 13:27:29 -0400 Subject: [PATCH 8/8] don't open the cell panel until the cell has loaded --- src/components/ParquetView/ParquetView.tsx | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/components/ParquetView/ParquetView.tsx b/src/components/ParquetView/ParquetView.tsx index 6c6a2460..2dc8b0cd 100644 --- a/src/components/ParquetView/ParquetView.tsx +++ b/src/components/ParquetView/ParquetView.tsx @@ -82,9 +82,14 @@ export default function ParquetView({ source, setProgress, setError }: ViewerPro if (cell?.col === col && cell.row === row) { return undefined } + const columnName = content?.dataframe.header[col] + if (columnName === undefined || !content?.dataframe.getCell({ row, column: columnName })) { + // don't open the cell panel until it has loaded + return undefined + } return { row, col } }) - }, []) + }, [content]) const onDoubleClickCell = useCallback((_event: React.MouseEvent, col: number, row: number) => { toggleCell(col, row) }, [toggleCell])