diff --git a/db/db_impl.cc b/db/db_impl.cc
index 1b534325..afeae40f 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -196,7 +196,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       throttle_end(0), running_compactions_(0),
       block_size_changed_(0), last_low_mem_(0),
-      hotbackup_pending_(false)
+      hotbackup_pending_(false),
+      non_block_tickets_(0), last_penalty_(0), est_mem_usage_(0)
 {
 
     current_block_size_=options_.block_size;
@@ -1816,7 +1817,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   }
 
   // May temporarily unlock and wait.
-  status = MakeRoomForWrite(my_batch == NULL);
+  if (0==add_and_fetch(&non_block_tickets_, (uint32_t)0))
+      status = MakeRoomForWrite(my_batch == NULL);
+
   uint64_t last_sequence = versions_->LastSequence();
   Writer* last_writer = &w;
   if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
@@ -1842,6 +1845,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
       if (updates == tmp_batch_) tmp_batch_->Clear();
 
       versions_->SetLastSequence(last_sequence);
+
+      // cache this value while mutex held
+      est_mem_usage_=mem_->ApproximateMemoryUsage();
     }
 
     while (true) {
@@ -1863,7 +1869,19 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
     gPerfCounters->Inc(ePerfApiWrite);
 
     // protect use of versions_ ... still within scope of mutex_ lock
-    throttle=versions_->WriteThrottleUsec(IsCompactionScheduled());
+    if (0==add_and_fetch(&non_block_tickets_, (uint32_t)0))
+    {
+        throttle=versions_->WriteThrottleUsec(IsCompactionScheduled());
+        gPerfCounters->Inc(ePerfDebug0);
+    }   // if
+    else
+    {
+        // only reduce count for operations that were marked
+        if (options.non_blocking)
+            dec_and_fetch(&non_block_tickets_);
+        throttle=0;
+        gPerfCounters->Inc(ePerfDebug1);
+    }   // else
   } // release MutexLock l(&mutex_)
 
@@ -1975,6 +1993,34 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
   return result;
 }
 
+
+/**
+ * Test 3 biggest things that cause a thread to block
+ *  or delay.  There are other reasons in MakeRoomForWrite()
+ *  but those are ignored since penalty and such will reflect them.
+ */
+bool
+DBImpl::RequestNonBlockTicket()
+{
+    bool ret_flag(false);
+
+    // assume ticket succeeds
+    inc_and_fetch(&non_block_tickets_);
+
+    // est_mem_usage_ is last known mem_->ApproximateMemoryUsage(), write_buffer_size is constant
+    ret_flag = (est_mem_usage_ <= options_.write_buffer_size);
+    ret_flag = ret_flag && 1==GetThrottleWriteRate();
+    ret_flag = ret_flag && 0==GetLastPenalty();
+
+    // remove ticket
+    if (!ret_flag)
+        dec_and_fetch(&non_block_tickets_);
+
+    return(ret_flag);
+
+}   // DBImpl::RequestNonBlockTicket
+
+
 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
 Status DBImpl::MakeRoomForWrite(bool force) {
@@ -1983,7 +2029,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
   bool allow_delay = !force;
   Status s;
 
-  while (true) {
+  while (0==add_and_fetch(&non_block_tickets_, (uint32_t)0)) {
     if (!bg_error_.ok()) {
       // Yield previous error
       gPerfCounters->Inc(ePerfWriteError);
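// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the patch):  db_impl.cc above reads
// non_block_tickets_ with add_and_fetch(&x, (uint32_t)0), i.e. an atomic
// fetch with a full memory barrier, and adjusts it with inc_and_fetch /
// dec_and_fetch.  The self-contained sketch below assumes those wrappers
// map to the GCC __sync builtins (the leveldb atomics header is not
// reproduced here); it only illustrates the ticket-counter idiom.
// ---------------------------------------------------------------------------

#include <stdint.h>

static volatile uint32_t sketch_tickets = 0;

// barrier read:  adding zero returns the current value and fences
static inline uint32_t SketchTicketCount()
    { return __sync_add_and_fetch(&sketch_tickets, (uint32_t)0); }

static inline void SketchTicketTake()    { __sync_add_and_fetch(&sketch_tickets, (uint32_t)1); }
static inline void SketchTicketRelease() { __sync_sub_and_fetch(&sketch_tickets, (uint32_t)1); }

// Mirrors the Write()/MakeRoomForWrite() guard:  while any ticket is
// outstanding, the potentially blocking paths (room-for-write loop, write
// throttle) are skipped so promised non-blocking writers do not stall.
static inline bool SketchMayBlock() { return (0 == SketchTicketCount()); }

// ---------------------------------------------------------------------------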
diff --git a/db/db_impl.h b/db/db_impl.h
index 5e3976a3..f33fd22a 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -49,6 +49,7 @@ class DBImpl : public DB {
   virtual Status VerifyLevels();
   virtual void CheckAvailableCompactions();
   virtual Logger* GetLogger() const { return options_.info_log; }
+  virtual bool RequestNonBlockTicket();
 
   // Extra methods (for testing) that are not in the public DB interface
 
@@ -88,7 +89,12 @@ class DBImpl : public DB {
   bool IsCompactionScheduled();
   uint32_t RunningCompactionCount() {mutex_.AssertHeld(); return(running_compactions_);};
 
-  protected:
+  // memory barrier set/retrieval of last_penalty_
+  uint64_t GetLastPenalty() {return(add_and_fetch(&last_penalty_, (uint64_t)0));};
+  void SetLastPenalty(uint64_t NewPenalty)
+      {compare_and_swap(&last_penalty_, (uint64_t)last_penalty_, NewPenalty);};
+
+protected:
   friend class DB;
   struct CompactionState;
   struct Writer;
@@ -223,13 +229,15 @@ class DBImpl : public DB {
   volatile size_t current_block_size_;    // last dynamic block size computed
   volatile uint64_t block_size_changed_;  // NowMicros() when block size computed
   volatile uint64_t last_low_mem_;        // NowMicros() when low memory last seen
+  volatile bool hotbackup_pending_;       // true if hotbackup cycle initiated, blocks close
+  volatile uint32_t non_block_tickets_;   // how many non-blocking writes promised?
+  volatile uint64_t est_mem_usage_;       // cached mem->ApproximateMemoryUsage()
+  volatile uint64_t last_penalty_;        // most recent Version->penalty_ value seen
 
   // accessor to new, dynamic block_cache
   Cache * block_cache() {return(double_cache.GetBlockCache());};
   Cache * file_cache() {return(double_cache.GetFileCache());};
 
-  volatile bool hotbackup_pending_;
-
   // No copying allowed
   DBImpl(const DBImpl&);
   void operator=(const DBImpl&);
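// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the patch):  GetLastPenalty()/SetLastPenalty()
// above use add_and_fetch as a barrier read and compare_and_swap against the
// value just read as a barrier write.  The sketch below, again assuming the
// wrappers correspond to the GCC __sync builtins, shows the same pattern.
// Note the CAS store silently does nothing if another thread changed the
// value between the read and the swap; the patch relies on the compaction
// path being the only writer of last_penalty_.
// ---------------------------------------------------------------------------

#include <stdint.h>

static volatile uint64_t sketch_last_penalty = 0;

uint64_t SketchGetLastPenalty()
{
    return __sync_add_and_fetch(&sketch_last_penalty, (uint64_t)0);   // barrier read
}

void SketchSetLastPenalty(uint64_t new_penalty)
{
    uint64_t expected = sketch_last_penalty;                          // value just read
    __sync_val_compare_and_swap(&sketch_last_penalty, expected, new_penalty);
}

// ---------------------------------------------------------------------------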
diff --git a/db/version_set.cc b/db/version_set.cc
index 1c5d5422..e7efdb2e 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1189,11 +1189,11 @@ VersionSet::Finalize(Version* v)
         //  only occurs if no other compactions running on groomer thread
         //  (no grooming if landing level is still overloaded)
         if (0==score && grooming_trigger<=v->files_[level].size()
-            && (uint64_t)TotalFileSize(v->files_[config::kNumOverlapLevels]) 
+            && (uint64_t)TotalFileSize(v->files_[config::kNumOverlapLevels])
                  < gLevelTraits[config::kNumOverlapLevels].m_DesiredBytesForLevel)
         {
             // secondary test, don't push too much to next Overlap too soon
-            if (!gLevelTraits[level+1].m_OverlappedFiles 
+            if (!gLevelTraits[level+1].m_OverlappedFiles
                 || v->files_[level+1].size()<=config::kL0_CompactionTrigger)
             {
                 score=1;
@@ -1300,7 +1300,7 @@ VersionSet::UpdatePenalty(
     for (int level = 0; level < config::kNumLevels-1; ++level)
     {
         int loop, count, value, increment;
-        
+
         value=0;
         count=0;
         increment=0;
@@ -1660,6 +1660,7 @@ VersionSet::PickCompaction(
 
     // perform this once per call ... since Finalize now loops
     UpdatePenalty(current_);
+    db_impl->SetLastPenalty(current_->write_penalty_);
 
     // submit a work object for every valid compaction needed
     current_->compaction_level_=-1;
@@ -1669,7 +1670,7 @@ VersionSet::PickCompaction(
         Log(options_->info_log,"Finalize level: %d, grooming %d",
             current_->compaction_level_, current_->compaction_grooming_);
-    
+
     c=NULL;
 
     // We prefer compactions triggered by too much data in a level over
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index d2bd6dce..1510e79e 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -171,6 +171,12 @@ class DB {
   // eleveldb, access to leveldb's logging routines.
   virtual Logger* GetLogger() const { return NULL; }
 
+  // Riak specific function:  Request a "ticket" that allows
+  //  bypass of most wait / blocking activity in a Write() call.
+  //  Used by eleveldb to potentially skip a thread switch prior
+  //  to the leveldb call.  Place the returned bool in WriteOptions::non_blocking.
+  virtual bool RequestNonBlockTicket() {return(false);};
+
 private:
   // No copying allowed
   DB(const DB&);
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 4032129c..d8a5ee29 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -352,8 +352,18 @@ struct WriteOptions {
   // Default: false
   bool sync;
 
+  // This is used in conjunction with the Erlang eleveldb driver.
+  // Not useful outside that context.  If a write is known to
+  // block, for any reason, eleveldb must switch to a non-scheduler
+  // thread.  This flag informs leveldb that eleveldb received
+  // a "ticket" guaranteeing the write would not block, throttle, etc.
+  // The guarantee is not 100%, but sufficient.
+  //
+  // Default: false
+  bool non_blocking;
+
   WriteOptions()
-      : sync(false) {
+      : sync(false), non_blocking(false) {
   }
 };
 
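// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the patch):  intended caller-side use of the
// new API, from the perspective of something like eleveldb.  Ask for a ticket
// first; only if one is granted, set WriteOptions::non_blocking and issue the
// Write() on the current (scheduler) thread, otherwise hand the batch to a
// thread that is allowed to block.  QueueToBlockingWorker() is a placeholder,
// not something defined by this patch.
// ---------------------------------------------------------------------------

#include "leveldb/db.h"
#include "leveldb/write_batch.h"

void QueueToBlockingWorker(leveldb::DB* db, leveldb::WriteBatch* batch);  // placeholder

void SketchSubmitWrite(leveldb::DB* db, leveldb::WriteBatch* batch)
{
    leveldb::WriteOptions opts;

    if (db->RequestNonBlockTicket())
    {
        opts.non_blocking = true;          // tell Write() a ticket was issued
        db->Write(opts, batch);            // expected not to stall this thread
    }   // if
    else
    {
        QueueToBlockingWorker(db, batch);  // blocking path handled elsewhere
    }   // else
}   // SketchSubmitWrite

// ---------------------------------------------------------------------------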
diff --git a/util/throttle.cc b/util/throttle.cc
index f5ce3492..8c859f30 100644
--- a/util/throttle.cc
+++ b/util/throttle.cc
@@ -71,7 +71,7 @@ struct ThrottleData_t
 // most recent interval statistics for last hour
 ThrottleData_t gThrottleData[THROTTLE_INTERVALS];
 
-uint64_t gThrottleRate, gUnadjustedThrottleRate;
+volatile uint64_t gThrottleRate, gUnadjustedThrottleRate;
 
 static volatile bool gThrottleRunning=false;
 static pthread_t gThrottleThreadId;
@@ -111,6 +111,7 @@ ThrottleThread(
     uint64_t tot_micros, tot_keys, tot_backlog, tot_compact;
     int replace_idx, loop, ret_val;
     uint64_t new_throttle, new_unadjusted;
+    int64_t temp_throttle;
     time_t now_seconds, cache_expire;
     struct timespec wait_time;
 
@@ -238,18 +239,25 @@ ThrottleThread(
         }   // else
 
         // change the throttle slowly
+        //  (+1 & +2 keep the throttle moving toward its goal when the
+        //   difference between new and old is less than THROTTLE_SCALING)
+        temp_throttle=gThrottleRate;
+
         if (gThrottleRate < new_throttle)
-            gThrottleRate+=(new_throttle - gThrottleRate)/THROTTLE_SCALING;
+            temp_throttle+=(new_throttle - gThrottleRate)/THROTTLE_SCALING +1;
         else
-            gThrottleRate-=(gThrottleRate - new_throttle)/THROTTLE_SCALING;
+            temp_throttle-=(gThrottleRate - new_throttle)/THROTTLE_SCALING +2;
 
-        if (0==gThrottleRate)
-            gThrottleRate=1;   // throttle must always have an effect
+        // +2 can make this go negative
+        if (temp_throttle<1)
+            temp_throttle=1;   // throttle must always have an effect
 
-        gUnadjustedThrottleRate=new_unadjusted;
+        // use sync operation to force full barrier assignment
+        compare_and_swap(&gThrottleRate, (uint64_t)gThrottleRate, (uint64_t)temp_throttle);
+        compare_and_swap(&gUnadjustedThrottleRate, (uint64_t)gUnadjustedThrottleRate, new_unadjusted);
 
 //        Log(NULL, "ThrottleRate %" PRIu64 ", UnadjustedThrottleRate %" PRIu64, gThrottleRate, gUnadjustedThrottleRate);
-
+
         gPerfCounters->Set(ePerfThrottleGauge, gThrottleRate);
         gPerfCounters->Add(ePerfThrottleCounter, gThrottleRate*THROTTLE_SECONDS);
         gPerfCounters->Set(ePerfThrottleUnadjusted, gUnadjustedThrottleRate);
@@ -277,7 +285,7 @@ ThrottleThread(
     // This is a second non-throttle task added to this one minute loop.  Pattern forming.
     //  See if hot backup wants to initiate.
     //
-    CheckHotBackupTrigger(); 
+    CheckHotBackupTrigger();
 
     // nudge compaction logic of potential grooming
     if (0==gCompactionThreads->m_WorkQueueAtomic)      // user databases
@@ -331,8 +339,9 @@ void SetThrottleWriteRate(uint64_t Micros, uint64_t Keys, bool IsLevel0)
     return;
 
 };
 
-uint64_t GetThrottleWriteRate() {return(gThrottleRate);};
-uint64_t GetUnadjustedThrottleWriteRate() {return(gUnadjustedThrottleRate);};
+// add_and_fetch() used to force memory barrier operations
+uint64_t GetThrottleWriteRate() {return(add_and_fetch(&gThrottleRate, (uint64_t)0));};
+uint64_t GetUnadjustedThrottleWriteRate() {return(add_and_fetch(&gUnadjustedThrottleRate,(uint64_t)0));};
 
 // clock_gettime but only updated once every 60 seconds (roughly)
 uint64_t GetTimeMinutes() {return(gCurrentTime);};
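// ---------------------------------------------------------------------------
// Reviewer sketch (not part of the patch):  the throttle smoothing above in
// isolation.  Each pass moves the published rate 1/THROTTLE_SCALING of the
// way toward the newly computed goal; the +1 / +2 terms keep it creeping once
// |goal - current| is smaller than THROTTLE_SCALING, where integer division
// alone would stall at zero, and the +2 on the downward path can overshoot
// below zero, hence the clamp to 1.  kScaling below is an example value only;
// THROTTLE_SCALING is defined elsewhere in throttle.cc.
// ---------------------------------------------------------------------------

#include <stdint.h>

static const int64_t kScaling = 17;   // example value, stands in for THROTTLE_SCALING

uint64_t SketchSmoothThrottle(uint64_t current, uint64_t goal)
{
    int64_t temp = (int64_t)current;

    if (current < goal)
        temp += (int64_t)(goal - current) / kScaling + 1;
    else
        temp -= (int64_t)(current - goal) / kScaling + 2;

    if (temp < 1)
        temp = 1;                      // throttle must always have an effect

    return (uint64_t)temp;
}   // SketchSmoothThrottle

// ---------------------------------------------------------------------------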