-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuffer_pool.cpp
More file actions
78 lines (65 loc) · 1.81 KB
/
buffer_pool.cpp
File metadata and controls
78 lines (65 loc) · 1.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include "buffer_pool.hpp"
#include <chrono>
#include <cstring>
#include <fcntl.h>
#include <iostream>
#include <unistd.h>
using std::shared_lock;
using std::shared_mutex;
namespace cache {
std::unique_ptr<BufferPool> BufferPool::New(const std::string& path,
BufferPoolConfig config) {
int fd = open(path.c_str(), O_CREAT | O_RDWR, 0600);
if (fd < 0) return nullptr;
auto bp = std::make_unique<BufferPool>();
bp->fd_ = fd;
bp->nShards_ = config.nShards;
bp->shards_.resize(config.nShards);
for (int i = 0; i < config.nShards; ++i) {
bp->shards_[i] = std::make_unique<Shard>(fd, config.maxEntriesPerShard);
}
bp->startFlusher();
return bp;
}
bool BufferPool::Put(const PageBuf& buf, int offset) {
size_t shardId = hash(offset) % nShards_;
return shards_[shardId]->Put(offset, buf);
}
bool BufferPool::Get(PageBuf& buf, int offset) {
size_t shardId = hash(offset) % nShards_;
Entry* e = shards_[shardId]->Get(offset);
if (!e) return false;
shared_lock<shared_mutex> lock(e->mu);
std::memcpy(buf.data(), e->val.data(), pageSize);
return true;
}
void BufferPool::startFlusher() {
flusherThread_ = std::thread([this] {
flusher();
});
}
void BufferPool::stopFlusher() {
stopFlusher_ = true;
if (flusherThread_.joinable()) flusherThread_.join();
}
BufferPool::~BufferPool() {
stopFlusher();
if (fd_ >= 0) close(fd_);
}
unsigned BufferPool::hash(int key) {
unsigned u = static_cast<unsigned>(key);
u = u * 2654435761u;
return u;
}
void BufferPool::flusher() {
while (!stopFlusher_) {
std::this_thread::sleep_for(std::chrono::seconds(5));
if (stopFlusher_) break;
for (int i = 0; i < nShards_; ++i) {
if (!shards_[i]->Flush()) {
std::cerr << "Flush failed for shard " << i << std::endl;
}
}
}
}
}