MANIFEST = { CURRENT, MANIFEST-<seq-no>* }
CURRENT = File pointer to the latest manifest log
MANIFEST-<seq-no> = Contains snapshot of RocksDB state and subsequent modifications
friend class Version; ................................. // Opened lazily unique_ptr<log::Writer> descriptor_log_; // generates a increasing version number for every new version uint64_t current_version_number_;
// Queue of writers to the manifest file std::deque<ManifestWriter*> manifest_writers_; ..........................................
// this is used to batch writes to the manifest file struct VersionSet::ManifestWriter { Status status; bool done; InstrumentedCondVar cv; ColumnFamilyData* cfd; const autovector<VersionEdit*>& edit_list;
autovector<VersionEdit*> batch_edits; .................................... if (w.edit_list.front()->IsColumnFamilyManipulation()) { // no group commits for column family add or drop LogAndApplyCFHelper(w.edit_list.front()); batch_edits.push_back(w.edit_list.front()); } else { v = new Version(column_family_data, this, current_version_number_++); ........................................................ for (const auto& writer : manifest_writers_) { if (writer->edit_list.front()->IsColumnFamilyManipulation() || writer->cfd->GetID() != column_family_data->GetID()) { break; } last_writer = writer; for (const auto& edit : writer->edit_list) { ........................................... batch_edits.push_back(edit); } } builder->SaveTo(v->storage_info()); }
if (new_descriptor_log) { // if we're writing out new snapshot make sure to persist max column family if (column_family_set_->GetMaxColumnFamily() > 0) { w.edit_list.front()->SetMaxColumnFamily( column_family_set_->GetMaxColumnFamily()); } }
unique_ptr<WritableFileWriter> file_writer( new WritableFileWriter(std::move(descriptor_file), opt_env_opts)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); s = WriteSnapshot(descriptor_log_.get()); } }
for (auto& e : batch_edits) { std::string record; if (!e->EncodeTo(&record)) { s = Status::Corruption( "Unable to Encode VersionEdit:" + e->DebugString(true)); break; } TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord", rocksdb_kill_odds * REDUCE_ODDS2); s = descriptor_log_->AddRecord(record); if (!s.ok()) { break; } } if (s.ok()) { s = SyncManifest(env_, db_options_, descriptor_log_->file()); } ............................. // If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. if (s.ok() && new_descriptor_log) { s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, db_directory); }
CURRENT文件更新完毕之后，就可以删除老的manifest文件了。
1 2 3 4 5 6
// Append the old manifest file to the obsolete_manifests_ list to be deleted // by PurgeObsoleteFiles later. if (s.ok() && new_descriptor_log) { obsolete_manifests_.emplace_back( DescriptorFileName("", manifest_file_number_)); }
最后则是更新manifest_writers_队列，唤醒之前阻塞等待的writer，并通知队列新的队首writer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// wake up all the waiting writers while (true) { ManifestWriter* ready = manifest_writers_.front(); manifest_writers_.pop_front(); if (ready != &w) { ready->status = s; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } // Notify new head of write queue if (!manifest_writers_.empty()) { manifest_writers_.front()->cv.Signal(); }
struct FileMetaData { FileDescriptor fd; InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table SequenceNumber smallest_seqno; // The smallest seqno in this file SequenceNumber largest_seqno; // The largest seqno in this file
......................................... // File size compensated by deletion entry. // This is updated in Version::UpdateAccumulatedStats() first time when the // file is created or loaded. After it is updated (!= 0), it is immutable. uint64_t compensated_file_size; // These values can mutate, but they can only be read or written from // single-threaded LogAndApply thread uint64_t num_entries; // the number of entries. uint64_t num_deletions; // the number of deletion entries. uint64_t raw_key_size; // total uncompressed key size. uint64_t raw_value_size; // total uncompressed value size.
int refs; // Reference count
bool being_compacted; // Is this file undergoing compaction? bool init_stats_from_file; // true if the data-entry stats of this file // has initialized from file.
bool marked_for_compaction; // True if client asked us nicely to compact this // file. };