Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4302,7 +4302,7 @@ int Client::mds_command(
// Construct and send MCommand
MCommand *m = new MCommand(monclient->get_fsid());
m->cmd = cmd;
m->set_data(inbl);
m->set_data(const_cast<bufferlist&>(inbl));
m->set_tid(tid);
conn->send_message(m);
}
Expand Down
33 changes: 27 additions & 6 deletions src/common/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/safe_io.h"
#include "common/simple_spin.h"
#include "common/strtol.h"
#include "common/likely.h"
#include "include/atomic.h"
#include "common/Mutex.h"
#include "include/types.h"
Expand Down Expand Up @@ -161,6 +162,10 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
bool is_n_page_sized() {
return (len & ~CEPH_PAGE_MASK) == 0;
}
virtual bool is_volatile() {
// true if unsafe to claim-hold due to, e.g., special registration
return false;
}
bool get_crc(const pair<size_t, size_t> &fromto,
pair<uint32_t, uint32_t> *crc) const {
Mutex::Locker l(crc_lock);
Expand Down Expand Up @@ -602,6 +607,18 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
return _raw->clone();
}

buffer::ptr& buffer::ptr::strong_claim() {
if (_raw && _raw->is_volatile()) {
buffer::raw *tr = _raw;
_raw = tr->clone();
_raw->nref.set(1);
if (unlikely(tr->nref.dec() == 0)) {
delete tr;
}
}
return *this;
}

void buffer::ptr::swap(ptr& other)
{
raw *r = _raw;
Expand Down Expand Up @@ -1164,27 +1181,31 @@ void buffer::list::rebuild_page_aligned()
}

// sort-of-like-assignment-op
void buffer::list::claim(list& bl)
void buffer::list::claim(list& bl, bool strong)
{
// free my buffers
clear();
claim_append(bl);
claim_append(bl, strong);
}

void buffer::list::claim_append(list& bl)
void buffer::list::claim_append(list& bl, bool strong)
{
// steal the other guy's buffers
_len += bl._len;
_buffers.splice( _buffers.end(), bl._buffers );
if (strong)
bl.strong_claim_inplace();
_buffers.splice(_buffers.end(), bl._buffers );
bl._len = 0;
bl.last_p = bl.begin();
}

void buffer::list::claim_prepend(list& bl)
void buffer::list::claim_prepend(list& bl, bool strong)
{
// steal the other guy's buffers
_len += bl._len;
_buffers.splice( _buffers.begin(), bl._buffers );
if (strong)
bl.strong_claim_inplace();
_buffers.splice(_buffers.begin(), bl._buffers );
bl._len = 0;
bl.last_p = bl.begin();
}
Expand Down
38 changes: 32 additions & 6 deletions src/include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class buffer {

raw *clone();
void swap(ptr& other);
ptr& strong_claim();

// misc
bool at_buffer_head() const { return _off == 0; }
Expand Down Expand Up @@ -323,12 +324,17 @@ class buffer {
append_buffer.set_length(0); // unused, so far.
}
~list() {}

list(const list& other) : _buffers(other._buffers), _len(other._len), _memcopy_count(other._memcopy_count),last_p(this) { }
list(const list& other) : _buffers(other._buffers), _len(other._len),
_memcopy_count(other._memcopy_count), last_p(this) {
// make strong-claim semantics the unmarked case
strong_claim_inplace();
}
list& operator= (const list& other) {
if (this != &other) {
_buffers = other._buffers;
_len = other._len;
// make strong-claim semantics the unmarked case
strong_claim_inplace();
}
return *this;
}
Expand Down Expand Up @@ -396,10 +402,30 @@ class buffer {
void rebuild_aligned(unsigned align);
void rebuild_page_aligned();

// sort-of-like-assignment-op
void claim(list& bl);
void claim_append(list& bl);
void claim_prepend(list& bl);
// assignment-op with move semantics
void claim(list& bl, bool strong=true);
void claim_append(list& bl, bool strong=true);
void claim_prepend(list& bl, bool strong=true);

// non-destructively replace volatile buffers
void strong_claim_inplace() {
std::list<buffer::ptr>::iterator pb;
for (pb = _buffers.begin(); pb != _buffers.end(); ++pb) {
(void) pb->strong_claim();
}
}

// copy with explicit volatile-sharing semantics
void share(list& bl)
{
if (this != &bl) {
clear();
std::list<buffer::ptr>::iterator pb;
for (pb = bl._buffers.begin(); pb != bl._buffers.end(); ++pb) {
push_back(*pb);
}
}
}

iterator begin() {
return iterator(this, 0);
Expand Down
12 changes: 6 additions & 6 deletions src/msg/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,33 +314,33 @@ class Message : public RefCountedObject {
void set_payload(bufferlist& bl) {
if (byte_throttler)
byte_throttler->put(payload.length());
payload.claim(bl);
payload.claim(bl, false /* !strong */);
if (byte_throttler)
byte_throttler->take(payload.length());
}

void set_middle(bufferlist& bl) {
if (byte_throttler)
byte_throttler->put(payload.length());
middle.claim(bl);
middle.claim(bl, false /* !strong */);
if (byte_throttler)
byte_throttler->take(payload.length());
}
bufferlist& get_middle() { return middle; }

void set_data(const bufferlist &d) {
void set_data(bufferlist &bl) {
if (byte_throttler)
byte_throttler->put(data.length());
data = d;
data.claim(bl, false /* !strong */);
if (byte_throttler)
byte_throttler->take(data.length());
}

bufferlist& get_data() { return data; }
void claim_data(bufferlist& bl) {
void claim_data(bufferlist& bl, bool strong=true) {
if (byte_throttler)
byte_throttler->put(data.length());
bl.claim(data);
bl.claim(data, strong);
}
off_t get_data_len() { return data.length(); }

Expand Down
2 changes: 1 addition & 1 deletion src/os/FileJournal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
bufferptr bp = buffer::create_static(pre_pad, zero_buf);
bl.push_back(bp);
}
bl.claim_append(ebl);
bl.claim_append(ebl, false /* ! strong */); // potential zero-copy

if (h.post_pad) {
bufferptr bp = buffer::create_static(post_pad, zero_buf);
Expand Down
2 changes: 1 addition & 1 deletion src/os/FileJournal.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class FileJournal : public Journal {
TrackedOpRef tracked_op;
write_item(uint64_t s, bufferlist& b, int al, TrackedOpRef opref) :
seq(s), alignment(al), tracked_op(opref) {
bl.claim(b);
bl.claim(b, false /* !strong */); // potential zero-copy
}
write_item() : seq(0), alignment(0) {}
};
Expand Down