-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshard.cpp
More file actions
103 lines (83 loc) · 2.3 KB
/
shard.cpp
File metadata and controls
103 lines (83 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include "shard.hpp"
#include <cstring>
#include <fcntl.h>
#include <mutex>
#include <shared_mutex>
#include <unistd.h>
#include <vector>
using std::shared_mutex;
using std::unique_lock;
using std::shared_lock;
namespace cache {
/// Builds a shard around a file descriptor and an entry-count limit.
///
/// The constructor only records both values; it performs no I/O itself.
///
/// @param fd                  Descriptor stored in `fd_`; the pread/pwrite
///                            calls in this file operate on it.
/// @param maxEntriesPerShard  Stored in `maxEntriesPerShard_`; compared
///                            against the map size before a new entry is
///                            inserted.
Shard::Shard(int fd, int maxEntriesPerShard)
    : fd_(fd),
      maxEntriesPerShard_(maxEntriesPerShard) {
}
bool Shard::Put(int key, const PageBuf& buf) {
Entry* ent = get(key, false);
if (!ent) return false;
unique_lock<shared_mutex> lock(ent->mu);
std::memcpy(ent->val.data(), buf.data(), pageSize);
ent->isDirty = true;
return true;
}
/// Returns the cached entry for `key`, asking get() to read the page from the
/// backing file when the key is not cached yet.
///
/// @return Whatever get(key, true) returns, including nullptr on failure.
Entry* Shard::Get(int key) {
  constexpr bool kLoadOnMiss = true;
  return get(key, kLoadOnMiss);
}
/// Looks up the entry for `key`, creating (and optionally loading) it on a
/// miss.
///
/// Fast path: a lookup under the shared shard lock. On a miss a fresh entry is
/// prepared outside any lock (so file I/O does not block other threads), then
/// the re-check, the capacity check/eviction and the insert all happen inside
/// ONE exclusive critical section.
///
/// @param key           Cache key; also used as the byte offset passed to
///                      pread().
/// @param loadFromFile  When true, `pageSize` bytes are read from `fd_` into
///                      the new entry before it is published.
/// @return Non-owning pointer to the cached entry, or nullptr when the read
///         failed / was short, or when no entry could be evicted to make room.
///         The map keeps ownership; a later eviction destroys the entry.
Entry* Shard::get(int key, bool loadFromFile) {
  {
    shared_lock<shared_mutex> readLock(mu_);
    auto it = m_.find(key);
    if (it != m_.end()) return it->second.get();
  }

  // Build the candidate before taking the exclusive lock. Doing the load
  // first also means a failed read no longer costs an eviction.
  auto fresh = std::make_unique<Entry>();
  fresh->key = key;
  fresh->isDirty = false;
  if (loadFromFile) {
    const ssize_t n =
        pread(fd_, fresh->val.data(), pageSize, static_cast<off_t>(key));
    if (n < 0) return nullptr;
    // A partial page is rejected; n == 0 is accepted and leaves the buffer as
    // constructed.
    if (n > 0 && n < static_cast<ssize_t>(pageSize)) return nullptr;
  }

  unique_lock<shared_mutex> writeLock(mu_);

  // Another thread may have inserted `key` while no lock was held. Return
  // that entry instead of overwriting it: overwriting would destroy an Entry
  // some other caller already holds a pointer to.
  auto existing = m_.find(key);
  if (existing != m_.end()) return existing->second.get();

  // Capacity check and insert share this critical section, so concurrent
  // misses cannot each pass the check and then push the map past the limit.
  if (static_cast<int>(m_.size()) >= maxEntriesPerShard_) {
    if (!evictOneLocked()) return nullptr;
  }

  Entry* const published = fresh.get();
  m_[key] = std::move(fresh);
  return published;
}
bool Shard::Flush() {
std::vector<std::pair<int, Entry*>> dirty;
{
unique_lock<shared_mutex> lock(mu_);
for (auto& p : m_) {
Entry* e = p.second.get();
shared_lock<shared_mutex> entryLock(e->mu);
if (e->isDirty) dirty.emplace_back(p.first, e);
}
}
for (auto& p : dirty) {
int offset = p.first;
Entry* e = p.second;
unique_lock<shared_mutex> lock(e->mu);
if (!e->isDirty) continue;
ssize_t n = pwrite(fd_, e->val.data(), pageSize, static_cast<off_t>(offset));
if (n != static_cast<ssize_t>(pageSize)) continue;
e->isDirty = false;
}
return true;
}
bool Shard::evictOneLocked() {
for (auto it = m_.begin(); it != m_.end(); ++it) {
Entry* e = it->second.get();
if (e->isDirty) {
ssize_t n =
pwrite(fd_, e->val.data(), pageSize, static_cast<off_t>(it->first));
if (n != static_cast<ssize_t>(pageSize)) return false;
}
m_.erase(it);
return true;
}
return true;
}
}