diff --git a/.github/workflows/nodejs-workflow.yml b/.github/workflows/nodejs-workflow.yml index 5ab932863f..bab2286f35 100644 --- a/.github/workflows/nodejs-workflow.yml +++ b/.github/workflows/nodejs-workflow.yml @@ -144,3 +144,40 @@ jobs: if: ${{ matrix.platform == 'darwin' }} working-directory: tools/nodejs_api/ run: rm -rf package + + # Push prebuilt/*.node to repo so pnpm add github:user/ladybug#path:tools/nodejs_api uses them without building. + # Runs only on manual workflow_dispatch to avoid push loops. + update-prebuilt: + if: github.event_name == 'workflow_dispatch' + needs: build-nodejs + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - uses: actions/checkout@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Download Node.js artifacts + uses: actions/download-artifact@v4 + with: + path: artifacts + pattern: "*nodejs*" + merge-multiple: false + + - name: Copy artifacts to prebuilt + run: | + mkdir -p tools/nodejs_api/prebuilt + for d in artifacts/*/; do + find "$d" -name "lbugjs-*.node" -exec cp {} tools/nodejs_api/prebuilt/ \; + done + ls -la tools/nodejs_api/prebuilt/ + + - name: Commit and push prebuilt + run: | + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + git add tools/nodejs_api/prebuilt/ + git diff --staged --quiet && echo "No prebuilt changes" && exit 0 + git commit -m "chore(nodejs): update prebuilt addons from CI [skip ci]" + git push diff --git a/.gitignore b/.gitignore index 5a84aa4078..fa1b3163c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ +tree-sitter/ +tree-sitter-cypher/ + .idea/ .vscode +.cursor .vs bazel-* .clwb diff --git a/Makefile b/Makefile index 22391806e4..03bdb5ed71 100644 --- a/Makefile +++ b/Makefile @@ -187,7 +187,7 @@ nodejs-deps: cd tools/nodejs_api && npm install --include=dev nodejstest: nodejs - cd tools/nodejs_api && npm test + cd tools/nodejs_api && node copy_src_to_build.js && npm test nodejstest-deps: nodejs-deps nodejstest diff --git a/tools/nodejs_api/README.md b/tools/nodejs_api/README.md index 086c63eb2f..d2ed12c77a 100644 --- a/tools/nodejs_api/README.md +++ b/tools/nodejs_api/README.md @@ -7,10 +7,24 @@ A high-performance graph database for knowledge-intensive applications. This Nod ## 📦 Installation +**From npm (if published):** + ```bash npm install lbug ``` +**From GitHub** (monorepo; the Node package lives in `tools/nodejs_api`): + +- **pnpm** (v9+), subdirectory is supported: + + ```bash + pnpm add lbug@github:LadybugDB/ladybug#path:tools/nodejs_api + ``` + + On install, the package will build the native addon from source (needs CMake and a C++20 compiler). + +- **npm**: no built-in subdirectory install. Either use a **local path** after cloning and building (see [Build and use in other projects](#-build-and-use-in-other-projects-local)), or a tarball from [GitPkg](https://gitpkg.vercel.app/) (e.g. `https://gitpkg.vercel.app/LadybugDB/ladybug/tools/nodejs_api?main`). + --- ## 🚀 Quick Start @@ -49,10 +63,8 @@ const main = async () => { // Run a query const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); - // Fetch all results + // Consume results (choose one style) const rows = await result.getAll(); - - // Output results for (const row of rows) { console.log(row); } @@ -68,12 +80,101 @@ main().catch(console.error); The `lbug` package exposes the following primary classes: -* `Database` – Initializes a database from a file path. -* `Connection` – Executes queries on a connected database. 
-* `QueryResult` – Provides methods like `getAll()` to retrieve results. +* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). Close with `close()`. +* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. Use `registerStream(name, source, { columns })` to load data from an AsyncIterable via `LOAD FROM name`; `unregisterStream(name)` when done. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`. +* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed). +* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries. Both CommonJS (`require`) and ES Modules (`import`) are fully supported. +### Consuming query results + +```js +const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000"); + +// Option 1: get all rows (loads into memory) +const rows = await result.getAll(); + +// Option 2: row by row (async) +while (result.hasNext()) { + const row = await result.getNext(); + console.log(row); +} + +// Option 3: async iterator (streaming, no full materialization) +for await (const row of result) { + console.log(row); +} + +// Option 4: Node.js Readable stream (e.g. for .pipe()) +const stream = result.toStream(); +stream.on("data", (row) => console.log(row)); +``` + +### Transactions + +**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing. + +```js +await conn.query("BEGIN TRANSACTION"); +await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))"); +await conn.query('COPY Nodes FROM "data.csv"'); +await conn.query("COMMIT"); +// or on error: await conn.query("ROLLBACK"); +``` + +**Read-only transaction:** `BEGIN TRANSACTION READ ONLY` then queries, then `COMMIT` / `ROLLBACK`. + +**Wrapper:** One write transaction with automatic commit on success and rollback on throw: + +```js +await conn.transaction(async () => { + await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))"); + await conn.query('COPY Nodes FROM "data.csv"'); + // commit happens automatically; on throw, rollback then rethrow +}); +``` + +### Loading data from a Node.js stream + +You can feed data from an **AsyncIterable** (generator, async generator, or any `Symbol.asyncIterator`) into Cypher using **scan replacement**: register a stream by name, then use `LOAD FROM name` in your query. Rows are pulled from JavaScript on demand during execution. + +**API:** + +* **`conn.registerStream(name, source, options)`** (async) + * `name` – string used in Cypher: `LOAD FROM name RETURN ...` + * `source` – AsyncIterable of rows. Each row is an **array** of column values (same order as `options.columns`) or an **object** keyed by column name. + * `options.columns` – **required**. Schema: array of `{ name: string, type: string }`. Supported types: `INT64`, `INT32`, `INT16`, `INT8`, `UINT64`, `UINT32`, `DOUBLE`, `FLOAT`, `STRING`, `BOOL`, `DATE`, `TIMESTAMP`. 
+
+* **`conn.unregisterStream(name)`**
+  Unregisters the source so the name can be reused and no stale entry is left behind. Call it after the query completes (or when you are done with the stream).
+
+**Example:**
+
+```js
+async function* generateRows() {
+  yield [1, "Alice"];
+  yield [2, "Bob"];
+  yield [3, "Carol"];
+}
+
+await conn.registerStream("users", generateRows(), {
+  columns: [
+    { name: "id", type: "INT64" },
+    { name: "name", type: "STRING" },
+  ],
+});
+
+const result = await conn.query("LOAD FROM users RETURN *");
+for await (const row of result) {
+  console.log(row); // { id: 1, name: "Alice" }, ...
+}
+
+conn.unregisterStream("users");
+```
+
+You can combine the stream with other Cypher: e.g. `LOAD FROM stream WHERE col > 0 RETURN *`, or `COPY MyTable FROM (LOAD FROM stream RETURN *)`.
+
 ---
 
 ## 🛠️ Local Development (for Contributors)
@@ -96,6 +197,94 @@
 npm run build
 npm test
 ```
 
+When developing from the **monorepo root**, build the native addon first so tests see the latest C++ code:
+
+```bash
+# From the repo root of your clone
+make nodejs
+# Or: cmake --build build/release --target lbugjs
+# Then from tools/nodejs_api:
+cd tools/nodejs_api && npm test
+```
+
+---
+
+## 🔧 Build and use in other projects (local)
+
+To use the Node.js API from the Ladybug repo in another project without publishing to npm:
+
+1. **Build the addon** (from the Ladybug repo root):
+
+   ```bash
+   make nodejs
+   ```
+
+   Or from this directory:
+
+   ```bash
+   npm run build
+   ```
+
+   This compiles the native addon into `build/lbugjs.node` and copies the JS sources and type declarations.
+
+2. **In your other project**, add a file dependency in `package.json`:
+
+   ```json
+   "dependencies": {
+     "lbug": "file:../path/to/ladybug/tools/nodejs_api"
+   }
+   ```
+
+   Then run `npm install`. After that, `require("lbug")` or `import ... from "lbug"` will use your local build.
+
+3. **Optional:** to pack and install a tarball instead:
+
+   ```bash
+   cd /path/to/ladybug/tools/nodejs_api
+   npm run build
+   npm pack
+   ```
+
+   In the other project: `npm install /path/to/ladybug/tools/nodejs_api/lbug-0.0.1.tgz`.
+
+### Prebuilt in your fork (install from GitHub without building)
+
+If you install from GitHub (e.g. `pnpm add lbug@github:user/ladybug#path:tools/nodejs_api`), the package runs `install.js`: if it finds a prebuilt binary, it uses it and does not build from source. To ship a prebuilt in your fork:
+
+1. **Build once** in your clone (from the repo root):
+
+   ```bash
+   make nodejs
+   ```
+
+2. **Create the prebuilt file** (name = `lbugjs-<platform>-<arch>.node`):
+
+   - Windows x64: copy `tools/nodejs_api/build/lbugjs.node` → `tools/nodejs_api/prebuilt/lbugjs-win32-x64.node`
+   - Linux x64: `lbugjs-linux-x64.node`
+   - macOS x64: `lbugjs-darwin-x64.node`, arm64: `lbugjs-darwin-arm64.node`
+
+   Example commands (from the repo root). **Windows (PowerShell):**
+
+   ```powershell
+   New-Item -ItemType Directory -Force -Path tools/nodejs_api/prebuilt
+   Copy-Item tools/nodejs_api/build/lbugjs.node tools/nodejs_api/prebuilt/lbugjs-win32-x64.node
+   ```
+
+   **Linux/macOS:**
+
+   ```bash
+   mkdir -p tools/nodejs_api/prebuilt
+   cp tools/nodejs_api/build/lbugjs.node tools/nodejs_api/prebuilt/lbugjs-$(node -p "process.platform")-$(node -p "process.arch").node
+   ```
+
+3. **Commit and push** the `prebuilt/` folder. Then anyone (or you in another project) can do:
+
+   ```bash
+   pnpm add lbug@github:YOUR_USERNAME/ladybug#path:tools/nodejs_api
+   ```
+
+   and the addon will be used from prebuilt without a local build. A quick way to verify this is shown below.
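+
+To confirm that an install actually picked up the prebuilt, a quick smoke test (the `VERSION` export is part of the package API; the one-liner itself is only an illustrative check):
+
+```bash
+node -e "const lbug = require('lbug'); console.log('loaded lbug', lbug.VERSION)"
+```
+
+If the prebuilt was used, this runs without CMake or a compiler ever being invoked.
+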
+ --- ## 📦 Packaging and Binary Distribution diff --git a/tools/nodejs_api/build.js b/tools/nodejs_api/build.js index d326300260..b1b93bae43 100644 --- a/tools/nodejs_api/build.js +++ b/tools/nodejs_api/build.js @@ -3,6 +3,9 @@ const path = require("path"); const { execSync } = require("child_process"); const SRC_PATH = path.resolve(__dirname, "../.."); +const NODEJS_API = path.resolve(__dirname, "."); +const BUILD_DIR = path.join(NODEJS_API, "build"); +const SRC_JS_DIR = path.join(NODEJS_API, "src_js"); const THREADS = require("os").cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); @@ -12,3 +15,19 @@ execSync(`make nodejs NUM_THREADS=${THREADS}`, { cwd: SRC_PATH, stdio: "inherit", }); + +// Ensure build/ has latest JS from src_js (CMake copies at configure time only) +if (fs.existsSync(SRC_JS_DIR) && fs.existsSync(BUILD_DIR)) { + const files = fs.readdirSync(SRC_JS_DIR); + for (const name of files) { + if (name.endsWith(".js") || name.endsWith(".mjs") || name.endsWith(".d.ts")) { + fs.copyFileSync(path.join(SRC_JS_DIR, name), path.join(BUILD_DIR, name)); + } + } + // So package root has types when used as file: dependency + const dts = path.join(BUILD_DIR, "lbug.d.ts"); + if (fs.existsSync(dts)) { + fs.copyFileSync(dts, path.join(NODEJS_API, "lbug.d.ts")); + } + console.log("Copied src_js to build."); +} diff --git a/tools/nodejs_api/copy_src_to_build.js b/tools/nodejs_api/copy_src_to_build.js new file mode 100644 index 0000000000..8ebe1581c4 --- /dev/null +++ b/tools/nodejs_api/copy_src_to_build.js @@ -0,0 +1,22 @@ +/** + * Copies src_js/*.js, *.mjs, *.d.ts into build/ so tests run with the latest JS + * after "make nodejs" (which only copies at cmake configure time). + * Run from tools/nodejs_api. + */ +const fs = require("fs"); +const path = require("path"); + +const srcDir = path.join(__dirname, "src_js"); +const buildDir = path.join(__dirname, "build"); + +if (!fs.existsSync(buildDir)) { + console.warn("copy_src_to_build: build/ missing, run make nodejs first."); + process.exit(0); +} + +const re = /\.(js|mjs|d\.ts)$/; +const files = fs.readdirSync(srcDir).filter((n) => re.test(n)); +for (const name of files) { + fs.copyFileSync(path.join(srcDir, name), path.join(buildDir, name)); +} +console.log("Copied", files.length, "files from src_js to build."); diff --git a/tools/nodejs_api/docs/execution_chain_analysis.md b/tools/nodejs_api/docs/execution_chain_analysis.md new file mode 100644 index 0000000000..8e633201bc --- /dev/null +++ b/tools/nodejs_api/docs/execution_chain_analysis.md @@ -0,0 +1,65 @@ +# LOAD FROM stream: Execution Chain (reference & recommendations) + +## Execution chain diagram + +```mermaid +%%{init: {'flowchart': {'defaultRenderer': 'elk', 'elk': {'direction': 'DOWN'}}}}%% +flowchart TB + subgraph JS["JS (main thread)"] + direction TB + A["query('LOAD FROM name RETURN *')"] + B["registerStream: getChunk(requestId) → pending.push; runConsumer via setImmediate"] + C["runConsumer: sort pending, for each id take it.next(), returnChunk(id, rows, done)"] + D["AsyncIterator: it.next() → yield rows"] + A --> B + B --> C + C --> D + end + + subgraph CppAddon["C++ addon (Node worker thread)"] + direction TB + E["tableFunc: mutex, nextRequestId(), setChunkRequest, BlockingCall(getChunk)"] + F["wait reqPtr->cv until filled"] + G["returnChunkFromJS: req->rows, filled=true, cv.notify_one"] + H["Copy rows to output.dataChunk, return cap"] + E --> F + G --> F + F --> H + end + + subgraph Engine["Engine (single task thread, canParallelFunc=false)"] + direction 
TB
      I["getNextTuple → getNextTuplesInternal"]
      J["tableFunc(input, output) → numTuplesScanned"]
      K["FactorizedTable accumulates chunks"]
      L["MaterializedQueryResult + FactorizedTableIterator"]
      I --> J
      J --> K
      K --> L
  end

  J --> E
  E --> B
  C --> G
  H --> J
  L --> M["JS hasNext / getNext"]
  M --> A
```

---

## Useful observations

- **Order**: With `canParallelFunc = false`, one engine thread calls `tableFunc` sequentially. Request IDs are assigned under mutex; JS `runConsumer` sorts `pending` and serves chunks by `requestId`, so iterator order is preserved.
- **End of stream**: The engine calls `tableFunc` until it returns 0. JS sends `returnChunk(id, [], true)` when the iterator is done; C++ returns 0 and the engine stops. No extra call is made after 0.
- **getNext contract**: Core `getNext()` throws if `!hasNext()`. The addon always checks `hasNext()` before `getNext()` and returns `null` when exhausted, so the JS API's `getNext()` resolves to `null` instead of throwing at the end of iteration.

---

## Recommendations for the future

1. **Keep `canParallelFunc = false`** for the node stream table function. Enabling parallelism would require a deterministic merge of chunks by requestId on the engine side; until then, single-thread execution keeps order and avoids subtle bugs.
2. **Any new code path that reads rows** (e.g. another language binding or helper) must guard with `hasNext()` before `getNext()`; core will throw otherwise.
3. **Mutex in `tableFunc`**: Currently redundant with single-thread execution, but harmless. If parallelism is ever introduced, either remove the mutex and solve ordering in the engine, or keep it and document that the stream source is intentionally serialized.
4. **Tests**: Prefer iterating with `hasNext()` + `getNext()` and asserting that `getNext()` returns `null` exactly when `hasNext()` becomes false, to lock the contract (see `test_query_result.js`).
5. **Rebuild and run the full test suite** (e.g. `register_stream` + `query_result`) after any change in the addon or engine table function path.
diff --git a/tools/nodejs_api/examples/README.md b/tools/nodejs_api/examples/README.md
new file mode 100644
index 0000000000..63ace4baf5
--- /dev/null
+++ b/tools/nodejs_api/examples/README.md
@@ -0,0 +1,11 @@
+# Examples
+
+Run from `tools/nodejs_api` (after `make nodejs` or `npm run build`):
+
+```bash
+node examples/quickstart.mjs
+node examples/stream-load.mjs
+```
+
+- **quickstart.mjs** — In-memory DB, create table, load data from a stream via `COPY FROM (LOAD FROM ...)`, then query.
+- **stream-load.mjs** — Register an async iterable and consume it with `LOAD FROM name RETURN *`.
diff --git a/tools/nodejs_api/examples/quickstart.mjs b/tools/nodejs_api/examples/quickstart.mjs
new file mode 100644
index 0000000000..d78ec07f9a
--- /dev/null
+++ b/tools/nodejs_api/examples/quickstart.mjs
@@ -0,0 +1,32 @@
+/**
+ * Quickstart: in-memory database, create schema, load from stream, query.
+ * Run from tools/nodejs_api: node examples/quickstart.mjs + */ +import { Database, Connection } from "lbug"; + +async function* userRows() { + yield ["Alice", 30]; + yield ["Bob", 25]; +} + +const db = new Database(":memory:"); +const conn = new Connection(db); + +await conn.query(` + CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name)); +`); + +await conn.registerStream("users", userRows(), { + columns: [ + { name: "name", type: "STRING" }, + { name: "age", type: "INT64" }, + ], +}); +await conn.query("COPY User FROM (LOAD FROM users RETURN *)"); +conn.unregisterStream("users"); + +const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); +const rows = await result.getAll(); +console.log(rows); + +await db.close(); diff --git a/tools/nodejs_api/examples/stream-load.mjs b/tools/nodejs_api/examples/stream-load.mjs new file mode 100644 index 0000000000..9adc84f8a9 --- /dev/null +++ b/tools/nodejs_api/examples/stream-load.mjs @@ -0,0 +1,29 @@ +/** + * Load data from a JavaScript async iterable via LOAD FROM. + * Run from tools/nodejs_api: node examples/stream-load.mjs + */ +import { Database, Connection } from "lbug"; + +async function* generateRows() { + yield [1, "Alice"]; + yield [2, "Bob"]; + yield [3, "Carol"]; +} + +const db = new Database(":memory:"); +const conn = new Connection(db); + +await conn.registerStream("users", generateRows(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "name", type: "STRING" }, + ], +}); + +const result = await conn.query("LOAD FROM users RETURN *"); +for await (const row of result) { + console.log(row); +} + +conn.unregisterStream("users"); +await db.close(); diff --git a/tools/nodejs_api/index.js b/tools/nodejs_api/index.js new file mode 100644 index 0000000000..04ea8a1618 --- /dev/null +++ b/tools/nodejs_api/index.js @@ -0,0 +1,4 @@ +"use strict"; + +// After `make nodejs` or `npm run build`, entry point is build/ +module.exports = require("./build"); diff --git a/tools/nodejs_api/index.mjs b/tools/nodejs_api/index.mjs new file mode 100644 index 0000000000..b638316b29 --- /dev/null +++ b/tools/nodejs_api/index.mjs @@ -0,0 +1,9 @@ +export { + default, + Database, + Connection, + PreparedStatement, + QueryResult, + VERSION, + STORAGE_VERSION, +} from "./build/index.mjs"; diff --git a/tools/nodejs_api/install.js b/tools/nodejs_api/install.js index 6a010b335a..7df6e140fc 100644 --- a/tools/nodejs_api/install.js +++ b/tools/nodejs_api/install.js @@ -8,51 +8,72 @@ const process = require("process"); const isNpmBuildFromSourceSet = process.env.npm_config_build_from_source; const platform = process.platform; const arch = process.arch; + +// Skip when already built (e.g. local dev after make nodejs) +if (fsCallback.existsSync(path.join(__dirname, "build", "lbugjs.node"))) { + process.exit(0); +} + const prebuiltPath = path.join( __dirname, "prebuilt", `lbugjs-${platform}-${arch}.node` ); +const buildDir = path.join(__dirname, "build"); +const srcJsDir = path.join(__dirname, "src_js"); +const lbugSourceDir = path.join(__dirname, "lbug-source"); + // Check if building from source is forced if (isNpmBuildFromSourceSet) { console.log( "The NPM_CONFIG_BUILD_FROM_SOURCE environment variable is set. Building from source." 
); } -// Check if prebuilt binaries are available +// Prebuilt available + git-clone layout (src_js present, no lbug-source): use prebuilt and copy src_js → build/ +else if (fsCallback.existsSync(prebuiltPath) && fsCallback.existsSync(srcJsDir)) { + console.log("Prebuilt binary is available (git clone layout)."); + if (!fsCallback.existsSync(buildDir)) { + fsCallback.mkdirSync(buildDir, { recursive: true }); + } + fs.copyFileSync(prebuiltPath, path.join(buildDir, "lbugjs.node")); + const jsFiles = fs.readdirSync(srcJsDir).filter((file) => { + return file.endsWith(".js") || file.endsWith(".mjs") || file.endsWith(".d.ts"); + }); + for (const file of jsFiles) { + fs.copyFileSync(path.join(srcJsDir, file), path.join(buildDir, file)); + } + console.log("Done! Prebuilt + JS copied to build/."); + process.exit(0); +} +// Prebuilt available + tarball layout (lbug-source present): copy to root (legacy publish flow) else if (fsCallback.existsSync(prebuiltPath)) { console.log("Prebuilt binary is available."); - console.log("Copying prebuilt binary to package directory..."); fs.copyFileSync(prebuiltPath, path.join(__dirname, "lbugjs.node")); - console.log( - `Copied ${prebuiltPath} -> ${path.join(__dirname, "lbugjs.node")}.` - ); - console.log("Copying JS files to package directory..."); - const jsSourceDir = path.join( - __dirname, - "lbug-source", - "tools", - "nodejs_api", - "src_js" - ); + const jsSourceDir = path.join(lbugSourceDir, "tools", "nodejs_api", "src_js"); const jsFiles = fs.readdirSync(jsSourceDir).filter((file) => { return file.endsWith(".js") || file.endsWith(".mjs") || file.endsWith(".d.ts"); }); - console.log("Files to copy: "); - for (const file of jsFiles) { - console.log(" " + file); - } for (const file of jsFiles) { fs.copyFileSync(path.join(jsSourceDir, file), path.join(__dirname, file)); } - console.log("Copied JS files to package directory."); console.log("Done!"); process.exit(0); } else { console.log("Prebuilt binary is not available, building from source..."); } +if (!fsCallback.existsSync(lbugSourceDir)) { + console.error( + "lbug-source/ not found (install from git clone). Add prebuilt binary to prebuilt/lbugjs-" + + platform + + "-" + + arch + + ".node and commit, or install from a full clone and build there." 
+ ); + process.exit(1); +} + // Get number of threads const THREADS = os.cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); diff --git a/tools/nodejs_api/package.json b/tools/nodejs_api/package.json index f6ff61a8b3..f574bf8036 100644 --- a/tools/nodejs_api/package.json +++ b/tools/nodejs_api/package.json @@ -5,14 +5,19 @@ "main": "index.js", "module": "./index.mjs", "types": "./lbug.d.ts", - "exports":{ - ".":{ + "exports": { + ".": { "require": "./index.js", "import": "./index.mjs", "types": "./lbug.d.ts" } }, - "files": ["index.js", "index.mjs", "lbug.d.ts", "lbugjs.node"], + "files": [ + "index.js", + "index.mjs", + "lbug.d.ts", + "lbugjs.node" + ], "type": "commonjs", "homepage": "https://ladybugdb.com/", "repository": { @@ -20,6 +25,7 @@ "url": "git+https://github.com/LadybugDB/ladybug.git" }, "scripts": { + "install": "node install.js", "test": "mocha test --timeout 20000", "clean": "node clean.js", "clean-all": "node clean.js all", @@ -37,4 +43,4 @@ "cmake-js": "^7.3.0", "node-addon-api": "^6.0.0" } -} +} \ No newline at end of file diff --git a/tools/nodejs_api/src_cpp/include/node_connection.h b/tools/nodejs_api/src_cpp/include/node_connection.h index caacef92c3..907c478bea 100644 --- a/tools/nodejs_api/src_cpp/include/node_connection.h +++ b/tools/nodejs_api/src_cpp/include/node_connection.h @@ -8,6 +8,7 @@ #include "node_prepared_statement.h" #include "node_progress_bar_display.h" #include "node_query_result.h" +#include "node_scan_replacement.h" #include using namespace lbug::main; @@ -30,15 +31,20 @@ class NodeConnection : public Napi::ObjectWrap { void InitCppConnection(); void SetMaxNumThreadForExec(const Napi::CallbackInfo& info); void SetQueryTimeout(const Napi::CallbackInfo& info); + void Interrupt(const Napi::CallbackInfo& info); Napi::Value ExecuteAsync(const Napi::CallbackInfo& info); Napi::Value QueryAsync(const Napi::CallbackInfo& info); Napi::Value ExecuteSync(const Napi::CallbackInfo& info); Napi::Value QuerySync(const Napi::CallbackInfo& info); void Close(const Napi::CallbackInfo& info); + Napi::Value RegisterStream(const Napi::CallbackInfo& info); + void UnregisterStream(const Napi::CallbackInfo& info); + void ReturnChunk(const Napi::CallbackInfo& info); private: std::shared_ptr database; std::shared_ptr connection; + std::unique_ptr streamRegistry; }; class ConnectionInitAsyncWorker : public Napi::AsyncWorker { diff --git a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h index 7813820cb0..e777c94af5 100644 --- a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h +++ b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h @@ -13,6 +13,8 @@ using namespace common; */ class NodeProgressBarDisplay : public ProgressBarDisplay { public: + ~NodeProgressBarDisplay() override; + void updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) override; diff --git a/tools/nodejs_api/src_cpp/include/node_query_result.h b/tools/nodejs_api/src_cpp/include/node_query_result.h index b9ee4db979..77129e8b55 100644 --- a/tools/nodejs_api/src_cpp/include/node_query_result.h +++ b/tools/nodejs_api/src_cpp/include/node_query_result.h @@ -102,6 +102,7 @@ class NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker { try { if (!nodeQueryResult->queryResult->hasNext()) { cppTuple.reset(); + return; } cppTuple = nodeQueryResult->queryResult->getNext(); } catch (const std::exception& exc) { @@ -112,7 +113,7 @@ class 
NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker { inline void OnOK() override { auto env = Env(); if (cppTuple == nullptr) { - Callback().Call({env.Null(), env.Undefined()}); + Callback().Call({env.Null(), env.Null()}); return; } Napi::Object nodeTuple = Napi::Object::New(env); diff --git a/tools/nodejs_api/src_cpp/include/node_scan_replacement.h b/tools/nodejs_api/src_cpp/include/node_scan_replacement.h new file mode 100644 index 0000000000..bd0109005a --- /dev/null +++ b/tools/nodejs_api/src_cpp/include/node_scan_replacement.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "function/table/scan_replacement.h" +#include "function/table/table_function.h" +#include + +namespace lbug { +namespace main { +class Connection; +} +namespace common { +class LogicalType; +} +} + +struct NodeStreamChunkRequest { + std::mutex mtx; + std::condition_variable cv; + std::vector> rows; + std::vector columnNames; // schema order for object rows + bool done = false; + bool filled = false; +}; + +struct NodeStreamSourceState { + Napi::ThreadSafeFunction getChunkTsf; + std::vector columnNames; + std::vector columnTypes; +}; + +class NodeStreamRegistry { +public: + void registerSource(const std::string& name, Napi::ThreadSafeFunction tsf, + std::vector columnNames, + std::vector columnTypes); + void unregisterSource(const std::string& name); + std::vector lookup(const std::string& name) const; + std::unique_ptr replace( + std::span handles) const; + + static NodeStreamChunkRequest* getChunkRequest(uint64_t requestId); + static void setChunkRequest(uint64_t requestId, std::unique_ptr req); + static uint64_t nextRequestId(); + +private: + mutable std::mutex mtx_; + std::unordered_map> sources_; +}; + +void addNodeScanReplacement(lbug::main::Connection* connection, NodeStreamRegistry* registry); + +void returnChunkFromJS(uint64_t requestId, Napi::Array rowsNapi, bool done); diff --git a/tools/nodejs_api/src_cpp/include/node_stream_scan.h b/tools/nodejs_api/src_cpp/include/node_stream_scan.h new file mode 100644 index 0000000000..5c7328b6be --- /dev/null +++ b/tools/nodejs_api/src_cpp/include/node_stream_scan.h @@ -0,0 +1,29 @@ +#pragma once + +#include "function/table/bind_data.h" +#include "function/table/table_function.h" + +struct NodeStreamSourceState; // defined in node_scan_replacement.h + +struct NodeStreamScanFunctionData : lbug::function::TableFuncBindData { + std::shared_ptr state; + + NodeStreamScanFunctionData(lbug::binder::expression_vector columns, + std::shared_ptr state) + : TableFuncBindData(std::move(columns), 0), state(std::move(state)) {} + + std::unique_ptr copy() const override { + return std::make_unique(columns, state); + } +}; + +namespace lbug { +namespace function { + +struct NodeStreamScanFunction { + static constexpr const char* name = "NODE_STREAM_SCAN"; + static TableFunction getFunction(); +}; + +} // namespace function +} // namespace lbug diff --git a/tools/nodejs_api/src_cpp/include/node_util.h b/tools/nodejs_api/src_cpp/include/node_util.h index 4ea7a50674..cd27b21371 100644 --- a/tools/nodejs_api/src_cpp/include/node_util.h +++ b/tools/nodejs_api/src_cpp/include/node_util.h @@ -10,10 +10,10 @@ class Util { static Napi::Value ConvertToNapiObject(const Value& value, Napi::Env env); static std::unordered_map> TransformParametersForExec( Napi::Array params); + static Value TransformNapiValue(Napi::Value napiValue); private: static Napi::Object ConvertNodeIdToNapiObject(const nodeID_t& nodeId, Napi::Env 
env); - static Value TransformNapiValue(Napi::Value napiValue); const static int64_t JS_MAX_SAFE_INTEGER = 9007199254740991; const static int64_t JS_MIN_SAFE_INTEGER = -9007199254740991; }; diff --git a/tools/nodejs_api/src_cpp/node_connection.cpp b/tools/nodejs_api/src_cpp/node_connection.cpp index 404903904f..4427de0e18 100644 --- a/tools/nodejs_api/src_cpp/node_connection.cpp +++ b/tools/nodejs_api/src_cpp/node_connection.cpp @@ -1,9 +1,11 @@ #include "include/node_connection.h" +#include #include #include "include/node_database.h" #include "include/node_query_result.h" +#include "include/node_scan_replacement.h" #include "include/node_util.h" #include "main/lbug.h" @@ -19,7 +21,11 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("querySync", &NodeConnection::QuerySync), InstanceMethod("setMaxNumThreadForExec", &NodeConnection::SetMaxNumThreadForExec), InstanceMethod("setQueryTimeout", &NodeConnection::SetQueryTimeout), - InstanceMethod("close", &NodeConnection::Close)}); + InstanceMethod("interrupt", &NodeConnection::Interrupt), + InstanceMethod("close", &NodeConnection::Close), + InstanceMethod("registerStream", &NodeConnection::RegisterStream), + InstanceMethod("unregisterStream", &NodeConnection::UnregisterStream), + InstanceMethod("returnChunk", &NodeConnection::ReturnChunk)}); exports.Set("NodeConnection", t); return exports; @@ -57,6 +63,8 @@ void NodeConnection::InitCppConnection() { this->connection = std::make_shared(database.get()); ProgressBar::Get(*connection->getClientContext()) ->setDisplay(std::make_shared()); + streamRegistry = std::make_unique(); + addNodeScanReplacement(connection.get(), streamRegistry.get()); // After the connection is initialized, we do not need to hold a reference to the database. 
database.reset(); } @@ -83,9 +91,16 @@ void NodeConnection::SetQueryTimeout(const Napi::CallbackInfo& info) { } } +void NodeConnection::Interrupt(const Napi::CallbackInfo& /* info */) { + if (this->connection) { + this->connection->interrupt(); + } +} + void NodeConnection::Close(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); + streamRegistry.reset(); this->connection.reset(); } @@ -157,3 +172,93 @@ Napi::Value NodeConnection::QueryAsync(const Napi::CallbackInfo& info) { asyncWorker->Queue(); return info.Env().Undefined(); } + +static lbug::common::LogicalType parseColumnType(const std::string& typeStr) { + std::string upper = typeStr; + std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); + if (upper == "INT64") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT64); + if (upper == "INT32") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT32); + if (upper == "INT16") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT16); + if (upper == "INT8") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT8); + if (upper == "UINT64") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT64); + if (upper == "UINT32") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT32); + if (upper == "UINT16") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT16); + if (upper == "UINT8") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT8); + if (upper == "DOUBLE") return lbug::common::LogicalType(lbug::common::LogicalTypeID::DOUBLE); + if (upper == "FLOAT") return lbug::common::LogicalType(lbug::common::LogicalTypeID::FLOAT); + if (upper == "STRING") return lbug::common::LogicalType(lbug::common::LogicalTypeID::STRING); + if (upper == "BOOL" || upper == "BOOLEAN") + return lbug::common::LogicalType(lbug::common::LogicalTypeID::BOOL); + if (upper == "DATE") return lbug::common::LogicalType(lbug::common::LogicalTypeID::DATE); + if (upper == "TIMESTAMP") + return lbug::common::LogicalType(lbug::common::LogicalTypeID::TIMESTAMP); + throw std::runtime_error("Unsupported column type for registerStream: " + typeStr); +} + +Napi::Value NodeConnection::RegisterStream(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!connection || !streamRegistry) { + Napi::Error::New(env, "Connection not initialized.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + if (info.Length() < 3 || !info[0].IsString() || !info[1].IsFunction() || !info[2].IsArray()) { + Napi::Error::New(env, + "registerStream(name, getChunkCallback, columns): name string, getChunkCallback " + "function(requestId), columns array of { name, type }.") + .ThrowAsJavaScriptException(); + return env.Undefined(); + } + std::string name = info[0].As().Utf8Value(); + Napi::Function getChunkCallback = info[1].As(); + Napi::Array columnsArr = info[2].As(); + std::vector columnNames; + std::vector columnTypes; + for (uint32_t i = 0; i < columnsArr.Length(); i++) { + Napi::Value col = columnsArr.Get(i); + if (!col.IsObject()) continue; + Napi::Object obj = col.As(); + if (!obj.Get("name").IsString() || !obj.Get("type").IsString()) continue; + columnNames.push_back(obj.Get("name").As().Utf8Value()); + columnTypes.push_back( + parseColumnType(obj.Get("type").As().Utf8Value())); + } + if (columnNames.empty()) { + Napi::Error::New(env, "registerStream: at least one column required.").ThrowAsJavaScriptException(); + return env.Undefined(); 
+ } + try { + auto tsf = Napi::ThreadSafeFunction::New( + env, getChunkCallback, "NodeStreamGetChunk", 0, 1); + streamRegistry->registerSource(name, std::move(tsf), std::move(columnNames), + std::move(columnTypes)); + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + +void NodeConnection::UnregisterStream(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!streamRegistry) return; + if (info.Length() < 1 || !info[0].IsString()) { + Napi::Error::New(env, "unregisterStream(name): name string.").ThrowAsJavaScriptException(); + return; + } + streamRegistry->unregisterSource(info[0].As().Utf8Value()); +} + +void NodeConnection::ReturnChunk(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + if (info.Length() < 3 || !info[0].IsNumber() || !info[1].IsArray() || !info[2].IsBoolean()) { + Napi::Error::New(env, + "returnChunk(requestId, rows, done): requestId number, rows array of rows, done boolean.") + .ThrowAsJavaScriptException(); + return; + } + uint64_t requestId = static_cast(info[0].ToNumber().Int64Value()); + Napi::Array rows = info[1].As(); + bool done = info[2].ToBoolean().Value(); + returnChunkFromJS(requestId, rows, done); +} diff --git a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp index 352775624a..a76dbd953e 100644 --- a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp +++ b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp @@ -3,6 +3,14 @@ using namespace lbug; using namespace common; +NodeProgressBarDisplay::~NodeProgressBarDisplay() { + std::unique_lock lock(callbackMutex); + for (auto& kv : queryCallbacks) { + kv.second.Release(); + } + queryCallbacks.clear(); +} + void NodeProgressBarDisplay::updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) { if (numPipelines == 0) { diff --git a/tools/nodejs_api/src_cpp/node_scan_replacement.cpp b/tools/nodejs_api/src_cpp/node_scan_replacement.cpp new file mode 100644 index 0000000000..8fdb757f46 --- /dev/null +++ b/tools/nodejs_api/src_cpp/node_scan_replacement.cpp @@ -0,0 +1,134 @@ +#include "include/node_scan_replacement.h" +#include "include/node_stream_scan.h" +#include "include/node_util.h" + +#include "function/table/bind_input.h" +#include "main/client_context.h" +#include "main/connection.h" + +#include + +using namespace lbug::common; +using namespace lbug::function; +using namespace lbug::main; + +namespace { + +std::mutex g_requestMutex; +std::atomic g_nextRequestId{1}; +std::unordered_map> g_chunkRequests; + +} // namespace + +void NodeStreamRegistry::registerSource(const std::string& name, Napi::ThreadSafeFunction tsf, + std::vector columnNames, std::vector columnTypes) { + std::lock_guard lock(mtx_); + auto state = std::make_shared(); + state->getChunkTsf = std::move(tsf); + state->columnNames = std::move(columnNames); + state->columnTypes = std::move(columnTypes); + sources_[name] = std::move(state); +} + +void NodeStreamRegistry::unregisterSource(const std::string& name) { + std::lock_guard lock(mtx_); + sources_.erase(name); +} + +std::vector NodeStreamRegistry::lookup(const std::string& name) const { + std::lock_guard lock(mtx_); + auto it = sources_.find(name); + if (it == sources_.end()) { + return {}; + } + return {reinterpret_cast(it->second.get())}; +} + +std::unique_ptr NodeStreamRegistry::replace( + std::span handles) const { + if 
(handles.empty()) { + return nullptr; + } + auto* statePtr = reinterpret_cast(handles[0]); + auto state = std::shared_ptr(statePtr, [](NodeStreamSourceState*) {}); + auto data = std::make_unique(); + data->func = NodeStreamScanFunction::getFunction(); + data->bindInput.addLiteralParam(Value::createValue(reinterpret_cast(statePtr))); + return data; +} + +NodeStreamChunkRequest* NodeStreamRegistry::getChunkRequest(uint64_t requestId) { + std::lock_guard lock(g_requestMutex); + auto it = g_chunkRequests.find(requestId); + return it != g_chunkRequests.end() ? it->second.get() : nullptr; +} + +void NodeStreamRegistry::setChunkRequest(uint64_t requestId, + std::unique_ptr req) { + std::lock_guard lock(g_requestMutex); + if (req) { + g_chunkRequests[requestId] = std::move(req); + } else { + g_chunkRequests.erase(requestId); + } +} + +uint64_t NodeStreamRegistry::nextRequestId() { + return g_nextRequestId++; +} + +static std::vector lookupNodeStream(const std::string& objectName, + void* registryVoid) { + auto* registry = static_cast(registryVoid); + return registry->lookup(objectName); +} + +static std::unique_ptr replaceNodeStream( + std::span handles, void* registryVoid) { + auto* registry = static_cast(registryVoid); + return registry->replace(handles); +} + +void addNodeScanReplacement(Connection* connection, NodeStreamRegistry* registry) { + auto lookup = [registry](const std::string& name) { + return lookupNodeStream(name, registry); + }; + auto replace = [registry](std::span handles) { + return replaceNodeStream(handles, registry); + }; + connection->getClientContext()->addScanReplace(ScanReplacement(std::move(lookup), replace)); +} + +void returnChunkFromJS(uint64_t requestId, Napi::Array rowsNapi, bool done) { + auto* req = NodeStreamRegistry::getChunkRequest(requestId); + if (!req) { + return; + } + std::vector> rows; + const size_t numRows = rowsNapi.Length(); + rows.reserve(numRows); + for (size_t r = 0; r < numRows; r++) { + Napi::Value rowVal = rowsNapi.Get(r); + std::vector row; + if (rowVal.IsArray()) { + auto arr = rowVal.As(); + for (size_t c = 0; c < arr.Length(); c++) { + row.push_back(Util::TransformNapiValue(arr.Get(c))); + } + } else if (rowVal.IsObject() && !rowVal.IsNull() && !rowVal.IsUndefined()) { + auto obj = rowVal.As(); + const auto& colNames = req->columnNames; + for (const auto& colName : colNames) { + row.push_back(Util::TransformNapiValue(obj.Get(colName))); + } + } + rows.push_back(std::move(row)); + } + { + std::lock_guard lock(req->mtx); + req->rows = std::move(rows); + req->done = done; + req->filled = true; + } + req->cv.notify_one(); +} diff --git a/tools/nodejs_api/src_cpp/node_stream_scan.cpp b/tools/nodejs_api/src_cpp/node_stream_scan.cpp new file mode 100644 index 0000000000..529c10b91d --- /dev/null +++ b/tools/nodejs_api/src_cpp/node_stream_scan.cpp @@ -0,0 +1,103 @@ +#include "include/node_stream_scan.h" +#include "include/node_scan_replacement.h" + +#include "binder/binder.h" +#include "common/constants.h" +#include "common/system_config.h" +#include "function/table/bind_input.h" +#include "processor/execution_context.h" +#include "processor/result/factorized_table.h" + +#include + +using namespace lbug::common; +using namespace lbug::function; + +namespace lbug { + +namespace { +std::mutex g_nodeStreamTableFuncMutex; +} + +static std::unique_ptr bindFunc(lbug::main::ClientContext*, + const TableFuncBindInput* input) { + auto* statePtr = reinterpret_cast(input->getLiteralVal(0)); + KU_ASSERT(statePtr != nullptr); + std::shared_ptr state(statePtr, 
[](NodeStreamSourceState*) {}); + auto columns = input->binder->createVariables(state->columnNames, state->columnTypes); + return std::make_unique(std::move(columns), state); +} + +static std::unique_ptr initSharedState( + const TableFuncInitSharedStateInput&) { + return std::make_unique(0); +} + +static std::unique_ptr initLocalState( + const TableFuncInitLocalStateInput&) { + return std::make_unique(); +} + +static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) { + auto* bindData = input.bindData->constPtrCast(); + auto& state = *bindData->state; + std::unique_lock streamLock(g_nodeStreamTableFuncMutex); + const uint64_t requestId = NodeStreamRegistry::nextRequestId(); + auto req = std::make_unique(); + req->columnNames = state.columnNames; + NodeStreamRegistry::setChunkRequest(requestId, std::move(req)); + NodeStreamChunkRequest* reqPtr = NodeStreamRegistry::getChunkRequest(requestId); + KU_ASSERT(reqPtr != nullptr); + + state.getChunkTsf.BlockingCall(&requestId, + [](Napi::Env env, Napi::Function jsCallback, const uint64_t* idPtr) { + jsCallback.Call({Napi::Number::New(env, static_cast(*idPtr))}); + }); + + std::vector> rowsCopy; + { + std::unique_lock lock(reqPtr->mtx); + reqPtr->cv.wait(lock, [reqPtr] { return reqPtr->filled; }); + rowsCopy = std::move(reqPtr->rows); + } + NodeStreamRegistry::setChunkRequest(requestId, nullptr); + streamLock.unlock(); + + const offset_t numRows = static_cast(rowsCopy.size()); + if (numRows == 0) { + return 0; + } + + const auto numCols = bindData->getNumColumns(); + const auto cap = std::min(numRows, static_cast(DEFAULT_VECTOR_CAPACITY)); + for (offset_t r = 0; r < cap; r++) { + for (auto c = 0u; c < numCols; c++) { + auto& vec = output.dataChunk.getValueVectorMutable(c); + if (c < rowsCopy[r].size() && !rowsCopy[r][c].isNull()) { + vec.setNull(r, false); + vec.copyFromValue(r, rowsCopy[r][c]); + } else { + vec.setNull(r, true); + } + } + } + output.dataChunk.state->getSelVectorUnsafe().setSelSize(cap); + return cap; +} + +static double progressFunc(TableFuncSharedState*) { + return 0.0; +} + +TableFunction NodeStreamScanFunction::getFunction() { + TableFunction func(name, std::vector{LogicalTypeID::POINTER}); + func.tableFunc = tableFunc; + func.bindFunc = bindFunc; + func.initSharedStateFunc = initSharedState; + func.initLocalStateFunc = initLocalState; + func.progressFunc = progressFunc; + func.canParallelFunc = [] { return false; }; + return func; +} + +} // namespace lbug diff --git a/tools/nodejs_api/src_js/connection.js b/tools/nodejs_api/src_js/connection.js index 575d9f27ff..652094741e 100644 --- a/tools/nodejs_api/src_js/connection.js +++ b/tools/nodejs_api/src_js/connection.js @@ -121,13 +121,20 @@ class Connection { * Execute a prepared statement with the given parameters. * @param {lbug.PreparedStatement} preparedStatement the prepared statement to execute. * @param {Object} params a plain object mapping parameter names to values. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. 
Rejects if error or options.signal is aborted. */ - execute(preparedStatement, params = {}, progressCallback) { + execute(preparedStatement, params = {}, optionsOrProgressCallback) { + const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback); return new Promise((resolve, reject) => { + if (progressCallback !== undefined && typeof progressCallback !== "function") { + return reject(new Error("progressCallback must be a function.")); + } + if (optionsOrProgressCallback != null && typeof optionsOrProgressCallback !== "function" && typeof optionsOrProgressCallback !== "object") { + return reject(new Error("progressCallback must be a function.")); + } if ( - !typeof preparedStatement === "object" || + typeof preparedStatement !== "object" || preparedStatement.constructor.name !== "PreparedStatement" ) { return reject( @@ -147,11 +154,29 @@ class Connection { const value = params[key]; paramArray.push([key, value]); } - if (progressCallback && typeof progressCallback !== "function") { - return reject(new Error("progressCallback must be a function.")); + if (signal?.aborted) { + return reject(this._createAbortError()); + } + let abortListener; + const cleanup = () => { + if (signal && abortListener) { + signal.removeEventListener("abort", abortListener); + } + }; + if (signal) { + abortListener = () => { + this.interrupt(); + cleanup(); + reject(this._createAbortError()); + }; + signal.addEventListener("abort", abortListener); } this._getConnection() .then((connection) => { + if (signal?.aborted) { + cleanup(); + return reject(this._createAbortError()); + } const nodeQueryResult = new LbugNative.NodeQueryResult(); try { connection.executeAsync( @@ -159,7 +184,11 @@ class Connection { nodeQueryResult, paramArray, (err) => { + cleanup(); if (err) { + if (signal?.aborted && err.message === "Interrupted.") { + return reject(this._createAbortError()); + } return reject(err); } this._unwrapMultipleQueryResults(nodeQueryResult) @@ -173,10 +202,12 @@ class Connection { progressCallback ); } catch (e) { + cleanup(); return reject(e); } }) .catch((err) => { + cleanup(); return reject(err); }); }); @@ -261,26 +292,65 @@ class Connection { return new PreparedStatement(this, preparedStatement); } + /** + * Interrupt the currently executing query on this connection. + * No-op if the connection is not initialized or no query is running. + */ + interrupt() { + if (this._connection) { + this._connection.interrupt(); + } + } + /** * Execute a query. * @param {String} statement the statement to execute. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options object { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error or if options.signal is aborted. 
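+   * @example
+   * // Illustrative sketch (not from the test suite): cancel a long-running
+   * // statement with an AbortController; `someLongQuery` is a placeholder string.
+   * const controller = new AbortController();
+   * const pending = conn.query(someLongQuery, { signal: controller.signal });
+   * setTimeout(() => controller.abort(), 100);
+   * await pending.catch((e) => console.log(e.name)); // "AbortError"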
*/ - query(statement, progressCallback) { + query(statement, optionsOrProgressCallback) { + const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback); return new Promise((resolve, reject) => { + if (progressCallback !== undefined && typeof progressCallback !== "function") { + return reject(new Error("progressCallback must be a function.")); + } + if (optionsOrProgressCallback != null && typeof optionsOrProgressCallback !== "function" && typeof optionsOrProgressCallback !== "object") { + return reject(new Error("progressCallback must be a function.")); + } if (typeof statement !== "string") { return reject(new Error("statement must be a string.")); } - if (progressCallback && typeof progressCallback !== "function") { - return reject(new Error("progressCallback must be a function.")); + if (signal?.aborted) { + return reject(this._createAbortError()); + } + let abortListener; + const cleanup = () => { + if (signal && abortListener) { + signal.removeEventListener("abort", abortListener); + } + }; + if (signal) { + abortListener = () => { + this.interrupt(); + cleanup(); + reject(this._createAbortError()); + }; + signal.addEventListener("abort", abortListener); } this._getConnection() .then((connection) => { + if (signal?.aborted) { + cleanup(); + return reject(this._createAbortError()); + } const nodeQueryResult = new LbugNative.NodeQueryResult(); try { connection.queryAsync(statement, nodeQueryResult, (err) => { + cleanup(); if (err) { + if (signal?.aborted && err.message === "Interrupted.") { + return reject(this._createAbortError()); + } return reject(err); } this._unwrapMultipleQueryResults(nodeQueryResult) @@ -293,15 +363,150 @@ class Connection { }, progressCallback); } catch (e) { + cleanup(); return reject(e); } }) .catch((err) => { + cleanup(); return reject(err); }); }); } + _normalizeQueryOptions(optionsOrProgressCallback) { + if (optionsOrProgressCallback == null) { + return { signal: undefined, progressCallback: undefined }; + } + if (typeof optionsOrProgressCallback === "function") { + return { signal: undefined, progressCallback: optionsOrProgressCallback }; + } + if (typeof optionsOrProgressCallback === "object" && optionsOrProgressCallback !== null) { + return { + signal: optionsOrProgressCallback.signal, + progressCallback: optionsOrProgressCallback.progressCallback, + }; + } + return { signal: undefined, progressCallback: undefined }; + } + + _createAbortError() { + return new DOMException("The operation was aborted.", "AbortError"); + } + + /** + * Check that the connection is alive (e.g. for connection pools or health checks). + * Runs a trivial query; rejects if the connection is broken. + * @returns {Promise} resolves to true if the connection is OK. + */ + async ping() { + const result = await this.query("RETURN 1"); + const closeResult = (r) => { + if (Array.isArray(r)) { + r.forEach((q) => q.close()); + } else { + r.close(); + } + }; + closeResult(result); + return true; + } + + /** + * Run EXPLAIN on a Cypher statement and return the plan as a string. + * @param {string} statement – Cypher statement (e.g. "MATCH (a:person) RETURN a") + * @returns {Promise} the plan string (one row per line) + */ + async explain(statement) { + if (typeof statement !== "string") { + throw new Error("explain: statement must be a string."); + } + const trimmed = statement.trim(); + const explainStatement = trimmed.toUpperCase().startsWith("EXPLAIN") ? 
trimmed : "EXPLAIN " + trimmed;
+    const result = await this.query(explainStatement);
+    const single = Array.isArray(result) ? result[0] : result;
+    const rows = await single.getAll();
+    single.close();
+    if (rows.length === 0) {
+      return "";
+    }
+    return rows
+      .map((row) => Object.values(row).join(" | "))
+      .join("\n");
+  }
+
+  /**
+   * Register a stream source for LOAD FROM name. The source must be AsyncIterable; each yielded
+   * value is a row (an array of column values in schema order, or an object keyed by column name).
+   * Call unregisterStream(name) when done or before reusing the name.
+   * @param {string} name – name used in Cypher: LOAD FROM name RETURN ...
+   * @param {AsyncIterable<Array<any>|Object>} source – async iterable of rows
+   * @param {{ columns: Array<{ name: string, type: string }> }} options – schema (required). type: INT64, INT32, DOUBLE, STRING, BOOL, DATE, etc.
+   */
+  async registerStream(name, source, options = {}) {
+    if (typeof name !== "string") {
+      throw new Error("registerStream: name must be a string.");
+    }
+    const columns = options.columns;
+    if (!Array.isArray(columns) || columns.length === 0) {
+      throw new Error("registerStream: options.columns (array of { name, type }) is required.");
+    }
+    const conn = await this._getConnection();
+    const it = source[Symbol.asyncIterator] ? source[Symbol.asyncIterator].call(source) : source;
+    const pending = [];
+    let consumerRunning = false;
+
+    const toRows = (raw) => {
+      if (raw == null) return [];
+      if (Array.isArray(raw)) {
+        const first = raw[0];
+        const isArrayOfRows =
+          raw.length > 0 &&
+          (Array.isArray(first) || (typeof first === "object" && first !== null && !Array.isArray(first)));
+        return isArrayOfRows ? raw : [raw];
+      }
+      return [raw];
+    };
+
+    const runConsumer = async () => {
+      pending.sort((a, b) => a - b);
+      while (pending.length > 0) {
+        const requestId = pending.shift();
+        try {
+          const n = await it.next();
+          // Coerce done to a strict boolean: the native returnChunk validates its
+          // arguments, and hand-written iterators may omit `done` on intermediate results.
+          conn.returnChunk(requestId, toRows(n.value), Boolean(n.done));
+        } catch (e) {
+          // On a source error, end the stream so the engine thread is not left waiting.
+          conn.returnChunk(requestId, [], true);
+        }
+      }
+      consumerRunning = false;
+    };
+
+    const getChunk = (requestId) => {
+      pending.push(requestId);
+      if (!consumerRunning) {
+        consumerRunning = true;
+        setImmediate(() => runConsumer());
+      }
+    };
+    conn.registerStream(name, getChunk, columns);
+  }
+
+  /**
+   * Unregister a stream source by name.
+   * @param {string} name – name passed to registerStream
+   */
+  unregisterStream(name) {
+    if (typeof name !== "string") {
+      throw new Error("unregisterStream: name must be a string.");
+    }
+    if (!this._connection) {
+      return;
+    }
+    this._connection.unregisterStream(name);
+  }
+
   /**
    * Execute a query synchronously.
    * @param {String} statement the statement to execute. This function blocks the main thread for the duration of the query, so use it with caution.
@@ -396,6 +601,37 @@
     }
   }
 
+  /**
+   * Run a function inside a single write transaction. On success it commits; on throw it rolls back and rethrows.
+   * Uses Cypher BEGIN TRANSACTION / COMMIT / ROLLBACK under the hood.
+   * @param {Function} fn async function to run; it can use this connection's query/execute inside.
+   * @returns {Promise<*>} the value returned by fn.
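+   * @example
+   * // Illustrative sketch: assumes a User node table already exists.
+   * // Both writes commit together; if either throws, both roll back.
+   * await conn.transaction(async () => {
+   *   await conn.query("CREATE (:User {name: 'Alice'})");
+   *   await conn.query("CREATE (:User {name: 'Bob'})");
+   * });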
+ */ + async transaction(fn) { + if (typeof fn !== "function") { + throw new Error("transaction() requires a function."); + } + const closeResult = (r) => { + if (Array.isArray(r)) { + r.forEach((q) => q.close()); + } else { + r.close(); + } + }; + const beginRes = await this.query("BEGIN TRANSACTION"); + closeResult(beginRes); + try { + const result = await fn(); + const commitRes = await this.query("COMMIT"); + closeResult(commitRes); + return result; + } catch (e) { + const rollbackRes = await this.query("ROLLBACK"); + closeResult(rollbackRes); + throw e; + } + } + /** * Set the timeout for queries. Queries that take longer than the timeout * will be aborted. diff --git a/tools/nodejs_api/src_js/lbug.d.ts b/tools/nodejs_api/src_js/lbug.d.ts index 977be1b157..2ee174241c 100644 --- a/tools/nodejs_api/src_js/lbug.d.ts +++ b/tools/nodejs_api/src_js/lbug.d.ts @@ -22,6 +22,15 @@ export type ProgressCallback = ( numPipelines: number ) => void; +/** + * Options for query() and execute(). + * Use signal to cancel the operation via AbortController. + */ +export interface QueryOptions { + signal?: AbortSignal; + progressCallback?: ProgressCallback; +} + /** * Represents a node ID in the graph database. */ @@ -117,6 +126,8 @@ export class Database { * @param maxDBSize Maximum size of the database in bytes * @param autoCheckpoint Whether to enable automatic checkpoints * @param checkpointThreshold Threshold for automatic checkpoints + * @param throwOnWalReplayFailure If true, WAL replay failures throw; otherwise replay stops at error + * @param enableChecksums If true, use checksums to detect WAL corruption */ constructor( databasePath?: string, @@ -125,7 +136,9 @@ export class Database { readOnly?: boolean, maxDBSize?: number, autoCheckpoint?: boolean, - checkpointThreshold?: number + checkpointThreshold?: number, + throwOnWalReplayFailure?: boolean, + enableChecksums?: boolean ); /** @@ -200,6 +213,12 @@ export class Connection { */ setQueryTimeout(timeoutInMs: number): void; + /** + * Interrupt the currently executing query on this connection. + * No-op if the connection is not initialized or no query is running. + */ + interrupt(): void; + /** * Close the connection. * @returns Promise that resolves when connection is closed @@ -215,13 +234,13 @@ export class Connection { * Execute a prepared statement. * @param preparedStatement The prepared statement to execute * @param params Parameters for the query as a plain object - * @param progressCallback Optional progress callback - * @returns Promise that resolves to the query result(s) + * @param optionsOrProgressCallback Options (e.g. signal for abort) or legacy progress callback + * @returns Promise that resolves to the query result(s). Rejects with DOMException AbortError if signal is aborted. */ execute( preparedStatement: PreparedStatement, params?: Record, - progressCallback?: ProgressCallback + optionsOrProgressCallback?: QueryOptions | ProgressCallback ): Promise; /** @@ -252,20 +271,59 @@ export class Connection { /** * Execute a query. * @param statement The statement to execute - * @param progressCallback Optional progress callback - * @returns Promise that resolves to the query result(s) + * @param optionsOrProgressCallback Options (e.g. signal for abort) or legacy progress callback + * @returns Promise that resolves to the query result(s). Rejects with DOMException AbortError if signal is aborted. 
   */
  query(
    statement: string,
-    progressCallback?: ProgressCallback
+    optionsOrProgressCallback?: QueryOptions | ProgressCallback
  ): Promise<QueryResult | QueryResult[]>;
 
+  /**
+   * Run a function inside a single write transaction. Commits on success, rolls back on throw.
+   * @param fn Async function that can use this connection's query/execute
+   * @returns Promise that resolves to the return value of fn
+   */
+  transaction<T>(fn: () => Promise<T>): Promise<T>;
+
  /**
   * Execute a query synchronously.
   * @param statement The statement to execute
   * @returns The query result(s)
   */
  querySync(statement: string): QueryResult | QueryResult[];
+
+  /**
+   * Check that the connection is alive (e.g. for pools or health checks).
+   * @returns Promise that resolves to true if OK, rejects if connection is broken
+   */
+  ping(): Promise<boolean>;
+
+  /**
+   * Run EXPLAIN on a Cypher statement and return the plan as a string.
+   * @param statement Cypher statement (e.g. "MATCH (a:person) RETURN a")
+   * @returns Promise that resolves to the plan string (one row per line)
+   */
+  explain(statement: string): Promise<string>;
+
+  /**
+   * Register a stream source for LOAD FROM name. Source must be AsyncIterable of rows (array or object).
+   * Unregister with unregisterStream(name) when done.
+   * @param name Name used in Cypher: LOAD FROM name RETURN ...
+   * @param source AsyncIterable of rows (array of column values or object keyed by column name)
+   * @param options.columns Schema: array of { name: string, type: string } (type: INT64, INT32, DOUBLE, STRING, BOOL, DATE, etc.)
+   */
+  registerStream(
+    name: string,
+    source: AsyncIterable<any[] | Record<string, any>>,
+    options: { columns: Array<{ name: string; type: string }> }
+  ): Promise<void>;
+
+  /**
+   * Unregister a stream source by name.
+   * @param name Name passed to registerStream
+   */
+  unregisterStream(name: string): void;
 }
 
 /**
@@ -286,11 +344,25 @@ export class PreparedStatement {
  getErrorMessage(): string;
 }
 
+/**
+ * Query summary with compiling and execution times (milliseconds).
+ */
+export interface QuerySummary {
+  compilingTime: number;
+  executionTime: number;
+}
+
 /**
  * Represents the results of a query execution.
  * Note: This class is created internally by Connection query methods.
+ * Supports async iteration: for await (const row of result) { ... }
  */
-export class QueryResult {
+export class QueryResult implements AsyncIterable<Record<string, any> | null> {
+  /**
+   * Async iterator for row-by-row consumption (for await...of).
+   */
+  [Symbol.asyncIterator](): AsyncIterator<Record<string, any> | null>;
+
  /**
   * Reset the iterator for reading results.
   */
@@ -320,6 +392,12 @@ export class QueryResult {
   */
  getNextSync(): Record<string, any> | null;
 
+  /**
+   * Return a Node.js Readable stream (object mode) that yields one row per chunk.
+   * @returns Readable stream of row objects
+   */
+  toStream(): import("stream").Readable;
+
  /**
   * Iterate through the query result with callback functions.
   * @param resultCallback Callback function called for each row
@@ -378,6 +456,18 @@ export class QueryResult {
   */
  getColumnNamesSync(): string[];
 
+  /**
+   * Get the query summary (compiling and execution time in milliseconds).
+   * @returns Promise that resolves to the query summary
+   */
+  getQuerySummary(): Promise<QuerySummary>;
+
+  /**
+   * Get the query summary synchronously.
+   * @returns The query summary
+   */
+  getQuerySummarySync(): QuerySummary;
+
  /**
   * Close the result set and release resources.
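+   *
+   * @example
+   * // Typical lifecycle sketch: fully consume the result, then release native resources.
+   * const result = await conn.query("MATCH (a:person) RETURN a.ID");
+   * for await (const row of result) {
+   *   console.log(row);
+   * }
+   * result.close();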
   */
diff --git a/tools/nodejs_api/src_js/query_result.js b/tools/nodejs_api/src_js/query_result.js
index 944f23db9c..3d75892eda 100644
--- a/tools/nodejs_api/src_js/query_result.js
+++ b/tools/nodejs_api/src_js/query_result.js
@@ -1,6 +1,7 @@
 "use strict";
 
 const assert = require("assert");
+const { Readable } = require("stream");
 
 class QueryResult {
   /**
@@ -96,6 +97,64 @@ class QueryResult {
     });
   }
 
+  /**
+   * Async iterator for consuming the result row-by-row (e.g. `for await (const row of result)`).
+   * Does not materialize the full result in memory.
+   * @returns {AsyncIterator}
+   */
+  [Symbol.asyncIterator]() {
+    const self = this;
+    return {
+      async next() {
+        self._checkClosed();
+        if (!self.hasNext()) {
+          return { done: true };
+        }
+        // Errors from getNext() reject the promise returned by next(), which ends
+        // a for await...of loop with that error.
+        const value = await self.getNext();
+        if (value === null) {
+          return { done: true };
+        }
+        return { value, done: false };
+      },
+    };
+  }
+
+  /**
+   * Return a Node.js Readable stream (object mode) that yields one row per chunk.
+   * Useful for piping or integrating with stream consumers; implemented on top of
+   * getNext(), so it requires no native changes.
+   * @returns {stream.Readable} Readable stream of row objects.
+   */
+  toStream() {
+    const self = this;
+    return new Readable({
+      objectMode: true,
+      read() {
+        if (self._isClosed || !self.hasNext()) {
+          return this.push(null);
+        }
+        self.getNext()
+          .then((row) => {
+            // getNext() resolves to null once the result is exhausted; pushing null
+            // ends the stream. For a real row, the next read() call detects
+            // exhaustion, avoiding a double push(null) when push() re-enters
+            // read() synchronously.
+            this.push(row === undefined ? null : row);
+          })
+          .catch((err) => {
+            this.destroy(err);
+          });
+      },
+    });
+  }
+
   /**
    * Get all rows of the query result.
    * @returns {Promise<Array<Object>>} a promise that resolves to all rows of the query result. The promise is rejected if there is an error.
@@ -229,13 +288,16 @@ class QueryResult {
   }
 
   /**
-   * Internal function to check if the query result is closed.
-   * @throws {Error} if the query result is closed.
+   * Internal function to check if the query result or its connection is closed.
+   * @throws {Error} if the query result is closed or the connection is closed.
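+   * @example
+   * // After conn.closeSync() (or db.closeSync()), any method that runs this guard
+   * // surfaces a plain JS Error such as "Connection is closed." instead of reaching
+   * // native code through a dead handle.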
*/ _checkClosed() { if (this._isClosed) { throw new Error("Query result is closed."); } + if (this._connection._isClosed) { + throw new Error("Connection is closed."); + } } } diff --git a/tools/nodejs_api/test/test.js b/tools/nodejs_api/test/test.js index 4efa4b7e6e..cbd152202d 100644 --- a/tools/nodejs_api/test/test.js +++ b/tools/nodejs_api/test/test.js @@ -18,4 +18,6 @@ describe("lbug", () => { importTest("Concurrent query execution", "./test_concurrency.js"); importTest("Version", "./test_version.js"); importTest("Synchronous API", "./test_sync_api.js"); + importTest("registerStream / LOAD FROM stream", "./test_register_stream.js"); + importTest("Resilience (close during/after use)", "./test_resilience.js"); }); diff --git a/tools/nodejs_api/test/test_connection.js b/tools/nodejs_api/test/test_connection.js index 34345ad26a..e09bf2c1c5 100644 --- a/tools/nodejs_api/test/test_connection.js +++ b/tools/nodejs_api/test/test_connection.js @@ -122,6 +122,50 @@ describe("Execute", function () { }); }); +describe("ping", function () { + it("should resolve to true when connection is alive", async function () { + const ok = await conn.ping(); + assert.strictEqual(ok, true); + }); +}); + +describe("transaction", function () { + it("should commit and return fn result on success", async function () { + const result = await conn.transaction(async () => { + const q = await conn.query("RETURN 42 AS x"); + const rows = await q.getAll(); + q.close(); + return rows[0].x; + }); + assert.equal(result, 42); + }); + + it("should rollback and rethrow on fn error", async function () { + const err = new Error("tx abort"); + try { + await conn.transaction(async () => { + await conn.query("RETURN 1"); + throw err; + }); + assert.fail("transaction should have thrown"); + } catch (e) { + assert.strictEqual(e, err); + } + const q = await conn.query("RETURN 1"); + assert.isTrue(q.hasNext()); + q.close(); + }); + + it("should reject non-function", async function () { + try { + await conn.transaction("not a function"); + assert.fail("transaction should have thrown"); + } catch (e) { + assert.equal(e.message, "transaction() requires a function."); + } + }); +}); + describe("Query", function () { it("should run a valid query", async function () { const queryResult = await conn.query("MATCH (a:person) RETURN COUNT(*)"); @@ -214,6 +258,70 @@ describe("Timeout", function () { }); }); +describe("Interrupt", function () { + it("should abort a long-running query when interrupt() is called", async function () { + if (process.platform === "win32") { + this.skip(); + } + this.timeout(10000); + const newConn = new lbug.Connection(db); + await newConn.init(); + const longQuery = + "UNWIND RANGE(1,100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);"; + const queryPromise = newConn.query(longQuery); + setTimeout(() => newConn.interrupt(), 150); + try { + await queryPromise; + assert.fail("No error thrown when the query was interrupted."); + } catch (err) { + assert.equal(err.message, "Interrupted."); + } + }); +}); + +describe("AbortSignal", function () { + it("should reject with AbortError when signal is already aborted before query starts", async function () { + const ac = new AbortController(); + ac.abort(); + try { + await conn.query("RETURN 1", { signal: ac.signal }); + assert.fail("No error thrown when signal was already aborted."); + } catch (err) { + assert.equal(err.name, "AbortError"); + assert.equal(err.message, "The operation was aborted."); + } + }); + + it("should reject with AbortError when signal is 
aborted during query", async function () { + const newConn = new lbug.Connection(db); + await newConn.init(); + const ac = new AbortController(); + const longQuery = + "UNWIND RANGE(1,100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);"; + const queryPromise = newConn.query(longQuery, { signal: ac.signal }); + setTimeout(() => ac.abort(), 150); + try { + await queryPromise; + assert.fail("No error thrown when signal was aborted during query."); + } catch (err) { + assert.equal(err.name, "AbortError"); + } + }); + + it("should work with progressCallback in options object", async function () { + let progressCalled = false; + const result = await conn.query("RETURN 1", { + progressCallback: () => { + progressCalled = true; + }, + }); + assert.exists(result); + const rows = Array.isArray(result) ? result : [result]; + assert.isAtLeast(rows.length, 1); + rows.forEach((r) => r.close()); + }); +}); + describe("Close", function () { it("should close the connection", async function () { const newConn = new lbug.Connection(db); diff --git a/tools/nodejs_api/test/test_database.js b/tools/nodejs_api/test/test_database.js index 06ba4c24f4..57cb692c20 100644 --- a/tools/nodejs_api/test/test_database.js +++ b/tools/nodejs_api/test/test_database.js @@ -9,15 +9,17 @@ const spwan = require("child_process").spawn; const openDatabaseOnSubprocess = (dbPath) => { return new Promise((resolve, _) => { const node = process.argv[0]; + // Use env vars so Windows paths with backslashes don't break the -e code string + const env = { ...process.env, LBUG_PATH: lbugPath, DB_PATH: dbPath }; const code = ` (async() => { - const lbug = require("${lbugPath}"); - const db = new lbug.Database("${dbPath}", 1 << 28); + const lbug = require(process.env.LBUG_PATH); + const db = new lbug.Database(process.env.DB_PATH, 1 << 28); await db.init(); console.log("Database initialized."); })(); `; - const child = spwan(node, ["-e", code]); + const child = spwan(node, ["-e", code], { env }); let stdout = ""; let stderr = ""; child.stdout.on("data", (data) => { @@ -374,10 +376,6 @@ describe("Database constructor", function () { describe("Database close", function () { it("should allow initializing a new database after closing", async function () { - if (process.platform === "win32") { - this._runnable.title += " (skipped: not implemented on Windows)"; - this.skip(); - } const tmpDbPath = await new Promise((resolve, reject) => { tmp.dir({ unsafeCleanup: true }, (err, path, _) => { if (err) { @@ -389,7 +387,6 @@ describe("Database close", function () { const dbPath = path.join(tmpDbPath, "db.kz"); const testDb = new lbug.Database(dbPath, 1 << 28 /* 256MB */); await testDb.init(); - // FIXME: doesn't work properly on windows let subProcessResult = await openDatabaseOnSubprocess(dbPath); assert.notEqual(subProcessResult.code, 0); assert.include( @@ -507,9 +504,17 @@ describe("Database close", function () { assert.deepEqual(tuple, { "+(1,1)": 2 }); testDb.closeSync(); assert.isTrue(testDb._isClosed); - assert.throws(() => conn.querySync("RETURN 1+1"), Error, "Runtime exception: The current operation is not allowed because the parent database is closed."); + assert.throws( + () => conn.querySync("RETURN 1+1"), + Error, + /(Runtime exception:.*parent database is closed|Connection is closed\.)/ + ); conn.closeSync(); assert.isTrue(conn._isClosed); - assert.throws(() => res.resetIterator(), Error, "Runtime exception: The current operation is not allowed because the parent database is closed."); + assert.throws( + () => 
res.resetIterator(), + Error, + /(Runtime exception:.*parent database is closed|Connection is closed\.)/ + ); }); }); diff --git a/tools/nodejs_api/test/test_query_result.js b/tools/nodejs_api/test/test_query_result.js index 5f84b3f32d..ad306c141e 100644 --- a/tools/nodejs_api/test/test_query_result.js +++ b/tools/nodejs_api/test/test_query_result.js @@ -66,22 +66,30 @@ describe("Get next", function () { } }); - it("should throw an error if there is no next tuple", async function () { + it("should return null when no more tuples", async function () { const queryResult = await conn.query( "MATCH (a:person) RETURN a.ID ORDER BY a.ID" ); for (let i = 0; i < 8; ++i) { await queryResult.getNext(); } - try { - await queryResult.getNext(); - assert.fail("No error thrown when there is no next tuple"); - } catch (err) { - assert.equal( - err.message, - "Runtime exception: No more tuples in QueryResult, Please check hasNext() before calling getNext()." - ); + const exhausted = await queryResult.getNext(); + assert.isNull(exhausted, "getNext() returns null when no more tuples"); + }); + + it("getNext() returns null exactly when hasNext() is false", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + let count = 0; + while (queryResult.hasNext()) { + const row = await queryResult.getNext(); + assert.isNotNull(row, "getNext() must return value when hasNext() is true"); + count++; } + assert.equal(count, 8); + const afterExhausted = await queryResult.getNext(); + assert.isNull(afterExhausted, "getNext() must return null when hasNext() was false"); }); }); @@ -211,6 +219,46 @@ describe("Get query summary", function () { }); }); +describe("Async iterator (for await...of)", function () { + it("should iterate rows same as getNext", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + const ids = []; + for await (const row of queryResult) { + ids.push(row["a.ID"]); + } + assert.deepEqual(ids, PERSON_IDS); + }); + + it("should not materialize full result in memory", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + let count = 0; + for await (const row of queryResult) { + count++; + assert.equal(row["a.ID"], PERSON_IDS[count - 1]); + } + assert.equal(count, PERSON_IDS.length); + }); +}); + +describe("toStream", function () { + it("should yield rows as Readable stream", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + const stream = queryResult.toStream(); + const rows = []; + for await (const row of stream) { + rows.push(row); + } + const ids = rows.map((r) => r["a.ID"]); + assert.deepEqual(ids, PERSON_IDS); + }); +}); + describe("Close", function () { it("should close the query result", async function () { const queryResult = await conn.query( diff --git a/tools/nodejs_api/test/test_register_stream.js b/tools/nodejs_api/test/test_register_stream.js new file mode 100644 index 0000000000..a2b3fc2eef --- /dev/null +++ b/tools/nodejs_api/test/test_register_stream.js @@ -0,0 +1,81 @@ +const { assert } = require("chai"); + +describe("registerStream / LOAD FROM stream", function () { + it("should LOAD FROM registered stream and return rows", async function () { + async function* rowSource() { + yield [1, "a"]; + yield [2, "b"]; + yield [3, "c"]; + } + await conn.registerStream("mystream", rowSource(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "label", 
type: "STRING" }, + ], + }); + try { + const result = await conn.query("LOAD FROM mystream RETURN *"); + const rows = Array.isArray(result) ? result : [result]; + assert.isAtLeast(rows.length, 1); + const r = rows[0]; + assert.isTrue(r.hasNext()); + const row1 = await r.getNext(); + assert.exists(row1); + assert.equal(row1["id"], 1); + assert.equal(row1["label"], "a"); + const row2 = await r.getNext(); + assert.equal(row2["id"], 2); + assert.equal(row2["label"], "b"); + const row3 = await r.getNext(); + assert.equal(row3["id"], 3); + assert.equal(row3["label"], "c"); + assert.isNull(await r.getNext()); + } finally { + conn.unregisterStream("mystream"); + } + }); + + it("should LOAD FROM stream with object rows (column order from schema)", async function () { + async function* objectRowSource() { + yield { id: 10, label: "x" }; + yield { label: "y", id: 20 }; + } + await conn.registerStream("objstream", objectRowSource(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "label", type: "STRING" }, + ], + }); + try { + const result = await conn.query("LOAD FROM objstream RETURN *"); + const r = Array.isArray(result) ? result[0] : result; + const row1 = await r.getNext(); + assert.isNotNull(row1, "expected first row from stream"); + assert.equal(row1["id"], 10); + assert.equal(row1["label"], "x"); + const row2 = await r.getNext(); + assert.isNotNull(row2, "expected second row from stream"); + assert.equal(row2["id"], 20); + assert.equal(row2["label"], "y"); + assert.isNull(await r.getNext()); + } finally { + conn.unregisterStream("objstream"); + } + }); + + it("should unregisterStream by name", async function () { + async function* empty() { + if (false) yield []; + } + await conn.registerStream("tmpstream", empty(), { + columns: [{ name: "x", type: "INT64" }], + }); + conn.unregisterStream("tmpstream"); + try { + await conn.query("LOAD FROM tmpstream RETURN *"); + assert.fail("Expected error when loading from unregistered stream."); + } catch (e) { + assert.include(e.message, "not in scope"); + } + }); +}); diff --git a/tools/nodejs_api/test/test_resilience.js b/tools/nodejs_api/test/test_resilience.js new file mode 100644 index 0000000000..252396d4fd --- /dev/null +++ b/tools/nodejs_api/test/test_resilience.js @@ -0,0 +1,188 @@ +"use strict"; + +const { assert } = require("chai"); +const tmp = require("tmp"); +const path = require("path"); + +/** + * Resilience tests: close connection/database during or after operations. + * Goal: no crashes (SIGSEGV, native abort); all failures must surface as JS errors. 
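+ * Each case is wrapped in withTempDb (defined below), which provisions a fresh temporary
+ * database plus connection and always attempts cleanup, e.g.:
+ *   it("...", withTempDb(async (testDb, testConn) => { ... }));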
+ */ +function withTempDb(fn) { + return async function () { + const tmpPath = await new Promise((resolve, reject) => { + tmp.dir({ unsafeCleanup: true }, (err, p, _) => { + if (err) return reject(err); + return resolve(p); + }); + }); + const dbPath = path.join(tmpPath, "db.kz"); + const testDb = new lbug.Database(dbPath, 1 << 26 /* 64MB */); + await testDb.init(); + const testConn = new lbug.Connection(testDb); + await testConn.init(); + try { + await fn.call(this, testDb, testConn); + } finally { + if (!testDb._isClosed) await testDb.close().catch(() => {}); + if (!testConn._isClosed) await testConn.close().catch(() => {}); + } + }; +} + +describe("Resilience (close during/after use)", function () { + this.timeout(15000); + + it("query rejects when connection is closed while query is in flight", withTempDb(async (testDb, testConn) => { + const longQuery = "UNWIND range(1, 50000) AS x UNWIND range(1, 5000) AS y RETURN count(*)"; + const queryPromise = testConn.query(longQuery); + await new Promise((r) => setTimeout(r, 120)); + testConn.closeSync(); + const timeoutMs = 5000; + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Expected query to reject within ${timeoutMs}ms when connection was closed (timed out).`)), timeoutMs); + }); + try { + await Promise.race([queryPromise, timeoutPromise]); + assert.fail("Expected query to reject when connection was closed during execution."); + } catch (err) { + if ((err.message || "").includes("timed out")) throw err; + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + const ok = msg.includes("closed") || msg.includes("not allowed") || msg.includes("runtime"); + assert.isTrue(ok, `Expected error about closed/not allowed, got: ${err.message}`); + } + })); + + it("query rejects when database is closed while query is in flight", withTempDb(async (testDb, testConn) => { + const longQuery = "UNWIND range(1, 50000) AS x UNWIND range(1, 5000) AS y RETURN count(*)"; + const queryPromise = testConn.query(longQuery); + await new Promise((r) => setTimeout(r, 120)); + testDb.closeSync(); + const timeoutMs = 5000; + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Expected query to reject within ${timeoutMs}ms when database was closed (timed out).`)), timeoutMs); + }); + try { + await Promise.race([queryPromise, timeoutPromise]); + assert.fail("Expected query to reject when database was closed during execution."); + } catch (err) { + if ((err.message || "").includes("timed out")) throw err; + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + const ok = msg.includes("closed") || msg.includes("not allowed") || msg.includes("runtime"); + assert.isTrue(ok, `Expected error about closed/not allowed, got: ${err.message}`); + } + })); + + it("getNext() after connection closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + const row = await res.getNext(); + assert.equal(row.x, 1); + testConn.closeSync(); + try { + await res.getNext(); + assert.fail("Expected getNext() to throw after connection closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("hasNext() after connection closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res 
= await testConn.query("RETURN 1 AS x"); + assert.isTrue(res.hasNext()); + testConn.closeSync(); + try { + res.hasNext(); + assert.fail("Expected hasNext() to throw after connection closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("getNext() after database closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + await res.getNext(); + testDb.closeSync(); + try { + await res.getNext(); + assert.fail("Expected getNext() to throw after database closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("hasNext() after database closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + testDb.closeSync(); + try { + res.hasNext(); + assert.fail("Expected hasNext() to throw after database closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("registerStream then close connection then query throws before running", withTempDb(async (testDb, testConn) => { + async function* gen() { + yield [1]; + } + await testConn.registerStream("s", gen(), { columns: [{ name: "x", type: "INT64" }] }); + testConn.closeSync(); + try { + await testConn.query("LOAD FROM s RETURN *"); + assert.fail("Expected query to throw when connection is already closed."); + } catch (err) { + assert.instanceOf(err, Error); + assert.include((err.message || "").toLowerCase(), "closed"); + } + })); + + it("close connection while iterating result: second getNext throws", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("UNWIND [1,2,3] AS x RETURN x"); + const a = await res.getNext(); + assert.equal(a.x, 1); + testConn.closeSync(); + try { + await res.getNext(); + assert.fail("Expected getNext() to throw after connection closed mid-iteration."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("query after connection closed throws immediately (no native call)", async function () { + const testConn = new lbug.Connection(db); + await testConn.init(); + await testConn.close(); + try { + await testConn.query("RETURN 1"); + assert.fail("Expected query to throw when connection is closed."); + } catch (err) { + assert.equal(err.message, "Connection is closed."); + } + }); + + it("getNextSync after database closed throws", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + testDb.closeSync(); + try { + res.getNextSync(); + assert.fail("Expected getNextSync() to throw after database closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); +});