diff --git a/.gitignore b/.gitignore index 5a84aa4078..fa1b3163c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ +tree-sitter/ +tree-sitter-cypher/ + .idea/ .vscode +.cursor .vs bazel-* .clwb diff --git a/tools/nodejs_api/README.md b/tools/nodejs_api/README.md index 086c63eb2f..95978ea6af 100644 --- a/tools/nodejs_api/README.md +++ b/tools/nodejs_api/README.md @@ -49,10 +49,8 @@ const main = async () => { // Run a query const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;"); - // Fetch all results + // Consume results (choose one style) const rows = await result.getAll(); - - // Output results for (const row of rows) { console.log(row); } @@ -68,12 +66,61 @@ main().catch(console.error); The `lbug` package exposes the following primary classes: -* `Database` – Initializes a database from a file path. -* `Connection` – Executes queries on a connected database. -* `QueryResult` – Provides methods like `getAll()` to retrieve results. +* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). Close with `close()`. +* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`. +* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed). +* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries. Both CommonJS (`require`) and ES Modules (`import`) are fully supported. +### Consuming query results + +```js +const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000"); + +// Option 1: get all rows (loads into memory) +const rows = await result.getAll(); + +// Option 2: row by row (async) +while (result.hasNext()) { + const row = await result.getNext(); + console.log(row); +} + +// Option 3: async iterator (streaming, no full materialization) +for await (const row of result) { + console.log(row); +} + +// Option 4: Node.js Readable stream (e.g. for .pipe()) +const stream = result.toStream(); +stream.on("data", (row) => console.log(row)); +``` + +### Transactions + +**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing. + +```js +await conn.query("BEGIN TRANSACTION"); +await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))"); +await conn.query('COPY Nodes FROM "data.csv"'); +await conn.query("COMMIT"); +// or on error: await conn.query("ROLLBACK"); +``` + +**Read-only transaction:** `BEGIN TRANSACTION READ ONLY` then queries, then `COMMIT` / `ROLLBACK`. + +**Wrapper:** One write transaction with automatic commit on success and rollback on throw: + +```js +await conn.transaction(async () => { + await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))"); + await conn.query('COPY Nodes FROM "data.csv"'); + // commit happens automatically; on throw, rollback then rethrow +}); +``` + --- ## 🛠️ Local Development (for Contributors) diff --git a/tools/nodejs_api/build.js b/tools/nodejs_api/build.js index d326300260..4eca2b526a 100644 --- a/tools/nodejs_api/build.js +++ b/tools/nodejs_api/build.js @@ -3,6 +3,9 @@ const path = require("path"); const { execSync } = require("child_process"); const SRC_PATH = path.resolve(__dirname, "../.."); +const NODEJS_API = path.resolve(__dirname, "."); +const BUILD_DIR = path.join(NODEJS_API, "build"); +const SRC_JS_DIR = path.join(NODEJS_API, "src_js"); const THREADS = require("os").cpus().length; console.log(`Using ${THREADS} threads to build Lbug.`); @@ -12,3 +15,14 @@ execSync(`make nodejs NUM_THREADS=${THREADS}`, { cwd: SRC_PATH, stdio: "inherit", }); + +// Ensure build/ has latest JS from src_js (CMake copies at configure time only) +if (fs.existsSync(SRC_JS_DIR) && fs.existsSync(BUILD_DIR)) { + const files = fs.readdirSync(SRC_JS_DIR); + for (const name of files) { + if (name.endsWith(".js") || name.endsWith(".mjs") || name.endsWith(".d.ts")) { + fs.copyFileSync(path.join(SRC_JS_DIR, name), path.join(BUILD_DIR, name)); + } + } + console.log("Copied src_js to build."); +} diff --git a/tools/nodejs_api/src_cpp/include/node_connection.h b/tools/nodejs_api/src_cpp/include/node_connection.h index caacef92c3..ad9ddcb90a 100644 --- a/tools/nodejs_api/src_cpp/include/node_connection.h +++ b/tools/nodejs_api/src_cpp/include/node_connection.h @@ -30,6 +30,7 @@ class NodeConnection : public Napi::ObjectWrap { void InitCppConnection(); void SetMaxNumThreadForExec(const Napi::CallbackInfo& info); void SetQueryTimeout(const Napi::CallbackInfo& info); + void Interrupt(const Napi::CallbackInfo& info); Napi::Value ExecuteAsync(const Napi::CallbackInfo& info); Napi::Value QueryAsync(const Napi::CallbackInfo& info); Napi::Value ExecuteSync(const Napi::CallbackInfo& info); diff --git a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h index 7813820cb0..e777c94af5 100644 --- a/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h +++ b/tools/nodejs_api/src_cpp/include/node_progress_bar_display.h @@ -13,6 +13,8 @@ using namespace common; */ class NodeProgressBarDisplay : public ProgressBarDisplay { public: + ~NodeProgressBarDisplay() override; + void updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) override; diff --git a/tools/nodejs_api/src_cpp/node_connection.cpp b/tools/nodejs_api/src_cpp/node_connection.cpp index 404903904f..278eedeefd 100644 --- a/tools/nodejs_api/src_cpp/node_connection.cpp +++ b/tools/nodejs_api/src_cpp/node_connection.cpp @@ -19,6 +19,7 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("querySync", &NodeConnection::QuerySync), InstanceMethod("setMaxNumThreadForExec", &NodeConnection::SetMaxNumThreadForExec), InstanceMethod("setQueryTimeout", &NodeConnection::SetQueryTimeout), + InstanceMethod("interrupt", &NodeConnection::Interrupt), InstanceMethod("close", &NodeConnection::Close)}); exports.Set("NodeConnection", t); @@ -83,6 +84,12 @@ void NodeConnection::SetQueryTimeout(const Napi::CallbackInfo& info) { } } +void NodeConnection::Interrupt(const Napi::CallbackInfo& info) { + if (this->connection) { + this->connection->interrupt(); + } +} + void NodeConnection::Close(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); diff --git a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp index 352775624a..a76dbd953e 100644 --- a/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp +++ b/tools/nodejs_api/src_cpp/node_progress_bar_display.cpp @@ -3,6 +3,14 @@ using namespace lbug; using namespace common; +NodeProgressBarDisplay::~NodeProgressBarDisplay() { + std::unique_lock lock(callbackMutex); + for (auto& kv : queryCallbacks) { + kv.second.Release(); + } + queryCallbacks.clear(); +} + void NodeProgressBarDisplay::updateProgress(uint64_t queryID, double newPipelineProgress, uint32_t newNumPipelinesFinished) { if (numPipelines == 0) { diff --git a/tools/nodejs_api/src_js/connection.js b/tools/nodejs_api/src_js/connection.js index 575d9f27ff..59baa70a34 100644 --- a/tools/nodejs_api/src_js/connection.js +++ b/tools/nodejs_api/src_js/connection.js @@ -121,10 +121,11 @@ class Connection { * Execute a prepared statement with the given parameters. * @param {lbug.PreparedStatement} preparedStatement the prepared statement to execute. * @param {Object} params a plain object mapping parameter names to values. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. Rejects if error or options.signal is aborted. */ - execute(preparedStatement, params = {}, progressCallback) { + execute(preparedStatement, params = {}, optionsOrProgressCallback) { + const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback); return new Promise((resolve, reject) => { if ( !typeof preparedStatement === "object" || @@ -150,8 +151,29 @@ class Connection { if (progressCallback && typeof progressCallback !== "function") { return reject(new Error("progressCallback must be a function.")); } + if (signal?.aborted) { + return reject(this._createAbortError()); + } + let abortListener; + const cleanup = () => { + if (signal && abortListener) { + signal.removeEventListener("abort", abortListener); + } + }; + if (signal) { + abortListener = () => { + this.interrupt(); + cleanup(); + reject(this._createAbortError()); + }; + signal.addEventListener("abort", abortListener); + } this._getConnection() .then((connection) => { + if (signal?.aborted) { + cleanup(); + return reject(this._createAbortError()); + } const nodeQueryResult = new LbugNative.NodeQueryResult(); try { connection.executeAsync( @@ -159,7 +181,11 @@ class Connection { nodeQueryResult, paramArray, (err) => { + cleanup(); if (err) { + if (signal?.aborted && err.message === "Interrupted.") { + return reject(this._createAbortError()); + } return reject(err); } this._unwrapMultipleQueryResults(nodeQueryResult) @@ -173,10 +199,12 @@ class Connection { progressCallback ); } catch (e) { + cleanup(); return reject(e); } }) .catch((err) => { + cleanup(); return reject(err); }); }); @@ -261,26 +289,59 @@ class Connection { return new PreparedStatement(this, preparedStatement); } + /** + * Interrupt the currently executing query on this connection. + * No-op if the connection is not initialized or no query is running. + */ + interrupt() { + if (this._connection) { + this._connection.interrupt(); + } + } + /** * Execute a query. * @param {String} statement the statement to execute. - * @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines. - * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error. + * @param {Object|Function} [optionsOrProgressCallback] - Options object { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback. + * @returns {Promise} a promise that resolves to the query result. The promise is rejected if there is an error or if options.signal is aborted. */ - query(statement, progressCallback) { + query(statement, optionsOrProgressCallback) { + const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback); return new Promise((resolve, reject) => { if (typeof statement !== "string") { return reject(new Error("statement must be a string.")); } - if (progressCallback && typeof progressCallback !== "function") { - return reject(new Error("progressCallback must be a function.")); + if (signal?.aborted) { + return reject(this._createAbortError()); + } + let abortListener; + const cleanup = () => { + if (signal && abortListener) { + signal.removeEventListener("abort", abortListener); + } + }; + if (signal) { + abortListener = () => { + this.interrupt(); + cleanup(); + reject(this._createAbortError()); + }; + signal.addEventListener("abort", abortListener); } this._getConnection() .then((connection) => { + if (signal?.aborted) { + cleanup(); + return reject(this._createAbortError()); + } const nodeQueryResult = new LbugNative.NodeQueryResult(); try { connection.queryAsync(statement, nodeQueryResult, (err) => { + cleanup(); if (err) { + if (signal?.aborted && err.message === "Interrupted.") { + return reject(this._createAbortError()); + } return reject(err); } this._unwrapMultipleQueryResults(nodeQueryResult) @@ -293,15 +354,55 @@ class Connection { }, progressCallback); } catch (e) { + cleanup(); return reject(e); } }) .catch((err) => { + cleanup(); return reject(err); }); }); } + _normalizeQueryOptions(optionsOrProgressCallback) { + if (optionsOrProgressCallback == null) { + return { signal: undefined, progressCallback: undefined }; + } + if (typeof optionsOrProgressCallback === "function") { + return { signal: undefined, progressCallback: optionsOrProgressCallback }; + } + if (typeof optionsOrProgressCallback === "object" && optionsOrProgressCallback !== null) { + return { + signal: optionsOrProgressCallback.signal, + progressCallback: optionsOrProgressCallback.progressCallback, + }; + } + return { signal: undefined, progressCallback: undefined }; + } + + _createAbortError() { + return new DOMException("The operation was aborted.", "AbortError"); + } + + /** + * Check that the connection is alive (e.g. for connection pools or health checks). + * Runs a trivial query; rejects if the connection is broken. + * @returns {Promise} resolves to true if the connection is OK. + */ + async ping() { + const result = await this.query("RETURN 1"); + const closeResult = (r) => { + if (Array.isArray(r)) { + r.forEach((q) => q.close()); + } else { + r.close(); + } + }; + closeResult(result); + return true; + } + /** * Execute a query synchronously. * @param {String} statement the statement to execute. This function blocks the main thread for the duration of the query, so use it with caution. @@ -396,6 +497,37 @@ class Connection { } } + /** + * Run a function inside a single write transaction. On success commits, on throw rolls back and rethrows. + * Uses Cypher BEGIN TRANSACTION / COMMIT / ROLLBACK under the hood. + * @param {Function} fn async function to run; can use this connection's query/execute inside. + * @returns {Promise<*>} the value returned by fn. + */ + async transaction(fn) { + if (typeof fn !== "function") { + throw new Error("transaction() requires a function."); + } + const closeResult = (r) => { + if (Array.isArray(r)) { + r.forEach((q) => q.close()); + } else { + r.close(); + } + }; + const beginRes = await this.query("BEGIN TRANSACTION"); + closeResult(beginRes); + try { + const result = await fn(); + const commitRes = await this.query("COMMIT"); + closeResult(commitRes); + return result; + } catch (e) { + const rollbackRes = await this.query("ROLLBACK"); + closeResult(rollbackRes); + throw e; + } + } + /** * Set the timeout for queries. Queries that take longer than the timeout * will be aborted. diff --git a/tools/nodejs_api/src_js/lbug.d.ts b/tools/nodejs_api/src_js/lbug.d.ts index 977be1b157..3e820c0132 100644 --- a/tools/nodejs_api/src_js/lbug.d.ts +++ b/tools/nodejs_api/src_js/lbug.d.ts @@ -22,6 +22,15 @@ export type ProgressCallback = ( numPipelines: number ) => void; +/** + * Options for query() and execute(). + * Use signal to cancel the operation via AbortController. + */ +export interface QueryOptions { + signal?: AbortSignal; + progressCallback?: ProgressCallback; +} + /** * Represents a node ID in the graph database. */ @@ -117,6 +126,8 @@ export class Database { * @param maxDBSize Maximum size of the database in bytes * @param autoCheckpoint Whether to enable automatic checkpoints * @param checkpointThreshold Threshold for automatic checkpoints + * @param throwOnWalReplayFailure If true, WAL replay failures throw; otherwise replay stops at error + * @param enableChecksums If true, use checksums to detect WAL corruption */ constructor( databasePath?: string, @@ -125,7 +136,9 @@ export class Database { readOnly?: boolean, maxDBSize?: number, autoCheckpoint?: boolean, - checkpointThreshold?: number + checkpointThreshold?: number, + throwOnWalReplayFailure?: boolean, + enableChecksums?: boolean ); /** @@ -200,6 +213,12 @@ export class Connection { */ setQueryTimeout(timeoutInMs: number): void; + /** + * Interrupt the currently executing query on this connection. + * No-op if the connection is not initialized or no query is running. + */ + interrupt(): void; + /** * Close the connection. * @returns Promise that resolves when connection is closed @@ -215,13 +234,13 @@ export class Connection { * Execute a prepared statement. * @param preparedStatement The prepared statement to execute * @param params Parameters for the query as a plain object - * @param progressCallback Optional progress callback - * @returns Promise that resolves to the query result(s) + * @param optionsOrProgressCallback Options (e.g. signal for abort) or legacy progress callback + * @returns Promise that resolves to the query result(s). Rejects with DOMException AbortError if signal is aborted. */ execute( preparedStatement: PreparedStatement, params?: Record, - progressCallback?: ProgressCallback + optionsOrProgressCallback?: QueryOptions | ProgressCallback ): Promise; /** @@ -252,20 +271,33 @@ export class Connection { /** * Execute a query. * @param statement The statement to execute - * @param progressCallback Optional progress callback - * @returns Promise that resolves to the query result(s) + * @param optionsOrProgressCallback Options (e.g. signal for abort) or legacy progress callback + * @returns Promise that resolves to the query result(s). Rejects with DOMException AbortError if signal is aborted. */ query( statement: string, - progressCallback?: ProgressCallback + optionsOrProgressCallback?: QueryOptions | ProgressCallback ): Promise; + /** + * Run a function inside a single write transaction. Commits on success, rolls back on throw. + * @param fn Async function that can use this connection's query/execute + * @returns Promise that resolves to the return value of fn + */ + transaction(fn: () => Promise): Promise; + /** * Execute a query synchronously. * @param statement The statement to execute * @returns The query result(s) */ querySync(statement: string): QueryResult | QueryResult[]; + + /** + * Check that the connection is alive (e.g. for pools or health checks). + * @returns Promise that resolves to true if OK, rejects if connection is broken + */ + ping(): Promise; } /** @@ -289,8 +321,14 @@ export class PreparedStatement { /** * Represents the results of a query execution. * Note: This class is created internally by Connection query methods. + * Supports async iteration: for await (const row of result) { ... } */ -export class QueryResult { +export class QueryResult implements AsyncIterable | null> { + /** + * Async iterator for row-by-row consumption (for await...of). + */ + [Symbol.asyncIterator](): AsyncIterator | null>; + /** * Reset the iterator for reading results. */ @@ -320,6 +358,12 @@ export class QueryResult { */ getNextSync(): Record | null; + /** + * Return a Node.js Readable stream (object mode) that yields one row per chunk. + * @returns Readable stream of row objects + */ + toStream(): import("stream").Readable; + /** * Iterate through the query result with callback functions. * @param resultCallback Callback function called for each row diff --git a/tools/nodejs_api/src_js/query_result.js b/tools/nodejs_api/src_js/query_result.js index 944f23db9c..6f6ec88e81 100644 --- a/tools/nodejs_api/src_js/query_result.js +++ b/tools/nodejs_api/src_js/query_result.js @@ -1,6 +1,7 @@ "use strict"; const assert = require("assert"); +const { Readable } = require("stream"); class QueryResult { /** @@ -96,6 +97,64 @@ class QueryResult { }); } + /** + * Async iterator for consuming the result row-by-row (e.g. `for await (const row of result)`). + * Does not materialize the full result in memory. + * @returns {AsyncIterator} + */ + [Symbol.asyncIterator]() { + const self = this; + return { + async next() { + self._checkClosed(); + if (!self.hasNext()) { + return { done: true }; + } + try { + const value = await self.getNext(); + if (value === null) { + return { done: true }; + } + return { value, done: false }; + } catch (err) { + return Promise.reject(err); + } + }, + }; + } + + /** + * Return a Node.js Readable stream (object mode) that yields one row per chunk. + * Useful for piping or integrating with stream consumers. Does not require native changes. + * @returns {stream.Readable} Readable stream of row objects. + */ + toStream() { + const self = this; + return new Readable({ + objectMode: true, + read() { + if (self._isClosed) { + return this.push(null); + } + if (!self.hasNext()) { + return this.push(null); + } + self.getNext() + .then((row) => { + if (row !== null && row !== undefined) { + this.push(row); + } + if (!self.hasNext()) { + this.push(null); + } + }) + .catch((err) => { + this.destroy(err); + }); + }, + }); + } + /** * Get all rows of the query result. * @returns {Promise>} a promise that resolves to all rows of the query result. The promise is rejected if there is an error. diff --git a/tools/nodejs_api/test/test_connection.js b/tools/nodejs_api/test/test_connection.js index 34345ad26a..2b7ce11729 100644 --- a/tools/nodejs_api/test/test_connection.js +++ b/tools/nodejs_api/test/test_connection.js @@ -122,6 +122,50 @@ describe("Execute", function () { }); }); +describe("ping", function () { + it("should resolve to true when connection is alive", async function () { + const ok = await conn.ping(); + assert.strictEqual(ok, true); + }); +}); + +describe("transaction", function () { + it("should commit and return fn result on success", async function () { + const result = await conn.transaction(async () => { + const q = await conn.query("RETURN 42 AS x"); + const rows = await q.getAll(); + q.close(); + return rows[0].x; + }); + assert.equal(result, 42); + }); + + it("should rollback and rethrow on fn error", async function () { + const err = new Error("tx abort"); + try { + await conn.transaction(async () => { + await conn.query("RETURN 1"); + throw err; + }); + assert.fail("transaction should have thrown"); + } catch (e) { + assert.strictEqual(e, err); + } + const q = await conn.query("RETURN 1"); + assert.isTrue(q.hasNext()); + q.close(); + }); + + it("should reject non-function", async function () { + try { + await conn.transaction("not a function"); + assert.fail("transaction should have thrown"); + } catch (e) { + assert.equal(e.message, "transaction() requires a function."); + } + }); +}); + describe("Query", function () { it("should run a valid query", async function () { const queryResult = await conn.query("MATCH (a:person) RETURN COUNT(*)"); @@ -214,6 +258,66 @@ describe("Timeout", function () { }); }); +describe("Interrupt", function () { + it("should abort a long-running query when interrupt() is called", async function () { + const newConn = new lbug.Connection(db); + await newConn.init(); + const longQuery = + "UNWIND RANGE(1,100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);"; + const queryPromise = newConn.query(longQuery); + setTimeout(() => newConn.interrupt(), 150); + try { + await queryPromise; + assert.fail("No error thrown when the query was interrupted."); + } catch (err) { + assert.equal(err.message, "Interrupted."); + } + }); +}); + +describe("AbortSignal", function () { + it("should reject with AbortError when signal is already aborted before query starts", async function () { + const ac = new AbortController(); + ac.abort(); + try { + await conn.query("RETURN 1", { signal: ac.signal }); + assert.fail("No error thrown when signal was already aborted."); + } catch (err) { + assert.equal(err.name, "AbortError"); + assert.equal(err.message, "The operation was aborted."); + } + }); + + it("should reject with AbortError when signal is aborted during query", async function () { + const newConn = new lbug.Connection(db); + await newConn.init(); + const ac = new AbortController(); + const longQuery = + "UNWIND RANGE(1,100000) AS x UNWIND RANGE(1, 100000) AS y RETURN COUNT(x + y);"; + const queryPromise = newConn.query(longQuery, { signal: ac.signal }); + setTimeout(() => ac.abort(), 150); + try { + await queryPromise; + assert.fail("No error thrown when signal was aborted during query."); + } catch (err) { + assert.equal(err.name, "AbortError"); + } + }); + + it("should work with progressCallback in options object", async function () { + let progressCalled = false; + const result = await conn.query("RETURN 1", { + progressCallback: () => { + progressCalled = true; + }, + }); + assert.exists(result); + const rows = Array.isArray(result) ? result : [result]; + assert.isAtLeast(rows.length, 1); + rows.forEach((r) => r.close()); + }); +}); + describe("Close", function () { it("should close the connection", async function () { const newConn = new lbug.Connection(db); diff --git a/tools/nodejs_api/test/test_query_result.js b/tools/nodejs_api/test/test_query_result.js index 5f84b3f32d..dab58580b5 100644 --- a/tools/nodejs_api/test/test_query_result.js +++ b/tools/nodejs_api/test/test_query_result.js @@ -211,6 +211,46 @@ describe("Get query summary", function () { }); }); +describe("Async iterator (for await...of)", function () { + it("should iterate rows same as getNext", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + const ids = []; + for await (const row of queryResult) { + ids.push(row["a.ID"]); + } + assert.deepEqual(ids, PERSON_IDS); + }); + + it("should not materialize full result in memory", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + let count = 0; + for await (const row of queryResult) { + count++; + assert.equal(row["a.ID"], PERSON_IDS[count - 1]); + } + assert.equal(count, PERSON_IDS.length); + }); +}); + +describe("toStream", function () { + it("should yield rows as Readable stream", async function () { + const queryResult = await conn.query( + "MATCH (a:person) RETURN a.ID ORDER BY a.ID" + ); + const stream = queryResult.toStream(); + const rows = []; + for await (const row of stream) { + rows.push(row); + } + const ids = rows.map((r) => r["a.ID"]); + assert.deepEqual(ids, PERSON_IDS); + }); +}); + describe("Close", function () { it("should close the query result", async function () { const queryResult = await conn.query(