diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index fec7a1902..78dcd3c69 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include +#include #include #include @@ -36,122 +40,170 @@ static ssize_t zvec_pread(int fd, void *buf, size_t count, size_t offset) { } return static_cast(bytes_read); } +static ssize_t zvec_pwrite(int fd, const void *buf, size_t count, + size_t offset) { + HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); + if (handle == INVALID_HANDLE_VALUE) return -1; + OVERLAPPED ov = {}; + ov.Offset = static_cast(offset & 0xFFFFFFFF); + ov.OffsetHigh = static_cast(offset >> 32); + DWORD bytes_written = 0; + if (!WriteFile(handle, buf, static_cast(count), &bytes_written, &ov)) { + return -1; + } + return static_cast(bytes_written); +} #endif namespace zvec { namespace ailego { +const size_t kVectorPageSize = MemoryHelper::PageSize(); + void VectorPageTable::init(size_t entry_num) { - if (entries_) { - delete[] entries_; + // Free old segments if any. + for (size_t i = 0; i < segment_count_; ++i) { + delete[] segments_[i]; + segments_[i] = nullptr; } entry_num_ = entry_num; - entries_ = new Entry[entry_num_]; - for (size_t i = 0; i < entry_num_; i++) { - entries_[i].ref_count.store(std::numeric_limits::min()); - entries_[i].in_evict_queue.store(false); - entries_[i].buffer = nullptr; + segment_count_ = (entry_num + kSegmentSize - 1) / kSegmentSize; + for (size_t s = 0; s < segment_count_; ++s) { + segments_[s] = new Entry[kSegmentSize]; + for (size_t i = 0; i < kSegmentSize; ++i) { + segments_[s][i].ref_count.store(std::numeric_limits::min()); + segments_[s][i].in_evict_queue.store(false); + segments_[s][i].is_dirty.store(false); + segments_[s][i].buffer = nullptr; + segments_[s][i].file_offset = 0; + } + } +} + +void VectorPageTable::extend(size_t new_entry_num) { + if (new_entry_num <= entry_num_) return; + size_t new_segment_count = (new_entry_num + kSegmentSize - 1) / kSegmentSize; + for (size_t s = segment_count_; s < new_segment_count; ++s) { + segments_[s] = new Entry[kSegmentSize]; + for (size_t i = 0; i < kSegmentSize; ++i) { + segments_[s][i].ref_count.store(std::numeric_limits::min()); + segments_[s][i].in_evict_queue.store(false); + segments_[s][i].is_dirty.store(false); + segments_[s][i].buffer = nullptr; + segments_[s][i].file_offset = 0; + } } + segment_count_ = new_segment_count; + entry_num_ = new_entry_num; } char *VectorPageTable::acquire_block(block_id_t block_id) { assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; + Entry &e = entry_at(block_id); while (true) { - int current_count = entry.ref_count.load(std::memory_order_acquire); + int current_count = e.ref_count.load(std::memory_order_acquire); if (current_count < 0) { return nullptr; } - if (entry.ref_count.compare_exchange_weak(current_count, current_count + 1, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - return entry.buffer; + if (e.ref_count.compare_exchange_weak(current_count, current_count + 1, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + return e.buffer; } } } void VectorPageTable::release_block(block_id_t block_id) { assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; + Entry &e = entry_at(block_id); - if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) { + if (e.ref_count.fetch_sub(1, std::memory_order_release) == 1) { std::atomic_thread_fence(std::memory_order_acquire); - // Attempt to transition in_evict_queue from false -> true. The CAS ensures - // only one thread enqueues this block even if multiple threads race here. bool expected = false; - if (entry.in_evict_queue.compare_exchange_strong( - expected, true, std::memory_order_acq_rel, - std::memory_order_relaxed)) { + if (e.in_evict_queue.compare_exchange_strong(expected, true, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { BlockEvictionQueue::BlockType block; block.page_table = this; block.vector_block.first = block_id; block.vector_block.second = 0; BlockEvictionQueue::get_instance().add_single_block(block, 0); } - // else: block is already in the eviction queue; do not add a duplicate - // entry. } } void VectorPageTable::evict_block(block_id_t block_id) { assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; - char *buffer = entry.buffer; - size_t size = entry.size; + Entry &e = entry_at(block_id); int expected = 0; - if (entry.ref_count.compare_exchange_strong( - expected, std::numeric_limits::min())) { + // Two-phase eviction to prevent data race on e.buffer with + // set_block_acquired. We first CAS to kEvicting (-1), which causes + // set_block_acquired to spin-wait; then do the actual work (flush, free, + // null buffer); finally store INT_MIN ("evicted") which unblocks + // set_block_acquired. + static constexpr int kEvicting = -1; + if (e.ref_count.compare_exchange_strong(expected, kEvicting)) { + char *buffer = e.buffer; + if (buffer && e.is_dirty.load(std::memory_order_relaxed) && + flush_callback_) { + flush_callback_(block_id, buffer, kVectorPageSize, e.file_offset); + e.is_dirty.store(false, std::memory_order_relaxed); + } if (buffer) { - MemoryLimitPool::get_instance().release_buffer(buffer, size); + e.buffer = nullptr; + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); } + // Transition to fully-evicted state. Use release so that the + // set_block_acquired acquire-load sees e.buffer == nullptr. + e.ref_count.store(std::numeric_limits::min(), + std::memory_order_release); } - // Always reset in_evict_queue regardless of whether the CAS succeeded: - // - On success: the block is evicted; future releases should re-register it. - // - On failure: the block was re-acquired by another thread between the - // ref-count check and this call. Clearing in_evict_queue lets the next - // release_block() re-enqueue it so it is not silently lost. - entry.in_evict_queue.store(false, std::memory_order_relaxed); + e.in_evict_queue.store(false, std::memory_order_relaxed); } char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, - size_t size) { + size_t file_offset) { assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; + Entry &e = entry_at(block_id); while (true) { - int current_count = entry.ref_count.load(std::memory_order_relaxed); + int current_count = e.ref_count.load(std::memory_order_acquire); if (current_count >= 0) { - // Defensive branch: in practice this path should never be reached. - // set_block_acquired() is always called under block_mutexes_[block_id], - // and the caller (acquire_buffer) re-checks acquire_block() inside the - // same lock before invoking this function. Therefore, if we get here, - // ref_count must still be negative (unloaded). This branch is retained - // as a safety net in case the locking contract is violated in the future, - // e.g. if set_block_acquired is called from an unlocked context. - if (entry.ref_count.compare_exchange_weak( - current_count, current_count + 1, std::memory_order_acq_rel, - std::memory_order_acquire)) { - MemoryLimitPool::get_instance().release_buffer(buffer, size); - return entry.buffer; + if (e.ref_count.compare_exchange_weak(current_count, current_count + 1, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); + return e.buffer; } + } else if (current_count == std::numeric_limits::min()) { + // Fully evicted — safe to claim this entry for our new buffer. + e.buffer = buffer; + e.file_offset = file_offset; + e.in_evict_queue.store(false, std::memory_order_relaxed); + e.is_dirty.store(false, std::memory_order_relaxed); + e.ref_count.store(1, std::memory_order_release); + return e.buffer; } else { - entry.buffer = buffer; - entry.size = size; - // Ensure in_evict_queue is cleared when the block is freshly loaded so - // that the first release_block() after loading can register it in the - // eviction queue. - entry.in_evict_queue.store(false, std::memory_order_relaxed); - entry.ref_count.store(1, std::memory_order_release); - return entry.buffer; + // kEvicting (-1): eviction is in progress on this entry. Spin briefly + // until evict_block finishes (transitions to INT_MIN). + // This is a very short critical section (flush + free, ~μs). + std::this_thread::yield(); } } } -VecBufferPool::VecBufferPool(const std::string &filename) { +VecBufferPool::VecBufferPool(const std::string &filename, bool writable, + bool create) { file_name_ = filename; + writable_ = writable || create; #if defined(_MSC_VER) - fd_ = _open(filename.c_str(), O_RDONLY | _O_BINARY); + int flags = writable_ ? (create ? (O_RDWR | O_CREAT | O_TRUNC | _O_BINARY) + : (O_RDWR | _O_BINARY)) + : (O_RDONLY | _O_BINARY); + fd_ = _open(filename.c_str(), flags, 0644); #else - fd_ = open(filename.c_str(), O_RDONLY); + int flags = + writable_ ? (create ? (O_RDWR | O_CREAT | O_TRUNC) : O_RDWR) : O_RDONLY; + fd_ = ::open(filename.c_str(), flags, 0644); #endif if (fd_ < 0) { throw std::runtime_error("Failed to open file: " + filename); @@ -170,15 +222,38 @@ VecBufferPool::VecBufferPool(const std::string &filename) { file_size_ = st.st_size; } -int VecBufferPool::init(size_t segment_count) { - size_t block_num = segment_count + 10; +int VecBufferPool::init() { + size_t block_num = (file_size_ + kVectorPageSize - 1) / kVectorPageSize; page_table_.init(block_num); - // Allocate all mutexes in a single contiguous array so that the cold-path - // lock in acquire_buffer() accesses cache-friendly memory instead of - // chasing 31K+ independent heap pointers. - block_mutexes_ = std::make_unique(block_num); - block_mutexes_count_ = block_num; - LOG_DEBUG("entry num: %zu", page_table_.entry_num()); + block_mutexes_ = + std::make_unique(VecBufferPool::kMutexBucketCount); + LOG_DEBUG("entry num: %zu, file_size: %zu", page_table_.entry_num(), + file_size_); + + // In writable mode, inject a flush callback into the page table so that + // evict_block()/flush_block()/flush_all() can pwrite dirty blocks back to + // the backing file without needing to know about fd_ directly. + if (writable_) { + int fd = fd_; + const std::string &name = file_name_; + page_table_.set_flush_callback([fd, &name](block_id_t /*block_id*/, + char *buf, size_t sz, + size_t off) -> int { +#if defined(_MSC_VER) + ssize_t w = zvec_pwrite(fd, buf, sz, off); +#else + ssize_t w = ::pwrite(fd, buf, sz, off); +#endif + if (w != static_cast(sz)) { + LOG_ERROR( + "Buffer pool flush failed: file[%s], offset[%zu], " + "expected[%zu], got[%zd]", + name.c_str(), off, sz, w); + return -1; + } + return 0; + }); + } return 0; } @@ -186,54 +261,57 @@ VecBufferPoolHandle VecBufferPool::get_handle() { return VecBufferPoolHandle(*this); } -char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset, - size_t size, int retry) { - assert(block_id < block_mutexes_count_); - char *buffer = page_table_.acquire_block(block_id); +char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { + assert(page_id < page_table_.entry_num()); + char *buffer = page_table_.acquire_block(page_id); if (buffer) { return buffer; } - std::lock_guard lock(block_mutexes_[block_id]); - buffer = page_table_.acquire_block(block_id); + std::lock_guard lock( + block_mutexes_[page_id % VecBufferPool::kMutexBucketCount]); + buffer = page_table_.acquire_block(page_id); if (buffer) { return buffer; } { - bool found = - MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer); + bool found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buffer); if (!found) { for (int i = 0; i < retry; i++) { BlockEvictionQueue::get_instance().recycle(); - found = - MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer); + found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buffer); if (found) { break; } } } if (!found) { - LOG_ERROR( - "Buffer pool failed to get free buffer: file[%s], block_id[%zu], " - "offset[%zu], size[%zu]", - file_name_.c_str(), block_id, offset, size); + LOG_ERROR("Buffer pool failed to get free buffer: file[%s], page_id[%zu]", + file_name_.c_str(), page_id); return nullptr; } } + size_t page_offset = page_id * kVectorPageSize; + size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset); + if (expected_bytes < kVectorPageSize) { + std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes); + } #if defined(_MSC_VER) - ssize_t read_bytes = zvec_pread(fd_, buffer, size, offset); + ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset); #else - ssize_t read_bytes = pread(fd_, buffer, size, offset); + ssize_t read_bytes = pread(fd_, buffer, expected_bytes, page_offset); #endif - if (read_bytes != static_cast(size)) { + if (read_bytes != static_cast(expected_bytes)) { LOG_ERROR( - "Buffer pool failed to read file at offset: file[%s], block_id[%zu], " - "offset[%zu], size[%zu]", - file_name_.c_str(), block_id, offset, size); - MemoryLimitPool::get_instance().release_buffer(buffer, size); + "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " + "offset[%zu], expected[%zu], got[%zd]", + file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes); + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); return nullptr; } - return page_table_.set_block_acquired(block_id, buffer, size); + return page_table_.set_block_acquired(page_id, buffer, page_offset); } int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { @@ -252,16 +330,171 @@ int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { return 0; } -char *VecBufferPoolHandle::get_block(size_t offset, size_t size, - size_t block_id) { - char *buffer = pool_.acquire_buffer(block_id, offset, size, 50); - return buffer; +int VecBufferPool::write_range(size_t file_offset, size_t length, + const char *src) { + if (!writable_) { + LOG_ERROR("write_range called on read-only pool: file[%s]", + file_name_.c_str()); + return -1; + } + if (length == 0) { + return 0; + } + size_t first_page = file_offset / kVectorPageSize; + size_t last_page = (file_offset + length - 1) / kVectorPageSize; + size_t remaining = length; + size_t src_cursor = 0; + for (size_t pg = first_page; pg <= last_page; ++pg) { + // Loading the page ensures we do not clobber unrelated bytes within the + // same page when the write is not page-aligned. acquire_buffer() pre-fills + // from the backing file (or zero-pads beyond EOF). + char *page = this->acquire_buffer(pg, 50); + if (!page) { + LOG_ERROR("write_range acquire failed: file[%s], page[%zu]", + file_name_.c_str(), pg); + return -1; + } + size_t page_start = pg * kVectorPageSize; + size_t intra_offset = (pg == first_page) ? (file_offset - page_start) : 0; + size_t chunk = std::min(kVectorPageSize - intra_offset, remaining); + std::memcpy(page + intra_offset, src + src_cursor, chunk); + page_table_.mark_dirty(pg); + page_table_.release_block(pg); + src_cursor += chunk; + remaining -= chunk; + } + return 0; +} + +int VecBufferPool::write_meta(size_t offset, size_t length, + const char *buffer) { + if (!writable_) { + LOG_ERROR("write_meta called on read-only pool: file[%s]", + file_name_.c_str()); + return -1; + } +#if defined(_MSC_VER) + ssize_t w = zvec_pwrite(fd_, buffer, length, offset); +#else + ssize_t w = ::pwrite(fd_, buffer, length, offset); +#endif + if (w != static_cast(length)) { + LOG_ERROR( + "Buffer pool failed to write meta: file[%s], offset[%zu], " + "length[%zu], got[%zd]", + file_name_.c_str(), offset, length, w); + return -1; + } + return 0; +} + +int VecBufferPool::flush_all() { + if (!writable_) { + return 0; + } + int rc = 0; + for (size_t i = 0; i < page_table_.entry_num(); ++i) { + if (page_table_.is_block_dirty(i)) { + int r = page_table_.flush_block(i); + if (r != 0) { + rc = r; + } + } + } + return rc; +} + +bool VecBufferPool::extend_file(size_t new_size) { + if (!writable_) { + LOG_ERROR("extend_file called on read-only pool: file[%s]", + file_name_.c_str()); + return false; + } + if (new_size <= file_size_) { + return true; + } +#if defined(_MSC_VER) + if (_chsize_s(fd_, static_cast(new_size)) != 0) { + LOG_ERROR("extend_file _chsize_s failed: file[%s], new_size[%zu]", + file_name_.c_str(), new_size); + return false; + } +#else + if (::ftruncate(fd_, static_cast(new_size)) != 0) { + LOG_ERROR("extend_file ftruncate failed: file[%s], new_size[%zu]", + file_name_.c_str(), new_size); + return false; + } +#endif + file_size_ = new_size; + // Extend the page table to cover the new file range. Existing entries + // stay at their original addresses so concurrent readers are unaffected. + size_t new_entry_num = (file_size_ + kVectorPageSize - 1) / kVectorPageSize; + if (new_entry_num > page_table_.entry_num()) { + page_table_.extend(new_entry_num); + } + return true; +} + +char *VecBufferPoolHandle::get_single_page(size_t file_offset, size_t len, + size_t &out_page_id) { + size_t first_page = file_offset / kVectorPageSize; + assert(len == 0 || (file_offset + len - 1) / kVectorPageSize == first_page); + out_page_id = first_page; + char *page = pool_.acquire_buffer(first_page, 50); + if (!page) { + return nullptr; + } + return page + (file_offset - first_page * kVectorPageSize); +} + +bool VecBufferPoolHandle::read_range(size_t file_offset, size_t len, + char *out) { + if (len == 0) { + return true; + } + size_t first_page = file_offset / kVectorPageSize; + size_t last_page = (file_offset + len - 1) / kVectorPageSize; + size_t remaining = len; + size_t dst_cursor = 0; + for (size_t pg = first_page; pg <= last_page; ++pg) { + char *page = pool_.acquire_buffer(pg, 50); + if (!page) { + return false; + } + size_t page_start = pg * kVectorPageSize; + size_t intra_offset = (pg == first_page) ? (file_offset - page_start) : 0; + size_t chunk = std::min(kVectorPageSize - intra_offset, remaining); + std::memcpy(out + dst_cursor, page + intra_offset, chunk); + pool_.page_table_.release_block(pg); + dst_cursor += chunk; + remaining -= chunk; + } + return true; } int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) { return pool_.get_meta(offset, length, buffer); } +int VecBufferPoolHandle::write_range(size_t file_offset, size_t len, + const char *src) { + return pool_.write_range(file_offset, len, src); +} + +int VecBufferPoolHandle::write_meta(size_t offset, size_t length, + const char *buffer) { + return pool_.write_meta(offset, length, buffer); +} + +int VecBufferPoolHandle::flush_all() { + return pool_.flush_all(); +} + +bool VecBufferPoolHandle::writable() const { + return pool_.writable(); +} + void VecBufferPoolHandle::release_one(block_id_t block_id) { pool_.page_table_.release_block(block_id); } diff --git a/src/core/algorithm/flat/flat_streamer.cc b/src/core/algorithm/flat/flat_streamer.cc index 8969efc14..5e6171659 100644 --- a/src/core/algorithm/flat/flat_streamer.cc +++ b/src/core/algorithm/flat/flat_streamer.cc @@ -34,7 +34,7 @@ FlatStreamer::FlatStreamer() : entity_(stats_) {} template FlatStreamer::~FlatStreamer() { - if (state_ == STATE_INITED) { + if (state_ == STATE_INITED || state_ == STATE_OPENED) { this->cleanup(); } } diff --git a/src/core/algorithm/flat/flat_streamer_entity.cc b/src/core/algorithm/flat/flat_streamer_entity.cc index 988f5fdfb..87d9a1906 100644 --- a/src/core/algorithm/flat/flat_streamer_entity.cc +++ b/src/core/algorithm/flat/flat_streamer_entity.cc @@ -165,13 +165,20 @@ int FlatStreamerEntity::add(uint64_t key, const void *vec, size_t size) { IndexStorage::MemoryBlock head_block; this->get_head_block(head_block); - const BlockLocation *bl = - reinterpret_cast(head_block.data()); - if (ailego_unlikely(bl == nullptr)) { - LOG_ERROR("Failed to get block loc"); - return IndexError_ReadData; + BlockLocation block; + { + const BlockLocation *bl = + reinterpret_cast(head_block.data()); + if (ailego_unlikely(bl == nullptr)) { + LOG_ERROR("Failed to get block loc"); + return IndexError_ReadData; + } + block = *bl; } - BlockLocation block = *bl; + // Release the head block reference early so that the buffer pool ref_count + // and memory budget held by it do not block subsequent acquire/evict in this + // function (alloc_block / add_to_block may compete for the same memory). + head_block.reset(nullptr); if (!this->is_valid_block(block)) { int ret = this->alloc_block(block, &block); @@ -922,6 +929,9 @@ int FlatStreamerEntity::add_vector_with_id(const uint32_t id, const void *query, this->get_head_block(head_block); BlockLocation block = *reinterpret_cast(head_block.data()); + // Release buffer-pool pin before any alloc_block() call that may trigger + // append_segment() and rebuild the pool (same reason as in add()). + head_block.reset(nullptr); if (!this->is_valid_block(block)) { int ret = this->alloc_block(block, &block); if (ailego_unlikely(ret != 0)) { diff --git a/src/core/algorithm/hnsw/hnsw_entity.h b/src/core/algorithm/hnsw/hnsw_entity.h index a6ead8f63..bae57ec7a 100644 --- a/src/core/algorithm/hnsw/hnsw_entity.h +++ b/src/core/algorithm/hnsw/hnsw_entity.h @@ -201,11 +201,21 @@ struct BufferPoolMemoryBlock { void *data) : buffer_pool_handle_(handle), buffer_block_id_(block_id), data_(data) {} + static BufferPoolMemoryBlock MakeOwned(void *owned_data) { + BufferPoolMemoryBlock b; + b.owns_buffer_ = true; + b.data_ = owned_data; + return b; + } + BufferPoolMemoryBlock(const BufferPoolMemoryBlock &rhs) : buffer_pool_handle_(rhs.buffer_pool_handle_), buffer_block_id_(rhs.buffer_block_id_), data_(rhs.data_) { - if (buffer_pool_handle_) { + if (rhs.owns_buffer_) { + owns_buffer_ = false; + buffer_pool_handle_ = nullptr; + } else if (buffer_pool_handle_) { buffer_pool_handle_->acquire_one(buffer_block_id_); } } @@ -216,7 +226,10 @@ struct BufferPoolMemoryBlock { buffer_pool_handle_ = rhs.buffer_pool_handle_; buffer_block_id_ = rhs.buffer_block_id_; data_ = rhs.data_; - if (buffer_pool_handle_) { + if (rhs.owns_buffer_) { + owns_buffer_ = false; + buffer_pool_handle_ = nullptr; + } else if (buffer_pool_handle_) { buffer_pool_handle_->acquire_one(buffer_block_id_); } } @@ -226,8 +239,10 @@ struct BufferPoolMemoryBlock { BufferPoolMemoryBlock(BufferPoolMemoryBlock &&rhs) noexcept : buffer_pool_handle_(rhs.buffer_pool_handle_), buffer_block_id_(rhs.buffer_block_id_), + owns_buffer_(rhs.owns_buffer_), data_(rhs.data_) { rhs.buffer_pool_handle_ = nullptr; + rhs.owns_buffer_ = false; rhs.data_ = nullptr; } @@ -236,8 +251,10 @@ struct BufferPoolMemoryBlock { release(); buffer_pool_handle_ = rhs.buffer_pool_handle_; buffer_block_id_ = rhs.buffer_block_id_; + owns_buffer_ = rhs.owns_buffer_; data_ = rhs.data_; rhs.buffer_pool_handle_ = nullptr; + rhs.owns_buffer_ = false; rhs.data_ = nullptr; } return *this; @@ -260,7 +277,12 @@ struct BufferPoolMemoryBlock { private: void release() { - if (buffer_pool_handle_) { + if (owns_buffer_) { + if (data_) { + ailego_free(data_); + } + owns_buffer_ = false; + } else if (buffer_pool_handle_) { buffer_pool_handle_->release_one(buffer_block_id_); buffer_pool_handle_ = nullptr; } @@ -269,6 +291,7 @@ struct BufferPoolMemoryBlock { ailego::VecBufferPoolHandle *buffer_pool_handle_{nullptr}; size_t buffer_block_id_{0}; + bool owns_buffer_{false}; void *data_{nullptr}; }; diff --git a/src/core/algorithm/hnsw/hnsw_index_hash.h b/src/core/algorithm/hnsw/hnsw_index_hash.h index 1557dcd93..cc59e84ab 100644 --- a/src/core/algorithm/hnsw/hnsw_index_hash.h +++ b/src/core/algorithm/hnsw/hnsw_index_hash.h @@ -41,9 +41,9 @@ class HnswIndexHashMap { items_(reinterpret_cast(data)) {} //! Return a empty loc or the key item loc - Slot(Chunk::Pointer &&chunk, IndexStorage::MemoryBlock &&mem_block) - : chunk_(std::move(chunk)), items_block_(std::move(mem_block)) { - items_ = reinterpret_cast(items_block_.data()); + Slot(Chunk::Pointer &&chunk, std::vector &&local_data) + : chunk_(std::move(chunk)), local_data_(std::move(local_data)) { + items_ = reinterpret_cast(local_data_.data()); } const_iterator find(key_type key, uint32_t max_items, uint32_t mask) const { auto it = &items_[key & mask]; @@ -73,8 +73,8 @@ class HnswIndexHashMap { private: Chunk::Pointer chunk_{}; - const Item *items_{nullptr}; // point to chunk data - IndexStorage::MemoryBlock items_block_{}; + const Item *items_{nullptr}; // point to local_data_ + std::vector local_data_{}; }; public: @@ -114,9 +114,9 @@ class HnswIndexHashMap { } int cleanup(void) { - broker_.reset(); slots_.clear(); slots_.shrink_to_fit(); + broker_.reset(); mask_bits_ = 0U; slot_items_ = 0U; slot_loc_mask_ = 0U; @@ -141,7 +141,6 @@ class HnswIndexHashMap { auto idx = key >> mask_bits_; if (idx >= slots_.size()) { if (ailego_unlikely(idx >= slots_.capacity())) { - LOG_ERROR("no space to insert"); return false; } for (auto i = slots_.size(); i <= idx; ++i) { @@ -152,7 +151,6 @@ class HnswIndexHashMap { } auto it = slots_[idx].find(key, slot_items_, slot_loc_mask_); if (ailego_unlikely(it == nullptr)) { - LOG_ERROR("no space to insert"); return false; } @@ -179,14 +177,10 @@ class HnswIndexHashMap { LOG_ERROR("Chunk resize failed, size=%zu", size); return false; } - //! Read the whole data to memory - IndexStorage::MemoryBlock data_block; - if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) { - LOG_ERROR("Chunk read failed, size=%zu", size); - return false; - } - - slots_.emplace_back(std::move(chunk), std::move(data_block)); + //! Use a local zero-initialized buffer; new chunks contain all zeros, + //! so no buffer-pool read is needed and no ref_count is pinned. + std::vector local_buf(size, 0); + slots_.emplace_back(std::move(chunk), std::move(local_buf)); return true; } @@ -208,13 +202,14 @@ class HnswIndexHashMap { i, chunk->data_size(), size); return IndexError_InvalidFormat; } - //! Read the whole data to memory - IndexStorage::MemoryBlock data_block; - if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) { - LOG_ERROR("Chunk read failed, size=%zu", size); - return false; + //! Copy chunk data into a local buffer via fetch() so that no + //! buffer-pool block is pinned for the lifetime of the Slot. + std::vector local_buf(size); + if (ailego_unlikely(chunk->fetch(0U, local_buf.data(), size) != size)) { + LOG_ERROR("Chunk fetch failed, size=%zu", size); + return IndexError_InvalidFormat; } - slots_.emplace_back(std::move(chunk), std::move(data_block)); + slots_.emplace_back(std::move(chunk), std::move(local_buf)); } return 0; } diff --git a/src/core/algorithm/hnsw/hnsw_streamer.cc b/src/core/algorithm/hnsw/hnsw_streamer.cc index 935cae5d4..c5e78f415 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer.cc @@ -28,7 +28,7 @@ namespace core { HnswStreamer::HnswStreamer() = default; HnswStreamer::~HnswStreamer() { - if (state_ == STATE_INITED) { + if (state_ == STATE_INITED || state_ == STATE_OPENED) { this->cleanup(); } } diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc index acc9bee36..a8ada19e6 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc @@ -37,6 +37,7 @@ int HnswStreamerEntity::init(size_t max_doc_cnt) { std::lock_guard lock(mutex_); broker_ = std::make_shared(stats_); upper_neighbor_index_ = std::make_shared(); + upper_neighbor_rw_mutex_ = std::make_shared(); keys_map_lock_ = std::make_shared(); keys_map_ = std::make_shared>(); if (!keys_map_ || !upper_neighbor_index_ || !broker_ || !keys_map_lock_) { @@ -767,9 +768,10 @@ const HnswEntity::Pointer HnswStreamerEntity::clone() const { HnswStreamerEntity *entity = new (std::nothrow) HnswStreamerEntity( stats_, header(), chunk_size_, node_index_mask_bits_, upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, - upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_, - std::move(node_chunks), std::move(upper_neighbor_chunks), broker_, - node_chunk_bases_, upper_neighbor_chunk_bases_); + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_, node_chunk_bases_, + upper_neighbor_chunk_bases_); if (ailego_unlikely(!entity)) { LOG_ERROR("HnswStreamerEntity new failed"); } @@ -800,9 +802,9 @@ const HnswEntity::Pointer HnswMmapStreamerEntity::clone() const { auto *entity = new (std::nothrow) HnswMmapStreamerEntity( stats_, header(), chunk_size_, node_index_mask_bits_, upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, - upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_, - std::move(node_chunks), std::move(upper_neighbor_chunks), broker_, - nullptr, nullptr); + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_, nullptr, nullptr); if (ailego_unlikely(!entity)) { LOG_ERROR("HnswMmapStreamerEntity new failed"); } @@ -833,9 +835,9 @@ const HnswEntity::Pointer HnswContiguousStreamerEntity::clone() const { auto *entity = new (std::nothrow) HnswContiguousStreamerEntity( stats_, header(), chunk_size_, node_index_mask_bits_, upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, - upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_, - std::move(node_chunks), std::move(upper_neighbor_chunks), broker_, - nullptr, nullptr); + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_, nullptr, nullptr); if (ailego_unlikely(!entity)) { LOG_ERROR("HnswContiguousStreamerEntity new failed"); return HnswEntity::Pointer(); diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 3dc6c9640..677393de3 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -17,6 +17,7 @@ #include #include #include +#include #if defined(__linux__) || defined(__APPLE__) #include #endif @@ -246,19 +247,19 @@ class HnswStreamerEntity : public HnswEntity { using NIHashMapPointer = std::shared_ptr; //! Clone construct, used by clone method in subclasses - HnswStreamerEntity(IndexStreamer::Stats &stats, const HNSWHeader &hd, - size_t chunk_size, uint32_t node_index_mask_bits, - uint32_t upper_neighbor_mask_bits, bool filter_same_key, - bool get_vector_enabled, - const NIHashMapPointer &upper_neighbor_index, - std::shared_ptr &keys_map_lock, - const HashMapPointer &keys_map, - bool use_key_info_map, - std::vector &&node_chunks, - std::vector &&upper_neighbor_chunks, - const ChunkBroker::Pointer &broker, - std::shared_ptr> node_bases, - std::shared_ptr> upper_bases) + HnswStreamerEntity( + IndexStreamer::Stats &stats, const HNSWHeader &hd, size_t chunk_size, + uint32_t node_index_mask_bits, uint32_t upper_neighbor_mask_bits, + bool filter_same_key, bool get_vector_enabled, + const NIHashMapPointer &upper_neighbor_index, + const std::shared_ptr &upper_neighbor_rw_mutex, + std::shared_ptr &keys_map_lock, + const HashMapPointer &keys_map, bool use_key_info_map, + std::vector &&node_chunks, + std::vector &&upper_neighbor_chunks, + const ChunkBroker::Pointer &broker, + std::shared_ptr> node_bases, + std::shared_ptr> upper_bases) : stats_(stats), chunk_size_(chunk_size), node_index_mask_bits_(node_index_mask_bits), @@ -269,6 +270,7 @@ class HnswStreamerEntity : public HnswEntity { filter_same_key_(filter_same_key), get_vector_enabled_(get_vector_enabled), use_key_info_map_(use_key_info_map), + upper_neighbor_rw_mutex_(upper_neighbor_rw_mutex), upper_neighbor_index_(upper_neighbor_index), keys_map_lock_(keys_map_lock), keys_map_(keys_map), @@ -323,6 +325,10 @@ class HnswStreamerEntity : public HnswEntity { inline std::pair get_upper_neighbor_chunk_loc( level_t level, node_id_t id) const { + // Shared lock: concurrent readers are fine, but must synchronize with + // add_upper_neighbor's exclusive lock to avoid data-race on + // slots_.size() inside HnswIndexHashMap. + std::shared_lock lk(*upper_neighbor_rw_mutex_); auto it = upper_neighbor_index_->find(id); ailego_assert_abort(it != upper_neighbor_index_->end(), "Get upper neighbor header failed"); @@ -370,6 +376,10 @@ class HnswStreamerEntity : public HnswEntity { if (level == 0) { return 0; } + // Exclusive lock: protects upper_neighbor_chunks_.emplace_back() and + // upper_neighbor_index_->insert() from racing with concurrent find() + // calls in get_upper_neighbor_chunk_loc(). + std::unique_lock lk(*upper_neighbor_rw_mutex_); Chunk::Pointer chunk; uint64_t chunk_offset = UINT64_MAX; size_t neighbors_size = get_total_upper_neighbors_size(level); @@ -408,17 +418,40 @@ class HnswStreamerEntity : public HnswEntity { meta.level = level; meta.index = (chunk_index << upper_neighbor_mask_bits_) | (chunk_offset / upper_neighbor_size_); + size_t zero_start = chunk_offset; chunk_offset += upper_neighbor_size_ * level; - if (ailego_unlikely(!upper_neighbor_index_->insert(id, meta.data))) { - LOG_ERROR("HashMap insert value failed"); - return IndexError_Runtime; - } + // IMPORTANT: order matters here. + // 1) resize so the chunk's data_size covers the new region. + // 2) zero-fill the new region: storage backends like BufferStorage do + // NOT zero on resize -- only metadata is updated, and the underlying + // page may contain stale content from a previously-evicted page. + // Without this step, NeighborsHeader::neighbor_cnt is garbage and + // select_entry_point()/search_neighbors() iterate over garbage + // node_ids, eventually triggering find()'s assertion in + // get_upper_neighbor_chunk_loc(). + // 3) ONLY THEN publish the entry to upper_neighbor_index_, so that any + // concurrent reader that finds this id already sees a properly + // zeroed upper-neighbor slot. if (ailego_unlikely(chunk->resize(chunk_offset) != chunk_offset)) { LOG_ERROR("Chunk resize to %zu failed", (size_t)chunk_offset); return IndexError_Runtime; } + // Use std::vector instead of a VLA: VLAs are a GNU extension and may + // produce different codegen / be rejected under clang/MSVC. + std::vector zeros(neighbors_size, 0); + if (ailego_unlikely(chunk->write(zero_start, zeros.data(), + neighbors_size) != neighbors_size)) { + LOG_ERROR("Chunk write zeros failed"); + return IndexError_Runtime; + } + + if (ailego_unlikely(!upper_neighbor_index_->insert(id, meta.data))) { + LOG_ERROR("HashMap insert value failed"); + return IndexError_Runtime; + } + return 0; } @@ -529,6 +562,10 @@ class HnswStreamerEntity : public HnswEntity { protected: IndexStreamer::Stats &stats_; std::mutex mutex_{}; + //! Guards upper_neighbor_index_ and upper_neighbor_chunks_ against + //! concurrent reads (find) and writes (insert/emplace_back). + //! Shared via shared_ptr so all clones synchronize on the SAME mutex. + mutable std::shared_ptr upper_neighbor_rw_mutex_{}; size_t max_index_size_{0UL}; uint32_t chunk_size_{kDefaultChunkSize}; uint32_t upper_neighbor_chunk_size_{kDefaultChunkSize}; @@ -638,9 +675,16 @@ HnswStreamerEntity::get_neighbors_typed( LOG_ERROR("Read neighbor header failed, ret=%zu", ret); return NeighborsT(); } - BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_, - mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + BufferPoolMemoryBlock block; + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + } else { + block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + mem_block.buffer_block_id_, mem_block.data_); + mem_block.buffer_pool_handle_ = nullptr; + } return NeighborsT(std::move(block)); } @@ -688,10 +732,19 @@ inline int HnswStreamerEntity::get_vector_typed( loc.second, read_size, ret); return IndexError_ReadData; } - vec_blocks[i] = - BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + vec_blocks[i] = [&]() { + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + BufferPoolMemoryBlock b = + BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + return b; + } + BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_, mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + mem_block.buffer_pool_handle_ = nullptr; + return b; + }(); } return 0; } diff --git a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_index_hash.h b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_index_hash.h index 4f01aabb3..bf3dc1e7c 100644 --- a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_index_hash.h +++ b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_index_hash.h @@ -41,9 +41,9 @@ class HnswIndexHashMap { items_(reinterpret_cast(data)) {} //! Return a empty loc or the key item loc - Slot(Chunk::Pointer &&chunk, IndexStorage::MemoryBlock &&mem_block) - : chunk_(std::move(chunk)), items_block_(std::move(mem_block)) { - items_ = reinterpret_cast(items_block_.data()); + Slot(Chunk::Pointer &&chunk, std::vector &&local_data) + : chunk_(std::move(chunk)), local_data_(std::move(local_data)) { + items_ = reinterpret_cast(local_data_.data()); } const_iterator find(key_type key, uint32_t max_items, uint32_t mask) const { auto it = &items_[key & mask]; @@ -73,8 +73,8 @@ class HnswIndexHashMap { private: Chunk::Pointer chunk_{}; - const Item *items_{nullptr}; // point to chunk data - IndexStorage::MemoryBlock items_block_{}; + const Item *items_{nullptr}; // point to local_data_ + std::vector local_data_{}; }; public: @@ -179,14 +179,18 @@ class HnswIndexHashMap { LOG_ERROR("Chunk resize failed, size=%zu", size); return false; } - //! Read the whole data to memory - IndexStorage::MemoryBlock data_block; - if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) { - LOG_ERROR("Chunk read failed, size=%zu", size); - return false; - } - - slots_.emplace_back(std::move(chunk), std::move(data_block)); + //! Use a local zero-initialized buffer; new chunks contain all zeros, + //! so no buffer-pool read is needed and no ref_count is pinned. + //! NOTE: Previously this used `chunk->read(0U, data_block, size)` which + //! returns a view into the underlying BufferPool page. That made the + //! Slot's `items_` pointer alias buffer-pool memory shared across + //! threads, which under clang -O3 release exposed a data race on + //! Slot::find()'s probing read of `it->second` (concurrent + //! const_cast writes from insert() were not reliably visible). Using a + //! private zero-initialized vector matches the HNSW (non-RABITQ) + //! implementation and avoids this race. + std::vector local_buf(size, 0); + slots_.emplace_back(std::move(chunk), std::move(local_buf)); return true; } @@ -208,13 +212,14 @@ class HnswIndexHashMap { i, chunk->data_size(), size); return IndexError_InvalidFormat; } - //! Read the whole data to memory - IndexStorage::MemoryBlock data_block; - if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) { - LOG_ERROR("Chunk read failed, size=%zu", size); - return false; + //! Copy chunk data into a local buffer via fetch() so that no + //! buffer-pool block is pinned for the lifetime of the Slot. + std::vector local_buf(size); + if (ailego_unlikely(chunk->fetch(0U, local_buf.data(), size) != size)) { + LOG_ERROR("Chunk fetch failed, size=%zu", size); + return IndexError_InvalidFormat; } - slots_.emplace_back(std::move(chunk), std::move(data_block)); + slots_.emplace_back(std::move(chunk), std::move(local_buf)); } return 0; } diff --git a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer.cc b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer.cc index 9eacf0bc6..2ea2f6aa0 100644 --- a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer.cc +++ b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer.cc @@ -40,7 +40,7 @@ HnswRabitqStreamer::HnswRabitqStreamer(IndexProvider::Pointer provider, provider_(std::move(provider)) {} HnswRabitqStreamer::~HnswRabitqStreamer() { - if (state_ == STATE_INITED) { + if (state_ == STATE_INITED || state_ == STATE_OPENED) { this->cleanup(); } } diff --git a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.cc b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.cc index 35501ed94..cef59c35c 100644 --- a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.cc +++ b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.cc @@ -34,6 +34,7 @@ int HnswRabitqStreamerEntity::init(size_t max_doc_cnt) { std::lock_guard lock(mutex_); broker_ = std::make_shared(stats_); upper_neighbor_index_ = std::make_shared(); + upper_neighbor_rw_mutex_ = std::make_shared(); keys_map_lock_ = std::make_shared(); keys_map_ = std::make_shared>(); if (!keys_map_ || !upper_neighbor_index_ || !broker_ || !keys_map_lock_) { @@ -697,8 +698,9 @@ const HnswRabitqEntity::Pointer HnswRabitqStreamerEntity::clone() const { new (std::nothrow) HnswRabitqStreamerEntity( stats_, header(), chunk_size_, node_index_mask_bits_, upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, - upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_, - std::move(node_chunks), std::move(upper_neighbor_chunks), broker_); + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_); if (ailego_unlikely(!entity)) { LOG_ERROR("HnswRabitqStreamerEntity new failed"); } diff --git a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.h b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.h index ea36143af..7c5b600e7 100644 --- a/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.h +++ b/src/core/algorithm/hnsw_rabitq/hnsw_rabitq_streamer_entity.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include #include @@ -216,17 +217,17 @@ class HnswRabitqStreamerEntity : public HnswRabitqEntity { using NIHashMapPointer = std::shared_ptr; //! Private construct, only be called by clone method - HnswRabitqStreamerEntity(IndexStreamer::Stats &stats, const HNSWHeader &hd, - size_t chunk_size, uint32_t node_index_mask_bits, - uint32_t upper_neighbor_mask_bits, - bool filter_same_key, bool get_vector_enabled, - const NIHashMapPointer &upper_neighbor_index, - std::shared_ptr &keys_map_lock, - const HashMapPointer &keys_map, - bool use_key_info_map, - std::vector &&node_chunks, - std::vector &&upper_neighbor_chunks, - const HnswRabitqChunkBroker::Pointer &broker) + HnswRabitqStreamerEntity( + IndexStreamer::Stats &stats, const HNSWHeader &hd, size_t chunk_size, + uint32_t node_index_mask_bits, uint32_t upper_neighbor_mask_bits, + bool filter_same_key, bool get_vector_enabled, + const NIHashMapPointer &upper_neighbor_index, + const std::shared_ptr &upper_neighbor_rw_mutex, + std::shared_ptr &keys_map_lock, + const HashMapPointer &keys_map, bool use_key_info_map, + std::vector &&node_chunks, + std::vector &&upper_neighbor_chunks, + const HnswRabitqChunkBroker::Pointer &broker) : stats_(stats), chunk_size_(chunk_size), node_index_mask_bits_(node_index_mask_bits), @@ -237,6 +238,7 @@ class HnswRabitqStreamerEntity : public HnswRabitqEntity { filter_same_key_(filter_same_key), get_vector_enabled_(get_vector_enabled), use_key_info_map_(use_key_info_map), + upper_neighbor_rw_mutex_(upper_neighbor_rw_mutex), upper_neighbor_index_(upper_neighbor_index), keys_map_lock_(keys_map_lock), keys_map_(keys_map), @@ -286,6 +288,11 @@ class HnswRabitqStreamerEntity : public HnswRabitqEntity { inline std::pair get_upper_neighbor_chunk_loc( level_t level, node_id_t id) const { + // Shared lock: concurrent readers are fine, but must synchronize with + // add_upper_neighbor's exclusive lock to avoid data-race on + // slots_.size() inside HnswIndexHashMap (the emplace_back in alloc_slot + // is not atomic and concurrent find() may see a stale size value). + std::shared_lock lk(*upper_neighbor_rw_mutex_); auto it = upper_neighbor_index_->find(id); ailego_assert_abort(it != upper_neighbor_index_->end(), "Get upper neighbor header failed"); @@ -334,6 +341,10 @@ class HnswRabitqStreamerEntity : public HnswRabitqEntity { if (level == 0) { return 0; } + // Exclusive lock: protects upper_neighbor_chunks_.emplace_back() and + // upper_neighbor_index_->insert() from racing with concurrent find() + // calls in get_upper_neighbor_chunk_loc(). + std::unique_lock lk(*upper_neighbor_rw_mutex_); Chunk::Pointer chunk; uint64_t chunk_offset = -1UL; size_t neighbors_size = get_total_upper_neighbors_size(level); @@ -373,17 +384,40 @@ class HnswRabitqStreamerEntity : public HnswRabitqEntity { meta.level = level; meta.index = (chunk_index << upper_neighbor_mask_bits_) | (chunk_offset / upper_neighbor_size_); + size_t zero_start = chunk_offset; chunk_offset += upper_neighbor_size_ * level; - if (ailego_unlikely(!upper_neighbor_index_->insert(id, meta.data))) { - LOG_ERROR("HashMap insert value failed"); - return IndexError_Runtime; - } + // IMPORTANT: order matters here. + // 1) resize so the chunk's data_size covers the new region. + // 2) zero-fill the new region: storage backends like BufferStorage do + // NOT zero on resize -- only metadata is updated, and the underlying + // page may contain stale content from a previously-evicted page. + // Without this step, NeighborsHeader::neighbor_cnt is garbage and + // select_entry_point()/search_neighbors() iterate over garbage + // node_ids, eventually triggering find()'s assertion in + // get_upper_neighbor_chunk_loc() at line 291. + // 3) ONLY THEN publish the entry to upper_neighbor_index_, so that any + // concurrent reader that finds this id already sees a properly + // zeroed upper-neighbor slot. if (ailego_unlikely(chunk->resize(chunk_offset) != chunk_offset)) { LOG_ERROR("Chunk resize to %zu failed", (size_t)chunk_offset); return IndexError_Runtime; } + // Use std::vector instead of a VLA: VLAs are a GNU extension and may + // produce different codegen / be rejected under clang/MSVC. + std::vector zeros(neighbors_size, 0); + if (ailego_unlikely(chunk->write(zero_start, zeros.data(), + neighbors_size) != neighbors_size)) { + LOG_ERROR("Chunk write zeros failed"); + return IndexError_Runtime; + } + + if (ailego_unlikely(!upper_neighbor_index_->insert(id, meta.data))) { + LOG_ERROR("HashMap insert value failed"); + return IndexError_Runtime; + } + return 0; } @@ -503,6 +537,11 @@ class HnswRabitqStreamerEntity : public HnswRabitqEntity { bool get_vector_enabled_{false}; bool use_key_info_map_{true}; + // Shared via shared_ptr so that all cloned entities synchronize against + // the SAME mutex instance. A plain std::shared_mutex member would be + // independent per clone and provide no real protection for the shared + // upper_neighbor_index_ hashmap. + mutable std::shared_ptr upper_neighbor_rw_mutex_{}; NIHashMapPointer upper_neighbor_index_{}; mutable std::shared_ptr keys_map_lock_{}; diff --git a/src/core/algorithm/hnsw_sparse/hnsw_sparse_streamer.cc b/src/core/algorithm/hnsw_sparse/hnsw_sparse_streamer.cc index 3abce8087..20c215257 100644 --- a/src/core/algorithm/hnsw_sparse/hnsw_sparse_streamer.cc +++ b/src/core/algorithm/hnsw_sparse/hnsw_sparse_streamer.cc @@ -27,7 +27,7 @@ namespace core { HnswSparseStreamer::HnswSparseStreamer() : entity_(stats_) {} HnswSparseStreamer::~HnswSparseStreamer() { - if (state_ == STATE_INITED) { + if (state_ == STATE_INITED || state_ == STATE_OPENED) { this->cleanup(); } } diff --git a/src/core/algorithm/vamana/vamana_streamer.cc b/src/core/algorithm/vamana/vamana_streamer.cc index ae935eb81..2738a98ad 100644 --- a/src/core/algorithm/vamana/vamana_streamer.cc +++ b/src/core/algorithm/vamana/vamana_streamer.cc @@ -26,7 +26,7 @@ namespace core { VamanaStreamer::VamanaStreamer() = default; VamanaStreamer::~VamanaStreamer() { - if (state_ == STATE_INITED) { + if (state_ == STATE_INITED || state_ == STATE_OPENED) { this->cleanup(); } } diff --git a/src/core/algorithm/vamana/vamana_streamer_entity.h b/src/core/algorithm/vamana/vamana_streamer_entity.h index 87033ed04..30b55b80e 100644 --- a/src/core/algorithm/vamana/vamana_streamer_entity.h +++ b/src/core/algorithm/vamana/vamana_streamer_entity.h @@ -352,9 +352,16 @@ VamanaStreamerEntity::get_neighbors_typed( LOG_ERROR("Read neighbor header failed, ret=%zu", ret); return NeighborsT(); } - BufferPoolMemoryBlock block(mem_block.buffer_pool_handle_, - mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + BufferPoolMemoryBlock block; + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + block = BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + } else { + block = BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + mem_block.buffer_block_id_, mem_block.data_); + mem_block.buffer_pool_handle_ = nullptr; + } return NeighborsT(std::move(block)); } @@ -392,10 +399,19 @@ inline int VamanaStreamerEntity::get_vector_typed( LOG_ERROR("Read vector failed, ret=%zu", ret); return IndexError_ReadData; } - vec_blocks[i] = - BufferPoolMemoryBlock(mem_block.buffer_pool_handle_, + vec_blocks[i] = [&]() { + if (mem_block.type_ == IndexStorage::MemoryBlock::MBT_HEAP_SCRATCH) { + BufferPoolMemoryBlock b = + BufferPoolMemoryBlock::MakeOwned(mem_block.data_); + mem_block.data_ = nullptr; + mem_block.type_ = IndexStorage::MemoryBlock::MBT_UNKNOWN; + return b; + } + BufferPoolMemoryBlock b(mem_block.buffer_pool_handle_, mem_block.buffer_block_id_, mem_block.data_); - mem_block.buffer_pool_handle_ = nullptr; + mem_block.buffer_pool_handle_ = nullptr; + return b; + }(); } return 0; } diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 7ccf93b71..8606c562c 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -12,9 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include +#include #include +#include +#include #include +#include #include #include #include @@ -25,7 +31,32 @@ namespace zvec { namespace core { -/*! MMap File Storage +// Thread-local reusable scratch buffer for cross-page reads in the +// read(const void**) overload. Avoids allocating a new buffer on +// every cross-page read by reusing the same allocation on each thread. The +// returned pointer is valid only until the next cross-page read() on +// the same thread -- matching the single-page path's transient +// lifetime (ref released immediately, page may be evicted any time). +struct CrossPageScratch { + char *buf = nullptr; + size_t cap = 0; + ~CrossPageScratch() { + if (buf) ailego_free(buf); + } + char *ensure(size_t len) { + if (cap < len) { + if (buf) ailego_free(buf); + // C11 aligned_alloc requires size to be a multiple of alignment. + const size_t kAlign = 4096UL; + size_t alloc_size = (len + (kAlign - 1UL)) & ~(kAlign - 1UL); + buf = static_cast(ailego_aligned_malloc(alloc_size, kAlign)); + cap = buf ? alloc_size : 0; + } + return buf; + } +}; + +/*! Buffer Storage */ class BufferStorage : public IndexStorage { public: @@ -38,32 +69,37 @@ class BufferStorage : public IndexStorage { typedef std::shared_ptr Pointer; //! Constructor - WrappedSegment(BufferStorage *owner, IndexMapping::Segment *segment, - uint64_t segment_header_start_offset, - IndexFormat::MetaHeader *segment_header, size_t segment_id) - : segment_(segment), + //! + //! `info` MUST be a pointer into BufferStorage::segments_ (an + //! unordered_map mapped value). C++ guarantees those pointers stay + //! valid across insertions, so the WrappedSegment can safely fetch + //! the LATEST segment_header / segment_header_start_offset / Segment + //! after a re-parse caused by append_segment(). Storing the pointer + //! (rather than copying header_/offset into local fields) is what + //! prevents use-after-free when chain_headers_ is rebuilt. + WrappedSegment(BufferStorage *owner, IndexMapping::SegmentInfo *info, + size_t segment_id) + : segment_info_(info), owner_(owner), segment_id_(segment_id), - capacity_(static_cast(segment->meta()->data_size + - segment->meta()->padding_size)), - segment_header_start_offset_(segment_header_start_offset), - segment_header_(segment_header) {} + capacity_(static_cast(info->segment.meta()->data_size + + info->segment.meta()->padding_size)) {} //! Destructor ~WrappedSegment(void) override {} //! Retrieve size of data size_t data_size(void) const override { - return static_cast(segment_->meta()->data_size); + return static_cast(segment_info_->segment.meta()->data_size); } //! Retrieve crc of data uint32_t data_crc(void) const override { - return segment_->meta()->data_crc; + return segment_info_->segment.meta()->data_crc; } //! Retrieve size of padding size_t padding_size(void) const override { - return static_cast(segment_->meta()->padding_size); + return static_cast(segment_info_->segment.meta()->padding_size); } //! Retrieve capacity of segment @@ -72,84 +108,231 @@ class BufferStorage : public IndexStorage { } //! Fetch data from segment (with own buffer) + //! + //! C1: pool/handle are stable for the lifetime of the index + //! (no retire/rebuild), so no lock is needed on the hot path. size_t fetch(size_t offset, void *buf, size_t len) const override { - if (ailego_unlikely(offset + len > segment_->meta()->data_size)) { - auto meta = segment_->meta(); - if (offset > meta->data_size) { - offset = meta->data_size; + if (ailego_unlikely(!owner_->buffer_pool_handle_)) { + LOG_ERROR("WrappedSegment::fetch: handle is null, file[%s], id[%zu]", + owner_->file_name_.c_str(), segment_id_); + return 0; + } + const size_t data_size = segment_info_->segment.meta()->data_size; + if (ailego_unlikely(offset > data_size || len > data_size - offset)) { + if (offset > data_size) { + offset = data_size; } - len = meta->data_size - offset; + len = data_size - offset; } - size_t buffer_offset = segment_header_start_offset_ + - segment_header_->content_offset + - segment_->meta()->data_index; - auto *raw = owner_->get_buffer(buffer_offset, capacity_, segment_id_); - if (!raw) { + size_t abs_offset = segment_info_->segment_header_start_offset + + segment_info_->segment_header->content_offset + + segment_info_->segment.meta()->data_index + offset; + if (!owner_->buffer_pool_handle_->read_range(abs_offset, len, + static_cast(buf))) { return 0; } - auto *data = raw + offset; - memmove(buf, data, len); return len; } //! Read data from segment + //! C1: lock-free hot path (pool/handle never change during operation). size_t read(size_t offset, const void **data, size_t len) override { - if (ailego_unlikely(offset + len > segment_->meta()->data_size)) { - auto meta = segment_->meta(); - if (offset > meta->data_size) { - offset = meta->data_size; + if (ailego_unlikely(!owner_->buffer_pool_handle_)) { + LOG_ERROR("WrappedSegment::read: handle is null, file[%s], id[%zu]", + owner_->file_name_.c_str(), segment_id_); + *data = nullptr; + return 0; + } + const size_t data_size = segment_info_->segment.meta()->data_size; + if (ailego_unlikely(offset > data_size || len > data_size - offset)) { + if (offset > data_size) { + offset = data_size; } - len = meta->data_size - offset; + len = data_size - offset; } - size_t buffer_offset = segment_header_start_offset_ + - segment_header_->content_offset + - segment_->meta()->data_index; - auto *raw = owner_->get_buffer(buffer_offset, capacity_, segment_id_); - if (!raw) { + size_t abs_offset = segment_info_->segment_header_start_offset + + segment_info_->segment_header->content_offset + + segment_info_->segment.meta()->data_index + offset; + size_t first_page = abs_offset / ailego::kVectorPageSize; + size_t last_page = (len == 0) + ? first_page + : (abs_offset + len - 1) / ailego::kVectorPageSize; + if (first_page == last_page) { + size_t page_id = 0; + char *raw = owner_->buffer_pool_handle_->get_single_page(abs_offset, + len, page_id); + if (!raw) { + *data = nullptr; + return 0; + } + *data = raw; + // Release the buffer-pool ref count acquired by get_single_page(). + // The pointer remains valid as long as the page is not evicted; callers + // needing a stable pin should use the read(MemoryBlock&) overload. + owner_->buffer_pool_handle_->release_one(page_id); + return len; + } + // Reuse a thread-local scratch buffer to avoid allocating on + // every cross-page read. The pointer is valid until the next + // cross-page read(const void**) on the same thread. + thread_local CrossPageScratch scratch; + char *tmp = scratch.ensure(len); + if (!tmp) { + *data = nullptr; + return 0; + } + if (!owner_->buffer_pool_handle_->read_range(abs_offset, len, tmp)) { + *data = nullptr; return 0; } - *data = raw + offset; + *data = tmp; return len; } + //! C1: lock-free hot path (pool/handle never change during operation). size_t read(size_t offset, MemoryBlock &data, size_t len) override { - if (ailego_unlikely(offset + len > segment_->meta()->data_size)) { - auto meta = segment_->meta(); - if (offset > meta->data_size) { - offset = meta->data_size; + if (ailego_unlikely(!owner_->buffer_pool_handle_)) { + LOG_ERROR( + "WrappedSegment::read(MemoryBlock&): handle is null, file[%s], " + "id[%zu]", + owner_->file_name_.c_str(), segment_id_); + return 0; + } + const size_t data_size = segment_info_->segment.meta()->data_size; + if (ailego_unlikely(offset > data_size || len > data_size - offset)) { + if (offset > data_size) { + offset = data_size; + } + len = data_size - offset; + } + size_t abs_offset = segment_info_->segment_header_start_offset + + segment_info_->segment_header->content_offset + + segment_info_->segment.meta()->data_index + offset; + size_t first_page = abs_offset / ailego::kVectorPageSize; + size_t last_page = (len == 0) + ? first_page + : (abs_offset + len - 1) / ailego::kVectorPageSize; + if (first_page == last_page) { + size_t page_id = 0; + char *raw = owner_->buffer_pool_handle_->get_single_page(abs_offset, + len, page_id); + if (!raw) { + LOG_ERROR("read error (single-page acquire failed)."); + return 0; } - len = meta->data_size - offset; + data.reset(owner_->buffer_pool_handle_.get(), page_id, raw); + return len; } - size_t buffer_offset = segment_header_start_offset_ + - segment_header_->content_offset + - segment_->meta()->data_index; - auto *raw = owner_->get_buffer(buffer_offset, capacity_, segment_id_); - if (!raw) { + // C11 aligned_alloc requires the requested size to be a multiple of + // the alignment; round len up to the next 4K boundary. Without this + // glibc treats the call as undefined behaviour and silently corrupts + // heap metadata (manifesting later as `corrupted size vs. prev_size`). + const size_t kAlign = 4096UL; + size_t alloc_size = (len + (kAlign - 1UL)) & ~(kAlign - 1UL); + char *tmp = + static_cast(ailego_aligned_malloc(alloc_size, kAlign)); + if (!tmp) { + LOG_ERROR("read error (alloc cross-page temp buffer failed)."); return 0; } - - data.reset(owner_->buffer_pool_handle_.get(), segment_id_, raw + offset); - if (data.data()) { - return len; - } else { - LOG_ERROR("read error."); - return -1; + if (!owner_->buffer_pool_handle_->read_range(abs_offset, len, tmp)) { + ailego_free(tmp); + LOG_ERROR("read error (cross-page read_range failed)."); + return 0; } + data = MemoryBlock::MakeOwned(tmp, len); + return len; } //! Write data into the storage with offset - size_t write(size_t /*offset*/, const void * /*data*/, - size_t len) override { + //! + //! Takes a SHARED latch on the owner's mapping shard. This pairs with + //! the EXCLUSIVE all-shards latch held by flush_index() / append_segment() + //! around the meta_buf CRC + write_meta phase: writers parallelize + //! across (and within) shards, but are fully excluded while CRC is + //! computed over the meta_buf bytes that this method mutates + //! (data_size / padding_size). Without this latch the lock-free hot + //! path raced with the CRC compute, producing footer.segments_meta_crc + //! that did not match the bytes pwrite()'d to disk. + size_t write(size_t offset, const void *data, size_t len) override { + std::shared_lock latch( + owner_->mapping_shards_[owner_->mapping_shard_id()].mtx); + if (ailego_unlikely(!owner_->buffer_pool_handle_ || + !owner_->buffer_pool_)) { + LOG_ERROR("WrappedSegment::write: pool is null, file[%s], id[%zu]", + owner_->file_name_.c_str(), segment_id_); + return 0; + } + // In read-only mode the write is a silent no-op so that callers that + // unconditionally write (e.g. CRC updates) do not return an error. + if (!owner_->buffer_pool_->writable()) { + return len; + } + if (ailego_unlikely(offset > capacity_ || len > capacity_ - offset)) { + LOG_ERROR( + "write() exceeds segment capacity: offset=%zu len=%zu cap=%zu", + offset, len, capacity_); + return 0; + } + auto meta = segment_info_->segment.meta(); + if (offset + len > meta->data_size) { + meta->data_size = offset + len; + meta->padding_size = capacity_ - meta->data_size; + } + size_t abs_offset = segment_info_->segment_header_start_offset + + segment_info_->segment_header->content_offset + + segment_info_->segment.meta()->data_index + offset; + if (owner_->buffer_pool_handle_->write_range( + abs_offset, len, static_cast(data)) != 0) { + LOG_ERROR("write() page-cache write_range failed at abs_offset=%zu", + abs_offset); + return 0; + } + // ALWAYS mark dirty after a successful page-cache write so that the + // next flush_index() does NOT take the `if (!index_dirty_) return 0;` + // short-circuit and skip flush_all(). Previously this was only set + // when `data_size` grew, which meant fixed-size segments (e.g. + // chunk_meta_segment writing HnswChunkMeta in place) never raised + // the dirty flag -- their 4K page-cache pages were not flushed before + // append_segment(), so the freshly-rebuilt page table + // pread'd stale content from disk and chunk_cnts[NODE] lagged the + // real segment count, eventually causing sync_chunks() to see a + // mid-state segment and crash with a NULL Chunk::Pointer. + owner_->set_as_dirty(); return len; } //! Resize size of data - size_t resize(size_t /*size*/) override { - return 0; + //! + //! Takes a SHARED latch for the same reason as write(): mutating + //! meta->data_size / padding_size must be excluded from the CRC + //! compute in flush_index() / append_segment(). + size_t resize(size_t size) override { + std::shared_lock latch( + owner_->mapping_shards_[owner_->mapping_shard_id()].mtx); + auto meta = segment_info_->segment.meta(); + if (meta->data_size != size) { + if (size > capacity_) { + size = capacity_; + } + meta->data_size = size; + meta->padding_size = capacity_ - size; + owner_->set_as_dirty(); + } + return size; } //! Update crc of data - void update_data_crc(uint32_t /*crc*/) override {} + //! + //! Takes a SHARED latch for the same reason as write(): mutating + //! meta->data_crc must be excluded from the CRC compute in + //! flush_index() / append_segment(). + void update_data_crc(uint32_t crc) override { + std::shared_lock latch( + owner_->mapping_shards_[owner_->mapping_shard_id()].mtx); + segment_info_->segment.meta()->data_crc = crc; + owner_->set_as_dirty(); + } //! Clone the segment IndexStorage::Segment::Pointer clone(void) override { @@ -158,14 +341,17 @@ class BufferStorage : public IndexStorage { protected: friend BufferStorage; - IndexMapping::Segment *segment_{}; + // Pointer into BufferStorage::segments_ (an unordered_map mapped value). + // C++ guarantees the address stays valid across map insertions. All + // header / start-offset / segment-meta accesses go through this pointer + // so that re-parses after append_segment() are observed without + // needing to recreate WrappedSegment instances held by callers. + IndexMapping::SegmentInfo *segment_info_{nullptr}; private: BufferStorage *owner_{nullptr}; size_t segment_id_{}; size_t capacity_{}; - uint64_t segment_header_start_offset_; - IndexFormat::MetaHeader *segment_header_; }; //! Destructor @@ -179,7 +365,11 @@ class BufferStorage : public IndexStorage { } //! Initialize storage - int init(const ailego::Params & /*params*/) override { + int init(const ailego::Params ¶ms) override { + uint32_t val = params.get_as_uint32(MMAPFILE_STORAGE_SEGMENT_META_CAPACITY); + if (val != 0) { + segment_meta_capacity_ = val; + } return 0; } @@ -190,48 +380,63 @@ class BufferStorage : public IndexStorage { } //! Open storage - int open(const std::string &path, bool /*create_if_missing*/) override { + int open(const std::string &path, bool create_if_missing) override { file_name_ = path; - buffer_pool_ = std::make_shared(path); + if (!ailego::File::IsExist(path) && create_if_missing) { + size_t last_slash = path.rfind('/'); + if (last_slash != std::string::npos) { + ailego::File::MakePath(path.substr(0, last_slash)); + } + int error_code = this->init_index(path); + if (error_code != 0) { + LOG_ERROR("init_index failed for %s, errno=%d", path.c_str(), + error_code); + return error_code; + } + } + + // Open in writable mode when the caller expects to modify the index + // (create_if_missing=true implies write intent, same as MMapFileStorage). + buffer_pool_ = std::make_shared( + path, /*writable=*/create_if_missing, /*create=*/false); buffer_pool_handle_ = std::make_shared( buffer_pool_->get_handle()); int ret = ParseToMapping(); if (ret != 0) { + this->close_index(); return ret; } - ret = buffer_pool_->init(segments_.size()); + ret = buffer_pool_->init(); if (ret != 0) { + this->close_index(); return ret; } LOG_INFO( - "BufferStorage opened: file=%s, max_segment_size=%zu, " + "BufferStorage opened: file=%s, writable=%d, max_segment_size=%lu, " "segment_count=%zu", - file_name_.c_str(), (size_t)max_segment_size_, segments_.size()); + file_name_.c_str(), static_cast(create_if_missing), + max_segment_size_, segments_.size()); return 0; } - char *get_buffer(size_t offset, size_t length, size_t block_id) { - return buffer_pool_handle_->get_block(offset, length, block_id); - } - - int get_meta(size_t offset, size_t length, char *out) { - return buffer_pool_handle_->get_meta(offset, length, out); - } - - int ParseHeader(size_t offset) { - std::unique_ptr buffer(new char[sizeof(header_)]); - if (get_meta(offset, sizeof(header_), buffer.get()) != 0) { + int ParseHeader(size_t offset, IndexFormat::MetaHeader *out) { + std::unique_ptr buffer(new char[sizeof(*out)]); + // ParseHeader is called from ParseToMapping which is itself called + // from either open() (single-threaded) or append_segment() (under + // AllShardsExclusiveLatch). Do NOT add an internal lock here -- + // std::shared_mutex is not reentrant -> deadlock. + if (buffer_pool_handle_->get_meta(offset, sizeof(*out), buffer.get()) != + 0) { LOG_ERROR("Get segment header failed."); return IndexError_Runtime; } - uint8_t *header_ptr = reinterpret_cast(buffer.get()); - memcpy(&header_, header_ptr, sizeof(header_)); - if (header_.meta_header_size != sizeof(IndexFormat::MetaHeader)) { + memcpy(out, buffer.get(), sizeof(*out)); + if (out->meta_header_size != sizeof(IndexFormat::MetaHeader)) { LOG_ERROR("Header meta size is invalid."); return IndexError_InvalidLength; } - if (ailego::Crc32c::Hash(&header_, sizeof(header_), header_.header_crc) != - header_.header_crc) { + if (ailego::Crc32c::Hash(out, sizeof(*out), out->header_crc) != + out->header_crc) { LOG_ERROR("Header meta checksum is invalid."); return IndexError_InvalidChecksum; } @@ -240,7 +445,9 @@ class BufferStorage : public IndexStorage { int ParseFooter(size_t offset) { std::unique_ptr buffer(new char[sizeof(footer_)]); - if (get_meta(offset, sizeof(footer_), buffer.get()) != 0) { + // Bypass wrapper -- see ParseHeader() comment for why. + if (buffer_pool_handle_->get_meta(offset, sizeof(footer_), buffer.get()) != + 0) { LOG_ERROR("Get segment footer failed."); return IndexError_Runtime; } @@ -258,12 +465,17 @@ class BufferStorage : public IndexStorage { return 0; } - int ParseSegment(size_t offset) { - std::lock_guard latch(mapping_mutex_); + int ParseSegment(size_t offset, IndexFormat::MetaHeader *chain_header, + uint32_t *out_segment_ids_offset) { + // NOTE: this function is only called from ParseToMapping(), which is + // itself called from either open() (single-threaded construction) or + // append_segment() (under AllShardsExclusiveLatch). Do NOT add an + // internal lock here -- doing so would deadlock the append path. std::unique_ptr segment_buffer = std::make_unique(footer_.segments_meta_size); - if (get_meta(offset, footer_.segments_meta_size, segment_buffer.get()) != - 0) { + // Bypass wrapper -- see ParseHeader() comment for why. + if (buffer_pool_handle_->get_meta(offset, footer_.segments_meta_size, + segment_buffer.get()) != 0) { LOG_ERROR("Get segment meta failed."); return IndexError_Runtime; } @@ -278,7 +490,7 @@ class BufferStorage : public IndexStorage { for (IndexFormat::SegmentMeta *iter = segment_start, *end = segment_start + footer_.segment_count; iter != end; ++iter) { - if (iter->segment_id_offset > footer_.segments_meta_size) { + if (iter->segment_id_offset >= footer_.segments_meta_size) { return IndexError_InvalidValue; } if (iter->data_index > footer_.content_size) { @@ -291,15 +503,29 @@ class BufferStorage : public IndexStorage { if (iter->segment_id_offset < segment_ids_offset) { segment_ids_offset = iter->segment_id_offset; } - id_hash_.emplace( - std::string(reinterpret_cast(segment_start) + - iter->segment_id_offset), - segments_.size()); - segments_.emplace( - std::string(reinterpret_cast(segment_start) + - iter->segment_id_offset), + // Assign a stable numeric ID (block_id in the page table) to this + // segment. We use id_hash_.size() rather than segments_.size() because + // segments_ is intentionally NOT cleared between appends (to keep + // existing WrappedSegment pointers valid), so segments_.size() would + // reflect stale entries and produce wrong IDs on re-parse. + const std::string seg_name(reinterpret_cast(segment_start) + + iter->segment_id_offset); + const size_t seg_id = id_hash_.size(); + id_hash_[seg_name] = seg_id; + // Update the segments_ entry in-place so that any WrappedSegment + // instances that already hold a pointer to this entry (via + // &segments_[name].segment) continue to use the refreshed meta_ptr_ + // after the re-parse. + // + // IMPORTANT: chain_header points into chain_headers_ which is a + // std::vector>; each chain owns its OWN + // MetaHeader copy. Do NOT use a shared &header_ here -- when there + // are multiple meta-header chains in the file, the next ParseHeader() + // would overwrite that single instance and break content_offset for + // all earlier-chain segments. + segments_[seg_name] = IndexMapping::SegmentInfo{IndexMapping::Segment{iter}, - current_header_start_offset_, &header_}); + current_header_start_offset_, chain_header}; max_segment_size_ = std::max(max_segment_size_, iter->data_size + iter->padding_size); if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count > @@ -308,36 +534,45 @@ class BufferStorage : public IndexStorage { } } buffer_pool_buffers_.push_back(std::move(segment_buffer)); + if (out_segment_ids_offset) { + *out_segment_ids_offset = segment_ids_offset; + } return 0; } int ParseToMapping() { while (true) { int ret; - ret = ParseHeader(current_header_start_offset_); + // Allocate an OWN MetaHeader for this chain so that subsequent chains + // never overwrite earlier-chain headers (prior implementation used a + // single header_ member, which corrupted content_offset for chain-0 + // segments once chain-1 was parsed). + chain_headers_.emplace_back(std::make_unique()); + IndexFormat::MetaHeader *chain_header = chain_headers_.back().get(); + ret = ParseHeader(current_header_start_offset_, chain_header); if (ret != 0) { LOG_ERROR("Failed to parse header, errno %d, %s", ret, IndexError::What(ret)); return ret; } - switch (header_.version) { + switch (chain_header->version) { case IndexFormat::FORMAT_VERSION: break; default: - LOG_ERROR("Unsupported index version: %u", header_.version); + LOG_ERROR("Unsupported index version: %u", chain_header->version); return IndexError_Unsupported; } // Unpack footer - if (header_.meta_footer_size != sizeof(IndexFormat::MetaFooter)) { + if (chain_header->meta_footer_size != sizeof(IndexFormat::MetaFooter)) { return IndexError_InvalidLength; } - if ((int32_t)header_.meta_footer_offset < 0) { + if ((int32_t)chain_header->meta_footer_offset < 0) { return IndexError_Unsupported; } uint64_t footer_offset = - header_.meta_footer_offset + current_header_start_offset_; + chain_header->meta_footer_offset + current_header_start_offset_; ret = ParseFooter(footer_offset); if (ret != 0) { LOG_ERROR("Failed to parse footer, errno %d, %s", ret, @@ -352,13 +587,21 @@ class BufferStorage : public IndexStorage { } const uint64_t segment_start_offset = footer_offset - footer_.segments_meta_size; - ret = ParseSegment(segment_start_offset); + uint32_t segment_ids_offset = footer_.segments_meta_size; + ret = + ParseSegment(segment_start_offset, chain_header, &segment_ids_offset); if (ret != 0) { LOG_ERROR("Failed to parse segment, errno %d, %s", ret, IndexError::What(ret)); return ret; } + // Record per-chain metadata offsets so flush_index() can write + // updated segment metas and footers back to the backing file. + meta_chains_.push_back({current_header_start_offset_, footer_offset, + segment_start_offset, footer_.segments_meta_size, + segment_ids_offset, footer_}); + if (footer_.next_meta_header_offset == 0) { break; } @@ -395,13 +638,18 @@ class BufferStorage : public IndexStorage { //! Retrieve a segment by id IndexStorage::Segment::Pointer get(const std::string &id, int) override { - auto segment_info = this->get_segment_info(id); - if (!segment_info) { + std::shared_lock latch( + mapping_shards_[mapping_shard_id()].mtx); + auto seg_iter = segments_.find(id); + if (seg_iter == segments_.end()) { return WrappedSegment::Pointer{}; } - return std::make_shared( - this, &segment_info->segment, segment_info->segment_header_start_offset, - segment_info->segment_header, id_hash_[id]); + auto id_iter = id_hash_.find(id); + if (id_iter == id_hash_.end()) { + return WrappedSegment::Pointer{}; + } + return std::make_shared(this, &seg_iter->second, + id_iter->second); } //! Test if it a segment exists @@ -411,20 +659,24 @@ class BufferStorage : public IndexStorage { //! Retrieve magic number of index uint32_t magic(void) const override { - return header_.magic; + if (chain_headers_.empty()) { + return 0u; + } + return chain_headers_.front()->magic; } protected: - //! Initialize index version segment - int init_version_segment(void) { + //! Initialize index version segment (writes content into an IndexMapping). + //! Only intended to be called from init_index() while `mapping` is still + //! open in create-mode. + int init_version_segment(IndexMapping &mapping) { size_t data_size = std::strlen(IndexVersion::Details()); - int error_code = - this->append_segment(INDEX_VERSION_SEGMENT_NAME, data_size); + int error_code = mapping.append(INDEX_VERSION_SEGMENT_NAME, data_size); if (error_code != 0) { return error_code; } - - auto segment = &get_segment_info(INDEX_VERSION_SEGMENT_NAME)->segment; + IndexMapping::Segment *segment = + mapping.map(INDEX_VERSION_SEGMENT_NAME, false, false); if (!segment) { return IndexError_MMapFile; } @@ -438,74 +690,574 @@ class BufferStorage : public IndexStorage { return 0; } - //! Initialize index file - int init_index(const std::string & /*path*/) { - // Add index version - int error_code = this->init_version_segment(); - if (error_code != 0) { - return error_code; + //! Create the initial on-disk index structure and write the mandatory + //! version segment. Uses IndexMapping (the same engine as MMapFileStorage) + //! so the produced file is fully compatible with both storage backends. + int init_index(const std::string &path) { + IndexMapping mapping; + int ret = mapping.create(path, segment_meta_capacity_); + if (ret != 0) { + LOG_ERROR( + "BufferStorage failed to create index file: path[%s], errno[%d]", + path.c_str(), ret); + return ret; } - - // Refresh mapping - this->refresh_index(0); - return 0; + ret = this->init_version_segment(mapping); + if (ret != 0) { + LOG_ERROR( + "BufferStorage failed to append version segment: path[%s], errno[%d]", + path.c_str(), ret); + mapping.close(); + return ret; + } + mapping.refresh(0); + ret = mapping.flush(); + mapping.close(); + if (ret != 0) { + LOG_ERROR( + "BufferStorage failed to flush new index file: path[%s], errno[%d]", + path.c_str(), ret); + } + return ret; } - //! Set the index file as dirty + //! Set the index file as dirty. + //! + //! HOT PATH: called once per WrappedSegment::write() / resize() / + //! update_data_crc(). We MUST unconditionally store(true) here, not + //! guard with a load-then-store: under relaxed semantics a writer can + //! observe a stale dirty=true (its own core's cached value) AFTER + //! flush_index() has CAS'd dirty to false on another core, then skip + //! its own store and the writer's modification gets dropped (next + //! flush_index() short-circuits at the top because dirty is false). + //! The MESI ping-pong is the cost of correctness; it is bounded by the + //! caller's write rate and amortized by the caller's actual I/O. void set_as_dirty(void) { - index_dirty_ = true; + index_dirty_.store(true, std::memory_order_relaxed); } //! Refresh meta information (checksum, update time, etc.) - void refresh_index(uint64_t /*chkp*/) {} + void refresh_index(uint64_t chkp) { + // Monotonic merge: callers may invoke refresh() out of order under + // concurrency (parallel writers, retries, batched commits delivered on + // different threads). An unconditional store would let a smaller chkp + // arriving later overwrite a larger one, violating the upper-layer + // invariant that the persisted check_point is non-decreasing. CAS-loop + // max guarantees the largest observed value wins regardless of arrival + // order; relaxed ordering is sufficient because flush_index() takes the + // all-shards exclusive latch which establishes the necessary + // happens-before for the actual disk write. + if (chkp != 0) { + uint64_t cur = pending_check_point_.load(std::memory_order_relaxed); + while (chkp > cur) { + if (pending_check_point_.compare_exchange_weak( + cur, chkp, std::memory_order_relaxed)) { + break; + } + // compare_exchange_weak refreshed `cur`; loop checks chkp > cur + // again and exits if some other thread already raised pending past + // our value. + } + } + // In BufferStorage the segment metadata lives in buffer_pool_buffers_. + // CRC recomputation and disk write are deferred to flush_index(). + // Mark dirty unconditionally for the same reason as set_as_dirty(): + // a load-then-store guard would let a stale `true` observation skip + // the store and lose this refresh. Note: even when our chkp lost the + // CAS race (was discarded as stale), we still set dirty -- the winning + // larger chkp must be flushed, and flush_index()'s UpdateMetaFooter() + // is a no-op for chkp==0 so a spurious extra flush is harmless. + index_dirty_.store(true, std::memory_order_relaxed); + } - //! Flush index storage + //! Flush index storage: persists any pending meta changes (segments_meta + + //! footer) for each header chain, then asks the page cache to write back + //! dirty data pages. int flush_index(void) { + if (!index_dirty_.load(std::memory_order_relaxed)) { + return 0; + } + // EXCLUSIVE all-shards latch: blocks the lock-free hot path + // (WrappedSegment::write / resize / update_data_crc) which mutates + // meta->data_size / padding_size / data_crc, the very bytes we hash + // to recompute footer.segments_meta_crc and pwrite to disk. Holding + // a single shard's shared lock (the previous design) was insufficient + // because writers on other shards could race with the CRC compute + // and produce a checksum that mismatches the on-disk segment_meta + // bytes, causing IndexError_InvalidChecksum on the next open(). + AllShardsExclusiveLatch latch(mapping_shards_); + return flush_index_locked(); + } + + //! Internal flush implementation. PRECONDITION: caller MUST already hold + //! AllShardsExclusiveLatch on mapping_shards_. Used by flush_index() + //! (which acquires the latch itself) and by close_index() (which must + //! flush and tear down under a SINGLE continuous latch hold so that no + //! writer can slip in between flush and pool reset and lose its dirty + //! pages). + int flush_index_locked(void) { + // NULL GUARD: a previous append_segment() may have left the pool in a + // torn-down state. + if (!buffer_pool_ || !buffer_pool_handle_) { + LOG_ERROR("BufferStorage::flush_index skipped: pool not ready, file[%s]", + file_name_.c_str()); + return IndexError_Runtime; + } + if (!buffer_pool_->writable()) { + // Read-only pool: nothing to flush. + index_dirty_.store(false, std::memory_order_relaxed); + return 0; + } + // Atomically claim the dirty flag at the START of the flush, not at the + // end. This prevents a TOCTOU race against the lock-free hot path: + // any WrappedSegment::write() that happens between flush_all() and the + // end of this function will simply re-set dirty=true (its set_as_dirty + // observes our cleared flag), and the next flush_index() will pick up + // those new dirty pages. An unconditional store(false) at the end + // would silently swallow that concurrent write. + bool expected_dirty = true; + if (!index_dirty_.compare_exchange_strong(expected_dirty, false, + std::memory_order_relaxed)) { + // Another thread already claimed and is performing the flush; treat + // this call as a no-op. The previous design (no CAS) allowed + // duplicate concurrent flushers; bailing out here is strictly safer + // because both flushers would otherwise race on per-chain footer + // mutation in the loop below. + return 0; + } + // Snapshot the pending checkpoint AFTER claiming dirty so that we + // observe at least every refresh_index() that happened before we + // claimed. The CAS-reset at the end will preserve any newer chkp + // stored by a concurrent refresh_index() during this flush. + const uint64_t consumed_chkp = + pending_check_point_.load(std::memory_order_relaxed); + // Flush all dirty data blocks to the backing file first. + if (buffer_pool_handle_->flush_all() != 0) { + // Restore dirty so the next flush_index() retries. + index_dirty_.store(true, std::memory_order_relaxed); + LOG_ERROR("flush_all data blocks failed: file[%s]", file_name_.c_str()); + return IndexError_WriteData; + } + // For each metadata chain, recompute the segment-meta CRC, update the + // in-memory footer (segments_meta_crc + footer_crc + update_time), and + // write both the segment metadata and the footer back to the backing + // file. Uses the per-chain in-memory footer copy, avoiding a pread. + for (size_t ci = 0; + ci < meta_chains_.size() && ci < buffer_pool_buffers_.size(); ++ci) { + MetaChain &mchain = meta_chains_[ci]; + const char *seg_buf = buffer_pool_buffers_[ci].get(); + // Recompute segment metadata CRC and refresh the per-chain footer. + mchain.footer.segments_meta_crc = + ailego::Crc32c::Hash(seg_buf, mchain.segment_meta_size, 0u); + IndexFormat::UpdateMetaFooter(&mchain.footer, consumed_chkp); + // Write segment metadata back to disk. + if (buffer_pool_handle_->write_meta(mchain.segment_meta_file_offset, + mchain.segment_meta_size, + seg_buf) != 0) { + LOG_ERROR("Failed to write segment meta: file[%s], chain[%zu]", + file_name_.c_str(), ci); + index_dirty_.store(true, std::memory_order_relaxed); + return IndexError_WriteData; + } + // Write the updated footer back to disk. + if (buffer_pool_handle_->write_meta( + mchain.footer_file_offset, sizeof(mchain.footer), + reinterpret_cast(&mchain.footer)) != 0) { + LOG_ERROR("Failed to write footer: file[%s], chain[%zu]", + file_name_.c_str(), ci); + index_dirty_.store(true, std::memory_order_relaxed); + return IndexError_WriteData; + } + } + // Keep the convenience alias in sync with the last chain. + if (!meta_chains_.empty()) { + footer_ = meta_chains_.back().footer; + } + // CAS-reset pending: only consume the checkpoint we observed at the + // start. If a concurrent refresh_index() stored a newer value during + // the flush, CAS fails and the newer value remains in + // pending_check_point_; refresh_index() also re-set dirty=true (since + // we cleared it at the top), so the next flush_index() will persist + // the newer chkp. + uint64_t expected_chkp = consumed_chkp; + pending_check_point_.compare_exchange_strong(expected_chkp, 0, + std::memory_order_relaxed); return 0; } //! Close index storage void close_index(void) { - std::lock_guard latch(mapping_mutex_); + // Take the all-shards exclusive latch BEFORE flushing, and hold it for + // the entire teardown sequence. Earlier code released the latch + // between flush and teardown, opening a window in which a writer could + // grab a shared lock, mutate meta_buf via WrappedSegment::write() and + // call set_as_dirty(true). After this close_index() reacquired the + // latch and reset buffer_pool_handle_, those dirty pages would be + // dropped on the floor with no chance to flush. Holding a SINGLE + // latch instance across flush_index_locked() and the reset eliminates + // that window: writers can only enter once we have fully torn down + // (and at that point segments_/buffer_pool_handle_ are gone, so they + // would fail the null/state guards in WrappedSegment). + AllShardsExclusiveLatch latch(mapping_shards_); + flush_index_locked(); file_name_.clear(); id_hash_.clear(); segments_.clear(); - memset(&header_, 0, sizeof(header_)); + chain_headers_.clear(); memset(&footer_, 0, sizeof(footer_)); buffer_pool_handle_.reset(); buffer_pool_.reset(); max_segment_size_ = 0; buffer_pool_buffers_.clear(); + meta_chains_.clear(); + current_header_start_offset_ = 0; + pending_check_point_.store(0, std::memory_order_relaxed); + index_dirty_.store(false, std::memory_order_relaxed); } - //! Append a segment into storage - int append_segment(const std::string & /*id*/, size_t /*size*/) { + //! Append a segment into storage. + //! + //! C1: the page table extends in-place (no pool rotation). The exclusive + //! latch is held only briefly to protect segments_/id_hash_ insertion. + int append_segment(const std::string &id, size_t size) { + // Flush any in-memory metadata changes (data_size, padding_size, CRC) + // accumulated by prior write()/resize() calls. + this->flush_index(); + + AllShardsExclusiveLatch latch(mapping_shards_); + + if (!buffer_pool_ || !buffer_pool_handle_) { + LOG_ERROR("append_segment: pool not ready, file[%s]", file_name_.c_str()); + return IndexError_Runtime; + } + if (!buffer_pool_->writable()) { + LOG_ERROR("append_segment: pool is read-only, file[%s]", + file_name_.c_str()); + return IndexError_Runtime; + } + if (size == 0) { + return IndexError_InvalidArgument; + } + if (segments_.find(id) != segments_.end()) { + return IndexError_Duplicate; + } + if (meta_chains_.empty() || chain_headers_.empty() || + buffer_pool_buffers_.empty()) { + LOG_ERROR("append_segment: invalid state, file[%s]", file_name_.c_str()); + return IndexError_Runtime; + } + + // Page-aligned padded size for the new segment. Matches IndexMapping's + // CalcPageAlignedSize() so the on-disk layout stays identical. + const size_t page_size = ailego::kVectorPageSize; + const size_t padded_size = (size + page_size - 1) / page_size * page_size; + + // The "current last chain" is meta_chains_.back() / chain_headers_.back(); + // footer_ is always the last chain's footer (overwritten by ParseFooter + // during ParseToMapping). + size_t id_size = id.length() + 1; + size_t need_size = sizeof(IndexFormat::SegmentMeta) + id_size; + MetaChain *chain = &meta_chains_.back(); + IndexFormat::MetaHeader *header = chain_headers_.back().get(); + char *meta_buf = buffer_pool_buffers_.back().get(); + + // Rollback handle for the (possibly committed) chain split below. + // Default is a no-op; populated ONLY after Step 1's in-memory commit + // succeeds so that a Step 2 disk-write failure can undo the split as + // well, leaving meta_chains_ / chain_headers_ / buffer_pool_buffers_ / + // footer_ / current_header_start_offset_ exactly as they were before + // append_segment() ran. Without this, a Step 2 failure would leave + // an orphan empty chain permanently appended to the file (harmless + // for correctness because it stays linked and gets reused on next + // append, but disruptive for idempotent retries and unit tests). + std::function rollback_step1 = []() {}; + + // ---- Step 1: chain split if current chain has no meta capacity left. + if (sizeof(IndexFormat::SegmentMeta) * footer_.segment_count + need_size > + chain->segment_ids_offset) { + size_t new_chain_start = buffer_pool_->file_size(); + new_chain_start = + (new_chain_start + page_size - 1) / page_size * page_size; + size_t new_meta_total = + (segment_meta_capacity_ + sizeof(IndexFormat::MetaHeader) + + sizeof(IndexFormat::MetaFooter) + page_size - 1) / + page_size * page_size; + uint32_t new_segments_meta_size = static_cast( + new_meta_total - sizeof(IndexFormat::MetaHeader) - + sizeof(IndexFormat::MetaFooter)); + + // Prepare the linked old footer WITHOUT mutating footer_ yet so + // that a write failure leaves in-memory state untouched. + const auto saved_footer_before_split = footer_; + IndexFormat::MetaFooter linked_footer = footer_; + linked_footer.next_meta_header_offset = new_chain_start; + IndexFormat::UpdateMetaFooter(&linked_footer, 0); + + // Write old footer with forward link to disk. + if (buffer_pool_handle_->write_meta( + chain->footer_file_offset, sizeof(linked_footer), + reinterpret_cast(&linked_footer)) != 0) { + LOG_ERROR("append_segment: write old footer failed, file[%s]", + file_name_.c_str()); + return IndexError_WriteData; + } + + // Best-effort rollback: restore original old footer on disk if a + // subsequent disk write in this split block fails. + auto undo_old_footer = [&]() { + buffer_pool_handle_->write_meta( + chain->footer_file_offset, sizeof(saved_footer_before_split), + reinterpret_cast(&saved_footer_before_split)); + }; + + // Extend the file and write the new chain's header + (zero) footer. + // The segment_meta region is implicitly zero-filled by ftruncate, + // matching the empty `new_meta_buf` we keep in memory. + if (!buffer_pool_->extend_file(new_chain_start + new_meta_total)) { + undo_old_footer(); + return IndexError_Runtime; + } + + auto new_header = std::make_unique(); + IndexFormat::SetupMetaHeader( + new_header.get(), + static_cast(new_meta_total - + sizeof(IndexFormat::MetaFooter)), + static_cast(new_meta_total)); + + auto new_meta_buf = std::make_unique(new_segments_meta_size); + std::memset(new_meta_buf.get(), 0, new_segments_meta_size); + + IndexFormat::MetaFooter new_footer; + IndexFormat::SetupMetaFooter(&new_footer); + new_footer.segments_meta_size = new_segments_meta_size; + new_footer.total_size = new_meta_total; + new_footer.segments_meta_crc = + ailego::Crc32c::Hash(new_meta_buf.get(), new_segments_meta_size, 0u); + IndexFormat::UpdateMetaFooter(&new_footer, 0); + + if (buffer_pool_handle_->write_meta( + new_chain_start, sizeof(IndexFormat::MetaHeader), + reinterpret_cast(new_header.get())) != 0) { + undo_old_footer(); + return IndexError_WriteData; + } + uint64_t new_segment_meta_file_offset = + new_chain_start + sizeof(IndexFormat::MetaHeader); + uint64_t new_footer_file_offset = + new_chain_start + new_header->meta_footer_offset; + if (buffer_pool_handle_->write_meta( + new_footer_file_offset, sizeof(new_footer), + reinterpret_cast(&new_footer)) != 0) { + undo_old_footer(); + return IndexError_WriteData; + } + + // Snapshot the OLD chain's pre-commit state for rollback_step1. + // Captured by value because `chain` will be reassigned below to point + // at the new chain's slot in meta_chains_, and pop_back() during + // rollback would invalidate any reference into the old slot. + const auto saved_old_chain_footer = chain->footer; + const uint64_t saved_old_footer_file_offset = chain->footer_file_offset; + const uint64_t saved_current_header_start = current_header_start_offset_; + + // All split disk writes succeeded -- commit in-memory state. + chain->footer = linked_footer; // old chain keeps linked footer + chain_headers_.push_back(std::move(new_header)); + buffer_pool_buffers_.push_back(std::move(new_meta_buf)); + meta_chains_.push_back(MetaChain{ + new_chain_start, new_footer_file_offset, new_segment_meta_file_offset, + new_segments_meta_size, new_segments_meta_size, new_footer}); + footer_ = new_footer; + current_header_start_offset_ = new_chain_start; + + chain = &meta_chains_.back(); + header = chain_headers_.back().get(); + meta_buf = buffer_pool_buffers_.back().get(); + + // Install rollback for the committed split: pop the new chain and + // restore the old chain on both disk and memory. Captured fully by + // value (except `this`-via-member-access) so a subsequent reassignment + // of local pointers (chain/header/meta_buf) does not corrupt the + // closure. + rollback_step1 = [this, saved_footer_before_split, saved_old_chain_footer, + saved_old_footer_file_offset, + saved_current_header_start]() { + // 1. Restore old chain's footer on disk (drop forward link). + buffer_pool_handle_->write_meta( + saved_old_footer_file_offset, sizeof(saved_footer_before_split), + reinterpret_cast(&saved_footer_before_split)); + // 2. Pop the freshly-pushed new chain from in-memory containers. + // The associated unique_ptr / unique_ptr + // are released here. + if (!meta_chains_.empty()) meta_chains_.pop_back(); + if (!chain_headers_.empty()) chain_headers_.pop_back(); + if (!buffer_pool_buffers_.empty()) buffer_pool_buffers_.pop_back(); + // 3. Restore old chain's in-memory footer (its forward link was + // set to the now-popped new chain). + if (!meta_chains_.empty()) { + meta_chains_.back().footer = saved_old_chain_footer; + } + // 4. Restore footer_ and current_header_start_offset_ to their + // pre-split values. The on-disk file size is intentionally NOT + // shrunk: most buffer-pool backends offer no precise truncate, + // and the leftover bytes (the orphan new_header / new_footer + // region) are unreachable -- step 1 above has already removed + // the forward link from the old footer, so ParseToMapping() + // stops at the old chain and the leftover region is reusable + // by the next append_segment()'s split via file_size() + // realignment. + footer_ = saved_footer_before_split; + current_header_start_offset_ = saved_current_header_start; + }; + } + + // ---- Step 2: append SegmentMeta + ID into the (possibly new) last + // chain, then persist meta_buf and footer. + uint64_t new_data_index = footer_.content_size; + uint64_t new_seg_abs_offset = + chain->header_start_offset + header->content_offset + new_data_index; + uint64_t new_file_size = new_seg_abs_offset + padded_size; + if (new_file_size > buffer_pool_->file_size()) { + if (!buffer_pool_->extend_file(new_file_size)) { + return IndexError_Runtime; + } + } + + // Save mutable state for rollback if a disk write fails below. + const auto saved_footer = footer_; + const auto saved_chain_footer = chain->footer; + const auto saved_segment_ids_offset = chain->segment_ids_offset; + // Save the meta_buf regions that will be overwritten (SegmentMeta + // entry and segment-ID string) so they can be restored exactly, + // keeping the CRC consistent for a potential later flush_index(). + const size_t meta_entry_off = + sizeof(IndexFormat::SegmentMeta) * footer_.segment_count; + const uint32_t new_ids_off = + chain->segment_ids_offset - static_cast(id_size); + char saved_meta_entry[sizeof(IndexFormat::SegmentMeta)]; + std::memcpy(saved_meta_entry, meta_buf + meta_entry_off, + sizeof(IndexFormat::SegmentMeta)); + std::unique_ptr saved_id_bytes(new char[id_size]); + std::memcpy(saved_id_bytes.get(), meta_buf + new_ids_off, id_size); + + chain->segment_ids_offset -= static_cast(id_size); + IndexFormat::SegmentMeta *new_seg = + reinterpret_cast(meta_buf) + + footer_.segment_count; + new_seg->segment_id_offset = chain->segment_ids_offset; + new_seg->data_index = new_data_index; + new_seg->data_size = 0; + new_seg->data_crc = 0; + new_seg->padding_size = padded_size; + std::memcpy(meta_buf + chain->segment_ids_offset, id.c_str(), id_size); + + footer_.segment_count += 1; + footer_.content_size += padded_size; + footer_.total_size += padded_size; + footer_.segments_meta_crc = + ailego::Crc32c::Hash(meta_buf, chain->segment_meta_size, 0u); + IndexFormat::UpdateMetaFooter(&footer_, 0); + chain->footer = footer_; // sync in-memory copy for flush_index + + // Rollback helper: restore meta_buf, footer_, and chain fields to + // their pre-Step-2 values so that flush_index() writes consistent + // metadata and the next append_segment() can retry cleanly. + auto rollback_step2 = [&]() { + std::memcpy(meta_buf + meta_entry_off, saved_meta_entry, + sizeof(IndexFormat::SegmentMeta)); + std::memcpy(meta_buf + new_ids_off, saved_id_bytes.get(), id_size); + footer_ = saved_footer; + chain->footer = saved_chain_footer; + chain->segment_ids_offset = saved_segment_ids_offset; + }; + + if (buffer_pool_handle_->write_meta(chain->segment_meta_file_offset, + chain->segment_meta_size, + meta_buf) != 0) { + LOG_ERROR("append_segment: write segment_meta failed, file[%s]", + file_name_.c_str()); + rollback_step2(); + rollback_step1(); + return IndexError_WriteData; + } + if (buffer_pool_handle_->write_meta( + chain->footer_file_offset, sizeof(footer_), + reinterpret_cast(&footer_)) != 0) { + LOG_ERROR("append_segment: write footer failed, file[%s]", + file_name_.c_str()); + rollback_step2(); + rollback_step1(); + return IndexError_WriteData; + } + + // All disk writes succeeded -- commit remaining in-memory state. + // WrappedSegment instances already held by callers reference + // &segments_[name], whose address is stable across unordered_map + // insertions, so existing references stay valid. + segments_[id] = IndexMapping::SegmentInfo{ + IndexMapping::Segment{new_seg}, chain->header_start_offset, header}; + const size_t new_id = id_hash_.size(); + id_hash_[id] = new_id; + max_segment_size_ = std::max(max_segment_size_, padded_size); + + // ---- Step 3: With the segmented page table (C1), extend_file() + // already extended the page table in-place. No pool + // rotation or flush_all is needed — the same pool/handle + // continues to serve both old and new pages. return 0; } //! Test if a segment exists bool has_segment(const std::string &id) const { - std::lock_guard latch(mapping_mutex_); + std::shared_lock latch( + mapping_shards_[mapping_shard_id()].mtx); return (segments_.find(id) != segments_.end()); } - //! Get a segment from storage - IndexMapping::SegmentInfo *get_segment_info(const std::string &id) { - std::lock_guard latch(mapping_mutex_); - auto iter = segments_.find(id); - if (iter == segments_.end()) { - return nullptr; - } - return &iter->second; + private: + std::atomic index_dirty_{false}; + std::atomic pending_check_point_{0}; + + // Sharded reader-writer lock to eliminate cache-line ping-pong on the + // reader counter. Each concurrent reader hashes to its own shard, + // avoiding cross-core contention. Writers (append_segment/close_index) + // lock ALL shards to achieve exclusive access. + static constexpr size_t kMappingMutexShards = 32; + struct alignas(64) MutexShard { + std::shared_mutex mtx; + }; + mutable MutexShard mapping_shards_[kMappingMutexShards]{}; + + // Per-thread shard selection (stable hash, no syscall). + size_t mapping_shard_id() const { + thread_local const size_t id = + std::hash()(std::this_thread::get_id()) % + kMappingMutexShards; + return id; } - private: - bool index_dirty_{false}; - mutable std::mutex mapping_mutex_{}; + // RAII guard that locks ALL shards exclusively (for writers). + struct AllShardsExclusiveLatch { + MutexShard *shards_; + AllShardsExclusiveLatch(MutexShard *shards) : shards_(shards) { + for (size_t i = 0; i < kMappingMutexShards; ++i) shards_[i].mtx.lock(); + } + ~AllShardsExclusiveLatch() { + for (size_t i = 0; i < kMappingMutexShards; ++i) shards_[i].mtx.unlock(); + } + AllShardsExclusiveLatch(const AllShardsExclusiveLatch &) = delete; + AllShardsExclusiveLatch &operator=(const AllShardsExclusiveLatch &) = + delete; + }; // buffer manager std::string file_name_; - IndexFormat::MetaHeader header_{}; + // Per-chain owning copies of MetaHeader. segments_[name].segment_header + // points into one of these, so each chain's content_offset stays stable + // across re-parses (a single shared header_ would be overwritten by the + // next chain's ParseHeader and corrupt earlier-chain segment reads). + std::vector> chain_headers_{}; IndexFormat::MetaFooter footer_{}; std::unordered_map segments_{}; std::unordered_map id_hash_{}; @@ -515,6 +1267,30 @@ class BufferStorage : public IndexStorage { ailego::VecBufferPool::Pointer buffer_pool_{nullptr}; ailego::VecBufferPoolHandle::Pointer buffer_pool_handle_{nullptr}; uint64_t current_header_start_offset_{0u}; + + // Capacity (in bytes) of the segment metadata section written by + // init_index(). + uint32_t segment_meta_capacity_{4096u}; + + // Per-header-chain file offsets used by flush_index() to write updated + // segment metadata and footer back to the backing file after writes. + struct MetaChain { + uint64_t header_start_offset; + uint64_t footer_file_offset; + uint64_t segment_meta_file_offset; + uint32_t segment_meta_size; + // Lowest offset of segment ID strings within the segment_meta region. + // Equals segment_meta_size when no IDs have been written yet, and + // decreases by `strlen(id)+1` for each appended segment. Used by + // append_segment() to detect when the chain runs out of meta capacity + // and a new chain must be split off. + uint32_t segment_ids_offset; + // In-memory copy of this chain's MetaFooter. Kept in sync with disk + // by flush_index() and append_segment(), avoiding a pread per chain + // on every flush. + IndexFormat::MetaFooter footer; + }; + std::vector meta_chains_{}; }; INDEX_FACTORY_REGISTER_STORAGE(BufferStorage); diff --git a/src/db/index/segment/segment.cc b/src/db/index/segment/segment.cc index 7d3b2a56b..210d5a0d5 100644 --- a/src/db/index/segment/segment.cc +++ b/src/db/index/segment/segment.cc @@ -522,10 +522,20 @@ Status SegmentImpl::close() { } } vector_indexers_.clear(); + for (const auto &[name, indexers] : quant_vector_indexers_) { + for (auto indexer : indexers) { + indexer->Close(); + } + } + quant_vector_indexers_.clear(); for (auto [name, indexer] : memory_vector_indexers_) { indexer->Close(); } memory_vector_indexers_.clear(); + for (auto [name, indexer] : quant_memory_vector_indexers_) { + indexer->Close(); + } + quant_memory_vector_indexers_.clear(); return Status::OK(); } diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 653b7af53..24c70838d 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -42,25 +43,30 @@ namespace zvec { namespace ailego { +extern const size_t kVectorPageSize; + class VectorPageTable { - struct alignas(64) Entry { + struct Entry { std::atomic ref_count; - // True when this block has been enqueued in BlockEvictionQueue and has not - // yet been evicted. Used in release_block() to suppress duplicate - // insertions: once a block is in the eviction queue we never push it again - // until it is evicted (which resets the flag). std::atomic in_evict_queue; + std::atomic is_dirty; char *buffer; - size_t size; + size_t file_offset; }; public: - VectorPageTable() : entry_num_(0), entries_(nullptr) { + // Callback invoked by evict_block() to persist a dirty block before its + // memory is released. Signature: (block_id, buffer, size, file_offset). + using FlushCallback = std::function; + + VectorPageTable() { BlockEvictionQueue::get_instance().set_valid(this); } ~VectorPageTable() { BlockEvictionQueue::get_instance().set_invalid(this); - delete[] entries_; + for (size_t i = 0; i < segment_count_; ++i) { + delete[] segments_[i]; + } } VectorPageTable(const VectorPageTable &) = delete; @@ -70,36 +76,89 @@ class VectorPageTable { void init(size_t entry_num); + //! Extend the page table to cover at least `new_entry_num` entries. + //! Existing entries stay at their original addresses (no invalidation). + //! Safe to call while readers operate on existing pages. + void extend(size_t new_entry_num); + char *acquire_block(block_id_t block_id); void release_block(block_id_t block_id); void evict_block(block_id_t block_id); - char *set_block_acquired(block_id_t block_id, char *buffer, size_t size); + char *set_block_acquired(block_id_t block_id, char *buffer, + size_t file_offset); + + void set_flush_callback(FlushCallback cb) { + flush_callback_ = std::move(cb); + } + + //! Mark a loaded block as dirty so that it is persisted on eviction. + void mark_dirty(block_id_t block_id) { + assert(block_id < entry_num_); + entry_at(block_id).is_dirty.store(true, std::memory_order_relaxed); + } + + bool is_block_dirty(block_id_t block_id) const { + assert(block_id < entry_num_); + return entry_at(block_id).is_dirty.load(std::memory_order_relaxed); + } + + //! Flush a single dirty block without evicting it. Caller guarantees the + //! block is currently loaded (buffer != nullptr). + int flush_block(block_id_t block_id) { + assert(block_id < entry_num_); + Entry &e = entry_at(block_id); + char *buffer = e.buffer; + if (!buffer || !flush_callback_) { + return 0; + } + if (!e.is_dirty.load(std::memory_order_relaxed)) { + return 0; + } + int rc = flush_callback_(block_id, buffer, kVectorPageSize, e.file_offset); + if (rc == 0) { + e.is_dirty.store(false, std::memory_order_relaxed); + } + return rc; + } size_t entry_num() const { return entry_num_; } - // Returns true if the block has no active references (ref_count <= 0). - // Used by VecBufferPool destructor to assert all handles are released. bool is_released(block_id_t block_id) const { assert(block_id < entry_num_); - return entries_[block_id].ref_count.load(std::memory_order_relaxed) <= 0; + return entry_at(block_id).ref_count.load(std::memory_order_relaxed) <= 0; } - // Returns true if the block is no longer registered in the eviction queue - // (either it was never added, or it has already been evicted). - // Used by BlockEvictionQueue to detect stale queue entries. inline bool is_dead_block(BlockEvictionQueue::BlockType block) const { - Entry &entry = entries_[block.vector_block.first]; - return !entry.in_evict_queue.load(std::memory_order_relaxed); + const Entry &e = entry_at(block.vector_block.first); + return !e.in_evict_queue.load(std::memory_order_relaxed); } private: + // Segmented page table: entries are split across fixed-size segments so + // that extend() can grow the table without moving existing entries. + static constexpr size_t kSegmentShift = 16; // 65536 entries per segment + static constexpr size_t kSegmentSize = size_t{1} << kSegmentShift; + static constexpr size_t kSegmentMask = kSegmentSize - 1; + static constexpr size_t kMaxSegments = + 2048; // up to 128M entries (512GB @ 4K) + size_t entry_num_{0}; - Entry *entries_{nullptr}; + size_t segment_count_{0}; + Entry *segments_[kMaxSegments]{}; + + Entry &entry_at(size_t idx) { + return segments_[idx >> kSegmentShift][idx & kSegmentMask]; + } + const Entry &entry_at(size_t idx) const { + return segments_[idx >> kSegmentShift][idx & kSegmentMask]; + } + + FlushCallback flush_callback_{}; }; class VecBufferPoolHandle; @@ -108,12 +167,15 @@ class VecBufferPool { public: typedef std::shared_ptr Pointer; - VecBufferPool(const std::string &filename); + static constexpr size_t kMutexBucketCount = 64UL * 1024UL; + + VecBufferPool(const std::string &filename, bool writable = false, + bool create = false); ~VecBufferPool() { + // Flush any remaining dirty blocks before tearing down memory/fd so that + // writes are not silently lost. Safe to call even in read-only mode. + (void)this->flush_all(); for (size_t i = 0; i < page_table_.entry_num(); ++i) { - // A positive ref_count means a VecBufferPoolHandle is still alive, - // which is a contract violation: all handles must be destroyed before - // the pool itself is destroyed. assert(page_table_.is_released(i)); page_table_.evict_block(i); } @@ -124,15 +186,37 @@ class VecBufferPool { #endif } - int init(size_t segment_count); + int init(); VecBufferPoolHandle get_handle(); - char *acquire_buffer(block_id_t block_id, size_t offset, size_t size, - int retry = 0); + char *acquire_buffer(block_id_t page_id, int retry = 0); int get_meta(size_t offset, size_t length, char *buffer); + //! Write a contiguous range via the page cache; marks touched pages dirty. + //! Returns 0 on success, -1 on failure (e.g. read-only pool or I/O error). + int write_range(size_t file_offset, size_t length, const char *src); + + //! Write raw bytes directly via pwrite, bypassing the page cache. Used for + //! metadata regions (header/footer/segments_meta) which are only read via + //! get_meta() and never cached. + int write_meta(size_t offset, size_t length, const char *buffer); + + //! Iterate all entries and persist any dirty blocks to disk. Safe to call + //! repeatedly; no-op in read-only mode. + int flush_all(); + + //! Extend the backing file to `new_size` bytes via ftruncate (no-op if + //! already >= new_size), refresh the cached file_size_, and extend the + //! page_table to cover the new range. Returns true on success, false on + //! a read-only pool or I/O failure. + bool extend_file(size_t new_size); + + bool writable() const { + return writable_; + } + size_t file_size() const { return file_size_; } @@ -141,16 +225,13 @@ class VecBufferPool { int fd_; size_t file_size_; std::string file_name_; + bool writable_{false}; public: VectorPageTable page_table_; private: - // Contiguous array of per-block mutexes (one allocation, cache-friendly for - // the cold-path load in acquire_buffer). block_mutexes_count_ mirrors the - // array length because unique_ptr has no built-in size accessor. std::unique_ptr block_mutexes_{}; - size_t block_mutexes_count_{0}; }; class VecBufferPoolHandle { @@ -162,10 +243,20 @@ class VecBufferPoolHandle { typedef std::shared_ptr Pointer; - char *get_block(size_t offset, size_t size, size_t block_id); + char *get_single_page(size_t file_offset, size_t len, size_t &out_page_id); + + bool read_range(size_t file_offset, size_t len, char *out); int get_meta(size_t offset, size_t length, char *buffer); + int write_range(size_t file_offset, size_t len, const char *src); + + int write_meta(size_t offset, size_t length, const char *buffer); + + int flush_all(); + + bool writable() const; + void release_one(block_id_t block_id); void acquire_one(block_id_t block_id); diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index ac1052e86..3da2e6669 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -34,6 +35,7 @@ class IndexStorage : public IndexModule { MBT_UNKNOWN = 0, MBT_MMAP = 1, MBT_BUFFERPOOL = 2, + MBT_HEAP_SCRATCH = 3, }; MemoryBlock() {} @@ -46,6 +48,20 @@ class IndexStorage : public IndexModule { } MemoryBlock(void *data) : type_(MemoryBlockType::MBT_MMAP), data_(data) {} + //! Build an HEAP_SCRATCH MemoryBlock that owns `owned` (allocated via + //! ailego_malloc / ailego_aligned_malloc). `size` is the byte length of + //! the buffer and is required so that copy construction / copy + //! assignment can deep-copy the buffer instead of aliasing it (a shallow + //! copy would result in use-after-free once the original block is + //! destructed and frees the buffer). + static MemoryBlock MakeOwned(void *owned, size_t size) { + MemoryBlock mb; + mb.type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + mb.data_ = owned; + mb.scratch_size_ = size; + return mb; + } + MemoryBlock(const MemoryBlock &rhs) { switch (rhs.type_) { case MemoryBlockType::MBT_MMAP: @@ -55,6 +71,12 @@ class IndexStorage : public IndexModule { this->reset(rhs.buffer_pool_handle_, rhs.buffer_block_id_, rhs.data_); buffer_pool_handle_->acquire_one(buffer_block_id_); break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + // Deep copy: each owner must hold its own buffer, otherwise the + // first destructor frees the buffer and leaves the surviving + // copies dangling. + deep_copy_from(rhs); + break; default: break; } @@ -71,6 +93,14 @@ class IndexStorage : public IndexModule { rhs.buffer_pool_handle_ = nullptr; rhs.type_ = MemoryBlockType::MBT_UNKNOWN; break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + data_ = rhs.data_; + scratch_size_ = rhs.scratch_size_; + rhs.data_ = nullptr; + rhs.scratch_size_ = 0; + rhs.type_ = MemoryBlockType::MBT_UNKNOWN; + break; default: break; } @@ -87,6 +117,10 @@ class IndexStorage : public IndexModule { rhs.data_); buffer_pool_handle_->acquire_one(buffer_block_id_); break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + release_current(); + deep_copy_from(rhs); + break; default: break; } @@ -106,6 +140,15 @@ class IndexStorage : public IndexModule { rhs.buffer_pool_handle_ = nullptr; rhs.type_ = MemoryBlockType::MBT_UNKNOWN; break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + release_current(); + type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + data_ = rhs.data_; + scratch_size_ = rhs.scratch_size_; + rhs.data_ = nullptr; + rhs.scratch_size_ = 0; + rhs.type_ = MemoryBlockType::MBT_UNKNOWN; + break; default: break; } @@ -122,10 +165,14 @@ class IndexStorage : public IndexModule { buffer_pool_handle_->release_one(buffer_block_id_); } break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + release_owned(); + break; default: break; } data_ = nullptr; + scratch_size_ = 0; } const void *data() const { @@ -136,6 +183,8 @@ class IndexStorage : public IndexModule { void *data) { if (type_ == MemoryBlockType::MBT_BUFFERPOOL) { buffer_pool_handle_->release_one(buffer_block_id_); + } else if (type_ == MemoryBlockType::MBT_HEAP_SCRATCH) { + release_owned(); } type_ = MemoryBlockType::MBT_BUFFERPOOL; buffer_pool_handle_ = buffer_pool_handle; @@ -147,6 +196,8 @@ class IndexStorage : public IndexModule { if (type_ == MemoryBlockType::MBT_BUFFERPOOL) { buffer_pool_handle_->release_one(buffer_block_id_); buffer_pool_handle_ = nullptr; + } else if (type_ == MemoryBlockType::MBT_HEAP_SCRATCH) { + release_owned(); } type_ = MemoryBlockType::MBT_MMAP; data_ = data; @@ -156,6 +207,56 @@ class IndexStorage : public IndexModule { void *data_{nullptr}; mutable ailego::VecBufferPoolHandle *buffer_pool_handle_{nullptr}; size_t buffer_block_id_{0}; + //! Byte size of the heap-scratch buffer pointed to by `data_`; only used + //! when type_ == MBT_HEAP_SCRATCH. Required for safe deep-copy on + //! copy-construction / copy-assignment of HEAP_SCRATCH blocks. + size_t scratch_size_{0}; + + private: + void release_owned() { + if (data_) { + ailego_free(data_); + data_ = nullptr; + } + scratch_size_ = 0; + } + + //! Drop whatever the current MemoryBlock holds, regardless of type, so + //! that the slot is ready to receive new ownership. Mirrors what the + //! destructor would do (minus zeroing data_) but leaves the type alone + //! for the caller to overwrite immediately afterwards. + void release_current() { + switch (type_) { + case MemoryBlockType::MBT_BUFFERPOOL: + if (buffer_pool_handle_) { + buffer_pool_handle_->release_one(buffer_block_id_); + buffer_pool_handle_ = nullptr; + } + break; + case MemoryBlockType::MBT_HEAP_SCRATCH: + release_owned(); + break; + default: + break; + } + data_ = nullptr; + type_ = MemoryBlockType::MBT_UNKNOWN; + } + + //! Allocate a fresh buffer of the same size as `rhs.scratch_size_`, + //! memcpy `rhs.data_` into it, and become the new owner. Used by the + //! HEAP_SCRATCH copy ctor / copy assignment so the original and the + //! copy each free their own buffer independently. + void deep_copy_from(const MemoryBlock &rhs) { + type_ = MemoryBlockType::MBT_HEAP_SCRATCH; + scratch_size_ = rhs.scratch_size_; + if (scratch_size_ > 0 && rhs.data_) { + data_ = ailego_malloc(scratch_size_); + std::memcpy(data_, rhs.data_, scratch_size_); + } else { + data_ = nullptr; + } + } }; struct SegmentData { diff --git a/tests/core/algorithm/flat/flat_streamer_buffer_test.cc b/tests/core/algorithm/flat/flat_streamer_buffer_test.cc index 441853c86..e3fce1f24 100644 --- a/tests/core/algorithm/flat/flat_streamer_buffer_test.cc +++ b/tests/core/algorithm/flat/flat_streamer_buffer_test.cc @@ -168,6 +168,251 @@ TEST_F(FlatStreamerTest, TestLinearSearch) { read_streamer.reset(); } +TEST_F(FlatStreamerTest, TestLinearSearchBuffer) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); + IndexStreamer::Pointer write_streamer = + IndexFactory::CreateStreamer("FlatStreamer"); + ASSERT_TRUE(write_streamer != nullptr); + + Params params; + ASSERT_EQ(0, write_streamer->init(*index_meta_ptr_, params)); + auto storage = IndexFactory::CreateStorage("BufferStorage"); + ASSERT_NE(nullptr, storage); + Params stg_params; + ASSERT_EQ(0, storage->init(stg_params)); + ASSERT_EQ(0, storage->open(dir_ + "Test/LinearSearchBuffer", true)); + ASSERT_EQ(0, write_streamer->open(storage)); + + auto ctx = write_streamer->create_context(); + ASSERT_TRUE(!!ctx); + + size_t cnt = 10000UL; + IndexQueryMeta qmeta(IndexMeta::DT_FP32, dim); + for (size_t i = 0; i < cnt; i++) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + write_streamer->add_impl(i, vec.data(), qmeta, ctx); + } + write_streamer->flush(0UL); + write_streamer->close(); + write_streamer.reset(); + storage->close(); + + IndexStreamer::Pointer read_streamer = + IndexFactory::CreateStreamer("FlatStreamer"); + ASSERT_EQ(0, read_streamer->init(*index_meta_ptr_, params)); + auto read_storage = IndexFactory::CreateStorage("BufferStorage"); + ASSERT_NE(nullptr, read_storage); + ASSERT_EQ(0, read_storage->init(stg_params)); + ASSERT_EQ(0, read_storage->open(dir_ + "Test/LinearSearchBuffer", false)); + ASSERT_EQ(0, read_streamer->open(read_storage)); + size_t topk = 3; + auto provider = read_streamer->create_provider(); + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_FLOAT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + + ctx->set_topk(100U); + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = 10.1f; + } + ASSERT_EQ(0, read_streamer->search_bf_impl(vec.data(), qmeta, ctx)); + auto &result = ctx->result(); + ASSERT_EQ(100U, result.size()); + ASSERT_EQ(10, result[0].key()); + ASSERT_EQ(11, result[1].key()); + ASSERT_EQ(5, result[10].key()); + ASSERT_EQ(0, result[20].key()); + ASSERT_EQ(30, result[30].key()); + ASSERT_EQ(35, result[35].key()); + ASSERT_EQ(99, result[99].key()); + + ElapsedTime elapsed_time; + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_FLOAT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + cout << "Elapsed time: " << elapsed_time.milli_seconds() << " ms" << endl; + + read_streamer->close(); + read_streamer.reset(); +} + +TEST_F(FlatStreamerTest, TestLinearSearchBufferMMap) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); + IndexStreamer::Pointer write_streamer = + IndexFactory::CreateStreamer("FlatStreamer"); + ASSERT_TRUE(write_streamer != nullptr); + + Params params; + ASSERT_EQ(0, write_streamer->init(*index_meta_ptr_, params)); + auto storage = IndexFactory::CreateStorage("BufferStorage"); + ASSERT_NE(nullptr, storage); + Params stg_params; + ASSERT_EQ(0, storage->init(stg_params)); + ASSERT_EQ(0, storage->open(dir_ + "Test/LinearSearchBuffer", true)); + ASSERT_EQ(0, write_streamer->open(storage)); + + auto ctx = write_streamer->create_context(); + ASSERT_TRUE(!!ctx); + + size_t cnt = 10000UL; + IndexQueryMeta qmeta(IndexMeta::DT_FP32, dim); + for (size_t i = 0; i < cnt; i++) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + write_streamer->add_impl(i, vec.data(), qmeta, ctx); + } + write_streamer->flush(0UL); + write_streamer->close(); + write_streamer.reset(); + storage->close(); + + IndexStreamer::Pointer read_streamer = + IndexFactory::CreateStreamer("FlatStreamer"); + ASSERT_EQ(0, read_streamer->init(*index_meta_ptr_, params)); + auto read_storage = IndexFactory::CreateStorage("MMapFileStorage"); + ASSERT_NE(nullptr, read_storage); + ASSERT_EQ(0, read_storage->init(stg_params)); + ASSERT_EQ(0, read_storage->open(dir_ + "Test/LinearSearchBuffer", false)); + ASSERT_EQ(0, read_streamer->open(read_storage)); + size_t topk = 3; + auto provider = read_streamer->create_provider(); + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_FLOAT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + + ctx->set_topk(100U); + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = 10.1f; + } + ASSERT_EQ(0, read_streamer->search_bf_impl(vec.data(), qmeta, ctx)); + auto &result = ctx->result(); + ASSERT_EQ(100U, result.size()); + ASSERT_EQ(10, result[0].key()); + ASSERT_EQ(11, result[1].key()); + ASSERT_EQ(5, result[10].key()); + ASSERT_EQ(0, result[20].key()); + ASSERT_EQ(30, result[30].key()); + ASSERT_EQ(35, result[35].key()); + ASSERT_EQ(99, result[99].key()); + + ElapsedTime elapsed_time; + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_FLOAT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + cout << "Elapsed time: " << elapsed_time.milli_seconds() << " ms" << endl; + + read_streamer->close(); + read_streamer.reset(); +} + + TEST_F(FlatStreamerTest, TestLinearSearchWithLRU) { MemoryLimitPool::get_instance().init(100 * 1024UL * 1024UL); #ifdef __ANDROID__ @@ -351,7 +596,6 @@ TEST_F(FlatStreamerTest, TestLinearSearchMMap) { ASSERT_EQ(topk, result1.size()); IndexStorage::MemoryBlock block; ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); - const float *data = (float *)block.data(); for (size_t j = 0; j < dim; ++j) { const float *data = (float *)provider->get_vector(result1[0].key()); EXPECT_FLOAT_EQ(data[j], i); diff --git a/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc b/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc index cf3093e22..cd21ff912 100644 --- a/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc +++ b/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc @@ -171,6 +171,254 @@ TEST_F(HnswStreamerTest, TestHnswSearch) { cout << "Elapsed time: " << elapsed_time.milli_seconds() << " ms" << endl; } +TEST_F(HnswStreamerTest, TestHnswSearchBuffer) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); + IndexStreamer::Pointer write_streamer = + IndexFactory::CreateStreamer("HnswStreamer"); + ASSERT_TRUE(write_streamer != nullptr); + + Params params; + params.set(PARAM_HNSW_STREAMER_GET_VECTOR_ENABLE, true); + + ASSERT_EQ(0, write_streamer->init(*index_meta_ptr_, params)); + auto storage = IndexFactory::CreateStorage("BufferStorage"); + ASSERT_NE(nullptr, storage); + Params stg_params; + ASSERT_EQ(0, storage->init(stg_params)); + ASSERT_EQ(0, storage->open(dir_ + "Test/TestHnswSearchBuffer", true)); + ASSERT_EQ(0, write_streamer->open(storage)); + + auto ctx = write_streamer->create_context(); + ASSERT_TRUE(!!ctx); + + size_t cnt = 10000UL; + IndexQueryMeta qmeta(IndexMeta::DT_FP32, dim); + for (size_t i = 0; i < cnt; i++) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + write_streamer->add_impl(i, vec.data(), qmeta, ctx); + } + write_streamer->flush(0UL); + write_streamer->close(); + write_streamer.reset(); + storage->close(); + + IndexStreamer::Pointer read_streamer = + IndexFactory::CreateStreamer("HnswStreamer"); + ASSERT_EQ(0, read_streamer->init(*index_meta_ptr_, params)); + auto read_storage = IndexFactory::CreateStorage("BufferStorage"); + ASSERT_NE(nullptr, read_storage); + ASSERT_EQ(0, read_storage->init(stg_params)); + ASSERT_EQ(0, read_storage->open(dir_ + "Test/TestHnswSearchBuffer", false)); + ASSERT_EQ(0, read_streamer->open(read_storage)); + size_t topk = 3; + auto provider = read_streamer->create_provider(); + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + + ctx->set_topk(100U); + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = 10.1f; + } + ASSERT_EQ(0, read_streamer->search_bf_impl(vec.data(), qmeta, ctx)); + auto &result = ctx->result(); + ASSERT_EQ(100U, result.size()); + ASSERT_EQ(10, result[0].key()); + ASSERT_EQ(11, result[1].key()); + ASSERT_EQ(5, result[10].key()); + ASSERT_EQ(0, result[20].key()); + ASSERT_EQ(30, result[30].key()); + ASSERT_EQ(35, result[35].key()); + ASSERT_EQ(99, result[99].key()); + + ElapsedTime elapsed_time; + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + + read_streamer->close(); + read_streamer.reset(); + cout << "Elapsed time: " << elapsed_time.milli_seconds() << " ms" << endl; +} + +TEST_F(HnswStreamerTest, TestHnswSearchBufferMMap) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); + IndexStreamer::Pointer write_streamer = + IndexFactory::CreateStreamer("HnswStreamer"); + ASSERT_TRUE(write_streamer != nullptr); + + Params params; + params.set(PARAM_HNSW_STREAMER_GET_VECTOR_ENABLE, true); + + ASSERT_EQ(0, write_streamer->init(*index_meta_ptr_, params)); + auto storage = IndexFactory::CreateStorage("BufferStorage"); + ASSERT_NE(nullptr, storage); + Params stg_params; + ASSERT_EQ(0, storage->init(stg_params)); + ASSERT_EQ(0, storage->open(dir_ + "Test/TestHnswSearchBufferMMap", true)); + ASSERT_EQ(0, write_streamer->open(storage)); + + auto ctx = write_streamer->create_context(); + ASSERT_TRUE(!!ctx); + + size_t cnt = 10000UL; + IndexQueryMeta qmeta(IndexMeta::DT_FP32, dim); + for (size_t i = 0; i < cnt; i++) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + write_streamer->add_impl(i, vec.data(), qmeta, ctx); + } + write_streamer->flush(0UL); + write_streamer->close(); + write_streamer.reset(); + storage->close(); + + IndexStreamer::Pointer read_streamer = + IndexFactory::CreateStreamer("HnswStreamer"); + ASSERT_EQ(0, read_streamer->init(*index_meta_ptr_, params)); + auto read_storage = IndexFactory::CreateStorage("MMapFileStorage"); + ASSERT_NE(nullptr, read_storage); + ASSERT_EQ(0, read_storage->init(stg_params)); + ASSERT_EQ(0, read_storage->open(dir_ + "Test/TestHnswSearchBufferMMap", false)); + ASSERT_EQ(0, read_streamer->open(read_storage)); + size_t topk = 3; + auto provider = read_streamer->create_provider(); + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + + ctx->set_topk(100U); + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = 10.1f; + } + ASSERT_EQ(0, read_streamer->search_bf_impl(vec.data(), qmeta, ctx)); + auto &result = ctx->result(); + ASSERT_EQ(100U, result.size()); + ASSERT_EQ(10, result[0].key()); + ASSERT_EQ(11, result[1].key()); + ASSERT_EQ(5, result[10].key()); + ASSERT_EQ(0, result[20].key()); + ASSERT_EQ(30, result[30].key()); + ASSERT_EQ(35, result[35].key()); + ASSERT_EQ(99, result[99].key()); + + ElapsedTime elapsed_time; + for (size_t i = 0; i < cnt; i += 1) { + NumericalVector vec(dim); + for (size_t j = 0; j < dim; ++j) { + vec[j] = i; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result1 = ctx->result(); + ASSERT_EQ(topk, result1.size()); + IndexStorage::MemoryBlock block; + ASSERT_EQ(0, provider->get_vector(result1[0].key(), block)); + const float *data = (float *)block.data(); + for (size_t j = 0; j < dim; ++j) { + ASSERT_EQ(data[j], i); + } + ASSERT_EQ(i, result1[0].key()); + + for (size_t j = 0; j < dim; ++j) { + vec[j] = i + 0.1f; + } + ctx->set_topk(topk); + ASSERT_EQ(0, read_streamer->search_impl(vec.data(), qmeta, ctx)); + auto &result2 = ctx->result(); + ASSERT_EQ(topk, result2.size()); + ASSERT_EQ(i, result2[0].key()); + ASSERT_EQ(i == cnt - 1 ? i - 1 : i + 1, result2[1].key()); + ASSERT_EQ(i == 0 ? 2 : (i == cnt - 1 ? i - 2 : i - 1), result2[2].key()); + } + + read_streamer->close(); + read_streamer.reset(); + cout << "Elapsed time: " << elapsed_time.milli_seconds() << " ms" << endl; +} + TEST_F(HnswStreamerTest, TestHnswSearchMMap) { IndexStreamer::Pointer write_streamer = IndexFactory::CreateStreamer("HnswStreamer");