From 07c95b6d20c18909ed4b86194b757b12e0b563e3 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Thu, 23 Aug 2012 17:41:21 -0400 Subject: [PATCH 1/7] Adjust cache release logic when initial pointer has lowest timestamp but high ref count --- util/cache.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/util/cache.cc b/util/cache.cc index f922dc6d..a43e6022 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -325,7 +325,8 @@ Cache::Handle* LRUCache::Insert( for (cursor=lru_.next; cursor!=&lru_; cursor=cursor->next) { - if (timercmp(&cursor->last_access, &low_ptr->last_access, <) && cursor->refs <= 1) + if ((timercmp(&cursor->last_access, &low_ptr->last_access, <) && cursor->refs <= 1) + || cursor->refs < low_ptr->refs) low_ptr=cursor; } // for // removing item that still has active references is From e477211e9109a76fc630eeae883f60fcbeb84559 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Thu, 6 Sep 2012 13:11:06 -0400 Subject: [PATCH 2/7] merge errors since this was cherry-picked from master --- db/builder.cc | 1 + db/db_impl.cc | 266 ++++++++++++++++++++++++++---------------- db/db_impl.h | 1 + db/version_edit.h | 5 +- db/version_set.cc | 20 +++- db/version_set.h | 33 +++++- include/leveldb/env.h | 5 +- util/env_posix.cc | 100 ++++++---------- 8 files changed, 260 insertions(+), 171 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index f4198821..591aa456 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -38,6 +38,7 @@ Status BuildTable(const std::string& dbname, Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); + ++meta->num_entries; } // Finish and check for builder errors diff --git a/db/db_impl.cc b/db/db_impl.cc index 3bba89da..4f0203a4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,7 +4,9 @@ #include "db/db_impl.h" +#include #include +#include #include #include #include @@ -68,6 +70,7 @@ struct DBImpl::CompactionState { TableBuilder* builder; uint64_t total_bytes; + uint64_t num_entries; Output* current_output() { return &outputs[outputs.size()-1]; } @@ -75,7 +78,8 @@ struct DBImpl::CompactionState { : compaction(c), outfile(NULL), builder(NULL), - total_bytes(0) { + total_bytes(0), + num_entries(0) { } }; @@ -468,9 +472,10 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, mutex_.Lock(); } - Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", + Log(options_.info_log, "Level-0 table #%llu: %llu bytes, %llu keys %s", (unsigned long long) meta.number, (unsigned long long) meta.file_size, + (unsigned long long) meta.num_entries, s.ToString().c_str()); delete iter; pending_outputs_.erase(meta.number); @@ -493,6 +498,13 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; stats_[level].Add(stats); + + if (0!=meta.num_entries && s.ok()) + { + // 2x since mem to disk, not disk to disk + versions_->SetWriteRate(2*stats.micros/meta.num_entries); + } // if + return s; } @@ -667,90 +679,96 @@ void DBImpl::BackgroundCall() { bg_cv_.SignalAll(); } -void DBImpl::BackgroundCompaction() { +Status DBImpl::BackgroundCompaction() { + Status status; + bool is_manual = (manual_compaction_ != NULL); + InternalKey manual_end; + mutex_.AssertHeld(); if (imm_ != NULL) { pthread_rwlock_rdlock(&gThreadLock0); - CompactMemTable(); + status=CompactMemTable(); pthread_rwlock_unlock(&gThreadLock0); - return; } - Compaction* c; - bool is_manual = (manual_compaction_ != NULL); - InternalKey manual_end; - if (is_manual) { - ManualCompaction* m = manual_compaction_; - c = versions_->CompactRange(m->level, m->begin, m->end); - m->done = (c == NULL); - if (c != NULL) { - manual_end = c->input(0, c->num_input_files(0) - 1)->largest; - } - Log(options_.info_log, - "Manual compaction at level-%d from %s .. %s; will stop at %s\n", - m->level, - (m->begin ? m->begin->DebugString().c_str() : "(begin)"), - (m->end ? m->end->DebugString().c_str() : "(end)"), - (m->done ? "(end)" : manual_end.DebugString().c_str())); - } else { - c = versions_->PickCompaction(); - } + if (status.ok()) + { + Compaction* c; + if (is_manual) { + ManualCompaction* m = manual_compaction_; + c = versions_->CompactRange(m->level, m->begin, m->end); + m->done = (c == NULL); + if (c != NULL) { + manual_end = c->input(0, c->num_input_files(0) - 1)->largest; + } + Log(options_.info_log, + "Manual compaction at level-%d from %s .. %s; will stop at %s\n", + m->level, + (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)"), + (m->done ? "(end)" : manual_end.DebugString().c_str())); + } else { + c = versions_->PickCompaction(); + } - Status status; - if (c == NULL) { - // Nothing to do - } else if (!is_manual && c->IsTrivialMove()) { - // Move file to next level - assert(c->num_input_files(0) == 1); - FileMetaData* f = c->input(0, 0); - c->edit()->DeleteFile(c->level(), f->number); - c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); - status = versions_->LogAndApply(c->edit(), &mutex_); - VersionSet::LevelSummaryStorage tmp; - Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", - static_cast(f->number), - c->level() + 1, - static_cast(f->file_size), - status.ToString().c_str(), - versions_->LevelSummary(&tmp)); - } else { - CompactionState* compact = new CompactionState(c); - status = DoCompactionWork(compact); - CleanupCompaction(compact); - c->ReleaseInputs(); - DeleteObsoleteFiles(); + if (c == NULL) { + // Nothing to do + } else if (!is_manual && c->IsTrivialMove()) { + // Move file to next level + assert(c->num_input_files(0) == 1); + FileMetaData* f = c->input(0, 0); + c->edit()->DeleteFile(c->level(), f->number); + c->edit()->AddFile(c->level() + 1, f->number, f->file_size, + f->smallest, f->largest); + status = versions_->LogAndApply(c->edit(), &mutex_); + VersionSet::LevelSummaryStorage tmp; + Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", + static_cast(f->number), + c->level() + 1, + static_cast(f->file_size), + status.ToString().c_str(), + versions_->LevelSummary(&tmp)); + } else { + CompactionState* compact = new CompactionState(c); + status = DoCompactionWork(compact); + CleanupCompaction(compact); + c->ReleaseInputs(); + DeleteObsoleteFiles(); + } + delete c; } - delete c; if (status.ok()) { - // Done + // Done } else if (shutting_down_.Acquire_Load()) { - // Ignore compaction errors found during shutting down + // Ignore compaction errors found during shutting down } else { - Log(options_.info_log, - "Compaction error: %s", status.ToString().c_str()); - if (options_.paranoid_checks && bg_error_.ok()) { - bg_error_ = status; - } + Log(options_.info_log, + "Compaction error: %s", status.ToString().c_str()); + if (options_.paranoid_checks && bg_error_.ok()) { + bg_error_ = status; + } } if (is_manual) { - ManualCompaction* m = manual_compaction_; - if (!status.ok()) { - m->done = true; - } - if (!m->done) { - // We only compacted part of the requested range. Update *m - // to the range that is left to be compacted. - m->tmp_storage = manual_end; - m->begin = &m->tmp_storage; - } - manual_compaction_ = NULL; + ManualCompaction* m = manual_compaction_; + if (!status.ok()) { + m->done = true; + } + if (!m->done) { + // We only compacted part of the requested range. Update *m + // to the range that is left to be compacted. + m->tmp_storage = manual_end; + m->begin = &m->tmp_storage; + } + manual_compaction_ = NULL; } + + return status; } + void DBImpl::CleanupCompaction(CompactionState* compact) { mutex_.AssertHeld(); if (compact->builder != NULL) { @@ -813,6 +831,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, const uint64_t current_bytes = compact->builder->FileSize(); compact->current_output()->file_size = current_bytes; compact->total_bytes += current_bytes; + compact->num_entries += compact->builder->NumEntries(); delete compact->builder; compact->builder = NULL; @@ -898,37 +917,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work - if (has_imm_.NoBarrier_Load() != NULL) { - const uint64_t imm_start = env_->NowMicros(); - mutex_.Lock(); - if (imm_ != NULL) { - if (0 == compact->compaction->level()) - pthread_rwlock_unlock(&gThreadLock1); - pthread_rwlock_rdlock(&gThreadLock0); - CompactMemTable(); - pthread_rwlock_unlock(&gThreadLock0); - if (0 == compact->compaction->level()) - pthread_rwlock_rdlock(&gThreadLock1); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary - } - mutex_.Unlock(); - imm_micros += (env_->NowMicros() - imm_start); - } - - // pause to potentially hand off disk to - // memtable threads - pthread_rwlock_wrlock(&gThreadLock0); - pthread_rwlock_unlock(&gThreadLock0); - - // Give priorities to level 0 compactions, unless - // this compaction is blocking a level 0 in this database - if (0 != compact->compaction->level() && level0_good) - { - pthread_rwlock_wrlock(&gThreadLock1); - pthread_rwlock_unlock(&gThreadLock1); - } // if + imm_micros+=PrioritizeWork(0==compact->compaction->level()); Slice key = input->key(); if (compact->compaction->ShouldStopBefore(key) && @@ -1041,6 +1033,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { stats_[compact->compaction->level() + 1].Add(stats); if (status.ok()) { + if (0!=compact->num_entries) + versions_->SetWriteRate(stats.micros/compact->num_entries); status = InstallCompactionResults(compact); } VersionSet::LevelSummaryStorage tmp; @@ -1049,6 +1043,68 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { return status; } + +int64_t +DBImpl::PrioritizeWork( + bool IsLevel0) +{ + int64_t start_time; + bool again; + int ret_val; + struct timespec timeout; + + start_time=env_->NowMicros(); + + // loop while on hold due to higher priority stuff, + // but keep polling for need to handle imm_ + do + { + again=false; + + if (has_imm_.NoBarrier_Load() != NULL) { + mutex_.Lock(); + if (imm_ != NULL) { + if (IsLevel0) + pthread_rwlock_unlock(&gThreadLock1); + pthread_rwlock_rdlock(&gThreadLock0); + CompactMemTable(); + pthread_rwlock_unlock(&gThreadLock0); + if (IsLevel0) + pthread_rwlock_rdlock(&gThreadLock1); + bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + } // if + mutex_.Unlock(); + } // if + + // pause to potentially hand off disk to + // memtable threads + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_sec+=5; + ret_val=pthread_rwlock_timedwrlock(&gThreadLock0, &timeout); + if (0==ret_val) + pthread_rwlock_unlock(&gThreadLock0); + again=(ETIMEDOUT==ret_val); + + // Give priorities to level 0 compactions, unless + // this compaction is blocking a level 0 in this database + if (!IsLevel0 && level0_good) + { + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_sec+=5; + ret_val=pthread_rwlock_timedwrlock(&gThreadLock1, &timeout); + if (0==ret_val) + pthread_rwlock_unlock(&gThreadLock1); + again=again || (ETIMEDOUT==ret_val); + } // if + } while(again); + + // let caller know how long was spent waiting. + return(env_->NowMicros() - start_time); + +} // PrioritizeWork + + + namespace { struct IterState { port::Mutex* mu; @@ -1303,12 +1359,16 @@ Status DBImpl::MakeRoomForWrite(bool force) { assert(!writers_.empty()); bool allow_delay = !force; Status s; + int throttle; - mutex_.Unlock(); - /// slowing things down - // shared thread block throttle - env_->WriteThrottle(versions_->NumLevelFiles(0)); - mutex_.Lock(); + throttle=versions_->WriteThrottleUsec(); + if (0!=throttle) + { + mutex_.Unlock(); + /// slowing things down + env_->SleepForMicroseconds(throttle); + mutex_.Lock(); + } // if // hint to background compaction. level0_good=(versions_->NumLevelFiles(0) < config::kL0_CompactionTrigger); @@ -1338,11 +1398,15 @@ Status DBImpl::MakeRoomForWrite(bool force) { } else if (imm_ != NULL) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. + Log(options_.info_log, "waiting 2...\n"); + MaybeScheduleCompaction(); bg_cv_.Wait(); + Log(options_.info_log, "running 2...\n"); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. Log(options_.info_log, "waiting...\n"); bg_cv_.Wait(); + Log(options_.info_log, "running...\n"); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); diff --git a/db/db_impl.h b/db/db_impl.h index ba304636..f4f4eee3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -97,6 +97,7 @@ class DBImpl : public DB { void BackgroundCompaction(); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); + int64_t PrioritizeWork(bool IsLevel0); Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); diff --git a/db/version_edit.h b/db/version_edit.h index 5f8b59e3..6d605e29 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -19,10 +19,13 @@ struct FileMetaData { int allowed_seeks; // Seeks allowed until compaction uint64_t number; uint64_t file_size; // File size in bytes + uint64_t num_entries; InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table - FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { } + FileMetaData() + : refs(0), allowed_seeks(1 << 30), file_size(0), num_entries(0) + { } }; diff --git a/db/version_set.cc b/db/version_set.cc index bbdcd962..ccac4bd4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -706,6 +706,7 @@ VersionSet::VersionSet(const std::string& dbname, last_sequence_(0), log_number_(0), prev_log_number_(0), + write_rate_usec_(0), descriptor_file_(NULL), descriptor_log_(NULL), dummy_versions_(this), @@ -944,6 +945,7 @@ void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; double best_score = -1; + int penalty=0; for (int level = 0; level < config::kNumLevels-1; level++) { double score; @@ -964,12 +966,23 @@ void VersionSet::Finalize(Version* v) { // don't screw around ... get data written to disk! if (config::kL0_SlowdownWritesTrigger <= v->files_[level].size()) - score*=10.0; + score*=1000000.0; + + // compute penalty for write throttle if too many Level-0 files accumulating + if (config::kL0_CompactionTrigger <= v->files_[level].size()) + { + penalty+=v->files_[level].size() - config::kL0_CompactionTrigger +1; + } // if } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); score = static_cast(level_bytes) / MaxBytesForLevel(level); + + // compute aggressive penalty for write throttle, things go bad if higher + // levels are allowed to backup ... especially Level-1 + if (1<=score) + penalty+=(static_cast(score))*5; } if (score > best_score) { @@ -980,6 +993,11 @@ void VersionSet::Finalize(Version* v) { v->compaction_level_ = best_level; v->compaction_score_ = best_score; + + if (50write_penalty_ = penalty; + } Status VersionSet::WriteSnapshot(log::Writer* log) { diff --git a/db/version_set.h b/db/version_set.h index c5ec6940..bc9ca61c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -21,6 +21,7 @@ #include "db/dbformat.h" #include "db/version_edit.h" #include "port/port.h" +#include "leveldb/env.h" namespace leveldb { @@ -103,6 +104,8 @@ class Version { int NumFiles(int level) const { return files_[level].size(); } + int WritePenalty() const {return write_penalty_; } + // Return a human readable string that describes this version's contents. std::string DebugString() const; @@ -130,13 +133,16 @@ class Version { // are initialized by Finalize(). double compaction_score_; int compaction_level_; + int write_penalty_; explicit Version(VersionSet* vset) : vset_(vset), next_(this), prev_(this), refs_(0), file_to_compact_(NULL), file_to_compact_level_(-1), compaction_score_(-1), - compaction_level_(-1) { + compaction_level_(-1), + write_penalty_(0) + { } ~Version(); @@ -198,6 +204,30 @@ class VersionSet { // being compacted, or zero if there is no such log file. uint64_t PrevLogNumber() const { return prev_log_number_; } + + void SetWriteRate(uint64_t Rate) + { + // take higher rate immediately + if (write_rate_usec_ < Rate) + write_rate_usec_=Rate; + + // scale down to a lower rate slowly + else + write_rate_usec_-=(write_rate_usec_ - Rate)/7; + + return; + }; + int WriteThrottleUsec() + { + int ret_msec=0; + int penalty=current_->write_penalty_; + penalty+=options_->env->GetBackgroundBacklog(); + + // divide by 10 so penalty becomes range of 1.? instead of 1? + return((int)((0!=penalty) ? ((penalty)*write_rate_usec_) : 0)); + } + + // Pick level and inputs for a new compaction. // Returns NULL if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that @@ -276,6 +306,7 @@ class VersionSet { uint64_t last_sequence_; uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + uint64_t write_rate_usec_; // most recent average rate per key // Opened lazily WritableFile* descriptor_file_; diff --git a/include/leveldb/env.h b/include/leveldb/env.h index 5d645114..d7c3303b 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -158,9 +158,8 @@ class Env { // Sleep/delay the thread for the perscribed number of micro-seconds. virtual void SleepForMicroseconds(int micros) = 0; - // Simple adaptive write throttle to pace incoming data against - // hardware write throughput - virtual void WriteThrottle(int level0_count) {return;}; + // Where supported, give count of background jobs pending. + virtual int GetBackgroundBacklog() const {return(0);}; private: // No copying allowed diff --git a/util/env_posix.cc b/util/env_posix.cc index 515ffef1..646744b2 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -596,72 +596,30 @@ class PosixEnv : public Env { } virtual void SleepForMicroseconds(int micros) { - usleep(micros); - } +#if 0 + usleep(micros); //obsolete since POSIX.1-2008 +#else + struct timespec ts; + int ret_val; - virtual void WriteThrottle( - int level0_count) //!< database specific count - { - int pause; - static volatile int global_pause=0; - static pthread_mutex_t global_pause_mutex=PTHREAD_MUTEX_INITIALIZER; - - - pause=0; - // lock access to the queueX_ variables - PthreadCall("lock", pthread_mutex_lock(&mu_)); - - // write time assumptions: The server is going to be - // relatively fast when starting imm_ or level 1+ - // compactions, hence 1,500 write operation pace. - // The server is going to be busy when level 0 to 1 - // are falling behind, hence 666 write operations per second. - // Per thread block calculations cancel out in the math, hence ignored. - - // imm_ flush queue, cost is 0.4ms - /// (assumes need 0.5 seconds with 1,500 write/ps) - pause+=queue4_.size() * 333; - - // level 0 merge queue, cost 10.0ms - /// (assumes need 15 seconds with 1,500 write/ps) - pause+=queue2_.size() * 10000; - - // 4.5 ms for each level 0 file above compaction trigger point - // (assumes 3 seconds additional per file with 1,500 write/ps) - // Either there are too many Level 0 compaction already, or this - // vnode is in the middle of a Level 1+ compaction. Progressive - // slow down as time passes. - if (config::kL0_CompactionTrigger < level0_count) - pause+=(level0_count-config::kL0_CompactionTrigger) * 2000; - - // level 1+ work backlog - // just acknowledge that work is pending, but this work is - // not essential to write volume ... unless it blocks Level 0 - // of same vnode and above equation addresses that scenario. - if (0!=queue_.size()) - pause+=1000; - - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - - PthreadCall("lock", pthread_mutex_lock(&global_pause_mutex)); - // want to impact all threads to slow down writes. - // there is no easy way today to know when to "turn off" - // the pause globally, so use --global_pause as proxy - // (this param is touchy) - if (global_pausefront().function; void* arg = queue_ptr->front().arg; queue_ptr->pop_front(); + bg_backlog_=queue4_.size()+queue2_.size()+queue_.size(); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); From 80911d94f7d3b7f361dbde7ef36ec2f361a855de Mon Sep 17 00:00:00 2001 From: Matthew V Date: Thu, 6 Sep 2012 13:45:08 -0400 Subject: [PATCH 3/7] fix error due to missed change on master that is not on this branch --- db/db_impl.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 4f0203a4..5977fc27 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -679,7 +679,7 @@ void DBImpl::BackgroundCall() { bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +void DBImpl::BackgroundCompaction() { Status status; bool is_manual = (manual_compaction_ != NULL); InternalKey manual_end; @@ -765,7 +765,6 @@ Status DBImpl::BackgroundCompaction() { manual_compaction_ = NULL; } - return status; } From 824a1bb8f646523c8f049180b6e901974f3a7ad8 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Thu, 6 Sep 2012 15:49:11 -0400 Subject: [PATCH 4/7] Correct an incorrect correction made during cherry-pick. Funny what testing can find. --- db/db_impl.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index 5977fc27..9272f552 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -690,6 +690,7 @@ void DBImpl::BackgroundCompaction() { pthread_rwlock_rdlock(&gThreadLock0); status=CompactMemTable(); pthread_rwlock_unlock(&gThreadLock0); + return; } if (status.ok()) From 7a80c10008b194786a15e63e2a6b5b37b85d2be4 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Thu, 6 Sep 2012 17:41:26 -0400 Subject: [PATCH 5/7] Adjust the polling timeout from 5 to 1 per customer feedback. --- db/db_impl.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 9272f552..be9fd965 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1079,7 +1079,7 @@ DBImpl::PrioritizeWork( // pause to potentially hand off disk to // memtable threads clock_gettime(CLOCK_REALTIME, &timeout); - timeout.tv_sec+=5; + timeout.tv_sec+=1; ret_val=pthread_rwlock_timedwrlock(&gThreadLock0, &timeout); if (0==ret_val) pthread_rwlock_unlock(&gThreadLock0); @@ -1087,10 +1087,10 @@ DBImpl::PrioritizeWork( // Give priorities to level 0 compactions, unless // this compaction is blocking a level 0 in this database - if (!IsLevel0 && level0_good) + if (!IsLevel0 && level0_good && !again) { clock_gettime(CLOCK_REALTIME, &timeout); - timeout.tv_sec+=5; + timeout.tv_sec+=1; ret_val=pthread_rwlock_timedwrlock(&gThreadLock1, &timeout); if (0==ret_val) pthread_rwlock_unlock(&gThreadLock1); From 420f4b50dec91711e9fe754e16c2e47ad92f6300 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Fri, 7 Sep 2012 11:28:54 -0400 Subject: [PATCH 6/7] Update db_impl.cc and env_posix.cc so that they will compile on apple/bsd. Restored BackgroundCompaction to google's formatting since alt code path not used. --- db/db_impl.cc | 151 ++++++++++++++++++++++++++-------------------- util/env_posix.cc | 18 +++--- 2 files changed, 96 insertions(+), 73 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index be9fd965..64334ac7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "db/builder.h" #include "db/db_iter.h" @@ -681,8 +682,6 @@ void DBImpl::BackgroundCall() { void DBImpl::BackgroundCompaction() { Status status; - bool is_manual = (manual_compaction_ != NULL); - InternalKey manual_end; mutex_.AssertHeld(); @@ -693,79 +692,78 @@ void DBImpl::BackgroundCompaction() { return; } - if (status.ok()) - { - Compaction* c; - if (is_manual) { - ManualCompaction* m = manual_compaction_; - c = versions_->CompactRange(m->level, m->begin, m->end); - m->done = (c == NULL); - if (c != NULL) { - manual_end = c->input(0, c->num_input_files(0) - 1)->largest; - } - Log(options_.info_log, - "Manual compaction at level-%d from %s .. %s; will stop at %s\n", - m->level, - (m->begin ? m->begin->DebugString().c_str() : "(begin)"), - (m->end ? m->end->DebugString().c_str() : "(end)"), - (m->done ? "(end)" : manual_end.DebugString().c_str())); - } else { - c = versions_->PickCompaction(); - } - - if (c == NULL) { - // Nothing to do - } else if (!is_manual && c->IsTrivialMove()) { - // Move file to next level - assert(c->num_input_files(0) == 1); - FileMetaData* f = c->input(0, 0); - c->edit()->DeleteFile(c->level(), f->number); - c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); - status = versions_->LogAndApply(c->edit(), &mutex_); - VersionSet::LevelSummaryStorage tmp; - Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", - static_cast(f->number), - c->level() + 1, - static_cast(f->file_size), - status.ToString().c_str(), - versions_->LevelSummary(&tmp)); - } else { - CompactionState* compact = new CompactionState(c); - status = DoCompactionWork(compact); - CleanupCompaction(compact); - c->ReleaseInputs(); - DeleteObsoleteFiles(); - } - delete c; + Compaction* c; + bool is_manual = (manual_compaction_ != NULL); + InternalKey manual_end; + if (is_manual) { + ManualCompaction* m = manual_compaction_; + c = versions_->CompactRange(m->level, m->begin, m->end); + m->done = (c == NULL); + if (c != NULL) { + manual_end = c->input(0, c->num_input_files(0) - 1)->largest; + } + Log(options_.info_log, + "Manual compaction at level-%d from %s .. %s; will stop at %s\n", + m->level, + (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)"), + (m->done ? "(end)" : manual_end.DebugString().c_str())); + } else { + c = versions_->PickCompaction(); + } + + + if (c == NULL) { + // Nothing to do + } else if (!is_manual && c->IsTrivialMove()) { + // Move file to next level + assert(c->num_input_files(0) == 1); + FileMetaData* f = c->input(0, 0); + c->edit()->DeleteFile(c->level(), f->number); + c->edit()->AddFile(c->level() + 1, f->number, f->file_size, + f->smallest, f->largest); + status = versions_->LogAndApply(c->edit(), &mutex_); + VersionSet::LevelSummaryStorage tmp; + Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", + static_cast(f->number), + c->level() + 1, + static_cast(f->file_size), + status.ToString().c_str(), + versions_->LevelSummary(&tmp)); + } else { + CompactionState* compact = new CompactionState(c); + status = DoCompactionWork(compact); + CleanupCompaction(compact); + c->ReleaseInputs(); + DeleteObsoleteFiles(); } + delete c; if (status.ok()) { - // Done + // Done } else if (shutting_down_.Acquire_Load()) { - // Ignore compaction errors found during shutting down + // Ignore compaction errors found during shutting down } else { - Log(options_.info_log, - "Compaction error: %s", status.ToString().c_str()); - if (options_.paranoid_checks && bg_error_.ok()) { - bg_error_ = status; - } + Log(options_.info_log, + "Compaction error: %s", status.ToString().c_str()); + if (options_.paranoid_checks && bg_error_.ok()) { + bg_error_ = status; + } } if (is_manual) { - ManualCompaction* m = manual_compaction_; - if (!status.ok()) { - m->done = true; - } - if (!m->done) { - // We only compacted part of the requested range. Update *m - // to the range that is left to be compacted. - m->tmp_storage = manual_end; - m->begin = &m->tmp_storage; - } - manual_compaction_ = NULL; + ManualCompaction* m = manual_compaction_; + if (!status.ok()) { + m->done = true; + } + if (!m->done) { + // We only compacted part of the requested range. Update *m + // to the range that is left to be compacted. + m->tmp_storage = manual_end; + m->begin = &m->tmp_storage; + } + manual_compaction_ = NULL; } - } @@ -1078,9 +1076,19 @@ DBImpl::PrioritizeWork( // pause to potentially hand off disk to // memtable threads +#if defined(_POSIX_TIMEOUTS) && (_POSIX_TIMEOUTS - 200112L) >= 0L clock_gettime(CLOCK_REALTIME, &timeout); timeout.tv_sec+=1; ret_val=pthread_rwlock_timedwrlock(&gThreadLock0, &timeout); +#else + // ugly spin lock + ret_val=pthread_rwlock_trywrlock(&gThreadLock0); + if (EBUSY==ret_val) + { + ret_val=ETIMEDOUT; + env_->SleepForMicroseconds(10000); // 10milliseconds + } // if +#endif if (0==ret_val) pthread_rwlock_unlock(&gThreadLock0); again=(ETIMEDOUT==ret_val); @@ -1089,9 +1097,20 @@ DBImpl::PrioritizeWork( // this compaction is blocking a level 0 in this database if (!IsLevel0 && level0_good && !again) { +#if defined(_POSIX_TIMEOUTS) && (_POSIX_TIMEOUTS - 200112L) >= 0L clock_gettime(CLOCK_REALTIME, &timeout); timeout.tv_sec+=1; ret_val=pthread_rwlock_timedwrlock(&gThreadLock1, &timeout); +#else + // ugly spin lock + ret_val=pthread_rwlock_trywrlock(&gThreadLock1); + if (EBUSY==ret_val) + { + ret_val=ETIMEDOUT; + env_->SleepForMicroseconds(10000); // 10milliseconds + } // if +#endif + if (0==ret_val) pthread_rwlock_unlock(&gThreadLock1); again=again || (ETIMEDOUT==ret_val); diff --git a/util/env_posix.cc b/util/env_posix.cc index 646744b2..d3775526 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -596,9 +596,6 @@ class PosixEnv : public Env { } virtual void SleepForMicroseconds(int micros) { -#if 0 - usleep(micros); //obsolete since POSIX.1-2008 -#else struct timespec ts; int ret_val; @@ -610,12 +607,15 @@ class PosixEnv : public Env { do { +#if _POSIX_TIMERS > 0L // later ... add test for CLOCK_MONOTONIC_RAW where supported (better) ret_val=clock_nanosleep(CLOCK_MONOTONIC,0, &ts, &ts); +#else + ret_val=nanosleep(&ts, &ts); +#endif } while(EINTR==ret_val && 0!=(ts.tv_sec+ts.tv_nsec)); } // if -#endif - } + } // SleepForMicroSeconds virtual int GetBackgroundBacklog() const {return(bg_backlog_);}; @@ -645,7 +645,7 @@ class PosixEnv : public Env { pthread_t bgthread4_; // imm_ to level 0 compactions bool started_bgthread_; volatile int bg_backlog_;// count of items on 3 compaction queues - int64_t clock_res_; + int64_t clock_res_; // Entry per Schedule() call struct BGItem { void* arg; void (*function)(void*); }; @@ -659,13 +659,17 @@ class PosixEnv : public Env { PosixEnv::PosixEnv() : page_size_(getpagesize()), started_bgthread_(false), - bg_backlog_(0) + bg_backlog_(0), + clock_res_(1) { struct timespec ts; + +#if _POSIX_TIMERS > 0L clock_getres(CLOCK_MONOTONIC, &ts); clock_res_=ts.tv_sec*1000000+ts.tv_nsec/1000; if (0==clock_res_) ++clock_res_; +#endif PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL)); PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL)); From cd3bcc898a8702dcab602ab4b4bea7f7d71a10b5 Mon Sep 17 00:00:00 2001 From: Matthew V Date: Fri, 7 Sep 2012 15:55:24 -0400 Subject: [PATCH 7/7] Internet was wrong on how to set feature test for clock_nanosleep. Last time I use Internet for technical data. --- util/env_posix.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index d3775526..b92124d3 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -607,7 +607,7 @@ class PosixEnv : public Env { do { -#if _POSIX_TIMERS > 0L +#if _POSIX_TIMERS >= 200801L // later ... add test for CLOCK_MONOTONIC_RAW where supported (better) ret_val=clock_nanosleep(CLOCK_MONOTONIC,0, &ts, &ts); #else @@ -664,7 +664,7 @@ PosixEnv::PosixEnv() : page_size_(getpagesize()), { struct timespec ts; -#if _POSIX_TIMERS > 0L +#if _POSIX_TIMERS >= 200801L clock_getres(CLOCK_MONOTONIC, &ts); clock_res_=ts.tv_sec*1000000+ts.tv_nsec/1000; if (0==clock_res_)