4 changes: 4 additions & 0 deletions .gitignore
@@ -1,5 +1,9 @@
tree-sitter/
tree-sitter-cypher/

.idea/
.vscode
.cursor
.vs
bazel-*
.clwb
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -187,7 +187,7 @@ nodejs-deps:
	cd tools/nodejs_api && npm install --include=dev

nodejstest: nodejs
	cd tools/nodejs_api && node copy_src_to_build.js && npm test

nodejstest-deps: nodejs-deps nodejstest

109 changes: 103 additions & 6 deletions tools/nodejs_api/README.md
@@ -49,10 +49,8 @@ const main = async () => {
// Run a query
const result = await conn.query("MATCH (u:User) RETURN u.name, u.age;");

// Consume results (choose one style)
const rows = await result.getAll();

for (const row of rows) {
  console.log(row);
}
@@ -68,12 +66,101 @@ main().catch(console.error);

The `lbug` package exposes the following primary classes:

* **Database** – `new Database(path, bufferPoolSize?, ...)`. Initialize with `init()` / `initSync()` (optional; done on first use). Close with `close()`.
* **Connection** – `new Connection(database, numThreads?)`. Run Cypher with `query(statement)` or `prepare(statement)` then `execute(preparedStatement, params)`. Use `transaction(fn)` for a single write transaction, `ping()` for liveness checks. Use `registerStream(name, source, { columns })` to load data from an AsyncIterable via `LOAD FROM name`; `unregisterStream(name)` when done. Configure with `setQueryTimeout(ms)`, `setMaxNumThreadForExec(n)`.
* **QueryResult** – Returned by `query()` / `execute()`. Consume with `getAll()`, `getNext()` / `hasNext()`, **async iteration** (`for await...of`), or **`toStream()`** (Node.js `Readable`). Metadata: `getColumnNames()`, `getColumnDataTypes()`, `getQuerySummary()`. Call `close()` when done (optional if fully consumed).
* **PreparedStatement** – Created by `conn.prepare(statement)`. Execute with `conn.execute(preparedStatement, params)`. Reuse for parameterized queries.

Both CommonJS (`require`) and ES Modules (`import`) are fully supported.

### Consuming query results

```js
const result = await conn.query("MATCH (n:User) RETURN n.name LIMIT 1000");

// Option 1: get all rows (loads into memory)
const rows = await result.getAll();

// Option 2: row by row (async)
while (result.hasNext()) {
  const row = await result.getNext();
  console.log(row);
}

// Option 3: async iterator (streaming, no full materialization)
for await (const row of result) {
  console.log(row);
}

// Option 4: Node.js Readable stream (e.g. for .pipe())
const stream = result.toStream();
stream.on("data", (row) => console.log(row));
```

### Transactions

**Manual:** Run `BEGIN TRANSACTION`, then your queries, then `COMMIT` or `ROLLBACK`. On error, call `ROLLBACK` before continuing.

```js
await conn.query("BEGIN TRANSACTION");
await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
await conn.query('COPY Nodes FROM "data.csv"');
await conn.query("COMMIT");
// or on error: await conn.query("ROLLBACK");
```

**Read-only transaction:** `BEGIN TRANSACTION READ ONLY` then queries, then `COMMIT` / `ROLLBACK`.

**Wrapper:** One write transaction with automatic commit on success and rollback on throw:

```js
await conn.transaction(async () => {
  await conn.query("CREATE NODE TABLE Nodes(id INT64, PRIMARY KEY(id))");
  await conn.query('COPY Nodes FROM "data.csv"');
  // commit happens automatically; on throw, rollback then rethrow
});
```

### Loading data from a Node.js stream

You can feed data from an **AsyncIterable** (generator, async generator, or any `Symbol.asyncIterator`) into Cypher using **scan replacement**: register a stream by name, then use `LOAD FROM name` in your query. Rows are pulled from JavaScript on demand during execution.

**API:**

* **`conn.registerStream(name, source, options)`** (async)
  * `name` – string used in Cypher: `LOAD FROM name RETURN ...`
  * `source` – AsyncIterable of rows. Each row is an **array** of column values (same order as `options.columns`) or an **object** keyed by column name.
  * `options.columns` – **required**. Schema: array of `{ name: string, type: string }`. Supported types: `INT64`, `INT32`, `INT16`, `INT8`, `UINT64`, `UINT32`, `DOUBLE`, `FLOAT`, `STRING`, `BOOL`, `DATE`, `TIMESTAMP`.

* **`conn.unregisterStream(name)`**
  Unregisters the source so the name can be reused or to avoid leaving stale entries. Call after the query (or when done with the stream).

**Example:**

```js
async function* generateRows() {
  yield [1, "Alice"];
  yield [2, "Bob"];
  yield [3, "Carol"];
}

await conn.registerStream("users", generateRows(), {
  columns: [
    { name: "id", type: "INT64" },
    { name: "name", type: "STRING" },
  ],
});

const result = await conn.query("LOAD FROM users RETURN *");
for await (const row of result) {
  console.log(row); // { id: 1, name: "Alice" }, ...
}

conn.unregisterStream("users");
```

You can combine the stream with other Cypher: e.g. `LOAD FROM stream WHERE col > 0 RETURN *`, or `COPY MyTable FROM (LOAD FROM stream RETURN *)`.

---

## 🛠️ Local Development (for Contributors)
@@ -96,6 +183,16 @@ npm run build
npm test
```

When developing from the **monorepo root**, build the native addon first so tests see the latest C++ code:

```bash
# From the repository root
make nodejs
# Or: cmake --build build/release --target lbugjs
# Then from tools/nodejs_api:
cd tools/nodejs_api && npm test
```

---

## 📦 Packaging and Binary Distribution
14 changes: 14 additions & 0 deletions tools/nodejs_api/build.js
@@ -3,6 +3,9 @@ const path = require("path");
const { execSync } = require("child_process");

const SRC_PATH = path.resolve(__dirname, "../..");
const NODEJS_API = path.resolve(__dirname, ".");
const BUILD_DIR = path.join(NODEJS_API, "build");
const SRC_JS_DIR = path.join(NODEJS_API, "src_js");
const THREADS = require("os").cpus().length;

console.log(`Using ${THREADS} threads to build Lbug.`);
@@ -12,3 +15,14 @@ execSync(`make nodejs NUM_THREADS=${THREADS}`, {
    cwd: SRC_PATH,
    stdio: "inherit",
});

// Ensure build/ has latest JS from src_js (CMake copies at configure time only)
if (fs.existsSync(SRC_JS_DIR) && fs.existsSync(BUILD_DIR)) {
  const files = fs.readdirSync(SRC_JS_DIR);
  for (const name of files) {
    if (name.endsWith(".js") || name.endsWith(".mjs") || name.endsWith(".d.ts")) {
      fs.copyFileSync(path.join(SRC_JS_DIR, name), path.join(BUILD_DIR, name));
    }
  }
  console.log("Copied src_js to build.");
}
22 changes: 22 additions & 0 deletions tools/nodejs_api/copy_src_to_build.js
@@ -0,0 +1,22 @@
/**
* Copies src_js/*.js, *.mjs, *.d.ts into build/ so tests run with the latest JS
* after "make nodejs" (which only copies at cmake configure time).
* Run from tools/nodejs_api.
*/
const fs = require("fs");
const path = require("path");

const srcDir = path.join(__dirname, "src_js");
const buildDir = path.join(__dirname, "build");

if (!fs.existsSync(buildDir)) {
  console.warn("copy_src_to_build: build/ missing, run make nodejs first.");
  process.exit(0);
}

const re = /\.(js|mjs|d\.ts)$/;
const files = fs.readdirSync(srcDir).filter((n) => re.test(n));
for (const name of files) {
  fs.copyFileSync(path.join(srcDir, name), path.join(buildDir, name));
}
console.log("Copied", files.length, "files from src_js to build.");
137 changes: 137 additions & 0 deletions tools/nodejs_api/docs/execution_chain_analysis.md
@@ -0,0 +1,137 @@
# LOAD FROM stream: Execution Chain Analysis

## 1. End-to-end execution chain

```mermaid
flowchart TB
  subgraph JS["JS (main thread)"]
    A["conn.query('LOAD FROM name RETURN *')"]
    B["registerStream: getChunk(requestId) → pending.push; runConsumer via setImmediate"]
    C["runConsumer: sort pending, for each requestId take chunks[index], returnChunk(requestId, rows, done)"]
    D["AsyncIterator: it.next() → yield rows"]
  end

  subgraph CppAddon["C++ addon (Node worker)"]
    E["tableFunc: mutex, nextRequestId(), setChunkRequest, BlockingCall getChunk"]
    F["wait reqPtr->cv until filled"]
    G["returnChunkFromJS: req->rows, req->filled=true, cv.notify_one"]
    H["Copy rows to output.dataChunk, return cap"]
  end

  subgraph Engine["Engine (task thread, single if canParallelFunc=false)"]
    I["getNextTuple → getNextTuplesInternal"]
    J["tableFunc(input, output) → numTuplesScanned"]
    K["FactorizedTable accumulates chunks"]
    L["Result: MaterializedQueryResult + FactorizedTableIterator"]
  end

  A --> I
  I --> J
  J --> E
  E --> B
  B --> C
  C --> D
  C --> G
  G --> F
  F --> H
  H --> J
  J --> K
  K --> L
  L --> M["hasNext / getNext in JS"]
  M --> A

### Sequence (single-threaded pipeline)

| Step | Where | What |
|------|--------|------|
| 1 | JS | `query("LOAD FROM mystream RETURN *")` |
| 2 | Engine | Parse, plan TableFunctionCall (node stream), execute task |
| 3 | Engine | Source: `getNextTuple()` → `getNextTuplesInternal()` → `tableFunc()` |
| 4 | C++ | `tableFunc`: lock mutex, requestId=1, setChunkRequest(1), BlockingCall(getChunk, 1) |
| 5 | JS | getChunk(1) called on main thread; pending=[1], runConsumer scheduled |
| 6 | C++ | BlockingCall returns only after JS calls returnChunk; C++ waits on req->cv |
| 7 | JS | runConsumer: requestId=1, index=0, it.next() → { value: [1,"a"], done: false }, returnChunk(1, [[1,"a"]], false) |
| 8 | C++ | returnChunkFromJS: req->rows, filled=true, cv.notify_one; tableFunc wakes, copies 1 row, returns 1 |
| 9 | Engine | TableFunctionCall outputs chunk to ResultSet; getNextTuple called again |
| 10 | C++ | tableFunc again: requestId=2, BlockingCall(getChunk, 2), wait |
| 11 | JS | getChunk(2), pending=[2], runConsumer: it.next() → [2,"b"], returnChunk(2, ...) |
| … | … | Repeat until tableFunc returns 0 (empty chunk / done) |
| N | Engine | No more tuples from source → pipeline finishes, MaterializedQueryResult built |
| N+1 | JS | result.hasNext() / result.getNext() over FactorizedTableIterator |
| N+2 | C++ addon | GetNextAsync: if !hasNext() return null; else getNext() (core throws if !hasNext()) |
| N+3 | JS | getNext() resolves to row or null |
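
The blocking handshake in steps 4–8 can be modeled in plain JavaScript. This is a toy sketch, not the addon's actual code; `makeBridge`, `getChunk`, and `returnChunk` here are illustrative stand-ins for the native `BlockingCall` / condition-variable pair:

```javascript
// Toy model of the getChunk / returnChunk handshake: the "native" side
// blocks on a promise that the JS consumer later resolves.
function makeBridge() {
  const pending = new Map(); // requestId -> resolve function
  return {
    // "C++" side: BlockingCall(getChunk, requestId) — resolves when JS answers.
    getChunk(requestId) {
      return new Promise((resolve) => pending.set(requestId, resolve));
    },
    // JS side: runConsumer answers one request with rows and a done flag.
    returnChunk(requestId, rows, done) {
      pending.get(requestId)({ rows, done });
      pending.delete(requestId);
    },
  };
}
```

In the real addon the waiting side is a C++ thread parked on a condition variable, not a promise, but the ordering guarantee is the same: a request is answered exactly once, by id.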

---

## 2. Expert views and failure points

### Expert 1: Engine / pipeline

- **Order**: With `canParallelFunc = false`, one thread runs the pipeline; tableFunc is called sequentially (1, 2, 3, …). Chunks are appended to FactorizedTable in call order → row order is preserved.
- **End of stream**: Engine keeps calling tableFunc until it returns 0. JS sends `returnChunk(id, [], true)` when iterator is done; C++ returns 0 → engine stops. No extra tableFunc call after “done”.
- **Risk**: If the engine ever called tableFunc again after a 0 return, the next requestId would be issued and JS would call it.next() again (possibly past end). Current code path: return 0 → getNextTuplesInternal returns false → parent stops pulling; no evidence of extra call.

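The engine's pull loop described above can be sketched in a few lines; `runPipeline` and `tableFunc` here are illustrative names, not engine APIs:

```javascript
// Toy model of the engine's source loop: call tableFunc until it reports
// 0 tuples, appending each chunk in call order (which preserves row order
// when only one thread runs the pipeline).
async function runPipeline(tableFunc) {
  const table = [];
  for (;;) {
    const rows = await tableFunc();
    if (rows.length === 0) break; // returning 0 ends the source
    table.push(...rows);
  }
  return table;
}
```
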
### Expert 2: Addon (C++ / JS bridge)

- **requestId ordering**: C++ assigns requestId sequentially under mutex; JS runConsumer sorts `pending` and serves `chunks[requestId - 1]`. So request 1 gets first it.next(), request 2 gets second, etc. Correct.
- **getNext() contract**: Core MaterializedQueryResult::getNext() throws if !hasNext(). Addon GetNextSync checks hasNext() and returns env.Null() without calling getNext(). GetNextAsync previously called getNext() even when !hasNext() → throw. Fix: in Execute(), if !hasNext() set cppTuple=null and return; in OnOK(), pass env.Null() as value when cppTuple is null.
- **Risk**: Any other code path that calls getNext() without checking hasNext() will still throw (by design in core); addon must always guard.
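
The requestId ordering rule can be sketched as follows (illustrative model only, not the actual `runConsumer` implementation):

```javascript
// Serve pending requestIds in ascending order; chunk index = requestId - 1.
// This is why request 1 always gets the first it.next() result, request 2
// the second, and so on, regardless of arrival order.
function serveInOrder(pending, chunks) {
  const served = [];
  for (const requestId of [...pending].sort((a, b) => a - b)) {
    served.push({ requestId, rows: chunks[requestId - 1] });
  }
  return served;
}
```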

### Expert 3: API / tests

- **Contract**: d.ts says `getNext(): Promise<Record<string, LbugValue> | null>`. So “no more rows” must be expressed as resolving with `null`, not throwing.
- **Tests**: register_stream expects getNext() after last row to return null; test_query_result was updated to expect null when exhausted instead of throw.
- **Risk**: Inconsistent use of hasNext() before getNext() in other tests or user code can still hit the core throw if the addon path is bypassed or a different API is used.
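
Under the `getNext()`-returns-`null` contract, callers can drain a result without ever calling `hasNext()`. A minimal sketch (the mock below stands in for a real `QueryResult`):

```javascript
// Drain a result relying only on the null-when-exhausted contract.
async function drain(result) {
  const rows = [];
  for (;;) {
    const row = await result.getNext();
    if (row === null) break; // exhausted: resolves null, does not throw
    rows.push(row);
  }
  return rows;
}
```

With the real API you would pass the `QueryResult` from `conn.query(...)` instead of a mock.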

---

## 3. Hypotheses (why “No more tuples” or wrong order happened)

| # | Hypothesis | Likelihood | Evidence |
|---|------------|------------|----------|
| H1 | Multiple threads called tableFunc; results merged by completion order, so first row could be id=3; then one thread called getNext() after exhaustion | High (before fix) | canParallelFunc default true; TaskScheduler runs task with N threads; each copy of operator calls getNextTuplesInternal. |
| H2 | GetNextAsync did not check hasNext() before getNext(); when test called getNext() the 4th time, C++ threw “No more tuples” | High | Execute() had “if (!hasNext()) cppTuple.reset();” then unconditionally “cppTuple = getNext();” → always threw when exhausted. |
| H3 | JS runConsumer delivered chunks out of order (e.g. request 2 before 1) | Low | pending is sorted by requestId; chunks[index] with index = requestId - 1; iterator consumed in order. |
| H4 | Engine calls tableFunc one extra time after 0 return | Low | No code path found that would call tableFunc again after return 0. |
| H5 | Empty chunk not handled (e.g. returnChunk(id, [], false) and engine expects more) | Low | C++ returns 0 for numRows==0; engine treats 0 as “no tuples this call”, continues until next tableFunc returns 0; JS can send [] and done=false for “no data this chunk” and later send done=true. |

---

## 4. Brainstorm: design options

| Option | Description | Pros | Cons |
|--------|-------------|------|------|
| A. Single-thread table func (current) | canParallelFunc = false for node stream | Simple, deterministic order, no cross-thread ordering bugs | Slightly less engine parallelism for this source only |
| B. Return null when exhausted (current) | Addon GetNextAsync: if !hasNext() return null, do not call getNext() | Matches d.ts and user expectation; tests can rely on getNext() → null | Core still throws if getNext() called without hasNext(); only addon is defensive |
| C. Relax tests: order-independent | Tests collect all rows, sort by id, assert set | Resilient to any engine reorder | Hides real ordering bugs; less strict |
| D. Keep mutex in tableFunc | Already in place | Serializes tableFunc calls even if canParallelFunc were true | Redundant with A; adds lock contention if we ever parallelize |
| E. Core API: getNext() returns null | Change MaterializedQueryResult::getNext() to return null instead of throw when !hasNext() | Single contract everywhere | Breaking change for C++ API users who rely on throw |
| F. Document “always use hasNext() before getNext()” | Keep throw in core, document in JS | Clear contract | Poor DX; register_stream tests expect null |

**Best combination (current state):**

- **A + B**: Single-thread table function + addon returns null when exhausted. No test relaxation (C). Mutex (D) is optional extra safety; can keep. No core API change (E) to avoid breaking C++ users.

---

## 5. Checklist (what was fixed / what to verify)

- [x] **canParallelFunc = false** for NodeStreamScanFunction → single-thread pipeline, order preserved.
- [x] **GetNextAsync**: if !hasNext(), do not call getNext(); pass env.Null() to callback so JS gets null.
- [x] **test_query_result**: expect null when exhausted; test name updated.
- [ ] **Rebuild addon** and run full test suite (register_stream + query_result) to confirm no regressions.
- [x] **Optional**: Add a single test that iterates with hasNext() and asserts getNext() returns null exactly when hasNext() becomes false, to lock the contract (test_query_result.js: "getNext() returns null exactly when hasNext() is false").

---

## 6. One-page summary

**Chain:** JS `query("LOAD FROM name")` → Engine plans TableFunctionCall → one task thread (canParallelFunc=false) calls tableFunc repeatedly → C++ tableFunc uses mutex, nextRequestId(), BlockingCall(getChunk), waits on cv → JS getChunk pushes requestId, runConsumer feeds iterator in request order and returnChunk → C++ wakes, copies rows, returns count → Engine accumulates into FactorizedTable → MaterializedQueryResult → JS hasNext/getNext.

**Root causes of past failures:** (1) Parallel table function → chunks completed out of order → first row could be 3 instead of 1. (2) GetNextAsync called getNext() even when !hasNext() → core threw “No more tuples”.

**Fixes applied:** (1) canParallelFunc = false. (2) GetNextAsync: if !hasNext() return null and skip getNext(); callback second arg = env.Null(). (3) Test expects null when exhausted.

**Recommended:** Keep current design (A+B). Verify with full rebuild and tests.
6 changes: 6 additions & 0 deletions tools/nodejs_api/src_cpp/include/node_connection.h
@@ -8,6 +8,7 @@
#include "node_prepared_statement.h"
#include "node_progress_bar_display.h"
#include "node_query_result.h"
#include "node_scan_replacement.h"
#include <napi.h>

using namespace lbug::main;
@@ -30,15 +31,20 @@ class NodeConnection : public Napi::ObjectWrap<NodeConnection> {
    void InitCppConnection();
    void SetMaxNumThreadForExec(const Napi::CallbackInfo& info);
    void SetQueryTimeout(const Napi::CallbackInfo& info);
    void Interrupt(const Napi::CallbackInfo& info);
    Napi::Value ExecuteAsync(const Napi::CallbackInfo& info);
    Napi::Value QueryAsync(const Napi::CallbackInfo& info);
    Napi::Value ExecuteSync(const Napi::CallbackInfo& info);
    Napi::Value QuerySync(const Napi::CallbackInfo& info);
    void Close(const Napi::CallbackInfo& info);
    Napi::Value RegisterStream(const Napi::CallbackInfo& info);
    void UnregisterStream(const Napi::CallbackInfo& info);
    void ReturnChunk(const Napi::CallbackInfo& info);

private:
    std::shared_ptr<Database> database;
    std::shared_ptr<Connection> connection;
    std::unique_ptr<NodeStreamRegistry> streamRegistry;
};

class ConnectionInitAsyncWorker : public Napi::AsyncWorker {
2 changes: 2 additions & 0 deletions tools/nodejs_api/src_cpp/include/node_progress_bar_display.h
@@ -13,6 +13,8 @@ using namespace common;
*/
class NodeProgressBarDisplay : public ProgressBarDisplay {
public:
    ~NodeProgressBarDisplay() override;

    void updateProgress(uint64_t queryID, double newPipelineProgress,
        uint32_t newNumPipelinesFinished) override;

3 changes: 2 additions & 1 deletion tools/nodejs_api/src_cpp/include/node_query_result.h
@@ -102,6 +102,7 @@ class NodeQueryResultGetNextAsyncWorker : public Napi::AsyncWorker {
        try {
            if (!nodeQueryResult->queryResult->hasNext()) {
                cppTuple.reset();
                return;
            }
            cppTuple = nodeQueryResult->queryResult->getNext();
        } catch (const std::exception& exc) {
@@ -112,7 +113,7 @@
    inline void OnOK() override {
        auto env = Env();
        if (cppTuple == nullptr) {
            Callback().Call({env.Null(), env.Null()});
            return;
        }
        Napi::Object nodeTuple = Napi::Object::New(env);