Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/include/storage/buffer_manager/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct EvictionCandidate {
/// Returns true iff the page's current state is UNLOCKED, i.e. the candidate
/// may be given a second chance (marked) before being evicted.
static bool isSecondChanceEvictable(uint64_t currPageStateAndVersion) {
    const auto state = PageState::getState(currPageStateAndVersion);
    return state == PageState::UNLOCKED;
}
/// Returns true iff the page's current state is EVICTED.
static bool isEvicted(uint64_t currPageStateAndVersion) {
    const auto state = PageState::getState(currPageStateAndVersion);
    return state == PageState::EVICTED;
}

bool operator==(const EvictionCandidate& other) const {
return fileIdx == other.fileIdx && pageIdx == other.pageIdx;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/buffer_manager/page_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class PageState {
}
void spinLock(uint64_t oldStateAndVersion) {
while (true) {
if (tryLock(oldStateAndVersion)) {
if (stateAndVersion.compare_exchange_strong(oldStateAndVersion,
updateStateWithSameVersion(oldStateAndVersion, LOCKED))) {
return;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/planner/plan/plan_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ LogicalPlan Planner::planCopyRelFrom(const BoundCopyFromInfo* info) {
case ScanSourceType::QUERY: {
auto& querySource = info->source->constCast<BoundQueryScanSource>();
plan = planQuery(*querySource.statement);
if (plan.getSchema()->getNumGroups() == 1 && !plan.getSchema()->getGroup(0)->isFlat()) {
auto schema = plan.getSchema();
if (schema->getGroupsPosInScope().size() == 1) {
break;
}
// Copy operator assumes all input are in the same data chunk. If this is not the case,
Expand Down
7 changes: 7 additions & 0 deletions src/storage/buffer_manager/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ uint64_t BufferManager::evictPages() {
if (!evictionCandidate.isEvictable(pageStateAndVersion)) {
if (evictionCandidate.isSecondChanceEvictable(pageStateAndVersion)) {
pageState->tryMark(pageStateAndVersion);
} else if (evictionCandidate.isEvicted(pageStateAndVersion)) {
// Remove evicted candidate from queue. Lock page before clearing to avoid
// data races with other threads that might re-add or evict the same slot.
if (pageState->tryLock(pageStateAndVersion)) {
evictionQueue.clear(candidate);
pageState->unlock();
}
}
continue;
}
Expand Down
20 changes: 15 additions & 5 deletions src/storage/disk_array_collection.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "storage/disk_array_collection.h"

#include <memory>

#include "common/exception/runtime.h"
#include "common/system_config.h"
#include "common/types/types.h"
#include "storage/file_handle.h"
Expand All @@ -23,16 +26,23 @@ DiskArrayCollection::DiskArrayCollection(FileHandle& fileHandle, ShadowFile& sha
page_idx_t firstHeaderPage, bool bypassShadowing)
: fileHandle(fileHandle), shadowFile{shadowFile}, bypassShadowing{bypassShadowing},
numHeaders{0} {
// Read headers from disk
// Read headers from disk (no external state in lambda: optimistic read may run multiple times)
page_idx_t headerPageIdx = firstHeaderPage;
do {
std::unique_ptr<HeaderPage> headerPage;
page_idx_t nextHeaderPageIdx = INVALID_PAGE_IDX;
fileHandle.optimisticReadPage(headerPageIdx, [&](auto* frame) {
const auto page = reinterpret_cast<HeaderPage*>(frame);
headersForReadTrx.push_back(std::make_unique<HeaderPage>(*page));
headersForWriteTrx.push_back(std::make_unique<HeaderPage>(*page));
headerPageIdx = page->nextHeaderPage;
numHeaders += page->numHeaders;
headerPage = std::make_unique<HeaderPage>(*page);
nextHeaderPageIdx = page->nextHeaderPage;
});
if (!headerPage) {
throw RuntimeException("Failed to read header page from disk.");
}
numHeaders += headerPage->numHeaders;
headersForReadTrx.push_back(std::make_unique<HeaderPage>(*headerPage));
headersForWriteTrx.push_back(std::move(headerPage));
headerPageIdx = nextHeaderPageIdx;
} while (headerPageIdx != INVALID_PAGE_IDX);
headerPagesOnDisk = headersForReadTrx.size();
}
Expand Down
8 changes: 3 additions & 5 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,21 +490,19 @@ PrimaryKeyIndex::PrimaryKeyIndex(IndexInfo indexInfo, std::unique_ptr<IndexStora
hashIndexDiskArrays->addDiskArray();
}
} else {
size_t headerIdx = 0;
for (size_t headerPageIdx = 0; headerPageIdx < INDEX_HEADER_PAGES; headerPageIdx++) {
size_t startHeaderIdx = headerPageIdx * INDEX_HEADERS_PER_PAGE;
pageAllocator.getDataFH()->optimisticReadPage(
hashIndexStorageInfo.firstHeaderPage + headerPageIdx, [&](auto* frame) {
const auto onDiskHeaders = reinterpret_cast<HashIndexHeaderOnDisk*>(frame);
for (size_t i = 0; i < INDEX_HEADERS_PER_PAGE && headerIdx < NUM_HASH_INDEXES;
i++) {
for (size_t i = 0;
i < INDEX_HEADERS_PER_PAGE && startHeaderIdx + i < NUM_HASH_INDEXES; i++) {
hashIndexHeadersForReadTrx.emplace_back(onDiskHeaders[i]);
headerIdx++;
}
});
}
hashIndexHeadersForWriteTrx.assign(hashIndexHeadersForReadTrx.begin(),
hashIndexHeadersForReadTrx.end());
KU_ASSERT(headerIdx == NUM_HASH_INDEXES);
hashIndexDiskArrays = std::make_unique<DiskArrayCollection>(*pageAllocator.getDataFH(),
*shadowFile,
hashIndexStorageInfo.firstHeaderPage +
Expand Down
11 changes: 11 additions & 0 deletions test/test_files/unwind/unwind_mixed.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-DATASET CSV EMPTY

--

-CASE UnwindMixedTypes
-STATEMENT UNWIND [1, 'hello', true, 3.14, null] AS x RETURN x
---- 5
1
hello
True
3.140000
Loading