uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const { // Note: the result for level zero is not really used since we set // the level-0 compaction threshold based on number of files. assert(level >= 0); assert(level < static_cast<int>(level_max_bytes_.size())); return level_max_bytes_[level]; }
// Try whether we can make last level's target size to be max_level_size uint64_t cur_level_size = max_level_size; for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) { // Round up after dividing cur_level_size = static_cast<uint64_t>( cur_level_size / options.max_bytes_for_level_multiplier); }
找到 base_level_size,一般来说也就是 cur_level_size。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Find base level (where L0 data is compacted to). base_level_ = first_non_empty_level; while (base_level_ > 1 && cur_level_size > base_bytes_max) { --base_level_; cur_level_size = static_cast<uint64_t>( cur_level_size / options.max_bytes_for_level_multiplier); } if (cur_level_size > base_bytes_max) { // Even L1 will be too large assert(base_level_ == 1); base_level_size = base_bytes_max; } else { base_level_size = cur_level_size; }
然后给level_max_bytes_ 赋值
1 2 3 4 5 6 7 8 9 10 11 12 13
// Populate level_max_bytes_ for base_level_ .. num_levels_-1, growing the
// target size by the per-level multiplier (with overflow checking).
uint64_t level_size = base_level_size;
for (int i = base_level_; i < num_levels_; i++) {
  if (i > base_level_) {
    level_size = MultiplyCheckOverflow(
        level_size, options.max_bytes_for_level_multiplier);
  }
  // Don't set any level below base_bytes_max. Otherwise, the LSM can
  // assume an hourglass shape where L1+ sizes are smaller than L0. This
  // causes compaction scoring, which depends on level sizes, to favor L1+
  // at the expense of L0, which may fill up and stall.
  level_max_bytes_[i] = std::max(level_size, base_bytes_max);
}
}
// cfd is referenced here auto cfd = PopFirstFromCompactionQueue(); // We unreference here because the following code will take a Ref() on // this cfd if it is going to use it (Compaction class holds a // reference). // This will all happen under a mutex so we don't have to be afraid of // somebody else deleting it. if (cfd->Unref()) { delete cfd; // This was the last reference of the column family, so no need to // compact. return Status::OK(); }
Compaction* LevelCompactionBuilder::PickCompaction() {
  // Pick up the first file to start compaction. It may have been extended
  // to a clean cut.
  SetupInitialFiles();
  if (start_level_inputs_.empty()) {
    // Nothing eligible to compact at the moment.
    return nullptr;
  }
  assert(start_level_ >= 0 && output_level_ >= 0);

  // If it is a L0 -> base level compaction, we need to set up other L0
  // files if needed.
  if (!SetupOtherL0FilesIfNeeded()) {
    return nullptr;
  }

  // Pick files in the output level and expand more files in the start level
  // if needed.
  if (!SetupOtherInputsIfNeeded()) {
    return nullptr;
  }

  // Form a compaction object containing the files we picked.
  Compaction* c = GetCompaction();
// Pick the largest file in this level that is not already // being compacted conststd::vector<int>& file_size = vstorage_->FilesByCompactionPri(start_level_); conststd::vector<FileMetaData*>& level_files = vstorage_->LevelFiles(start_level_);
if (start_level_ == 0 && output_level_ != 0) {
  // Two level 0 compaction won't run at the same time, so don't need to
  // worry about files on level 0 being compacted.
  assert(compaction_picker_->level0_compactions_in_progress()->empty());
  InternalKey smallest, largest;
  compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
  // Note that the next call will discard the file we placed in
  // c->inputs_[0] earlier and replace it with an overlapping set
  // which will include the picked file.
  start_level_inputs_.files.clear();
  vstorage_->GetOverlappingInputs(0, &smallest, &largest,
                                  &start_level_inputs_.files);

  // If we include more L0 files in the same compaction run it can
  // cause the 'smallest' and 'largest' key to get extended to a
  // larger range. So, re-invoke GetRange to get the new key range
  compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
  // Bail out if the widened range collides with a compaction already
  // running into the same output level.
  if (compaction_picker_->IsRangeInCompaction(
          vstorage_, &smallest, &largest, output_level_, &parent_index_)) {
    return false;
  }
}
// Gather the output-level input files (and possibly expand the start-level
// inputs) unless this is an intra-L0 compaction.
if (output_level_ != 0) {
  output_level_inputs_.level = output_level_;
  if (!compaction_picker_->SetupOtherInputs(
          cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
          &output_level_inputs_, &parent_index_, base_index_)) {
    return false;
  }

  compaction_inputs_.push_back(start_level_inputs_);
  if (!output_level_inputs_.empty()) {
    compaction_inputs_.push_back(output_level_inputs_);
  }

  // In some edge cases we could pick a compaction that will be compacting
  // a key range that overlap with another running compaction, and both
  // of them have the same output level. This could happen if
  // (1) we are running a non-exclusive manual compaction
  // (2) AddFile ingest a new file into the LSM tree
  // We need to disallow this from happening.
  if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
                                                          output_level_)) {
    // This compaction output could potentially conflict with the output
    // of a currently running compaction, we cannot run it.
    return false;
  }
  compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
                                      output_level_inputs_, &grandparents_);
}
最后就是构造一个 compaction 对象然后返回。
1 2 3 4 5 6
// Form a compaction object containing the files we picked. Compaction* c = GetCompaction();
// Collect candidate subcompaction boundary keys from the input files.
// (The "...." runs below are elisions made by the article's author.)
void CompactionJob::GenSubcompactionBoundaries() {
  ...........................
  // Add the starting and/or ending key of certain input files as a potential
  // boundary
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;
      .....................
      if (lvl == 0) {
        // For level 0 add the starting and ending key of each file since the
        // files may have greatly differing key ranges (not range-partitioned)
        for (size_t i = 0; i < num_files; i++) {
          bounds.emplace_back(flevel->files[i].smallest_key);
          bounds.emplace_back(flevel->files[i].largest_key);
        }
      } else {
        // For all other levels add the smallest/largest key in the level to
        // encompass the range covered by that level
        bounds.emplace_back(flevel->files[0].smallest_key);
        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
        if (lvl == out_lvl) {
          // For the last level include the starting keys of all files since
          // the last level is the largest and probably has the widest key
          // range. Since it's range partitioned, the ending key of one file
          // and the starting key of the next are very close (or identical).
          for (size_t i = 1; i < num_files; i++) {
            bounds.emplace_back(flevel->files[i].smallest_key);
          }
        }
      }
    }
  }
  ......................
然后则是对取得的bounds进行排序以及去重.
1 2 3 4 5 6 7 8 9 10 11 12 13
// Sort boundary candidates by user key so that duplicates become adjacent.
std::sort(bounds.begin(), bounds.end(),
          [cfd_comparator](const Slice& a, const Slice& b) -> bool {
            return cfd_comparator->Compare(ExtractUserKey(a),
                                           ExtractUserKey(b)) < 0;
          });
// Remove duplicated entries from bounds
bounds.erase(
    std::unique(bounds.begin(), bounds.end(),
                [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                  return cfd_comparator->Compare(ExtractUserKey(a),
                                                 ExtractUserKey(b)) == 0;
                }),
    bounds.end());
接下来就来计算理想情况下所需要的 subcompactions 的个数以及输出文件的个数。
1 2 3 4 5 6 7 8 9 10 11 12
// Group the ranges into subcompactions constdouble min_file_fill_percent = 4.0 / 5; int base_level = v->storage_info()->base_level(); uint64_t max_output_files = static_cast<uint64_t>(std::ceil( sum / min_file_fill_percent / MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl, c->immutable_cf_options()->compaction_style, base_level, c->immutable_cf_options()->level_compaction_dynamic_level_bytes))); uint64_t subcompactions = std::min({static_cast<uint64_t>(ranges.size()), static_cast<uint64_t>(c->max_subcompactions()), max_output_files});
if (subcompactions > 1) {
  double mean = sum * 1.0 / subcompactions;
  // Greedily add ranges to the subcompaction until the sum of the ranges'
  // sizes becomes >= the expected mean size of a subcompaction
  sum = 0;
  for (size_t i = 0; i < ranges.size() - 1; i++) {
    sum += ranges[i].size;
    if (subcompactions == 1) {
      // If there's only one left to schedule then it goes to the end so no
      // need to put an end boundary
      continue;
    }
    if (sum >= mean) {
      // Close the current subcompaction at this range's limit key.
      boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
      sizes_.emplace_back(sum);
      subcompactions--;
      sum = 0;
    }
  }
  // The final subcompaction takes whatever remains, including the last range.
  sizes_.emplace_back(sum + ranges.back().size);
}
// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
thread_pool.reserve(num_threads - 1);
for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
  thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                           &compact_->sub_compact_states[i]);
}

// Always schedule the first subcompaction (whether or not there are also
// others) in the current thread to be efficient with resources
ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

// Wait for all other threads (if there are any) to finish execution
for (auto& thread : thread_pool) {
  thread.join();
}

if (output_directory_) {
  // Sync the output directory so newly created files survive a crash.
  output_directory_->Fsync();
}