diff --git a/src/include/storage/buffer_manager/buffer_manager.h b/src/include/storage/buffer_manager/buffer_manager.h index 5d51ab0491..46b9140714 100644 --- a/src/include/storage/buffer_manager/buffer_manager.h +++ b/src/include/storage/buffer_manager/buffer_manager.h @@ -43,6 +43,9 @@ struct EvictionCandidate { static bool isSecondChanceEvictable(uint64_t currPageStateAndVersion) { return PageState::getState(currPageStateAndVersion) == PageState::UNLOCKED; } + static bool isEvicted(uint64_t currPageStateAndVersion) { + return PageState::getState(currPageStateAndVersion) == PageState::EVICTED; + } bool operator==(const EvictionCandidate& other) const { return fileIdx == other.fileIdx && pageIdx == other.pageIdx; diff --git a/src/include/storage/buffer_manager/page_state.h b/src/include/storage/buffer_manager/page_state.h index 8d504b0eb0..3bc83f765e 100644 --- a/src/include/storage/buffer_manager/page_state.h +++ b/src/include/storage/buffer_manager/page_state.h @@ -41,7 +41,8 @@ class PageState { } void spinLock(uint64_t oldStateAndVersion) { while (true) { - if (tryLock(oldStateAndVersion)) { + if (stateAndVersion.compare_exchange_strong(oldStateAndVersion, + updateStateWithSameVersion(oldStateAndVersion, LOCKED))) { return; } } diff --git a/src/planner/plan/plan_copy.cpp b/src/planner/plan/plan_copy.cpp index 75405d674f..7c3b499601 100644 --- a/src/planner/plan/plan_copy.cpp +++ b/src/planner/plan/plan_copy.cpp @@ -95,7 +95,8 @@ LogicalPlan Planner::planCopyRelFrom(const BoundCopyFromInfo* info) { case ScanSourceType::QUERY: { auto& querySource = info->source->constCast(); plan = planQuery(*querySource.statement); - if (plan.getSchema()->getNumGroups() == 1 && !plan.getSchema()->getGroup(0)->isFlat()) { + auto schema = plan.getSchema(); + if (schema->getGroupsPosInScope().size() == 1) { break; } // Copy operator assumes all input are in the same data chunk. If this is not the case, diff --git a/src/storage/buffer_manager/buffer_manager.cpp b/src/storage/buffer_manager/buffer_manager.cpp index e722d7776e..ce22ecac9d 100644 --- a/src/storage/buffer_manager/buffer_manager.cpp +++ b/src/storage/buffer_manager/buffer_manager.cpp @@ -284,6 +284,13 @@ uint64_t BufferManager::evictPages() { if (!evictionCandidate.isEvictable(pageStateAndVersion)) { if (evictionCandidate.isSecondChanceEvictable(pageStateAndVersion)) { pageState->tryMark(pageStateAndVersion); + } else if (evictionCandidate.isEvicted(pageStateAndVersion)) { + // Remove evicted candidate from queue. Lock page before clearing to avoid + // data races with other threads that might re-add or evict the same slot. + if (pageState->tryLock(pageStateAndVersion)) { + evictionQueue.clear(candidate); + pageState->unlock(); + } } continue; } diff --git a/src/storage/disk_array_collection.cpp b/src/storage/disk_array_collection.cpp index 80a67ffc96..c8ed040adf 100644 --- a/src/storage/disk_array_collection.cpp +++ b/src/storage/disk_array_collection.cpp @@ -1,5 +1,8 @@ #include "storage/disk_array_collection.h" +#include + +#include "common/exception/runtime.h" #include "common/system_config.h" #include "common/types/types.h" #include "storage/file_handle.h" @@ -23,16 +26,23 @@ DiskArrayCollection::DiskArrayCollection(FileHandle& fileHandle, ShadowFile& sha page_idx_t firstHeaderPage, bool bypassShadowing) : fileHandle(fileHandle), shadowFile{shadowFile}, bypassShadowing{bypassShadowing}, numHeaders{0} { - // Read headers from disk + // Read headers from disk (no external state in lambda: optimistic read may run multiple times) page_idx_t headerPageIdx = firstHeaderPage; do { + std::unique_ptr headerPage; + page_idx_t nextHeaderPageIdx = INVALID_PAGE_IDX; fileHandle.optimisticReadPage(headerPageIdx, [&](auto* frame) { const auto page = reinterpret_cast(frame); - headersForReadTrx.push_back(std::make_unique(*page)); - headersForWriteTrx.push_back(std::make_unique(*page)); - headerPageIdx = page->nextHeaderPage; - numHeaders += page->numHeaders; + headerPage = std::make_unique(*page); + nextHeaderPageIdx = page->nextHeaderPage; }); + if (!headerPage) { + throw RuntimeException("Failed to read header page from disk."); + } + numHeaders += headerPage->numHeaders; + headersForReadTrx.push_back(std::make_unique(*headerPage)); + headersForWriteTrx.push_back(std::move(headerPage)); + headerPageIdx = nextHeaderPageIdx; } while (headerPageIdx != INVALID_PAGE_IDX); headerPagesOnDisk = headersForReadTrx.size(); } diff --git a/src/storage/index/hash_index.cpp b/src/storage/index/hash_index.cpp index cc76445a69..b6f35f89c0 100644 --- a/src/storage/index/hash_index.cpp +++ b/src/storage/index/hash_index.cpp @@ -490,21 +490,19 @@ PrimaryKeyIndex::PrimaryKeyIndex(IndexInfo indexInfo, std::unique_ptraddDiskArray(); } } else { - size_t headerIdx = 0; for (size_t headerPageIdx = 0; headerPageIdx < INDEX_HEADER_PAGES; headerPageIdx++) { + size_t startHeaderIdx = headerPageIdx * INDEX_HEADERS_PER_PAGE; pageAllocator.getDataFH()->optimisticReadPage( hashIndexStorageInfo.firstHeaderPage + headerPageIdx, [&](auto* frame) { const auto onDiskHeaders = reinterpret_cast(frame); - for (size_t i = 0; i < INDEX_HEADERS_PER_PAGE && headerIdx < NUM_HASH_INDEXES; - i++) { + for (size_t i = 0; + i < INDEX_HEADERS_PER_PAGE && startHeaderIdx + i < NUM_HASH_INDEXES; i++) { hashIndexHeadersForReadTrx.emplace_back(onDiskHeaders[i]); - headerIdx++; } }); } hashIndexHeadersForWriteTrx.assign(hashIndexHeadersForReadTrx.begin(), hashIndexHeadersForReadTrx.end()); - KU_ASSERT(headerIdx == NUM_HASH_INDEXES); hashIndexDiskArrays = std::make_unique(*pageAllocator.getDataFH(), *shadowFile, hashIndexStorageInfo.firstHeaderPage + diff --git a/test/test_files/unwind/unwind_mixed.test b/test/test_files/unwind/unwind_mixed.test new file mode 100644 index 0000000000..9e512e6a8c --- /dev/null +++ b/test/test_files/unwind/unwind_mixed.test @@ -0,0 +1,11 @@ +-DATASET CSV EMPTY + +-- + +-CASE UnwindMixedTypes +-STATEMENT UNWIND [1, 'hello', true, 3.14, null] AS x RETURN x +---- 5 +1 +hello +True +3.140000