Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
meta->file_size = 0;
meta->num_entries = 0;
meta->num_tombstones = 0;
iter->SeekToFirst();

std::string fname = TableFileName(dbname, meta->number);
Expand All @@ -31,8 +33,13 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());
Slice key;
ParsedInternalKey ikey;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
meta->num_entries++;
if (ParseInternalKey(key, &ikey) && ikey.type == kTypeDeletion) {
meta->num_tombstones++;
}
builder->Add(key, iter->value());
}
if (!key.empty()) {
Expand Down
24 changes: 19 additions & 5 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ struct DBImpl::CompactionState {
struct Output {
uint64_t number;
uint64_t file_size;
uint64_t num_tombstones; // Number of tombstone entries in output file
uint64_t num_entries; // Total number of entries in output file
InternalKey smallest, largest;
};

Expand Down Expand Up @@ -535,8 +537,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
edit->AddFile(level, meta.number, meta.file_size, meta.num_tombstones,
meta.num_entries, meta.smallest, meta.largest);
}

CompactionStats stats;
Expand Down Expand Up @@ -740,8 +742,8 @@ void DBImpl::BackgroundCompaction() {
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->num_tombstones,
f->num_entries, f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
Expand Down Expand Up @@ -813,6 +815,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
pending_outputs_.insert(file_number);
CompactionState::Output out;
out.number = file_number;
out.file_size = 0;
out.num_tombstones = 0;
out.num_entries = 0;
out.smallest.Clear();
out.largest.Clear();
compact->outputs.push_back(out);
Expand Down Expand Up @@ -890,6 +895,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.num_tombstones, out.num_entries,
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
Expand Down Expand Up @@ -949,7 +955,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {

// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
const bool parsed_ok = ParseInternalKey(key, &ikey);
if (!parsed_ok) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
Expand Down Expand Up @@ -1004,6 +1011,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);

// Update tombstone statistics for output file
compact->current_output()->num_entries++;
if (parsed_ok && ikey.type == kTypeDeletion) {
compact->current_output()->num_tombstones++;
}

compact->builder->Add(key, input->value());

// Close output file if it is big enough
Expand Down
10 changes: 8 additions & 2 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ class Repairer {
bool empty = true;
ParsedInternalKey parsed;
t.max_sequence = 0;
t.meta.num_entries = 0;
t.meta.num_tombstones = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Expand All @@ -268,6 +270,10 @@ class Repairer {
}

counter++;
t.meta.num_entries++;
if (parsed.type == kTypeDeletion) {
t.meta.num_tombstones++;
}
if (empty) {
empty = false;
t.meta.smallest.DecodeFrom(key);
Expand Down Expand Up @@ -368,8 +374,8 @@ class Repairer {
for (size_t i = 0; i < tables_.size(); i++) {
// TODO(opt): separate out into multiple levels
const TableInfo& t = tables_[i];
edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest,
t.meta.largest);
edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.num_tombstones,
t.meta.num_entries, t.meta.smallest, t.meta.largest);
}

// std::fprintf(stderr,
Expand Down
45 changes: 38 additions & 7 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ enum Tag {
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
kPrevLogNumber = 9,
kNewFileWithStats = 10 // New file with tombstone statistics
};

void VersionEdit::Clear() {
Expand Down Expand Up @@ -75,12 +76,23 @@ void VersionEdit::EncodeTo(std::string* dst) const {

for (size_t i = 0; i < new_files_.size(); i++) {
const FileMetaData& f = new_files_[i].second;
PutVarint32(dst, kNewFile);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
if (f.num_entries > 0 || f.num_tombstones > 0) {
PutVarint32(dst, kNewFileWithStats);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutVarint64(dst, f.num_tombstones);
PutVarint64(dst, f.num_entries);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
} else {
PutVarint32(dst, kNewFile);
PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
}
}
}

Expand Down Expand Up @@ -176,6 +188,8 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break;

case kNewFile:
f.num_tombstones = 0;
f.num_entries = 0;
if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
GetVarint64(&input, &f.file_size) &&
GetInternalKey(&input, &f.smallest) &&
Expand All @@ -186,6 +200,19 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kNewFileWithStats:
if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
GetVarint64(&input, &f.file_size) &&
GetVarint64(&input, &f.num_tombstones) &&
GetVarint64(&input, &f.num_entries) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) {
new_files_.push_back(std::make_pair(level, f));
} else {
msg = "new-file with stats entry";
}
break;

default:
msg = "unknown tag";
break;
Expand Down Expand Up @@ -246,6 +273,10 @@ std::string VersionEdit::DebugString() const {
AppendNumberTo(&r, f.number);
r.append(" ");
AppendNumberTo(&r, f.file_size);
r.append(" tombstones:");
AppendNumberTo(&r, f.num_tombstones);
r.append(" entries:");
AppendNumberTo(&r, f.num_entries);
r.append(" ");
r.append(f.smallest.DebugString());
r.append(" .. ");
Expand Down
32 changes: 28 additions & 4 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,25 @@ namespace leveldb {
class VersionSet;

struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
FileMetaData()
: refs(0),
allowed_seeks(1 << 30),
file_size(0),
num_tombstones(0),
num_entries(0) {}

int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
uint64_t file_size; // File size in bytes
uint64_t num_tombstones; // Number of tombstone entries (kTypeDeletion)
uint64_t num_entries; // Total number of entries in the file
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table

double TombstoneDensity() const {
return (num_entries > 0) ? static_cast<double>(num_tombstones) / num_entries : 0.0;
}
};

class VersionEdit {
Expand Down Expand Up @@ -70,6 +81,19 @@ class VersionEdit {
new_files_.push_back(std::make_pair(level, f));
}

void AddFile(int level, uint64_t file, uint64_t file_size,
uint64_t num_tombstones, uint64_t num_entries,
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.num_tombstones = num_tombstones;
f.num_entries = num_entries;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
Expand Down
46 changes: 42 additions & 4 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,15 @@ void VersionSet::Finalize(Version* v) {
int best_level = -1;
double best_score = -1;

// Reset tombstone compaction fields
v->tombstone_file_to_compact_ = nullptr;
v->tombstone_file_to_compact_level_ = -1;

// Track the file with highest tombstone density (> 50%)
FileMetaData* highest_tombstone_file = nullptr;
int highest_tombstone_level = -1;
double highest_tombstone_density = 0.5; // Threshold: 50%

for (int level = 0; level < config::kNumLevels - 1; level++) {
double score;
if (level == 0) {
Expand All @@ -1054,6 +1063,16 @@ void VersionSet::Finalize(Version* v) {
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);

// Check tombstone density for levels > 0
for (FileMetaData* f : v->files_[level]) {
double density = f->TombstoneDensity();
if (density > highest_tombstone_density) {
highest_tombstone_density = density;
highest_tombstone_file = f;
highest_tombstone_level = level;
}
}
}

if (score > best_score) {
Expand All @@ -1064,6 +1083,12 @@ void VersionSet::Finalize(Version* v) {

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;

// Set tombstone compaction file if found (density > 50%)
if (highest_tombstone_file != nullptr) {
v->tombstone_file_to_compact_ = highest_tombstone_file;
v->tombstone_file_to_compact_level_ = highest_tombstone_level;
}
}

Status VersionSet::WriteSnapshot(log::Writer* log) {
Expand All @@ -1087,7 +1112,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
edit.AddFile(level, f->number, f->file_size, f->num_tombstones,
f->num_entries, f->smallest, f->largest);
}
}

Expand Down Expand Up @@ -1253,11 +1279,23 @@ Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
// Priority order:
// 1. Tombstone compaction (density > 50%) - highest priority
// 2. Size compaction (based on file size/number limits)
// 3. Seek compaction (based on seek stats)
const bool tombstone_compaction = (current_->tombstone_file_to_compact_ != nullptr);
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {

if (tombstone_compaction) {
// Prioritize compaction for files with high tombstone density (> 50%)
level = current_->tombstone_file_to_compact_level_;
assert(level >= 0);
assert(level > 0); // Tombstone compaction only for levels > 0
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->tombstone_file_to_compact_);
} else if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
Expand Down
10 changes: 9 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class Version {
refs_(0),
file_to_compact_(nullptr),
file_to_compact_level_(-1),
tombstone_file_to_compact_(nullptr),
tombstone_file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {}

Expand Down Expand Up @@ -157,6 +159,11 @@ class Version {
FileMetaData* file_to_compact_;
int file_to_compact_level_;

// Next file to compact based on tombstone density.
// When tombstone density > 50%, we prioritize this file for compaction.
FileMetaData* tombstone_file_to_compact_;
int tombstone_file_to_compact_level_;

// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
Expand Down Expand Up @@ -251,7 +258,8 @@ class VersionSet {
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr) ||
(v->tombstone_file_to_compact_ != nullptr);
}

// Add all files listed in any live version to *live.
Expand Down