Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
tree-sitter/
tree-sitter-cypher/

.idea/
.vscode
.vs
Expand Down
59 changes: 53 additions & 6 deletions tools/nodejs_api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ const main = async () => {
// Run a query
const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;");

// Fetch all results
// Consume results (choose one style)
const rows = await result.getAll();

// Output results
for (const row of rows) {
console.log(row);
}
Expand All @@ -68,12 +66,61 @@ main().catch(console.error);

The `lbug` package exposes the following primary classes:

* `Database` – Initializes a database from a file path.
* `Connection` – Executes queries on a connected database.
* `QueryResult` – Provides methods like `getAll()` to retrieve results.
* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). Close with `close()`.
* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`.
* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed).
* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries.

Both CommonJS (`require`) and ES Modules (`import`) are fully supported.

### Consuming query results

```js
const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000");

// Option 1: get all rows (loads into memory)
const rows = await result.getAll();

// Option 2: row by row (async)
while (result.hasNext()) {
const row = await result.getNext();
console.log(row);
}

// Option 3: async iterator (streaming, no full materialization)
for await (const row of result) {
console.log(row);
}

// Option 4: Node.js Readable stream (e.g. for .pipe())
const stream = result.toStream();
stream.on("data", (row) => console.log(row));
```

### Transactions

**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing.

```js
await conn.query("BEGIN TRANSACTION");
await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
await conn.query('COPY Nodes FROM "data.csv"');
await conn.query("COMMIT");
// or on error: await conn.query("ROLLBACK");
```

**Read-only transaction:** `BEGIN TRANSACTION READ ONLY` then queries, then `COMMIT` / `ROLLBACK`.

**Wrapper:** One write transaction with automatic commit on success and rollback on throw:

```js
await conn.transaction(async () => {
await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
await conn.query('COPY Nodes FROM "data.csv"');
// commit happens automatically; on throw, rollback then rethrow
});
```

---

## 🛠️ Local Development (for Contributors)
Expand Down
2 changes: 2 additions & 0 deletions tools/nodejs_api/src_cpp/include/node_progress_bar_display.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ using namespace common;
*/
class NodeProgressBarDisplay : public ProgressBarDisplay {
public:
~NodeProgressBarDisplay() override;

void updateProgress(uint64_t queryID, double newPipelineProgress,
uint32_t newNumPipelinesFinished) override;

Expand Down
8 changes: 8 additions & 0 deletions tools/nodejs_api/src_cpp/node_progress_bar_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
using namespace lbug;
using namespace common;

NodeProgressBarDisplay::~NodeProgressBarDisplay() {
std::unique_lock<std::shared_mutex> lock(callbackMutex);
for (auto& kv : queryCallbacks) {
kv.second.Release();
}
queryCallbacks.clear();
}

void NodeProgressBarDisplay::updateProgress(uint64_t queryID, double newPipelineProgress,
uint32_t newNumPipelinesFinished) {
if (numPipelines == 0) {
Expand Down
49 changes: 49 additions & 0 deletions tools/nodejs_api/src_js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,24 @@ class Connection {
});
}

/**
* Check that the connection is alive (e.g. for connection pools or health checks).
* Runs a trivial query; rejects if the connection is broken.
* @returns {Promise<boolean>} resolves to true if the connection is OK.
*/
async ping() {
const result = await this.query("RETURN 1");
const closeResult = (r) => {
if (Array.isArray(r)) {
r.forEach((q) => q.close());
} else {
r.close();
}
};
closeResult(result);
return true;
}

/**
* Execute a query synchronously.
* @param {String} statement the statement to execute. This function blocks the main thread for the duration of the query, so use it with caution.
Expand Down Expand Up @@ -396,6 +414,37 @@ class Connection {
}
}

/**
* Run a function inside a single write transaction. On success commits, on throw rolls back and rethrows.
* Uses Cypher BEGIN TRANSACTION / COMMIT / ROLLBACK under the hood.
* @param {Function} fn async function to run; can use this connection's query/execute inside.
* @returns {Promise<*>} the value returned by fn.
*/
async transaction(fn) {
if (typeof fn !== "function") {
throw new Error("transaction() requires a function.");
}
const closeResult = (r) => {
if (Array.isArray(r)) {
r.forEach((q) => q.close());
} else {
r.close();
}
};
const beginRes = await this.query("BEGIN TRANSACTION");
closeResult(beginRes);
try {
const result = await fn();
const commitRes = await this.query("COMMIT");
closeResult(commitRes);
return result;
} catch (e) {
const rollbackRes = await this.query("ROLLBACK");
closeResult(rollbackRes);
throw e;
}
}

/**
* Set the timeout for queries. Queries that take longer than the timeout
* will be aborted.
Expand Down
33 changes: 31 additions & 2 deletions tools/nodejs_api/src_js/lbug.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ export class Database {
* @param maxDBSize Maximum size of the database in bytes
* @param autoCheckpoint Whether to enable automatic checkpoints
* @param checkpointThreshold Threshold for automatic checkpoints
* @param throwOnWalReplayFailure If true, WAL replay failures throw; otherwise replay stops at error
* @param enableChecksums If true, use checksums to detect WAL corruption
*/
constructor(
databasePath?: string,
Expand All @@ -125,7 +127,9 @@ export class Database {
readOnly?: boolean,
maxDBSize?: number,
autoCheckpoint?: boolean,
checkpointThreshold?: number
checkpointThreshold?: number,
throwOnWalReplayFailure?: boolean,
enableChecksums?: boolean
);

/**
Expand Down Expand Up @@ -260,12 +264,25 @@ export class Connection {
progressCallback?: ProgressCallback
): Promise<QueryResult | QueryResult[]>;

/**
* Run a function inside a single write transaction. Commits on success, rolls back on throw.
* @param fn Async function that can use this connection's query/execute
* @returns Promise that resolves to the return value of fn
*/
transaction<T>(fn: () => Promise<T>): Promise<T>;

/**
* Execute a query synchronously.
* @param statement The statement to execute
* @returns The query result(s)
*/
querySync(statement: string): QueryResult | QueryResult[];

/**
* Check that the connection is alive (e.g. for pools or health checks).
* @returns Promise that resolves to true if OK, rejects if connection is broken
*/
ping(): Promise<boolean>;
}

/**
Expand All @@ -289,8 +306,14 @@ export class PreparedStatement {
/**
* Represents the results of a query execution.
* Note: This class is created internally by Connection query methods.
* Supports async iteration: for await (const row of result) { ... }
*/
export class QueryResult {
export class QueryResult implements AsyncIterable<Record<string, LbugValue> | null> {
/**
* Async iterator for row-by-row consumption (for await...of).
*/
[Symbol.asyncIterator](): AsyncIterator<Record<string, LbugValue> | null>;

/**
* Reset the iterator for reading results.
*/
Expand Down Expand Up @@ -320,6 +343,12 @@ export class QueryResult {
*/
getNextSync(): Record<string, LbugValue> | null;

/**
* Return a Node.js Readable stream (object mode) that yields one row per chunk.
* @returns Readable stream of row objects
*/
toStream(): import("stream").Readable;

/**
* Iterate through the query result with callback functions.
* @param resultCallback Callback function called for each row
Expand Down
59 changes: 59 additions & 0 deletions tools/nodejs_api/src_js/query_result.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

const assert = require("assert");
const { Readable } = require("stream");

class QueryResult {
/**
Expand Down Expand Up @@ -96,6 +97,64 @@ class QueryResult {
});
}

/**
* Async iterator for consuming the result row-by-row (e.g. `for await (const row of result)`).
* Does not materialize the full result in memory.
* @returns {AsyncIterator<Object>}
*/
[Symbol.asyncIterator]() {
const self = this;
return {
async next() {
self._checkClosed();
if (!self.hasNext()) {
return { done: true };
}
try {
const value = await self.getNext();
if (value === null) {
return { done: true };
}
return { value, done: false };
} catch (err) {
return Promise.reject(err);
}
},
};
}

/**
* Return a Node.js Readable stream (object mode) that yields one row per chunk.
* Useful for piping or integrating with stream consumers. Does not require native changes.
* @returns {stream.Readable} Readable stream of row objects.
*/
toStream() {
const self = this;
return new Readable({
objectMode: true,
read() {
if (self._isClosed) {
return this.push(null);
}
if (!self.hasNext()) {
return this.push(null);
}
self.getNext()
.then((row) => {
if (row !== null && row !== undefined) {
this.push(row);
}
if (!self.hasNext()) {
this.push(null);
}
})
.catch((err) => {
this.destroy(err);
});
},
});
}

/**
* Get all rows of the query result.
* @returns {Promise<Array<Object>>} a promise that resolves to all rows of the query result. The promise is rejected if there is an error.
Expand Down
44 changes: 44 additions & 0 deletions tools/nodejs_api/test/test_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,50 @@ describe("Execute", function () {
});
});

describe("ping", function () {
it("should resolve to true when connection is alive", async function () {
const ok = await conn.ping();
assert.strictEqual(ok, true);
});
});

describe("transaction", function () {
it("should commit and return fn result on success", async function () {
const result = await conn.transaction(async () => {
const q = await conn.query("RETURN 42 AS x");
const rows = await q.getAll();
q.close();
return rows[0].x;
});
assert.equal(result, 42);
});

it("should rollback and rethrow on fn error", async function () {
const err = new Error("tx abort");
try {
await conn.transaction(async () => {
await conn.query("RETURN 1");
throw err;
});
assert.fail("transaction should have thrown");
} catch (e) {
assert.strictEqual(e, err);
}
const q = await conn.query("RETURN 1");
assert.isTrue(q.hasNext());
q.close();
});

it("should reject non-function", async function () {
try {
await conn.transaction("not a function");
assert.fail("transaction should have thrown");
} catch (e) {
assert.equal(e.message, "transaction() requires a function.");
}
});
});

describe("Query", function () {
it("should run a valid query", async function () {
const queryResult = await conn.query("MATCH (a:person) RETURN COUNT(*)");
Expand Down
Loading