Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/icebug-memory.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Icebug-Memory Storage Format

## Overview

This is LadybugDB's implementation of [Icebug-Memory](https://github.com/Ladybug-Memory/icebug-format), a read-only graph storage format based on Arrow. It is designed for efficient analytical queries on large graphs.

## V1

Implements Icebug-Memory v1.

### Creating tables

Icebug-Memory tables can be created using the Python, C, and C++ APIs. Other languages and the CLI are currently not supported.

- `create_arrow_table(conn, table_name, arrow_schema, arrow_arrays)` for node tables
- `create_arrow_csr_rel_table(connection, tableName, srcTableName, dstTableName,
fwdIndicesSchema, fwdIndices,
fwdIndptrSchema, fwdIndptr,
optional<bwdIndicesSchema>, optional<bwdIndices>,
optional<bwdIndptrSchema>, optional<bwdIndptr>)` for CSR relationship tables

### Node tables

For each node table, there is a corresponding arrow table containing a primary key column and one column per property as declared in the schema.

### Indices

Each relationship table has a corresponding forward (fwd) indices Arrow table containing one row per edge. The first column is always `target` (the destination node offset), followed by zero or more edge property columns as declared in the schema. Optionally, a backward (bwd) indices table can be supplied for efficient reverse traversals.

### Indptr

Each relationship table also has a corresponding fwd indptr Arrow table containing the CSR row pointers. It has a single integer column with `N+1` entries, where `N` is the number of source nodes; entry `i` is the offset of the first edge of source node `i` in the indices table, and entry `0` is `0`. Optionally, a bwd indptr table can be supplied for efficient reverse traversals.

## Convert from other formats

You can convert from other graph formats (e.g. non-CSR Arrow tables) to Icebug-Memory using the script at <https://github.com/Ladybug-Memory/icebug-format>.

## Lifetime and mutability

Icebug-Memory tables are immutable. `INSERT`, `UPDATE`, `DELETE`, and `ALTER TABLE` are not supported.

The data lifetime is tied to the in-memory Arrow registration. Dropping the table unregisters the Arrow data, and restarting the process requires registering the data again.
14 changes: 8 additions & 6 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,7 @@ std::unique_ptr<BoundStatement> Binder::bindDrop(const Statement& statement) {
return std::make_unique<BoundDrop>(drop.getDropInfo());
}

static void validateNotIceDiskTable(main::ClientContext* clientContext,
const std::string& tableName) {
static void validateNotExtTable(main::ClientContext* clientContext, const std::string& tableName) {
auto catalog = Catalog::Get(*clientContext);
auto transaction = transaction::Transaction::Get(*clientContext);

Expand All @@ -608,24 +607,27 @@ static void validateNotIceDiskTable(main::ClientContext* clientContext,

auto tableEntry = catalog->getTableCatalogEntry(transaction, tableName);
StorageFormat storageFormat = StorageFormat::NONE;
std::string storage;

if (tableEntry->getTableType() == common::TableType::NODE) {
storageFormat = tableEntry->ptrCast<NodeTableCatalogEntry>()->getStorageFormat();
storage = tableEntry->ptrCast<NodeTableCatalogEntry>()->getStorage();
} else if (tableEntry->getTableType() == common::TableType::REL) {
storageFormat = tableEntry->ptrCast<RelGroupCatalogEntry>()->getStorageFormat();
storage = tableEntry->ptrCast<RelGroupCatalogEntry>()->getStorage();
}

if (storageFormat == StorageFormat::ICEBUG_DISK) {
if (!storage.empty() || storageFormat == StorageFormat::ICEBUG_DISK) {
throw BinderException(
std::format("Cannot alter table {}: icebug-disk tables are immutable.", tableName));
std::format("Cannot alter table {}: external tables are immutable.", tableName));
}
}

std::unique_ptr<BoundStatement> Binder::bindAlter(const Statement& statement) {
auto& alter = statement.constCast<Alter>();

// we don't support alter operations on icebug-disk tables
validateNotIceDiskTable(clientContext, alter.getInfo()->tableName);
// we don't support alter operations on external tables
validateNotExtTable(clientContext, alter.getInfo()->tableName);

switch (alter.getInfo()->type) {
case AlterType::RENAME: {
Expand Down
59 changes: 59 additions & 0 deletions src/c_api/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ lbug_state lbug_connection_drop_arrow_table(lbug_connection* connection, const c
auto state = setQueryResult(std::move(result), out_query_result);
if (state == LbugSuccess) {
if (!arrowId.empty()) {
// One of these is always a no-op depending on table type.
lbug::ArrowTableSupport::unregisterArrowData(arrowId);
lbug::ArrowTableSupport::unregisterCsrRelData(arrowId);
}
forgetArrowTableID(connectionPtr, table_name);
}
Expand All @@ -297,6 +299,63 @@ lbug_state lbug_connection_drop_arrow_table(lbug_connection* connection, const c
}
}

// Creates a CSR-layout Arrow relationship table from caller-supplied Arrow schemas and batches.
//
// The connection, the three table names, the forward (fwd) indices/indptr schemas and arrays,
// and out_query_result are mandatory. The backward (bwd) inputs are optional but all-or-none:
// either all four bwd pointers are non-null, or all four are null and both bwd counts are zero.
//
// Returns LbugError, with the last C API error message set, when argument validation fails or
// when an Exception is thrown while creating the table. Otherwise returns the state produced by
// setQueryResult; on LbugSuccess the Arrow ID of the new table is remembered for table_name.
lbug_state lbug_connection_create_arrow_csr_rel_table(lbug_connection* connection,
    const char* table_name, const char* src_table_name, const char* dst_table_name,
    ArrowSchema* fwd_indices_schema, ArrowArray* fwd_indices_arrays,
    uint64_t fwd_indices_num_arrays, ArrowSchema* fwd_indptr_schema, ArrowArray* fwd_indptr_arrays,
    uint64_t fwd_indptr_num_arrays, ArrowSchema* bwd_indices_schema, ArrowArray* bwd_indices_arrays,
    uint64_t bwd_indices_num_arrays, ArrowSchema* bwd_indptr_schema, ArrowArray* bwd_indptr_arrays,
    uint64_t bwd_indptr_num_arrays, lbug_query_result* out_query_result) {
    if (connection == nullptr || connection->_connection == nullptr || table_name == nullptr ||
        src_table_name == nullptr || dst_table_name == nullptr || fwd_indices_schema == nullptr ||
        fwd_indices_arrays == nullptr || fwd_indptr_schema == nullptr ||
        fwd_indptr_arrays == nullptr || out_query_result == nullptr) {
        // Report why the call failed so the caller does not read a stale message left behind by
        // an earlier C API call (the bwd validation below already reports its own failure).
        setLastCAPIErrorMessage("connection, table_name, src_table_name, dst_table_name, "
                                "fwd_indices_schema, fwd_indices_arrays, fwd_indptr_schema, "
                                "fwd_indptr_arrays, and out_query_result must all be non-null");
        return LbugError;
    }
    // BWD must be all-or-none across both schemas and array batches. A non-zero batch count
    // counts as "some bwd input supplied" even when the matching pointer is null.
    bool hasAnyBwd = bwd_indices_schema != nullptr || bwd_indices_arrays != nullptr ||
                     bwd_indices_num_arrays != 0 || bwd_indptr_schema != nullptr ||
                     bwd_indptr_arrays != nullptr || bwd_indptr_num_arrays != 0;
    bool hasAllBwd = bwd_indices_schema != nullptr && bwd_indices_arrays != nullptr &&
                     bwd_indptr_schema != nullptr && bwd_indptr_arrays != nullptr;
    if (hasAnyBwd && !hasAllBwd) {
        setLastCAPIErrorMessage("bwd_indices_schema, bwd_indices_arrays, bwd_indptr_schema, and "
                                "bwd_indptr_arrays must all be provided together or all be null");
        return LbugError;
    }

    try {
        clearLastCAPIErrorMessage();
        auto connPtr = static_cast<Connection*>(connection->_connection);
        // The bwd wrappers stay disengaged when no backward adjacency was supplied.
        std::optional<ArrowSchemaWrapper> bwdIdxSchema;
        std::optional<std::vector<ArrowArrayWrapper>> bwdIdxArrays;
        std::optional<ArrowSchemaWrapper> bwdIpSchema;
        std::optional<std::vector<ArrowArrayWrapper>> bwdIpArrays;

        if (hasAllBwd) {
            bwdIdxSchema = takeArrowSchema(bwd_indices_schema);
            bwdIdxArrays = takeArrowArrays(bwd_indices_arrays, bwd_indices_num_arrays);
            bwdIpSchema = takeArrowSchema(bwd_indptr_schema);
            bwdIpArrays = takeArrowArrays(bwd_indptr_arrays, bwd_indptr_num_arrays);
        }

        auto result = lbug::ArrowTableSupport::createArrowCsrRelTable(*connPtr, table_name,
            src_table_name, dst_table_name, takeArrowSchema(fwd_indices_schema),
            takeArrowArrays(fwd_indices_arrays, fwd_indices_num_arrays),
            takeArrowSchema(fwd_indptr_schema),
            takeArrowArrays(fwd_indptr_arrays, fwd_indptr_num_arrays), std::move(bwdIdxSchema),
            std::move(bwdIdxArrays), std::move(bwdIpSchema), std::move(bwdIpArrays));
        auto state = setQueryResult(std::move(result.queryResult), out_query_result);
        if (state == LbugSuccess) {
            // Remember the Arrow ID under the table name for later lookup on this connection.
            rememberArrowTableID(connPtr, table_name, std::move(result.arrowId));
        }
        return state;
    } catch (Exception& e) {
        setLastCAPIErrorMessage(e.what());
        return LbugError;
    }
}

// Interrupts the query currently executing on the given connection.
// A null connection handle, or one without an underlying Connection, is ignored instead of
// being dereferenced, matching the null checks done by the other connection entry points.
void lbug_connection_interrupt(lbug_connection* connection) {
    if (connection == nullptr || connection->_connection == nullptr) {
        return;
    }
    static_cast<Connection*>(connection->_connection)->interrupt();
}
Expand Down
17 changes: 17 additions & 0 deletions src/include/c_api/lbug.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,23 @@ LBUG_C_API lbug_state lbug_connection_create_arrow_rel_table(lbug_connection* co
*/
LBUG_C_API lbug_state lbug_connection_drop_arrow_table(lbug_connection* connection,
const char* table_name, lbug_query_result* out_query_result);
/**
* @brief Creates an Arrow CSR memory-backed relationship table.
*
* Stores a CSR-layout edge table driven by bound-offset scans. Ownership of all schemas and
* array batches is transferred on call. Pass NULL for bwd_* parameters to omit backward
* adjacency (BWD scans will fall back to a full FWD scan). If any bwd_* parameter is non-NULL
* all four bwd_* parameters must be non-NULL. When the bwd_* pointers are NULL the matching
* bwd_*_num_arrays counts must be 0; a non-zero count with NULL pointers is rejected.
*
* @param connection The connection instance; must not be NULL.
* @param table_name Name of the relationship table to create; must not be NULL.
* @param src_table_name Name of the source node table; must not be NULL.
* @param dst_table_name Name of the destination node table; must not be NULL.
* @param fwd_indices_schema Arrow schema of the forward indices batches; must not be NULL.
* @param fwd_indices_arrays Forward indices array batches; must not be NULL.
* @param fwd_indices_num_arrays Number of batches in fwd_indices_arrays.
* @param fwd_indptr_schema Arrow schema of the forward indptr batches; must not be NULL.
* @param fwd_indptr_arrays Forward indptr array batches; must not be NULL.
* @param fwd_indptr_num_arrays Number of batches in fwd_indptr_arrays.
* @param bwd_indices_schema Arrow schema of the backward indices batches, or NULL.
* @param bwd_indices_arrays Backward indices array batches, or NULL.
* @param bwd_indices_num_arrays Number of batches in bwd_indices_arrays (0 when omitted).
* @param bwd_indptr_schema Arrow schema of the backward indptr batches, or NULL.
* @param bwd_indptr_arrays Backward indptr array batches, or NULL.
* @param bwd_indptr_num_arrays Number of batches in bwd_indptr_arrays (0 when omitted).
* @param[out] out_query_result Receives the query result; must not be NULL.
* @return The state indicating the success or failure of the operation.
*/
LBUG_C_API lbug_state lbug_connection_create_arrow_csr_rel_table(lbug_connection* connection,
const char* table_name, const char* src_table_name, const char* dst_table_name,
struct ArrowSchema* fwd_indices_schema, struct ArrowArray* fwd_indices_arrays,
uint64_t fwd_indices_num_arrays, struct ArrowSchema* fwd_indptr_schema,
struct ArrowArray* fwd_indptr_arrays, uint64_t fwd_indptr_num_arrays,
struct ArrowSchema* bwd_indices_schema, struct ArrowArray* bwd_indices_arrays,
uint64_t bwd_indices_num_arrays, struct ArrowSchema* bwd_indptr_schema,
struct ArrowArray* bwd_indptr_arrays, uint64_t bwd_indptr_num_arrays,
lbug_query_result* out_query_result);
/**
* @brief Interrupts the current query execution in the connection.
* @param connection The connection instance to interrupt.
Expand Down
30 changes: 30 additions & 0 deletions src/include/storage/table/arrow_csr_rel_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <optional>
#include <vector>

#include "common/arrow/arrow.h"

namespace lbug {
namespace storage {

// One directional adjacency for a CSR-layout Arrow rel table.
// indices batches: struct with child[0] = UINT64 neighbour offset, child[1..] = edge properties
// indptr batches: struct with child[0] = UINT64 row pointers (indptr[i] = first edge of node i,
// N+1 entries where indptr[0]==0)
struct ArrowCsrAdj {
// Arrow schema describing the indices batches.
ArrowSchemaWrapper indicesSchema;
// Indices batches: neighbour offsets plus any edge property columns.
std::vector<ArrowArrayWrapper> indices;
// Arrow schema describing the indptr batches.
ArrowSchemaWrapper indptrSchema;
// Indptr batches: the CSR row pointers into the indices data.
std::vector<ArrowArrayWrapper> indptr;
};

// CSR adjacency data for one Arrow rel table.
// fwd is required; bwd enables O(degree) backward scans when present.
struct ArrowCsrRelData {
// Forward adjacency; always present.
ArrowCsrAdj fwd;
// Backward adjacency; std::nullopt when the caller supplied no backward data.
std::optional<ArrowCsrAdj> bwd;
};

} // namespace storage
} // namespace lbug
8 changes: 0 additions & 8 deletions src/include/storage/table/arrow_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,9 @@ class ArrowNodeTable final : public ColumnarNodeTableBase {
common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override;

private:
std::vector<size_t> getBatchSizes(
[[maybe_unused]] const transaction::Transaction* transaction) const;

std::vector<int64_t> getOutputToArrowColumnIdx(
const std::vector<common::column_id_t>& columnIDs) const;

void copyArrowMorselToOutputVectors(const ArrowArrayWrapper& batch,
const size_t currentMorselStartOffset, const uint64_t numRowsToCopy,
const std::vector<common::ValueVector*>& outputVectors,
const std::vector<int64_t>& outputToArrowColumnIdx) const;

private:
ArrowSchemaWrapper schema;
std::vector<ArrowArrayWrapper> arrays;
Expand Down
Loading
Loading