Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
tree-sitter/
tree-sitter-cypher/

.idea/
.vscode
.cursor
.vs
bazel-*
.clwb
Expand Down
59 changes: 53 additions & 6 deletions tools/nodejs_api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ const main = async () => {
// Run a query
const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;");

// Fetch all results
// Consume results (choose one style)
const rows = await result.getAll();

// Output results
for (const row of rows) {
console.log(row);
}
Expand All @@ -68,12 +66,61 @@ main().catch(console.error);

The `lbug` package exposes the following primary classes:

* `Database` – Initializes a database from a file path.
* `Connection` – Executes queries on a connected database.
* `QueryResult` – Provides methods like `getAll()` to retrieve results.
* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). Close with `close()`.
* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`.
* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed).
* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries.

Both CommonJS (`require`) and ES Modules (`import`) are fully supported.

### Consuming query results

```js
const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000");

// Option 1: get all rows (loads into memory)
const rows = await result.getAll();

// Option 2: row by row (async)
while (result.hasNext()) {
const row = await result.getNext();
console.log(row);
}

// Option 3: async iterator (streaming, no full materialization)
for await (const row of result) {
console.log(row);
}

// Option 4: Node.js Readable stream (e.g. for .pipe())
const stream = result.toStream();
stream.on("data", (row) => console.log(row));
```

### Transactions

**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing.

```js
await conn.query("BEGIN TRANSACTION");
await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
await conn.query('COPY Nodes FROM "data.csv"');
await conn.query("COMMIT");
// or on error: await conn.query("ROLLBACK");
```

**Read-only transaction:** `BEGIN TRANSACTION READ ONLY` then queries, then `COMMIT` / `ROLLBACK`.

**Wrapper:** One write transaction with automatic commit on success and rollback on throw:

```js
await conn.transaction(async () => {
await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
await conn.query('COPY Nodes FROM "data.csv"');
// commit happens automatically; on throw, rollback then rethrow
});
```

---

## 🛠️ Local Development (for Contributors)
Expand Down
14 changes: 14 additions & 0 deletions tools/nodejs_api/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ const path = require("path");
const { execSync } = require("child_process");

const SRC_PATH = path.resolve(__dirname, "../..");
const NODEJS_API = path.resolve(__dirname, ".");
const BUILD_DIR = path.join(NODEJS_API, "build");
const SRC_JS_DIR = path.join(NODEJS_API, "src_js");
const THREADS = require("os").cpus().length;

console.log(`Using ${THREADS} threads to build Lbug.`);
Expand All @@ -12,3 +15,14 @@ execSync(`make nodejs NUM_THREADS=${THREADS}`, {
cwd: SRC_PATH,
stdio: "inherit",
});

// Ensure build/ has latest JS from src_js (CMake copies at configure time only)
if (fs.existsSync(SRC_JS_DIR) && fs.existsSync(BUILD_DIR)) {
const files = fs.readdirSync(SRC_JS_DIR);
for (const name of files) {
if (name.endsWith(".js") || name.endsWith(".mjs") || name.endsWith(".d.ts")) {
fs.copyFileSync(path.join(SRC_JS_DIR, name), path.join(BUILD_DIR, name));
}
}
console.log("Copied src_js to build.");
}
1 change: 1 addition & 0 deletions tools/nodejs_api/src_cpp/include/node_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class NodeConnection : public Napi::ObjectWrap<NodeConnection> {
void InitCppConnection();
void SetMaxNumThreadForExec(const Napi::CallbackInfo& info);
void SetQueryTimeout(const Napi::CallbackInfo& info);
void Interrupt(const Napi::CallbackInfo& info);
Napi::Value ExecuteAsync(const Napi::CallbackInfo& info);
Napi::Value QueryAsync(const Napi::CallbackInfo& info);
Napi::Value ExecuteSync(const Napi::CallbackInfo& info);
Expand Down
2 changes: 2 additions & 0 deletions tools/nodejs_api/src_cpp/include/node_progress_bar_display.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ using namespace common;
*/
class NodeProgressBarDisplay : public ProgressBarDisplay {
public:
~NodeProgressBarDisplay() override;

void updateProgress(uint64_t queryID, double newPipelineProgress,
uint32_t newNumPipelinesFinished) override;

Expand Down
7 changes: 7 additions & 0 deletions tools/nodejs_api/src_cpp/node_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("querySync", &NodeConnection::QuerySync),
InstanceMethod("setMaxNumThreadForExec", &NodeConnection::SetMaxNumThreadForExec),
InstanceMethod("setQueryTimeout", &NodeConnection::SetQueryTimeout),
InstanceMethod("interrupt", &NodeConnection::Interrupt),
InstanceMethod("close", &NodeConnection::Close)});

exports.Set("NodeConnection", t);
Expand Down Expand Up @@ -83,6 +84,12 @@ void NodeConnection::SetQueryTimeout(const Napi::CallbackInfo& info) {
}
}

void NodeConnection::Interrupt(const Napi::CallbackInfo& info) {
if (this->connection) {
this->connection->interrupt();
}
}

void NodeConnection::Close(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
Expand Down
8 changes: 8 additions & 0 deletions tools/nodejs_api/src_cpp/node_progress_bar_display.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
using namespace lbug;
using namespace common;

NodeProgressBarDisplay::~NodeProgressBarDisplay() {
std::unique_lock<std::shared_mutex> lock(callbackMutex);
for (auto& kv : queryCallbacks) {
kv.second.Release();
}
queryCallbacks.clear();
}

void NodeProgressBarDisplay::updateProgress(uint64_t queryID, double newPipelineProgress,
uint32_t newNumPipelinesFinished) {
if (numPipelines == 0) {
Expand Down
148 changes: 140 additions & 8 deletions tools/nodejs_api/src_js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ class Connection {
* Execute a prepared statement with the given parameters.
* @param {lbug.PreparedStatement} preparedStatement the prepared statement to execute.
* @param {Object} params a plain object mapping parameter names to values.
* @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines.
* @returns {Promise<lbug.QueryResult>} a promise that resolves to the query result. The promise is rejected if there is an error.
* @param {Object|Function} [optionsOrProgressCallback] - Options { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback.
* @returns {Promise<lbug.QueryResult>} a promise that resolves to the query result. Rejects if error or options.signal is aborted.
*/
execute(preparedStatement, params = {}, progressCallback) {
execute(preparedStatement, params = {}, optionsOrProgressCallback) {
const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback);
return new Promise((resolve, reject) => {
if (
!typeof preparedStatement === "object" ||
Expand All @@ -150,16 +151,41 @@ class Connection {
if (progressCallback && typeof progressCallback !== "function") {
return reject(new Error("progressCallback must be a function."));
}
if (signal?.aborted) {
return reject(this._createAbortError());
}
let abortListener;
const cleanup = () => {
if (signal && abortListener) {
signal.removeEventListener("abort", abortListener);
}
};
if (signal) {
abortListener = () => {
this.interrupt();
cleanup();
reject(this._createAbortError());
};
signal.addEventListener("abort", abortListener);
}
this._getConnection()
.then((connection) => {
if (signal?.aborted) {
cleanup();
return reject(this._createAbortError());
}
const nodeQueryResult = new LbugNative.NodeQueryResult();
try {
connection.executeAsync(
preparedStatement._preparedStatement,
nodeQueryResult,
paramArray,
(err) => {
cleanup();
if (err) {
if (signal?.aborted && err.message === "Interrupted.") {
return reject(this._createAbortError());
}
return reject(err);
}
this._unwrapMultipleQueryResults(nodeQueryResult)
Expand All @@ -173,10 +199,12 @@ class Connection {
progressCallback
);
} catch (e) {
cleanup();
return reject(e);
}
})
.catch((err) => {
cleanup();
return reject(err);
});
});
Expand Down Expand Up @@ -261,26 +289,59 @@ class Connection {
return new PreparedStatement(this, preparedStatement);
}

/**
* Interrupt the currently executing query on this connection.
* No-op if the connection is not initialized or no query is running.
*/
interrupt() {
if (this._connection) {
this._connection.interrupt();
}
}

/**
* Execute a query.
* @param {String} statement the statement to execute.
* @param {Function} [progressCallback] - Optional callback function that is invoked with the progress of the query execution. The callback receives three arguments: pipelineProgress, numPipelinesFinished, and numPipelines.
* @returns {Promise<lbug.QueryResult>} a promise that resolves to the query result. The promise is rejected if there is an error.
* @param {Object|Function} [optionsOrProgressCallback] - Options object { signal?: AbortSignal, progressCallback?: Function } or legacy progress callback.
* @returns {Promise<lbug.QueryResult>} a promise that resolves to the query result. The promise is rejected if there is an error or if options.signal is aborted.
*/
query(statement, progressCallback) {
query(statement, optionsOrProgressCallback) {
const { signal, progressCallback } = this._normalizeQueryOptions(optionsOrProgressCallback);
return new Promise((resolve, reject) => {
if (typeof statement !== "string") {
return reject(new Error("statement must be a string."));
}
if (progressCallback && typeof progressCallback !== "function") {
return reject(new Error("progressCallback must be a function."));
if (signal?.aborted) {
return reject(this._createAbortError());
}
let abortListener;
const cleanup = () => {
if (signal && abortListener) {
signal.removeEventListener("abort", abortListener);
}
};
if (signal) {
abortListener = () => {
this.interrupt();
cleanup();
reject(this._createAbortError());
};
signal.addEventListener("abort", abortListener);
}
this._getConnection()
.then((connection) => {
if (signal?.aborted) {
cleanup();
return reject(this._createAbortError());
}
const nodeQueryResult = new LbugNative.NodeQueryResult();
try {
connection.queryAsync(statement, nodeQueryResult, (err) => {
cleanup();
if (err) {
if (signal?.aborted && err.message === "Interrupted.") {
return reject(this._createAbortError());
}
return reject(err);
}
this._unwrapMultipleQueryResults(nodeQueryResult)
Expand All @@ -293,15 +354,55 @@ class Connection {
},
progressCallback);
} catch (e) {
cleanup();
return reject(e);
}
})
.catch((err) => {
cleanup();
return reject(err);
});
});
}

_normalizeQueryOptions(optionsOrProgressCallback) {
if (optionsOrProgressCallback == null) {
return { signal: undefined, progressCallback: undefined };
}
if (typeof optionsOrProgressCallback === "function") {
return { signal: undefined, progressCallback: optionsOrProgressCallback };
}
if (typeof optionsOrProgressCallback === "object" && optionsOrProgressCallback !== null) {
return {
signal: optionsOrProgressCallback.signal,
progressCallback: optionsOrProgressCallback.progressCallback,
};
}
return { signal: undefined, progressCallback: undefined };
}

_createAbortError() {
return new DOMException("The operation was aborted.", "AbortError");
}

/**
* Check that the connection is alive (e.g. for connection pools or health checks).
* Runs a trivial query; rejects if the connection is broken.
* @returns {Promise<boolean>} resolves to true if the connection is OK.
*/
async ping() {
const result = await this.query("RETURN 1");
const closeResult = (r) => {
if (Array.isArray(r)) {
r.forEach((q) => q.close());
} else {
r.close();
}
};
closeResult(result);
return true;
}

/**
* Execute a query synchronously.
* @param {String} statement the statement to execute. This function blocks the main thread for the duration of the query, so use it with caution.
Expand Down Expand Up @@ -396,6 +497,37 @@ class Connection {
}
}

/**
* Run a function inside a single write transaction. On success commits, on throw rolls back and rethrows.
* Uses Cypher BEGIN TRANSACTION / COMMIT / ROLLBACK under the hood.
* @param {Function} fn async function to run; can use this connection's query/execute inside.
* @returns {Promise<*>} the value returned by fn.
*/
async transaction(fn) {
if (typeof fn !== "function") {
throw new Error("transaction() requires a function.");
}
const closeResult = (r) => {
if (Array.isArray(r)) {
r.forEach((q) => q.close());
} else {
r.close();
}
};
const beginRes = await this.query("BEGIN TRANSACTION");
closeResult(beginRes);
try {
const result = await fn();
const commitRes = await this.query("COMMIT");
closeResult(commitRes);
return result;
} catch (e) {
const rollbackRes = await this.query("ROLLBACK");
closeResult(rollbackRes);
throw e;
}
}

/**
* Set the timeout for queries. Queries that take longer than the timeout
* will be aborted.
Expand Down
Loading