diff --git a/.gitignore b/.gitignore index 5a84aa4078..fa1b3163c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ +tree-sitter/ +tree-sitter-cypher/ + .idea/ .vscode +.cursor .vs bazel-* .clwb diff --git a/Makefile b/Makefile index 22391806e4..03bdb5ed71 100644 --- a/Makefile +++ b/Makefile @@ -187,7 +187,7 @@ nodejs-deps: cd tools/nodejs_api && npm install --include=dev nodejstest: nodejs - cd tools/nodejs_api && npm test + cd tools/nodejs_api && node copy_src_to_build.js && npm test nodejstest-deps: nodejs-deps nodejstest diff --git a/tools/nodejs_api/README.md b/tools/nodejs_api/README.md index 086c63eb2f..d2ed12c77a 100644 --- a/tools/nodejs_api/README.md +++ b/tools/nodejs_api/README.md @@ -7,10 +7,24 @@ A high-performance graph database for knowledge-intensive applications. This Nod ## πŸ“¦ Installation +**From npm (if published):** + ```bash npm install lbug ``` +**From GitHub** (monorepo; the Node package lives in `tools/nodejs_api`): + +- **pnpm** (v9+), subdirectory is supported: + + ```bash + pnpm add lbug@github:LadybugDB/ladybug#path:tools/nodejs_api + ``` + + On install, the package will build the native addon from source (needs CMake and a C++20 compiler). + +- **npm**: no built-in subdirectory install. Either use a **local path** after cloning and building (see [Build and use in other projects](#-build-and-use-in-other-projects-local)), or a tarball from [GitPkg](https://gitpkg.vercel.app/) (e.g. `https://gitpkg.vercel.app/LadybugDB/ladybug/tools/nodejs_api?main`). + --- ## πŸš€ Quick Start @@ -49,10 +63,8 @@ const main = async () => { // Run a query const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); - // Fetch all results + // Consume results (choose one style) const rows = await result.getAll(); - - // Output results for (const row of rows) { console.log(row); } @@ -68,12 +80,101 @@ main().catch(console.error); The `lbug` package exposes the following primary classes: -* `Database` – Initializes a database from a file path. -* `Connection` – Executes queries on a connected database. -* `QueryResult` – Provides methods like `getAll()` to retrieve results. +* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). Close with `close()`. +* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. Use `registerStream(name, source, { columns })` to load data from an AsyncIterable via `LOAD FROM name`; `unregisterStream(name)` when done. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`. +* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed). +* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries. Both CommonJS (`require`) and ES Modules (`import`) are fully supported. 
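+
+### Parameterized queries
+
+For repeated queries, `prepare()` once and `execute()` with different parameter values. A minimal sketch (the `User` table matches the Quick Start above; the `$minAge` parameter name is just for illustration):
+
+```js
+// Prepare once; $-prefixed names are query parameters.
+const stmt = await conn.prepare(
+  "MATCH (u:User) WHERE u.age > $minAge RETURN u.name;"
+);
+// Execute many times with different values.
+const result = await conn.execute(stmt, { minAge: 26 });
+console.log(await result.getAll());
+```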
+### Consuming query results
+
+```js
+const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000");
+
+// Option 1: get all rows (loads everything into memory)
+const rows = await result.getAll();
+
+// Option 2: row by row (async)
+while (result.hasNext()) {
+  const row = await result.getNext();
+  console.log(row);
+}
+
+// Option 3: async iterator (streaming, no full materialization)
+for await (const row of result) {
+  console.log(row);
+}
+
+// Option 4: Node.js Readable stream (e.g. for .pipe())
+const stream = result.toStream();
+stream.on("data", (row) => console.log(row));
+```
+
+### Transactions
+
+**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing.
+
+```js
+await conn.query("BEGIN TRANSACTION");
+await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
+await conn.query('COPY Nodes FROM "data.csv"');
+await conn.query("COMMIT");
+// or on error: await conn.query("ROLLBACK");
+```
+
+**Read-only transaction:** `BEGIN TRANSACTION READ ONLY`, then queries, then `COMMIT` / `ROLLBACK`.
+
+**Wrapper:** One write transaction with automatic commit on success and rollback on throw:
+
+```js
+await conn.transaction(async () => {
+  await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
+  await conn.query('COPY Nodes FROM "data.csv"');
+  // commit happens automatically; on throw, rollback then rethrow
+});
+```
+
+### Loading data from a Node.js stream
+
+You can feed data from an **AsyncIterable** (a generator, an async generator, or any object with `Symbol.asyncIterator`) into Cypher using **scan replacement**: register a stream by name, then use `LOAD FROM name` in your query. Rows are pulled from JavaScript on demand during execution.
+
+**API:**
+
+* **`conn.registerStream(name, source, options)`** (async)
+  * `name` – string used in Cypher: `LOAD FROM name RETURN ...`
+  * `source` – AsyncIterable of rows. Each row is an **array** of column values (same order as `options.columns`) or an **object** keyed by column name.
+  * `options.columns` – **required**. Schema: array of `{ name: string, type: string }`. Supported types: `INT64`, `INT32`, `INT16`, `INT8`, `UINT64`, `UINT32`, `UINT16`, `UINT8`, `DOUBLE`, `FLOAT`, `STRING`, `BOOL`, `DATE`, `TIMESTAMP`.
+
+* **`conn.unregisterStream(name)`**
+  Unregisters the source so the name can be reused and no stale entries are left behind. Call it after the query (or when done with the stream).
+
+**Example:**
+
+```js
+async function* generateRows() {
+  yield [1, "Alice"];
+  yield [2, "Bob"];
+  yield [3, "Carol"];
+}
+
+await conn.registerStream("users", generateRows(), {
+  columns: [
+    { name: "id", type: "INT64" },
+    { name: "name", type: "STRING" },
+  ],
+});
+
+const result = await conn.query("LOAD FROM users RETURN *");
+for await (const row of result) {
+  console.log(row); // { id: 1, name: "Alice" }, ...
+}
+
+conn.unregisterStream("users");
+```
+
+You can combine the stream with other Cypher: e.g. `LOAD FROM stream WHERE col > 0 RETURN *`, or `COPY MyTable FROM (LOAD FROM stream RETURN *)`.
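+
+For example, a minimal sketch that bulk-loads the streamed rows into a node table with `COPY FROM` (assuming the `users` stream registered above and a node table created to match its `id`/`name` schema):
+
+```js
+await conn.query("CREATE NODE TABLE Person(id INT64, name STRING, PRIMARY KEY(id))");
+// COPY pulls rows from the registered stream on demand during execution.
+await conn.query("COPY Person FROM (LOAD FROM users RETURN *)");
+conn.unregisterStream("users");
+```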
+
+---
+
 ## πŸ› οΈ Local Development (for Contributors)
@@ -96,6 +197,94 @@
 npm run build
 npm test
 ```
 
+When developing from the **monorepo root**, build the native addon first so tests see the latest C++ code:
+
+```bash
+# From the repo root
+make nodejs
+# Or: cmake --build build/release --target lbugjs
+# Then from tools/nodejs_api:
+cd tools/nodejs_api && npm test
+```
+
+---
+
+## πŸ”§ Build and use in other projects (local)
+
+To use the Node.js API from the Ladybug repo in another project without publishing to npm:
+
+1. **Build the addon** (from the Ladybug repo root):
+
+   ```bash
+   make nodejs
+   ```
+
+   Or from this directory:
+
+   ```bash
+   npm run build
+   ```
+
+   This compiles the native addon into `build/lbugjs.node` and copies JS and types.
+
+2. **In your other project**, add a file dependency in `package.json`:
+
+   ```json
+   "dependencies": {
+     "lbug": "file:../path/to/ladybug/tools/nodejs_api"
+   }
+   ```
+
+   Then run `npm install`. After that, `require("lbug")` or `import ... from "lbug"` will use your local build.
+
+3. **Optional:** to pack and install a tarball instead:
+
+   ```bash
+   cd /path/to/ladybug/tools/nodejs_api
+   npm run build
+   npm pack
+   ```
+
+   In the other project: `npm install /path/to/ladybug/tools/nodejs_api/lbug-0.0.1.tgz`.
+
+### Prebuilt in your fork (install from GitHub without building)
+
+If you install from GitHub (e.g. `pnpm add lbug@github:user/ladybug#path:tools/nodejs_api`), the package runs `install.js`: if it finds a prebuilt binary, it uses it and does not build from source. To ship a prebuilt in your fork:
+
+1. **Build once** in your clone (from the repo root):
+
+   ```bash
+   make nodejs
+   ```
+
+2. **Create the prebuilt file** (name = `lbugjs-<platform>-<arch>.node`):
+
+   - Windows x64: copy `tools/nodejs_api/build/lbugjs.node` β†’ `tools/nodejs_api/prebuilt/lbugjs-win32-x64.node`
+   - Linux x64: `lbugjs-linux-x64.node`
+   - macOS x64: `lbugjs-darwin-x64.node`, arm64: `lbugjs-darwin-arm64.node`
+
+   Example (from repo root). **Windows (PowerShell):**
+
+   ```powershell
+   New-Item -ItemType Directory -Force -Path tools/nodejs_api/prebuilt
+   Copy-Item tools/nodejs_api/build/lbugjs.node tools/nodejs_api/prebuilt/lbugjs-win32-x64.node
+   ```
+
+   **Linux/macOS:**
+
+   ```bash
+   mkdir -p tools/nodejs_api/prebuilt
+   cp tools/nodejs_api/build/lbugjs.node tools/nodejs_api/prebuilt/lbugjs-$(node -p "process.platform")-$(node -p "process.arch").node
+   ```
+
+3. **Commit and push** the `prebuilt/` folder. Then anyone (or you in another project) can do:
+
+   ```bash
+   pnpm add lbug@github:YOUR_USERNAME/ladybug#path:tools/nodejs_api
+   ```
+
+   and the addon will be used from the prebuilt without a local build.
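+
+To verify the install works in the consuming project, a quick smoke test helps (hypothetical file name; uses only the documented API):
+
+```js
+// smoke-test.js β€” run with: node smoke-test.js
+const { Database, Connection } = require("lbug");
+
+const main = async () => {
+  const db = new Database(":memory:");
+  const conn = new Connection(db);
+  const result = await conn.query("RETURN 1");
+  console.log(await result.getAll()); // one row if the addon loaded correctly
+  await db.close();
+};
+
+main().catch(console.error);
+```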
+ --- ## πŸ“¦ Packaging and Binary Distribution diff --git a/tools/nodejs_api/build.js b/tools/nodejs_api/build.js index d326300260..b1b93bae43 100644 --- a/tools/nodejs_api/build.js +++ b/tools/nodejs_api/build.js @@ -3,6 +3,9 @@ const path = require("path"); const { execSync } = require("child_process"); const SRC_PATH = path.resolve(__dirname, "../.."); +const NODEJS_API = path.resolve(__dirname, "."); +const BUILD_DIR = path.join(NODEJS_API, "build"); +const SRC_JS_DIR = path.join(NODEJS_API, "src_js"); const THREADS = require("os").cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); @@ -12,3 +15,19 @@ execSync(`make nodejs NUM_THREADS=${THREADS}`, { cwd: SRC_PATH, stdio: "inherit", }); + +// Ensure build/ has latest JS from src_js (CMake copies at configure time only) +if (fs.existsSync(SRC_JS_DIR) && fs.existsSync(BUILD_DIR)) { + const files = fs.readdirSync(SRC_JS_DIR); + for (const name of files) { + if (name.endsWith(".js") || name.endsWith(".mjs") || name.endsWith(".d.ts")) { + fs.copyFileSync(path.join(SRC_JS_DIR, name), path.join(BUILD_DIR, name)); + } + } + // So package root has types when used as file: dependency + const dts = path.join(BUILD_DIR, "lbug.d.ts"); + if (fs.existsSync(dts)) { + fs.copyFileSync(dts, path.join(NODEJS_API, "lbug.d.ts")); + } + console.log("Copied src_js to build."); +} diff --git a/tools/nodejs_api/copy_src_to_build.js b/tools/nodejs_api/copy_src_to_build.js new file mode 100644 index 0000000000..8ebe1581c4 --- /dev/null +++ b/tools/nodejs_api/copy_src_to_build.js @@ -0,0 +1,22 @@ +/** + * Copies src_js/*.js, *.mjs, *.d.ts into build/ so tests run with the latest JS + * after "make nodejs" (which only copies at cmake configure time). + * Run from tools/nodejs_api. + */ +const fs = require("fs"); +const path = require("path"); + +const srcDir = path.join(__dirname, "src_js"); +const buildDir = path.join(__dirname, "build"); + +if (!fs.existsSync(buildDir)) { + console.warn("copy_src_to_build: build/ missing, run make nodejs first."); + process.exit(0); +} + +const re = /\.(js|mjs|d\.ts)$/; +const files = fs.readdirSync(srcDir).filter((n) => re.test(n)); +for (const name of files) { + fs.copyFileSync(path.join(srcDir, name), path.join(buildDir, name)); +} +console.log("Copied", files.length, "files from src_js to build."); diff --git a/tools/nodejs_api/docs/execution_chain_analysis.md b/tools/nodejs_api/docs/execution_chain_analysis.md new file mode 100644 index 0000000000..b28b1e40e2 --- /dev/null +++ b/tools/nodejs_api/docs/execution_chain_analysis.md @@ -0,0 +1,137 @@ +# LOAD FROM stream: Execution Chain Analysis + +## 1. 
End-to-end execution chain
+
+```mermaid
+flowchart TB
+    subgraph JS["JS (main thread)"]
+        A["conn.query('LOAD FROM name RETURN *')"]
+        B["registerStream: getChunk(requestId) β†’ pending.push; runConsumer via setImmediate"]
+        C["runConsumer: sort pending; per requestId: it.next() β†’ returnChunk(requestId, rows, done)"]
+        D["AsyncIterator: it.next() β†’ yield rows"]
+    end
+
+    subgraph CppAddon["C++ addon (Node worker)"]
+        E["tableFunc: mutex, nextRequestId(), setChunkRequest, BlockingCall getChunk"]
+        F["wait on reqPtr->cv until filled"]
+        G["returnChunkFromJS: req->rows, req->filled=true, cv.notify_one"]
+        H["copy rows to output.dataChunk, return cap"]
+    end
+
+    subgraph Engine["Engine (task thread, single if canParallelFunc=false)"]
+        I["getNextTuple β†’ getNextTuplesInternal"]
+        J["tableFunc(input, output) β†’ numTuplesScanned"]
+        K["FactorizedTable accumulates chunks"]
+        L["Result: MaterializedQueryResult + FactorizedTableIterator"]
+    end
+
+    A --> I
+    I --> J
+    J --> E
+    E --> B
+    B --> C
+    C --> D
+    C --> G
+    G --> F
+    F --> H
+    H --> J
+    J --> K
+    K --> L
+    L --> M["hasNext / getNext in JS"]
+    M --> A
+```
+
+### Sequence (single-threaded pipeline)
+
+| Step | Where | What |
+|------|--------|------|
+| 1 | JS | `query("LOAD FROM mystream RETURN *")` |
+| 2 | Engine | Parse, plan TableFunctionCall (node stream), execute task |
+| 3 | Engine | Source: `getNextTuple()` β†’ `getNextTuplesInternal()` β†’ `tableFunc()` |
+| 4 | C++ | `tableFunc`: lock mutex, requestId=1, setChunkRequest(1), BlockingCall(getChunk, 1) |
+| 5 | JS | getChunk(1) called on main thread; pending=[1], runConsumer scheduled |
+| 6 | C++ | BlockingCall runs getChunk on the main thread and returns; C++ then waits on req->cv until JS calls returnChunk |
+| 7 | JS | runConsumer: requestId=1, it.next() β†’ { value: [1,"a"], done: false }, returnChunk(1, [[1,"a"]], false) |
+| 8 | C++ | returnChunkFromJS: req->rows, filled=true, cv.notify_one; tableFunc wakes, copies 1 row, returns 1 |
+| 9 | Engine | TableFunctionCall outputs chunk to ResultSet; getNextTuple called again |
+| 10 | C++ | tableFunc again: requestId=2, BlockingCall(getChunk, 2), wait |
+| 11 | JS | getChunk(2), pending=[2], runConsumer: it.next() β†’ [2,"b"], returnChunk(2, ...) |
+| … | … | Repeat until tableFunc returns 0 (empty chunk / done) |
+| N | Engine | No more tuples from source β†’ pipeline finishes, MaterializedQueryResult built |
+| N+1 | JS | result.hasNext() / result.getNext() over FactorizedTableIterator |
+| N+2 | C++ addon | GetNextAsync: if !hasNext() return null; else getNext() (core throws if !hasNext()) |
+| N+3 | JS | getNext() resolves to row or null |
+
+---
+
+## 2. Expert views and failure points
+
+### Expert 1: Engine / pipeline
+
+- **Order**: With `canParallelFunc = false`, one thread runs the pipeline; tableFunc is called sequentially (1, 2, 3, …). Chunks are appended to the FactorizedTable in call order β†’ row order is preserved.
+- **End of stream**: The engine keeps calling tableFunc until it returns 0. JS sends `returnChunk(id, [], true)` when the iterator is done; C++ returns 0 β†’ the engine stops. No extra tableFunc call after β€œdone”.
+- **Risk**: If the engine ever called tableFunc again after a 0 return, the next requestId would be issued and JS would call it.next() again (possibly past the end). Current code path: return 0 β†’ getNextTuplesInternal returns false β†’ the parent stops pulling; no evidence of an extra call.
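+
+The end-of-stream handshake, sketched from the JS side (simplified from `runConsumer` in connection.js; `it`, `toRows`, and `conn.returnChunk` are as defined there):
+
+```js
+// One iterator step per engine request; done=true makes tableFunc return 0.
+const n = await it.next();
+if (n.done) {
+  conn.returnChunk(requestId, [], true); // C++ wakes, returns 0, engine stops pulling
+} else {
+  conn.returnChunk(requestId, toRows(n.value), false);
+}
+```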
+### Expert 2: Addon (C++ / JS bridge)
+
+- **requestId ordering**: C++ assigns requestIds sequentially under a mutex; JS runConsumer sorts `pending` and serves requests in ascending order, so request 1 gets the first it.next(), request 2 the second, and so on. Correct.
+- **getNext() contract**: Core MaterializedQueryResult::getNext() throws if !hasNext(). The addon's GetNextSync checks hasNext() and returns env.Null() without calling getNext(). GetNextAsync previously called getNext() even when !hasNext() β†’ throw. Fix: in Execute(), if !hasNext() set cppTuple=null and return; in OnOK(), pass env.Null() as the value when cppTuple is null.
+- **Risk**: Any other code path that calls getNext() without checking hasNext() will still throw (by design in core); the addon must always guard.
+
+### Expert 3: API / tests
+
+- **Contract**: The d.ts types `getNext()` as a `Promise` that resolves to a row object or `null`. So β€œno more rows” must be expressed as resolving with `null`, not throwing.
+- **Tests**: register_stream expects getNext() after the last row to return null; test_query_result was updated to expect null when exhausted instead of a throw.
+- **Risk**: Inconsistent use of hasNext() before getNext() in other tests or user code can still hit the core throw if the addon path is bypassed or a different API is used.
+
+---
+
+## 3. Hypotheses (why β€œNo more tuples” or wrong order happened)
+
+| # | Hypothesis | Likelihood | Evidence |
+|---|------------|------------|----------|
+| H1 | Multiple threads called tableFunc; results merged by completion order, so the first row could be id=3; then one thread called getNext() after exhaustion | High (before fix) | canParallelFunc defaults to true; TaskScheduler runs the task with N threads; each copy of the operator calls getNextTuplesInternal. |
+| H2 | GetNextAsync did not check hasNext() before getNext(); when the test called getNext() a 4th time, C++ threw β€œNo more tuples” | High | Execute() had β€œif (!hasNext()) cppTuple.reset();” then unconditionally β€œcppTuple = getNext();” β†’ always threw when exhausted. |
+| H3 | JS runConsumer delivered chunks out of order (e.g. request 2 before 1) | Low | pending is sorted by requestId and the iterator is consumed in that order. |
+| H4 | Engine calls tableFunc one extra time after a 0 return | Low | No code path found that would call tableFunc again after return 0. |
+| H5 | Empty chunk not handled (e.g. returnChunk(id, [], false) and the engine expects more) | Low | C++ returns 0 for numRows==0; the engine treats 0 as β€œno tuples this call”, continuing until the next tableFunc returns 0; JS can send [] with done=false for β€œno data this chunk” and later send done=true. |
+
+---
+
+## 4. Brainstorm: design options
+
+| Option | Description | Pros | Cons |
+|--------|-------------|------|------|
+| A. Single-thread table func (current) | canParallelFunc = false for the node stream | Simple, deterministic order, no cross-thread ordering bugs | Slightly less engine parallelism, for this source only |
+| B. Return null when exhausted (current) | Addon GetNextAsync: if !hasNext() return null, do not call getNext() | Matches the d.ts and user expectation; tests can rely on getNext() β†’ null | Core still throws if getNext() is called without hasNext(); only the addon is defensive |
+| C. Relax tests: order-independent | Tests collect all rows, sort by id, assert the set | Resilient to any engine reorder | Hides real ordering bugs; less strict |
+| D. 
Keep mutex in tableFunc | Already in place | Serializes tableFunc calls even if canParallelFunc were true | Redundant with A; adds lock contention if we ever parallelize | +| E. Core API: getNext() returns null | Change MaterializedQueryResult::getNext() to return null instead of throw when !hasNext() | Single contract everywhere | Breaking change for C++ API users who rely on throw | +| F. Document β€œalways use hasNext() before getNext()” | Keep throw in core, document in JS | Clear contract | Poor DX; register_stream tests expect null | + +**Best combination (current state):** + +- **A + B**: Single-thread table function + addon returns null when exhausted. No test relaxation (C). Mutex (D) is optional extra safety; can keep. No core API change (E) to avoid breaking C++ users. + +--- + +## 5. Checklist (what was fixed / what to verify) + +- [x] **canParallelFunc = false** for NodeStreamScanFunction β†’ single-thread pipeline, order preserved. +- [x] **GetNextAsync**: if !hasNext(), do not call getNext(); pass env.Null() to callback so JS gets null. +- [x] **test_query_result**: expect null when exhausted; test name updated. +- [ ] **Rebuild addon** and run full test suite (register_stream + query_result) to confirm no regressions. +- [x] **Optional**: Add a single test that iterates with hasNext() and asserts getNext() returns null exactly when hasNext() becomes false, to lock the contract (test_query_result.js: "getNext() returns null exactly when hasNext() is false"). + +--- + +## 6. One-page summary + +**Chain:** JS `query("LOAD FROM name")` β†’ Engine plans TableFunctionCall β†’ one task thread (canParallelFunc=false) calls tableFunc repeatedly β†’ C++ tableFunc uses mutex, nextRequestId(), BlockingCall(getChunk), waits on cv β†’ JS getChunk pushes requestId, runConsumer feeds iterator in request order and returnChunk β†’ C++ wakes, copies rows, returns count β†’ Engine accumulates into FactorizedTable β†’ MaterializedQueryResult β†’ JS hasNext/getNext. + +**Root causes of past failures:** (1) Parallel table function β†’ chunks completed out of order β†’ first row could be 3 instead of 1. (2) GetNextAsync called getNext() even when !hasNext() β†’ core threw β€œNo more tuples”. + +**Fixes applied:** (1) canParallelFunc = false. (2) GetNextAsync: if !hasNext() return null and skip getNext(); callback second arg = env.Null(). (3) Test expects null when exhausted. + +**Recommended:** Keep current design (A+B). Verify with full rebuild and tests. diff --git a/tools/nodejs_api/examples/README.md b/tools/nodejs_api/examples/README.md new file mode 100644 index 0000000000..63ace4baf5 --- /dev/null +++ b/tools/nodejs_api/examples/README.md @@ -0,0 +1,11 @@ +# Examples + +Run from `tools/nodejs_api` (after `make nodejs` or `npm run build`): + +```bash +node examples/quickstart.mjs +node examples/stream-load.mjs +``` + +- **quickstart.mjs** β€” In-memory DB, create table, load data from a stream via `COPY FROM (LOAD FROM ...)`, then query. +- **stream-load.mjs** β€” Register an async iterable and consume it with `LOAD FROM name RETURN *`. diff --git a/tools/nodejs_api/examples/quickstart.mjs b/tools/nodejs_api/examples/quickstart.mjs new file mode 100644 index 0000000000..d78ec07f9a --- /dev/null +++ b/tools/nodejs_api/examples/quickstart.mjs @@ -0,0 +1,32 @@ +/** + * Quickstart: in-memory database, create schema, load from stream, query. 
+ * Run from tools/nodejs_api: node examples/quickstart.mjs + */ +import { Database, Connection } from "lbug"; + +async function* userRows() { + yield ["Alice", 30]; + yield ["Bob", 25]; +} + +const db = new Database(":memory:"); +const conn = new Connection(db); + +await conn.query(` + CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name)); +`); + +await conn.registerStream("users", userRows(), { + columns: [ + { name: "name", type: "STRING" }, + { name: "age", type: "INT64" }, + ], +}); +await conn.query("COPY User FROM (LOAD FROM users RETURN *)"); +conn.unregisterStream("users"); + +const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); +const rows = await result.getAll(); +console.log(rows); + +await db.close(); diff --git a/tools/nodejs_api/examples/stream-load.mjs b/tools/nodejs_api/examples/stream-load.mjs new file mode 100644 index 0000000000..9adc84f8a9 --- /dev/null +++ b/tools/nodejs_api/examples/stream-load.mjs @@ -0,0 +1,29 @@ +/** + * Load data from a JavaScript async iterable via LOAD FROM. + * Run from tools/nodejs_api: node examples/stream-load.mjs + */ +import { Database, Connection } from "lbug"; + +async function* generateRows() { + yield [1, "Alice"]; + yield [2, "Bob"]; + yield [3, "Carol"]; +} + +const db = new Database(":memory:"); +const conn = new Connection(db); + +await conn.registerStream("users", generateRows(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "name", type: "STRING" }, + ], +}); + +const result = await conn.query("LOAD FROM users RETURN *"); +for await (const row of result) { + console.log(row); +} + +conn.unregisterStream("users"); +await db.close(); diff --git a/tools/nodejs_api/index.js b/tools/nodejs_api/index.js new file mode 100644 index 0000000000..04ea8a1618 --- /dev/null +++ b/tools/nodejs_api/index.js @@ -0,0 +1,4 @@ +"use strict"; + +// After `make nodejs` or `npm run build`, entry point is build/ +module.exports = require("./build"); diff --git a/tools/nodejs_api/index.mjs b/tools/nodejs_api/index.mjs new file mode 100644 index 0000000000..b638316b29 --- /dev/null +++ b/tools/nodejs_api/index.mjs @@ -0,0 +1,9 @@ +export { + default, + Database, + Connection, + PreparedStatement, + QueryResult, + VERSION, + STORAGE_VERSION, +} from "./build/index.mjs"; diff --git a/tools/nodejs_api/install.js b/tools/nodejs_api/install.js index 6a010b335a..7df6e140fc 100644 --- a/tools/nodejs_api/install.js +++ b/tools/nodejs_api/install.js @@ -8,51 +8,72 @@ const process = require("process"); const isNpmBuildFromSourceSet = process.env.npm_config_build_from_source; const platform = process.platform; const arch = process.arch; + +// Skip when already built (e.g. local dev after make nodejs) +if (fsCallback.existsSync(path.join(__dirname, "build", "lbugjs.node"))) { + process.exit(0); +} + const prebuiltPath = path.join( __dirname, "prebuilt", `lbugjs-${platform}-${arch}.node` ); +const buildDir = path.join(__dirname, "build"); +const srcJsDir = path.join(__dirname, "src_js"); +const lbugSourceDir = path.join(__dirname, "lbug-source"); + // Check if building from source is forced if (isNpmBuildFromSourceSet) { console.log( "The NPM_CONFIG_BUILD_FROM_SOURCE environment variable is set. Building from source." 
); } -// Check if prebuilt binaries are available +// Prebuilt available + git-clone layout (src_js present, no lbug-source): use prebuilt and copy src_js β†’ build/ +else if (fsCallback.existsSync(prebuiltPath) && fsCallback.existsSync(srcJsDir)) { + console.log("Prebuilt binary is available (git clone layout)."); + if (!fsCallback.existsSync(buildDir)) { + fsCallback.mkdirSync(buildDir, { recursive: true }); + } + fs.copyFileSync(prebuiltPath, path.join(buildDir, "lbugjs.node")); + const jsFiles = fs.readdirSync(srcJsDir).filter((file) => { + return file.endsWith(".js") || file.endsWith(".mjs") || file.endsWith(".d.ts"); + }); + for (const file of jsFiles) { + fs.copyFileSync(path.join(srcJsDir, file), path.join(buildDir, file)); + } + console.log("Done! Prebuilt + JS copied to build/."); + process.exit(0); +} +// Prebuilt available + tarball layout (lbug-source present): copy to root (legacy publish flow) else if (fsCallback.existsSync(prebuiltPath)) { console.log("Prebuilt binary is available."); - console.log("Copying prebuilt binary to package directory..."); fs.copyFileSync(prebuiltPath, path.join(__dirname, "lbugjs.node")); - console.log( - `Copied ${prebuiltPath} -> ${path.join(__dirname, "lbugjs.node")}.` - ); - console.log("Copying JS files to package directory..."); - const jsSourceDir = path.join( - __dirname, - "lbug-source", - "tools", - "nodejs_api", - "src_js" - ); + const jsSourceDir = path.join(lbugSourceDir, "tools", "nodejs_api", "src_js"); const jsFiles = fs.readdirSync(jsSourceDir).filter((file) => { return file.endsWith(".js") || file.endsWith(".mjs") || file.endsWith(".d.ts"); }); - console.log("Files to copy: "); - for (const file of jsFiles) { - console.log(" " + file); - } for (const file of jsFiles) { fs.copyFileSync(path.join(jsSourceDir, file), path.join(__dirname, file)); } - console.log("Copied JS files to package directory."); console.log("Done!"); process.exit(0); } else { console.log("Prebuilt binary is not available, building from source..."); } +if (!fsCallback.existsSync(lbugSourceDir)) { + console.error( + "lbug-source/ not found (install from git clone). Add prebuilt binary to prebuilt/lbugjs-" + + platform + + "-" + + arch + + ".node and commit, or install from a full clone and build there." 
+ ); + process.exit(1); +} + // Get number of threads const THREADS = os.cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); diff --git a/tools/nodejs_api/package.json b/tools/nodejs_api/package.json index f6ff61a8b3..f574bf8036 100644 --- a/tools/nodejs_api/package.json +++ b/tools/nodejs_api/package.json @@ -5,14 +5,19 @@ "main": "index.js", "module": "./index.mjs", "types": "./lbug.d.ts", - "exports":{ - ".":{ + "exports": { + ".": { "require": "./index.js", "import": "./index.mjs", "types": "./lbug.d.ts" } }, - "files": ["index.js", "index.mjs", "lbug.d.ts", "lbugjs.node"], + "files": [ + "index.js", + "index.mjs", + "lbug.d.ts", + "lbugjs.node" + ], "type": "commonjs", "homepage": "https://ladybugdb.com/", "repository": { @@ -20,6 +25,7 @@ "url": "git+https://github.com/LadybugDB/ladybug.git" }, "scripts": { + "install": "node install.js", "test": "mocha test --timeout 20000", "clean": "node clean.js", "clean-all": "node clean.js all", @@ -37,4 +43,4 @@ "cmake-js": "^7.3.0", "node-addon-api": "^6.0.0" } -} +} \ No newline at end of file diff --git a/tools/nodejs_api/src_cpp/include/node_connection.h b/tools/nodejs_api/src_cpp/include/node_connection.h index caacef92c3..907c478bea 100644 --- a/tools/nodejs_api/src_cpp/include/node_connection.h +++ b/tools/nodejs_api/src_cpp/include/node_connection.h @@ -8,6 +8,7 @@ #include "node_prepared_statement.h" #include "node_progress_bar_display.h" #include "node_query_result.h" +#include "node_scan_replacement.h" #include using namespace lbug::main; @@ -30,15 +31,20 @@ class NodeConnection : public Napi::ObjectWrap { void InitCppConnection(); void SetMaxNumThreadForExec(const Napi::CallbackInfo& info); void SetQueryTimeout(const Napi::CallbackInfo& info); + void Interrupt(const Napi::CallbackInfo& info); Napi::Value ExecuteAsync(const Napi::CallbackInfo& info); Napi::Value QueryAsync(const Napi::CallbackInfo& info); Napi::Value ExecuteSync(const Napi::CallbackInfo& info); Napi::Value QuerySync(const Napi::CallbackInfo& info); void Close(const Napi::CallbackInfo& info); + Napi::Value RegisterStream(const Napi::CallbackInfo& info); + void UnregisterStream(const Napi::CallbackInfo& info); + void ReturnChunk(const Napi::CallbackInfo& info); private: std::shared_ptr database; std::shared_ptr connection; + std::unique_ptr streamRegistry; }; class ConnectionInitAsyncWorker : public Napi::AsyncWorker { diff --git a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h index 7813820cb0..e777c94af5 100644 --- a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h +++ b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h @@ -13,6 +13,8 @@ using namespace common; */ class NodeProgressBarDisplay : public ProgressBarDisplay { public: + ~NodeProgressBarDisplay() override; + void updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) override; diff --git a/tools/nodejs_api/src_cpp/include/node_query_result.h b/tools/nodejs_api/src_cpp/include/node_query_result.h index b9ee4db979..77129e8b55 100644 --- a/tools/nodejs_api/src_cpp/include/node_query_result.h +++ b/tools/nodejs_api/src_cpp/include/node_query_result.h @@ -102,6 +102,7 @@ class NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker { try { if (!nodeQueryResult->queryResult->hasNext()) { cppTuple.reset(); + return; } cppTuple = nodeQueryResult->queryResult->getNext(); } catch (const std::exception& exc) { @@ -112,7 +113,7 @@ class 
NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker { inline void OnOK() override { auto env = Env(); if (cppTuple == nullptr) { - Callback().Call({env.Null(), env.Undefined()}); + Callback().Call({env.Null(), env.Null()}); return; } Napi::Object nodeTuple = Napi::Object::New(env); diff --git a/tools/nodejs_api/src_cpp/include/node_scan_replacement.h b/tools/nodejs_api/src_cpp/include/node_scan_replacement.h new file mode 100644 index 0000000000..bd0109005a --- /dev/null +++ b/tools/nodejs_api/src_cpp/include/node_scan_replacement.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "function/table/scan_replacement.h" +#include "function/table/table_function.h" +#include + +namespace lbug { +namespace main { +class Connection; +} +namespace common { +class LogicalType; +} +} + +struct NodeStreamChunkRequest { + std::mutex mtx; + std::condition_variable cv; + std::vector> rows; + std::vector columnNames; // schema order for object rows + bool done = false; + bool filled = false; +}; + +struct NodeStreamSourceState { + Napi::ThreadSafeFunction getChunkTsf; + std::vector columnNames; + std::vector columnTypes; +}; + +class NodeStreamRegistry { +public: + void registerSource(const std::string& name, Napi::ThreadSafeFunction tsf, + std::vector columnNames, + std::vector columnTypes); + void unregisterSource(const std::string& name); + std::vector lookup(const std::string& name) const; + std::unique_ptr replace( + std::span handles) const; + + static NodeStreamChunkRequest* getChunkRequest(uint64_t requestId); + static void setChunkRequest(uint64_t requestId, std::unique_ptr req); + static uint64_t nextRequestId(); + +private: + mutable std::mutex mtx_; + std::unordered_map> sources_; +}; + +void addNodeScanReplacement(lbug::main::Connection* connection, NodeStreamRegistry* registry); + +void returnChunkFromJS(uint64_t requestId, Napi::Array rowsNapi, bool done); diff --git a/tools/nodejs_api/src_cpp/include/node_stream_scan.h b/tools/nodejs_api/src_cpp/include/node_stream_scan.h new file mode 100644 index 0000000000..5c7328b6be --- /dev/null +++ b/tools/nodejs_api/src_cpp/include/node_stream_scan.h @@ -0,0 +1,29 @@ +#pragma once + +#include "function/table/bind_data.h" +#include "function/table/table_function.h" + +struct NodeStreamSourceState; // defined in node_scan_replacement.h + +struct NodeStreamScanFunctionData : lbug::function::TableFuncBindData { + std::shared_ptr state; + + NodeStreamScanFunctionData(lbug::binder::expression_vector columns, + std::shared_ptr state) + : TableFuncBindData(std::move(columns), 0), state(std::move(state)) {} + + std::unique_ptr copy() const override { + return std::make_unique(columns, state); + } +}; + +namespace lbug { +namespace function { + +struct NodeStreamScanFunction { + static constexpr const char* name = "NODE_STREAM_SCAN"; + static TableFunction getFunction(); +}; + +} // namespace function +} // namespace lbug diff --git a/tools/nodejs_api/src_cpp/include/node_util.h b/tools/nodejs_api/src_cpp/include/node_util.h index 4ea7a50674..cd27b21371 100644 --- a/tools/nodejs_api/src_cpp/include/node_util.h +++ b/tools/nodejs_api/src_cpp/include/node_util.h @@ -10,10 +10,10 @@ class Util { static Napi::Value ConvertToNapiObject(const Value& value, Napi::Env env); static std::unordered_map> TransformParametersForExec( Napi::Array params); + static Value TransformNapiValue(Napi::Value napiValue); private: static Napi::Object ConvertNodeIdToNapiObject(const nodeID_t& nodeId, Napi::Env 
env); - static Value TransformNapiValue(Napi::Value napiValue); const static int64_t JS_MAX_SAFE_INTEGER = 9007199254740991; const static int64_t JS_MIN_SAFE_INTEGER = -9007199254740991; }; diff --git a/tools/nodejs_api/src_cpp/node_connection.cpp b/tools/nodejs_api/src_cpp/node_connection.cpp index 404903904f..4427de0e18 100644 --- a/tools/nodejs_api/src_cpp/node_connection.cpp +++ b/tools/nodejs_api/src_cpp/node_connection.cpp @@ -1,9 +1,11 @@ #include "include/node_connection.h" +#include #include #include "include/node_database.h" #include "include/node_query_result.h" +#include "include/node_scan_replacement.h" #include "include/node_util.h" #include "main/lbug.h" @@ -19,7 +21,11 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("querySync", &NodeConnection::QuerySync), InstanceMethod("setMaxNumThreadForExec", &NodeConnection::SetMaxNumThreadForExec), InstanceMethod("setQueryTimeout", &NodeConnection::SetQueryTimeout), - InstanceMethod("close", &NodeConnection::Close)}); + InstanceMethod("interrupt", &NodeConnection::Interrupt), + InstanceMethod("close", &NodeConnection::Close), + InstanceMethod("registerStream", &NodeConnection::RegisterStream), + InstanceMethod("unregisterStream", &NodeConnection::UnregisterStream), + InstanceMethod("returnChunk", &NodeConnection::ReturnChunk)}); exports.Set("NodeConnection", t); return exports; @@ -57,6 +63,8 @@ void NodeConnection::InitCppConnection() { this->connection = std::make_shared(database.get()); ProgressBar::Get(*connection->getClientContext()) ->setDisplay(std::make_shared()); + streamRegistry = std::make_unique(); + addNodeScanReplacement(connection.get(), streamRegistry.get()); // After the connection is initialized, we do not need to hold a reference to the database. 
database.reset(); } @@ -83,9 +91,16 @@ void NodeConnection::SetQueryTimeout(const Napi::CallbackInfo& info) { } } +void NodeConnection::Interrupt(const Napi::CallbackInfo& /* info */) { + if (this->connection) { + this->connection->interrupt(); + } +} + void NodeConnection::Close(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); + streamRegistry.reset(); this->connection.reset(); } @@ -157,3 +172,93 @@ Napi::Value NodeConnection::QueryAsync(const Napi::CallbackInfo& info) { asyncWorker->Queue(); return info.Env().Undefined(); } + +static lbug::common::LogicalType parseColumnType(const std::string& typeStr) { + std::string upper = typeStr; + std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); + if (upper == "INT64") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT64); + if (upper == "INT32") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT32); + if (upper == "INT16") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT16); + if (upper == "INT8") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT8); + if (upper == "UINT64") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT64); + if (upper == "UINT32") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT32); + if (upper == "UINT16") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT16); + if (upper == "UINT8") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT8); + if (upper == "DOUBLE") return lbug::common::LogicalType(lbug::common::LogicalTypeID::DOUBLE); + if (upper == "FLOAT") return lbug::common::LogicalType(lbug::common::LogicalTypeID::FLOAT); + if (upper == "STRING") return lbug::common::LogicalType(lbug::common::LogicalTypeID::STRING); + if (upper == "BOOL" || upper == "BOOLEAN") + return lbug::common::LogicalType(lbug::common::LogicalTypeID::BOOL); + if (upper == "DATE") return lbug::common::LogicalType(lbug::common::LogicalTypeID::DATE); + if (upper == "TIMESTAMP") + return lbug::common::LogicalType(lbug::common::LogicalTypeID::TIMESTAMP); + throw std::runtime_error("Unsupported column type for registerStream: " + typeStr); +} + +Napi::Value NodeConnection::RegisterStream(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!connection || !streamRegistry) { + Napi::Error::New(env, "Connection not initialized.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + if (info.Length() < 3 || !info[0].IsString() || !info[1].IsFunction() || !info[2].IsArray()) { + Napi::Error::New(env, + "registerStream(name, getChunkCallback, columns): name string, getChunkCallback " + "function(requestId), columns array of { name, type }.") + .ThrowAsJavaScriptException(); + return env.Undefined(); + } + std::string name = info[0].As().Utf8Value(); + Napi::Function getChunkCallback = info[1].As(); + Napi::Array columnsArr = info[2].As(); + std::vector columnNames; + std::vector columnTypes; + for (uint32_t i = 0; i < columnsArr.Length(); i++) { + Napi::Value col = columnsArr.Get(i); + if (!col.IsObject()) continue; + Napi::Object obj = col.As(); + if (!obj.Get("name").IsString() || !obj.Get("type").IsString()) continue; + columnNames.push_back(obj.Get("name").As().Utf8Value()); + columnTypes.push_back( + parseColumnType(obj.Get("type").As().Utf8Value())); + } + if (columnNames.empty()) { + Napi::Error::New(env, "registerStream: at least one column required.").ThrowAsJavaScriptException(); + return env.Undefined(); 
+ } + try { + auto tsf = Napi::ThreadSafeFunction::New( + env, getChunkCallback, "NodeStreamGetChunk", 0, 1); + streamRegistry->registerSource(name, std::move(tsf), std::move(columnNames), + std::move(columnTypes)); + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + +void NodeConnection::UnregisterStream(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!streamRegistry) return; + if (info.Length() < 1 || !info[0].IsString()) { + Napi::Error::New(env, "unregisterStream(name): name string.").ThrowAsJavaScriptException(); + return; + } + streamRegistry->unregisterSource(info[0].As().Utf8Value()); +} + +void NodeConnection::ReturnChunk(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + if (info.Length() < 3 || !info[0].IsNumber() || !info[1].IsArray() || !info[2].IsBoolean()) { + Napi::Error::New(env, + "returnChunk(requestId, rows, done): requestId number, rows array of rows, done boolean.") + .ThrowAsJavaScriptException(); + return; + } + uint64_t requestId = static_cast(info[0].ToNumber().Int64Value()); + Napi::Array rows = info[1].As(); + bool done = info[2].ToBoolean().Value(); + returnChunkFromJS(requestId, rows, done); +} diff --git a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp index 352775624a..a76dbd953e 100644 --- a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp +++ b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp @@ -3,6 +3,14 @@ using namespace lbug; using namespace common; +NodeProgressBarDisplay::~NodeProgressBarDisplay() { + std::unique_lock lock(callbackMutex); + for (auto& kv : queryCallbacks) { + kv.second.Release(); + } + queryCallbacks.clear(); +} + void NodeProgressBarDisplay::updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) { if (numPipelines == 0) { diff --git a/tools/nodejs_api/src_cpp/node_scan_replacement.cpp b/tools/nodejs_api/src_cpp/node_scan_replacement.cpp new file mode 100644 index 0000000000..8fdb757f46 --- /dev/null +++ b/tools/nodejs_api/src_cpp/node_scan_replacement.cpp @@ -0,0 +1,134 @@ +#include "include/node_scan_replacement.h" +#include "include/node_stream_scan.h" +#include "include/node_util.h" + +#include "function/table/bind_input.h" +#include "main/client_context.h" +#include "main/connection.h" + +#include + +using namespace lbug::common; +using namespace lbug::function; +using namespace lbug::main; + +namespace { + +std::mutex g_requestMutex; +std::atomic g_nextRequestId{1}; +std::unordered_map> g_chunkRequests; + +} // namespace + +void NodeStreamRegistry::registerSource(const std::string& name, Napi::ThreadSafeFunction tsf, + std::vector columnNames, std::vector columnTypes) { + std::lock_guard lock(mtx_); + auto state = std::make_shared(); + state->getChunkTsf = std::move(tsf); + state->columnNames = std::move(columnNames); + state->columnTypes = std::move(columnTypes); + sources_[name] = std::move(state); +} + +void NodeStreamRegistry::unregisterSource(const std::string& name) { + std::lock_guard lock(mtx_); + sources_.erase(name); +} + +std::vector NodeStreamRegistry::lookup(const std::string& name) const { + std::lock_guard lock(mtx_); + auto it = sources_.find(name); + if (it == sources_.end()) { + return {}; + } + return {reinterpret_cast(it->second.get())}; +} + +std::unique_ptr NodeStreamRegistry::replace( + std::span handles) const { + if 
(handles.empty()) { + return nullptr; + } + auto* statePtr = reinterpret_cast(handles[0]); + auto state = std::shared_ptr(statePtr, [](NodeStreamSourceState*) {}); + auto data = std::make_unique(); + data->func = NodeStreamScanFunction::getFunction(); + data->bindInput.addLiteralParam(Value::createValue(reinterpret_cast(statePtr))); + return data; +} + +NodeStreamChunkRequest* NodeStreamRegistry::getChunkRequest(uint64_t requestId) { + std::lock_guard lock(g_requestMutex); + auto it = g_chunkRequests.find(requestId); + return it != g_chunkRequests.end() ? it->second.get() : nullptr; +} + +void NodeStreamRegistry::setChunkRequest(uint64_t requestId, + std::unique_ptr req) { + std::lock_guard lock(g_requestMutex); + if (req) { + g_chunkRequests[requestId] = std::move(req); + } else { + g_chunkRequests.erase(requestId); + } +} + +uint64_t NodeStreamRegistry::nextRequestId() { + return g_nextRequestId++; +} + +static std::vector lookupNodeStream(const std::string& objectName, + void* registryVoid) { + auto* registry = static_cast(registryVoid); + return registry->lookup(objectName); +} + +static std::unique_ptr replaceNodeStream( + std::span handles, void* registryVoid) { + auto* registry = static_cast(registryVoid); + return registry->replace(handles); +} + +void addNodeScanReplacement(Connection* connection, NodeStreamRegistry* registry) { + auto lookup = [registry](const std::string& name) { + return lookupNodeStream(name, registry); + }; + auto replace = [registry](std::span handles) { + return replaceNodeStream(handles, registry); + }; + connection->getClientContext()->addScanReplace(ScanReplacement(std::move(lookup), replace)); +} + +void returnChunkFromJS(uint64_t requestId, Napi::Array rowsNapi, bool done) { + auto* req = NodeStreamRegistry::getChunkRequest(requestId); + if (!req) { + return; + } + std::vector> rows; + const size_t numRows = rowsNapi.Length(); + rows.reserve(numRows); + for (size_t r = 0; r < numRows; r++) { + Napi::Value rowVal = rowsNapi.Get(r); + std::vector row; + if (rowVal.IsArray()) { + auto arr = rowVal.As(); + for (size_t c = 0; c < arr.Length(); c++) { + row.push_back(Util::TransformNapiValue(arr.Get(c))); + } + } else if (rowVal.IsObject() && !rowVal.IsNull() && !rowVal.IsUndefined()) { + auto obj = rowVal.As(); + const auto& colNames = req->columnNames; + for (const auto& colName : colNames) { + row.push_back(Util::TransformNapiValue(obj.Get(colName))); + } + } + rows.push_back(std::move(row)); + } + { + std::lock_guard lock(req->mtx); + req->rows = std::move(rows); + req->done = done; + req->filled = true; + } + req->cv.notify_one(); +} diff --git a/tools/nodejs_api/src_cpp/node_stream_scan.cpp b/tools/nodejs_api/src_cpp/node_stream_scan.cpp new file mode 100644 index 0000000000..529c10b91d --- /dev/null +++ b/tools/nodejs_api/src_cpp/node_stream_scan.cpp @@ -0,0 +1,103 @@ +#include "include/node_stream_scan.h" +#include "include/node_scan_replacement.h" + +#include "binder/binder.h" +#include "common/constants.h" +#include "common/system_config.h" +#include "function/table/bind_input.h" +#include "processor/execution_context.h" +#include "processor/result/factorized_table.h" + +#include + +using namespace lbug::common; +using namespace lbug::function; + +namespace lbug { + +namespace { +std::mutex g_nodeStreamTableFuncMutex; +} + +static std::unique_ptr bindFunc(lbug::main::ClientContext*, + const TableFuncBindInput* input) { + auto* statePtr = reinterpret_cast(input->getLiteralVal(0)); + KU_ASSERT(statePtr != nullptr); + std::shared_ptr state(statePtr, 
[](NodeStreamSourceState*) {}); + auto columns = input->binder->createVariables(state->columnNames, state->columnTypes); + return std::make_unique(std::move(columns), state); +} + +static std::unique_ptr initSharedState( + const TableFuncInitSharedStateInput&) { + return std::make_unique(0); +} + +static std::unique_ptr initLocalState( + const TableFuncInitLocalStateInput&) { + return std::make_unique(); +} + +static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) { + auto* bindData = input.bindData->constPtrCast(); + auto& state = *bindData->state; + std::unique_lock streamLock(g_nodeStreamTableFuncMutex); + const uint64_t requestId = NodeStreamRegistry::nextRequestId(); + auto req = std::make_unique(); + req->columnNames = state.columnNames; + NodeStreamRegistry::setChunkRequest(requestId, std::move(req)); + NodeStreamChunkRequest* reqPtr = NodeStreamRegistry::getChunkRequest(requestId); + KU_ASSERT(reqPtr != nullptr); + + state.getChunkTsf.BlockingCall(&requestId, + [](Napi::Env env, Napi::Function jsCallback, const uint64_t* idPtr) { + jsCallback.Call({Napi::Number::New(env, static_cast(*idPtr))}); + }); + + std::vector> rowsCopy; + { + std::unique_lock lock(reqPtr->mtx); + reqPtr->cv.wait(lock, [reqPtr] { return reqPtr->filled; }); + rowsCopy = std::move(reqPtr->rows); + } + NodeStreamRegistry::setChunkRequest(requestId, nullptr); + streamLock.unlock(); + + const offset_t numRows = static_cast(rowsCopy.size()); + if (numRows == 0) { + return 0; + } + + const auto numCols = bindData->getNumColumns(); + const auto cap = std::min(numRows, static_cast(DEFAULT_VECTOR_CAPACITY)); + for (offset_t r = 0; r < cap; r++) { + for (auto c = 0u; c < numCols; c++) { + auto& vec = output.dataChunk.getValueVectorMutable(c); + if (c < rowsCopy[r].size() && !rowsCopy[r][c].isNull()) { + vec.setNull(r, false); + vec.copyFromValue(r, rowsCopy[r][c]); + } else { + vec.setNull(r, true); + } + } + } + output.dataChunk.state->getSelVectorUnsafe().setSelSize(cap); + return cap; +} + +static double progressFunc(TableFuncSharedState*) { + return 0.0; +} + +TableFunction NodeStreamScanFunction::getFunction() { + TableFunction func(name, std::vector{LogicalTypeID::POINTER}); + func.tableFunc = tableFunc; + func.bindFunc = bindFunc; + func.initSharedStateFunc = initSharedState; + func.initLocalStateFunc = initLocalState; + func.progressFunc = progressFunc; + func.canParallelFunc = [] { return false; }; + return func; +} + +} // namespace lbug diff --git a/tools/nodejs_api/src_js/connection.js b/tools/nodejs_api/src_js/connection.js index 575d9f27ff..d1c8146755 100644 --- a/tools/nodejs_api/src_js/connection.js +++ b/tools/nodejs_api/src_js/connection.js @@ -121,13 +121,20 @@ class Connection { * Execute a prepared statement with the given parameters. * @param {lbug.PreparedStatement} preparedStatement the prepared statement to execute. * @param {Object} params a plain object mapping parameter names to values. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. 
Rejects if error or options.signal is aborted. */ - execute(preparedStatement, params = {}, progressCallback) { + execute(preparedStatement, params = {}, optionsOrProgressCallback) { + const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback); return new Promise((resolve, reject) => { + if (progressCallback !== undefined && typeof progressCallback !== "function") { + return reject(new Error("progressCallback must be a function.")); + } + if (optionsOrProgressCallback != null && typeof optionsOrProgressCallback !== "function" && typeof optionsOrProgressCallback !== "object") { + return reject(new Error("progressCallback must be a function.")); + } if ( - !typeof preparedStatement === "object" || + typeof preparedStatement !== "object" || preparedStatement.constructor.name !== "PreparedStatement" ) { return reject( @@ -147,11 +154,29 @@ class Connection { const value = params[key]; paramArray.push([key, value]); } - if (progressCallback && typeof progressCallback !== "function") { - return reject(new Error("progressCallback must be a function.")); + if (signal?.aborted) { + return reject(this._createAbortError()); + } + let abortListener; + const cleanup = () => { + if (signal && abortListener) { + signal.removeEventListener("abort", abortListener); + } + }; + if (signal) { + abortListener = () => { + this.interrupt(); + cleanup(); + reject(this._createAbortError()); + }; + signal.addEventListener("abort", abortListener); } this._getConnection() .then((connection) => { + if (signal?.aborted) { + cleanup(); + return reject(this._createAbortError()); + } const nodeQueryResult = new LbugNative.NodeQueryResult(); try { connection.executeAsync( @@ -159,7 +184,11 @@ class Connection { nodeQueryResult, paramArray, (err) => { + cleanup(); if (err) { + if (signal?.aborted && err.message === "Interrupted.") { + return reject(this._createAbortError()); + } return reject(err); } this._unwrapMultipleQueryResults(nodeQueryResult) @@ -173,10 +202,12 @@ class Connection { progressCallback ); } catch (e) { + cleanup(); return reject(e); } }) .catch((err) => { + cleanup(); return reject(err); }); }); @@ -261,26 +292,65 @@ class Connection { return new PreparedStatement(this, preparedStatement); } + /** + * Interrupt the currently executing query on this connection. + * No-op if the connection is not initialized or no query is running. + */ + interrupt() { + if (this._connection) { + this._connection.interrupt(); + } + } + /** * Execute a query. * @param {String} statement the statement to execute. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options object { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error or if options.signal is aborted. 
   */
-  query(statement, progressCallback) {
+  query(statement, optionsOrProgressCallback) {
+    const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback);
     return new Promise((resolve, reject) => {
+      if (progressCallback !== undefined && typeof progressCallback !== "function") {
+        return reject(new Error("progressCallback must be a function."));
+      }
+      if (optionsOrProgressCallback != null && typeof optionsOrProgressCallback !== "function" && typeof optionsOrProgressCallback !== "object") {
+        return reject(new Error("progressCallback must be a function."));
+      }
       if (typeof statement !== "string") {
         return reject(new Error("statement must be a string."));
       }
-      if (progressCallback && typeof progressCallback !== "function") {
-        return reject(new Error("progressCallback must be a function."));
+      if (signal?.aborted) {
+        return reject(this._createAbortError());
+      }
+      let abortListener;
+      const cleanup = () => {
+        if (signal && abortListener) {
+          signal.removeEventListener("abort", abortListener);
+        }
+      };
+      if (signal) {
+        abortListener = () => {
+          this.interrupt();
+          cleanup();
+          reject(this._createAbortError());
+        };
+        signal.addEventListener("abort", abortListener);
       }
       this._getConnection()
         .then((connection) => {
+          if (signal?.aborted) {
+            cleanup();
+            return reject(this._createAbortError());
+          }
           const nodeQueryResult = new LbugNative.NodeQueryResult();
           try {
             connection.queryAsync(statement, nodeQueryResult, (err) => {
+              cleanup();
               if (err) {
+                if (signal?.aborted && err.message === "Interrupted.") {
+                  return reject(this._createAbortError());
+                }
                 return reject(err);
               }
               this._unwrapMultipleQueryResults(nodeQueryResult)
@@ -293,15 +363,127 @@
             }, progressCallback);
           } catch (e) {
+            cleanup();
             return reject(e);
           }
         })
         .catch((err) => {
+          cleanup();
           return reject(err);
         });
     });
   }
 
+  _normalizeQueryOptions(optionsOrProgressCallback) {
+    if (optionsOrProgressCallback == null) {
+      return { signal: undefined, progressCallback: undefined };
+    }
+    if (typeof optionsOrProgressCallback === "function") {
+      return { signal: undefined, progressCallback: optionsOrProgressCallback };
+    }
+    if (typeof optionsOrProgressCallback === "object" && optionsOrProgressCallback !== null) {
+      return {
+        signal: optionsOrProgressCallback.signal,
+        progressCallback: optionsOrProgressCallback.progressCallback,
+      };
+    }
+    return { signal: undefined, progressCallback: undefined };
+  }
+
+  _createAbortError() {
+    return new DOMException("The operation was aborted.", "AbortError");
+  }
+
+  /**
+   * Check that the connection is alive (e.g. for connection pools or health checks).
+   * Runs a trivial query; rejects if the connection is broken.
+   * @returns {Promise<boolean>} resolves to true if the connection is OK.
+   */
+  async ping() {
+    const result = await this.query("RETURN 1");
+    const closeResult = (r) => {
+      if (Array.isArray(r)) {
+        r.forEach((q) => q.close());
+      } else {
+        r.close();
+      }
+    };
+    closeResult(result);
+    return true;
+  }
+
+  /**
+   * Register a stream source for LOAD FROM name. The source must be AsyncIterable; each yielded
+   * value is a row (array of column values in schema order, or object keyed by column name).
+   * Call unregisterStream(name) when done or before reusing the name.
+   * @param {string} name – name used in Cypher: LOAD FROM name RETURN ...
+   * @param {AsyncIterable<Array<any>|Object>} source – async iterable of rows
+   * @param {{ columns: Array<{ name: string, type: string }> }} options – schema (required). type: INT64, INT32, DOUBLE, STRING, BOOL, DATE, etc.
+   */
+  async registerStream(name, source, options = {}) {
+    if (typeof name !== "string") {
+      throw new Error("registerStream: name must be a string.");
+    }
+    const columns = options.columns;
+    if (!Array.isArray(columns) || columns.length === 0) {
+      throw new Error("registerStream: options.columns (array of { name, type }) is required.");
+    }
+    const conn = await this._getConnection();
+    const it = source[Symbol.asyncIterator] ? source[Symbol.asyncIterator].call(source) : source;
+    const pending = [];
+    let consumerRunning = false;
+
+    const toRows = (raw) => {
+      if (raw == null) return [];
+      if (Array.isArray(raw)) {
+        const first = raw[0];
+        const isArrayOfRows =
+          raw.length > 0 &&
+          (Array.isArray(first) || (typeof first === "object" && first !== null && !Array.isArray(first)));
+        return isArrayOfRows ? raw : [raw];
+      }
+      return [raw];
+    };
+
+    const runConsumer = async () => {
+      pending.sort((a, b) => a - b);
+      while (pending.length > 0) {
+        const requestId = pending.shift();
+        try {
+          const n = await it.next();
+          const rows = toRows(n.value);
+          conn.returnChunk(requestId, rows, n.done);
+        } catch (e) {
+          // returnChunk has no error channel, so a throwing source ends the stream.
+          conn.returnChunk(requestId, [], true);
+        }
+      }
+      consumerRunning = false;
+    };
+
+    const getChunk = (requestId) => {
+      pending.push(requestId);
+      if (!consumerRunning) {
+        consumerRunning = true;
+        setImmediate(() => runConsumer());
+      }
+    };
+    conn.registerStream(name, getChunk, columns);
+  }
+
+  /**
+   * Unregister a stream source by name.
+   * @param {string} name – name passed to registerStream
+   */
+  unregisterStream(name) {
+    if (typeof name !== "string") {
+      throw new Error("unregisterStream: name must be a string.");
+    }
+    if (!this._connection) {
+      return;
+    }
+    this._connection.unregisterStream(name);
+  }
+
   /**
    * Execute a query synchronously.
    * @param {String} statement the statement to execute. This function blocks the main thread for the duration of the query, so use it with caution.
@@ -396,6 +578,37 @@ class Connection {
     }
   }
 
+  /**
+   * Run a function inside a single write transaction. On success commits, on throw rolls back and rethrows.
+   * Uses Cypher BEGIN TRANSACTION / COMMIT / ROLLBACK under the hood.
+   * @param {Function} fn async function to run; can use this connection's query/execute inside.
+   * @returns {Promise<*>} the value returned by fn.
+   */
+  async transaction(fn) {
+    if (typeof fn !== "function") {
+      throw new Error("transaction() requires a function.");
+    }
+    const closeResult = (r) => {
+      if (Array.isArray(r)) {
+        r.forEach((q) => q.close());
+      } else {
+        r.close();
+      }
+    };
+    const beginRes = await this.query("BEGIN TRANSACTION");
+    closeResult(beginRes);
+    try {
+      const result = await fn();
+      const commitRes = await this.query("COMMIT");
+      closeResult(commitRes);
+      return result;
+    } catch (e) {
+      const rollbackRes = await this.query("ROLLBACK");
+      closeResult(rollbackRes);
+      throw e;
+    }
+  }
+
   /**
    * Set the timeout for queries. Queries that take longer than the timeout
    * will be aborted.
diff --git a/tools/nodejs_api/src_js/lbug.d.ts b/tools/nodejs_api/src_js/lbug.d.ts
index 977be1b157..807289a9a2 100644
--- a/tools/nodejs_api/src_js/lbug.d.ts
+++ b/tools/nodejs_api/src_js/lbug.d.ts
@@ -22,6 +22,15 @@ export type ProgressCallback = (
   numPipelines: number
 ) => void;
 
+/**
+ * Options for query() and execute().
+ * Use signal to cancel the operation via AbortController.
+ */
+export interface QueryOptions {
+  signal?: AbortSignal;
+  progressCallback?: ProgressCallback;
+}
+
 /**
  * Represents a node ID in the graph database.
  */
@@ -117,6 +126,8 @@ export class Database {
    * @param maxDBSize Maximum size of the database in bytes
    * @param autoCheckpoint Whether to enable automatic checkpoints
    * @param checkpointThreshold Threshold for automatic checkpoints
+   * @param throwOnWalReplayFailure If true, WAL replay failures throw; otherwise replay stops at the error
+   * @param enableChecksums If true, use checksums to detect WAL corruption
    */
   constructor(
     databasePath?: string,
@@ -125,7 +136,9 @@
     readOnly?: boolean,
     maxDBSize?: number,
     autoCheckpoint?: boolean,
-    checkpointThreshold?: number
+    checkpointThreshold?: number,
+    throwOnWalReplayFailure?: boolean,
+    enableChecksums?: boolean
   );
 
   /**
@@ -200,6 +213,12 @@ export class Connection {
    */
   setQueryTimeout(timeoutInMs: number): void;
 
+  /**
+   * Interrupt the currently executing query on this connection.
+   * No-op if the connection is not initialized or no query is running.
+   */
+  interrupt(): void;
+
   /**
    * Close the connection.
    * @returns Promise that resolves when connection is closed
@@ -215,13 +234,13 @@
    * Execute a prepared statement.
    * @param preparedStatement The prepared statement to execute
    * @param params Parameters for the query as a plain object
-   * @param progressCallback Optional progress callback
-   * @returns Promise that resolves to the query result(s)
+   * @param optionsOrProgressCallback Options (e.g. signal for abort) or a legacy progress callback
+   * @returns Promise that resolves to the query result(s). Rejects with a DOMException AbortError if signal is aborted.
    */
   execute(
     preparedStatement: PreparedStatement,
     params?: Record<string, any>,
-    progressCallback?: ProgressCallback
+    optionsOrProgressCallback?: QueryOptions | ProgressCallback
   ): Promise<QueryResult | QueryResult[]>;
 
   /**
@@ -252,20 +271,52 @@
   /**
    * Execute a query.
    * @param statement The statement to execute
-   * @param progressCallback Optional progress callback
-   * @returns Promise that resolves to the query result(s)
+   * @param optionsOrProgressCallback Options (e.g. signal for abort) or a legacy progress callback
+   * @returns Promise that resolves to the query result(s). Rejects with a DOMException AbortError if signal is aborted.
    */
   query(
     statement: string,
-    progressCallback?: ProgressCallback
+    optionsOrProgressCallback?: QueryOptions | ProgressCallback
   ): Promise<QueryResult | QueryResult[]>;
 
+  /**
+   * Run a function inside a single write transaction. Commits on success, rolls back on throw.
+   * @param fn Async function that can use this connection's query/execute
+   * @returns Promise that resolves to the return value of fn
+   */
+  transaction<T>(fn: () => Promise<T>): Promise<T>;
+
   /**
    * Execute a query synchronously.
    * @param statement The statement to execute
    * @returns The query result(s)
    */
   querySync(statement: string): QueryResult | QueryResult[];
+
+  /**
+   * Check that the connection is alive (e.g. for pools or health checks).
+   * @returns Promise that resolves to true if OK, rejects if the connection is broken
+   */
+  ping(): Promise<boolean>;
+
+  /**
+   * Register a stream source for LOAD FROM name. The source must be an AsyncIterable of rows (array or object).
+   * Unregister with unregisterStream(name) when done.
+   * @param name Name used in Cypher: LOAD FROM name RETURN ...
+   * @param source AsyncIterable of rows (array of column values or an object keyed by column name)
+   * @param options.columns Schema: array of { name: string, type: string } (type: INT64, INT32, DOUBLE, STRING, BOOL, DATE, etc.)
+   */
+  registerStream(
+    name: string,
+    source: AsyncIterable<Array<any> | Record<string, any>>,
+    options: { columns: Array<{ name: string; type: string }> }
+  ): Promise<void>;
+
+  /**
+   * Unregister a stream source by name.
+   * @param name Name passed to registerStream
+   */
+  unregisterStream(name: string): void;
 }
 
 /**
@@ -289,8 +340,14 @@ export class PreparedStatement {
 /**
  * Represents the results of a query execution.
  * Note: This class is created internally by Connection query methods.
+ * Supports async iteration: for await (const row of result) { ... }
  */
-export class QueryResult {
+export class QueryResult implements AsyncIterable<Record<string, any> | null> {
+  /**
+   * Async iterator for row-by-row consumption (for await...of).
+   */
+  [Symbol.asyncIterator](): AsyncIterator<Record<string, any> | null>;
+
   /**
    * Reset the iterator for reading results.
    */
@@ -320,6 +377,12 @@
    */
   getNextSync(): Record<string, any> | null;
 
+  /**
+   * Return a Node.js Readable stream (object mode) that yields one row per chunk.
+   * @returns Readable stream of row objects
+   */
+  toStream(): import("stream").Readable;
+
   /**
    * Iterate through the query result with callback functions.
    * @param resultCallback Callback function called for each row
diff --git a/tools/nodejs_api/src_js/query_result.js b/tools/nodejs_api/src_js/query_result.js
index 944f23db9c..3d75892eda 100644
--- a/tools/nodejs_api/src_js/query_result.js
+++ b/tools/nodejs_api/src_js/query_result.js
@@ -1,6 +1,7 @@
 "use strict";
 
 const assert = require("assert");
+const { Readable } = require("stream");
 
 class QueryResult {
   /**
@@ -96,6 +97,64 @@ class QueryResult {
     });
   }
 
+  /**
+   * Async iterator for consuming the result row-by-row (e.g. `for await (const row of result)`).
+   * Does not materialize the full result in memory.
+   * @returns {AsyncIterator<Object>}
+   */
+  [Symbol.asyncIterator]() {
+    const self = this;
+    return {
+      async next() {
+        self._checkClosed();
+        if (!self.hasNext()) {
+          return { done: true };
+        }
+        const value = await self.getNext();
+        if (value === null) {
+          return { done: true };
+        }
+        return { value, done: false };
+      },
+    };
+  }
+
+  /**
+   * Return a Node.js Readable stream (object mode) that yields one row per chunk.
+   * Useful for piping or integrating with stream consumers. Does not require native changes.
+   * @returns {stream.Readable} Readable stream of row objects.
+   */
+  toStream() {
+    const self = this;
+    return new Readable({
+      objectMode: true,
+      read() {
+        if (self._isClosed) {
+          return this.push(null);
+        }
+        if (!self.hasNext()) {
+          return this.push(null);
+        }
+        self.getNext()
+          .then((row) => {
+            if (row !== null && row !== undefined) {
+              this.push(row);
+            }
+            if (!self.hasNext()) {
+              this.push(null);
+            }
+          })
+          .catch((err) => {
+            this.destroy(err);
+          });
+      },
+    });
+  }
+
   /**
    * Get all rows of the query result.
    * @returns {Promise<Array<Object>>} a promise that resolves to all rows of the query result. The promise is rejected if there is an error.
@@ -229,13 +288,16 @@ class QueryResult {
   }
 
   /**
-   * Internal function to check if the query result is closed.
-   * @throws {Error} if the query result is closed.
+   * Internal function to check if the query result or its connection is closed.
+   * @throws {Error} if the query result is closed or the connection is closed.
   */
  _checkClosed() {
    if (this._isClosed) {
      throw new Error("Query result is closed.");
    }
+    if (this._connection._isClosed) {
+      throw new Error("Connection is closed.");
+    }
  }
}
diff --git a/tools/nodejs_api/test/test.js b/tools/nodejs_api/test/test.js
index 4efa4b7e6e..cbd152202d 100644
--- a/tools/nodejs_api/test/test.js
+++ b/tools/nodejs_api/test/test.js
@@ -18,4 +18,6 @@ describe("lbug", () => {
   importTest("Concurrent query execution", "./test_concurrency.js");
   importTest("Version", "./test_version.js");
   importTest("Synchronous API", "./test_sync_api.js");
+  importTest("registerStream / LOAD FROM stream", "./test_register_stream.js");
+  importTest("Resilience (close during/after use)", "./test_resilience.js");
 });
diff --git a/tools/nodejs_api/test/test_connection.js b/tools/nodejs_api/test/test_connection.js
index 34345ad26a..e09bf2c1c5 100644
--- a/tools/nodejs_api/test/test_connection.js
+++ b/tools/nodejs_api/test/test_connection.js
@@ -122,6 +122,50 @@ describe("Execute", function () {
   });
 });
 
+describe("ping", function () {
+  it("should resolve to true when connection is alive", async function () {
+    const ok = await conn.ping();
+    assert.strictEqual(ok, true);
+  });
+});
+
+describe("transaction", function () {
+  it("should commit and return fn result on success", async function () {
+    const result = await conn.transaction(async () => {
+      const q = await conn.query("RETURN 42 AS x");
+      const rows = await q.getAll();
+      q.close();
+      return rows[0].x;
+    });
+    assert.equal(result, 42);
+  });
+
+  it("should rollback and rethrow on fn error", async function () {
+    const err = new Error("tx abort");
+    try {
+      await conn.transaction(async () => {
+        await conn.query("RETURN 1");
+        throw err;
+      });
+      assert.fail("transaction should have thrown");
+    } catch (e) {
+      assert.strictEqual(e, err);
+    }
+    const q = await conn.query("RETURN 1");
+    assert.isTrue(q.hasNext());
+    q.close();
+  });
+
+  it("should reject non-function", async function () {
+    try {
+      await conn.transaction("not a function");
+      assert.fail("transaction should have thrown");
+    } catch (e) {
+      assert.equal(e.message, "transaction() requires a function.");
+    }
+  });
+});
+
 describe("Query", function () {
   it("should run a valid query", async function () {
     const queryResult = await conn.query("MATCH (a:person) RETURN COUNT(*)");
@@ -214,6 +258,70 @@ describe("Timeout", function () {
   });
 });
 
+describe("Interrupt", function () {
+  it("should abort a long-running query when interrupt() is called", async function () {
+    if (process.platform === "win32") {
+      this.skip();
+    }
+    this.timeout(10000);
+    const newConn = new lbug.Connection(db);
+    await newConn.init();
+    const longQuery =
+      "UNWIND RANGE(1,100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);";
+    const queryPromise = newConn.query(longQuery);
+    setTimeout(() => newConn.interrupt(), 150);
+    try {
+      await queryPromise;
+      assert.fail("No error thrown when the query was interrupted.");
+    } catch (err) {
+      assert.equal(err.message, "Interrupted.");
+    }
+  });
+});
+
+describe("AbortSignal", function () {
+  it("should reject with AbortError when signal is already aborted before query starts", async function () {
+    const ac = new AbortController();
+    ac.abort();
+    try {
+      await conn.query("RETURN 1", { signal: ac.signal });
+      assert.fail("No error thrown when signal was already aborted.");
+    } catch (err) {
+      assert.equal(err.name, "AbortError");
+      assert.equal(err.message, "The operation was aborted.");
+    }
+  });
+
+  it("should reject with AbortError when signal is aborted during query", async function () {
+    const newConn = new lbug.Connection(db);
+    await newConn.init();
+    const ac = new AbortController();
+    const longQuery =
+      "UNWIND RANGE(1,100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);";
+    const queryPromise = newConn.query(longQuery, { signal: ac.signal });
+    setTimeout(() => ac.abort(), 150);
+    try {
+      await queryPromise;
+      assert.fail("No error thrown when signal was aborted during query.");
+    } catch (err) {
+      assert.equal(err.name, "AbortError");
+    }
+  });
+
+  it("should work with progressCallback in options object", async function () {
+    let progressCalled = false;
+    const result = await conn.query("RETURN 1", {
+      progressCallback: () => {
+        progressCalled = true;
+      },
+    });
+    assert.exists(result);
+    const rows = Array.isArray(result) ? result : [result];
+    assert.isAtLeast(rows.length, 1);
+    rows.forEach((r) => r.close());
+  });
+});
+
 describe("Close", function () {
   it("should close the connection", async function () {
     const newConn = new lbug.Connection(db);
diff --git a/tools/nodejs_api/test/test_database.js b/tools/nodejs_api/test/test_database.js
index 06ba4c24f4..57cb692c20 100644
--- a/tools/nodejs_api/test/test_database.js
+++ b/tools/nodejs_api/test/test_database.js
@@ -9,15 +9,17 @@ const spwan = require("child_process").spawn;
 const openDatabaseOnSubprocess = (dbPath) => {
   return new Promise((resolve, _) => {
     const node = process.argv[0];
+    // Use env vars so Windows paths with backslashes don't break the -e code string
+    const env = { ...process.env, LBUG_PATH: lbugPath, DB_PATH: dbPath };
     const code = `
     (async() => {
-      const lbug = require("${lbugPath}");
-      const db = new lbug.Database("${dbPath}", 1 << 28);
+      const lbug = require(process.env.LBUG_PATH);
+      const db = new lbug.Database(process.env.DB_PATH, 1 << 28);
       await db.init();
       console.log("Database initialized.");
     })();
     `;
-    const child = spwan(node, ["-e", code]);
+    const child = spwan(node, ["-e", code], { env });
     let stdout = "";
     let stderr = "";
     child.stdout.on("data", (data) => {
@@ -374,10 +376,6 @@ describe("Database constructor", function () {
 
 describe("Database close", function () {
   it("should allow initializing a new database after closing", async function () {
-    if (process.platform === "win32") {
-      this._runnable.title += " (skipped: not implemented on Windows)";
-      this.skip();
-    }
     const tmpDbPath = await new Promise((resolve, reject) => {
       tmp.dir({ unsafeCleanup: true }, (err, path, _) => {
         if (err) {
@@ -389,7 +387,6 @@
     const dbPath = path.join(tmpDbPath, "db.kz");
     const testDb = new lbug.Database(dbPath, 1 << 28 /* 256MB */);
     await testDb.init();
-    // FIXME: doesn't work properly on windows
    let subProcessResult = await openDatabaseOnSubprocess(dbPath);
    assert.notEqual(subProcessResult.code, 0);
    assert.include(
@@ -507,9 +504,17 @@
    assert.deepEqual(tuple, { "+(1,1)": 2 });
    testDb.closeSync();
    assert.isTrue(testDb._isClosed);
-    assert.throws(() => conn.querySync("RETURN 1+1"), Error, "Runtime exception: The current operation is not allowed because the parent database is closed.");
+    assert.throws(
+      () => conn.querySync("RETURN 1+1"),
+      Error,
+      /(Runtime exception:.*parent database is closed|Connection is closed\.)/
+    );
    conn.closeSync();
    assert.isTrue(conn._isClosed);
-    assert.throws(() => res.resetIterator(), Error, "Runtime exception: The current operation is not allowed because the parent database is closed.");
+    assert.throws(
+      () => res.resetIterator(),
+      Error,
+      /(Runtime exception:.*parent database is closed|Connection is closed\.)/
+    );
  });
});
diff --git a/tools/nodejs_api/test/test_query_result.js b/tools/nodejs_api/test/test_query_result.js
index 5f84b3f32d..ad306c141e 100644
--- a/tools/nodejs_api/test/test_query_result.js
+++ b/tools/nodejs_api/test/test_query_result.js
@@ -66,22 +66,30 @@ describe("Get next", function () {
     }
   });
 
-  it("should throw an error if there is no next tuple", async function () {
+  it("should return null when no more tuples", async function () {
     const queryResult = await conn.query(
       "MATCH (a:person) RETURN a.ID ORDER BY a.ID"
     );
     for (let i = 0; i < 8; ++i) {
       await queryResult.getNext();
     }
-    try {
-      await queryResult.getNext();
-      assert.fail("No error thrown when there is no next tuple");
-    } catch (err) {
-      assert.equal(
-        err.message,
-        "Runtime exception: No more tuples in QueryResult, Please check hasNext() before calling getNext()."
-      );
+    const exhausted = await queryResult.getNext();
+    assert.isNull(exhausted, "getNext() returns null when no more tuples");
+  });
+
+  it("getNext() returns null exactly when hasNext() is false", async function () {
+    const queryResult = await conn.query(
+      "MATCH (a:person) RETURN a.ID ORDER BY a.ID"
+    );
+    let count = 0;
+    while (queryResult.hasNext()) {
+      const row = await queryResult.getNext();
+      assert.isNotNull(row, "getNext() must return value when hasNext() is true");
+      count++;
     }
+    assert.equal(count, 8);
+    const afterExhausted = await queryResult.getNext();
+    assert.isNull(afterExhausted, "getNext() must return null when hasNext() was false");
   });
 });
 
@@ -211,6 +219,46 @@ describe("Get query summary", function () {
   });
 });
 
+describe("Async iterator (for await...of)", function () {
+  it("should iterate rows same as getNext", async function () {
+    const queryResult = await conn.query(
+      "MATCH (a:person) RETURN a.ID ORDER BY a.ID"
+    );
+    const ids = [];
+    for await (const row of queryResult) {
+      ids.push(row["a.ID"]);
+    }
+    assert.deepEqual(ids, PERSON_IDS);
+  });
+
+  it("should not materialize full result in memory", async function () {
+    const queryResult = await conn.query(
+      "MATCH (a:person) RETURN a.ID ORDER BY a.ID"
+    );
+    let count = 0;
+    for await (const row of queryResult) {
+      count++;
+      assert.equal(row["a.ID"], PERSON_IDS[count - 1]);
+    }
+    assert.equal(count, PERSON_IDS.length);
+  });
+});
+
+describe("toStream", function () {
+  it("should yield rows as Readable stream", async function () {
+    const queryResult = await conn.query(
+      "MATCH (a:person) RETURN a.ID ORDER BY a.ID"
+    );
+    const stream = queryResult.toStream();
+    const rows = [];
+    for await (const row of stream) {
+      rows.push(row);
+    }
+    const ids = rows.map((r) => r["a.ID"]);
+    assert.deepEqual(ids, PERSON_IDS);
+  });
+});
+
 describe("Close", function () {
   it("should close the query result", async function () {
     const queryResult = await conn.query(
diff --git a/tools/nodejs_api/test/test_register_stream.js b/tools/nodejs_api/test/test_register_stream.js
new file mode 100644
index 0000000000..a2b3fc2eef
--- /dev/null
+++ b/tools/nodejs_api/test/test_register_stream.js
@@ -0,0 +1,81 @@
+const { assert } = require("chai");
+
+describe("registerStream / LOAD FROM stream", function () {
+  it("should LOAD FROM registered stream and return rows", async function () {
+    async function* rowSource() {
+      yield [1, "a"];
+      yield [2, "b"];
+      yield [3, "c"];
+    }
+    await conn.registerStream("mystream", rowSource(), {
+      columns: [
+        { name: "id", type: "INT64" },
+        { name: "label", type: "STRING" },
+      ],
+    });
+    try {
+      const result = await conn.query("LOAD FROM mystream RETURN *");
+      const rows = Array.isArray(result) ? result : [result];
+      assert.isAtLeast(rows.length, 1);
+      const r = rows[0];
+      assert.isTrue(r.hasNext());
+      const row1 = await r.getNext();
+      assert.exists(row1);
+      assert.equal(row1["id"], 1);
+      assert.equal(row1["label"], "a");
+      const row2 = await r.getNext();
+      assert.equal(row2["id"], 2);
+      assert.equal(row2["label"], "b");
+      const row3 = await r.getNext();
+      assert.equal(row3["id"], 3);
+      assert.equal(row3["label"], "c");
+      assert.isNull(await r.getNext());
+    } finally {
+      conn.unregisterStream("mystream");
+    }
+  });
+
+  it("should LOAD FROM stream with object rows (column order from schema)", async function () {
+    async function* objectRowSource() {
+      yield { id: 10, label: "x" };
+      yield { label: "y", id: 20 };
+    }
+    await conn.registerStream("objstream", objectRowSource(), {
+      columns: [
+        { name: "id", type: "INT64" },
+        { name: "label", type: "STRING" },
+      ],
+    });
+    try {
+      const result = await conn.query("LOAD FROM objstream RETURN *");
+      const r = Array.isArray(result) ? result[0] : result;
+      const row1 = await r.getNext();
+      assert.isNotNull(row1, "expected first row from stream");
+      assert.equal(row1["id"], 10);
+      assert.equal(row1["label"], "x");
+      const row2 = await r.getNext();
+      assert.isNotNull(row2, "expected second row from stream");
+      assert.equal(row2["id"], 20);
+      assert.equal(row2["label"], "y");
+      assert.isNull(await r.getNext());
+    } finally {
+      conn.unregisterStream("objstream");
+    }
+  });
+
+  it("should unregisterStream by name", async function () {
+    async function* empty() {
+      if (false) yield [];
+    }
+    await conn.registerStream("tmpstream", empty(), {
+      columns: [{ name: "x", type: "INT64" }],
+    });
+    conn.unregisterStream("tmpstream");
+    try {
+      await conn.query("LOAD FROM tmpstream RETURN *");
+      assert.fail("Expected error when loading from unregistered stream.");
+    } catch (e) {
+      assert.include(e.message, "not in scope");
+    }
+  });
+});
diff --git a/tools/nodejs_api/test/test_resilience.js b/tools/nodejs_api/test/test_resilience.js
new file mode 100644
index 0000000000..252396d4fd
--- /dev/null
+++ b/tools/nodejs_api/test/test_resilience.js
@@ -0,0 +1,188 @@
+"use strict";
+
+const { assert } = require("chai");
+const tmp = require("tmp");
+const path = require("path");
+
+/**
+ * Resilience tests: close connection/database during or after operations.
+ * Goal: no crashes (SIGSEGV, native abort); all failures must surface as JS errors.
+ */
+function withTempDb(fn) {
+  return async function () {
+    const tmpPath = await new Promise((resolve, reject) => {
+      tmp.dir({ unsafeCleanup: true }, (err, p, _) => {
+        if (err) return reject(err);
+        return resolve(p);
+      });
+    });
+    const dbPath = path.join(tmpPath, "db.kz");
+    const testDb = new lbug.Database(dbPath, 1 << 26 /* 64MB */);
+    await testDb.init();
+    const testConn = new lbug.Connection(testDb);
+    await testConn.init();
+    try {
+      await fn.call(this, testDb, testConn);
+    } finally {
+      // Close the connection before the database, mirroring normal shutdown order.
+      if (!testConn._isClosed) await testConn.close().catch(() => {});
+      if (!testDb._isClosed) await testDb.close().catch(() => {});
+    }
+  };
+}
+
+describe("Resilience (close during/after use)", function () {
+  this.timeout(15000);
+
+  it("query rejects when connection is closed while query is in flight", withTempDb(async (testDb, testConn) => {
+    const longQuery = "UNWIND range(1, 50000) AS x UNWIND range(1, 5000) AS y RETURN count(*)";
+    const queryPromise = testConn.query(longQuery);
+    await new Promise((r) => setTimeout(r, 120));
+    testConn.closeSync();
+    const timeoutMs = 5000;
+    const timeoutPromise = new Promise((_, reject) => {
+      setTimeout(() => reject(new Error(`Expected query to reject within ${timeoutMs}ms when connection was closed (timed out).`)), timeoutMs);
+    });
+    try {
+      await Promise.race([queryPromise, timeoutPromise]);
+      assert.fail("Expected query to reject when connection was closed during execution.");
+    } catch (err) {
+      if ((err.message || "").includes("timed out")) throw err;
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      const ok = msg.includes("closed") || msg.includes("not allowed") || msg.includes("runtime");
+      assert.isTrue(ok, `Expected error about closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("query rejects when database is closed while query is in flight", withTempDb(async (testDb, testConn) => {
+    const longQuery = "UNWIND range(1, 50000) AS x UNWIND range(1, 5000) AS y RETURN count(*)";
+    const queryPromise = testConn.query(longQuery);
+    await new Promise((r) => setTimeout(r, 120));
+    testDb.closeSync();
+    const timeoutMs = 5000;
+    const timeoutPromise = new Promise((_, reject) => {
+      setTimeout(() => reject(new Error(`Expected query to reject within ${timeoutMs}ms when database was closed (timed out).`)), timeoutMs);
+    });
+    try {
+      await Promise.race([queryPromise, timeoutPromise]);
+      assert.fail("Expected query to reject when database was closed during execution.");
+    } catch (err) {
+      if ((err.message || "").includes("timed out")) throw err;
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      const ok = msg.includes("closed") || msg.includes("not allowed") || msg.includes("runtime");
+      assert.isTrue(ok, `Expected error about closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("getNext() after connection closed throws and does not crash", withTempDb(async (testDb, testConn) => {
+    const res = await testConn.query("RETURN 1 AS x");
+    const row = await res.getNext();
+    assert.equal(row.x, 1);
+    testConn.closeSync();
+    try {
+      await res.getNext();
+      assert.fail("Expected getNext() to throw after connection closed.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("hasNext() after connection closed throws and does not crash", withTempDb(async (testDb, testConn) => {
+    const res = await testConn.query("RETURN 1 AS x");
+    assert.isTrue(res.hasNext());
+    testConn.closeSync();
+    try {
+      res.hasNext();
+      assert.fail("Expected hasNext() to throw after connection closed.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("getNext() after database closed throws and does not crash", withTempDb(async (testDb, testConn) => {
+    const res = await testConn.query("RETURN 1 AS x");
+    await res.getNext();
+    testDb.closeSync();
+    try {
+      await res.getNext();
+      assert.fail("Expected getNext() to throw after database closed.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("hasNext() after database closed throws and does not crash", withTempDb(async (testDb, testConn) => {
+    const res = await testConn.query("RETURN 1 AS x");
+    testDb.closeSync();
+    try {
+      res.hasNext();
+      assert.fail("Expected hasNext() to throw after database closed.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("registerStream then close connection then query throws before running", withTempDb(async (testDb, testConn) => {
+    async function* gen() {
+      yield [1];
+    }
+    await testConn.registerStream("s", gen(), { columns: [{ name: "x", type: "INT64" }] });
+    testConn.closeSync();
+    try {
+      await testConn.query("LOAD FROM s RETURN *");
+      assert.fail("Expected query to throw when connection is already closed.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      assert.include((err.message || "").toLowerCase(), "closed");
+    }
+  }));
+
+  it("close connection while iterating result: second getNext throws", withTempDb(async (testDb, testConn) => {
+    const res = await testConn.query("UNWIND [1,2,3] AS x RETURN x");
+    const a = await res.getNext();
+    assert.equal(a.x, 1);
+    testConn.closeSync();
+    try {
+      await res.getNext();
+      assert.fail("Expected getNext() to throw after connection closed mid-iteration.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`);
+    }
+  }));
+
+  it("query after connection closed throws immediately (no native call)", async function () {
+    const testConn = new lbug.Connection(db);
+    await testConn.init();
+    await testConn.close();
+    try {
+      await testConn.query("RETURN 1");
+      assert.fail("Expected query to throw when connection is closed.");
+    } catch (err) {
+      assert.equal(err.message, "Connection is closed.");
+    }
+  });
+
+  it("getNextSync after database closed throws", withTempDb(async (testDb, testConn) => {
+    const res = await testConn.query("RETURN 1 AS x");
+    testDb.closeSync();
+    try {
+      res.getNextSync();
+      assert.fail("Expected getNextSync() to throw after database closed.");
+    } catch (err) {
+      assert.instanceOf(err, Error);
+      const msg = (err.message || "").toLowerCase();
+      assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`);
+    }
+  }));
+});
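
Taken together, the changes above add cancellation (`AbortSignal`), a `transaction()` wrapper, `ping()`, async iteration, `toStream()`, and stream ingestion via `registerStream()` / `LOAD FROM`. A minimal end-to-end sketch of the new surface, assuming the package is importable as `lbug`; the database path, table name, and stream name below are illustrative only:

```js
const lbug = require("lbug");

const main = async () => {
  const db = new lbug.Database("./demo-db"); // illustrative path
  const conn = new lbug.Connection(db);
  await conn.init();

  // Cancel a long-running query with an AbortSignal; the promise rejects with AbortError.
  const ac = new AbortController();
  setTimeout(() => ac.abort(), 1000);
  try {
    await conn.query(
      "UNWIND RANGE(1, 100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);",
      { signal: ac.signal }
    );
  } catch (err) {
    if (err.name !== "AbortError") throw err;
  }

  // One write transaction: commits on success, rolls back on throw.
  await conn.transaction(async () => {
    await conn.query("CREATE NODE TABLE Item(id INT64, PRIMARY KEY(id))");
  });

  // Feed rows from an async generator into Cypher via LOAD FROM.
  async function* rows() {
    yield [1, "a"];
    yield [2, "b"];
  }
  await conn.registerStream("items", rows(), {
    columns: [
      { name: "id", type: "INT64" },
      { name: "label", type: "STRING" },
    ],
  });
  const result = await conn.query("LOAD FROM items RETURN *");
  for await (const row of result) {
    console.log(row); // streamed row-by-row, no full materialization
  }
  conn.unregisterStream("items");

  await conn.close();
  await db.close();
};

main().catch(console.error);
```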