Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer) {
  ................................
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
    auto first_cfd = PopFirstFromFlushQueue();

    if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
      // can't flush this CF, try next one
      if (first_cfd->Unref()) {
        delete first_cfd;
      }
      continue;
    }

    // found a flush!
    cfd = first_cfd;
    break;
  }

  if (cfd != nullptr) {
    ....................................
    status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
                                       job_context, log_buffer);
    if (cfd->Unref()) {
      delete cfd;
    }
  }
  return status;
}
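The `if (first_cfd->Unref()) { delete first_cfd; }` pattern above is RocksDB's manual reference counting on ColumnFamilyData: the flush queue holds a reference, and whoever pops an entry must release it. Below is a minimal standalone sketch of that discipline; the RefCounted type and main() are purely illustrative, not RocksDB code.

struct RefCounted {
  int refs = 0;
  void Ref() { ++refs; }
  // Returns true when the last reference is dropped, i.e. the caller should
  // delete the object -- the same contract as ColumnFamilyData::Unref().
  bool Unref() { return --refs == 0; }
};

int main() {
  RefCounted* cfd = new RefCounted();
  cfd->Ref();          // reference taken when the CF is pushed onto flush_queue_
  // ... popped from the queue, then found dropped or not flush-pending ...
  if (cfd->Unref()) {  // mirrors: if (first_cfd->Unref()) delete first_cfd;
    delete cfd;
  }
  return 0;
}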
Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
  ...................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    ...............................
  }
  if (cfd_picked != nullptr) {
    status = SwitchMemtable(cfd_picked, write_context,
                            FlushReason::kWriteBufferFull);
    if (status.ok()) {
      cfd_picked->imm()->FlushRequested();
      SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
      MaybeScheduleFlushOrCompaction();
    }
  }
  return status;
}
This function is called before the WAL is written; in other words, the check below runs on every write:
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
  ..........................................
  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still correct.
    status = HandleWriteBufferFull(write_context);
  }
  ........................................
}
// Should only be called from write thread
bool ShouldFlush() const {
  if (enabled()) {
    if (mutable_memtable_memory_usage() > mutable_limit_) {
      return true;
    }
    if (memory_usage() >= buffer_size_ &&
        mutable_memtable_memory_usage() >= buffer_size_ / 2) {
      // If the memory exceeds the buffer size, we trigger more aggressive
      // flush. But if already more than half memory is being flushed,
      // triggering more flush may not help. We will hold it instead.
      return true;
    }
  }
  return false;
}
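For context, ShouldFlush() belongs to WriteBufferManager, which applications attach to their DBOptions. The sketch below shows one way to wire it up; the 7/8 mutable limit mentioned in the comment and the constructor defaults are assumptions that should be checked against the RocksDB version in use.

#include <memory>
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/write_buffer_manager.h>

int main() {
  const size_t kBufferSize = 128 << 20;  // 128 MB budget shared by all memtables
  auto wbm = std::make_shared<rocksdb::WriteBufferManager>(kBufferSize);

  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_buffer_manager = wbm;  // PreprocessWrite() now consults ShouldFlush()

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/wbm_demo", &db);
  if (s.ok()) {
    // Once mutable memtables exceed the mutable limit (believed to be 7/8 of
    // kBufferSize by default), or total usage reaches kBufferSize while at
    // least half of it is still mutable, the next write triggers
    // HandleWriteBufferFull().
    delete db;
  }
  return 0;
}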
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  Status s;
  uint64_t flush_memtable_id = 0;
  {
    .........................................
    // SwitchMemtable() will release and reacquire mutex during execution
    s = SwitchMemtable(cfd, &context);
    flush_memtable_id = cfd->imm()->GetLatestMemTableID();

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
    }
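For completeness, a manual flush through the public API ends up in DBImpl::FlushMemTable() as well. A hedged usage sketch (the wrapper function is illustrative; the column family handle is whatever the application opened):

#include <rocksdb/db.h>

rocksdb::Status FlushColumnFamily(rocksdb::DB* db,
                                  rocksdb::ColumnFamilyHandle* cf) {
  rocksdb::FlushOptions fo;
  fo.wait = true;  // block until the flush job has written out the L0 file
  // Internally this switches the active memtable (SwitchMemtable) and then
  // waits for the immutable memtables up to flush_memtable_id to be flushed.
  return db->Flush(fo, cf);
}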
void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}
  // If arena still have room for new block allocation, we can safely say it
  // shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if user keeps adding entries that exceeds write_buffer_size, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }
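To make the two thresholds concrete, here is a tiny standalone calculation. kArenaBlockSize and kAllowOverAllocationRatio are stand-in values chosen only for illustration (RocksDB derives the block size from write_buffer_size, and the over-allocation ratio is around 0.6 in the versions I have looked at).

#include <cstdio>

int main() {
  const double write_buffer_size = 64 << 20;     // 64 MB memtable budget
  const double kArenaBlockSize = 8 << 20;        // assumed block size
  const double kAllowOverAllocationRatio = 0.6;  // assumed ratio
  const double cap =
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio;  // ~68.8 MB

  const double allocated_memory = 60 << 20;      // 60 MB currently allocated
  // One more block still fits under the cap (68 MB < 68.8 MB): do not flush yet.
  const bool dont_flush = allocated_memory + kArenaBlockSize < cap;
  // Only once the allocation itself exceeds the cap does the early-flush rule fire.
  const bool must_flush = allocated_memory > cap;
  std::printf("dont_flush=%d must_flush=%d\n", dont_flush, must_flush);
  return 0;
}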
void CheckMemtableFull() {
  if (flush_scheduler_ != nullptr) {
    auto* cfd = cf_mems_->current();
    assert(cfd != nullptr);
    if (cfd->mem()->ShouldScheduleFlush() &&
        cfd->mem()->MarkFlushScheduled()) {
      // MarkFlushScheduled only returns true if we are the one that
      // should take action, so no need to dedup further
      flush_scheduler_->ScheduleFlush(cfd);
    }
  }
}
Here, MarkFlushScheduled() is what performs the state change:
bool MarkFlushScheduled() {
  auto before = FLUSH_REQUESTED;
  return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                              std::memory_order_relaxed,
                                              std::memory_order_relaxed);
}
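Taken together, UpdateFlushState() and MarkFlushScheduled() implement a small state machine on flush_state_: FLUSH_NOT_REQUESTED -> FLUSH_REQUESTED -> FLUSH_SCHEDULED, each transition guarded by a CAS so that exactly one thread acts on it. A standalone sketch (not RocksDB code) of the same sequence:

#include <atomic>
#include <cassert>

enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

int main() {
  std::atomic<FlushStateEnum> flush_state{FLUSH_NOT_REQUESTED};

  // UpdateFlushState(): request a flush; losing the CAS just means another
  // writer already requested it.
  FlushStateEnum expected = FLUSH_NOT_REQUESTED;
  flush_state.compare_exchange_strong(expected, FLUSH_REQUESTED,
                                      std::memory_order_relaxed,
                                      std::memory_order_relaxed);

  // MarkFlushScheduled(): only the single winner of this CAS goes on to call
  // FlushScheduler::ScheduleFlush().
  FlushStateEnum before = FLUSH_REQUESTED;
  const bool should_schedule = flush_state.compare_exchange_strong(
      before, FLUSH_SCHEDULED, std::memory_order_relaxed,
      std::memory_order_relaxed);
  assert(should_schedule);
  return 0;
}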
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  std::lock_guard<std::mutex> lock(checking_mutex_);
  assert(checking_set_.count(cfd) == 0);
  checking_set_.insert(cfd);
#endif  // NDEBUG
  cfd->Ref();
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  while (!head_.compare_exchange_strong(
      node->next, node, std::memory_order_relaxed,
      std::memory_order_relaxed)) {
    // failing CAS updates the first param, so we are already set for
    // retry. TakeNextColumnFamily won't happen until after another
    // inter-thread synchronization, so we don't even need release
    // semantics for this CAS
  }
#endif  // __clang_analyzer__
}
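ScheduleFlush() pushes the column family onto an intrusive lock-free list (a Treiber-stack style push). The sketch below is a simplified standalone version of that push together with the matching pop a consumer would perform; the names are illustrative, and a single consumer is assumed, which is roughly how RocksDB drains the list from the write thread.

#include <atomic>

struct Node {
  int value;  // stands in for ColumnFamilyData*
  Node* next;
};

std::atomic<Node*> head{nullptr};

void Push(int value) {
  Node* node = new Node{value, head.load(std::memory_order_relaxed)};
  // A failed CAS rewrites node->next with the current head, so the loop body
  // is empty: the retry is already set up, exactly as in ScheduleFlush().
  while (!head.compare_exchange_strong(node->next, node,
                                       std::memory_order_relaxed,
                                       std::memory_order_relaxed)) {
  }
}

Node* Pop() {
  Node* node = head.load(std::memory_order_relaxed);
  // On failure the CAS refreshes `node` with the current head; with a single
  // consumer there is no ABA hazard to worry about here.
  while (node != nullptr &&
         !head.compare_exchange_strong(node, node->next,
                                       std::memory_order_relaxed,
                                       std::memory_order_relaxed)) {
  }
  return node;  // caller takes ownership (RocksDB would also Unref the cfd)
}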
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  ...........................................
  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  }
  ........................................................
}