Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/icebug-disk.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ CREATE REL TABLE follows(FROM user TO user, since INT32) WITH (storage = '<path-
CREATE REL TABLE livesin(FROM user TO city) WITH (storage = '<path-to-dir>', format = 'icebug-disk');
```

File paths can be relative or absolute and are resolved as `<path-to-dir>/nodes_{tableName}.parquet` for node tables, and `<path-to-dir>/indices_{tableName}.parquet` and `<path-to-dir>/indptr_{tableName}.parquet` for relationship tables.
File paths can be relative or absolute and are resolved as `<path-to-dir>/nodes_{tableName}.parquet` for node tables, and `<path-to-dir>/indices_{tableName}.parquet` and `<path-to-dir>/indptr_{tableName}.parquet` for CSR relationship tables. Relationship tables can also point `storage` directly at a single `.parquet` file to use the FLAT layout.

Object-store URIs (e.g. `s3://bucket/path`, `https://host/path`) are also supported as `storage` values.

Expand All @@ -49,6 +49,14 @@ Each relationship table has a corresponding `indices_{tableName}.parquet` file c

Each relationship table has a corresponding `indptr_{tableName}.parquet` file containing the CSR row pointers. It has a single integer column with `N+1` entries, where `N` is the number of source nodes.

### Flat Relationships

A relationship table whose `storage` value points directly to a `.parquet` file uses the FLAT layout. The file contains one row per edge. The first two columns are source and target node offsets, followed by zero or more edge property columns as declared in the schema. For example:

```cypher
CREATE REL TABLE follows(FROM user TO user, since INT32) WITH (storage = '<path-to-file>/rels_follows.parquet', format = 'icebug-disk');
```

## Convert from other formats

You can convert from other graph formats (e.g. duckdb, parquet tables) to Icebug-Disk using the script at https://github.com/Ladybug-Memory/icebug-format
30 changes: 30 additions & 0 deletions src/c_api/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,36 @@ lbug_state lbug_connection_create_arrow_rel_table(lbug_connection* connection,
}
}

lbug_state lbug_connection_create_arrow_rel_table_csr(lbug_connection* connection,
const char* table_name, const char* src_table_name, const char* dst_table_name,
ArrowSchema* indices_schema, ArrowArray* indices_arrays, uint64_t num_indices_arrays,
ArrowSchema* indptr_schema, ArrowArray* indptr_arrays, uint64_t num_indptr_arrays,
lbug_query_result* out_query_result) {
if (connection == nullptr || connection->_connection == nullptr || table_name == nullptr ||
src_table_name == nullptr || dst_table_name == nullptr || indices_schema == nullptr ||
indices_arrays == nullptr || indptr_schema == nullptr || indptr_arrays == nullptr ||
out_query_result == nullptr) {
return LbugError;
}
try {
clearLastCAPIErrorMessage();
auto result = lbug::ArrowTableSupport::createRelTableFromArrowCSR(
*static_cast<Connection*>(connection->_connection), table_name, src_table_name,
dst_table_name, takeArrowSchema(indices_schema),
takeArrowArrays(indices_arrays, num_indices_arrays), takeArrowSchema(indptr_schema),
takeArrowArrays(indptr_arrays, num_indptr_arrays));
auto state = setQueryResult(std::move(result.queryResult), out_query_result);
if (state == LbugSuccess) {
rememberArrowTableID(static_cast<Connection*>(connection->_connection), table_name,
std::move(result.arrowId));
}
return state;
} catch (Exception& e) {
setLastCAPIErrorMessage(e.what());
return LbugError;
}
}

lbug_state lbug_connection_drop_arrow_table(lbug_connection* connection, const char* table_name,
lbug_query_result* out_query_result) {
if (connection == nullptr || connection->_connection == nullptr || table_name == nullptr ||
Expand Down
14 changes: 14 additions & 0 deletions src/include/c_api/lbug.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,20 @@ LBUG_C_API lbug_state lbug_connection_create_arrow_rel_table(lbug_connection* co
const char* table_name, const char* src_table_name, const char* dst_table_name,
struct ArrowSchema* schema, struct ArrowArray* arrays, uint64_t num_arrays,
lbug_query_result* out_query_result);
/**
* @brief Creates a CSR Arrow memory-backed relationship table from Arrow C Data Interface data.
*
* The indices Arrow table must contain a destination offset column named "to" and any relationship
* property columns. The indptr Arrow table must contain one offset column. Ownership of schemas and
* arrays is transferred to lbug on success or failure. The caller must not release them after this
* call.
*/
LBUG_C_API lbug_state lbug_connection_create_arrow_rel_table_csr(lbug_connection* connection,
const char* table_name, const char* src_table_name, const char* dst_table_name,
struct ArrowSchema* indices_schema, struct ArrowArray* indices_arrays,
uint64_t num_indices_arrays, struct ArrowSchema* indptr_schema,
struct ArrowArray* indptr_arrays, uint64_t num_indptr_arrays,
lbug_query_result* out_query_result);
/**
* @brief Drops an Arrow memory-backed table.
*/
Expand Down
22 changes: 21 additions & 1 deletion src/include/storage/table/arrow_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "catalog/catalog_entry/rel_group_catalog_entry.h"
#include "common/arrow/arrow.h"
#include "storage/table/arrow_table_support.h"
#include "storage/table/columnar_rel_table_base.h"
#include "storage/table/node_table.h"

Expand All @@ -30,7 +31,9 @@ class ArrowRelTable final : public ColumnarRelTableBase {
ArrowRelTable(catalog::RelGroupCatalogEntry* relGroupEntry, common::table_id_t fromTableID,
common::table_id_t toTableID, const StorageManager* storageManager,
MemoryManager* memoryManager, const NodeTable* fromNodeTable, const NodeTable* toNodeTable,
ArrowSchemaWrapper schema, std::vector<ArrowArrayWrapper> arrays, std::string arrowId);
ArrowRelTableLayout layout, ArrowSchemaWrapper schema,
std::vector<ArrowArrayWrapper> arrays, ArrowSchemaWrapper indptrSchema,
std::vector<ArrowArrayWrapper> indptrArrays, std::string arrowId);
~ArrowRelTable();

void initScanState(transaction::Transaction* transaction, TableScanState& scanState,
Expand All @@ -45,16 +48,33 @@ class ArrowRelTable final : public ColumnarRelTableBase {
private:
int64_t fromColumnIdx = -1;
int64_t toColumnIdx = -1;
int64_t csrNbrColumnIdx = -1;
int64_t csrIndptrColumnIdx = 0;
std::vector<int64_t> getOutputToArrowColumnIdx(
const std::vector<common::column_id_t>& columnIDs) const;
bool scanFlat(transaction::Transaction* transaction, TableScanState& scanState);
bool scanCSR(TableScanState& scanState);
bool readCSRValue(common::ValueVector& outputVector, common::offset_t relOffset,
uint64_t dstOffset) const;
bool readIndptr(common::offset_t srcOffset, common::offset_t& result) const;
common::offset_t findCSRSourceOffset(common::offset_t relOffset) const;
bool readArrowValueAtOffset(const ArrowSchemaWrapper& arrowSchema,
const std::vector<ArrowArrayWrapper>& arrowArrays, const std::vector<size_t>& startOffsets,
int64_t columnIdx, common::offset_t rowOffset, common::ValueVector& outputVector,
uint64_t dstOffset) const;

const NodeTable* fromNodeTable;
const NodeTable* toNodeTable;
ArrowRelTableLayout layout;
ArrowSchemaWrapper schema;
std::vector<ArrowArrayWrapper> arrays;
std::vector<size_t> batchStartOffsets;
ArrowSchemaWrapper indptrSchema;
std::vector<ArrowArrayWrapper> indptrArrays;
std::vector<size_t> indptrBatchStartOffsets;
std::unordered_map<common::column_id_t, int64_t> propertyColumnToArrowColumnIdx;
size_t totalRows = 0;
size_t totalIndptrRows = 0;
std::string arrowId;
};

Expand Down
26 changes: 26 additions & 0 deletions src/include/storage/table/arrow_table_support.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -10,6 +11,16 @@

namespace lbug {

enum class ArrowRelTableLayout : uint8_t { FLAT, CSR };

struct ArrowRelTableData {
ArrowRelTableLayout layout = ArrowRelTableLayout::FLAT;
ArrowSchemaWrapper schema;
std::vector<ArrowArrayWrapper> arrays;
ArrowSchemaWrapper indptrSchema;
std::vector<ArrowArrayWrapper> indptrArrays;
};

// Result of creating an arrow table view
struct ArrowTableCreationResult {
std::unique_ptr<main::QueryResult> queryResult;
Expand All @@ -22,10 +33,16 @@ class LBUG_API ArrowTableSupport {
static std::string registerArrowData(ArrowSchemaWrapper schema,
std::vector<ArrowArrayWrapper> arrays);

// Register Arrow relationship data and return an ID
static std::string registerArrowRelData(ArrowRelTableData data);

// Retrieve Arrow data by ID (returns pointers to data in registry)
static bool getArrowData(const std::string& id, ArrowSchemaWrapper*& schema,
std::vector<ArrowArrayWrapper>*& arrays);

// Retrieve Arrow relationship data by ID (returns pointer to data in registry)
static bool getArrowRelData(const std::string& id, ArrowRelTableData*& data);

// Unregister Arrow data by ID
static void unregisterArrowData(const std::string& id);

Expand All @@ -42,6 +59,15 @@ class LBUG_API ArrowTableSupport {
std::vector<ArrowArrayWrapper> arrays, const std::string& srcColumnName = "from",
const std::string& dstColumnName = "to");

// Create a relationship table from Arrow CSR arrays. The indices table must contain a
// destination offset column and any relationship property columns. The indptr table must
// contain one offset column with source-node row offsets into the indices table.
static ArrowTableCreationResult createRelTableFromArrowCSR(main::Connection& connection,
const std::string& tableName, const std::string& srcTableName,
const std::string& dstTableName, ArrowSchemaWrapper indicesSchema,
std::vector<ArrowArrayWrapper> indicesArrays, ArrowSchemaWrapper indptrSchema,
std::vector<ArrowArrayWrapper> indptrArrays, const std::string& dstColumnName = "to");

// Unregister an arrow table completely (drop table and unregister data)
static std::unique_ptr<main::QueryResult> unregisterArrowTable(main::Connection& connection,
const std::string& tableName);
Expand Down
7 changes: 7 additions & 0 deletions src/include/storage/table/ice_disk_rel_table.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <cstdint>

#include "catalog/catalog_entry/rel_group_catalog_entry.h"
#include "common/exception/runtime.h"
#include "common/types/internal_id_util.h"
Expand All @@ -13,6 +15,8 @@ class ClientContext;
} // namespace main
namespace storage {

enum class IceDiskRelTableLayout : uint8_t { CSR, FLAT };

struct IceDiskRelTableScanState final : RelTableScanState {
std::unique_ptr<processor::ParquetReaderScanState> parquetScanState;

Expand Down Expand Up @@ -68,6 +72,7 @@ class IceDiskRelTable final : public ColumnarRelTableBase {
common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override;

private:
IceDiskRelTableLayout layout;
std::string indicesFilePath;
std::string indptrFilePath;
mutable std::unique_ptr<processor::ParquetReader> indicesReader;
Expand All @@ -80,6 +85,8 @@ class IceDiskRelTable final : public ColumnarRelTableBase {
void initializeIndptrReader(transaction::Transaction* transaction) const;
void loadIndptrData(transaction::Transaction* transaction) const;
common::offset_t findSourceNodeForRow(common::offset_t globalRowIdx) const;
bool scanCSR(transaction::Transaction* transaction, IceDiskRelTableScanState& scanState);
bool scanFlat(transaction::Transaction* transaction, IceDiskRelTableScanState& scanState);
};

} // namespace storage
Expand Down
7 changes: 7 additions & 0 deletions src/include/storage/table/ice_disk_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ class IceDiskUtils {
IceDiskUtils::joinPath(dir, "indptr_" + name + suffix)};
}

// Get the file path for a flat relationship table. The file contains source and target node
// offsets followed by relationship property columns.
static std::string constructFlatRelTablePath(const std::string& dir, const std::string& name,
const std::string& suffix) {
return IceDiskUtils::joinPath(dir, "rels_" + name + suffix);
}

// Validates that the parquet file at `path` carries the expected icebug_disk_version metadata.
// Note: path is already resolved by VFS
static void checkVersionCompatibility(main::ClientContext* context, const std::string& path) {
Expand Down
2 changes: 2 additions & 0 deletions src/include/storage/table/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct RelTableScanState : TableScanState {
// a single multi-rel scan state can scan native, icebug-disk-backed, and Arrow-backed tables.
size_t arrowCurrentBatchIdx = 0;
size_t arrowCurrentBatchOffset = 0;
size_t arrowCSRBoundIdx = 0;
common::offset_t arrowCSRCurrentRelOffset = common::INVALID_OFFSET;
std::unordered_map<common::offset_t, common::sel_t> arrowBoundNodeOffsetToSelPos;
std::unique_ptr<common::ValueVector> arrowSrcKeyVector;
std::unique_ptr<common::ValueVector> arrowDstKeyVector;
Expand Down
20 changes: 13 additions & 7 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,8 @@ void StorageManager::addRelTable(RelGroupCatalogEntry* entry, const RelTableCata
} else if (!entry->getStorage().empty()) {
if (entry->getStorage().substr(0, 8) == "arrow://") {
std::string arrowId = entry->getStorage().substr(8);
ArrowSchemaWrapper* schema = nullptr;
std::vector<ArrowArrayWrapper>* arrays = nullptr;
if (!ArrowTableSupport::getArrowData(arrowId, schema, arrays)) {
ArrowRelTableData* relData = nullptr;
if (!ArrowTableSupport::getArrowRelData(arrowId, relData)) {
throw common::RuntimeException("Failed to retrieve Arrow data for ID: " + arrowId);
}
if (!tables.contains(info.nodePair.srcTableID) ||
Expand All @@ -186,15 +185,22 @@ void StorageManager::addRelTable(RelGroupCatalogEntry* entry, const RelTableCata
throw common::RuntimeException(
"Arrow rel table currently supports only regular node tables");
}
ArrowSchemaWrapper schemaCopy = createShallowCopy(*schema);
ArrowSchemaWrapper schemaCopy = createShallowCopy(relData->schema);
std::vector<ArrowArrayWrapper> arraysCopy;
arraysCopy.reserve(arrays->size());
for (const auto& arr : *arrays) {
arraysCopy.reserve(relData->arrays.size());
for (const auto& arr : relData->arrays) {
arraysCopy.push_back(createShallowCopy(arr));
}
ArrowSchemaWrapper indptrSchemaCopy = createShallowCopy(relData->indptrSchema);
std::vector<ArrowArrayWrapper> indptrArraysCopy;
indptrArraysCopy.reserve(relData->indptrArrays.size());
for (const auto& arr : relData->indptrArrays) {
indptrArraysCopy.push_back(createShallowCopy(arr));
}
tables[info.oid] = std::make_unique<ArrowRelTable>(entry, info.nodePair.srcTableID,
info.nodePair.dstTableID, this, &memoryManager, fromNodeTable, toNodeTable,
std::move(schemaCopy), std::move(arraysCopy), arrowId);
relData->layout, std::move(schemaCopy), std::move(arraysCopy),
std::move(indptrSchemaCopy), std::move(indptrArraysCopy), arrowId);
} else {
throw common::RuntimeException(
"Unsupported storage option for rel table: " + entry->getStorage());
Expand Down
Loading
Loading