diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index fec7a1902..553919fb3 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include #include @@ -41,6 +44,8 @@ static ssize_t zvec_pread(int fd, void *buf, size_t count, size_t offset) { namespace zvec { namespace ailego { +const size_t kVectorPageSize = MemoryHelper::PageSize(); + void VectorPageTable::init(size_t entry_num) { if (entries_) { delete[] entries_; @@ -97,12 +102,11 @@ void VectorPageTable::evict_block(block_id_t block_id) { assert(block_id < entry_num_); Entry &entry = entries_[block_id]; char *buffer = entry.buffer; - size_t size = entry.size; int expected = 0; if (entry.ref_count.compare_exchange_strong( expected, std::numeric_limits::min())) { if (buffer) { - MemoryLimitPool::get_instance().release_buffer(buffer, size); + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); } } // Always reset in_evict_queue regardless of whether the CAS succeeded: @@ -113,32 +117,20 @@ void VectorPageTable::evict_block(block_id_t block_id) { entry.in_evict_queue.store(false, std::memory_order_relaxed); } -char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, - size_t size) { +char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer) { assert(block_id < entry_num_); Entry &entry = entries_[block_id]; while (true) { int current_count = entry.ref_count.load(std::memory_order_relaxed); if (current_count >= 0) { - // Defensive branch: in practice this path should never be reached. - // set_block_acquired() is always called under block_mutexes_[block_id], - // and the caller (acquire_buffer) re-checks acquire_block() inside the - // same lock before invoking this function. Therefore, if we get here, - // ref_count must still be negative (unloaded). This branch is retained - // as a safety net in case the locking contract is violated in the future, - // e.g. if set_block_acquired is called from an unlocked context. if (entry.ref_count.compare_exchange_weak( current_count, current_count + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { - MemoryLimitPool::get_instance().release_buffer(buffer, size); + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); return entry.buffer; } } else { entry.buffer = buffer; - entry.size = size; - // Ensure in_evict_queue is cleared when the block is freshly loaded so - // that the first release_block() after loading can register it in the - // eviction queue. entry.in_evict_queue.store(false, std::memory_order_relaxed); entry.ref_count.store(1, std::memory_order_release); return entry.buffer; @@ -170,15 +162,13 @@ VecBufferPool::VecBufferPool(const std::string &filename) { file_size_ = st.st_size; } -int VecBufferPool::init(size_t segment_count) { - size_t block_num = segment_count + 10; +int VecBufferPool::init() { + size_t block_num = (file_size_ + kVectorPageSize - 1) / kVectorPageSize; page_table_.init(block_num); - // Allocate all mutexes in a single contiguous array so that the cold-path - // lock in acquire_buffer() accesses cache-friendly memory instead of - // chasing 31K+ independent heap pointers. - block_mutexes_ = std::make_unique(block_num); - block_mutexes_count_ = block_num; - LOG_DEBUG("entry num: %zu", page_table_.entry_num()); + block_mutexes_ = + std::make_unique(VecBufferPool::kMutexBucketCount); + LOG_DEBUG("entry num: %zu, file_size: %zu", page_table_.entry_num(), + file_size_); return 0; } @@ -186,54 +176,57 @@ VecBufferPoolHandle VecBufferPool::get_handle() { return VecBufferPoolHandle(*this); } -char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset, - size_t size, int retry) { - assert(block_id < block_mutexes_count_); - char *buffer = page_table_.acquire_block(block_id); +char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { + assert(page_id < page_table_.entry_num()); + char *buffer = page_table_.acquire_block(page_id); if (buffer) { return buffer; } - std::lock_guard lock(block_mutexes_[block_id]); - buffer = page_table_.acquire_block(block_id); + std::lock_guard lock( + block_mutexes_[page_id % VecBufferPool::kMutexBucketCount]); + buffer = page_table_.acquire_block(page_id); if (buffer) { return buffer; } { - bool found = - MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer); + bool found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buffer); if (!found) { for (int i = 0; i < retry; i++) { BlockEvictionQueue::get_instance().recycle(); - found = - MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer); + found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buffer); if (found) { break; } } } if (!found) { - LOG_ERROR( - "Buffer pool failed to get free buffer: file[%s], block_id[%zu], " - "offset[%zu], size[%zu]", - file_name_.c_str(), block_id, offset, size); + LOG_ERROR("Buffer pool failed to get free buffer: file[%s], page_id[%zu]", + file_name_.c_str(), page_id); return nullptr; } } + size_t page_offset = page_id * kVectorPageSize; + size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset); + if (expected_bytes < kVectorPageSize) { + std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes); + } #if defined(_MSC_VER) - ssize_t read_bytes = zvec_pread(fd_, buffer, size, offset); + ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset); #else - ssize_t read_bytes = pread(fd_, buffer, size, offset); + ssize_t read_bytes = pread(fd_, buffer, expected_bytes, page_offset); #endif - if (read_bytes != static_cast(size)) { + if (read_bytes != static_cast(expected_bytes)) { LOG_ERROR( - "Buffer pool failed to read file at offset: file[%s], block_id[%zu], " - "offset[%zu], size[%zu]", - file_name_.c_str(), block_id, offset, size); - MemoryLimitPool::get_instance().release_buffer(buffer, size); + "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " + "offset[%zu], expected[%zu], got[%zd]", + file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes); + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); return nullptr; } - return page_table_.set_block_acquired(block_id, buffer, size); + return page_table_.set_block_acquired(page_id, buffer); } int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { @@ -252,10 +245,41 @@ int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { return 0; } -char *VecBufferPoolHandle::get_block(size_t offset, size_t size, - size_t block_id) { - char *buffer = pool_.acquire_buffer(block_id, offset, size, 50); - return buffer; +char *VecBufferPoolHandle::get_single_page(size_t file_offset, size_t len, + size_t &out_page_id) { + size_t first_page = file_offset / kVectorPageSize; + assert(len == 0 || (file_offset + len - 1) / kVectorPageSize == first_page); + out_page_id = first_page; + char *page = pool_.acquire_buffer(first_page, 50); + if (!page) { + return nullptr; + } + return page + (file_offset - first_page * kVectorPageSize); +} + +bool VecBufferPoolHandle::read_range(size_t file_offset, size_t len, + char *out) { + if (len == 0) { + return true; + } + size_t first_page = file_offset / kVectorPageSize; + size_t last_page = (file_offset + len - 1) / kVectorPageSize; + size_t remaining = len; + size_t dst_cursor = 0; + for (size_t pg = first_page; pg <= last_page; ++pg) { + char *page = pool_.acquire_buffer(pg, 50); + if (!page) { + return false; + } + size_t page_start = pg * kVectorPageSize; + size_t intra_offset = (pg == first_page) ? (file_offset - page_start) : 0; + size_t chunk = std::min(kVectorPageSize - intra_offset, remaining); + std::memcpy(out + dst_cursor, page + intra_offset, chunk); + pool_.page_table_.release_block(pg); + dst_cursor += chunk; + remaining -= chunk; + } + return true; } int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) { diff --git a/src/core/algorithm/hnsw/hnsw_entity.h b/src/core/algorithm/hnsw/hnsw_entity.h index a6ead8f63..bae57ec7a 100644 --- a/src/core/algorithm/hnsw/hnsw_entity.h +++ b/src/core/algorithm/hnsw/hnsw_entity.h @@ -201,11 +201,21 @@ struct BufferPoolMemoryBlock { void *data) : buffer_pool_handle_(handle), buffer_block_id_(block_id), data_(data) {} + static BufferPoolMemoryBlock MakeOwned(void *owned_data) { + BufferPoolMemoryBlock b; + b.owns_buffer_ = true; + b.data_ = owned_data; + return b; + } + BufferPoolMemoryBlock(const BufferPoolMemoryBlock &rhs) : buffer_pool_handle_(rhs.buffer_pool_handle_), buffer_block_id_(rhs.buffer_block_id_), data_(rhs.data_) { - if (buffer_pool_handle_) { + if (rhs.owns_buffer_) { + owns_buffer_ = false; + buffer_pool_handle_ = nullptr; + } else if (buffer_pool_handle_) { buffer_pool_handle_->acquire_one(buffer_block_id_); } } @@ -216,7 +226,10 @@ struct BufferPoolMemoryBlock { buffer_pool_handle_ = rhs.buffer_pool_handle_; buffer_block_id_ = rhs.buffer_block_id_; data_ = rhs.data_; - if (buffer_pool_handle_) { + if (rhs.owns_buffer_) { + owns_buffer_ = false; + buffer_pool_handle_ = nullptr; + } else if (buffer_pool_handle_) { buffer_pool_handle_->acquire_one(buffer_block_id_); } } @@ -226,8 +239,10 @@ struct BufferPoolMemoryBlock { BufferPoolMemoryBlock(BufferPoolMemoryBlock &&rhs) noexcept : buffer_pool_handle_(rhs.buffer_pool_handle_), buffer_block_id_(rhs.buffer_block_id_), + owns_buffer_(rhs.owns_buffer_), data_(rhs.data_) { rhs.buffer_pool_handle_ = nullptr; + rhs.owns_buffer_ = false; rhs.data_ = nullptr; } @@ -236,8 +251,10 @@ struct BufferPoolMemoryBlock { release(); buffer_pool_handle_ = rhs.buffer_pool_handle_; buffer_block_id_ = rhs.buffer_block_id_; + owns_buffer_ = rhs.owns_buffer_; data_ = rhs.data_; rhs.buffer_pool_handle_ = nullptr; + rhs.owns_buffer_ = false; rhs.data_ = nullptr; } return *this; @@ -260,7 +277,12 @@ struct BufferPoolMemoryBlock { private: void release() { - if (buffer_pool_handle_) { + if (owns_buffer_) { + if (data_) { + ailego_free(data_); + } + owns_buffer_ = false; + } else if (buffer_pool_handle_) { buffer_pool_handle_->release_one(buffer_block_id_); buffer_pool_handle_ = nullptr; } @@ -269,6 +291,7 @@ struct BufferPoolMemoryBlock { ailego::VecBufferPoolHandle *buffer_pool_handle_{nullptr}; size_t buffer_block_id_{0}; + bool owns_buffer_{false}; void *data_{nullptr}; }; diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 3dc6c9640..3c2fb0cea 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -638,9 +638,16 @@ HnswStreamerEntity::get_neighbors_typed( LOG_ERROR("Read neighbor header failed, ret=%zu", ret); return NeighborsT(); } - BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_, - mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + BufferPoolMemoryBlock block; + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + } else { + block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + mem_block.buffer_block_id_, mem_block.data_); + mem_block.buffer_pool_handle_ = nullptr; + } return NeighborsT(std::move(block)); } @@ -688,10 +695,19 @@ inline int HnswStreamerEntity::get_vector_typed( loc.second, read_size, ret); return IndexError_ReadData; } - vec_blocks[i] = - BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + vec_blocks[i] = [&]() { + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + BufferPoolMemoryBlock b = + BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + return b; + } + BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_, mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + mem_block.buffer_pool_handle_ = nullptr; + return b; + }(); } return 0; } diff --git a/src/core/algorithm/vamana/vamana_streamer_entity.h b/src/core/algorithm/vamana/vamana_streamer_entity.h index ae2918786..ab8878cb3 100644 --- a/src/core/algorithm/vamana/vamana_streamer_entity.h +++ b/src/core/algorithm/vamana/vamana_streamer_entity.h @@ -352,9 +352,16 @@ VamanaStreamerEntity::get_neighbors_typed( LOG_ERROR("Read neighbor header failed, ret=%zu", ret); return NeighborsT(); } - BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_, - mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + BufferPoolMemoryBlock block; + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + } else { + block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + mem_block.buffer_block_id_, mem_block.data_); + mem_block.buffer_pool_handle_ = nullptr; + } return NeighborsT(std::move(block)); } @@ -392,10 +399,19 @@ inline int VamanaStreamerEntity::get_vector_typed( LOG_ERROR("Read vector failed, ret=%zu", ret); return IndexError_ReadData; } - vec_blocks[i] = - BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + vec_blocks[i] = [&]() { + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + BufferPoolMemoryBlock b = + BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + return b; + } + BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_, mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + mem_block.buffer_pool_handle_ = nullptr; + return b; + }(); } return 0; } diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 62d442a5b..d0a05fd37 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -80,15 +80,13 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = segment_header_start_offset_ + - segment_header_->content_offset + - segment_->meta()->data_index; - auto *raw = owner_->get_buffer(buffer_offset, capacity_, segment_id_); - if (!raw) { + size_t abs_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; + if (!owner_->buffer_pool_handle_->read_range(abs_offset, len, + static_cast(buf))) { return 0; } - auto *data = raw + offset; - memmove(buf, data, len); return len; } @@ -101,14 +99,33 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = segment_header_start_offset_ + - segment_header_->content_offset + - segment_->meta()->data_index; - auto *raw = owner_->get_buffer(buffer_offset, capacity_, segment_id_); - if (!raw) { + size_t abs_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; + size_t first_page = abs_offset / ailego::kVectorPageSize; + size_t last_page = (len == 0) + ? first_page + : (abs_offset + len - 1) / ailego::kVectorPageSize; + if (first_page == last_page) { + size_t page_id = 0; + char *raw = owner_->buffer_pool_handle_->get_single_page(abs_offset, + len, page_id); + if (!raw) { + return 0; + } + *data = raw; + return len; + } + char *tmp = static_cast(ailego_aligned_malloc(len, 4096)); + if (!tmp) { return 0; } - *data = raw + offset; + if (!owner_->buffer_pool_handle_->read_range(abs_offset, len, tmp)) { + ailego_free(tmp); + return 0; + } + owner_->register_tmp_buffer(tmp); + *data = tmp; return len; } @@ -120,21 +137,36 @@ class BufferStorage : public IndexStorage { } len = meta->data_size - offset; } - size_t buffer_offset = segment_header_start_offset_ + - segment_header_->content_offset + - segment_->meta()->data_index; - auto *raw = owner_->get_buffer(buffer_offset, capacity_, segment_id_); - if (!raw) { - return 0; - } - - data.reset(owner_->buffer_pool_handle_.get(), segment_id_, raw + offset); - if (data.data()) { + size_t abs_offset = segment_header_start_offset_ + + segment_header_->content_offset + + segment_->meta()->data_index + offset; + size_t first_page = abs_offset / ailego::kVectorPageSize; + size_t last_page = (len == 0) + ? first_page + : (abs_offset + len - 1) / ailego::kVectorPageSize; + if (first_page == last_page) { + size_t page_id = 0; + char *raw = owner_->buffer_pool_handle_->get_single_page(abs_offset, + len, page_id); + if (!raw) { + LOG_ERROR("read error (single-page acquire failed)."); + return -1; + } + data.reset(owner_->buffer_pool_handle_.get(), page_id, raw); return len; - } else { - LOG_ERROR("read error."); + } + char *tmp = static_cast(ailego_aligned_malloc(len, 4096)); + if (!tmp) { + LOG_ERROR("read error (alloc cross-page temp buffer failed)."); + return -1; + } + if (!owner_->buffer_pool_handle_->read_range(abs_offset, len, tmp)) { + ailego_free(tmp); + LOG_ERROR("read error (cross-page read_range failed)."); return -1; } + data = MemoryBlock::MakeOwned(tmp); + return len; } //! Write data into the storage with offset @@ -199,7 +231,7 @@ class BufferStorage : public IndexStorage { if (ret != 0) { return ret; } - ret = buffer_pool_->init(segments_.size()); + ret = buffer_pool_->init(); if (ret != 0) { return ret; } @@ -210,8 +242,22 @@ class BufferStorage : public IndexStorage { return 0; } - char *get_buffer(size_t offset, size_t length, size_t block_id) { - return buffer_pool_handle_->get_block(offset, length, block_id); + void register_tmp_buffer(char *buf) { + std::lock_guard latch(tmp_buffers_mutex_); + tmp_buffers_.push_back(buf); + } + + char *get_buffer(size_t offset, size_t length, size_t /*block_id*/) { + char *tmp = static_cast(ailego_aligned_malloc(length, 4096)); + if (!tmp) { + return nullptr; + } + if (!buffer_pool_handle_->read_range(offset, length, tmp)) { + ailego_free(tmp); + return nullptr; + } + register_tmp_buffer(tmp); + return tmp; } int get_meta(size_t offset, size_t length, char *out) { @@ -472,6 +518,15 @@ class BufferStorage : public IndexStorage { segments_.clear(); memset(&header_, 0, sizeof(header_)); memset(&footer_, 0, sizeof(footer_)); + { + std::lock_guard tmp_latch(tmp_buffers_mutex_); + for (char *p : tmp_buffers_) { + if (p) { + ailego_free(p); + } + } + tmp_buffers_.clear(); + } buffer_pool_handle_.reset(); buffer_pool_.reset(); max_segment_size_ = 0; @@ -503,6 +558,9 @@ class BufferStorage : public IndexStorage { bool index_dirty_{false}; mutable std::mutex mapping_mutex_{}; + std::vector tmp_buffers_{}; + mutable std::mutex tmp_buffers_mutex_{}; + // buffer manager std::string file_name_; IndexFormat::MetaHeader header_{}; diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 653b7af53..c6a08c9da 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -42,16 +42,13 @@ namespace zvec { namespace ailego { +extern const size_t kVectorPageSize; + class VectorPageTable { - struct alignas(64) Entry { + struct Entry { std::atomic ref_count; - // True when this block has been enqueued in BlockEvictionQueue and has not - // yet been evicted. Used in release_block() to suppress duplicate - // insertions: once a block is in the eviction queue we never push it again - // until it is evicted (which resets the flag). std::atomic in_evict_queue; char *buffer; - size_t size; }; public: @@ -76,22 +73,17 @@ class VectorPageTable { void evict_block(block_id_t block_id); - char *set_block_acquired(block_id_t block_id, char *buffer, size_t size); + char *set_block_acquired(block_id_t block_id, char *buffer); size_t entry_num() const { return entry_num_; } - // Returns true if the block has no active references (ref_count <= 0). - // Used by VecBufferPool destructor to assert all handles are released. bool is_released(block_id_t block_id) const { assert(block_id < entry_num_); return entries_[block_id].ref_count.load(std::memory_order_relaxed) <= 0; } - // Returns true if the block is no longer registered in the eviction queue - // (either it was never added, or it has already been evicted). - // Used by BlockEvictionQueue to detect stale queue entries. inline bool is_dead_block(BlockEvictionQueue::BlockType block) const { Entry &entry = entries_[block.vector_block.first]; return !entry.in_evict_queue.load(std::memory_order_relaxed); @@ -108,12 +100,11 @@ class VecBufferPool { public: typedef std::shared_ptr Pointer; + static constexpr size_t kMutexBucketCount = 64UL * 1024UL; + VecBufferPool(const std::string &filename); ~VecBufferPool() { for (size_t i = 0; i < page_table_.entry_num(); ++i) { - // A positive ref_count means a VecBufferPoolHandle is still alive, - // which is a contract violation: all handles must be destroyed before - // the pool itself is destroyed. assert(page_table_.is_released(i)); page_table_.evict_block(i); } @@ -124,12 +115,11 @@ class VecBufferPool { #endif } - int init(size_t segment_count); + int init(); VecBufferPoolHandle get_handle(); - char *acquire_buffer(block_id_t block_id, size_t offset, size_t size, - int retry = 0); + char *acquire_buffer(block_id_t page_id, int retry = 0); int get_meta(size_t offset, size_t length, char *buffer); @@ -146,11 +136,7 @@ class VecBufferPool { VectorPageTable page_table_; private: - // Contiguous array of per-block mutexes (one allocation, cache-friendly for - // the cold-path load in acquire_buffer). block_mutexes_count_ mirrors the - // array length because unique_ptr has no built-in size accessor. std::unique_ptr block_mutexes_{}; - size_t block_mutexes_count_{0}; }; class VecBufferPoolHandle { @@ -162,7 +148,9 @@ class VecBufferPoolHandle { typedef std::shared_ptr Pointer; - char *get_block(size_t offset, size_t size, size_t block_id); + char *get_single_page(size_t file_offset, size_t len, size_t &out_page_id); + + bool read_range(size_t file_offset, size_t len, char *out); int get_meta(size_t offset, size_t length, char *buffer); diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index ac1052e86..530073aad 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -34,6 +34,7 @@ class IndexStorage : public IndexModule { MBT_UNKNOWN = 0, MBT_MMAP = 1, MBT_BUFFERPOOL = 2, + MBT_HEAP_SCRATCH = 3, }; MemoryBlock() {} @@ -46,9 +47,17 @@ class IndexStorage : public IndexModule { } MemoryBlock(void *data) : type_(MemoryBlockType::MBT_MMAP), data_(data) {} + static MemoryBlock MakeOwned(void *owned) { + MemoryBlock mb; + mb.type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + mb.data_ = owned; + return mb; + } + MemoryBlock(const MemoryBlock &rhs) { switch (rhs.type_) { case MemoryBlockType::MBT_MMAP: + case MemoryBlockType::MBT_HEAP_SCRATCH: this->reset(rhs.data_); break; case MemoryBlockType::MBT_BUFFERPOOL: @@ -71,6 +80,12 @@ class IndexStorage : public IndexModule { rhs.buffer_pool_handle_ = nullptr; rhs.type_ = MemoryBlockType::MBT_UNKNOWN; break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + data_ = rhs.data_; + rhs.data_ = nullptr; + rhs.type_ = MemoryBlockType::MBT_UNKNOWN; + break; default: break; } @@ -87,6 +102,9 @@ class IndexStorage : public IndexModule { rhs.data_); buffer_pool_handle_->acquire_one(buffer_block_id_); break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + this->reset(rhs.data_); + break; default: break; } @@ -106,6 +124,13 @@ class IndexStorage : public IndexModule { rhs.buffer_pool_handle_ = nullptr; rhs.type_ = MemoryBlockType::MBT_UNKNOWN; break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + release_owned(); + type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + data_ = rhs.data_; + rhs.data_ = nullptr; + rhs.type_ = MemoryBlockType::MBT_UNKNOWN; + break; default: break; } @@ -122,6 +147,9 @@ class IndexStorage : public IndexModule { buffer_pool_handle_->release_one(buffer_block_id_); } break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + release_owned(); + break; default: break; } @@ -136,6 +164,8 @@ class IndexStorage : public IndexModule { void *data) { if (type_ == MemoryBlockType::MBT_BUFFERPOOL) { buffer_pool_handle_->release_one(buffer_block_id_); + } else if (type_ == MemoryBlockType::MBT_HEAP_SCRATCH) { + release_owned(); } type_ = MemoryBlockType::MBT_BUFFERPOOL; buffer_pool_handle_ = buffer_pool_handle; @@ -147,6 +177,8 @@ class IndexStorage : public IndexModule { if (type_ == MemoryBlockType::MBT_BUFFERPOOL) { buffer_pool_handle_->release_one(buffer_block_id_); buffer_pool_handle_ = nullptr; + } else if (type_ == MemoryBlockType::MBT_HEAP_SCRATCH) { + release_owned(); } type_ = MemoryBlockType::MBT_MMAP; data_ = data; @@ -156,6 +188,14 @@ class IndexStorage : public IndexModule { void *data_{nullptr}; mutable ailego::VecBufferPoolHandle *buffer_pool_handle_{nullptr}; size_t buffer_block_id_{0}; + + private: + void release_owned() { + if (data_) { + ailego_free(data_); + data_ = nullptr; + } + } }; struct SegmentData {