diff --git a/.gitignore b/.gitignore index 5a84aa4078..fa1b3163c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ +tree-sitter/ +tree-sitter-cypher/ + .idea/ .vscode +.cursor .vs bazel-* .clwb diff --git a/Makefile b/Makefile index 22391806e4..03bdb5ed71 100644 --- a/Makefile +++ b/Makefile @@ -187,7 +187,7 @@ nodejs-deps: cd tools/nodejs_api && npm install --include=dev nodejstest: nodejs - cd tools/nodejs_api && npm test + cd tools/nodejs_api && node copy_src_to_build.js && npm test nodejstest-deps: nodejs-deps nodejstest diff --git a/tools/nodejs_api/README.md b/tools/nodejs_api/README.md index 086c63eb2f..d627b96fd8 100644 --- a/tools/nodejs_api/README.md +++ b/tools/nodejs_api/README.md @@ -7,10 +7,24 @@ A high-performance graph database for knowledge-intensive applications. This Nod ## 📦 Installation +**From npm (if published):** + ```bash npm install lbug ``` +**From GitHub** (monorepo; the Node package lives in `tools/nodejs_api`): + +- **pnpm** (v9+), subdirectory is supported: + + ```bash + pnpm add lbug@github:LadybugDB/ladybug#path:tools/nodejs_api + ``` + + On install, the package will build the native addon from source (needs CMake and a C++20 compiler). + +- **npm**: no built-in subdirectory install. Either use a **local path** after cloning and building (see [Build and use in other projects](#-build-and-use-in-other-projects-local)), or a tarball from [GitPkg](https://gitpkg.vercel.app/) (e.g. `https://gitpkg.vercel.app/LadybugDB/ladybug/tools/nodejs_api?main`). + --- ## 🚀 Quick Start @@ -49,10 +63,8 @@ const main = async () => { // Run a query const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); - // Fetch all results + // Consume results (choose one style) const rows = await result.getAll(); - - // Output results for (const row of rows) { console.log(row); } @@ -68,12 +80,182 @@ main().catch(console.error); The `lbug` package exposes the following primary classes: -* `Database` – Initializes a database from a file path. -* `Connection` – Executes queries on a connected database. -* `QueryResult` – Provides methods like `getAll()` to retrieve results. +* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). When the file is locked, **async init() retries for up to 5s** (configurable: last ctor arg `openLockRetryMs`; set `0` to fail immediately). Close with `close()`. +* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. **`getNumNodes(nodeName)`** and **`getNumRels(relName)`** return row counts for node/rel tables. Use `registerStream(name, source, { columns })` to load data from an AsyncIterable via `LOAD FROM name`; `unregisterStream(name)` when done. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`. +* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Use **`toString()`** for a string representation (header + rows; useful for debugging). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed). +* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries. 
+* **Pool** – `createPool({ databasePath, maxSize, ... })` returns a connection pool. Use **`pool.run(conn => ...)`** (recommended) or `acquire()` / `release(conn)`; call **`pool.close()`** when done.
+
+Both CommonJS (`require`) and ES Modules (`import`) are fully supported.
+
+### Consuming query results
+
+```js
+const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000");
+
+// Option 1: get all rows (loads into memory)
+const rows = await result.getAll();
+
+// Option 2: row by row (async)
+while (result.hasNext()) {
+  const row = await result.getNext();
+  console.log(row);
+}
+
+// Option 3: async iterator (streaming, no full materialization)
+for await (const row of result) {
+  console.log(row);
+}
+
+// Option 4: Node.js Readable stream (e.g. for .pipe())
+const stream = result.toStream();
+stream.on("data", (row) => console.log(row));
+
+// Option 5: string representation (e.g. for debugging)
+console.log(result.toString());
+```
+
+### Table counts
+
+After creating node/rel tables and loading data, you can get row counts:
+
+```js
+conn.initSync(); // or await conn.init()
+const numUsers = conn.getNumNodes("User");
+const numFollows = conn.getNumRels("Follows");
+```
+
+### Connection pool
+
+Use **`createPool(options)`** to get a pool of connections (one shared `Database`, up to `maxSize` connections). Prefer **`pool.run(fn)`**: it acquires a connection, runs `fn(conn)`, and releases it in `finally` (on success or throw), so you never leak a connection.
+
+**Options:** `maxSize` (required), `databasePath`, `databaseOptions` (same shape as the `Database` constructor), `minSize` (default 0), `acquireTimeoutMillis` (default 0 = wait forever), `validateOnAcquire` (default false; if true, `conn.ping()` runs before hand-out).
+
+**Example (recommended: `run`):**
+
+```js
+import { createPool } from "lbug";
+
+const pool = createPool({ databasePath: "./mydb", maxSize: 10 });
+
+const rows = await pool.run(async (conn) => {
+  const result = await conn.query("MATCH (u:User) RETURN u.name LIMIT 5");
+  const rows = await result.getAll();
+  result.close();
+  return rows;
+});
+console.log(rows);
+
+await pool.close();
+```
+
+**Manual acquire/release:** If you need the same connection for multiple operations, use `acquire()` and always call `release(conn)` in a `finally` block so the connection is returned even on throw.
+
+```js
+const conn = await pool.acquire();
+try {
+  await conn.query("...");
+  // ...
+} finally {
+  pool.release(conn);
+}
+```
+
+When shutting down, call **`pool.close()`**: it rejects new and pending `acquire()` calls, then closes all connections and the database.
+
+### Transactions
+
+**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing.
+
+```js
+await conn.query("BEGIN TRANSACTION");
+await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
+await conn.query('COPY Nodes FROM "data.csv"');
+await conn.query("COMMIT");
+// or on error: await conn.query("ROLLBACK");
+```
+
+**Read-only transaction:** `BEGIN TRANSACTION READ ONLY`, then your queries, then `COMMIT` / `ROLLBACK`.
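+
+For illustration, a minimal read-only sketch (assumes the `conn` and `User` table from the quick start; write statements would be rejected inside a read-only transaction):
+
+```js
+await conn.query("BEGIN TRANSACTION READ ONLY");
+// Reads only: any write statement here would fail.
+const result = await conn.query("MATCH (u:User) RETURN count(u)");
+console.log(await result.getAll());
+await conn.query("COMMIT");
+```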
+
+**Wrapper:** One write transaction with automatic commit on success and rollback on throw:
+
+```js
+await conn.transaction(async () => {
+  await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
+  await conn.query('COPY Nodes FROM "data.csv"');
+  // commit happens automatically; on throw, rollback then rethrow
+});
+```
+
+### Loading data from a Node.js stream
+
+You can feed data from an **AsyncIterable** (generator, async generator, or any `Symbol.asyncIterator`) into Cypher using **scan replacement**: register a stream by name, then use `LOAD FROM name` in your query. Rows are pulled from JavaScript on demand during execution.
+
+**API:**
+
+* **`conn.registerStream(name, source, options)`** (async)
+  * `name` – string used in Cypher: `LOAD FROM name RETURN ...`
+  * `source` – AsyncIterable of rows. Each row is an **array** of column values (same order as `options.columns`) or an **object** keyed by column name.
+  * `options.columns` – **required**. Schema: array of `{ name: string, type: string }`. Supported types: `INT64`, `INT32`, `INT16`, `INT8`, `UINT64`, `UINT32`, `UINT16`, `UINT8`, `DOUBLE`, `FLOAT`, `STRING`, `BOOL`, `DATE`, `TIMESTAMP`.
+
+* **`conn.unregisterStream(name)`**
+  Unregisters the source so the name can be reused and no stale entry is left behind. Call it after the query (or when you are done with the stream).
+
+**Example:**
+
+```js
+async function* generateRows() {
+  yield [1, "Alice"];
+  yield [2, "Bob"];
+  yield [3, "Carol"];
+}
+
+await conn.registerStream("users", generateRows(), {
+  columns: [
+    { name: "id", type: "INT64" },
+    { name: "name", type: "STRING" },
+  ],
+});
+
+const result = await conn.query("LOAD FROM users RETURN *");
+for await (const row of result) {
+  console.log(row); // { id: 1, name: "Alice" }, ...
+}
+
+conn.unregisterStream("users");
+```
+
+You can combine the stream with other Cypher: e.g. `LOAD FROM stream WHERE col > 0 RETURN *`, or `COPY MyTable FROM (LOAD FROM stream RETURN *)`.
+
+### Database locked
+
+Only one process can open the same database path for writing. If the file is already locked, **async `init()` retries for up to 5 seconds** by default (grace period), then throws. You can tune or disable this:
+
+- **Default**: `new Database("./my.db")` — the last ctor arg `openLockRetryMs` defaults to `5000` (retry for up to 5s on lock).
+- **No retry**: `new Database("./my.db", 0, true, false, 0, true, -1, true, true, 0)` — pass `openLockRetryMs = 0` as the 10th argument to fail immediately.
+- **Shorter grace**: e.g. `openLockRetryMs = 3000` to wait at most 3s before failing.
+
+The error has **`code === 'LBUG_DATABASE_LOCKED'`** so you can catch and handle it if the grace period wasn't enough:
+
+```js
+import { Database, Connection, LBUG_DATABASE_LOCKED } from "lbug";
+
+const db = new Database("./my.db"); // already retries ~5s on lock
+try {
+  await db.init();
+} catch (err) {
+  if (err.code === LBUG_DATABASE_LOCKED) {
+    console.error("Database still locked after grace period.");
+  }
+  throw err;
+}
+const conn = new Connection(db);
+```
+
+Use **read-only** mode for concurrent readers: `new Database(path, undefined, undefined, true)` lets multiple processes open the same DB for read.
+
+See [docs/database_locked.md](docs/database_locked.md) for how other systems handle this and for best practices.
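+
+For concurrent readers, a minimal sketch (assumes the positional signature above, where the 4th constructor argument is the read-only flag):
+
+```js
+import { Database, Connection } from "lbug";
+
+// Read-only open: several processes can do this for the same path concurrently.
+const db = new Database("./my.db", undefined, undefined, true);
+const conn = new Connection(db);
+const result = await conn.query("MATCH (n) RETURN count(n)");
+console.log(await result.getAll());
+await db.close();
+```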
+
 ---
 
 ## 🛠️ Local Development (for Contributors)
@@ -96,6 +278,94 @@
 npm run build
 npm test
 ```
+When developing from the **monorepo root**, build the native addon first so tests see the latest C++ code:
+
+```bash
+# From repo root (D:\prj\ladybug or similar)
+make nodejs
+# Or: cmake --build build/release --target lbugjs
+# Then from tools/nodejs_api:
+cd tools/nodejs_api && npm test
+```
+
+---
+
+## 🔧 Build and use in other projects (local)
+
+To use the Node.js API from the Ladybug repo in another project without publishing to npm:
+
+1. **Build the addon** (from the Ladybug repo root):
+
+   ```bash
+   make nodejs
+   ```
+
+   Or from this directory:
+
+   ```bash
+   npm run build
+   ```
+
+   This compiles the native addon into `build/lbugjs.node` and copies the JS sources and type definitions.
+
+2. **In your other project**, add a file dependency in `package.json`:
+
+   ```json
+   "dependencies": {
+     "lbug": "file:../path/to/ladybug/tools/nodejs_api"
+   }
+   ```
+
+   Then run `npm install`. After that, `require("lbug")` or `import ... from "lbug"` will use your local build.
+
+3. **Optional:** to pack and install a tarball instead:
+
+   ```bash
+   cd /path/to/ladybug/tools/nodejs_api
+   npm run build
+   npm pack
+   ```
+
+   In the other project: `npm install /path/to/ladybug/tools/nodejs_api/lbug-0.0.1.tgz`.
+
+### Prebuilt in your fork (install from GitHub without building)
+
+If you install from GitHub (e.g. `pnpm add lbug@github:user/ladybug#path:tools/nodejs_api`), the package runs `install.js`: if it finds a prebuilt binary, it uses it and does not build from source. To ship a prebuilt in your fork:
+
+1. **Build once** in your clone (from the repo root):
+
+   ```bash
+   make nodejs
+   ```
+
+2. **Create the prebuilt file** (name = `lbugjs-<platform>-<arch>.node`):
+
+   - Windows x64: copy `tools/nodejs_api/build/lbugjs.node` → `tools/nodejs_api/prebuilt/lbugjs-win32-x64.node`
+   - Linux x64: `lbugjs-linux-x64.node`
+   - macOS x64: `lbugjs-darwin-x64.node`; arm64: `lbugjs-darwin-arm64.node`
+
+   Example (from the repo root). **Windows (PowerShell):**
+
+   ```powershell
+   New-Item -ItemType Directory -Force -Path tools/nodejs_api/prebuilt
+   Copy-Item tools/nodejs_api/build/lbugjs.node tools/nodejs_api/prebuilt/lbugjs-win32-x64.node
+   ```
+
+   **Linux/macOS:**
+
+   ```bash
+   mkdir -p tools/nodejs_api/prebuilt
+   cp tools/nodejs_api/build/lbugjs.node tools/nodejs_api/prebuilt/lbugjs-$(node -p "process.platform")-$(node -p "process.arch").node
+   ```
+
+3. **Commit and push** the `prebuilt/` folder. Then anyone (including you, from another project) can run:
+
+   ```bash
+   pnpm add lbug@github:YOUR_USERNAME/ladybug#path:tools/nodejs_api
+   ```
+
+   and the addon will be loaded from `prebuilt/` without a local build.
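+
+To sanity-check any of these install paths, a small smoke test in the consuming project (uses the `VERSION` export from the package's entry point):
+
+```js
+// check-lbug.js (run with: node check-lbug.js)
+const lbug = require("lbug");
+console.log("lbug version:", lbug.VERSION);
+```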
+ --- ## 📦 Packaging and Binary Distribution diff --git a/tools/nodejs_api/build.js b/tools/nodejs_api/build.js index d326300260..b1b93bae43 100644 --- a/tools/nodejs_api/build.js +++ b/tools/nodejs_api/build.js @@ -3,6 +3,9 @@ const path = require("path"); const { execSync } = require("child_process"); const SRC_PATH = path.resolve(__dirname, "../.."); +const NODEJS_API = path.resolve(__dirname, "."); +const BUILD_DIR = path.join(NODEJS_API, "build"); +const SRC_JS_DIR = path.join(NODEJS_API, "src_js"); const THREADS = require("os").cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); @@ -12,3 +15,19 @@ execSync(`make nodejs NUM_THREADS=${THREADS}`, { cwd: SRC_PATH, stdio: "inherit", }); + +// Ensure build/ has latest JS from src_js (CMake copies at configure time only) +if (fs.existsSync(SRC_JS_DIR) && fs.existsSync(BUILD_DIR)) { + const files = fs.readdirSync(SRC_JS_DIR); + for (const name of files) { + if (name.endsWith(".js") || name.endsWith(".mjs") || name.endsWith(".d.ts")) { + fs.copyFileSync(path.join(SRC_JS_DIR, name), path.join(BUILD_DIR, name)); + } + } + // So package root has types when used as file: dependency + const dts = path.join(BUILD_DIR, "lbug.d.ts"); + if (fs.existsSync(dts)) { + fs.copyFileSync(dts, path.join(NODEJS_API, "lbug.d.ts")); + } + console.log("Copied src_js to build."); +} diff --git a/tools/nodejs_api/copy_src_to_build.js b/tools/nodejs_api/copy_src_to_build.js new file mode 100644 index 0000000000..8ebe1581c4 --- /dev/null +++ b/tools/nodejs_api/copy_src_to_build.js @@ -0,0 +1,22 @@ +/** + * Copies src_js/*.js, *.mjs, *.d.ts into build/ so tests run with the latest JS + * after "make nodejs" (which only copies at cmake configure time). + * Run from tools/nodejs_api. + */ +const fs = require("fs"); +const path = require("path"); + +const srcDir = path.join(__dirname, "src_js"); +const buildDir = path.join(__dirname, "build"); + +if (!fs.existsSync(buildDir)) { + console.warn("copy_src_to_build: build/ missing, run make nodejs first."); + process.exit(0); +} + +const re = /\.(js|mjs|d\.ts)$/; +const files = fs.readdirSync(srcDir).filter((n) => re.test(n)); +for (const name of files) { + fs.copyFileSync(path.join(srcDir, name), path.join(buildDir, name)); +} +console.log("Copied", files.length, "files from src_js to build."); diff --git a/tools/nodejs_api/docs/database_locked.md b/tools/nodejs_api/docs/database_locked.md new file mode 100644 index 0000000000..164adc3f9c --- /dev/null +++ b/tools/nodejs_api/docs/database_locked.md @@ -0,0 +1,40 @@ +# Database locked + +## When it happens + +The database file is locked when: + +- Another process has already opened the same path for read-write (e.g. another Node app, the Ladybug shell, or a backup). +- You open the same path twice in one process (e.g. two `Database` instances to the same path) and both try to write. + +Opening is done at the first use: `db.init()`, `db.initSync()`, or the first `conn.query()` on that database. If the OS file lock cannot be acquired, the native layer throws and the Node API surfaces it as an **Error with `code === 'LBUG_DATABASE_LOCKED'`**. + +## How other systems handle it + +| System | Approach | +|----------|----------| +| **SQLite** | `busy_timeout` (e.g. 5 seconds): block until lock is released or timeout, then return `SQLITE_BUSY`. Apps often retry with exponential backoff. | +| **DuckDB** | Open fails immediately if locked; application retries with backoff. | +| **LMDB** | Single writer; readers use `MDB_NOLOCK` or shared lock. 
Writers get exclusive lock. |
+| **RocksDB** | Options for concurrent access; single process or client–server. |
+
+Common patterns:
+
+1. **Fail fast** — return a clear error (e.g. “database locked”) so the app can show a message or retry.
+2. **Retry with backoff** — in application code: catch the error, wait (e.g. 50 ms, 100 ms, 200 ms), try again, then give up.
+3. **Block with timeout** — wait up to N ms for the lock (requires engine support; Ladybug's engine fails immediately, but the Node driver layers a retry grace period on top; see below).
+4. **Read-only for readers** — open in read-only mode so multiple processes can read; only one writer.
+
+## What the Node API does
+
+- **Grace period (async init only)**: When you open a database with async `init()` (or the first `query()`), the driver **retries for up to 5 seconds** by default if the file is locked. Short-lived contention (e.g. an MCP server or another tool briefly holding the lock) therefore often resolves without any action on your part. Configure it with the last constructor argument, `openLockRetryMs` (default `5000`; set `0` to fail immediately).
+- **Clear error**: After the grace period, or when retry is disabled, you get an Error whose message includes “Could not set lock on file” and a link to the concurrency docs.
+- **Error code**: The error is normalized so that `err.code === 'LBUG_DATABASE_LOCKED'`. You can import `LBUG_DATABASE_LOCKED` from `lbug` and check for it if you need custom retry or messaging.
+- **Sync init**: `initSync()` does not retry; it fails immediately on lock (no blocking wait in the driver).
+
+## Best practices
+
+1. **One writer per path** — avoid opening the same on-disk database for write from more than one process at a time.
+2. **Concurrent readers** — use `new Database(path, undefined, undefined, true)` (read-only) so multiple processes can read the same DB.
+3. **Retry with backoff** — if you expect short-lived contention (e.g. a restart or another tool), catch `LBUG_DATABASE_LOCKED`, wait, and retry a few times, as sketched below.
+4. **Close when done** — call `db.close()` so the lock is released for other processes.
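+
+A minimal backoff sketch (illustrative; the delays and attempt count are arbitrary):
+
+```js
+import { Database, LBUG_DATABASE_LOCKED } from "lbug";
+
+async function openWithRetry(path, attempts = 3) {
+  let delay = 50; // ms, doubled on each retry
+  for (let i = 0; i < attempts; i++) {
+    const db = new Database(path);
+    try {
+      await db.init();
+      return db;
+    } catch (err) {
+      if (err.code !== LBUG_DATABASE_LOCKED || i === attempts - 1) throw err;
+      await new Promise((resolve) => setTimeout(resolve, delay));
+      delay *= 2;
+    }
+  }
+}
+```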
diff --git a/tools/nodejs_api/docs/execution_chain_analysis.md b/tools/nodejs_api/docs/execution_chain_analysis.md
new file mode 100644
index 0000000000..8e633201bc
--- /dev/null
+++ b/tools/nodejs_api/docs/execution_chain_analysis.md
@@ -0,0 +1,65 @@
+# LOAD FROM stream: Execution Chain (reference & recommendations)
+
+## Execution chain diagram
+
+```mermaid
+%%{init: {'flowchart': {'defaultRenderer': 'elk', 'elk': {'direction': 'DOWN'}}}}%%
+flowchart TB
+    subgraph JS["JS (main thread)"]
+        direction TB
+        A["query('LOAD FROM name RETURN *')"]
+        B["registerStream: getChunk(requestId) → pending.push; runConsumer via setImmediate"]
+        C["runConsumer: sort pending, for each id take it.next(), returnChunk(id, rows, done)"]
+        D["AsyncIterator: it.next() → yield rows"]
+        A --> B
+        B --> C
+        C --> D
+    end
+
+    subgraph CppAddon["C++ addon (Node worker thread)"]
+        direction TB
+        E["tableFunc: mutex, nextRequestId(), setChunkRequest, BlockingCall(getChunk)"]
+        F["wait reqPtr->cv until filled"]
+        G["returnChunkFromJS: req->rows, filled=true, cv.notify_one"]
+        H["Copy rows to output.dataChunk, return cap"]
+        E --> F
+        G --> F
+        F --> H
+    end
+
+    subgraph Engine["Engine (single task thread, canParallelFunc=false)"]
+        direction TB
+        I["getNextTuple → getNextTuplesInternal"]
+        J["tableFunc(input, output) → numTuplesScanned"]
+        K["FactorizedTable accumulates chunks"]
+        L["MaterializedQueryResult + FactorizedTableIterator"]
+        I --> J
+        J --> K
+        K --> L
+    end
+
+    J --> E
+    E --> B
+    C --> G
+    H --> J
+    L --> M["JS hasNext / getNext"]
+    M --> A
+```
+
+---
+
+## Useful observations
+
+- **Order**: With `canParallelFunc = false`, one engine thread calls `tableFunc` sequentially. Request IDs are assigned under a mutex; the JS `runConsumer` sorts `pending` and serves chunks by `requestId`, so iterator order is preserved.
+- **End of stream**: The engine calls `tableFunc` until it returns 0. JS sends `returnChunk(id, [], true)` when the iterator is done; C++ returns 0 and the engine stops. No extra call happens after 0.
+- **getNext contract**: Core `getNext()` throws if `!hasNext()`. The addon always checks `hasNext()` before `getNext()` and returns `null` when exhausted, so the JS-side `getNext()` promise resolves to `null` at the end of results.
+
+---
+
+## Recommendations for the future
+
+1. **Keep `canParallelFunc = false`** for the node stream table function. Enabling parallelism would require a deterministic merge of chunks by requestId on the engine side; until then, single-threaded execution keeps order and avoids subtle bugs.
+2. **Any new code path that reads rows** (e.g. another language binding or helper) must guard with `hasNext()` before `getNext()`; core will throw otherwise.
+3. **Mutex in `tableFunc`**: Currently redundant with single-threaded execution, but harmless. If parallelism is ever introduced, either remove the mutex and solve ordering in the engine, or keep it and document that the stream source is intentionally serialized.
+4. **Tests**: Prefer iterating with `hasNext()` + `getNext()` and asserting that `getNext()` returns `null` exactly when `hasNext()` becomes false, to lock in the contract (see `test_query_result.js` and the sketch below).
+5. **Rebuild and run the full test suite** (e.g. `register_stream` + `query_result`) after any change in the addon or the engine's table-function path.
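+
+A sketch of the test pattern from recommendation 4 (assumes a `conn` fixture as in the existing test suite):
+
+```js
+const assert = require("assert");
+
+it("getNext() resolves null exactly when hasNext() is false", async function () {
+  const result = await conn.query("RETURN 1 AS x");
+  while (result.hasNext()) {
+    const row = await result.getNext();
+    assert.notStrictEqual(row, null);
+  }
+  // Exhausted: the addon resolves null instead of throwing.
+  assert.strictEqual(await result.getNext(), null);
+});
+```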
diff --git a/tools/nodejs_api/examples/README.md b/tools/nodejs_api/examples/README.md new file mode 100644 index 0000000000..63ace4baf5 --- /dev/null +++ b/tools/nodejs_api/examples/README.md @@ -0,0 +1,11 @@ +# Examples + +Run from `tools/nodejs_api` (after `make nodejs` or `npm run build`): + +```bash +node examples/quickstart.mjs +node examples/stream-load.mjs +``` + +- **quickstart.mjs** — In-memory DB, create table, load data from a stream via `COPY FROM (LOAD FROM ...)`, then query. +- **stream-load.mjs** — Register an async iterable and consume it with `LOAD FROM name RETURN *`. diff --git a/tools/nodejs_api/examples/quickstart.mjs b/tools/nodejs_api/examples/quickstart.mjs new file mode 100644 index 0000000000..d78ec07f9a --- /dev/null +++ b/tools/nodejs_api/examples/quickstart.mjs @@ -0,0 +1,32 @@ +/** + * Quickstart: in-memory database, create schema, load from stream, query. + * Run from tools/nodejs_api: node examples/quickstart.mjs + */ +import { Database, Connection } from "lbug"; + +async function* userRows() { + yield ["Alice", 30]; + yield ["Bob", 25]; +} + +const db = new Database(":memory:"); +const conn = new Connection(db); + +await conn.query(` + CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name)); +`); + +await conn.registerStream("users", userRows(), { + columns: [ + { name: "name", type: "STRING" }, + { name: "age", type: "INT64" }, + ], +}); +await conn.query("COPY User FROM (LOAD FROM users RETURN *)"); +conn.unregisterStream("users"); + +const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); +const rows = await result.getAll(); +console.log(rows); + +await db.close(); diff --git a/tools/nodejs_api/examples/stream-load.mjs b/tools/nodejs_api/examples/stream-load.mjs new file mode 100644 index 0000000000..9adc84f8a9 --- /dev/null +++ b/tools/nodejs_api/examples/stream-load.mjs @@ -0,0 +1,29 @@ +/** + * Load data from a JavaScript async iterable via LOAD FROM. 
+ * Run from tools/nodejs_api: node examples/stream-load.mjs + */ +import { Database, Connection } from "lbug"; + +async function* generateRows() { + yield [1, "Alice"]; + yield [2, "Bob"]; + yield [3, "Carol"]; +} + +const db = new Database(":memory:"); +const conn = new Connection(db); + +await conn.registerStream("users", generateRows(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "name", type: "STRING" }, + ], +}); + +const result = await conn.query("LOAD FROM users RETURN *"); +for await (const row of result) { + console.log(row); +} + +conn.unregisterStream("users"); +await db.close(); diff --git a/tools/nodejs_api/index.js b/tools/nodejs_api/index.js new file mode 100644 index 0000000000..04ea8a1618 --- /dev/null +++ b/tools/nodejs_api/index.js @@ -0,0 +1,4 @@ +"use strict"; + +// After `make nodejs` or `npm run build`, entry point is build/ +module.exports = require("./build"); diff --git a/tools/nodejs_api/index.mjs b/tools/nodejs_api/index.mjs new file mode 100644 index 0000000000..54fd3b6774 --- /dev/null +++ b/tools/nodejs_api/index.mjs @@ -0,0 +1,12 @@ +export { + default, + Database, + Connection, + PreparedStatement, + QueryResult, + createPool, + Pool, + LBUG_DATABASE_LOCKED, + VERSION, + STORAGE_VERSION, +} from "./build/index.mjs"; diff --git a/tools/nodejs_api/install.js b/tools/nodejs_api/install.js index 6a010b335a..7df6e140fc 100644 --- a/tools/nodejs_api/install.js +++ b/tools/nodejs_api/install.js @@ -8,51 +8,72 @@ const process = require("process"); const isNpmBuildFromSourceSet = process.env.npm_config_build_from_source; const platform = process.platform; const arch = process.arch; + +// Skip when already built (e.g. local dev after make nodejs) +if (fsCallback.existsSync(path.join(__dirname, "build", "lbugjs.node"))) { + process.exit(0); +} + const prebuiltPath = path.join( __dirname, "prebuilt", `lbugjs-${platform}-${arch}.node` ); +const buildDir = path.join(__dirname, "build"); +const srcJsDir = path.join(__dirname, "src_js"); +const lbugSourceDir = path.join(__dirname, "lbug-source"); + // Check if building from source is forced if (isNpmBuildFromSourceSet) { console.log( "The NPM_CONFIG_BUILD_FROM_SOURCE environment variable is set. Building from source." ); } -// Check if prebuilt binaries are available +// Prebuilt available + git-clone layout (src_js present, no lbug-source): use prebuilt and copy src_js → build/ +else if (fsCallback.existsSync(prebuiltPath) && fsCallback.existsSync(srcJsDir)) { + console.log("Prebuilt binary is available (git clone layout)."); + if (!fsCallback.existsSync(buildDir)) { + fsCallback.mkdirSync(buildDir, { recursive: true }); + } + fs.copyFileSync(prebuiltPath, path.join(buildDir, "lbugjs.node")); + const jsFiles = fs.readdirSync(srcJsDir).filter((file) => { + return file.endsWith(".js") || file.endsWith(".mjs") || file.endsWith(".d.ts"); + }); + for (const file of jsFiles) { + fs.copyFileSync(path.join(srcJsDir, file), path.join(buildDir, file)); + } + console.log("Done! 
Prebuilt + JS copied to build/."); + process.exit(0); +} +// Prebuilt available + tarball layout (lbug-source present): copy to root (legacy publish flow) else if (fsCallback.existsSync(prebuiltPath)) { console.log("Prebuilt binary is available."); - console.log("Copying prebuilt binary to package directory..."); fs.copyFileSync(prebuiltPath, path.join(__dirname, "lbugjs.node")); - console.log( - `Copied ${prebuiltPath} -> ${path.join(__dirname, "lbugjs.node")}.` - ); - console.log("Copying JS files to package directory..."); - const jsSourceDir = path.join( - __dirname, - "lbug-source", - "tools", - "nodejs_api", - "src_js" - ); + const jsSourceDir = path.join(lbugSourceDir, "tools", "nodejs_api", "src_js"); const jsFiles = fs.readdirSync(jsSourceDir).filter((file) => { return file.endsWith(".js") || file.endsWith(".mjs") || file.endsWith(".d.ts"); }); - console.log("Files to copy: "); - for (const file of jsFiles) { - console.log(" " + file); - } for (const file of jsFiles) { fs.copyFileSync(path.join(jsSourceDir, file), path.join(__dirname, file)); } - console.log("Copied JS files to package directory."); console.log("Done!"); process.exit(0); } else { console.log("Prebuilt binary is not available, building from source..."); } +if (!fsCallback.existsSync(lbugSourceDir)) { + console.error( + "lbug-source/ not found (install from git clone). Add prebuilt binary to prebuilt/lbugjs-" + + platform + + "-" + + arch + + ".node and commit, or install from a full clone and build there." + ); + process.exit(1); +} + // Get number of threads const THREADS = os.cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); diff --git a/tools/nodejs_api/package.json b/tools/nodejs_api/package.json index f6ff61a8b3..6d162e4ce5 100644 --- a/tools/nodejs_api/package.json +++ b/tools/nodejs_api/package.json @@ -5,14 +5,22 @@ "main": "index.js", "module": "./index.mjs", "types": "./lbug.d.ts", - "exports":{ - ".":{ + "exports": { + ".": { "require": "./index.js", "import": "./index.mjs", "types": "./lbug.d.ts" } }, - "files": ["index.js", "index.mjs", "lbug.d.ts", "lbugjs.node"], + "files": [ + "index.js", + "index.mjs", + "lbug.d.ts", + "lbugjs.node", + "prebuilt", + "src_js", + "install.js" + ], "type": "commonjs", "homepage": "https://ladybugdb.com/", "repository": { @@ -20,6 +28,7 @@ "url": "git+https://github.com/LadybugDB/ladybug.git" }, "scripts": { + "install": "node install.js", "test": "mocha test --timeout 20000", "clean": "node clean.js", "clean-all": "node clean.js all", @@ -37,4 +46,4 @@ "cmake-js": "^7.3.0", "node-addon-api": "^6.0.0" } -} +} \ No newline at end of file diff --git a/tools/nodejs_api/src_cpp/include/node_connection.h b/tools/nodejs_api/src_cpp/include/node_connection.h index caacef92c3..17c94fa7b0 100644 --- a/tools/nodejs_api/src_cpp/include/node_connection.h +++ b/tools/nodejs_api/src_cpp/include/node_connection.h @@ -8,6 +8,7 @@ #include "node_prepared_statement.h" #include "node_progress_bar_display.h" #include "node_query_result.h" +#include "node_scan_replacement.h" #include using namespace lbug::main; @@ -30,15 +31,22 @@ class NodeConnection : public Napi::ObjectWrap { void InitCppConnection(); void SetMaxNumThreadForExec(const Napi::CallbackInfo& info); void SetQueryTimeout(const Napi::CallbackInfo& info); + void Interrupt(const Napi::CallbackInfo& info); Napi::Value ExecuteAsync(const Napi::CallbackInfo& info); Napi::Value QueryAsync(const Napi::CallbackInfo& info); Napi::Value ExecuteSync(const Napi::CallbackInfo& info); Napi::Value QuerySync(const 
Napi::CallbackInfo& info); void Close(const Napi::CallbackInfo& info); + Napi::Value RegisterStream(const Napi::CallbackInfo& info); + void UnregisterStream(const Napi::CallbackInfo& info); + void ReturnChunk(const Napi::CallbackInfo& info); + Napi::Value GetNumNodes(const Napi::CallbackInfo& info); + Napi::Value GetNumRels(const Napi::CallbackInfo& info); private: std::shared_ptr database; std::shared_ptr connection; + std::unique_ptr streamRegistry; }; class ConnectionInitAsyncWorker : public Napi::AsyncWorker { diff --git a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h index 7813820cb0..e777c94af5 100644 --- a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h +++ b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h @@ -13,6 +13,8 @@ using namespace common; */ class NodeProgressBarDisplay : public ProgressBarDisplay { public: + ~NodeProgressBarDisplay() override; + void updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) override; diff --git a/tools/nodejs_api/src_cpp/include/node_query_result.h b/tools/nodejs_api/src_cpp/include/node_query_result.h index b9ee4db979..0b07c97b49 100644 --- a/tools/nodejs_api/src_cpp/include/node_query_result.h +++ b/tools/nodejs_api/src_cpp/include/node_query_result.h @@ -37,6 +37,7 @@ class NodeQueryResult : public Napi::ObjectWrap { Napi::Value GetColumnNamesSync(const Napi::CallbackInfo& info); Napi::Value GetQuerySummarySync(const Napi::CallbackInfo& info); Napi::Value GetQuerySummaryAsync(const Napi::CallbackInfo& info); + Napi::Value GetToStringSync(const Napi::CallbackInfo& info); void PopulateColumnNames(); void Close(const Napi::CallbackInfo& info); void Close(); @@ -102,6 +103,7 @@ class NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker { try { if (!nodeQueryResult->queryResult->hasNext()) { cppTuple.reset(); + return; } cppTuple = nodeQueryResult->queryResult->getNext(); } catch (const std::exception& exc) { @@ -112,7 +114,7 @@ class NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker { inline void OnOK() override { auto env = Env(); if (cppTuple == nullptr) { - Callback().Call({env.Null(), env.Undefined()}); + Callback().Call({env.Null(), env.Null()}); return; } Napi::Object nodeTuple = Napi::Object::New(env); diff --git a/tools/nodejs_api/src_cpp/include/node_scan_replacement.h b/tools/nodejs_api/src_cpp/include/node_scan_replacement.h new file mode 100644 index 0000000000..bd0109005a --- /dev/null +++ b/tools/nodejs_api/src_cpp/include/node_scan_replacement.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "function/table/scan_replacement.h" +#include "function/table/table_function.h" +#include + +namespace lbug { +namespace main { +class Connection; +} +namespace common { +class LogicalType; +} +} + +struct NodeStreamChunkRequest { + std::mutex mtx; + std::condition_variable cv; + std::vector> rows; + std::vector columnNames; // schema order for object rows + bool done = false; + bool filled = false; +}; + +struct NodeStreamSourceState { + Napi::ThreadSafeFunction getChunkTsf; + std::vector columnNames; + std::vector columnTypes; +}; + +class NodeStreamRegistry { +public: + void registerSource(const std::string& name, Napi::ThreadSafeFunction tsf, + std::vector columnNames, + std::vector columnTypes); + void unregisterSource(const std::string& name); + std::vector lookup(const std::string& name) const; + 
std::unique_ptr replace( + std::span handles) const; + + static NodeStreamChunkRequest* getChunkRequest(uint64_t requestId); + static void setChunkRequest(uint64_t requestId, std::unique_ptr req); + static uint64_t nextRequestId(); + +private: + mutable std::mutex mtx_; + std::unordered_map> sources_; +}; + +void addNodeScanReplacement(lbug::main::Connection* connection, NodeStreamRegistry* registry); + +void returnChunkFromJS(uint64_t requestId, Napi::Array rowsNapi, bool done); diff --git a/tools/nodejs_api/src_cpp/include/node_stream_scan.h b/tools/nodejs_api/src_cpp/include/node_stream_scan.h new file mode 100644 index 0000000000..5c7328b6be --- /dev/null +++ b/tools/nodejs_api/src_cpp/include/node_stream_scan.h @@ -0,0 +1,29 @@ +#pragma once + +#include "function/table/bind_data.h" +#include "function/table/table_function.h" + +struct NodeStreamSourceState; // defined in node_scan_replacement.h + +struct NodeStreamScanFunctionData : lbug::function::TableFuncBindData { + std::shared_ptr state; + + NodeStreamScanFunctionData(lbug::binder::expression_vector columns, + std::shared_ptr state) + : TableFuncBindData(std::move(columns), 0), state(std::move(state)) {} + + std::unique_ptr copy() const override { + return std::make_unique(columns, state); + } +}; + +namespace lbug { +namespace function { + +struct NodeStreamScanFunction { + static constexpr const char* name = "NODE_STREAM_SCAN"; + static TableFunction getFunction(); +}; + +} // namespace function +} // namespace lbug diff --git a/tools/nodejs_api/src_cpp/include/node_util.h b/tools/nodejs_api/src_cpp/include/node_util.h index 4ea7a50674..cd27b21371 100644 --- a/tools/nodejs_api/src_cpp/include/node_util.h +++ b/tools/nodejs_api/src_cpp/include/node_util.h @@ -10,10 +10,10 @@ class Util { static Napi::Value ConvertToNapiObject(const Value& value, Napi::Env env); static std::unordered_map> TransformParametersForExec( Napi::Array params); + static Value TransformNapiValue(Napi::Value napiValue); private: static Napi::Object ConvertNodeIdToNapiObject(const nodeID_t& nodeId, Napi::Env env); - static Value TransformNapiValue(Napi::Value napiValue); const static int64_t JS_MAX_SAFE_INTEGER = 9007199254740991; const static int64_t JS_MIN_SAFE_INTEGER = -9007199254740991; }; diff --git a/tools/nodejs_api/src_cpp/node_connection.cpp b/tools/nodejs_api/src_cpp/node_connection.cpp index 404903904f..6ec6b0e65a 100644 --- a/tools/nodejs_api/src_cpp/node_connection.cpp +++ b/tools/nodejs_api/src_cpp/node_connection.cpp @@ -1,11 +1,14 @@ #include "include/node_connection.h" +#include #include #include "include/node_database.h" #include "include/node_query_result.h" +#include "include/node_scan_replacement.h" #include "include/node_util.h" #include "main/lbug.h" +#include "main/storage_driver.h" Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) { Napi::HandleScope scope(env); @@ -19,7 +22,13 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("querySync", &NodeConnection::QuerySync), InstanceMethod("setMaxNumThreadForExec", &NodeConnection::SetMaxNumThreadForExec), InstanceMethod("setQueryTimeout", &NodeConnection::SetQueryTimeout), - InstanceMethod("close", &NodeConnection::Close)}); + InstanceMethod("interrupt", &NodeConnection::Interrupt), + InstanceMethod("close", &NodeConnection::Close), + InstanceMethod("registerStream", &NodeConnection::RegisterStream), + InstanceMethod("unregisterStream", &NodeConnection::UnregisterStream), + InstanceMethod("returnChunk", 
&NodeConnection::ReturnChunk), + InstanceMethod("getNumNodes", &NodeConnection::GetNumNodes), + InstanceMethod("getNumRels", &NodeConnection::GetNumRels)}); exports.Set("NodeConnection", t); return exports; @@ -57,6 +66,8 @@ void NodeConnection::InitCppConnection() { this->connection = std::make_shared(database.get()); ProgressBar::Get(*connection->getClientContext()) ->setDisplay(std::make_shared()); + streamRegistry = std::make_unique(); + addNodeScanReplacement(connection.get(), streamRegistry.get()); // After the connection is initialized, we do not need to hold a reference to the database. database.reset(); } @@ -83,9 +94,16 @@ void NodeConnection::SetQueryTimeout(const Napi::CallbackInfo& info) { } } +void NodeConnection::Interrupt(const Napi::CallbackInfo& /* info */) { + if (this->connection) { + this->connection->interrupt(); + } +} + void NodeConnection::Close(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); + streamRegistry.reset(); this->connection.reset(); } @@ -157,3 +175,139 @@ Napi::Value NodeConnection::QueryAsync(const Napi::CallbackInfo& info) { asyncWorker->Queue(); return info.Env().Undefined(); } + +static lbug::common::LogicalType parseColumnType(const std::string& typeStr) { + std::string upper = typeStr; + std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); + if (upper == "INT64") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT64); + if (upper == "INT32") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT32); + if (upper == "INT16") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT16); + if (upper == "INT8") return lbug::common::LogicalType(lbug::common::LogicalTypeID::INT8); + if (upper == "UINT64") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT64); + if (upper == "UINT32") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT32); + if (upper == "UINT16") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT16); + if (upper == "UINT8") return lbug::common::LogicalType(lbug::common::LogicalTypeID::UINT8); + if (upper == "DOUBLE") return lbug::common::LogicalType(lbug::common::LogicalTypeID::DOUBLE); + if (upper == "FLOAT") return lbug::common::LogicalType(lbug::common::LogicalTypeID::FLOAT); + if (upper == "STRING") return lbug::common::LogicalType(lbug::common::LogicalTypeID::STRING); + if (upper == "BOOL" || upper == "BOOLEAN") + return lbug::common::LogicalType(lbug::common::LogicalTypeID::BOOL); + if (upper == "DATE") return lbug::common::LogicalType(lbug::common::LogicalTypeID::DATE); + if (upper == "TIMESTAMP") + return lbug::common::LogicalType(lbug::common::LogicalTypeID::TIMESTAMP); + throw std::runtime_error("Unsupported column type for registerStream: " + typeStr); +} + +Napi::Value NodeConnection::RegisterStream(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!connection || !streamRegistry) { + Napi::Error::New(env, "Connection not initialized.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + if (info.Length() < 3 || !info[0].IsString() || !info[1].IsFunction() || !info[2].IsArray()) { + Napi::Error::New(env, + "registerStream(name, getChunkCallback, columns): name string, getChunkCallback " + "function(requestId), columns array of { name, type }.") + .ThrowAsJavaScriptException(); + return env.Undefined(); + } + std::string name = info[0].As().Utf8Value(); + Napi::Function getChunkCallback = info[1].As(); + Napi::Array columnsArr = 
info[2].As(); + std::vector columnNames; + std::vector columnTypes; + for (uint32_t i = 0; i < columnsArr.Length(); i++) { + Napi::Value col = columnsArr.Get(i); + if (!col.IsObject()) continue; + Napi::Object obj = col.As(); + if (!obj.Get("name").IsString() || !obj.Get("type").IsString()) continue; + columnNames.push_back(obj.Get("name").As().Utf8Value()); + columnTypes.push_back( + parseColumnType(obj.Get("type").As().Utf8Value())); + } + if (columnNames.empty()) { + Napi::Error::New(env, "registerStream: at least one column required.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + try { + auto tsf = Napi::ThreadSafeFunction::New( + env, getChunkCallback, "NodeStreamGetChunk", 0, 1); + streamRegistry->registerSource(name, std::move(tsf), std::move(columnNames), + std::move(columnTypes)); + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + +void NodeConnection::UnregisterStream(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!streamRegistry) return; + if (info.Length() < 1 || !info[0].IsString()) { + Napi::Error::New(env, "unregisterStream(name): name string.").ThrowAsJavaScriptException(); + return; + } + streamRegistry->unregisterSource(info[0].As().Utf8Value()); +} + +void NodeConnection::ReturnChunk(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + if (info.Length() < 3 || !info[0].IsNumber() || !info[1].IsArray() || !info[2].IsBoolean()) { + Napi::Error::New(env, + "returnChunk(requestId, rows, done): requestId number, rows array of rows, done boolean.") + .ThrowAsJavaScriptException(); + return; + } + uint64_t requestId = static_cast(info[0].ToNumber().Int64Value()); + Napi::Array rows = info[1].As(); + bool done = info[2].ToBoolean().Value(); + returnChunkFromJS(requestId, rows, done); +} + +Napi::Value NodeConnection::GetNumNodes(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!connection) { + Napi::Error::New(env, "Connection not initialized.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + if (info.Length() < 1 || !info[0].IsString()) { + Napi::Error::New(env, "getNumNodes(nodeName): nodeName string required.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + try { + Database* db = connection->getClientContext()->getDatabase(); + StorageDriver storageDriver(db); + std::string nodeName = info[0].As().Utf8Value(); + uint64_t count = storageDriver.getNumNodes(nodeName); + return Napi::Number::New(env, static_cast(count)); + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + +Napi::Value NodeConnection::GetNumRels(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + if (!connection) { + Napi::Error::New(env, "Connection not initialized.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + if (info.Length() < 1 || !info[0].IsString()) { + Napi::Error::New(env, "getNumRels(relName): relName string required.").ThrowAsJavaScriptException(); + return env.Undefined(); + } + try { + Database* db = connection->getClientContext()->getDatabase(); + StorageDriver storageDriver(db); + std::string relName = info[0].As().Utf8Value(); + uint64_t count = storageDriver.getNumRels(relName); + return Napi::Number::New(env, static_cast(count)); + } catch (const std::exception& exc) 
{ + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} diff --git a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp index 352775624a..a76dbd953e 100644 --- a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp +++ b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp @@ -3,6 +3,14 @@ using namespace lbug; using namespace common; +NodeProgressBarDisplay::~NodeProgressBarDisplay() { + std::unique_lock lock(callbackMutex); + for (auto& kv : queryCallbacks) { + kv.second.Release(); + } + queryCallbacks.clear(); +} + void NodeProgressBarDisplay::updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) { if (numPipelines == 0) { diff --git a/tools/nodejs_api/src_cpp/node_query_result.cpp b/tools/nodejs_api/src_cpp/node_query_result.cpp index 24c18222ac..d202b705d4 100644 --- a/tools/nodejs_api/src_cpp/node_query_result.cpp +++ b/tools/nodejs_api/src_cpp/node_query_result.cpp @@ -25,6 +25,7 @@ Napi::Object NodeQueryResult::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("getColumnNamesSync", &NodeQueryResult::GetColumnNamesSync), InstanceMethod("getQuerySummaryAsync", &NodeQueryResult::GetQuerySummaryAsync), InstanceMethod("getQuerySummarySync", &NodeQueryResult::GetQuerySummarySync), + InstanceMethod("toStringSync", &NodeQueryResult::GetToStringSync), InstanceMethod("close", &NodeQueryResult::Close)}); exports.Set("NodeQueryResult", t); @@ -228,6 +229,17 @@ Napi::Value NodeQueryResult::GetQuerySummarySync(const Napi::CallbackInfo& info) return env.Undefined(); } +Napi::Value NodeQueryResult::GetToStringSync(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + try { + return Napi::String::New(env, this->queryResult->toString()); + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + void NodeQueryResult::Close(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); diff --git a/tools/nodejs_api/src_cpp/node_scan_replacement.cpp b/tools/nodejs_api/src_cpp/node_scan_replacement.cpp new file mode 100644 index 0000000000..8fdb757f46 --- /dev/null +++ b/tools/nodejs_api/src_cpp/node_scan_replacement.cpp @@ -0,0 +1,134 @@ +#include "include/node_scan_replacement.h" +#include "include/node_stream_scan.h" +#include "include/node_util.h" + +#include "function/table/bind_input.h" +#include "main/client_context.h" +#include "main/connection.h" + +#include + +using namespace lbug::common; +using namespace lbug::function; +using namespace lbug::main; + +namespace { + +std::mutex g_requestMutex; +std::atomic g_nextRequestId{1}; +std::unordered_map> g_chunkRequests; + +} // namespace + +void NodeStreamRegistry::registerSource(const std::string& name, Napi::ThreadSafeFunction tsf, + std::vector columnNames, std::vector columnTypes) { + std::lock_guard lock(mtx_); + auto state = std::make_shared(); + state->getChunkTsf = std::move(tsf); + state->columnNames = std::move(columnNames); + state->columnTypes = std::move(columnTypes); + sources_[name] = std::move(state); +} + +void NodeStreamRegistry::unregisterSource(const std::string& name) { + std::lock_guard lock(mtx_); + sources_.erase(name); +} + +std::vector NodeStreamRegistry::lookup(const std::string& name) const { + std::lock_guard lock(mtx_); + auto it = sources_.find(name); + if (it == sources_.end()) { + 
return {}; + } + return {reinterpret_cast(it->second.get())}; +} + +std::unique_ptr NodeStreamRegistry::replace( + std::span handles) const { + if (handles.empty()) { + return nullptr; + } + auto* statePtr = reinterpret_cast(handles[0]); + auto state = std::shared_ptr(statePtr, [](NodeStreamSourceState*) {}); + auto data = std::make_unique(); + data->func = NodeStreamScanFunction::getFunction(); + data->bindInput.addLiteralParam(Value::createValue(reinterpret_cast(statePtr))); + return data; +} + +NodeStreamChunkRequest* NodeStreamRegistry::getChunkRequest(uint64_t requestId) { + std::lock_guard lock(g_requestMutex); + auto it = g_chunkRequests.find(requestId); + return it != g_chunkRequests.end() ? it->second.get() : nullptr; +} + +void NodeStreamRegistry::setChunkRequest(uint64_t requestId, + std::unique_ptr req) { + std::lock_guard lock(g_requestMutex); + if (req) { + g_chunkRequests[requestId] = std::move(req); + } else { + g_chunkRequests.erase(requestId); + } +} + +uint64_t NodeStreamRegistry::nextRequestId() { + return g_nextRequestId++; +} + +static std::vector lookupNodeStream(const std::string& objectName, + void* registryVoid) { + auto* registry = static_cast(registryVoid); + return registry->lookup(objectName); +} + +static std::unique_ptr replaceNodeStream( + std::span handles, void* registryVoid) { + auto* registry = static_cast(registryVoid); + return registry->replace(handles); +} + +void addNodeScanReplacement(Connection* connection, NodeStreamRegistry* registry) { + auto lookup = [registry](const std::string& name) { + return lookupNodeStream(name, registry); + }; + auto replace = [registry](std::span handles) { + return replaceNodeStream(handles, registry); + }; + connection->getClientContext()->addScanReplace(ScanReplacement(std::move(lookup), replace)); +} + +void returnChunkFromJS(uint64_t requestId, Napi::Array rowsNapi, bool done) { + auto* req = NodeStreamRegistry::getChunkRequest(requestId); + if (!req) { + return; + } + std::vector> rows; + const size_t numRows = rowsNapi.Length(); + rows.reserve(numRows); + for (size_t r = 0; r < numRows; r++) { + Napi::Value rowVal = rowsNapi.Get(r); + std::vector row; + if (rowVal.IsArray()) { + auto arr = rowVal.As(); + for (size_t c = 0; c < arr.Length(); c++) { + row.push_back(Util::TransformNapiValue(arr.Get(c))); + } + } else if (rowVal.IsObject() && !rowVal.IsNull() && !rowVal.IsUndefined()) { + auto obj = rowVal.As(); + const auto& colNames = req->columnNames; + for (const auto& colName : colNames) { + row.push_back(Util::TransformNapiValue(obj.Get(colName))); + } + } + rows.push_back(std::move(row)); + } + { + std::lock_guard lock(req->mtx); + req->rows = std::move(rows); + req->done = done; + req->filled = true; + } + req->cv.notify_one(); +} diff --git a/tools/nodejs_api/src_cpp/node_stream_scan.cpp b/tools/nodejs_api/src_cpp/node_stream_scan.cpp new file mode 100644 index 0000000000..529c10b91d --- /dev/null +++ b/tools/nodejs_api/src_cpp/node_stream_scan.cpp @@ -0,0 +1,103 @@ +#include "include/node_stream_scan.h" +#include "include/node_scan_replacement.h" + +#include "binder/binder.h" +#include "common/constants.h" +#include "common/system_config.h" +#include "function/table/bind_input.h" +#include "processor/execution_context.h" +#include "processor/result/factorized_table.h" + +#include + +using namespace lbug::common; +using namespace lbug::function; + +namespace lbug { + +namespace { +std::mutex g_nodeStreamTableFuncMutex; +} + +static std::unique_ptr bindFunc(lbug::main::ClientContext*, + const 
TableFuncBindInput* input) { + auto* statePtr = reinterpret_cast(input->getLiteralVal(0)); + KU_ASSERT(statePtr != nullptr); + std::shared_ptr state(statePtr, [](NodeStreamSourceState*) {}); + auto columns = input->binder->createVariables(state->columnNames, state->columnTypes); + return std::make_unique(std::move(columns), state); +} + +static std::unique_ptr initSharedState( + const TableFuncInitSharedStateInput&) { + return std::make_unique(0); +} + +static std::unique_ptr initLocalState( + const TableFuncInitLocalStateInput&) { + return std::make_unique(); +} + +static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) { + auto* bindData = input.bindData->constPtrCast(); + auto& state = *bindData->state; + std::unique_lock streamLock(g_nodeStreamTableFuncMutex); + const uint64_t requestId = NodeStreamRegistry::nextRequestId(); + auto req = std::make_unique(); + req->columnNames = state.columnNames; + NodeStreamRegistry::setChunkRequest(requestId, std::move(req)); + NodeStreamChunkRequest* reqPtr = NodeStreamRegistry::getChunkRequest(requestId); + KU_ASSERT(reqPtr != nullptr); + + state.getChunkTsf.BlockingCall(&requestId, + [](Napi::Env env, Napi::Function jsCallback, const uint64_t* idPtr) { + jsCallback.Call({Napi::Number::New(env, static_cast(*idPtr))}); + }); + + std::vector> rowsCopy; + { + std::unique_lock lock(reqPtr->mtx); + reqPtr->cv.wait(lock, [reqPtr] { return reqPtr->filled; }); + rowsCopy = std::move(reqPtr->rows); + } + NodeStreamRegistry::setChunkRequest(requestId, nullptr); + streamLock.unlock(); + + const offset_t numRows = static_cast(rowsCopy.size()); + if (numRows == 0) { + return 0; + } + + const auto numCols = bindData->getNumColumns(); + const auto cap = std::min(numRows, static_cast(DEFAULT_VECTOR_CAPACITY)); + for (offset_t r = 0; r < cap; r++) { + for (auto c = 0u; c < numCols; c++) { + auto& vec = output.dataChunk.getValueVectorMutable(c); + if (c < rowsCopy[r].size() && !rowsCopy[r][c].isNull()) { + vec.setNull(r, false); + vec.copyFromValue(r, rowsCopy[r][c]); + } else { + vec.setNull(r, true); + } + } + } + output.dataChunk.state->getSelVectorUnsafe().setSelSize(cap); + return cap; +} + +static double progressFunc(TableFuncSharedState*) { + return 0.0; +} + +TableFunction NodeStreamScanFunction::getFunction() { + TableFunction func(name, std::vector{LogicalTypeID::POINTER}); + func.tableFunc = tableFunc; + func.bindFunc = bindFunc; + func.initSharedStateFunc = initSharedState; + func.initLocalStateFunc = initLocalState; + func.progressFunc = progressFunc; + func.canParallelFunc = [] { return false; }; + return func; +} + +} // namespace lbug diff --git a/tools/nodejs_api/src_js/connection.js b/tools/nodejs_api/src_js/connection.js index 575d9f27ff..d89e8f58de 100644 --- a/tools/nodejs_api/src_js/connection.js +++ b/tools/nodejs_api/src_js/connection.js @@ -121,13 +121,20 @@ class Connection { * Execute a prepared statement with the given parameters. * @param {lbug.PreparedStatement} preparedStatement the prepared statement to execute. * @param {Object} params a plain object mapping parameter names to values. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. 
+ * @param {Object|Function} [optionsOrProgressCallback] - Options { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. Rejects if error or options.signal is aborted. */ - execute(preparedStatement, params = {}, progressCallback) { + execute(preparedStatement, params = {}, optionsOrProgressCallback) { + const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback); return new Promise((resolve, reject) => { + if (progressCallback !== undefined && typeof progressCallback !== "function") { + return reject(new Error("progressCallback must be a function.")); + } + if (optionsOrProgressCallback != null && typeof optionsOrProgressCallback !== "function" && typeof optionsOrProgressCallback !== "object") { + return reject(new Error("progressCallback must be a function.")); + } if ( - !typeof preparedStatement === "object" || + typeof preparedStatement !== "object" || preparedStatement.constructor.name !== "PreparedStatement" ) { return reject( @@ -147,11 +154,29 @@ class Connection { const value = params[key]; paramArray.push([key, value]); } - if (progressCallback && typeof progressCallback !== "function") { - return reject(new Error("progressCallback must be a function.")); + if (signal?.aborted) { + return reject(this._createAbortError()); + } + let abortListener; + const cleanup = () => { + if (signal && abortListener) { + signal.removeEventListener("abort", abortListener); + } + }; + if (signal) { + abortListener = () => { + this.interrupt(); + cleanup(); + reject(this._createAbortError()); + }; + signal.addEventListener("abort", abortListener); } this._getConnection() .then((connection) => { + if (signal?.aborted) { + cleanup(); + return reject(this._createAbortError()); + } const nodeQueryResult = new LbugNative.NodeQueryResult(); try { connection.executeAsync( @@ -159,7 +184,11 @@ class Connection { nodeQueryResult, paramArray, (err) => { + cleanup(); if (err) { + if (signal?.aborted && err.message === "Interrupted.") { + return reject(this._createAbortError()); + } return reject(err); } this._unwrapMultipleQueryResults(nodeQueryResult) @@ -173,10 +202,12 @@ class Connection { progressCallback ); } catch (e) { + cleanup(); return reject(e); } }) .catch((err) => { + cleanup(); return reject(err); }); }); @@ -261,26 +292,65 @@ class Connection { return new PreparedStatement(this, preparedStatement); } + /** + * Interrupt the currently executing query on this connection. + * No-op if the connection is not initialized or no query is running. + */ + interrupt() { + if (this._connection) { + this._connection.interrupt(); + } + } + /** * Execute a query. * @param {String} statement the statement to execute. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options object { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error or if options.signal is aborted. 
   /**
    * Execute a query.
    * @param {String} statement the statement to execute.
-   * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines.
-   * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error.
+   * @param {Object|Function} [optionsOrProgressCallback] - Options object { signal?: AbortSignal, progressCallback?: Function } or a legacy progress callback.
+   * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error or if options.signal is aborted.
    */
-  query(statement, progressCallback) {
+  query(statement, optionsOrProgressCallback) {
+    const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback);
     return new Promise((resolve, reject) => {
+      if (progressCallback !== undefined && typeof progressCallback !== "function") {
+        return reject(new Error("progressCallback must be a function."));
+      }
+      if (optionsOrProgressCallback != null && typeof optionsOrProgressCallback !== "function" && typeof optionsOrProgressCallback !== "object") {
+        return reject(new Error("optionsOrProgressCallback must be an options object or a progress callback function."));
+      }
       if (typeof statement !== "string") {
         return reject(new Error("statement must be a string."));
       }
-      if (progressCallback && typeof progressCallback !== "function") {
-        return reject(new Error("progressCallback must be a function."));
+      if (signal?.aborted) {
+        return reject(this._createAbortError());
+      }
+      let abortListener;
+      const cleanup = () => {
+        if (signal && abortListener) {
+          signal.removeEventListener("abort", abortListener);
+        }
+      };
+      if (signal) {
+        abortListener = () => {
+          this.interrupt();
+          cleanup();
+          reject(this._createAbortError());
+        };
+        signal.addEventListener("abort", abortListener);
       }
       this._getConnection()
         .then((connection) => {
+          if (signal?.aborted) {
+            cleanup();
+            return reject(this._createAbortError());
+          }
           const nodeQueryResult = new LbugNative.NodeQueryResult();
           try {
             connection.queryAsync(statement, nodeQueryResult, (err) => {
+              cleanup();
               if (err) {
+                if (signal?.aborted && err.message === "Interrupted.") {
+                  return reject(this._createAbortError());
+                }
                 return reject(err);
               }
               this._unwrapMultipleQueryResults(nodeQueryResult)
@@ -293,15 +363,176 @@ class Connection {
             }, progressCallback);
           } catch (e) {
+            cleanup();
             return reject(e);
           }
         })
         .catch((err) => {
+          cleanup();
           return reject(err);
         });
     });
   }
 
+  _normalizeQueryOptions(optionsOrProgressCallback) {
+    if (optionsOrProgressCallback == null) {
+      return { signal: undefined, progressCallback: undefined };
+    }
+    if (typeof optionsOrProgressCallback === "function") {
+      return { signal: undefined, progressCallback: optionsOrProgressCallback };
+    }
+    if (typeof optionsOrProgressCallback === "object") {
+      return {
+        signal: optionsOrProgressCallback.signal,
+        progressCallback: optionsOrProgressCallback.progressCallback,
+      };
+    }
+    return { signal: undefined, progressCallback: undefined };
+  }
+
+  _createAbortError() {
+    return new DOMException("The operation was aborted.", "AbortError");
+  }
+
+  /**
+   * Check that the connection is alive (e.g. for connection pools or health checks).
+   * Runs a trivial query; rejects if the connection is broken.
+   * @returns {Promise} resolves to true if the connection is OK.
+   */
+  async ping() {
+    const result = await this.query("RETURN 1");
+    const closeResult = (r) => {
+      if (Array.isArray(r)) {
+        r.forEach((q) => q.close());
+      } else {
+        r.close();
+      }
+    };
+    closeResult(result);
+    return true;
+  }
+
+  /**
+   * Run EXPLAIN on a Cypher statement and return the plan as a string.
+   * @param {string} statement – Cypher statement (e.g. "MATCH (a:person) RETURN a")
+   * @returns {Promise} the plan string (one row per line)
+   */
+  async explain(statement) {
+    if (typeof statement !== "string") {
+      throw new Error("explain: statement must be a string.");
+    }
+    const trimmed = statement.trim();
+    const explainStatement = trimmed.toUpperCase().startsWith("EXPLAIN")
+      ? trimmed
+      : "EXPLAIN " + trimmed;
+    const result = await this.query(explainStatement);
+    const single = Array.isArray(result) ? result[0] : result;
+    const rows = await single.getAll();
+    single.close();
+    if (rows.length === 0) {
+      return "";
+    }
+    return rows
+      .map((row) => Object.values(row).join(" | "))
+      .join("\n");
+  }
+
+  /**
+   * Get the number of nodes in a node table. Connection must be initialized.
+   * @param {string} nodeName – name of the node table (e.g. "User")
+   * @returns {number} count of nodes
+   */
+  getNumNodes(nodeName) {
+    if (typeof nodeName !== "string") {
+      throw new Error("getNumNodes(nodeName): nodeName must be a string.");
+    }
+    const connection = this._getConnectionSync();
+    return connection.getNumNodes(nodeName);
+  }
+
+  /**
+   * Get the number of relationships in a rel table. Connection must be initialized.
+   * @param {string} relName – name of the rel table (e.g. "Follows")
+   * @returns {number} count of relationships
+   */
+  getNumRels(relName) {
+    if (typeof relName !== "string") {
+      throw new Error("getNumRels(relName): relName must be a string.");
+    }
+    const connection = this._getConnectionSync();
+    return connection.getNumRels(relName);
+  }
+
+  /**
+   * Register a stream source for LOAD FROM name. The source must be AsyncIterable; each yielded
+   * value is a row (array of column values in schema order, or an object keyed by column name).
+   * Call unregisterStream(name) when done or before reusing the name.
+   * @param {string} name – name used in Cypher: LOAD FROM name RETURN ...
+   * @param {AsyncIterable<Array<any>|Object>} source – async iterable of rows
+   * @param {{ columns: Array<{ name: string, type: string }> }} options – schema (required). type: INT64, INT32, DOUBLE, STRING, BOOL, DATE, etc.
+   */
+  async registerStream(name, source, options = {}) {
+    if (typeof name !== "string") {
+      throw new Error("registerStream: name must be a string.");
+    }
+    const columns = options.columns;
+    if (!Array.isArray(columns) || columns.length === 0) {
+      throw new Error("registerStream: options.columns (array of { name, type }) is required.");
+    }
+    const conn = await this._getConnection();
+    const it = source[Symbol.asyncIterator] ? source[Symbol.asyncIterator].call(source) : source;
+    const pending = [];
+    let consumerRunning = false;
+
+    // Normalize one yielded value into an array of rows.
+    const toRows = (raw) => {
+      if (raw == null) return [];
+      if (Array.isArray(raw)) {
+        const first = raw[0];
+        const isArrayOfRows =
+          raw.length > 0 &&
+          (Array.isArray(first) || (typeof first === "object" && first !== null && !Array.isArray(first)));
+        return isArrayOfRows ? raw : [raw];
+      }
+      return [raw];
+    };
+
+    const runConsumer = async () => {
+      // Serve chunk requests in ascending request-id order.
+      pending.sort((a, b) => a - b);
+      while (pending.length > 0) {
+        const requestId = pending.shift();
+        try {
+          const n = await it.next();
+          conn.returnChunk(requestId, toRows(n.value), n.done);
+        } catch (e) {
+          // A throwing source ends the stream: deliver an empty, final chunk.
+          conn.returnChunk(requestId, [], true);
+        }
+      }
+      consumerRunning = false;
+    };
+
+    const getChunk = (requestId) => {
+      pending.push(requestId);
+      if (!consumerRunning) {
+        consumerRunning = true;
+        setImmediate(() => runConsumer());
+      }
+    };
+    conn.registerStream(name, getChunk, columns);
+  }
+
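A usage sketch of the streaming hookup above (stream name and schema are illustrative): rows are pulled from the async iterable only when the native scan requests the next chunk.

```js
async function* users() {
  yield [1, "Alice"];
  yield [2, "Bob"];
}

await conn.registerStream("users_src", users(), {
  columns: [
    { name: "id", type: "INT64" },
    { name: "name", type: "STRING" },
  ],
});
try {
  const result = await conn.query("LOAD FROM users_src RETURN *");
  console.log(await result.getAll());
  result.close();
} finally {
  conn.unregisterStream("users_src");
}
```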
+  /**
+   * Unregister a stream source by name.
+   * @param {string} name – name passed to registerStream
+   */
+  unregisterStream(name) {
+    if (typeof name !== "string") {
+      throw new Error("unregisterStream: name must be a string.");
+    }
+    if (!this._connection) {
+      return;
+    }
+    this._connection.unregisterStream(name);
+  }
+
   /**
    * Execute a query synchronously.
    * @param {String} statement the statement to execute. This function blocks the main thread for the duration of the query, so use it with caution.
@@ -396,6 +627,37 @@ class Connection {
     }
   }
 
+  /**
+   * Run a function inside a single write transaction. On success commits, on throw rolls back and rethrows.
+   * Uses Cypher BEGIN TRANSACTION / COMMIT / ROLLBACK under the hood.
+   * @param {Function} fn async function to run; can use this connection's query/execute inside.
+   * @returns {Promise<*>} the value returned by fn.
+   */
+  async transaction(fn) {
+    if (typeof fn !== "function") {
+      throw new Error("transaction() requires a function.");
+    }
+    const closeResult = (r) => {
+      if (Array.isArray(r)) {
+        r.forEach((q) => q.close());
+      } else {
+        r.close();
+      }
+    };
+    const beginRes = await this.query("BEGIN TRANSACTION");
+    closeResult(beginRes);
+    try {
+      const result = await fn();
+      const commitRes = await this.query("COMMIT");
+      closeResult(commitRes);
+      return result;
+    } catch (e) {
+      try {
+        const rollbackRes = await this.query("ROLLBACK");
+        closeResult(rollbackRes);
+      } catch (rollbackErr) {
+        // The rollback itself can fail (e.g. broken connection); surface the original error.
+      }
+      throw e;
+    }
+  }
+
   /**
    * Set the timeout for queries. Queries that take longer than the timeout
    * will be aborted.
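A minimal usage sketch of the helper (table name and data are illustrative):

```js
await conn.transaction(async () => {
  const r1 = await conn.query("CREATE NODE TABLE Item(id INT64, PRIMARY KEY(id))");
  r1.close();
  const r2 = await conn.query("CREATE (:Item {id: 1})");
  r2.close();
  // Throwing anywhere in this function rolls the transaction back and rethrows.
});
```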
diff --git a/tools/nodejs_api/src_js/database.js b/tools/nodejs_api/src_js/database.js
index dc70582494..b7d7a79d6f 100644
--- a/tools/nodejs_api/src_js/database.js
+++ b/tools/nodejs_api/src_js/database.js
@@ -2,6 +2,29 @@
 
 const LbugNative = require("./lbug_native.js");
 
+/** Error code when the database file is locked by another process. */
+const LBUG_DATABASE_LOCKED = "LBUG_DATABASE_LOCKED";
+
+const LOCK_ERROR_MESSAGE = "Could not set lock on file";
+
+function isLockError(err) {
+  return err && typeof err.message === "string" && err.message.includes(LOCK_ERROR_MESSAGE);
+}
+
+function normalizeInitError(err) {
+  if (isLockError(err)) {
+    const e = new Error(err.message);
+    e.code = LBUG_DATABASE_LOCKED;
+    e.cause = err;
+    return e;
+  }
+  return err;
+}
+
+function sleep(ms) {
+  return new Promise((r) => setTimeout(r, ms));
+}
+
 class Database {
   /**
    * Initialize a new Database object. Note that the initialization is done
@@ -26,6 +49,8 @@ class Database {
    * the error occured.
    * @param {Boolean} enableChecksums If true, the database will use checksums to detect corruption in the
    * WAL file.
+   * @param {Number} [openLockRetryMs=5000] When the database file is locked, retry opening for up to this many ms
+   * (grace period). Only applies to async init(); set to 0 to fail immediately. Ignored for in-memory databases.
    */
   constructor(
     databasePath,
@@ -37,6 +62,7 @@ class Database {
     checkpointThreshold = -1,
     throwOnWalReplayFailure = true,
     enableChecksums = true,
+    openLockRetryMs = 5000,
   ) {
     if (!databasePath) {
       databasePath = ":memory:";
@@ -53,6 +79,9 @@ class Database {
     if (typeof checkpointThreshold !== "number" || maxDBSize < -1) {
       throw new Error("Checkpoint threshold must be a positive integer.");
     }
+    if (typeof openLockRetryMs !== "number" || openLockRetryMs < 0) {
+      throw new Error("openLockRetryMs must be a non-negative number.");
+    }
     bufferManagerSize = Math.floor(bufferManagerSize);
     maxDBSize = Math.floor(maxDBSize);
     checkpointThreshold = Math.floor(checkpointThreshold);
@@ -70,6 +99,8 @@ class Database {
     this._isInitialized = false;
     this._initPromise = null;
     this._isClosed = false;
+    // Grace period for lock: retry for up to openLockRetryMs (0 = no retry). In-memory has no file lock.
+    this._openLockRetryMs = databasePath === ":memory:" ? 0 : Math.floor(openLockRetryMs);
   }
 
   /**
@@ -91,27 +122,49 @@ class Database {
   /**
    * Initialize the database. Calling this function is optional, as the
    * database is initialized automatically when the first query is executed.
+   * When the file is locked, init() retries for up to openLockRetryMs (default 5s) before throwing.
    */
   async init() {
     if (!this._isInitialized) {
       if (!this._initPromise) {
-        this._initPromise = new Promise((resolve, reject) => {
-          this._database.initAsync((err) => {
-            if (err) {
-              reject(err);
-            } else {
-              try {
-                this._isInitialized = true;
-              } catch (e) {
-                return reject(e);
+        const self = this;
+        const tryOnce = () =>
+          new Promise((resolve, reject) => {
+            self._database.initAsync((err) => {
+              if (err) reject(err);
+              else {
+                self._isInitialized = true;
+                resolve();
               }
-              resolve();
-            }
+            });
           });
-        });
+        const OPEN_LOCK_DELAY_MS = 200;
+
+        this._initPromise = (async () => {
+          const start = Date.now();
+          for (;;) {
+            if (self._isClosed) throw new Error("Database is closed.");
+            try {
+              await tryOnce();
+              return;
+            } catch (err) {
+              if (!isLockError(err)) throw normalizeInitError(err);
+              if (
+                self._openLockRetryMs <= 0 ||
+                Date.now() - start >= self._openLockRetryMs
+              ) {
+                throw normalizeInitError(err);
+              }
+              await sleep(OPEN_LOCK_DELAY_MS);
+            }
+          }
+        })();
+      }
+      try {
+        await this._initPromise;
+      } finally {
+        this._initPromise = null;
       }
-      await this._initPromise;
-      this._initPromise = null;
     }
   }
 
@@ -127,7 +180,11 @@ class Database {
     if (this._isInitialized) {
       return;
     }
-    this._database.initSync();
+    try {
+      this._database.initSync();
+    } catch (err) {
+      throw normalizeInitError(err);
+    }
     this._isInitialized = true;
   }
 
@@ -208,4 +265,6 @@ class Database {
   }
 }
 
+Database.LBUG_DATABASE_LOCKED = LBUG_DATABASE_LOCKED;
+
 module.exports = Database;
diff --git a/tools/nodejs_api/src_js/index.js b/tools/nodejs_api/src_js/index.js
index d7da3f72b5..3a0b35fd1c 100644
--- a/tools/nodejs_api/src_js/index.js
+++ b/tools/nodejs_api/src_js/index.js
@@ -4,12 +4,16 @@ const Connection = require("./connection.js");
 const Database = require("./database.js");
 const PreparedStatement = require("./prepared_statement.js");
 const QueryResult = require("./query_result.js");
+const { createPool, Pool } = require("./pool.js");
 
 module.exports = {
   Connection,
   Database,
   PreparedStatement,
   QueryResult,
+  createPool,
+  Pool,
+  LBUG_DATABASE_LOCKED: Database.LBUG_DATABASE_LOCKED,
   get VERSION() {
     return Database.getVersion();
   },
diff --git a/tools/nodejs_api/src_js/index.mjs b/tools/nodejs_api/src_js/index.mjs
index 9293e40683..bd921d7046 100644
--- a/tools/nodejs_api/src_js/index.mjs
+++ b/tools/nodejs_api/src_js/index.mjs
@@ -5,6 +5,9 @@ export const Database = lbug.Database;
 export const Connection = lbug.Connection;
 export const PreparedStatement = lbug.PreparedStatement;
 export const QueryResult = lbug.QueryResult;
+export const createPool = lbug.createPool;
+export const Pool = lbug.Pool;
+export const LBUG_DATABASE_LOCKED = lbug.LBUG_DATABASE_LOCKED;
 export const VERSION = lbug.VERSION;
 export const STORAGE_VERSION = lbug.STORAGE_VERSION;
 export default lbug;
diff --git a/tools/nodejs_api/src_js/lbug.d.ts b/tools/nodejs_api/src_js/lbug.d.ts
index 977be1b157..b81fea3b06 100644
--- a/tools/nodejs_api/src_js/lbug.d.ts
+++ b/tools/nodejs_api/src_js/lbug.d.ts
@@ -22,6 +22,15 @@ export type ProgressCallback = (
   numPipelines: number
 ) => void;
 
+/**
+ * Options for query() and execute().
+ * Use signal to cancel the operation via AbortController.
+ */
+export interface QueryOptions {
+  signal?: AbortSignal;
+  progressCallback?: ProgressCallback;
+}
+
 /**
  * Represents a node ID in the graph database.
  */
@@ -104,6 +113,63 @@ export interface SystemConfig {
   checkpointThreshold?: number;
 }
 
+/**
+ * Database options for createPool(). Same shape as the Database constructor args (except path).
+ */
+export interface PoolDatabaseOptions {
+  bufferManagerSize?: number;
+  enableCompression?: boolean;
+  readOnly?: boolean;
+  maxDBSize?: number;
+  autoCheckpoint?: boolean;
+  checkpointThreshold?: number;
+  throwOnWalReplayFailure?: boolean;
+  enableChecksums?: boolean;
+  openLockRetryMs?: number;
+}
+
+/**
+ * Options for createPool().
+ */
+export interface PoolOptions {
+  /** Database file path (default ":memory:") */
+  databasePath?: string;
+  /** Same shape as Database constructor options (bufferManagerSize, readOnly, etc.) */
+  databaseOptions?: PoolDatabaseOptions;
+  /** Minimum connections to keep (default 0) */
+  minSize?: number;
+  /** Maximum connections in the pool (required) */
+  maxSize: number;
+  /** Max time to wait for acquire in ms (0 = wait forever, default 0) */
+  acquireTimeoutMillis?: number;
+  /** If true, call conn.ping() before handing out (default false) */
+  validateOnAcquire?: boolean;
+}
+
+/**
+ * Connection pool: acquire/release or run(fn). One shared Database, up to maxSize Connection instances.
+ */
+export interface Pool {
+  /** Acquire a connection; must call release(conn) when done. Prefer run(fn) to avoid leaks. */
+  acquire(): Promise<Connection>;
+  /** Return a connection to the pool. */
+  release(conn: Connection): void;
+  /** Run fn(conn); connection is released in finally (on success or throw). */
+  run<T>(fn: (conn: Connection) => Promise<T>): Promise<T>;
+  /** Close pool: reject new/pending acquire, then close all connections and database. */
+  close(): Promise<void>;
+}
+
+/** Pool constructor (use createPool() instead of new Pool()). */
+export type PoolConstructor = new (options: PoolOptions) => Pool;
+
+/**
+ * Create a connection pool.
+ * @param options Pool options (maxSize required; databasePath, databaseOptions, minSize, acquireTimeoutMillis, validateOnAcquire optional)
+ * @returns Pool instance
+ */
+export function createPool(options: PoolOptions): Pool;
+
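These options allow bounded waiting and pre-checkout health checks; a sketch (path and sizes are illustrative):

```js
import { createPool } from "lbug";

const pool = createPool({
  databasePath: "./mydb",
  maxSize: 4,
  acquireTimeoutMillis: 2000, // give up after 2s instead of waiting forever
  validateOnAcquire: true, // ping() each connection before handing it out
});

try {
  await pool.run(async (conn) => {
    const r = await conn.query("RETURN 1");
    r.close();
  });
} catch (err) {
  // A saturated pool surfaces here as "Pool acquire timed out."
  console.error(err.message);
}
await pool.close();
```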
 /**
  * Represents a Lbug database instance.
  */
@@ -117,6 +183,9 @@ export class Database {
   * @param maxDBSize Maximum size of the database in bytes
   * @param autoCheckpoint Whether to enable automatic checkpoints
   * @param checkpointThreshold Threshold for automatic checkpoints
+  * @param throwOnWalReplayFailure If true, WAL replay failures throw; otherwise replay stops at the error
+  * @param enableChecksums If true, use checksums to detect WAL corruption
+  * @param openLockRetryMs When the file is locked, retry opening for up to this many ms (default 5000). Set 0 to fail immediately. Only for async init(); ignored for :memory:
   */
  constructor(
    databasePath?: string,
@@ -125,12 +194,16 @@ export class Database {
    readOnly?: boolean,
    maxDBSize?: number,
    autoCheckpoint?: boolean,
-    checkpointThreshold?: number
+    checkpointThreshold?: number,
+    throwOnWalReplayFailure?: boolean,
+    enableChecksums?: boolean,
+    openLockRetryMs?: number
  );
 
  /**
   * Initialize the database. Calling this function is optional, as the
   * database is initialized automatically when the first query is executed.
+   * When the file is locked, retries for up to openLockRetryMs (default 5s) before throwing.
   * @returns Promise that resolves when initialization completes
   */
  init(): Promise<void>;
@@ -200,6 +273,12 @@ export class Connection {
   */
  setQueryTimeout(timeoutInMs: number): void;
 
+  /**
+   * Interrupt the currently executing query on this connection.
+   * No-op if the connection is not initialized or no query is running.
+   */
+  interrupt(): void;
+
  /**
   * Close the connection.
   * @returns Promise that resolves when connection is closed
@@ -215,13 +294,13 @@ export class Connection {
   * Execute a prepared statement.
   * @param preparedStatement The prepared statement to execute
   * @param params Parameters for the query as a plain object
-   * @param progressCallback Optional progress callback
-   * @returns Promise that resolves to the query result(s)
+   * @param optionsOrProgressCallback Options (e.g. signal for abort) or legacy progress callback
+   * @returns Promise that resolves to the query result(s). Rejects with a DOMException AbortError if signal is aborted.
   */
  execute(
    preparedStatement: PreparedStatement,
    params?: Record<string, any>,
-    progressCallback?: ProgressCallback
+    optionsOrProgressCallback?: QueryOptions | ProgressCallback
  ): Promise<QueryResult | QueryResult[]>;
 
  /**
@@ -252,20 +331,73 @@ export class Connection {
  /**
   * Execute a query.
   * @param statement The statement to execute
-   * @param progressCallback Optional progress callback
-   * @returns Promise that resolves to the query result(s)
+   * @param optionsOrProgressCallback Options (e.g. signal for abort) or legacy progress callback
+   * @returns Promise that resolves to the query result(s). Rejects with a DOMException AbortError if signal is aborted.
   */
  query(
    statement: string,
-    progressCallback?: ProgressCallback
+    optionsOrProgressCallback?: QueryOptions | ProgressCallback
  ): Promise<QueryResult | QueryResult[]>;
 
+  /**
+   * Run a function inside a single write transaction. Commits on success, rolls back on throw.
+   * @param fn Async function that can use this connection's query/execute
+   * @returns Promise that resolves to the return value of fn
+   */
+  transaction<T>(fn: () => Promise<T>): Promise<T>;
+
  /**
   * Execute a query synchronously.
   * @param statement The statement to execute
   * @returns The query result(s)
   */
  querySync(statement: string): QueryResult | QueryResult[];
+
+  /**
+   * Check that the connection is alive (e.g. for pools or health checks).
+   * @returns Promise that resolves to true if OK, rejects if the connection is broken
+   */
+  ping(): Promise<boolean>;
+
+  /**
+   * Run EXPLAIN on a Cypher statement and return the plan as a string.
+   * @param statement Cypher statement (e.g. "MATCH (a:person) RETURN a")
+   * @returns Promise that resolves to the plan string (one row per line)
+   */
+  explain(statement: string): Promise<string>;
+
+  /**
+   * Get the number of nodes in a node table. Connection must be initialized.
+   * @param nodeName Name of the node table (e.g. "User")
+   * @returns Count of nodes
+   */
+  getNumNodes(nodeName: string): number;
+
+  /**
+   * Get the number of relationships in a rel table. Connection must be initialized.
+   * @param relName Name of the rel table (e.g. "Follows")
+   * @returns Count of relationships
+   */
+  getNumRels(relName: string): number;
+
+  /**
+   * Register a stream source for LOAD FROM name. Source must be AsyncIterable of rows (array or object).
+   * Unregister with unregisterStream(name) when done.
+   * @param name Name used in Cypher: LOAD FROM name RETURN ...
+   * @param source AsyncIterable of rows (array of column values or object keyed by column name)
+   * @param options.columns Schema: array of { name: string, type: string } (type: INT64, INT32, DOUBLE, STRING, BOOL, DATE, etc.)
+   */
+  registerStream(
+    name: string,
+    source: AsyncIterable<Array<any> | Record<string, any>>,
+    options: { columns: Array<{ name: string; type: string }> }
+  ): Promise<void>;
+
+  /**
+   * Unregister a stream source by name.
+   * @param name Name passed to registerStream
+   */
+  unregisterStream(name: string): void;
 }
 
 /**
@@ -286,11 +418,25 @@ export class PreparedStatement {
   getErrorMessage(): string;
 }
 
+/**
+ * Query summary with compiling and execution times (milliseconds).
+ */
+export interface QuerySummary {
+  compilingTime: number;
+  executionTime: number;
+}
+
 /**
  * Represents the results of a query execution.
  * Note: This class is created internally by Connection query methods.
+ * Supports async iteration: for await (const row of result) { ... }
 */
-export class QueryResult {
+export class QueryResult implements AsyncIterable<Record<string, any> | null> {
+  /**
+   * Async iterator for row-by-row consumption (for await...of).
+   */
+  [Symbol.asyncIterator](): AsyncIterator<Record<string, any> | null>;
+
  /**
   * Reset the iterator for reading results.
   */
@@ -320,6 +466,18 @@ export class QueryResult {
   */
  getNextSync(): Record<string, any> | null;
 
+  /**
+   * Return the query result as a string (header + rows). For failed queries returns the error message.
+   * @returns String representation of the result
+   */
+  toString(): string;
+
+  /**
+   * Return a Node.js Readable stream (object mode) that yields one row per chunk.
+   * @returns Readable stream of row objects
+   */
+  toStream(): import("stream").Readable;
+
  /**
   * Iterate through the query result with callback functions.
   * @param resultCallback Callback function called for each row
@@ -378,12 +536,30 @@ export class QueryResult {
   */
  getColumnNamesSync(): string[];
 
+  /**
+   * Get the query summary (compiling and execution time in milliseconds).
+   * @returns Promise that resolves to the query summary
+   */
+  getQuerySummary(): Promise<QuerySummary>;
+
+  /**
+   * Get the query summary synchronously.
+   * @returns The query summary
+   */
+  getQuerySummarySync(): QuerySummary;
+
  /**
   * Close the result set and release resources.
   */
  close(): void;
 }
 
+/**
+ * Error code when the database file is locked by another process.
+ * Use with init() / initSync() or first query: catch and check err.code === LBUG_DATABASE_LOCKED.
+ */
+export const LBUG_DATABASE_LOCKED: "LBUG_DATABASE_LOCKED";
+
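Catching the lock error might look like this (a sketch, assuming an async context; the path is illustrative, and the trailing constructor argument is openLockRetryMs in the positional order documented above):

```js
const lbug = require("lbug");

// openLockRetryMs = 0: fail immediately instead of retrying for up to 5s.
const db = new lbug.Database("./shared-db", 0, true, false, 0, true, -1, true, true, 0);
try {
  await db.init();
} catch (err) {
  if (err.code === lbug.LBUG_DATABASE_LOCKED) {
    console.error("Another process holds the database lock; retry later.");
  } else {
    throw err;
  }
}
```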
 /**
  * Default export for the Lbug module.
  */
@@ -392,6 +568,9 @@ declare const lbug: {
   Connection: typeof Connection;
   PreparedStatement: typeof PreparedStatement;
   QueryResult: typeof QueryResult;
+  createPool: typeof createPool;
+  Pool: PoolConstructor;
+  LBUG_DATABASE_LOCKED: typeof LBUG_DATABASE_LOCKED;
   VERSION: string;
   STORAGE_VERSION: bigint;
 };
diff --git a/tools/nodejs_api/src_js/pool.js b/tools/nodejs_api/src_js/pool.js
new file mode 100644
index 0000000000..cc0d585113
--- /dev/null
+++ b/tools/nodejs_api/src_js/pool.js
@@ -0,0 +1,222 @@
+"use strict";
+
+const Database = require("./database.js");
+const Connection = require("./connection.js");
+
+const DEFAULT_MIN_SIZE = 0;
+const DEFAULT_ACQUIRE_TIMEOUT_MILLIS = 0;
+const DEFAULT_VALIDATE_ON_ACQUIRE = false;
+
+function createDatabase(path, databaseOptions) {
+  const o = databaseOptions || {};
+  return new Database(
+    path,
+    o.bufferManagerSize ?? 0,
+    o.enableCompression ?? true,
+    o.readOnly ?? false,
+    o.maxDBSize ?? 0,
+    o.autoCheckpoint ?? true,
+    o.checkpointThreshold ?? -1,
+    o.throwOnWalReplayFailure ?? true,
+    o.enableChecksums ?? true,
+    o.openLockRetryMs ?? 5000
+  );
+}
+
+class Pool {
+  constructor(options) {
+    if (options == null || typeof options !== "object") {
+      throw new Error("createPool(options): options must be an object.");
+    }
+    const path = options.databasePath;
+    if (path !== undefined && path !== null && path !== "" && typeof path !== "string") {
+      throw new Error("createPool: databasePath must be a string or empty.");
+    }
+    const maxSize = options.maxSize;
+    if (typeof maxSize !== "number" || maxSize < 1 || !Number.isInteger(maxSize)) {
+      throw new Error("createPool: maxSize must be a positive integer.");
+    }
+    const minSize = options.minSize ?? DEFAULT_MIN_SIZE;
+    if (typeof minSize !== "number" || minSize < 0 || !Number.isInteger(minSize) || minSize > maxSize) {
+      throw new Error("createPool: minSize must be a non-negative integer not greater than maxSize.");
+    }
+    const acquireTimeoutMillis = options.acquireTimeoutMillis ?? DEFAULT_ACQUIRE_TIMEOUT_MILLIS;
+    if (typeof acquireTimeoutMillis !== "number" || acquireTimeoutMillis < 0) {
+      throw new Error("createPool: acquireTimeoutMillis must be a non-negative number.");
+    }
+    const validateOnAcquire = options.validateOnAcquire ?? DEFAULT_VALIDATE_ON_ACQUIRE;
+
+    this._databasePath = path == null || path === "" ? ":memory:" : path;
+    this._databaseOptions = options.databaseOptions || null;
+    this._maxSize = maxSize;
+    this._minSize = minSize;
+    this._acquireTimeoutMillis = acquireTimeoutMillis;
+    this._validateOnAcquire = Boolean(validateOnAcquire);
+
+    this._database = null;
+    this._idle = [];
+    this._allConnections = [];
+    this._checkedOut = new Set();
+    this._waiters = [];
+    this._closed = false;
+  }
+
+  _ensureDatabase() {
+    if (this._database === null) {
+      this._database = createDatabase(this._databasePath, this._databaseOptions);
+    }
+    return this._database;
+  }
+
+  _createConnection() {
+    const db = this._ensureDatabase();
+    const conn = new Connection(db);
+    this._allConnections.push(conn);
+    return conn;
+  }
+
+  _wakeNextWaiter(conn) {
+    if (this._waiters.length > 0) {
+      const w = this._waiters.shift();
+      if (w.timer) clearTimeout(w.timer);
+      this._checkedOut.add(conn);
+      w.resolve(conn);
+      return;
+    }
+    this._idle.push(conn);
+  }
+
+  /**
+   * Acquire a connection from the pool. Must call release(conn) when done (e.g. in finally).
+   * Prefer pool.run(fn) to avoid forgetting release.
+   * @returns {Promise<lbug.Connection>}
+   */
+  acquire() {
+    if (this._closed) {
+      return Promise.reject(new Error("Pool is closed."));
+    }
+
+    while (this._allConnections.length < this._minSize) {
+      this._idle.push(this._createConnection());
+    }
+    if (this._idle.length > 0) {
+      const conn = this._idle.shift();
+      this._checkedOut.add(conn);
+      if (this._validateOnAcquire) {
+        return conn.ping().then(
+          () => conn,
+          (err) => {
+            // Failed validation: do not leave the connection checked out.
+            this._checkedOut.delete(conn);
+            throw err;
+          }
+        );
+      }
+      return Promise.resolve(conn);
+    }
+    if (this._allConnections.length < this._maxSize) {
+      const conn = this._createConnection();
+      this._checkedOut.add(conn);
+      if (this._validateOnAcquire) {
+        return conn.ping().then(
+          () => conn,
+          (err) => {
+            this._checkedOut.delete(conn);
+            throw err;
+          }
+        );
+      }
+      return Promise.resolve(conn);
+    }
+
+    return new Promise((resolve, reject) => {
+      const entry = {
+        resolve,
+        reject,
+        timer: null,
+      };
+      if (this._acquireTimeoutMillis > 0) {
+        entry.timer = setTimeout(() => {
+          const i = this._waiters.indexOf(entry);
+          if (i !== -1) {
+            this._waiters.splice(i, 1);
+            reject(new Error("Pool acquire timed out."));
+          }
+        }, this._acquireTimeoutMillis);
+      }
+      this._waiters.push(entry);
+    });
+  }
+
+  /**
+   * Return a connection to the pool. No-op if the pool is closed.
+   * @param {lbug.Connection} conn
+   */
+  release(conn) {
+    if (this._closed) {
+      return;
+    }
+    if (
+      conn == null ||
+      typeof conn !== "object" ||
+      conn.constructor.name !== "Connection"
+    ) {
+      throw new Error("release(conn): conn must be a Connection from this pool.");
+    }
+    if (!this._checkedOut.has(conn)) {
+      throw new Error("release(conn): connection not from this pool or already released.");
+    }
+    this._checkedOut.delete(conn);
+    this._wakeNextWaiter(conn);
+  }
+
+  /**
+   * Run a function with a connection; the connection is released in finally (on success or throw).
+   * @template T
+   * @param {(conn: lbug.Connection) => Promise<T>} fn
+   * @returns {Promise<T>}
+   */
+  async run(fn) {
+    if (typeof fn !== "function") {
+      throw new Error("pool.run(fn): fn must be a function.");
+    }
+    const conn = await this.acquire();
+    try {
+      return await fn(conn);
+    } finally {
+      this.release(conn);
+    }
+  }
+
+  /**
+   * Close the pool: reject new and pending acquire, then close all connections and the database.
+   * @returns {Promise<void>}
+   */
+  async close() {
+    if (this._closed) {
+      return;
+    }
+    this._closed = true;
+    const err = new Error("Pool is closed.");
+    for (const w of this._waiters) {
+      if (w.timer) clearTimeout(w.timer);
+      w.reject(err);
+    }
+    this._waiters.length = 0;
+    this._idle.length = 0;
+    for (const conn of this._allConnections) {
+      try {
+        await conn.close();
+      } catch (_) {
+        // ignore
+      }
+    }
+    this._allConnections.length = 0;
+    if (this._database) {
+      try {
+        await this._database.close();
+      } catch (_) {
+        // ignore
+      }
+      this._database = null;
+    }
+  }
+}
+
+/**
+ * Create a connection pool. One shared Database; up to maxSize Connection instances.
+ * @param {lbug.PoolOptions} options
+ * @returns {lbug.Pool}
+ */
+function createPool(options) {
+  return new Pool(options);
+}
+
+module.exports = { createPool, Pool };
diff --git a/tools/nodejs_api/src_js/query_result.js b/tools/nodejs_api/src_js/query_result.js
index 944f23db9c..24cfe21cb5 100644
--- a/tools/nodejs_api/src_js/query_result.js
+++ b/tools/nodejs_api/src_js/query_result.js
@@ -1,6 +1,7 @@
 "use strict";
 
 const assert = require("assert");
+const { Readable } = require("stream");
 
 class QueryResult {
   /**
@@ -75,6 +76,15 @@ class QueryResult {
     return this._queryResult.getNextSync();
   }
 
+  /**
+   * Return the query result as a string (header + rows). For failed queries returns the error message.
+   * @returns {string}
+   */
+  toString() {
+    this._checkClosed();
+    return this._queryResult.toStringSync();
+  }
+
   /**
    * Iterate through the query result with callback functions.
    * @param {Function} resultCallback the callback function that is called for each row of the query result.
@@ -96,6 +106,64 @@ class QueryResult {
     });
   }
 
+  /**
+   * Async iterator for consuming the result row-by-row (e.g. `for await (const row of result)`).
+   * Does not materialize the full result in memory.
+   * @returns {AsyncIterator<Object>}
+   */
+  [Symbol.asyncIterator]() {
+    const self = this;
+    return {
+      async next() {
+        self._checkClosed();
+        if (!self.hasNext()) {
+          return { done: true };
+        }
+        const value = await self.getNext();
+        if (value === null) {
+          return { done: true };
+        }
+        return { value, done: false };
+      },
+    };
+  }
+
+  /**
+   * Return a Node.js Readable stream (object mode) that yields one row per chunk.
+   * Useful for piping or integrating with stream consumers. Implemented on top of getNext(), so it needs no native changes.
+   * @returns {stream.Readable} Readable stream of row objects.
+   */
+  toStream() {
+    const self = this;
+    return new Readable({
+      objectMode: true,
+      read() {
+        if (self._isClosed) {
+          return this.push(null);
+        }
+        if (!self.hasNext()) {
+          return this.push(null);
+        }
+        self.getNext()
+          .then((row) => {
+            if (row !== null && row !== undefined) {
+              this.push(row);
+            }
+            if (!self.hasNext()) {
+              this.push(null);
+            }
+          })
+          .catch((err) => {
+            this.destroy(err);
+          });
+      },
+    });
+  }
+
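Because toStream() returns a standard object-mode Readable, it composes with Node's core stream utilities. A sketch serializing rows to a JSON-lines file (query and output path are illustrative; `Transform` and `stream/promises` are core Node APIs):

```js
const fs = require("fs");
const { Transform } = require("stream");
const { pipeline } = require("stream/promises");

// A single-statement query resolves to a single QueryResult.
const result = await conn.query("MATCH (u:User) RETURN u.name");
await pipeline(
  result.toStream(),
  new Transform({
    writableObjectMode: true, // rows come in as objects
    readableObjectMode: false, // strings go out
    transform(row, _enc, cb) {
      cb(null, JSON.stringify(row) + "\n");
    },
  }),
  fs.createWriteStream("users.jsonl")
);
```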
   /**
    * Get all rows of the query result.
    * @returns {Promise<Array<Object>>} a promise that resolves to all rows of the query result. The promise is rejected if there is an error.
@@ -229,13 +297,16 @@ class QueryResult {
   }
 
   /**
-   * Internal function to check if the query result is closed.
-   * @throws {Error} if the query result is closed.
+   * Internal function to check if the query result or its connection is closed.
+   * @throws {Error} if the query result is closed or the connection is closed.
*/ _checkClosed() { if (this._isClosed) { throw new Error("Query result is closed."); } + if (this._connection._isClosed) { + throw new Error("Connection is closed."); + } } } diff --git a/tools/nodejs_api/test/test.js b/tools/nodejs_api/test/test.js index 4efa4b7e6e..3f8d72a9d0 100644 --- a/tools/nodejs_api/test/test.js +++ b/tools/nodejs_api/test/test.js @@ -10,6 +10,14 @@ describe("lbug", () => { before(() => { return initTests(); }); + after(async () => { + if (global.conn && !global.conn._isClosed) { + await global.conn.close().catch(() => {}); + } + if (global.db && !global.db._isClosed) { + await global.db.close().catch(() => {}); + } + }); importTest("Database", "./test_database.js"); importTest("Connection", "./test_connection.js"); importTest("Query result", "./test_query_result.js"); @@ -18,4 +26,6 @@ describe("lbug", () => { importTest("Concurrent query execution", "./test_concurrency.js"); importTest("Version", "./test_version.js"); importTest("Synchronous API", "./test_sync_api.js"); + importTest("registerStream / LOAD FROM stream", "./test_register_stream.js"); + importTest("Resilience (close during/after use)", "./test_resilience.js"); }); diff --git a/tools/nodejs_api/test/test_connection.js b/tools/nodejs_api/test/test_connection.js index 34345ad26a..29a3370b27 100644 --- a/tools/nodejs_api/test/test_connection.js +++ b/tools/nodejs_api/test/test_connection.js @@ -122,6 +122,50 @@ describe("Execute", function () { }); }); +describe("ping", function () { + it("should resolve to true when connection is alive", async function () { + const ok = await conn.ping(); + assert.strictEqual(ok, true); + }); +}); + +describe("transaction", function () { + it("should commit and return fn result on success", async function () { + const result = await conn.transaction(async () => { + const q = await conn.query("RETURN 42 AS x"); + const rows = await q.getAll(); + q.close(); + return rows[0].x; + }); + assert.equal(result, 42); + }); + + it("should rollback and rethrow on fn error", async function () { + const err = new Error("tx abort"); + try { + await conn.transaction(async () => { + await conn.query("RETURN 1"); + throw err; + }); + assert.fail("transaction should have thrown"); + } catch (e) { + assert.strictEqual(e, err); + } + const q = await conn.query("RETURN 1"); + assert.isTrue(q.hasNext()); + q.close(); + }); + + it("should reject non-function", async function () { + try { + await conn.transaction("not a function"); + assert.fail("transaction should have thrown"); + } catch (e) { + assert.equal(e.message, "transaction() requires a function."); + } + }); +}); + describe("Query", function () { it("should run a valid query", async function () { const queryResult = await conn.query("MATCH (a:person) RETURN COUNT(*)"); @@ -214,6 +258,70 @@ describe("Timeout", function () { }); }); +describe("Interrupt", function () { + it("should abort a long-running query when interrupt() is called", async function () { + if (process.platform === "win32") { + this.skip(); + } + this.timeout(5000); + const newConn = new lbug.Connection(db); + await newConn.init(); + const longQuery = + "UNWIND RANGE(1, 30000) AS x UNWIND RANGE(1, 30000) AS y RETURN COUNT(x + y);"; + const queryPromise = newConn.query(longQuery); + setTimeout(() => newConn.interrupt(), 100); + try { + await queryPromise; + assert.fail("No error thrown when the query was interrupted."); + } catch (err) { + assert.equal(err.message, "Interrupted."); + } + }); +}); + +describe("AbortSignal", function () { + it("should reject with 
AbortError when signal is already aborted before query starts", async function () { + const ac = new AbortController(); + ac.abort(); + try { + await conn.query("RETURN 1", { signal: ac.signal }); + assert.fail("No error thrown when signal was already aborted."); + } catch (err) { + assert.equal(err.name, "AbortError"); + assert.equal(err.message, "The operation was aborted."); + } + }); + + it("should reject with AbortError when signal is aborted during query", async function () { + const newConn = new lbug.Connection(db); + await newConn.init(); + const ac = new AbortController(); + const longQuery = + "UNWIND RANGE(1, 30000) AS x UNWIND RANGE(1, 30000) AS y RETURN COUNT(x + y);"; + const queryPromise = newConn.query(longQuery, { signal: ac.signal }); + setTimeout(() => ac.abort(), 100); + try { + await queryPromise; + assert.fail("No error thrown when signal was aborted during query."); + } catch (err) { + assert.equal(err.name, "AbortError"); + } + }); + + it("should work with progressCallback in options object", async function () { + let progressCalled = false; + const result = await conn.query("RETURN 1", { + progressCallback: () => { + progressCalled = true; + }, + }); + assert.exists(result); + const rows = Array.isArray(result) ? result : [result]; + assert.isAtLeast(rows.length, 1); + rows.forEach((r) => r.close()); + }); +}); + describe("Close", function () { it("should close the connection", async function () { const newConn = new lbug.Connection(db); diff --git a/tools/nodejs_api/test/test_database.js b/tools/nodejs_api/test/test_database.js index 06ba4c24f4..57cb692c20 100644 --- a/tools/nodejs_api/test/test_database.js +++ b/tools/nodejs_api/test/test_database.js @@ -9,15 +9,17 @@ const spwan = require("child_process").spawn; const openDatabaseOnSubprocess = (dbPath) => { return new Promise((resolve, _) => { const node = process.argv[0]; + // Use env vars so Windows paths with backslashes don't break the -e code string + const env = { ...process.env, LBUG_PATH: lbugPath, DB_PATH: dbPath }; const code = ` (async() => { - const lbug = require("${lbugPath}"); - const db = new lbug.Database("${dbPath}", 1 << 28); + const lbug = require(process.env.LBUG_PATH); + const db = new lbug.Database(process.env.DB_PATH, 1 << 28); await db.init(); console.log("Database initialized."); })(); `; - const child = spwan(node, ["-e", code]); + const child = spwan(node, ["-e", code], { env }); let stdout = ""; let stderr = ""; child.stdout.on("data", (data) => { @@ -374,10 +376,6 @@ describe("Database constructor", function () { describe("Database close", function () { it("should allow initializing a new database after closing", async function () { - if (process.platform === "win32") { - this._runnable.title += " (skipped: not implemented on Windows)"; - this.skip(); - } const tmpDbPath = await new Promise((resolve, reject) => { tmp.dir({ unsafeCleanup: true }, (err, path, _) => { if (err) { @@ -389,7 +387,6 @@ describe("Database close", function () { const dbPath = path.join(tmpDbPath, "db.kz"); const testDb = new lbug.Database(dbPath, 1 << 28 /* 256MB */); await testDb.init(); - // FIXME: doesn't work properly on windows let subProcessResult = await openDatabaseOnSubprocess(dbPath); assert.notEqual(subProcessResult.code, 0); assert.include( @@ -507,9 +504,17 @@ describe("Database close", function () { assert.deepEqual(tuple, { "+(1,1)": 2 }); testDb.closeSync(); assert.isTrue(testDb._isClosed); - assert.throws(() => conn.querySync("RETURN 1+1"), Error, "Runtime exception: The current operation 
is not allowed because the parent database is closed."); + assert.throws( + () => conn.querySync("RETURN 1+1"), + Error, + /(Runtime exception:.*parent database is closed|Connection is closed\.)/ + ); conn.closeSync(); assert.isTrue(conn._isClosed); - assert.throws(() => res.resetIterator(), Error, "Runtime exception: The current operation is not allowed because the parent database is closed."); + assert.throws( + () => res.resetIterator(), + Error, + /(Runtime exception:.*parent database is closed|Connection is closed\.)/ + ); }); }); diff --git a/tools/nodejs_api/test/test_pool.js b/tools/nodejs_api/test/test_pool.js new file mode 100644 index 0000000000..a22c9f19cc --- /dev/null +++ b/tools/nodejs_api/test/test_pool.js @@ -0,0 +1,163 @@ +require("./common.js"); +const { assert } = require("chai"); +const path = require("path"); +const tmp = require("tmp"); + +describe("Connection pool", function () { + let pool; + let tmpDir; + + before(async function () { + await initTests(); + tmpDir = await new Promise((resolve, reject) => { + tmp.dir({ unsafeCleanup: true }, (err, p) => (err ? reject(err) : resolve(p))); + }); + }); + + after(async function () { + if (tmpDir) tmp.setGracefulCleanup(); + }); + + afterEach(async function () { + if (pool && !pool._closed) { + await pool.close(); + } + }); + + it("createPool requires maxSize", function () { + assert.throws(() => lbug.createPool({}), /maxSize/); + assert.throws(() => lbug.createPool({ databasePath: ":memory:" }), /maxSize/); + assert.doesNotThrow(() => lbug.createPool({ maxSize: 5 })); + }); + + it("pool.run(fn) runs with a connection and releases on success", async function () { + pool = lbug.createPool({ + databasePath: path.join(tmpDir, "p1.kz"), + maxSize: 2, + databaseOptions: { bufferManagerSize: 1 << 24 }, + }); + const result = await pool.run(async (conn) => { + const r = await conn.query("RETURN 1 AS x"); + const rows = await r.getAll(); + r.close(); + return rows; + }); + assert.lengthOf(result, 1); + assert.strictEqual(result[0].x, 1); + }); + + it("pool.run(fn) releases on throw", async function () { + pool = lbug.createPool({ + databasePath: path.join(tmpDir, "p2.kz"), + maxSize: 2, + databaseOptions: { bufferManagerSize: 1 << 24 }, + }); + let err; + try { + await pool.run(async () => { + throw new Error("fail"); + }); + } catch (e) { + err = e; + } + assert.instanceOf(err, Error); + assert.include(err.message, "fail"); + const again = await pool.run(async (conn) => { + const r = await conn.query("RETURN 2 AS y"); + const rows = await r.getAll(); + r.close(); + return rows; + }); + assert.lengthOf(again, 1); + assert.strictEqual(again[0].y, 2); + }); + + it("acquire/release and multiple concurrent cycles", async function () { + pool = lbug.createPool({ + databasePath: path.join(tmpDir, "p3.kz"), + maxSize: 3, + databaseOptions: { bufferManagerSize: 1 << 24 }, + }); + const conn1 = await pool.acquire(); + const conn2 = await pool.acquire(); + const conn3 = await pool.acquire(); + const r1 = await conn1.query("RETURN 1 AS a"); + const r2 = await conn2.query("RETURN 2 AS b"); + const r3 = await conn3.query("RETURN 3 AS c"); + assert.strictEqual((await r1.getAll())[0].a, 1); + assert.strictEqual((await r2.getAll())[0].b, 2); + assert.strictEqual((await r3.getAll())[0].c, 3); + r1.close(); + r2.close(); + r3.close(); + pool.release(conn1); + pool.release(conn2); + pool.release(conn3); + const conn4 = await pool.acquire(); + const r4 = await conn4.query("RETURN 4 AS d"); + assert.strictEqual((await r4.getAll())[0].d, 4); + 
r4.close(); + pool.release(conn4); + }); + + it("pool does not exceed maxSize", async function () { + pool = lbug.createPool({ + databasePath: path.join(tmpDir, "p4.kz"), + maxSize: 2, + databaseOptions: { bufferManagerSize: 1 << 24 }, + }); + const c1 = await pool.acquire(); + const c2 = await pool.acquire(); + let resolved = false; + const p3 = pool.acquire().then((c) => { + resolved = true; + pool.release(c); + }); + await new Promise((r) => setImmediate(r)); + assert.isFalse(resolved); + pool.release(c1); + await p3; + assert.isTrue(resolved); + pool.release(c2); + }); + + it("acquire() rejects after acquireTimeoutMillis when no connection available", async function () { + pool = lbug.createPool({ + databasePath: path.join(tmpDir, "p5a.kz"), + maxSize: 1, + acquireTimeoutMillis: 80, + databaseOptions: { bufferManagerSize: 1 << 24 }, + }); + const c1 = await pool.acquire(); + let timeoutErr; + try { + await pool.acquire(); + } catch (e) { + timeoutErr = e; + } + assert.instanceOf(timeoutErr, Error); + assert.include(timeoutErr.message, "timed out"); + pool.release(c1); + }); + + it("pool.close() prevents new acquire and closes all", async function () { + pool = lbug.createPool({ + databasePath: path.join(tmpDir, "p5.kz"), + maxSize: 2, + databaseOptions: { bufferManagerSize: 1 << 24 }, + }); + await pool.run(async (conn) => { + const r = await conn.query("RETURN 1"); + r.close(); + }); + await pool.close(); + let closedErr; + try { + await pool.acquire(); + } catch (e) { + closedErr = e; + } + assert.instanceOf(closedErr, Error); + assert.include(closedErr.message, "closed"); + }); +}); diff --git a/tools/nodejs_api/test/test_query_result.js b/tools/nodejs_api/test/test_query_result.js index 5f84b3f32d..ad306c141e 100644 --- a/tools/nodejs_api/test/test_query_result.js +++ b/tools/nodejs_api/test/test_query_result.js @@ -66,22 +66,30 @@ describe("Get next", function () { } }); - it("should throw an error if there is no next tuple", async function () { + it("should return null when no more tuples", async function () { const queryResult = await conn.query( "MATCH (a:person) RETURN a.ID ORDER BY a.ID" ); for (let i = 0; i < 8; ++i) { await queryResult.getNext(); } - try { - await queryResult.getNext(); - assert.fail("No error thrown when there is no next tuple"); - } catch (err) { - assert.equal( - err.message, - "Runtime exception: No more tuples in QueryResult, Please check hasNext() before calling getNext()." 
- ); + const exhausted = await queryResult.getNext(); + assert.isNull(exhausted, "getNext() returns null when no more tuples"); + }); + + it("getNext() returns null exactly when hasNext() is false", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + let count = 0; + while (queryResult.hasNext()) { + const row = await queryResult.getNext(); + assert.isNotNull(row, "getNext() must return value when hasNext() is true"); + count++; } + assert.equal(count, 8); + const afterExhausted = await queryResult.getNext(); + assert.isNull(afterExhausted, "getNext() must return null when hasNext() was false"); }); }); @@ -211,6 +219,46 @@ describe("Get query summary", function () { }); }); +describe("Async iterator (for await...of)", function () { + it("should iterate rows same as getNext", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + const ids = []; + for await (const row of queryResult) { + ids.push(row["a.ID"]); + } + assert.deepEqual(ids, PERSON_IDS); + }); + + it("should not materialize full result in memory", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + let count = 0; + for await (const row of queryResult) { + count++; + assert.equal(row["a.ID"], PERSON_IDS[count - 1]); + } + assert.equal(count, PERSON_IDS.length); + }); +}); + +describe("toStream", function () { + it("should yield rows as Readable stream", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + const stream = queryResult.toStream(); + const rows = []; + for await (const row of stream) { + rows.push(row); + } + const ids = rows.map((r) => r["a.ID"]); + assert.deepEqual(ids, PERSON_IDS); + }); +}); + describe("Close", function () { it("should close the query result", async function () { const queryResult = await conn.query( diff --git a/tools/nodejs_api/test/test_register_stream.js b/tools/nodejs_api/test/test_register_stream.js new file mode 100644 index 0000000000..a2b3fc2eef --- /dev/null +++ b/tools/nodejs_api/test/test_register_stream.js @@ -0,0 +1,81 @@ +const { assert } = require("chai"); + +describe("registerStream / LOAD FROM stream", function () { + it("should LOAD FROM registered stream and return rows", async function () { + async function* rowSource() { + yield [1, "a"]; + yield [2, "b"]; + yield [3, "c"]; + } + await conn.registerStream("mystream", rowSource(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "label", type: "STRING" }, + ], + }); + try { + const result = await conn.query("LOAD FROM mystream RETURN *"); + const rows = Array.isArray(result) ? 
result : [result]; + assert.isAtLeast(rows.length, 1); + const r = rows[0]; + assert.isTrue(r.hasNext()); + const row1 = await r.getNext(); + assert.exists(row1); + assert.equal(row1["id"], 1); + assert.equal(row1["label"], "a"); + const row2 = await r.getNext(); + assert.equal(row2["id"], 2); + assert.equal(row2["label"], "b"); + const row3 = await r.getNext(); + assert.equal(row3["id"], 3); + assert.equal(row3["label"], "c"); + assert.isNull(await r.getNext()); + } finally { + conn.unregisterStream("mystream"); + } + }); + + it("should LOAD FROM stream with object rows (column order from schema)", async function () { + async function* objectRowSource() { + yield { id: 10, label: "x" }; + yield { label: "y", id: 20 }; + } + await conn.registerStream("objstream", objectRowSource(), { + columns: [ + { name: "id", type: "INT64" }, + { name: "label", type: "STRING" }, + ], + }); + try { + const result = await conn.query("LOAD FROM objstream RETURN *"); + const r = Array.isArray(result) ? result[0] : result; + const row1 = await r.getNext(); + assert.isNotNull(row1, "expected first row from stream"); + assert.equal(row1["id"], 10); + assert.equal(row1["label"], "x"); + const row2 = await r.getNext(); + assert.isNotNull(row2, "expected second row from stream"); + assert.equal(row2["id"], 20); + assert.equal(row2["label"], "y"); + assert.isNull(await r.getNext()); + } finally { + conn.unregisterStream("objstream"); + } + }); + + it("should unregisterStream by name", async function () { + async function* empty() { + if (false) yield []; + } + await conn.registerStream("tmpstream", empty(), { + columns: [{ name: "x", type: "INT64" }], + }); + conn.unregisterStream("tmpstream"); + try { + await conn.query("LOAD FROM tmpstream RETURN *"); + assert.fail("Expected error when loading from unregistered stream."); + } catch (e) { + assert.include(e.message, "not in scope"); + } + }); +}); diff --git a/tools/nodejs_api/test/test_resilience.js b/tools/nodejs_api/test/test_resilience.js new file mode 100644 index 0000000000..a24ea05c5d --- /dev/null +++ b/tools/nodejs_api/test/test_resilience.js @@ -0,0 +1,190 @@ +"use strict"; + +const { assert } = require("chai"); +const tmp = require("tmp"); +const path = require("path"); + +/** + * Resilience tests: close connection/database during or after operations. + * Goal: no crashes (SIGSEGV, native abort); all failures must surface as JS errors. 
+ */ +function withTempDb(fn) { + return async function () { + const tmpPath = await new Promise((resolve, reject) => { + tmp.dir({ unsafeCleanup: true }, (err, p, _) => { + if (err) return reject(err); + return resolve(p); + }); + }); + const dbPath = path.join(tmpPath, "db.kz"); + const testDb = new lbug.Database(dbPath, 1 << 26 /* 64MB */); + await testDb.init(); + const testConn = new lbug.Connection(testDb); + await testConn.init(); + try { + await fn.call(this, testDb, testConn); + } finally { + if (!testDb._isClosed) await testDb.close().catch(() => {}); + if (!testConn._isClosed) await testConn.close().catch(() => {}); + } + }; +} + +describe("Resilience (close during/after use)", function () { + this.timeout(10000); + + it("query rejects when connection is closed while query is in flight", withTempDb(async (testDb, testConn) => { + const longQuery = "UNWIND range(1, 20000) AS x UNWIND range(1, 2000) AS y RETURN count(*)"; + const queryPromise = testConn.query(longQuery); + await new Promise((r) => setTimeout(r, 80)); + testConn.closeSync(); + const timeoutMs = 2000; + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Expected query to reject within ${timeoutMs}ms when connection was closed (timed out).`)), timeoutMs); + }); + try { + await Promise.race([queryPromise, timeoutPromise]); + assert.fail("Expected query to reject when connection was closed during execution."); + } catch (err) { + if ((err.message || "").includes("timed out")) throw err; + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + const ok = msg.includes("closed") || msg.includes("not allowed") || msg.includes("runtime"); + assert.isTrue(ok, `Expected error about closed/not allowed, got: ${err.message}`); + } + })); + + // Database close is synchronous and blocks until in-flight work completes (core behavior). + // So we cannot observe "query rejects when database is closed" without a non-blocking close. 
+ it.skip("query rejects when database is closed while query is in flight", withTempDb(async (testDb, testConn) => { + const longQuery = "UNWIND range(1, 20000) AS x UNWIND range(1, 2000) AS y RETURN count(*)"; + const queryPromise = testConn.query(longQuery); + await new Promise((r) => setTimeout(r, 120)); + testDb.closeSync(); + const timeoutMs = 5000; + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Expected query to reject within ${timeoutMs}ms when database was closed (timed out).`)), timeoutMs); + }); + try { + await Promise.race([queryPromise, timeoutPromise]); + assert.fail("Expected query to reject when database was closed during execution."); + } catch (err) { + if ((err.message || "").includes("timed out")) throw err; + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + const ok = msg.includes("closed") || msg.includes("not allowed") || msg.includes("runtime"); + assert.isTrue(ok, `Expected error about closed/not allowed, got: ${err.message}`); + } + })); + + it("getNext() after connection closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + const row = await res.getNext(); + assert.equal(row.x, 1); + testConn.closeSync(); + try { + await res.getNext(); + assert.fail("Expected getNext() to throw after connection closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("hasNext() after connection closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + assert.isTrue(res.hasNext()); + testConn.closeSync(); + try { + res.hasNext(); + assert.fail("Expected hasNext() to throw after connection closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("getNext() after database closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + await res.getNext(); + testDb.closeSync(); + try { + await res.getNext(); + assert.fail("Expected getNext() to throw after database closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("hasNext() after database closed throws and does not crash", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + testDb.closeSync(); + try { + res.hasNext(); + assert.fail("Expected hasNext() to throw after database closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("registerStream then close connection then query throws before running", withTempDb(async (testDb, testConn) => { + async function* gen() { + yield [1]; + } + await testConn.registerStream("s", gen(), { columns: [{ name: "x", type: "INT64" }] }); + testConn.closeSync(); + try { + await 
testConn.query("LOAD FROM s RETURN *"); + assert.fail("Expected query to throw when connection is already closed."); + } catch (err) { + assert.instanceOf(err, Error); + assert.include((err.message || "").toLowerCase(), "closed"); + } + })); + + it("close connection while iterating result: second getNext throws", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("UNWIND [1,2,3] AS x RETURN x"); + const a = await res.getNext(); + assert.equal(a.x, 1); + testConn.closeSync(); + try { + await res.getNext(); + assert.fail("Expected getNext() to throw after connection closed mid-iteration."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); + + it("query after connection closed throws immediately (no native call)", async function () { + const testConn = new lbug.Connection(db); + await testConn.init(); + await testConn.close(); + try { + await testConn.query("RETURN 1"); + assert.fail("Expected query to throw when connection is closed."); + } catch (err) { + assert.equal(err.message, "Connection is closed."); + } + }); + + it("getNextSync after database closed throws", withTempDb(async (testDb, testConn) => { + const res = await testConn.query("RETURN 1 AS x"); + testDb.closeSync(); + try { + res.getNextSync(); + assert.fail("Expected getNextSync() to throw after database closed."); + } catch (err) { + assert.instanceOf(err, Error); + const msg = (err.message || "").toLowerCase(); + assert.isTrue(msg.includes("closed") || msg.includes("not allowed"), `Expected closed/not allowed, got: ${err.message}`); + } + })); +});