Skip to content
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Status BuildTable(const std::string& dbname,
Slice key = iter->key();
meta->largest.DecodeFrom(key);
builder->Add(key, iter->value());
++meta->num_entries;
}

// Finish and check for builder errors
Expand Down
159 changes: 121 additions & 38 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

#include "db/db_impl.h"

#include <time.h>
#include <algorithm>
#include <errno.h>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
Expand Down Expand Up @@ -68,14 +71,16 @@ struct DBImpl::CompactionState {
TableBuilder* builder;

uint64_t total_bytes;
uint64_t num_entries;

Output* current_output() { return &outputs[outputs.size()-1]; }

explicit CompactionState(Compaction* c)
: compaction(c),
outfile(NULL),
builder(NULL),
total_bytes(0) {
total_bytes(0),
num_entries(0) {
}
};

Expand Down Expand Up @@ -468,9 +473,10 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
mutex_.Lock();
}

Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
Log(options_.info_log, "Level-0 table #%llu: %llu bytes, %llu keys %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
(unsigned long long) meta.num_entries,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);
Expand All @@ -493,6 +499,13 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);

if (0!=meta.num_entries && s.ok())
{
// 2x since mem to disk, not disk to disk
versions_->SetWriteRate(2*stats.micros/meta.num_entries);
} // if

return s;
}

Expand Down Expand Up @@ -668,11 +681,13 @@ void DBImpl::BackgroundCall() {
}

void DBImpl::BackgroundCompaction() {
Status status;

mutex_.AssertHeld();

if (imm_ != NULL) {
pthread_rwlock_rdlock(&gThreadLock0);
CompactMemTable();
status=CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
return;
}
Expand All @@ -697,7 +712,7 @@ void DBImpl::BackgroundCompaction() {
c = versions_->PickCompaction();
}

Status status;

if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
Expand Down Expand Up @@ -751,6 +766,7 @@ void DBImpl::BackgroundCompaction() {
}
}


void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
if (compact->builder != NULL) {
Expand Down Expand Up @@ -813,6 +829,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
const uint64_t current_bytes = compact->builder->FileSize();
compact->current_output()->file_size = current_bytes;
compact->total_bytes += current_bytes;
compact->num_entries += compact->builder->NumEntries();
delete compact->builder;
compact->builder = NULL;

Expand Down Expand Up @@ -898,37 +915,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;

for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) {
if (0 == compact->compaction->level())
pthread_rwlock_unlock(&gThreadLock1);
pthread_rwlock_rdlock(&gThreadLock0);
CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
if (0 == compact->compaction->level())
pthread_rwlock_rdlock(&gThreadLock1);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}

// pause to potentially hand off disk to
// memtable threads
pthread_rwlock_wrlock(&gThreadLock0);
pthread_rwlock_unlock(&gThreadLock0);

// Give priorities to level 0 compactions, unless
// this compaction is blocking a level 0 in this database
if (0 != compact->compaction->level() && level0_good)
{
pthread_rwlock_wrlock(&gThreadLock1);
pthread_rwlock_unlock(&gThreadLock1);
} // if
imm_micros+=PrioritizeWork(0==compact->compaction->level());

Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
Expand Down Expand Up @@ -1041,6 +1031,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
stats_[compact->compaction->level() + 1].Add(stats);

if (status.ok()) {
if (0!=compact->num_entries)
versions_->SetWriteRate(stats.micros/compact->num_entries);
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
Expand All @@ -1049,6 +1041,89 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
return status;
}


int64_t
DBImpl::PrioritizeWork(
bool IsLevel0)
{
int64_t start_time;
bool again;
int ret_val;
struct timespec timeout;

start_time=env_->NowMicros();

// loop while on hold due to higher priority stuff,
// but keep polling for need to handle imm_
do
{
again=false;

if (has_imm_.NoBarrier_Load() != NULL) {
mutex_.Lock();
if (imm_ != NULL) {
if (IsLevel0)
pthread_rwlock_unlock(&gThreadLock1);
pthread_rwlock_rdlock(&gThreadLock0);
CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
if (IsLevel0)
pthread_rwlock_rdlock(&gThreadLock1);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
} // if
mutex_.Unlock();
} // if

// pause to potentially hand off disk to
// memtable threads
#if defined(_POSIX_TIMEOUTS) && (_POSIX_TIMEOUTS - 200112L) >= 0L
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec+=1;
ret_val=pthread_rwlock_timedwrlock(&gThreadLock0, &timeout);
#else
// ugly spin lock
ret_val=pthread_rwlock_trywrlock(&gThreadLock0);
if (EBUSY==ret_val)
{
ret_val=ETIMEDOUT;
env_->SleepForMicroseconds(10000); // 10milliseconds
} // if
#endif
if (0==ret_val)
pthread_rwlock_unlock(&gThreadLock0);
again=(ETIMEDOUT==ret_val);

// Give priorities to level 0 compactions, unless
// this compaction is blocking a level 0 in this database
if (!IsLevel0 && level0_good && !again)
{
#if defined(_POSIX_TIMEOUTS) && (_POSIX_TIMEOUTS - 200112L) >= 0L
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec+=1;
ret_val=pthread_rwlock_timedwrlock(&gThreadLock1, &timeout);
#else
// ugly spin lock
ret_val=pthread_rwlock_trywrlock(&gThreadLock1);
if (EBUSY==ret_val)
{
ret_val=ETIMEDOUT;
env_->SleepForMicroseconds(10000); // 10milliseconds
} // if
#endif

if (0==ret_val)
pthread_rwlock_unlock(&gThreadLock1);
again=again || (ETIMEDOUT==ret_val);
} // if
} while(again);

// let caller know how long was spent waiting.
return(env_->NowMicros() - start_time);

} // PrioritizeWork



namespace {
struct IterState {
port::Mutex* mu;
Expand Down Expand Up @@ -1303,12 +1378,16 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
int throttle;

mutex_.Unlock();
/// slowing things down
// shared thread block throttle
env_->WriteThrottle(versions_->NumLevelFiles(0));
mutex_.Lock();
throttle=versions_->WriteThrottleUsec();
if (0!=throttle)
{
mutex_.Unlock();
/// slowing things down
env_->SleepForMicroseconds(throttle);
mutex_.Lock();
} // if

// hint to background compaction.
level0_good=(versions_->NumLevelFiles(0) < config::kL0_CompactionTrigger);
Expand Down Expand Up @@ -1338,11 +1417,15 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "waiting 2...\n");
MaybeScheduleCompaction();
bg_cv_.Wait();
Log(options_.info_log, "running 2...\n");
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "waiting...\n");
bg_cv_.Wait();
Log(options_.info_log, "running...\n");
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
Expand Down
1 change: 1 addition & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class DBImpl : public DB {
void BackgroundCompaction();
void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact);
int64_t PrioritizeWork(bool IsLevel0);

Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Expand Down
21 changes: 20 additions & 1 deletion db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,31 @@ struct FileMetaData {
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
uint64_t num_entries;
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table

FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
FileMetaData()
: refs(0), allowed_seeks(1 << 30), file_size(0), num_entries(0)
{ }
};


// Strict-weak ordering over FileMetaData pointers: orders files by
// their smallest user key, ascending, using a caller supplied key
// comparator.  Suitable as the comparison functor for std::sort /
// std::nth_element over vectors of table-file metadata.
class FileMetaDataPtrCompare
{
protected:
    // Borrowed, not owned; the comparator must outlive this functor.
    const Comparator * comparator_;

public:
    explicit FileMetaDataPtrCompare(const Comparator * Comparer)
        : comparator_(Comparer) {}  // was "{};" — stray semicolon removed

    // Returns true when file1's smallest user key sorts before file2's.
    bool operator() (const FileMetaData * file1, const FileMetaData * file2) const
    {
        return(comparator_->Compare(file1->smallest.user_key(), file2->smallest.user_key()) < 0);
    }
};  // class FileMetaDataPtrCompare

class VersionEdit {
public:
VersionEdit() { Clear(); }
Expand Down
32 changes: 31 additions & 1 deletion db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ VersionSet::VersionSet(const std::string& dbname,
last_sequence_(0),
log_number_(0),
prev_log_number_(0),
write_rate_usec_(0),
descriptor_file_(NULL),
descriptor_log_(NULL),
dummy_versions_(this),
Expand Down Expand Up @@ -944,6 +945,7 @@ void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
int penalty=0;

for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
Expand All @@ -964,12 +966,23 @@ void VersionSet::Finalize(Version* v) {

// don't screw around ... get data written to disk!
if (config::kL0_SlowdownWritesTrigger <= v->files_[level].size())
score*=10.0;
score*=1000000.0;

// compute penalty for write throttle if too many Level-0 files accumulating
if (config::kL0_CompactionTrigger <= v->files_[level].size())
{
penalty+=v->files_[level].size() - config::kL0_CompactionTrigger +1;
} // if

} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);

// compute aggressive penalty for write throttle, things go bad if higher
// levels are allowed to backup ... especially Level-1
if (1<=score)
penalty+=(static_cast<int>(score))*5;
}

if (score > best_score) {
Expand All @@ -980,6 +993,11 @@ void VersionSet::Finalize(Version* v) {

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;

if (50<penalty)
penalty=50;
v->write_penalty_ = penalty;

}

Status VersionSet::WriteSnapshot(log::Writer* log) {
Expand Down Expand Up @@ -1225,6 +1243,18 @@ Compaction* VersionSet::PickCompaction() {
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());

// this can get into tens of thousands after a repair
// keep it sane
if (options_->max_open_files < c->inputs_[0].size())
{
std::nth_element(c->inputs_[0].begin(),
c->inputs_[0].begin()+options_->max_open_files-1,
c->inputs_[0].end(),FileMetaDataPtrCompare(options_->comparator));
c->inputs_[0].erase(c->inputs_[0].begin()+options_->max_open_files,
c->inputs_[0].end());
} // if

}

SetupOtherInputs(c);
Expand Down
Loading