Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
throttle_end(0),
running_compactions_(0),
block_size_changed_(0), last_low_mem_(0),
hotbackup_pending_(false)
hotbackup_pending_(false),
non_block_tickets_(0), last_penalty_(0), est_mem_usage_(0)
{
current_block_size_=options_.block_size;

Expand Down Expand Up @@ -1816,7 +1817,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}

// May temporarily unlock and wait.
status = MakeRoomForWrite(my_batch == NULL);
if (0==add_and_fetch(&non_block_tickets_, (uint32_t)0))
status = MakeRoomForWrite(my_batch == NULL);

uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
Expand All @@ -1842,6 +1845,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (updates == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);

// cache this value while mutex held
est_mem_usage_=mem_->ApproximateMemoryUsage();
}

while (true) {
Expand All @@ -1863,7 +1869,19 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
gPerfCounters->Inc(ePerfApiWrite);

// protect use of versions_ ... still within scope of mutex_ lock
throttle=versions_->WriteThrottleUsec(IsCompactionScheduled());
if (0==add_and_fetch(&non_block_tickets_, (uint32_t)0))
{
throttle=versions_->WriteThrottleUsec(IsCompactionScheduled());
gPerfCounters->Inc(ePerfDebug0);
} // if
else
{
// only reduce count for operations that were marked
if (options.non_blocking)
dec_and_fetch(&non_block_tickets_);
throttle=0;
gPerfCounters->Inc(ePerfDebug1);
} // else
} // release MutexLock l(&mutex_)


Expand Down Expand Up @@ -1975,6 +1993,34 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
return result;
}


/**
 * Test 3 biggest things that cause a thread to block
 * or delay. There are other reasons in MakeRoomForWrite()
 * but those are ignored since penalty and such will reflect them.
 *
 * Returns true, leaving non_block_tickets_ incremented, when the
 * caller is promised a (mostly) non-blocking Write(); returns false
 * and restores the counter otherwise.  While non_block_tickets_ is
 * non-zero, Write() bypasses MakeRoomForWrite() and forces throttle
 * to zero for ALL writers, not just the ticket holder.
 *
 * NOTE(review): this is check-then-act — the three conditions can
 * change between this test and the subsequent Write(); per db.h the
 * guarantee is intentionally "not 100%".
 * NOTE(review): Write() only decrements the counter when
 * WriteOptions::non_blocking is set, so a caller that obtains a
 * ticket but never performs the marked write would leak the counter
 * and permanently disable MakeRoomForWrite() — confirm eleveldb
 * always pairs RequestNonBlockTicket() with the marked Write().
 */
bool
DBImpl::RequestNonBlockTicket()
{
bool ret_flag(false);

// assume ticket succeeds: increment first so a concurrent Write()
// already sees the ticket while we validate the conditions below
inc_and_fetch(&non_block_tickets_);

// est_mem_usage_ is last known mem_->ApproximateMemoryUsage()
// (cached under mutex_ in Write()), write_buffer_size is constant
ret_flag = (est_mem_usage_ <= options_.write_buffer_size);
// throttle of exactly 1 is the idle/no-effect rate (see throttle.cc)
ret_flag = ret_flag && 1==GetThrottleWriteRate();
// zero penalty means compactions are keeping up
ret_flag = ret_flag && 0==GetLastPenalty();

// remove ticket: conditions not met, caller must take blocking path
if (!ret_flag)
dec_and_fetch(&non_block_tickets_);

return(ret_flag);

} // DBImpl::RequestNonBlockTicket


// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
Expand All @@ -1983,7 +2029,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
bool allow_delay = !force;
Status s;

while (true) {
while (0==add_and_fetch(&non_block_tickets_, (uint32_t)0)) {
if (!bg_error_.ok()) {
// Yield previous error
gPerfCounters->Inc(ePerfWriteError);
Expand Down
14 changes: 11 additions & 3 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class DBImpl : public DB {
virtual Status VerifyLevels();
virtual void CheckAvailableCompactions();
virtual Logger* GetLogger() const { return options_.info_log; }
virtual bool RequestNonBlockTicket();

// Extra methods (for testing) that are not in the public DB interface

Expand Down Expand Up @@ -88,7 +89,12 @@ class DBImpl : public DB {
bool IsCompactionScheduled();
uint32_t RunningCompactionCount() {mutex_.AssertHeld(); return(running_compactions_);};

protected:
// memory barrier set/retrieval of last_penalty_
// (add_and_fetch of 0 gives a full-barrier atomic read of the volatile)
uint64_t GetLastPenalty() {return(add_and_fetch(&last_penalty_, (uint64_t)0));};
// NOTE(review): the expected value `(uint64_t)last_penalty_` is a racy
// re-read of the target; if another thread changes last_penalty_ between
// that read and the CAS, the CAS fails silently and NewPenalty is lost.
// Likely benign for a heuristic written from PickCompaction, but confirm
// the project compare_and_swap semantics before relying on this value.
void SetLastPenalty(uint64_t NewPenalty)
{compare_and_swap(&last_penalty_, (uint64_t)last_penalty_, NewPenalty);};

protected:
friend class DB;
struct CompactionState;
struct Writer;
Expand Down Expand Up @@ -223,13 +229,15 @@ class DBImpl : public DB {
volatile size_t current_block_size_; // last dynamic block size computed
volatile uint64_t block_size_changed_; // NowMicros() when block size computed
volatile uint64_t last_low_mem_; // NowMicros() when low memory last seen
volatile bool hotbackup_pending_; // true if hotbackup cycle initiated, blocks close
volatile uint32_t non_block_tickets_; // how many non-blocking writes promised?
volatile uint64_t est_mem_usage_; // cached mem->ApproximateMemoryUsage()
volatile uint64_t last_penalty_; // most recent Version->penalty_ value seen

// accessor to new, dynamic block_cache
Cache * block_cache() {return(double_cache.GetBlockCache());};
Cache * file_cache() {return(double_cache.GetFileCache());};

volatile bool hotbackup_pending_;

// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
Expand Down
9 changes: 5 additions & 4 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1189,11 +1189,11 @@ VersionSet::Finalize(Version* v)
// only occurs if no other compactions running on groomer thread
// (no grooming if landing level is still overloaded)
if (0==score && grooming_trigger<=v->files_[level].size()
&& (uint64_t)TotalFileSize(v->files_[config::kNumOverlapLevels])
&& (uint64_t)TotalFileSize(v->files_[config::kNumOverlapLevels])
< gLevelTraits[config::kNumOverlapLevels].m_DesiredBytesForLevel)
{
// secondary test, don't push too much to next Overlap too soon
if (!gLevelTraits[level+1].m_OverlappedFiles
if (!gLevelTraits[level+1].m_OverlappedFiles
|| v->files_[level+1].size()<=config::kL0_CompactionTrigger)
{
score=1;
Expand Down Expand Up @@ -1300,7 +1300,7 @@ VersionSet::UpdatePenalty(
for (int level = 0; level < config::kNumLevels-1; ++level)
{
int loop, count, value, increment;

value=0;
count=0;
increment=0;
Expand Down Expand Up @@ -1660,6 +1660,7 @@ VersionSet::PickCompaction(

// perform this once per call ... since Finalize now loops
UpdatePenalty(current_);
db_impl->SetLastPenalty(current_->write_penalty_);

// submit a work object for every valid compaction needed
current_->compaction_level_=-1;
Expand All @@ -1669,7 +1670,7 @@ VersionSet::PickCompaction(

Log(options_->info_log,"Finalize level: %d, grooming %d",
current_->compaction_level_, current_->compaction_grooming_);

c=NULL;

// We prefer compactions triggered by too much data in a level over
Expand Down
6 changes: 6 additions & 0 deletions include/leveldb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ class DB {
// eleveldb, access to leveldb's logging routines.
virtual Logger* GetLogger() const { return NULL; }

// Riak specific function: Request a "ticket" that allows
// bypass of most wait / blocking activity in a Write() call.
// Used by eleveldb to potentially skip a thread switch prior
// to the leveldb call.  The returned bool should be placed in
// WriteOptions::non_blocking for the subsequent Write().
// Base class default: no ticket support, always false.
virtual bool RequestNonBlockTicket() {return(false);};

private:
// No copying allowed
DB(const DB&);
Expand Down
12 changes: 11 additions & 1 deletion include/leveldb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,18 @@ struct WriteOptions {
// Default: false
bool sync;

// This is used in conjunction with the Erlang eleveldb driver.
// Not useful outside that context.  If a write is known to
// block, for any reason, eleveldb must switch to a non-scheduler
// thread.  This flag informs leveldb that eleveldb received
// a "ticket" (via DB::RequestNonBlockTicket) promising the write
// would not block, throttle, etc.
// The guarantee is not 100%, but sufficient.
//
// Default: false
bool non_blocking;

WriteOptions()
: sync(false), non_blocking(false) {
}
};

Expand Down
29 changes: 19 additions & 10 deletions util/throttle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct ThrottleData_t
// most recent interval statistics for last hour.
ThrottleData_t gThrottleData[THROTTLE_INTERVALS];

uint64_t gThrottleRate, gUnadjustedThrottleRate;
volatile uint64_t gThrottleRate, gUnadjustedThrottleRate;

static volatile bool gThrottleRunning=false;
static pthread_t gThrottleThreadId;
Expand Down Expand Up @@ -111,6 +111,7 @@ ThrottleThread(
uint64_t tot_micros, tot_keys, tot_backlog, tot_compact;
int replace_idx, loop, ret_val;
uint64_t new_throttle, new_unadjusted;
int64_t temp_throttle;
time_t now_seconds, cache_expire;
struct timespec wait_time;

Expand Down Expand Up @@ -238,18 +239,25 @@ ThrottleThread(
} // else

// change the throttle slowly
// (+1 & +2 keep throttle moving toward goal when difference new and
// old is less than THROTTLE_SCALING)
temp_throttle=gThrottleRate;

if (gThrottleRate < new_throttle)
gThrottleRate+=(new_throttle - gThrottleRate)/THROTTLE_SCALING;
temp_throttle+=(new_throttle - gThrottleRate)/THROTTLE_SCALING +1;
else
gThrottleRate-=(gThrottleRate - new_throttle)/THROTTLE_SCALING;
temp_throttle-=(gThrottleRate - new_throttle)/THROTTLE_SCALING +2;

if (0==gThrottleRate)
gThrottleRate=1; // throttle must always have an effect
// +2 can make this go negative
if (temp_throttle<1)
temp_throttle=1; // throttle must always have an effect

gUnadjustedThrottleRate=new_unadjusted;
// use sync operation to force full barrier assignment
compare_and_swap(&gThrottleRate, (uint64_t)gThrottleRate, (uint64_t)temp_throttle);
compare_and_swap(&gUnadjustedThrottleRate, (uint64_t)gUnadjustedThrottleRate, new_unadjusted);

// Log(NULL, "ThrottleRate %" PRIu64 ", UnadjustedThrottleRate %" PRIu64, gThrottleRate, gUnadjustedThrottleRate);

gPerfCounters->Set(ePerfThrottleGauge, gThrottleRate);
gPerfCounters->Add(ePerfThrottleCounter, gThrottleRate*THROTTLE_SECONDS);
gPerfCounters->Set(ePerfThrottleUnadjusted, gUnadjustedThrottleRate);
Expand Down Expand Up @@ -277,7 +285,7 @@ ThrottleThread(
// This is a second non-throttle task added to this one minute loop. Pattern forming.
// See if hot backup wants to initiate.
//
CheckHotBackupTrigger();
CheckHotBackupTrigger();

// nudge compaction logic of potential grooming
if (0==gCompactionThreads->m_WorkQueueAtomic) // user databases
Expand Down Expand Up @@ -331,8 +339,9 @@ void SetThrottleWriteRate(uint64_t Micros, uint64_t Keys, bool IsLevel0)
return;
};

uint64_t GetThrottleWriteRate() {return(gThrottleRate);};
uint64_t GetUnadjustedThrottleWriteRate() {return(gUnadjustedThrottleRate);};
// add_and_fetch(&x, (uint64_t)0) is used purely for its full memory
// barrier: it yields an atomic, coherent read of the volatile rate
// globals that the throttle thread publishes via compare_and_swap().
uint64_t GetThrottleWriteRate() {return(add_and_fetch(&gThrottleRate, (uint64_t)0));};
uint64_t GetUnadjustedThrottleWriteRate() {return(add_and_fetch(&gUnadjustedThrottleRate,(uint64_t)0));};

// clock_gettime but only updated once every 60 seconds (roughly)
uint64_t GetTimeMinutes() {return(gCurrentTime);};
Expand Down