LCOV - code coverage report
Current view: top level - src/leveldb/db - db_impl.cc (source / functions) Coverage Total Hit
Test: test_bitcoin_coverage.info Lines: 71.8 % 946 679
Test Date: 2026-02-04 04:43:42 Functions: 71.2 % 52 37
Branches: 41.0 % 1365 559

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
       2                 :             : // Use of this source code is governed by a BSD-style license that can be
       3                 :             : // found in the LICENSE file. See the AUTHORS file for names of contributors.
       4                 :             : 
       5                 :             : #include "db/db_impl.h"
       6                 :             : 
       7                 :             : #include <stdint.h>
       8                 :             : #include <stdio.h>
       9                 :             : 
      10                 :             : #include <algorithm>
      11                 :             : #include <atomic>
      12                 :             : #include <set>
      13                 :             : #include <string>
      14                 :             : #include <vector>
      15                 :             : 
      16                 :             : #include "db/builder.h"
      17                 :             : #include "db/db_iter.h"
      18                 :             : #include "db/dbformat.h"
      19                 :             : #include "db/filename.h"
      20                 :             : #include "db/log_reader.h"
      21                 :             : #include "db/log_writer.h"
      22                 :             : #include "db/memtable.h"
      23                 :             : #include "db/table_cache.h"
      24                 :             : #include "db/version_set.h"
      25                 :             : #include "db/write_batch_internal.h"
      26                 :             : #include "leveldb/db.h"
      27                 :             : #include "leveldb/env.h"
      28                 :             : #include "leveldb/status.h"
      29                 :             : #include "leveldb/table.h"
      30                 :             : #include "leveldb/table_builder.h"
      31                 :             : #include "port/port.h"
      32                 :             : #include "table/block.h"
      33                 :             : #include "table/merger.h"
      34                 :             : #include "table/two_level_iterator.h"
      35                 :             : #include "util/coding.h"
      36                 :             : #include "util/logging.h"
      37                 :             : #include "util/mutexlock.h"
      38                 :             : 
      39                 :             : namespace leveldb {
      40                 :             : 
      41                 :             : const int kNumNonTableCacheFiles = 10;
      42                 :             : 
      43                 :             : // Information kept for every waiting writer
      44                 :             : struct DBImpl::Writer {
      45                 :        1391 :   explicit Writer(port::Mutex* mu)
      46                 :        1391 :       : batch(nullptr), sync(false), done(false), cv(mu) {}
      47                 :             : 
      48                 :             :   Status status;
      49                 :             :   WriteBatch* batch;
      50                 :             :   bool sync;
      51                 :             :   bool done;
      52                 :             :   port::CondVar cv;
      53                 :             : };
      54                 :             : 
      55                 :          12 : struct DBImpl::CompactionState {
      56                 :             :   // Files produced by compaction
      57                 :          24 :   struct Output {
      58                 :             :     uint64_t number;
      59                 :             :     uint64_t file_size;
      60                 :             :     InternalKey smallest, largest;
      61                 :             :   };
      62                 :             : 
      63   [ +  -  +  -  :        1435 :   Output* current_output() { return &outputs[outputs.size() - 1]; }
             -  +  +  - ]
      64                 :             : 
      65                 :          12 :   explicit CompactionState(Compaction* c)
      66                 :          12 :       : compaction(c),
      67                 :          12 :         smallest_snapshot(0),
      68                 :          12 :         outfile(nullptr),
      69                 :          12 :         builder(nullptr),
      70                 :          12 :         total_bytes(0) {}
      71                 :             : 
      72                 :             :   Compaction* const compaction;
      73                 :             : 
      74                 :             :   // Sequence numbers < smallest_snapshot are not significant since we
      75                 :             :   // will never have to service a snapshot below smallest_snapshot.
      76                 :             :   // Therefore if we have seen a sequence number S <= smallest_snapshot,
      77                 :             :   // we can drop all entries for the same key with sequence numbers < S.
      78                 :             :   SequenceNumber smallest_snapshot;
      79                 :             : 
      80                 :             :   std::vector<Output> outputs;
      81                 :             : 
      82                 :             :   // State kept for output being generated
      83                 :             :   WritableFile* outfile;
      84                 :             :   TableBuilder* builder;
      85                 :             : 
      86                 :             :   uint64_t total_bytes;
      87                 :             : };
      88                 :             : 
      89                 :             : // Fix user-supplied options to be reasonable
      90                 :             : template <class T, class V>
      91                 :        1956 : static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
      92         [ -  + ]:        1956 :   if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
      93         [ +  + ]:        1956 :   if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
      94                 :        1956 : }
      95                 :         489 : Options SanitizeOptions(const std::string& dbname,
      96                 :             :                         const InternalKeyComparator* icmp,
      97                 :             :                         const InternalFilterPolicy* ipolicy,
      98                 :             :                         const Options& src) {
      99                 :         489 :   Options result = src;
     100                 :         489 :   result.comparator = icmp;
     101         [ -  + ]:         489 :   result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
     102                 :         489 :   ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
     103                 :         489 :   ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
     104                 :         489 :   ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
     105                 :         489 :   ClipToRange(&result.block_size, 1 << 10, 4 << 20);
     106         [ -  + ]:         489 :   if (result.info_log == nullptr) {
     107                 :             :     // Open a log file in the same directory as the db
     108         [ #  # ]:           0 :     src.env->CreateDir(dbname);  // In case it does not exist
     109   [ #  #  #  # ]:           0 :     src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
     110         [ #  # ]:           0 :     Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
     111         [ #  # ]:           0 :     if (!s.ok()) {
     112                 :             :       // No place suitable for logging
     113                 :           0 :       result.info_log = nullptr;
     114                 :             :     }
     115                 :           0 :   }
     116         [ -  + ]:         489 :   if (result.block_cache == nullptr) {
     117                 :           0 :     result.block_cache = NewLRUCache(8 << 20);
     118                 :             :   }
     119                 :         489 :   return result;
     120                 :             : }
     121                 :             : 
     122                 :         489 : static int TableCacheSize(const Options& sanitized_options) {
     123                 :             :   // Reserve ten files or so for other uses and give the rest to TableCache.
     124                 :         489 :   return sanitized_options.max_open_files - kNumNonTableCacheFiles;
     125                 :             : }
     126                 :             : 
     127                 :         489 : DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
     128                 :         489 :     : env_(raw_options.env),
     129         [ +  - ]:         489 :       internal_comparator_(raw_options.comparator),
     130         [ +  - ]:         489 :       internal_filter_policy_(raw_options.filter_policy),
     131         [ +  - ]:         489 :       options_(SanitizeOptions(dbname, &internal_comparator_,
     132                 :             :                                &internal_filter_policy_, raw_options)),
     133                 :         489 :       owns_info_log_(options_.info_log != raw_options.info_log),
     134                 :         489 :       owns_cache_(options_.block_cache != raw_options.block_cache),
     135         [ -  + ]:         489 :       dbname_(dbname),
     136   [ +  -  +  - ]:         489 :       table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
     137                 :         489 :       db_lock_(nullptr),
     138                 :         489 :       shutting_down_(false),
     139                 :         489 :       background_work_finished_signal_(&mutex_),
     140                 :         489 :       mem_(nullptr),
     141                 :         489 :       imm_(nullptr),
     142         [ +  - ]:         489 :       has_imm_(false),
     143                 :         489 :       logfile_(nullptr),
     144                 :         489 :       logfile_number_(0),
     145                 :         489 :       log_(nullptr),
     146                 :         489 :       seed_(0),
     147         [ +  - ]:         489 :       tmp_batch_(new WriteBatch),
     148                 :         489 :       background_compaction_scheduled_(false),
     149         [ +  - ]:         489 :       manual_compaction_(nullptr),
     150                 :         978 :       versions_(new VersionSet(dbname_, &options_, table_cache_,
     151   [ +  -  +  -  :        4890 :                                &internal_comparator_)) {}
          +  -  +  -  +  
                -  +  + ]
     152                 :             : 
     153                 :        2445 : DBImpl::~DBImpl() {
     154                 :             :   // Wait for background work to finish.
     155                 :         489 :   mutex_.Lock();
     156                 :         489 :   shutting_down_.store(true, std::memory_order_release);
     157         [ +  + ]:         491 :   while (background_compaction_scheduled_) {
     158                 :           2 :     background_work_finished_signal_.Wait();
     159                 :             :   }
     160                 :         489 :   mutex_.Unlock();
     161                 :             : 
     162         [ +  - ]:         489 :   if (db_lock_ != nullptr) {
     163         [ -  + ]:         489 :     env_->UnlockFile(db_lock_);
     164                 :             :   }
     165                 :             : 
     166         [ +  - ]:         489 :   delete versions_;
     167         [ +  - ]:         489 :   if (mem_ != nullptr) mem_->Unref();
     168         [ -  + ]:         489 :   if (imm_ != nullptr) imm_->Unref();
     169         [ +  - ]:         489 :   delete tmp_batch_;
     170         [ +  - ]:         489 :   delete log_;
     171         [ +  - ]:         489 :   delete logfile_;
     172         [ +  - ]:         489 :   delete table_cache_;
     173                 :             : 
     174         [ -  + ]:         489 :   if (owns_info_log_) {
     175         [ #  # ]:           0 :     delete options_.info_log;
     176                 :             :   }
     177         [ -  + ]:         489 :   if (owns_cache_) {
     178         [ #  # ]:           0 :     delete options_.block_cache;
     179                 :             :   }
     180         [ +  + ]:         979 : }
     181                 :             : 
     182                 :         421 : Status DBImpl::NewDB() {
     183                 :         421 :   VersionEdit new_db;
     184   [ +  -  +  - ]:         421 :   new_db.SetComparatorName(user_comparator()->Name());
     185         [ +  - ]:         421 :   new_db.SetLogNumber(0);
     186         [ +  - ]:         421 :   new_db.SetNextFile(2);
     187         [ +  - ]:         421 :   new_db.SetLastSequence(0);
     188                 :             : 
     189         [ +  - ]:         421 :   const std::string manifest = DescriptorFileName(dbname_, 1);
     190                 :         421 :   WritableFile* file;
     191         [ +  - ]:         421 :   Status s = env_->NewWritableFile(manifest, &file);
     192         [ +  - ]:         421 :   if (!s.ok()) {
     193                 :             :     return s;
     194                 :             :   }
     195                 :         421 :   {
     196         [ +  - ]:         421 :     log::Writer log(file);
     197         [ +  - ]:         421 :     std::string record;
     198         [ +  - ]:         421 :     new_db.EncodeTo(&record);
     199   [ -  +  +  -  :         421 :     s = log.AddRecord(record);
                   -  + ]
     200         [ +  - ]:         421 :     if (s.ok()) {
     201   [ +  -  -  + ]:         421 :       s = file->Close();
     202                 :             :     }
     203                 :         421 :   }
     204         [ +  - ]:         421 :   delete file;
     205         [ +  - ]:         421 :   if (s.ok()) {
     206                 :             :     // Make "CURRENT" file that points to the new manifest file.
     207   [ +  -  -  + ]:         421 :     s = SetCurrentFile(env_, dbname_, 1);
     208                 :             :   } else {
     209         [ #  # ]:           0 :     env_->DeleteFile(manifest);
     210                 :             :   }
     211                 :             :   return s;
     212                 :         421 : }
     213                 :             : 
     214                 :         255 : void DBImpl::MaybeIgnoreError(Status* s) const {
     215   [ -  +  -  - ]:         255 :   if (s->ok() || options_.paranoid_checks) {
     216                 :             :     // No change needed
     217                 :             :   } else {
     218         [ #  # ]:           0 :     Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
     219         [ #  # ]:           0 :     *s = Status::OK();
     220                 :             :   }
     221                 :         255 : }
     222                 :             : 
     223                 :         502 : void DBImpl::DeleteObsoleteFiles() {
     224                 :         502 :   mutex_.AssertHeld();
     225                 :             : 
     226         [ +  + ]:         502 :   if (!bg_error_.ok()) {
     227                 :             :     // After a background error, we don't know whether a new version may
     228                 :             :     // or may not have been committed, so we cannot safely garbage collect.
     229                 :             :     return;
     230                 :             :   }
     231                 :             : 
     232                 :             :   // Make a set of all of the live files
     233                 :         501 :   std::set<uint64_t> live = pending_outputs_;
     234         [ +  - ]:         501 :   versions_->AddLiveFiles(&live);
     235                 :             : 
     236                 :         501 :   std::vector<std::string> filenames;
     237         [ +  - ]:         501 :   env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
     238                 :         501 :   uint64_t number;
     239                 :         501 :   FileType type;
     240                 :         501 :   std::vector<std::string> files_to_delete;
     241         [ +  + ]:        3167 :   for (std::string& filename : filenames) {
     242   [ +  -  +  + ]:        2666 :     if (ParseFileName(filename, &number, &type)) {
     243                 :        2400 :       bool keep = true;
     244   [ +  +  +  -  :        2400 :       switch (type) {
                      + ]
     245                 :         570 :         case kLogFile:
     246   [ +  +  +  - ]:         570 :           keep = ((number >= versions_->LogNumber()) ||
     247         [ +  - ]:          69 :                   (number == versions_->PrevLogNumber()));
     248                 :             :           break;
     249                 :         990 :         case kDescriptorFile:
     250                 :             :           // Keep my manifest file, and any newer incarnations'
     251                 :             :           // (in case there is a race that allows other incarnations)
     252                 :         990 :           keep = (number >= versions_->ManifestFileNumber());
     253                 :         990 :           break;
     254                 :         213 :         case kTableFile:
     255                 :         213 :           keep = (live.find(number) != live.end());
     256                 :         213 :           break;
     257                 :           0 :         case kTempFile:
     258                 :             :           // Any temp files that are currently being written to must
     259                 :             :           // be recorded in pending_outputs_, which is inserted into "live"
     260                 :           0 :           keep = (live.find(number) != live.end());
     261                 :           0 :           break;
     262                 :             :         case kCurrentFile:
     263                 :             :         case kDBLockFile:
     264                 :             :         case kInfoLogFile:
     265                 :             :           keep = true;
     266                 :             :           break;
     267                 :             :       }
     268                 :             : 
     269         [ +  + ]:        1203 :       if (!keep) {
     270         [ +  - ]:         603 :         files_to_delete.push_back(std::move(filename));
     271         [ +  + ]:         603 :         if (type == kTableFile) {
     272         [ +  - ]:          45 :           table_cache_->Evict(number);
     273                 :             :         }
     274         [ +  - ]:         603 :         Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
     275                 :             :             static_cast<unsigned long long>(number));
     276                 :             :       }
     277                 :             :     }
     278                 :             :   }
     279                 :             : 
     280                 :             :   // While deleting all files unblock other threads. All files being deleted
     281                 :             :   // have unique names which will not collide with newly created files and
     282                 :             :   // are therefore safe to delete while allowing other threads to proceed.
     283                 :         501 :   mutex_.Unlock();
     284         [ +  + ]:        1104 :   for (const std::string& filename : files_to_delete) {
     285   [ +  -  +  -  :        1206 :     env_->DeleteFile(dbname_ + "/" + filename);
                   +  - ]
     286                 :             :   }
     287         [ +  - ]:         501 :   mutex_.Lock();
     288                 :         501 : }
     289                 :             : 
     290                 :         489 : Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
     291                 :         489 :   mutex_.AssertHeld();
     292                 :             : 
     293                 :             :   // Ignore error from CreateDir since the creation of the DB is
     294                 :             :   // committed only when the descriptor is created, and this directory
     295                 :             :   // may already exist from a previous failed creation attempt.
     296         [ +  + ]:         489 :   env_->CreateDir(dbname_);
     297         [ -  + ]:         489 :   assert(db_lock_ == nullptr);
     298         [ +  - ]:         489 :   Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
     299         [ -  + ]:         489 :   if (!s.ok()) {
     300                 :           0 :     return s;
     301                 :             :   }
     302                 :             : 
     303   [ +  -  +  -  :         489 :   if (!env_->FileExists(CurrentFileName(dbname_))) {
                   +  + ]
     304         [ +  - ]:         421 :     if (options_.create_if_missing) {
     305   [ +  -  -  + ]:         421 :       s = NewDB();
     306         [ -  + ]:         421 :       if (!s.ok()) {
     307                 :           0 :         return s;
     308                 :             :       }
     309                 :             :     } else {
     310         [ #  # ]:           0 :       return Status::InvalidArgument(
     311   [ #  #  #  # ]:           0 :           dbname_, "does not exist (create_if_missing is false)");
     312                 :             :     }
     313                 :             :   } else {
     314         [ -  + ]:          68 :     if (options_.error_if_exists) {
     315   [ #  #  #  # ]:           0 :       return Status::InvalidArgument(dbname_,
     316   [ -  -  -  + ]:         489 :                                      "exists (error_if_exists is true)");
     317                 :             :     }
     318                 :             :   }
     319                 :             : 
     320   [ +  -  -  + ]:         489 :   s = versions_->Recover(save_manifest);
     321         [ -  + ]:         489 :   if (!s.ok()) {
     322                 :           0 :     return s;
     323                 :             :   }
     324                 :         489 :   SequenceNumber max_sequence(0);
     325                 :             : 
     326                 :             :   // Recover from all newer log files than the ones named in the
     327                 :             :   // descriptor (new log files may have been added by the previous
     328                 :             :   // incarnation without registering them in the descriptor).
     329                 :             :   //
     330                 :             :   // Note that PrevLogNumber() is no longer used, but we pay
     331                 :             :   // attention to it in case we are recovering a database
     332                 :             :   // produced by an older version of leveldb.
     333         [ +  - ]:         489 :   const uint64_t min_log = versions_->LogNumber();
     334         [ +  - ]:         489 :   const uint64_t prev_log = versions_->PrevLogNumber();
     335                 :         489 :   std::vector<std::string> filenames;
     336   [ +  -  -  + ]:         489 :   s = env_->GetChildren(dbname_, &filenames);
     337         [ -  + ]:         489 :   if (!s.ok()) {
     338                 :           0 :     return s;
     339                 :             :   }
     340         [ +  - ]:         489 :   std::set<uint64_t> expected;
     341         [ +  - ]:         489 :   versions_->AddLiveFiles(&expected);
     342                 :         489 :   uint64_t number;
     343                 :         489 :   FileType type;
     344                 :         489 :   std::vector<uint64_t> logs;
     345   [ -  +  +  + ]:        1989 :   for (size_t i = 0; i < filenames.size(); i++) {
     346   [ +  -  +  + ]:        1500 :     if (ParseFileName(filenames[i], &number, &type)) {
     347                 :        1257 :       expected.erase(number);
     348   [ +  +  -  +  :        1257 :       if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
                   -  - ]
     349         [ +  - ]:          68 :         logs.push_back(number);
     350                 :             :     }
     351                 :             :   }
     352         [ -  + ]:         489 :   if (!expected.empty()) {
     353                 :           0 :     char buf[50];
     354         [ #  # ]:           0 :     snprintf(buf, sizeof(buf), "%d missing files; e.g.",
     355         [ #  # ]:           0 :              static_cast<int>(expected.size()));
     356   [ #  #  #  # ]:           0 :     return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
     357                 :             :   }
     358                 :             : 
     359                 :             :   // Recover in the order in which the logs were generated
     360                 :         489 :   std::sort(logs.begin(), logs.end());
     361   [ -  +  +  + ]:         557 :   for (size_t i = 0; i < logs.size(); i++) {
     362         [ +  - ]:          68 :     s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
     363         [ -  + ]:          68 :                        &max_sequence);
     364         [ -  + ]:          68 :     if (!s.ok()) {
     365                 :           0 :       return s;
     366                 :             :     }
     367                 :             : 
     368                 :             :     // The previous incarnation may not have written any MANIFEST
     369                 :             :     // records after allocating this log number.  So we manually
     370                 :             :     // update the file number allocation counter in VersionSet.
     371         [ +  - ]:          68 :     versions_->MarkFileNumberUsed(logs[i]);
     372                 :             :   }
     373                 :             : 
     374         [ +  + ]:         489 :   if (versions_->LastSequence() < max_sequence) {
     375                 :          59 :     versions_->SetLastSequence(max_sequence);
     376                 :             :   }
     377                 :             : 
     378                 :         489 :   return Status::OK();
     379                 :         978 : }
     380                 :             : 
     381                 :          68 : Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
     382                 :             :                               bool* save_manifest, VersionEdit* edit,
     383                 :             :                               SequenceNumber* max_sequence) {
     384                 :           0 :   struct LogReporter : public log::Reader::Reporter {
     385                 :             :     Env* env;
     386                 :             :     Logger* info_log;
     387                 :             :     const char* fname;
     388                 :             :     Status* status;  // null if options_.paranoid_checks==false
     389                 :           0 :     void Corruption(size_t bytes, const Status& s) override {
     390         [ #  # ]:           0 :       Log(info_log, "%s%s: dropping %d bytes; %s",
     391         [ #  # ]:           0 :           (this->status == nullptr ? "(ignoring error) " : ""), fname,
     392         [ #  # ]:           0 :           static_cast<int>(bytes), s.ToString().c_str());
     393   [ #  #  #  # ]:           0 :       if (this->status != nullptr && this->status->ok()) *this->status = s;
     394                 :           0 :     }
     395                 :             :   };
     396                 :             : 
     397                 :          68 :   mutex_.AssertHeld();
     398                 :             : 
     399                 :             :   // Open the log file
     400                 :          68 :   std::string fname = LogFileName(dbname_, log_number);
     401                 :          68 :   SequentialFile* file;
     402         [ +  - ]:          68 :   Status status = env_->NewSequentialFile(fname, &file);
     403         [ -  + ]:          68 :   if (!status.ok()) {
     404         [ #  # ]:           0 :     MaybeIgnoreError(&status);
     405                 :             :     return status;
     406                 :             :   }
     407                 :             : 
     408                 :             :   // Create the log reader.
     409         [ -  + ]:          68 :   LogReporter reporter;
     410                 :          68 :   reporter.env = env_;
     411                 :          68 :   reporter.info_log = options_.info_log;
     412         [ -  + ]:          68 :   reporter.fname = fname.c_str();
     413         [ -  + ]:          68 :   reporter.status = (options_.paranoid_checks ? &status : nullptr);
     414                 :             :   // We intentionally make log::Reader do checksumming even if
     415                 :             :   // paranoid_checks==false so that corruptions cause entire commits
     416                 :             :   // to be skipped instead of propagating bad information (like overly
     417                 :             :   // large sequence numbers).
     418         [ +  - ]:          68 :   log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
     419         [ +  - ]:          68 :   Log(options_.info_log, "Recovering log #%llu",
     420                 :             :       (unsigned long long)log_number);
     421                 :             : 
     422                 :             :   // Read all the records and add to a memtable
     423         [ +  - ]:          68 :   std::string scratch;
     424         [ +  - ]:          68 :   Slice record;
     425         [ +  - ]:          68 :   WriteBatch batch;
     426                 :             :   int compactions = 0;
     427                 :             :   MemTable* mem = nullptr;
     428   [ +  -  +  +  :         323 :   while (reader.ReadRecord(&record, &scratch) && status.ok()) {
                   +  - ]
     429         [ -  + ]:         255 :     if (record.size() < 12) {
     430   [ #  #  #  # ]:           0 :       reporter.Corruption(record.size(),
     431   [ #  #  #  # ]:           0 :                           Status::Corruption("log record too small", fname));
     432                 :           0 :       continue;
     433                 :             :     }
     434         [ +  - ]:         255 :     WriteBatchInternal::SetContents(&batch, record);
     435                 :             : 
     436         [ +  + ]:         255 :     if (mem == nullptr) {
     437   [ +  -  +  - ]:          61 :       mem = new MemTable(internal_comparator_);
     438                 :          61 :       mem->Ref();
     439                 :             :     }
     440   [ +  -  -  + ]:         255 :     status = WriteBatchInternal::InsertInto(&batch, mem);
     441         [ +  - ]:         255 :     MaybeIgnoreError(&status);
     442         [ +  - ]:         255 :     if (!status.ok()) {
     443                 :             :       break;
     444                 :             :     }
     445         [ +  - ]:         255 :     const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
     446         [ +  - ]:         255 :                                     WriteBatchInternal::Count(&batch) - 1;
     447         [ +  - ]:         255 :     if (last_seq > *max_sequence) {
     448                 :         255 :       *max_sequence = last_seq;
     449                 :             :     }
     450                 :             : 
     451   [ +  -  -  + ]:         255 :     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
     452                 :           0 :       compactions++;
     453                 :           0 :       *save_manifest = true;
     454   [ #  #  #  # ]:           0 :       status = WriteLevel0Table(mem, edit, nullptr);
     455                 :           0 :       mem->Unref();
     456                 :           0 :       mem = nullptr;
     457         [ #  # ]:           0 :       if (!status.ok()) {
     458                 :             :         // Reflect errors immediately so that conditions like full
     459                 :             :         // file-systems cause the DB::Open() to fail.
     460                 :             :         break;
     461                 :             :       }
     462                 :             :     }
     463                 :             :   }
     464                 :             : 
     465         [ +  - ]:          68 :   delete file;
     466                 :             : 
     467                 :             :   // See if we should keep reusing the last log file.
     468   [ +  -  -  +  :          68 :   if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
                   -  - ]
     469         [ #  # ]:           0 :     assert(logfile_ == nullptr);
     470         [ #  # ]:           0 :     assert(log_ == nullptr);
     471         [ #  # ]:           0 :     assert(mem_ == nullptr);
     472                 :           0 :     uint64_t lfile_size;
     473   [ #  #  #  #  :           0 :     if (env_->GetFileSize(fname, &lfile_size).ok() &&
          #  #  #  #  #  
                      # ]
     474   [ #  #  #  # ]:           0 :         env_->NewAppendableFile(fname, &logfile_).ok()) {
     475         [ #  # ]:           0 :       Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
     476   [ #  #  #  # ]:           0 :       log_ = new log::Writer(logfile_, lfile_size);
     477                 :           0 :       logfile_number_ = log_number;
     478         [ #  # ]:           0 :       if (mem != nullptr) {
     479                 :           0 :         mem_ = mem;
     480                 :           0 :         mem = nullptr;
     481                 :             :       } else {
     482                 :             :         // mem can be nullptr if lognum exists but was empty.
     483   [ #  #  #  # ]:           0 :         mem_ = new MemTable(internal_comparator_);
     484                 :           0 :         mem_->Ref();
     485                 :             :       }
     486                 :             :     }
     487                 :             :   }
     488                 :             : 
     489         [ +  + ]:          68 :   if (mem != nullptr) {
     490                 :             :     // mem did not get reused; compact it.
     491         [ +  - ]:          61 :     if (status.ok()) {
     492                 :          61 :       *save_manifest = true;
     493   [ +  -  -  + ]:          61 :       status = WriteLevel0Table(mem, edit, nullptr);
     494                 :             :     }
     495                 :          61 :     mem->Unref();
     496                 :             :   }
     497                 :             : 
     498                 :          68 :   return status;
     499                 :         136 : }
     500                 :             : 
     501                 :          62 : Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
     502                 :             :                                 Version* base) {
     503                 :          62 :   mutex_.AssertHeld();
     504                 :          62 :   const uint64_t start_micros = env_->NowMicros();
     505         [ +  - ]:          62 :   FileMetaData meta;
     506         [ +  - ]:          62 :   meta.number = versions_->NewFileNumber();
     507         [ +  - ]:          62 :   pending_outputs_.insert(meta.number);
     508         [ +  - ]:          62 :   Iterator* iter = mem->NewIterator();
     509                 :          62 :   Log(options_.info_log, "Level-0 table #%llu: started",
     510         [ +  - ]:          62 :       (unsigned long long)meta.number);
     511                 :             : 
     512                 :          62 :   Status s;
     513                 :          62 :   {
     514                 :          62 :     mutex_.Unlock();
     515   [ +  -  -  + ]:          62 :     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
     516         [ +  - ]:          62 :     mutex_.Lock();
     517                 :             :   }
     518                 :             : 
     519                 :         124 :   Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
     520         [ +  - ]:          62 :       (unsigned long long)meta.number, (unsigned long long)meta.file_size,
     521         [ +  - ]:          62 :       s.ToString().c_str());
     522         [ +  - ]:          62 :   delete iter;
     523                 :          62 :   pending_outputs_.erase(meta.number);
     524                 :             : 
     525                 :             :   // Note that if file_size is zero, the file has been deleted and
     526                 :             :   // should not be added to the manifest.
     527                 :          62 :   int level = 0;
     528   [ +  -  +  - ]:          62 :   if (s.ok() && meta.file_size > 0) {
     529         [ -  + ]:          62 :     const Slice min_user_key = meta.smallest.user_key();
     530         [ -  + ]:          62 :     const Slice max_user_key = meta.largest.user_key();
     531         [ +  + ]:          62 :     if (base != nullptr) {
     532         [ +  - ]:           1 :       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
     533                 :             :     }
     534         [ +  - ]:          62 :     edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
     535                 :             :                   meta.largest);
     536                 :             :   }
     537                 :             : 
     538         [ +  - ]:          62 :   CompactionStats stats;
     539         [ +  - ]:          62 :   stats.micros = env_->NowMicros() - start_micros;
     540                 :          62 :   stats.bytes_written = meta.file_size;
     541                 :          62 :   stats_[level].Add(stats);
     542                 :          62 :   return s;
     543                 :          62 : }
     544                 :             : 
     545                 :           1 : void DBImpl::CompactMemTable() {
     546                 :           1 :   mutex_.AssertHeld();
     547         [ -  + ]:           1 :   assert(imm_ != nullptr);
     548                 :             : 
     549                 :             :   // Save the contents of the memtable as a new Table
     550                 :           1 :   VersionEdit edit;
     551         [ +  - ]:           1 :   Version* base = versions_->current();
     552         [ +  - ]:           1 :   base->Ref();
     553         [ +  - ]:           1 :   Status s = WriteLevel0Table(imm_, &edit, base);
     554         [ +  - ]:           1 :   base->Unref();
     555                 :             : 
     556   [ +  -  -  + ]:           1 :   if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
     557   [ #  #  #  # ]:           0 :     s = Status::IOError("Deleting DB during memtable compaction");
     558                 :             :   }
     559                 :             : 
     560                 :             :   // Replace immutable memtable with the generated Table
     561         [ +  - ]:           1 :   if (s.ok()) {
     562         [ +  - ]:           1 :     edit.SetPrevLogNumber(0);
     563         [ +  - ]:           1 :     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
     564   [ +  -  -  + ]:           1 :     s = versions_->LogAndApply(&edit, &mutex_);
     565                 :             :   }
     566                 :             : 
     567         [ +  - ]:           1 :   if (s.ok()) {
     568                 :             :     // Commit to the new state
     569                 :           1 :     imm_->Unref();
     570                 :           1 :     imm_ = nullptr;
     571         [ +  - ]:           1 :     has_imm_.store(false, std::memory_order_release);
     572         [ +  - ]:           1 :     DeleteObsoleteFiles();
     573                 :             :   } else {
     574         [ #  # ]:           0 :     RecordBackgroundError(s);
     575                 :             :   }
     576                 :           1 : }
     577                 :             : 
     578                 :           0 : void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
     579                 :           0 :   int max_level_with_files = 1;
     580                 :           0 :   {
     581                 :           0 :     MutexLock l(&mutex_);
     582                 :           0 :     Version* base = versions_->current();
     583         [ #  # ]:           0 :     for (int level = 1; level < config::kNumLevels; level++) {
     584   [ #  #  #  # ]:           0 :       if (base->OverlapInLevel(level, begin, end)) {
     585                 :           0 :         max_level_with_files = level;
     586                 :             :       }
     587                 :             :     }
     588                 :           0 :   }
     589         [ #  # ]:           0 :   TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
     590         [ #  # ]:           0 :   for (int level = 0; level < max_level_with_files; level++) {
     591                 :           0 :     TEST_CompactRange(level, begin, end);
     592                 :             :   }
     593                 :           0 : }
     594                 :             : 
     595                 :           0 : void DBImpl::TEST_CompactRange(int level, const Slice* begin,
     596                 :             :                                const Slice* end) {
     597         [ #  # ]:           0 :   assert(level >= 0);
     598         [ #  # ]:           0 :   assert(level + 1 < config::kNumLevels);
     599                 :             : 
     600         [ #  # ]:           0 :   InternalKey begin_storage, end_storage;
     601                 :             : 
     602         [ #  # ]:           0 :   ManualCompaction manual;
     603                 :           0 :   manual.level = level;
     604                 :           0 :   manual.done = false;
     605         [ #  # ]:           0 :   if (begin == nullptr) {
     606                 :           0 :     manual.begin = nullptr;
     607                 :             :   } else {
     608         [ #  # ]:           0 :     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
     609                 :           0 :     manual.begin = &begin_storage;
     610                 :             :   }
     611         [ #  # ]:           0 :   if (end == nullptr) {
     612                 :           0 :     manual.end = nullptr;
     613                 :             :   } else {
     614         [ #  # ]:           0 :     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
     615                 :           0 :     manual.end = &end_storage;
     616                 :             :   }
     617                 :             : 
     618         [ #  # ]:           0 :   MutexLock l(&mutex_);
     619   [ #  #  #  #  :           0 :   while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
                   #  # ]
     620         [ #  # ]:           0 :          bg_error_.ok()) {
     621         [ #  # ]:           0 :     if (manual_compaction_ == nullptr) {  // Idle
     622                 :           0 :       manual_compaction_ = &manual;
     623         [ #  # ]:           0 :       MaybeScheduleCompaction();
     624                 :             :     } else {  // Running either my compaction or another compaction.
     625         [ #  # ]:           0 :       background_work_finished_signal_.Wait();
     626                 :             :     }
     627                 :             :   }
     628         [ #  # ]:           0 :   if (manual_compaction_ == &manual) {
     629                 :             :     // Cancel my manual compaction since we aborted early for some reason.
     630                 :           0 :     manual_compaction_ = nullptr;
     631                 :             :   }
     632                 :           0 : }
     633                 :             : 
     634                 :           0 : Status DBImpl::TEST_CompactMemTable() {
     635                 :             :   // nullptr batch means just wait for earlier writes to be done
     636                 :           0 :   Status s = Write(WriteOptions(), nullptr);
     637         [ #  # ]:           0 :   if (s.ok()) {
     638                 :             :     // Wait until the compaction completes
     639         [ #  # ]:           0 :     MutexLock l(&mutex_);
     640   [ #  #  #  # ]:           0 :     while (imm_ != nullptr && bg_error_.ok()) {
     641         [ #  # ]:           0 :       background_work_finished_signal_.Wait();
     642                 :             :     }
     643         [ #  # ]:           0 :     if (imm_ != nullptr) {
     644         [ #  # ]:           0 :       s = bg_error_;
     645                 :             :     }
     646                 :           0 :   }
     647                 :           0 :   return s;
     648                 :           0 : }
     649                 :             : 
     650                 :           2 : void DBImpl::RecordBackgroundError(const Status& s) {
     651                 :           2 :   mutex_.AssertHeld();
     652         [ +  + ]:           2 :   if (bg_error_.ok()) {
     653                 :           1 :     bg_error_ = s;
     654                 :           1 :     background_work_finished_signal_.SignalAll();
     655                 :             :   }
     656                 :           2 : }
     657                 :             : 
     658                 :         509 : void DBImpl::MaybeScheduleCompaction() {
     659                 :         509 :   mutex_.AssertHeld();
     660         [ +  + ]:         509 :   if (background_compaction_scheduled_) {
     661                 :             :     // Already scheduled
     662         [ +  + ]:         505 :   } else if (shutting_down_.load(std::memory_order_acquire)) {
     663                 :             :     // DB is being deleted; no more background compactions
     664         [ +  - ]:         503 :   } else if (!bg_error_.ok()) {
     665                 :             :     // Already got an error; no more changes
     666   [ +  +  +  - ]:         503 :   } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
     667         [ +  + ]:         502 :              !versions_->NeedsCompaction()) {
     668                 :             :     // No work to be done
     669                 :             :   } else {
     670                 :          13 :     background_compaction_scheduled_ = true;
     671                 :          13 :     env_->Schedule(&DBImpl::BGWork, this);
     672                 :             :   }
     673                 :         509 : }
     674                 :             : 
     675                 :          13 : void DBImpl::BGWork(void* db) {
     676                 :          13 :   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
     677                 :          13 : }
     678                 :             : 
     679                 :          13 : void DBImpl::BackgroundCall() {
     680                 :          13 :   MutexLock l(&mutex_);
     681         [ -  + ]:          13 :   assert(background_compaction_scheduled_);
     682         [ +  - ]:          13 :   if (shutting_down_.load(std::memory_order_acquire)) {
     683                 :             :     // No more background work when shutting down.
     684         [ +  - ]:          13 :   } else if (!bg_error_.ok()) {
     685                 :             :     // No more background work after a background error.
     686                 :             :   } else {
     687         [ +  - ]:          13 :     BackgroundCompaction();
     688                 :             :   }
     689                 :             : 
     690                 :          13 :   background_compaction_scheduled_ = false;
     691                 :             : 
     692                 :             :   // Previous compaction may have produced too many files in a level,
     693                 :             :   // so reschedule another compaction if needed.
     694         [ +  - ]:          13 :   MaybeScheduleCompaction();
     695                 :          13 :   background_work_finished_signal_.SignalAll();
     696                 :          13 : }
     697                 :             : 
     698                 :          13 : void DBImpl::BackgroundCompaction() {
     699                 :          13 :   mutex_.AssertHeld();
     700                 :             : 
     701         [ +  + ]:          13 :   if (imm_ != nullptr) {
     702                 :           1 :     CompactMemTable();
     703                 :           1 :     return;
     704                 :             :   }
     705                 :             : 
     706                 :          12 :   Compaction* c;
     707                 :          12 :   bool is_manual = (manual_compaction_ != nullptr);
     708         [ -  + ]:          12 :   InternalKey manual_end;
     709         [ -  + ]:          12 :   if (is_manual) {
     710                 :           0 :     ManualCompaction* m = manual_compaction_;
     711         [ #  # ]:           0 :     c = versions_->CompactRange(m->level, m->begin, m->end);
     712                 :           0 :     m->done = (c == nullptr);
     713         [ #  # ]:           0 :     if (c != nullptr) {
     714   [ #  #  #  # ]:           0 :       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
     715                 :             :     }
     716         [ #  # ]:           0 :     Log(options_.info_log,
     717                 :             :         "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
     718   [ #  #  #  #  :           0 :         m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
             #  #  #  # ]
     719   [ #  #  #  #  :           0 :         (m->end ? m->end->DebugString().c_str() : "(end)"),
             #  #  #  # ]
     720   [ #  #  #  #  :           0 :         (m->done ? "(end)" : manual_end.DebugString().c_str()));
             #  #  #  # ]
     721                 :             :   } else {
     722         [ +  - ]:          12 :     c = versions_->PickCompaction();
     723                 :             :   }
     724                 :             : 
     725         [ +  - ]:          12 :   Status status;
     726         [ +  - ]:          12 :   if (c == nullptr) {
     727                 :             :     // Nothing to do
     728   [ +  -  +  -  :          12 :   } else if (!is_manual && c->IsTrivialMove()) {
                   -  + ]
     729                 :             :     // Move file to next level
     730   [ #  #  #  # ]:           0 :     assert(c->num_input_files(0) == 1);
     731         [ #  # ]:           0 :     FileMetaData* f = c->input(0, 0);
     732         [ #  # ]:           0 :     c->edit()->DeleteFile(c->level(), f->number);
     733                 :           0 :     c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
     734         [ #  # ]:           0 :                        f->largest);
     735   [ #  #  #  # ]:           0 :     status = versions_->LogAndApply(c->edit(), &mutex_);
     736         [ #  # ]:           0 :     if (!status.ok()) {
     737         [ #  # ]:           0 :       RecordBackgroundError(status);
     738                 :             :     }
     739                 :           0 :     VersionSet::LevelSummaryStorage tmp;
     740                 :           0 :     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
     741         [ #  # ]:           0 :         static_cast<unsigned long long>(f->number), c->level() + 1,
     742         [ #  # ]:           0 :         static_cast<unsigned long long>(f->file_size),
     743   [ #  #  #  # ]:           0 :         status.ToString().c_str(), versions_->LevelSummary(&tmp));
     744                 :             :   } else {
     745   [ +  -  +  - ]:          12 :     CompactionState* compact = new CompactionState(c);
     746   [ +  -  -  + ]:          12 :     status = DoCompactionWork(compact);
     747         [ +  + ]:          12 :     if (!status.ok()) {
     748         [ +  - ]:           1 :       RecordBackgroundError(status);
     749                 :             :     }
     750         [ +  - ]:          12 :     CleanupCompaction(compact);
     751         [ +  - ]:          12 :     c->ReleaseInputs();
     752         [ +  - ]:          12 :     DeleteObsoleteFiles();
     753                 :             :   }
     754                 :          12 :   delete c;
     755                 :             : 
     756         [ +  + ]:          12 :   if (status.ok()) {
     757                 :             :     // Done
     758         [ -  + ]:           1 :   } else if (shutting_down_.load(std::memory_order_acquire)) {
     759                 :             :     // Ignore compaction errors found during shutting down
     760                 :             :   } else {
     761   [ #  #  #  # ]:           0 :     Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
     762                 :             :   }
     763                 :             : 
     764         [ -  + ]:          12 :   if (is_manual) {
     765                 :           0 :     ManualCompaction* m = manual_compaction_;
     766         [ #  # ]:           0 :     if (!status.ok()) {
     767                 :           0 :       m->done = true;
     768                 :             :     }
     769         [ #  # ]:           0 :     if (!m->done) {
     770                 :             :       // We only compacted part of the requested range.  Update *m
     771                 :             :       // to the range that is left to be compacted.
     772         [ #  # ]:           0 :       m->tmp_storage = manual_end;
     773                 :           0 :       m->begin = &m->tmp_storage;
     774                 :             :     }
     775                 :           0 :     manual_compaction_ = nullptr;
     776                 :             :   }
     777                 :          12 : }
     778                 :             : 
     779                 :          12 : void DBImpl::CleanupCompaction(CompactionState* compact) {
     780                 :          12 :   mutex_.AssertHeld();
     781         [ +  + ]:          12 :   if (compact->builder != nullptr) {
     782                 :             :     // May happen if we get a shutdown call in the middle of compaction
     783                 :           1 :     compact->builder->Abandon();
     784         [ +  - ]:           1 :     delete compact->builder;
     785                 :             :   } else {
     786         [ -  + ]:          11 :     assert(compact->outfile == nullptr);
     787                 :             :   }
     788         [ +  + ]:          12 :   delete compact->outfile;
     789   [ -  +  +  + ]:          24 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
     790                 :          12 :     const CompactionState::Output& out = compact->outputs[i];
     791                 :          12 :     pending_outputs_.erase(out.number);
     792                 :             :   }
     793         [ +  - ]:          24 :   delete compact;
     794                 :          12 : }
     795                 :             : 
     796                 :          12 : Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     797         [ -  + ]:          12 :   assert(compact != nullptr);
     798         [ -  + ]:          12 :   assert(compact->builder == nullptr);
     799                 :          12 :   uint64_t file_number;
     800                 :          12 :   {
     801                 :          12 :     mutex_.Lock();
     802                 :          12 :     file_number = versions_->NewFileNumber();
     803                 :          12 :     pending_outputs_.insert(file_number);
     804         [ +  - ]:          12 :     CompactionState::Output out;
     805                 :          12 :     out.number = file_number;
     806                 :          12 :     out.file_size = 0;
     807         [ +  - ]:          12 :     out.smallest.Clear();
     808                 :          12 :     out.largest.Clear();
     809         [ +  - ]:          12 :     compact->outputs.push_back(out);
     810                 :          12 :     mutex_.Unlock();
     811                 :           0 :   }
     812                 :             : 
     813                 :             :   // Make the output file
     814                 :          12 :   std::string fname = TableFileName(dbname_, file_number);
     815         [ +  - ]:          12 :   Status s = env_->NewWritableFile(fname, &compact->outfile);
     816         [ +  - ]:          12 :   if (s.ok()) {
     817   [ +  -  +  - ]:          12 :     compact->builder = new TableBuilder(options_, compact->outfile);
     818                 :             :   }
     819                 :          12 :   return s;
     820                 :          12 : }
     821                 :             : 
     822                 :          11 : Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
     823                 :             :                                           Iterator* input) {
     824         [ -  + ]:          11 :   assert(compact != nullptr);
     825         [ -  + ]:          11 :   assert(compact->outfile != nullptr);
     826         [ -  + ]:          11 :   assert(compact->builder != nullptr);
     827                 :             : 
     828         [ -  + ]:          11 :   const uint64_t output_number = compact->current_output()->number;
     829         [ -  + ]:          11 :   assert(output_number != 0);
     830                 :             : 
     831                 :             :   // Check for iterator errors
     832                 :          11 :   Status s = input->status();
     833         [ +  - ]:          11 :   const uint64_t current_entries = compact->builder->NumEntries();
     834         [ +  - ]:          11 :   if (s.ok()) {
     835   [ +  -  -  + ]:          11 :     s = compact->builder->Finish();
     836                 :             :   } else {
     837         [ #  # ]:           0 :     compact->builder->Abandon();
     838                 :             :   }
     839         [ +  - ]:          11 :   const uint64_t current_bytes = compact->builder->FileSize();
     840         [ -  + ]:          11 :   compact->current_output()->file_size = current_bytes;
     841                 :          11 :   compact->total_bytes += current_bytes;
     842         [ +  - ]:          11 :   delete compact->builder;
     843                 :          11 :   compact->builder = nullptr;
     844                 :             : 
     845                 :             :   // Finish and check for file errors
     846         [ +  - ]:          11 :   if (s.ok()) {
     847   [ +  -  -  + ]:          11 :     s = compact->outfile->Sync();
     848                 :             :   }
     849         [ +  - ]:          11 :   if (s.ok()) {
     850   [ +  -  -  + ]:          11 :     s = compact->outfile->Close();
     851                 :             :   }
     852         [ +  - ]:          11 :   delete compact->outfile;
     853                 :          11 :   compact->outfile = nullptr;
     854                 :             : 
     855   [ +  -  +  - ]:          11 :   if (s.ok() && current_entries > 0) {
     856                 :             :     // Verify that the table is usable
     857                 :          11 :     Iterator* iter =
     858         [ +  - ]:          11 :         table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
     859   [ +  -  -  + ]:          11 :     s = iter->status();
     860         [ +  - ]:          11 :     delete iter;
     861         [ +  - ]:          11 :     if (s.ok()) {
     862                 :          11 :       Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
     863         [ +  - ]:          11 :           (unsigned long long)output_number, compact->compaction->level(),
     864                 :             :           (unsigned long long)current_entries,
     865                 :             :           (unsigned long long)current_bytes);
     866                 :             :     }
     867                 :             :   }
     868                 :          11 :   return s;
     869                 :           0 : }
     870                 :             : 
     871                 :          11 : Status DBImpl::InstallCompactionResults(CompactionState* compact) {
     872                 :          11 :   mutex_.AssertHeld();
     873                 :          11 :   Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
     874         [ -  + ]:          11 :       compact->compaction->num_input_files(0), compact->compaction->level(),
     875                 :          11 :       compact->compaction->num_input_files(1), compact->compaction->level() + 1,
     876         [ -  + ]:          11 :       static_cast<long long>(compact->total_bytes));
     877                 :             : 
     878                 :             :   // Add compaction outputs
     879                 :          11 :   compact->compaction->AddInputDeletions(compact->compaction->edit());
     880                 :          11 :   const int level = compact->compaction->level();
     881   [ -  +  +  + ]:          22 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
     882                 :          11 :     const CompactionState::Output& out = compact->outputs[i];
     883                 :          11 :     compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
     884                 :          11 :                                          out.smallest, out.largest);
     885                 :             :   }
     886                 :          11 :   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
     887                 :             : }
     888                 :             : 
     889                 :          12 : Status DBImpl::DoCompactionWork(CompactionState* compact) {
     890                 :          12 :   const uint64_t start_micros = env_->NowMicros();
     891                 :          12 :   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
     892                 :             : 
     893                 :          12 :   Log(options_.info_log, "Compacting %d@%d + %d@%d files",
     894         [ -  + ]:          12 :       compact->compaction->num_input_files(0), compact->compaction->level(),
     895         [ -  + ]:          12 :       compact->compaction->num_input_files(1),
     896         [ -  + ]:          12 :       compact->compaction->level() + 1);
     897                 :             : 
     898         [ -  + ]:          12 :   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
     899         [ -  + ]:          12 :   assert(compact->builder == nullptr);
     900         [ -  + ]:          12 :   assert(compact->outfile == nullptr);
     901         [ +  - ]:          12 :   if (snapshots_.empty()) {
     902                 :          12 :     compact->smallest_snapshot = versions_->LastSequence();
     903                 :             :   } else {
     904                 :           0 :     compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
     905                 :             :   }
     906                 :             : 
     907                 :          12 :   Iterator* input = versions_->MakeInputIterator(compact->compaction);
     908                 :             : 
     909                 :             :   // Release mutex while we're actually doing the compaction work
     910                 :          12 :   mutex_.Unlock();
     911                 :             : 
     912                 :          12 :   input->SeekToFirst();
     913                 :          12 :   Status status;
     914                 :          12 :   ParsedInternalKey ikey;
     915                 :          12 :   std::string current_user_key;
     916                 :          12 :   bool has_current_user_key = false;
     917                 :          12 :   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
     918   [ +  -  +  +  :        1628 :   while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
                   +  + ]
     919                 :             :     // Prioritize immutable compaction work
     920         [ -  + ]:        1616 :     if (has_imm_.load(std::memory_order_relaxed)) {
     921         [ #  # ]:           0 :       const uint64_t imm_start = env_->NowMicros();
     922         [ #  # ]:           0 :       mutex_.Lock();
     923         [ #  # ]:           0 :       if (imm_ != nullptr) {
     924         [ #  # ]:           0 :         CompactMemTable();
     925                 :             :         // Wake up MakeRoomForWrite() if necessary.
     926                 :           0 :         background_work_finished_signal_.SignalAll();
     927                 :             :       }
     928                 :           0 :       mutex_.Unlock();
     929         [ #  # ]:           0 :       imm_micros += (env_->NowMicros() - imm_start);
     930                 :             :     }
     931                 :             : 
     932         [ +  - ]:        1616 :     Slice key = input->key();
     933   [ +  -  -  + ]:        1616 :     if (compact->compaction->ShouldStopBefore(key) &&
     934         [ #  # ]:           0 :         compact->builder != nullptr) {
     935   [ #  #  #  # ]:           0 :       status = FinishCompactionOutputFile(compact, input);
     936         [ #  # ]:           0 :       if (!status.ok()) {
     937                 :             :         break;
     938                 :             :       }
     939                 :             :     }
     940                 :             : 
     941                 :             :     // Handle key/value, add to state, etc.
     942                 :        1616 :     bool drop = false;
     943         [ -  + ]:        1616 :     if (!ParseInternalKey(key, &ikey)) {
     944                 :             :       // Do not hide error keys
     945                 :           0 :       current_user_key.clear();
     946                 :           0 :       has_current_user_key = false;
     947                 :           0 :       last_sequence_for_key = kMaxSequenceNumber;
     948                 :             :     } else {
     949   [ +  +  +  + ]:        3220 :       if (!has_current_user_key ||
     950   [ -  +  +  - ]:        1604 :           user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
     951                 :             :               0) {
     952                 :             :         // First occurrence of this user key
     953         [ +  - ]:        1412 :         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
     954                 :             :         has_current_user_key = true;
     955                 :             :         last_sequence_for_key = kMaxSequenceNumber;
     956                 :             :       }
     957                 :             : 
     958         [ +  + ]:        1616 :       if (last_sequence_for_key <= compact->smallest_snapshot) {
     959                 :             :         // Hidden by an newer entry for same user key
     960                 :             :         drop = true;  // (A)
     961                 :        1423 :       } else if (ikey.type == kTypeDeletion &&
     962   [ +  +  +  -  :        1423 :                  ikey.sequence <= compact->smallest_snapshot &&
                   -  + ]
     963         [ +  - ]:          11 :                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
     964                 :             :         // For this user key:
     965                 :             :         // (1) there is no data in higher levels
     966                 :             :         // (2) data in lower levels will have larger sequence numbers
     967                 :             :         // (3) data in layers that are being compacted here and have
     968                 :             :         //     smaller sequence numbers will be dropped in the next
     969                 :             :         //     few iterations of this loop (by rule (A) above).
     970                 :             :         // Therefore this deletion marker is obsolete and can be dropped.
     971                 :          11 :         drop = true;
     972                 :             :       }
     973                 :             : 
     974                 :        1616 :       last_sequence_for_key = ikey.sequence;
     975                 :             :     }
     976                 :             : #if 0
     977                 :             :     Log(options_.info_log,
     978                 :             :         "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
     979                 :             :         "%d smallest_snapshot: %d",
     980                 :             :         ikey.user_key.ToString().c_str(),
     981                 :             :         (int)ikey.sequence, ikey.type, kTypeValue, drop,
     982                 :             :         compact->compaction->IsBaseLevelForKey(ikey.user_key),
     983                 :             :         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
     984                 :             : #endif
     985                 :             : 
     986                 :        1616 :     if (!drop) {
     987                 :             :       // Open output file if necessary
     988         [ +  + ]:        1401 :       if (compact->builder == nullptr) {
     989   [ +  -  -  + ]:          12 :         status = OpenCompactionOutputFile(compact);
     990         [ +  - ]:          12 :         if (!status.ok()) {
     991                 :             :           break;
     992                 :             :         }
     993                 :             :       }
     994   [ +  -  +  + ]:        1401 :       if (compact->builder->NumEntries() == 0) {
     995   [ -  +  +  - ]:          12 :         compact->current_output()->smallest.DecodeFrom(key);
     996                 :             :       }
     997   [ -  +  +  - ]:        1401 :       compact->current_output()->largest.DecodeFrom(key);
     998   [ +  -  +  - ]:        1401 :       compact->builder->Add(key, input->value());
     999                 :             : 
    1000                 :             :       // Close output file if it is big enough
    1001   [ +  -  -  + ]:        1401 :       if (compact->builder->FileSize() >=
    1002         [ -  + ]:        1401 :           compact->compaction->MaxOutputFileSize()) {
    1003   [ #  #  #  # ]:           0 :         status = FinishCompactionOutputFile(compact, input);
    1004         [ #  # ]:           0 :         if (!status.ok()) {
    1005                 :             :           break;
    1006                 :             :         }
    1007                 :             :       }
    1008                 :             :     }
    1009                 :             : 
    1010         [ +  - ]:        1616 :     input->Next();
    1011                 :             :   }
    1012                 :             : 
    1013   [ +  -  +  + ]:          12 :   if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    1014   [ +  -  -  + ]:           1 :     status = Status::IOError("Deleting DB during compaction");
    1015                 :             :   }
    1016   [ +  +  +  - ]:          12 :   if (status.ok() && compact->builder != nullptr) {
    1017   [ +  -  -  + ]:          11 :     status = FinishCompactionOutputFile(compact, input);
    1018                 :             :   }
    1019         [ +  + ]:          12 :   if (status.ok()) {
    1020   [ +  -  -  + ]:          11 :     status = input->status();
    1021                 :             :   }
    1022         [ +  - ]:          12 :   delete input;
    1023                 :          12 :   input = nullptr;
    1024                 :             : 
    1025         [ +  - ]:          12 :   CompactionStats stats;
    1026         [ +  - ]:          12 :   stats.micros = env_->NowMicros() - start_micros - imm_micros;
    1027         [ +  + ]:          36 :   for (int which = 0; which < 2; which++) {
    1028   [ -  +  +  + ]:          72 :     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
    1029                 :          48 :       stats.bytes_read += compact->compaction->input(which, i)->file_size;
    1030                 :             :     }
    1031                 :             :   }
    1032   [ -  +  +  + ]:          24 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
    1033                 :          12 :     stats.bytes_written += compact->outputs[i].file_size;
    1034                 :             :   }
    1035                 :             : 
    1036         [ +  - ]:          12 :   mutex_.Lock();
    1037         [ +  + ]:          12 :   stats_[compact->compaction->level() + 1].Add(stats);
    1038                 :             : 
    1039         [ +  + ]:          12 :   if (status.ok()) {
    1040   [ +  -  -  + ]:          11 :     status = InstallCompactionResults(compact);
    1041                 :             :   }
    1042         [ +  + ]:          12 :   if (!status.ok()) {
    1043         [ +  - ]:           1 :     RecordBackgroundError(status);
    1044                 :             :   }
    1045                 :          12 :   VersionSet::LevelSummaryStorage tmp;
    1046   [ +  -  +  - ]:          12 :   Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
    1047                 :          12 :   return status;
    1048                 :          12 : }
    1049                 :             : 
    1050                 :             : namespace {
    1051                 :             : 
    1052                 :             : struct IterState {
    1053                 :             :   port::Mutex* const mu;
    1054                 :             :   Version* const version GUARDED_BY(mu);
    1055                 :             :   MemTable* const mem GUARDED_BY(mu);
    1056                 :             :   MemTable* const imm GUARDED_BY(mu);
    1057                 :             : 
    1058                 :        1124 :   IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
    1059                 :        1124 :       : mu(mutex), version(version), mem(mem), imm(imm) {}
    1060                 :             : };
    1061                 :             : 
    1062                 :        1124 : static void CleanupIteratorState(void* arg1, void* arg2) {
    1063                 :        1124 :   IterState* state = reinterpret_cast<IterState*>(arg1);
    1064                 :        1124 :   state->mu->Lock();
    1065                 :        1124 :   state->mem->Unref();
    1066         [ -  + ]:        1124 :   if (state->imm != nullptr) state->imm->Unref();
    1067                 :        1124 :   state->version->Unref();
    1068                 :        1124 :   state->mu->Unlock();
    1069         [ +  - ]:        1124 :   delete state;
    1070                 :        1124 : }
    1071                 :             : 
    1072                 :             : }  // anonymous namespace
    1073                 :             : 
    1074                 :        1124 : Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
    1075                 :             :                                       SequenceNumber* latest_snapshot,
    1076                 :             :                                       uint32_t* seed) {
    1077                 :        1124 :   mutex_.Lock();
    1078         [ +  - ]:        1124 :   *latest_snapshot = versions_->LastSequence();
    1079                 :             : 
    1080                 :             :   // Collect together all needed child iterators
    1081                 :        1124 :   std::vector<Iterator*> list;
    1082   [ +  -  +  - ]:        1124 :   list.push_back(mem_->NewIterator());
    1083         [ -  + ]:        1124 :   mem_->Ref();
    1084         [ -  + ]:        1124 :   if (imm_ != nullptr) {
    1085   [ #  #  #  # ]:           0 :     list.push_back(imm_->NewIterator());
    1086                 :           0 :     imm_->Ref();
    1087                 :             :   }
    1088         [ +  - ]:        1124 :   versions_->current()->AddIterators(options, &list);
    1089         [ -  + ]:        1124 :   Iterator* internal_iter =
    1090         [ +  - ]:        1124 :       NewMergingIterator(&internal_comparator_, &list[0], list.size());
    1091         [ +  - ]:        1124 :   versions_->current()->Ref();
    1092                 :             : 
    1093   [ +  -  +  - ]:        1124 :   IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
    1094         [ +  - ]:        1124 :   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
    1095                 :             : 
    1096                 :        1124 :   *seed = ++seed_;
    1097                 :        1124 :   mutex_.Unlock();
    1098                 :        1124 :   return internal_iter;
    1099                 :        1124 : }
    1100                 :             : 
    1101                 :           0 : Iterator* DBImpl::TEST_NewInternalIterator() {
    1102                 :           0 :   SequenceNumber ignored;
    1103                 :           0 :   uint32_t ignored_seed;
    1104                 :           0 :   return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
    1105                 :             : }
    1106                 :             : 
    1107                 :           0 : int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
    1108                 :           0 :   MutexLock l(&mutex_);
    1109         [ #  # ]:           0 :   return versions_->MaxNextLevelOverlappingBytes();
    1110                 :           0 : }
    1111                 :             : 
    1112                 :     4722718 : Status DBImpl::Get(const ReadOptions& options, const Slice& key,
    1113                 :             :                    std::string* value) {
    1114         [ +  - ]:     4722718 :   Status s;
    1115         [ +  - ]:     4722718 :   MutexLock l(&mutex_);
    1116                 :     4722718 :   SequenceNumber snapshot;
    1117         [ -  + ]:     4722718 :   if (options.snapshot != nullptr) {
    1118                 :           0 :     snapshot =
    1119                 :           0 :         static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
    1120                 :             :   } else {
    1121                 :     4722718 :     snapshot = versions_->LastSequence();
    1122                 :             :   }
    1123                 :             : 
    1124                 :     4722718 :   MemTable* mem = mem_;
    1125                 :     4722718 :   MemTable* imm = imm_;
    1126         [ +  + ]:     4722718 :   Version* current = versions_->current();
    1127         [ +  + ]:     4722718 :   mem->Ref();
    1128         [ +  + ]:     4722718 :   if (imm != nullptr) imm->Ref();
    1129         [ +  - ]:     4722718 :   current->Ref();
    1130                 :             : 
    1131                 :     4722718 :   bool have_stat_update = false;
    1132                 :     4722718 :   Version::GetStats stats;
    1133                 :             : 
    1134                 :             :   // Unlock while reading from files and memtables
    1135                 :     4722718 :   {
    1136                 :     4722718 :     mutex_.Unlock();
    1137                 :             :     // First look in the memtable, then in the immutable memtable (if any).
    1138         [ +  - ]:     4722718 :     LookupKey lkey(key, snapshot);
    1139   [ +  -  +  + ]:     4722718 :     if (mem->Get(lkey, value, &s)) {
    1140                 :             :       // Done
    1141   [ +  +  +  -  :     4560594 :     } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
                   +  + ]
    1142                 :             :       // Done
    1143                 :             :     } else {
    1144   [ +  -  -  + ]:     4556699 :       s = current->Get(options, lkey, value, &stats);
    1145                 :     4556699 :       have_stat_update = true;
    1146                 :             :     }
    1147         [ +  - ]:     4722718 :     mutex_.Lock();
    1148                 :     4722718 :   }
    1149                 :             : 
    1150   [ +  +  +  -  :     4722718 :   if (have_stat_update && current->UpdateStats(stats)) {
                   +  + ]
    1151         [ +  - ]:           6 :     MaybeScheduleCompaction();
    1152                 :             :   }
    1153                 :     4722718 :   mem->Unref();
    1154         [ +  + ]:     4722718 :   if (imm != nullptr) imm->Unref();
    1155         [ +  - ]:     4722718 :   current->Unref();
    1156                 :     4722718 :   return s;
    1157                 :     4722718 : }
    1158                 :             : 
    1159                 :        1124 : Iterator* DBImpl::NewIterator(const ReadOptions& options) {
    1160                 :        1124 :   SequenceNumber latest_snapshot;
    1161                 :        1124 :   uint32_t seed;
    1162                 :        1124 :   Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
    1163                 :        1124 :   return NewDBIterator(this, user_comparator(), iter,
    1164         [ -  + ]:        1124 :                        (options.snapshot != nullptr
    1165                 :           0 :                             ? static_cast<const SnapshotImpl*>(options.snapshot)
    1166                 :           0 :                                   ->sequence_number()
    1167                 :             :                             : latest_snapshot),
    1168                 :        1124 :                        seed);
    1169                 :             : }
    1170                 :             : 
    1171                 :           7 : void DBImpl::RecordReadSample(Slice key) {
    1172                 :           7 :   MutexLock l(&mutex_);
    1173   [ +  -  -  + ]:           7 :   if (versions_->current()->RecordReadSample(key)) {
    1174         [ #  # ]:           0 :     MaybeScheduleCompaction();
    1175                 :             :   }
    1176                 :           7 : }
    1177                 :             : 
    1178                 :           0 : const Snapshot* DBImpl::GetSnapshot() {
    1179                 :           0 :   MutexLock l(&mutex_);
    1180         [ #  # ]:           0 :   return snapshots_.New(versions_->LastSequence());
    1181                 :           0 : }
    1182                 :             : 
    1183                 :           0 : void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
    1184                 :           0 :   MutexLock l(&mutex_);
    1185                 :           0 :   snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
    1186                 :           0 : }
    1187                 :             : 
    1188                 :             : // Convenience methods
    1189                 :           0 : Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
    1190                 :           0 :   return DB::Put(o, key, val);
    1191                 :             : }
    1192                 :             : 
    1193                 :           0 : Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
    1194                 :           0 :   return DB::Delete(options, key);
    1195                 :             : }
    1196                 :             : 
    1197                 :        1391 : Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
    1198                 :        1391 :   Writer w(&mutex_);
    1199                 :        1391 :   w.batch = updates;
    1200                 :        1391 :   w.sync = options.sync;
    1201                 :        1391 :   w.done = false;
    1202                 :             : 
    1203         [ +  - ]:        1391 :   MutexLock l(&mutex_);
    1204         [ +  - ]:        1391 :   writers_.push_back(&w);
    1205   [ +  -  -  + ]:        1391 :   while (!w.done && &w != writers_.front()) {
    1206         [ #  # ]:           0 :     w.cv.Wait();
    1207                 :             :   }
    1208         [ -  + ]:        1391 :   if (w.done) {
    1209         [ #  # ]:           0 :     return w.status;
    1210                 :             :   }
    1211                 :             : 
    1212                 :             :   // May temporarily unlock and wait.
    1213         [ +  - ]:        1391 :   Status status = MakeRoomForWrite(updates == nullptr);
    1214         [ +  - ]:        1391 :   uint64_t last_sequence = versions_->LastSequence();
    1215                 :        1391 :   Writer* last_writer = &w;
    1216   [ +  -  +  - ]:        1391 :   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    1217         [ +  - ]:        1391 :     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    1218         [ +  - ]:        1391 :     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    1219         [ +  - ]:        1391 :     last_sequence += WriteBatchInternal::Count(write_batch);
    1220                 :             : 
    1221                 :             :     // Add to log and apply to memtable.  We can release the lock
    1222                 :             :     // during this phase since &w is currently responsible for logging
    1223                 :             :     // and protects against concurrent loggers and concurrent writes
    1224                 :             :     // into mem_.
    1225                 :        1391 :     {
    1226                 :        1391 :       mutex_.Unlock();
    1227   [ -  +  +  -  :        1391 :       status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
                   -  + ]
    1228                 :        1391 :       bool sync_error = false;
    1229   [ +  -  +  + ]:        1391 :       if (status.ok() && options.sync) {
    1230   [ +  -  -  + ]:          86 :         status = logfile_->Sync();
    1231         [ -  + ]:          86 :         if (!status.ok()) {
    1232                 :           0 :           sync_error = true;
    1233                 :             :         }
    1234                 :             :       }
    1235         [ +  - ]:        1391 :       if (status.ok()) {
    1236   [ +  -  -  + ]:        1391 :         status = WriteBatchInternal::InsertInto(write_batch, mem_);
    1237                 :             :       }
    1238         [ +  - ]:        1391 :       mutex_.Lock();
    1239         [ -  + ]:        1391 :       if (sync_error) {
    1240                 :             :         // The state of the log file is indeterminate: the log record we
    1241                 :             :         // just added may or may not show up when the DB is re-opened.
    1242                 :             :         // So we force the DB into a mode where all future writes fail.
    1243         [ #  # ]:           0 :         RecordBackgroundError(status);
    1244                 :             :       }
    1245                 :             :     }
    1246   [ -  +  -  - ]:        1391 :     if (write_batch == tmp_batch_) tmp_batch_->Clear();
    1247                 :             : 
    1248                 :        1391 :     versions_->SetLastSequence(last_sequence);
    1249                 :             :   }
    1250                 :             : 
    1251                 :        1391 :   while (true) {
    1252                 :        1391 :     Writer* ready = writers_.front();
    1253                 :        1391 :     writers_.pop_front();
    1254         [ -  + ]:        1391 :     if (ready != &w) {
    1255         [ #  # ]:           0 :       ready->status = status;
    1256                 :           0 :       ready->done = true;
    1257                 :           0 :       ready->cv.Signal();
    1258                 :             :     }
    1259         [ -  + ]:        1391 :     if (ready == last_writer) break;
    1260                 :             :   }
    1261                 :             : 
    1262                 :             :   // Notify new head of write queue
    1263         [ -  + ]:        1391 :   if (!writers_.empty()) {
    1264                 :           0 :     writers_.front()->cv.Signal();
    1265                 :             :   }
    1266                 :             : 
    1267                 :        1391 :   return status;
    1268                 :        2782 : }
    1269                 :             : 
    1270                 :             : // REQUIRES: Writer list must be non-empty
    1271                 :             : // REQUIRES: First writer must have a non-null batch
    1272                 :        1391 : WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
    1273                 :        1391 :   mutex_.AssertHeld();
    1274         [ -  + ]:        1391 :   assert(!writers_.empty());
    1275         [ -  + ]:        1391 :   Writer* first = writers_.front();
    1276                 :        1391 :   WriteBatch* result = first->batch;
    1277         [ -  + ]:        1391 :   assert(result != nullptr);
    1278                 :             : 
    1279         [ -  + ]:        1391 :   size_t size = WriteBatchInternal::ByteSize(first->batch);
    1280                 :             : 
    1281                 :             :   // Allow the group to grow up to a maximum size, but if the
    1282                 :             :   // original write is small, limit the growth so we do not slow
    1283                 :             :   // down the small write too much.
    1284                 :        1391 :   size_t max_size = 1 << 20;
    1285         [ +  + ]:        1391 :   if (size <= (128 << 10)) {
    1286                 :        1390 :     max_size = size + (128 << 10);
    1287                 :             :   }
    1288                 :             : 
    1289                 :        1391 :   *last_writer = first;
    1290                 :        1391 :   std::deque<Writer*>::iterator iter = writers_.begin();
    1291                 :        1391 :   ++iter;  // Advance past "first"
    1292         [ -  + ]:        1391 :   for (; iter != writers_.end(); ++iter) {
    1293                 :           0 :     Writer* w = *iter;
    1294   [ #  #  #  # ]:           0 :     if (w->sync && !first->sync) {
    1295                 :             :       // Do not include a sync write into a batch handled by a non-sync write.
    1296                 :             :       break;
    1297                 :             :     }
    1298                 :             : 
    1299         [ #  # ]:           0 :     if (w->batch != nullptr) {
    1300         [ #  # ]:           0 :       size += WriteBatchInternal::ByteSize(w->batch);
    1301         [ #  # ]:           0 :       if (size > max_size) {
    1302                 :             :         // Do not make batch too big
    1303                 :             :         break;
    1304                 :             :       }
    1305                 :             : 
    1306                 :             :       // Append to *result
    1307         [ #  # ]:           0 :       if (result == first->batch) {
    1308                 :             :         // Switch to temporary batch instead of disturbing caller's batch
    1309                 :           0 :         result = tmp_batch_;
    1310         [ #  # ]:           0 :         assert(WriteBatchInternal::Count(result) == 0);
    1311                 :           0 :         WriteBatchInternal::Append(result, first->batch);
    1312                 :             :       }
    1313                 :           0 :       WriteBatchInternal::Append(result, w->batch);
    1314                 :             :     }
    1315                 :           0 :     *last_writer = w;
    1316                 :             :   }
    1317                 :        1391 :   return result;
    1318                 :             : }
    1319                 :             : 
    1320                 :             : // REQUIRES: mutex_ is held
    1321                 :             : // REQUIRES: this thread is currently at the front of the writer queue
    1322                 :        1391 : Status DBImpl::MakeRoomForWrite(bool force) {
    1323                 :        1391 :   mutex_.AssertHeld();
    1324         [ -  + ]:        1391 :   assert(!writers_.empty());
    1325                 :        1391 :   bool allow_delay = !force;
    1326                 :        1391 :   Status s;
    1327                 :        1392 :   while (true) {
    1328                 :        1392 :     if (!bg_error_.ok()) {
    1329                 :             :       // Yield previous error
    1330         [ #  # ]:           0 :       s = bg_error_;
    1331                 :             :       break;
    1332   [ +  -  +  -  :        1392 :     } else if (allow_delay && versions_->NumLevelFiles(0) >=
                   -  + ]
    1333                 :             :                                   config::kL0_SlowdownWritesTrigger) {
    1334                 :             :       // We are getting close to hitting a hard limit on the number of
    1335                 :             :       // L0 files.  Rather than delaying a single write by several
    1336                 :             :       // seconds when we hit the hard limit, start delaying each
    1337                 :             :       // individual write by 1ms to reduce latency variance.  Also,
    1338                 :             :       // this delay hands over some CPU to the compaction thread in
    1339                 :             :       // case it is sharing the same core as the writer.
    1340                 :           0 :       mutex_.Unlock();
    1341         [ #  # ]:           0 :       env_->SleepForMicroseconds(1000);
    1342                 :           0 :       allow_delay = false;  // Do not delay a single write more than once
    1343   [ -  +  -  - ]:        1392 :       mutex_.Lock();
    1344         [ +  - ]:        1392 :     } else if (!force &&
    1345   [ +  -  +  + ]:        1392 :                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
    1346                 :             :       // There is room in current memtable
    1347                 :             :       break;
    1348         [ -  + ]:           1 :     } else if (imm_ != nullptr) {
    1349                 :             :       // We have filled up the current memtable, but the previous
    1350                 :             :       // one is still being compacted, so we wait.
    1351         [ #  # ]:           0 :       Log(options_.info_log, "Current memtable full; waiting...\n");
    1352         [ #  # ]:           0 :       background_work_finished_signal_.Wait();
    1353   [ +  -  -  + ]:           1 :     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
    1354                 :             :       // There are too many level-0 files.
    1355         [ #  # ]:           0 :       Log(options_.info_log, "Too many L0 files; waiting...\n");
    1356         [ #  # ]:           0 :       background_work_finished_signal_.Wait();
    1357                 :             :     } else {
    1358                 :             :       // Attempt to switch to a new memtable and trigger compaction of old
    1359         [ -  + ]:           1 :       assert(versions_->PrevLogNumber() == 0);
    1360         [ +  - ]:           1 :       uint64_t new_log_number = versions_->NewFileNumber();
    1361                 :           1 :       WritableFile* lfile = nullptr;
    1362   [ +  -  +  -  :           1 :       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
                   -  + ]
    1363         [ -  + ]:           1 :       if (!s.ok()) {
    1364                 :             :         // Avoid chewing through file number space in a tight loop.
    1365         [ #  # ]:           0 :         versions_->ReuseFileNumber(new_log_number);
    1366                 :             :         break;
    1367                 :             :       }
    1368         [ +  - ]:           1 :       delete log_;
    1369         [ +  - ]:           1 :       delete logfile_;
    1370                 :           1 :       logfile_ = lfile;
    1371                 :           1 :       logfile_number_ = new_log_number;
    1372   [ +  -  +  - ]:           1 :       log_ = new log::Writer(lfile);
    1373                 :           1 :       imm_ = mem_;
    1374         [ +  - ]:           1 :       has_imm_.store(true, std::memory_order_release);
    1375   [ +  -  +  - ]:           1 :       mem_ = new MemTable(internal_comparator_);
    1376         [ +  - ]:           1 :       mem_->Ref();
    1377                 :           1 :       force = false;  // Do not force another compaction if have room
    1378         [ +  - ]:           1 :       MaybeScheduleCompaction();
    1379                 :             :     }
    1380                 :             :   }
    1381                 :        1391 :   return s;
    1382                 :           0 : }
    1383                 :             : 
    1384                 :           0 : bool DBImpl::GetProperty(const Slice& property, std::string* value) {
    1385                 :           0 :   value->clear();
    1386                 :             : 
    1387                 :           0 :   MutexLock l(&mutex_);
    1388                 :           0 :   Slice in = property;
    1389         [ #  # ]:           0 :   Slice prefix("leveldb.");
    1390         [ #  # ]:           0 :   if (!in.starts_with(prefix)) return false;
    1391                 :           0 :   in.remove_prefix(prefix.size());
    1392                 :             : 
    1393         [ #  # ]:           0 :   if (in.starts_with("num-files-at-level")) {
    1394                 :           0 :     in.remove_prefix(strlen("num-files-at-level"));
    1395                 :           0 :     uint64_t level;
    1396   [ #  #  #  #  :           0 :     bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
                   #  # ]
    1397         [ #  # ]:           0 :     if (!ok || level >= config::kNumLevels) {
    1398                 :             :       return false;
    1399                 :             :     } else {
    1400                 :           0 :       char buf[100];
    1401         [ #  # ]:           0 :       snprintf(buf, sizeof(buf), "%d",
    1402         [ #  # ]:           0 :                versions_->NumLevelFiles(static_cast<int>(level)));
    1403         [ #  # ]:           0 :       *value = buf;
    1404                 :             :       return true;
    1405                 :             :     }
    1406         [ #  # ]:           0 :   } else if (in == "stats") {
    1407                 :           0 :     char buf[200];
    1408         [ #  # ]:           0 :     snprintf(buf, sizeof(buf),
    1409                 :             :              "                               Compactions\n"
    1410                 :             :              "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
    1411                 :             :              "--------------------------------------------------\n");
    1412         [ #  # ]:           0 :     value->append(buf);
    1413         [ #  # ]:           0 :     for (int level = 0; level < config::kNumLevels; level++) {
    1414         [ #  # ]:           0 :       int files = versions_->NumLevelFiles(level);
    1415   [ #  #  #  # ]:           0 :       if (stats_[level].micros > 0 || files > 0) {
    1416         [ #  # ]:           0 :         snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
    1417                 :           0 :                  files, versions_->NumLevelBytes(level) / 1048576.0,
    1418                 :             :                  stats_[level].micros / 1e6,
    1419                 :           0 :                  stats_[level].bytes_read / 1048576.0,
    1420         [ #  # ]:           0 :                  stats_[level].bytes_written / 1048576.0);
    1421         [ #  # ]:           0 :         value->append(buf);
    1422                 :             :       }
    1423                 :             :     }
    1424                 :             :     return true;
    1425         [ #  # ]:           0 :   } else if (in == "sstables") {
    1426         [ #  # ]:           0 :     *value = versions_->current()->DebugString();
    1427                 :           0 :     return true;
    1428         [ #  # ]:           0 :   } else if (in == "approximate-memory-usage") {
    1429         [ #  # ]:           0 :     size_t total_usage = options_.block_cache->TotalCharge();
    1430         [ #  # ]:           0 :     if (mem_) {
    1431         [ #  # ]:           0 :       total_usage += mem_->ApproximateMemoryUsage();
    1432                 :             :     }
    1433         [ #  # ]:           0 :     if (imm_) {
    1434         [ #  # ]:           0 :       total_usage += imm_->ApproximateMemoryUsage();
    1435                 :             :     }
    1436                 :           0 :     char buf[50];
    1437         [ #  # ]:           0 :     snprintf(buf, sizeof(buf), "%llu",
    1438                 :             :              static_cast<unsigned long long>(total_usage));
    1439         [ #  # ]:           0 :     value->append(buf);
    1440                 :             :     return true;
    1441                 :             :   }
    1442                 :             : 
    1443                 :             :   return false;
    1444                 :           0 : }
    1445                 :             : 
    1446                 :          46 : void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
    1447                 :             :   // TODO(opt): better implementation
    1448                 :          46 :   MutexLock l(&mutex_);
    1449         [ +  - ]:          46 :   Version* v = versions_->current();
    1450         [ +  - ]:          46 :   v->Ref();
    1451                 :             : 
    1452         [ +  + ]:          92 :   for (int i = 0; i < n; i++) {
    1453                 :             :     // Convert user_key into a corresponding internal key.
    1454         [ +  - ]:          46 :     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    1455         [ +  - ]:          46 :     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    1456         [ +  - ]:          46 :     uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    1457         [ +  - ]:          46 :     uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    1458         [ +  - ]:          46 :     sizes[i] = (limit >= start ? limit - start : 0);
    1459                 :          46 :   }
    1460                 :             : 
    1461         [ +  - ]:          46 :   v->Unref();
    1462                 :          46 : }
    1463                 :             : 
    1464                 :             : // Default implementations of convenience methods that subclasses of DB
    1465                 :             : // can call if they wish
    1466                 :           0 : Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
    1467                 :           0 :   WriteBatch batch;
    1468         [ #  # ]:           0 :   batch.Put(key, value);
    1469         [ #  # ]:           0 :   return Write(opt, &batch);
    1470                 :           0 : }
    1471                 :             : 
    1472                 :           0 : Status DB::Delete(const WriteOptions& opt, const Slice& key) {
    1473                 :           0 :   WriteBatch batch;
    1474         [ #  # ]:           0 :   batch.Delete(key);
    1475         [ #  # ]:           0 :   return Write(opt, &batch);
    1476                 :           0 : }
    1477                 :             : 
    1478                 :         489 : DB::~DB() = default;
    1479                 :             : 
    1480                 :         489 : Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
    1481                 :         489 :   *dbptr = nullptr;
    1482                 :             : 
    1483         [ +  - ]:         489 :   DBImpl* impl = new DBImpl(options, dbname);
    1484                 :         489 :   impl->mutex_.Lock();
    1485                 :         489 :   VersionEdit edit;
    1486                 :             :   // Recover handles create_if_missing, error_if_exists
    1487                 :         489 :   bool save_manifest = false;
    1488         [ +  - ]:         489 :   Status s = impl->Recover(&edit, &save_manifest);
    1489   [ +  -  +  - ]:         489 :   if (s.ok() && impl->mem_ == nullptr) {
    1490                 :             :     // Create new log and a corresponding memtable.
    1491         [ +  - ]:         489 :     uint64_t new_log_number = impl->versions_->NewFileNumber();
    1492                 :         489 :     WritableFile* lfile;
    1493   [ +  -  +  - ]:         978 :     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
    1494         [ -  + ]:         489 :                                      &lfile);
    1495         [ +  - ]:         489 :     if (s.ok()) {
    1496         [ +  - ]:         489 :       edit.SetLogNumber(new_log_number);
    1497                 :         489 :       impl->logfile_ = lfile;
    1498                 :         489 :       impl->logfile_number_ = new_log_number;
    1499   [ +  -  +  - ]:         489 :       impl->log_ = new log::Writer(lfile);
    1500   [ +  -  +  - ]:         489 :       impl->mem_ = new MemTable(impl->internal_comparator_);
    1501                 :         489 :       impl->mem_->Ref();
    1502                 :             :     }
    1503                 :             :   }
    1504   [ +  -  +  - ]:         489 :   if (s.ok() && save_manifest) {
    1505         [ +  - ]:         489 :     edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    1506         [ +  - ]:         489 :     edit.SetLogNumber(impl->logfile_number_);
    1507   [ +  -  -  + ]:         489 :     s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    1508                 :             :   }
    1509         [ +  - ]:         489 :   if (s.ok()) {
    1510         [ +  - ]:         489 :     impl->DeleteObsoleteFiles();
    1511         [ +  - ]:         489 :     impl->MaybeScheduleCompaction();
    1512                 :             :   }
    1513                 :         489 :   impl->mutex_.Unlock();
    1514         [ +  - ]:         489 :   if (s.ok()) {
    1515         [ -  + ]:         489 :     assert(impl->mem_ != nullptr);
    1516                 :         489 :     *dbptr = impl;
    1517                 :             :   } else {
    1518         [ #  # ]:           0 :     delete impl;
    1519                 :             :   }
    1520                 :         489 :   return s;
    1521                 :         489 : }
    1522                 :             : 
    1523                 :         489 : Snapshot::~Snapshot() = default;
    1524                 :             : 
    1525                 :          21 : Status DestroyDB(const std::string& dbname, const Options& options) {
    1526                 :          21 :   Env* env = options.env;
    1527                 :          21 :   std::vector<std::string> filenames;
    1528         [ +  - ]:          21 :   Status result = env->GetChildren(dbname, &filenames);
    1529         [ +  + ]:          21 :   if (!result.ok()) {
    1530                 :             :     // Ignore error in case directory does not exist
    1531                 :           3 :     return Status::OK();
    1532                 :             :   }
    1533                 :             : 
    1534                 :          18 :   FileLock* lock;
    1535         [ +  - ]:          18 :   const std::string lockname = LockFileName(dbname);
    1536   [ +  -  -  + ]:          18 :   result = env->LockFile(lockname, &lock);
    1537         [ +  - ]:          18 :   if (result.ok()) {
    1538                 :             :     uint64_t number;
    1539                 :             :     FileType type;
    1540   [ -  +  +  + ]:         129 :     for (size_t i = 0; i < filenames.size(); i++) {
    1541   [ +  -  +  + ]:         111 :       if (ParseFileName(filenames[i], &number, &type) &&
    1542         [ +  + ]:          75 :           type != kDBLockFile) {  // Lock file will be deleted at end
    1543   [ +  -  +  -  :         114 :         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
                   +  - ]
    1544   [ +  -  -  + ]:          57 :         if (result.ok() && !del.ok()) {
    1545         [ #  # ]:           0 :           result = del;
    1546                 :             :         }
    1547                 :          57 :       }
    1548                 :             :     }
    1549         [ +  - ]:          18 :     env->UnlockFile(lock);  // Ignore error since state is already gone
    1550         [ +  - ]:          18 :     env->DeleteFile(lockname);
    1551         [ +  - ]:          36 :     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
    1552                 :             :   }
    1553                 :          18 :   return result;
    1554                 :          39 : }
    1555                 :             : 
    1556                 :             : }  // namespace leveldb
        

Generated by: LCOV version 2.0-1