Branch data Line data Source code
1 : : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 : : // Use of this source code is governed by a BSD-style license that can be
3 : : // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 : :
5 : : #include "db/db_impl.h"
6 : :
7 : : #include <stdint.h>
8 : : #include <stdio.h>
9 : :
10 : : #include <algorithm>
11 : : #include <atomic>
12 : : #include <set>
13 : : #include <string>
14 : : #include <vector>
15 : :
16 : : #include "db/builder.h"
17 : : #include "db/db_iter.h"
18 : : #include "db/dbformat.h"
19 : : #include "db/filename.h"
20 : : #include "db/log_reader.h"
21 : : #include "db/log_writer.h"
22 : : #include "db/memtable.h"
23 : : #include "db/table_cache.h"
24 : : #include "db/version_set.h"
25 : : #include "db/write_batch_internal.h"
26 : : #include "leveldb/db.h"
27 : : #include "leveldb/env.h"
28 : : #include "leveldb/status.h"
29 : : #include "leveldb/table.h"
30 : : #include "leveldb/table_builder.h"
31 : : #include "port/port.h"
32 : : #include "table/block.h"
33 : : #include "table/merger.h"
34 : : #include "table/two_level_iterator.h"
35 : : #include "util/coding.h"
36 : : #include "util/logging.h"
37 : : #include "util/mutexlock.h"
38 : :
39 : : namespace leveldb {
40 : :
41 : : const int kNumNonTableCacheFiles = 10;
42 : :
43 : : // Information kept for every waiting writer
44 : : struct DBImpl::Writer {
45 : 1391 : explicit Writer(port::Mutex* mu)
46 : 1391 : : batch(nullptr), sync(false), done(false), cv(mu) {}
47 : :
48 : : Status status;
49 : : WriteBatch* batch;
50 : : bool sync;
51 : : bool done;
52 : : port::CondVar cv;
53 : : };
54 : :
55 : 12 : struct DBImpl::CompactionState {
56 : : // Files produced by compaction
57 : 24 : struct Output {
58 : : uint64_t number;
59 : : uint64_t file_size;
60 : : InternalKey smallest, largest;
61 : : };
62 : :
63 [ + - + - : 1435 : Output* current_output() { return &outputs[outputs.size() - 1]; }
- + + - ]
64 : :
65 : 12 : explicit CompactionState(Compaction* c)
66 : 12 : : compaction(c),
67 : 12 : smallest_snapshot(0),
68 : 12 : outfile(nullptr),
69 : 12 : builder(nullptr),
70 : 12 : total_bytes(0) {}
71 : :
72 : : Compaction* const compaction;
73 : :
74 : : // Sequence numbers < smallest_snapshot are not significant since we
75 : : // will never have to service a snapshot below smallest_snapshot.
76 : : // Therefore if we have seen a sequence number S <= smallest_snapshot,
77 : : // we can drop all entries for the same key with sequence numbers < S.
78 : : SequenceNumber smallest_snapshot;
79 : :
80 : : std::vector<Output> outputs;
81 : :
82 : : // State kept for output being generated
83 : : WritableFile* outfile;
84 : : TableBuilder* builder;
85 : :
86 : : uint64_t total_bytes;
87 : : };
88 : :
89 : : // Fix user-supplied options to be reasonable
90 : : template <class T, class V>
91 : 1956 : static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
92 [ - + ]: 1956 : if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
93 [ + + ]: 1956 : if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
94 : 1956 : }
95 : 489 : Options SanitizeOptions(const std::string& dbname,
96 : : const InternalKeyComparator* icmp,
97 : : const InternalFilterPolicy* ipolicy,
98 : : const Options& src) {
99 : 489 : Options result = src;
100 : 489 : result.comparator = icmp;
101 [ - + ]: 489 : result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
102 : 489 : ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
103 : 489 : ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
104 : 489 : ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
105 : 489 : ClipToRange(&result.block_size, 1 << 10, 4 << 20);
106 [ - + ]: 489 : if (result.info_log == nullptr) {
107 : : // Open a log file in the same directory as the db
108 [ # # ]: 0 : src.env->CreateDir(dbname); // In case it does not exist
109 [ # # # # ]: 0 : src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
110 [ # # ]: 0 : Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
111 [ # # ]: 0 : if (!s.ok()) {
112 : : // No place suitable for logging
113 : 0 : result.info_log = nullptr;
114 : : }
115 : 0 : }
116 [ - + ]: 489 : if (result.block_cache == nullptr) {
117 : 0 : result.block_cache = NewLRUCache(8 << 20);
118 : : }
119 : 489 : return result;
120 : : }
121 : :
122 : 489 : static int TableCacheSize(const Options& sanitized_options) {
123 : : // Reserve ten files or so for other uses and give the rest to TableCache.
124 : 489 : return sanitized_options.max_open_files - kNumNonTableCacheFiles;
125 : : }
126 : :
127 : 489 : DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
128 : 489 : : env_(raw_options.env),
129 [ + - ]: 489 : internal_comparator_(raw_options.comparator),
130 [ + - ]: 489 : internal_filter_policy_(raw_options.filter_policy),
131 [ + - ]: 489 : options_(SanitizeOptions(dbname, &internal_comparator_,
132 : : &internal_filter_policy_, raw_options)),
133 : 489 : owns_info_log_(options_.info_log != raw_options.info_log),
134 : 489 : owns_cache_(options_.block_cache != raw_options.block_cache),
135 [ - + ]: 489 : dbname_(dbname),
136 [ + - + - ]: 489 : table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
137 : 489 : db_lock_(nullptr),
138 : 489 : shutting_down_(false),
139 : 489 : background_work_finished_signal_(&mutex_),
140 : 489 : mem_(nullptr),
141 : 489 : imm_(nullptr),
142 [ + - ]: 489 : has_imm_(false),
143 : 489 : logfile_(nullptr),
144 : 489 : logfile_number_(0),
145 : 489 : log_(nullptr),
146 : 489 : seed_(0),
147 [ + - ]: 489 : tmp_batch_(new WriteBatch),
148 : 489 : background_compaction_scheduled_(false),
149 [ + - ]: 489 : manual_compaction_(nullptr),
150 : 978 : versions_(new VersionSet(dbname_, &options_, table_cache_,
151 [ + - + - : 4890 : &internal_comparator_)) {}
+ - + - +
- + + ]
152 : :
153 : 2445 : DBImpl::~DBImpl() {
154 : : // Wait for background work to finish.
155 : 489 : mutex_.Lock();
156 : 489 : shutting_down_.store(true, std::memory_order_release);
157 [ + + ]: 491 : while (background_compaction_scheduled_) {
158 : 2 : background_work_finished_signal_.Wait();
159 : : }
160 : 489 : mutex_.Unlock();
161 : :
162 [ + - ]: 489 : if (db_lock_ != nullptr) {
163 [ - + ]: 489 : env_->UnlockFile(db_lock_);
164 : : }
165 : :
166 [ + - ]: 489 : delete versions_;
167 [ + - ]: 489 : if (mem_ != nullptr) mem_->Unref();
168 [ - + ]: 489 : if (imm_ != nullptr) imm_->Unref();
169 [ + - ]: 489 : delete tmp_batch_;
170 [ + - ]: 489 : delete log_;
171 [ + - ]: 489 : delete logfile_;
172 [ + - ]: 489 : delete table_cache_;
173 : :
174 [ - + ]: 489 : if (owns_info_log_) {
175 [ # # ]: 0 : delete options_.info_log;
176 : : }
177 [ - + ]: 489 : if (owns_cache_) {
178 [ # # ]: 0 : delete options_.block_cache;
179 : : }
180 [ + + ]: 979 : }
181 : :
182 : 421 : Status DBImpl::NewDB() {
183 : 421 : VersionEdit new_db;
184 [ + - + - ]: 421 : new_db.SetComparatorName(user_comparator()->Name());
185 [ + - ]: 421 : new_db.SetLogNumber(0);
186 [ + - ]: 421 : new_db.SetNextFile(2);
187 [ + - ]: 421 : new_db.SetLastSequence(0);
188 : :
189 [ + - ]: 421 : const std::string manifest = DescriptorFileName(dbname_, 1);
190 : 421 : WritableFile* file;
191 [ + - ]: 421 : Status s = env_->NewWritableFile(manifest, &file);
192 [ + - ]: 421 : if (!s.ok()) {
193 : : return s;
194 : : }
195 : 421 : {
196 [ + - ]: 421 : log::Writer log(file);
197 [ + - ]: 421 : std::string record;
198 [ + - ]: 421 : new_db.EncodeTo(&record);
199 [ - + + - : 421 : s = log.AddRecord(record);
- + ]
200 [ + - ]: 421 : if (s.ok()) {
201 [ + - - + ]: 421 : s = file->Close();
202 : : }
203 : 421 : }
204 [ + - ]: 421 : delete file;
205 [ + - ]: 421 : if (s.ok()) {
206 : : // Make "CURRENT" file that points to the new manifest file.
207 [ + - - + ]: 421 : s = SetCurrentFile(env_, dbname_, 1);
208 : : } else {
209 [ # # ]: 0 : env_->DeleteFile(manifest);
210 : : }
211 : : return s;
212 : 421 : }
213 : :
214 : 255 : void DBImpl::MaybeIgnoreError(Status* s) const {
215 [ - + - - ]: 255 : if (s->ok() || options_.paranoid_checks) {
216 : : // No change needed
217 : : } else {
218 [ # # ]: 0 : Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
219 [ # # ]: 0 : *s = Status::OK();
220 : : }
221 : 255 : }
222 : :
223 : 502 : void DBImpl::DeleteObsoleteFiles() {
224 : 502 : mutex_.AssertHeld();
225 : :
226 [ + + ]: 502 : if (!bg_error_.ok()) {
227 : : // After a background error, we don't know whether a new version may
228 : : // or may not have been committed, so we cannot safely garbage collect.
229 : : return;
230 : : }
231 : :
232 : : // Make a set of all of the live files
233 : 501 : std::set<uint64_t> live = pending_outputs_;
234 [ + - ]: 501 : versions_->AddLiveFiles(&live);
235 : :
236 : 501 : std::vector<std::string> filenames;
237 [ + - ]: 501 : env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
238 : 501 : uint64_t number;
239 : 501 : FileType type;
240 : 501 : std::vector<std::string> files_to_delete;
241 [ + + ]: 3167 : for (std::string& filename : filenames) {
242 [ + - + + ]: 2666 : if (ParseFileName(filename, &number, &type)) {
243 : 2400 : bool keep = true;
244 [ + + + - : 2400 : switch (type) {
+ ]
245 : 570 : case kLogFile:
246 [ + + + - ]: 570 : keep = ((number >= versions_->LogNumber()) ||
247 [ + - ]: 69 : (number == versions_->PrevLogNumber()));
248 : : break;
249 : 990 : case kDescriptorFile:
250 : : // Keep my manifest file, and any newer incarnations'
251 : : // (in case there is a race that allows other incarnations)
252 : 990 : keep = (number >= versions_->ManifestFileNumber());
253 : 990 : break;
254 : 213 : case kTableFile:
255 : 213 : keep = (live.find(number) != live.end());
256 : 213 : break;
257 : 0 : case kTempFile:
258 : : // Any temp files that are currently being written to must
259 : : // be recorded in pending_outputs_, which is inserted into "live"
260 : 0 : keep = (live.find(number) != live.end());
261 : 0 : break;
262 : : case kCurrentFile:
263 : : case kDBLockFile:
264 : : case kInfoLogFile:
265 : : keep = true;
266 : : break;
267 : : }
268 : :
269 [ + + ]: 1203 : if (!keep) {
270 [ + - ]: 603 : files_to_delete.push_back(std::move(filename));
271 [ + + ]: 603 : if (type == kTableFile) {
272 [ + - ]: 45 : table_cache_->Evict(number);
273 : : }
274 [ + - ]: 603 : Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
275 : : static_cast<unsigned long long>(number));
276 : : }
277 : : }
278 : : }
279 : :
280 : : // While deleting all files unblock other threads. All files being deleted
281 : : // have unique names which will not collide with newly created files and
282 : : // are therefore safe to delete while allowing other threads to proceed.
283 : 501 : mutex_.Unlock();
284 [ + + ]: 1104 : for (const std::string& filename : files_to_delete) {
285 [ + - + - : 1206 : env_->DeleteFile(dbname_ + "/" + filename);
+ - ]
286 : : }
287 [ + - ]: 501 : mutex_.Lock();
288 : 501 : }
289 : :
290 : 489 : Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
291 : 489 : mutex_.AssertHeld();
292 : :
293 : : // Ignore error from CreateDir since the creation of the DB is
294 : : // committed only when the descriptor is created, and this directory
295 : : // may already exist from a previous failed creation attempt.
296 [ + + ]: 489 : env_->CreateDir(dbname_);
297 [ - + ]: 489 : assert(db_lock_ == nullptr);
298 [ + - ]: 489 : Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
299 [ - + ]: 489 : if (!s.ok()) {
300 : 0 : return s;
301 : : }
302 : :
303 [ + - + - : 489 : if (!env_->FileExists(CurrentFileName(dbname_))) {
+ + ]
304 [ + - ]: 421 : if (options_.create_if_missing) {
305 [ + - - + ]: 421 : s = NewDB();
306 [ - + ]: 421 : if (!s.ok()) {
307 : 0 : return s;
308 : : }
309 : : } else {
310 [ # # ]: 0 : return Status::InvalidArgument(
311 [ # # # # ]: 0 : dbname_, "does not exist (create_if_missing is false)");
312 : : }
313 : : } else {
314 [ - + ]: 68 : if (options_.error_if_exists) {
315 [ # # # # ]: 0 : return Status::InvalidArgument(dbname_,
316 [ - - - + ]: 489 : "exists (error_if_exists is true)");
317 : : }
318 : : }
319 : :
320 [ + - - + ]: 489 : s = versions_->Recover(save_manifest);
321 [ - + ]: 489 : if (!s.ok()) {
322 : 0 : return s;
323 : : }
324 : 489 : SequenceNumber max_sequence(0);
325 : :
326 : : // Recover from all newer log files than the ones named in the
327 : : // descriptor (new log files may have been added by the previous
328 : : // incarnation without registering them in the descriptor).
329 : : //
330 : : // Note that PrevLogNumber() is no longer used, but we pay
331 : : // attention to it in case we are recovering a database
332 : : // produced by an older version of leveldb.
333 [ + - ]: 489 : const uint64_t min_log = versions_->LogNumber();
334 [ + - ]: 489 : const uint64_t prev_log = versions_->PrevLogNumber();
335 : 489 : std::vector<std::string> filenames;
336 [ + - - + ]: 489 : s = env_->GetChildren(dbname_, &filenames);
337 [ - + ]: 489 : if (!s.ok()) {
338 : 0 : return s;
339 : : }
340 [ + - ]: 489 : std::set<uint64_t> expected;
341 [ + - ]: 489 : versions_->AddLiveFiles(&expected);
342 : 489 : uint64_t number;
343 : 489 : FileType type;
344 : 489 : std::vector<uint64_t> logs;
345 [ - + + + ]: 1989 : for (size_t i = 0; i < filenames.size(); i++) {
346 [ + - + + ]: 1500 : if (ParseFileName(filenames[i], &number, &type)) {
347 : 1257 : expected.erase(number);
348 [ + + - + : 1257 : if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
- - ]
349 [ + - ]: 68 : logs.push_back(number);
350 : : }
351 : : }
352 [ - + ]: 489 : if (!expected.empty()) {
353 : 0 : char buf[50];
354 [ # # ]: 0 : snprintf(buf, sizeof(buf), "%d missing files; e.g.",
355 [ # # ]: 0 : static_cast<int>(expected.size()));
356 [ # # # # ]: 0 : return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
357 : : }
358 : :
359 : : // Recover in the order in which the logs were generated
360 : 489 : std::sort(logs.begin(), logs.end());
361 [ - + + + ]: 557 : for (size_t i = 0; i < logs.size(); i++) {
362 [ + - ]: 68 : s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
363 [ - + ]: 68 : &max_sequence);
364 [ - + ]: 68 : if (!s.ok()) {
365 : 0 : return s;
366 : : }
367 : :
368 : : // The previous incarnation may not have written any MANIFEST
369 : : // records after allocating this log number. So we manually
370 : : // update the file number allocation counter in VersionSet.
371 [ + - ]: 68 : versions_->MarkFileNumberUsed(logs[i]);
372 : : }
373 : :
374 [ + + ]: 489 : if (versions_->LastSequence() < max_sequence) {
375 : 59 : versions_->SetLastSequence(max_sequence);
376 : : }
377 : :
378 : 489 : return Status::OK();
379 : 978 : }
380 : :
381 : 68 : Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
382 : : bool* save_manifest, VersionEdit* edit,
383 : : SequenceNumber* max_sequence) {
384 : 0 : struct LogReporter : public log::Reader::Reporter {
385 : : Env* env;
386 : : Logger* info_log;
387 : : const char* fname;
388 : : Status* status; // null if options_.paranoid_checks==false
389 : 0 : void Corruption(size_t bytes, const Status& s) override {
390 [ # # ]: 0 : Log(info_log, "%s%s: dropping %d bytes; %s",
391 [ # # ]: 0 : (this->status == nullptr ? "(ignoring error) " : ""), fname,
392 [ # # ]: 0 : static_cast<int>(bytes), s.ToString().c_str());
393 [ # # # # ]: 0 : if (this->status != nullptr && this->status->ok()) *this->status = s;
394 : 0 : }
395 : : };
396 : :
397 : 68 : mutex_.AssertHeld();
398 : :
399 : : // Open the log file
400 : 68 : std::string fname = LogFileName(dbname_, log_number);
401 : 68 : SequentialFile* file;
402 [ + - ]: 68 : Status status = env_->NewSequentialFile(fname, &file);
403 [ - + ]: 68 : if (!status.ok()) {
404 [ # # ]: 0 : MaybeIgnoreError(&status);
405 : : return status;
406 : : }
407 : :
408 : : // Create the log reader.
409 [ - + ]: 68 : LogReporter reporter;
410 : 68 : reporter.env = env_;
411 : 68 : reporter.info_log = options_.info_log;
412 [ - + ]: 68 : reporter.fname = fname.c_str();
413 [ - + ]: 68 : reporter.status = (options_.paranoid_checks ? &status : nullptr);
414 : : // We intentionally make log::Reader do checksumming even if
415 : : // paranoid_checks==false so that corruptions cause entire commits
416 : : // to be skipped instead of propagating bad information (like overly
417 : : // large sequence numbers).
418 [ + - ]: 68 : log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
419 [ + - ]: 68 : Log(options_.info_log, "Recovering log #%llu",
420 : : (unsigned long long)log_number);
421 : :
422 : : // Read all the records and add to a memtable
423 [ + - ]: 68 : std::string scratch;
424 [ + - ]: 68 : Slice record;
425 [ + - ]: 68 : WriteBatch batch;
426 : : int compactions = 0;
427 : : MemTable* mem = nullptr;
428 [ + - + + : 323 : while (reader.ReadRecord(&record, &scratch) && status.ok()) {
+ - ]
429 [ - + ]: 255 : if (record.size() < 12) {
430 [ # # # # ]: 0 : reporter.Corruption(record.size(),
431 [ # # # # ]: 0 : Status::Corruption("log record too small", fname));
432 : 0 : continue;
433 : : }
434 [ + - ]: 255 : WriteBatchInternal::SetContents(&batch, record);
435 : :
436 [ + + ]: 255 : if (mem == nullptr) {
437 [ + - + - ]: 61 : mem = new MemTable(internal_comparator_);
438 : 61 : mem->Ref();
439 : : }
440 [ + - - + ]: 255 : status = WriteBatchInternal::InsertInto(&batch, mem);
441 [ + - ]: 255 : MaybeIgnoreError(&status);
442 [ + - ]: 255 : if (!status.ok()) {
443 : : break;
444 : : }
445 [ + - ]: 255 : const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
446 [ + - ]: 255 : WriteBatchInternal::Count(&batch) - 1;
447 [ + - ]: 255 : if (last_seq > *max_sequence) {
448 : 255 : *max_sequence = last_seq;
449 : : }
450 : :
451 [ + - - + ]: 255 : if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
452 : 0 : compactions++;
453 : 0 : *save_manifest = true;
454 [ # # # # ]: 0 : status = WriteLevel0Table(mem, edit, nullptr);
455 : 0 : mem->Unref();
456 : 0 : mem = nullptr;
457 [ # # ]: 0 : if (!status.ok()) {
458 : : // Reflect errors immediately so that conditions like full
459 : : // file-systems cause the DB::Open() to fail.
460 : : break;
461 : : }
462 : : }
463 : : }
464 : :
465 [ + - ]: 68 : delete file;
466 : :
467 : : // See if we should keep reusing the last log file.
468 [ + - - + : 68 : if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
- - ]
469 [ # # ]: 0 : assert(logfile_ == nullptr);
470 [ # # ]: 0 : assert(log_ == nullptr);
471 [ # # ]: 0 : assert(mem_ == nullptr);
472 : 0 : uint64_t lfile_size;
473 [ # # # # : 0 : if (env_->GetFileSize(fname, &lfile_size).ok() &&
# # # # #
# ]
474 [ # # # # ]: 0 : env_->NewAppendableFile(fname, &logfile_).ok()) {
475 [ # # ]: 0 : Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
476 [ # # # # ]: 0 : log_ = new log::Writer(logfile_, lfile_size);
477 : 0 : logfile_number_ = log_number;
478 [ # # ]: 0 : if (mem != nullptr) {
479 : 0 : mem_ = mem;
480 : 0 : mem = nullptr;
481 : : } else {
482 : : // mem can be nullptr if lognum exists but was empty.
483 [ # # # # ]: 0 : mem_ = new MemTable(internal_comparator_);
484 : 0 : mem_->Ref();
485 : : }
486 : : }
487 : : }
488 : :
489 [ + + ]: 68 : if (mem != nullptr) {
490 : : // mem did not get reused; compact it.
491 [ + - ]: 61 : if (status.ok()) {
492 : 61 : *save_manifest = true;
493 [ + - - + ]: 61 : status = WriteLevel0Table(mem, edit, nullptr);
494 : : }
495 : 61 : mem->Unref();
496 : : }
497 : :
498 : 68 : return status;
499 : 136 : }
500 : :
501 : 62 : Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
502 : : Version* base) {
503 : 62 : mutex_.AssertHeld();
504 : 62 : const uint64_t start_micros = env_->NowMicros();
505 [ + - ]: 62 : FileMetaData meta;
506 [ + - ]: 62 : meta.number = versions_->NewFileNumber();
507 [ + - ]: 62 : pending_outputs_.insert(meta.number);
508 [ + - ]: 62 : Iterator* iter = mem->NewIterator();
509 : 62 : Log(options_.info_log, "Level-0 table #%llu: started",
510 [ + - ]: 62 : (unsigned long long)meta.number);
511 : :
512 : 62 : Status s;
513 : 62 : {
514 : 62 : mutex_.Unlock();
515 [ + - - + ]: 62 : s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
516 [ + - ]: 62 : mutex_.Lock();
517 : : }
518 : :
519 : 124 : Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
520 [ + - ]: 62 : (unsigned long long)meta.number, (unsigned long long)meta.file_size,
521 [ + - ]: 62 : s.ToString().c_str());
522 [ + - ]: 62 : delete iter;
523 : 62 : pending_outputs_.erase(meta.number);
524 : :
525 : : // Note that if file_size is zero, the file has been deleted and
526 : : // should not be added to the manifest.
527 : 62 : int level = 0;
528 [ + - + - ]: 62 : if (s.ok() && meta.file_size > 0) {
529 [ - + ]: 62 : const Slice min_user_key = meta.smallest.user_key();
530 [ - + ]: 62 : const Slice max_user_key = meta.largest.user_key();
531 [ + + ]: 62 : if (base != nullptr) {
532 [ + - ]: 1 : level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
533 : : }
534 [ + - ]: 62 : edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
535 : : meta.largest);
536 : : }
537 : :
538 [ + - ]: 62 : CompactionStats stats;
539 [ + - ]: 62 : stats.micros = env_->NowMicros() - start_micros;
540 : 62 : stats.bytes_written = meta.file_size;
541 : 62 : stats_[level].Add(stats);
542 : 62 : return s;
543 : 62 : }
544 : :
545 : 1 : void DBImpl::CompactMemTable() {
546 : 1 : mutex_.AssertHeld();
547 [ - + ]: 1 : assert(imm_ != nullptr);
548 : :
549 : : // Save the contents of the memtable as a new Table
550 : 1 : VersionEdit edit;
551 [ + - ]: 1 : Version* base = versions_->current();
552 [ + - ]: 1 : base->Ref();
553 [ + - ]: 1 : Status s = WriteLevel0Table(imm_, &edit, base);
554 [ + - ]: 1 : base->Unref();
555 : :
556 [ + - - + ]: 1 : if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
557 [ # # # # ]: 0 : s = Status::IOError("Deleting DB during memtable compaction");
558 : : }
559 : :
560 : : // Replace immutable memtable with the generated Table
561 [ + - ]: 1 : if (s.ok()) {
562 [ + - ]: 1 : edit.SetPrevLogNumber(0);
563 [ + - ]: 1 : edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
564 [ + - - + ]: 1 : s = versions_->LogAndApply(&edit, &mutex_);
565 : : }
566 : :
567 [ + - ]: 1 : if (s.ok()) {
568 : : // Commit to the new state
569 : 1 : imm_->Unref();
570 : 1 : imm_ = nullptr;
571 [ + - ]: 1 : has_imm_.store(false, std::memory_order_release);
572 [ + - ]: 1 : DeleteObsoleteFiles();
573 : : } else {
574 [ # # ]: 0 : RecordBackgroundError(s);
575 : : }
576 : 1 : }
577 : :
578 : 0 : void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
579 : 0 : int max_level_with_files = 1;
580 : 0 : {
581 : 0 : MutexLock l(&mutex_);
582 : 0 : Version* base = versions_->current();
583 [ # # ]: 0 : for (int level = 1; level < config::kNumLevels; level++) {
584 [ # # # # ]: 0 : if (base->OverlapInLevel(level, begin, end)) {
585 : 0 : max_level_with_files = level;
586 : : }
587 : : }
588 : 0 : }
589 [ # # ]: 0 : TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
590 [ # # ]: 0 : for (int level = 0; level < max_level_with_files; level++) {
591 : 0 : TEST_CompactRange(level, begin, end);
592 : : }
593 : 0 : }
594 : :
595 : 0 : void DBImpl::TEST_CompactRange(int level, const Slice* begin,
596 : : const Slice* end) {
597 [ # # ]: 0 : assert(level >= 0);
598 [ # # ]: 0 : assert(level + 1 < config::kNumLevels);
599 : :
600 [ # # ]: 0 : InternalKey begin_storage, end_storage;
601 : :
602 [ # # ]: 0 : ManualCompaction manual;
603 : 0 : manual.level = level;
604 : 0 : manual.done = false;
605 [ # # ]: 0 : if (begin == nullptr) {
606 : 0 : manual.begin = nullptr;
607 : : } else {
608 [ # # ]: 0 : begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
609 : 0 : manual.begin = &begin_storage;
610 : : }
611 [ # # ]: 0 : if (end == nullptr) {
612 : 0 : manual.end = nullptr;
613 : : } else {
614 [ # # ]: 0 : end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
615 : 0 : manual.end = &end_storage;
616 : : }
617 : :
618 [ # # ]: 0 : MutexLock l(&mutex_);
619 [ # # # # : 0 : while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
# # ]
620 [ # # ]: 0 : bg_error_.ok()) {
621 [ # # ]: 0 : if (manual_compaction_ == nullptr) { // Idle
622 : 0 : manual_compaction_ = &manual;
623 [ # # ]: 0 : MaybeScheduleCompaction();
624 : : } else { // Running either my compaction or another compaction.
625 [ # # ]: 0 : background_work_finished_signal_.Wait();
626 : : }
627 : : }
628 [ # # ]: 0 : if (manual_compaction_ == &manual) {
629 : : // Cancel my manual compaction since we aborted early for some reason.
630 : 0 : manual_compaction_ = nullptr;
631 : : }
632 : 0 : }
633 : :
634 : 0 : Status DBImpl::TEST_CompactMemTable() {
635 : : // nullptr batch means just wait for earlier writes to be done
636 : 0 : Status s = Write(WriteOptions(), nullptr);
637 [ # # ]: 0 : if (s.ok()) {
638 : : // Wait until the compaction completes
639 [ # # ]: 0 : MutexLock l(&mutex_);
640 [ # # # # ]: 0 : while (imm_ != nullptr && bg_error_.ok()) {
641 [ # # ]: 0 : background_work_finished_signal_.Wait();
642 : : }
643 [ # # ]: 0 : if (imm_ != nullptr) {
644 [ # # ]: 0 : s = bg_error_;
645 : : }
646 : 0 : }
647 : 0 : return s;
648 : 0 : }
649 : :
650 : 2 : void DBImpl::RecordBackgroundError(const Status& s) {
651 : 2 : mutex_.AssertHeld();
652 [ + + ]: 2 : if (bg_error_.ok()) {
653 : 1 : bg_error_ = s;
654 : 1 : background_work_finished_signal_.SignalAll();
655 : : }
656 : 2 : }
657 : :
658 : 509 : void DBImpl::MaybeScheduleCompaction() {
659 : 509 : mutex_.AssertHeld();
660 [ + + ]: 509 : if (background_compaction_scheduled_) {
661 : : // Already scheduled
662 [ + + ]: 505 : } else if (shutting_down_.load(std::memory_order_acquire)) {
663 : : // DB is being deleted; no more background compactions
664 [ + - ]: 503 : } else if (!bg_error_.ok()) {
665 : : // Already got an error; no more changes
666 [ + + + - ]: 503 : } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
667 [ + + ]: 502 : !versions_->NeedsCompaction()) {
668 : : // No work to be done
669 : : } else {
670 : 13 : background_compaction_scheduled_ = true;
671 : 13 : env_->Schedule(&DBImpl::BGWork, this);
672 : : }
673 : 509 : }
674 : :
675 : 13 : void DBImpl::BGWork(void* db) {
676 : 13 : reinterpret_cast<DBImpl*>(db)->BackgroundCall();
677 : 13 : }
678 : :
679 : 13 : void DBImpl::BackgroundCall() {
680 : 13 : MutexLock l(&mutex_);
681 [ - + ]: 13 : assert(background_compaction_scheduled_);
682 [ + - ]: 13 : if (shutting_down_.load(std::memory_order_acquire)) {
683 : : // No more background work when shutting down.
684 [ + - ]: 13 : } else if (!bg_error_.ok()) {
685 : : // No more background work after a background error.
686 : : } else {
687 [ + - ]: 13 : BackgroundCompaction();
688 : : }
689 : :
690 : 13 : background_compaction_scheduled_ = false;
691 : :
692 : : // Previous compaction may have produced too many files in a level,
693 : : // so reschedule another compaction if needed.
694 [ + - ]: 13 : MaybeScheduleCompaction();
695 : 13 : background_work_finished_signal_.SignalAll();
696 : 13 : }
697 : :
698 : 13 : void DBImpl::BackgroundCompaction() {
699 : 13 : mutex_.AssertHeld();
700 : :
701 [ + + ]: 13 : if (imm_ != nullptr) {
702 : 1 : CompactMemTable();
703 : 1 : return;
704 : : }
705 : :
706 : 12 : Compaction* c;
707 : 12 : bool is_manual = (manual_compaction_ != nullptr);
708 [ - + ]: 12 : InternalKey manual_end;
709 [ - + ]: 12 : if (is_manual) {
710 : 0 : ManualCompaction* m = manual_compaction_;
711 [ # # ]: 0 : c = versions_->CompactRange(m->level, m->begin, m->end);
712 : 0 : m->done = (c == nullptr);
713 [ # # ]: 0 : if (c != nullptr) {
714 [ # # # # ]: 0 : manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
715 : : }
716 [ # # ]: 0 : Log(options_.info_log,
717 : : "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
718 [ # # # # : 0 : m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
# # # # ]
719 [ # # # # : 0 : (m->end ? m->end->DebugString().c_str() : "(end)"),
# # # # ]
720 [ # # # # : 0 : (m->done ? "(end)" : manual_end.DebugString().c_str()));
# # # # ]
721 : : } else {
722 [ + - ]: 12 : c = versions_->PickCompaction();
723 : : }
724 : :
725 [ + - ]: 12 : Status status;
726 [ + - ]: 12 : if (c == nullptr) {
727 : : // Nothing to do
728 [ + - + - : 12 : } else if (!is_manual && c->IsTrivialMove()) {
- + ]
729 : : // Move file to next level
730 [ # # # # ]: 0 : assert(c->num_input_files(0) == 1);
731 [ # # ]: 0 : FileMetaData* f = c->input(0, 0);
732 [ # # ]: 0 : c->edit()->DeleteFile(c->level(), f->number);
733 : 0 : c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
734 [ # # ]: 0 : f->largest);
735 [ # # # # ]: 0 : status = versions_->LogAndApply(c->edit(), &mutex_);
736 [ # # ]: 0 : if (!status.ok()) {
737 [ # # ]: 0 : RecordBackgroundError(status);
738 : : }
739 : 0 : VersionSet::LevelSummaryStorage tmp;
740 : 0 : Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
741 [ # # ]: 0 : static_cast<unsigned long long>(f->number), c->level() + 1,
742 [ # # ]: 0 : static_cast<unsigned long long>(f->file_size),
743 [ # # # # ]: 0 : status.ToString().c_str(), versions_->LevelSummary(&tmp));
744 : : } else {
745 [ + - + - ]: 12 : CompactionState* compact = new CompactionState(c);
746 [ + - - + ]: 12 : status = DoCompactionWork(compact);
747 [ + + ]: 12 : if (!status.ok()) {
748 [ + - ]: 1 : RecordBackgroundError(status);
749 : : }
750 [ + - ]: 12 : CleanupCompaction(compact);
751 [ + - ]: 12 : c->ReleaseInputs();
752 [ + - ]: 12 : DeleteObsoleteFiles();
753 : : }
754 : 12 : delete c;
755 : :
756 [ + + ]: 12 : if (status.ok()) {
757 : : // Done
758 [ - + ]: 1 : } else if (shutting_down_.load(std::memory_order_acquire)) {
759 : : // Ignore compaction errors found during shutting down
760 : : } else {
761 [ # # # # ]: 0 : Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
762 : : }
763 : :
764 [ - + ]: 12 : if (is_manual) {
765 : 0 : ManualCompaction* m = manual_compaction_;
766 [ # # ]: 0 : if (!status.ok()) {
767 : 0 : m->done = true;
768 : : }
769 [ # # ]: 0 : if (!m->done) {
770 : : // We only compacted part of the requested range. Update *m
771 : : // to the range that is left to be compacted.
772 [ # # ]: 0 : m->tmp_storage = manual_end;
773 : 0 : m->begin = &m->tmp_storage;
774 : : }
775 : 0 : manual_compaction_ = nullptr;
776 : : }
777 : 12 : }
778 : :
779 : 12 : void DBImpl::CleanupCompaction(CompactionState* compact) {
780 : 12 : mutex_.AssertHeld();
781 [ + + ]: 12 : if (compact->builder != nullptr) {
782 : : // May happen if we get a shutdown call in the middle of compaction
783 : 1 : compact->builder->Abandon();
784 [ + - ]: 1 : delete compact->builder;
785 : : } else {
786 [ - + ]: 11 : assert(compact->outfile == nullptr);
787 : : }
788 [ + + ]: 12 : delete compact->outfile;
789 [ - + + + ]: 24 : for (size_t i = 0; i < compact->outputs.size(); i++) {
790 : 12 : const CompactionState::Output& out = compact->outputs[i];
791 : 12 : pending_outputs_.erase(out.number);
792 : : }
793 [ + - ]: 24 : delete compact;
794 : 12 : }
795 : :
796 : 12 : Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
797 [ - + ]: 12 : assert(compact != nullptr);
798 [ - + ]: 12 : assert(compact->builder == nullptr);
799 : 12 : uint64_t file_number;
800 : 12 : {
801 : 12 : mutex_.Lock();
802 : 12 : file_number = versions_->NewFileNumber();
803 : 12 : pending_outputs_.insert(file_number);
804 [ + - ]: 12 : CompactionState::Output out;
805 : 12 : out.number = file_number;
806 : 12 : out.file_size = 0;
807 [ + - ]: 12 : out.smallest.Clear();
808 : 12 : out.largest.Clear();
809 [ + - ]: 12 : compact->outputs.push_back(out);
810 : 12 : mutex_.Unlock();
811 : 0 : }
812 : :
813 : : // Make the output file
814 : 12 : std::string fname = TableFileName(dbname_, file_number);
815 [ + - ]: 12 : Status s = env_->NewWritableFile(fname, &compact->outfile);
816 [ + - ]: 12 : if (s.ok()) {
817 [ + - + - ]: 12 : compact->builder = new TableBuilder(options_, compact->outfile);
818 : : }
819 : 12 : return s;
820 : 12 : }
821 : :
822 : 11 : Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
823 : : Iterator* input) {
824 [ - + ]: 11 : assert(compact != nullptr);
825 [ - + ]: 11 : assert(compact->outfile != nullptr);
826 [ - + ]: 11 : assert(compact->builder != nullptr);
827 : :
828 [ - + ]: 11 : const uint64_t output_number = compact->current_output()->number;
829 [ - + ]: 11 : assert(output_number != 0);
830 : :
831 : : // Check for iterator errors
832 : 11 : Status s = input->status();
833 [ + - ]: 11 : const uint64_t current_entries = compact->builder->NumEntries();
834 [ + - ]: 11 : if (s.ok()) {
835 [ + - - + ]: 11 : s = compact->builder->Finish();
836 : : } else {
837 [ # # ]: 0 : compact->builder->Abandon();
838 : : }
839 [ + - ]: 11 : const uint64_t current_bytes = compact->builder->FileSize();
840 [ - + ]: 11 : compact->current_output()->file_size = current_bytes;
841 : 11 : compact->total_bytes += current_bytes;
842 [ + - ]: 11 : delete compact->builder;
843 : 11 : compact->builder = nullptr;
844 : :
845 : : // Finish and check for file errors
846 [ + - ]: 11 : if (s.ok()) {
847 [ + - - + ]: 11 : s = compact->outfile->Sync();
848 : : }
849 [ + - ]: 11 : if (s.ok()) {
850 [ + - - + ]: 11 : s = compact->outfile->Close();
851 : : }
852 [ + - ]: 11 : delete compact->outfile;
853 : 11 : compact->outfile = nullptr;
854 : :
855 [ + - + - ]: 11 : if (s.ok() && current_entries > 0) {
856 : : // Verify that the table is usable
857 : 11 : Iterator* iter =
858 [ + - ]: 11 : table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
859 [ + - - + ]: 11 : s = iter->status();
860 [ + - ]: 11 : delete iter;
861 [ + - ]: 11 : if (s.ok()) {
862 : 11 : Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
863 [ + - ]: 11 : (unsigned long long)output_number, compact->compaction->level(),
864 : : (unsigned long long)current_entries,
865 : : (unsigned long long)current_bytes);
866 : : }
867 : : }
868 : 11 : return s;
869 : 0 : }
870 : :
871 : 11 : Status DBImpl::InstallCompactionResults(CompactionState* compact) {
872 : 11 : mutex_.AssertHeld();
873 : 11 : Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
874 [ - + ]: 11 : compact->compaction->num_input_files(0), compact->compaction->level(),
875 : 11 : compact->compaction->num_input_files(1), compact->compaction->level() + 1,
876 [ - + ]: 11 : static_cast<long long>(compact->total_bytes));
877 : :
878 : : // Add compaction outputs
879 : 11 : compact->compaction->AddInputDeletions(compact->compaction->edit());
880 : 11 : const int level = compact->compaction->level();
881 [ - + + + ]: 22 : for (size_t i = 0; i < compact->outputs.size(); i++) {
882 : 11 : const CompactionState::Output& out = compact->outputs[i];
883 : 11 : compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
884 : 11 : out.smallest, out.largest);
885 : : }
886 : 11 : return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
887 : : }
888 : :
889 : 12 : Status DBImpl::DoCompactionWork(CompactionState* compact) {
890 : 12 : const uint64_t start_micros = env_->NowMicros();
891 : 12 : int64_t imm_micros = 0; // Micros spent doing imm_ compactions
892 : :
893 : 12 : Log(options_.info_log, "Compacting %d@%d + %d@%d files",
894 [ - + ]: 12 : compact->compaction->num_input_files(0), compact->compaction->level(),
895 [ - + ]: 12 : compact->compaction->num_input_files(1),
896 [ - + ]: 12 : compact->compaction->level() + 1);
897 : :
898 [ - + ]: 12 : assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
899 [ - + ]: 12 : assert(compact->builder == nullptr);
900 [ - + ]: 12 : assert(compact->outfile == nullptr);
901 [ + - ]: 12 : if (snapshots_.empty()) {
902 : 12 : compact->smallest_snapshot = versions_->LastSequence();
903 : : } else {
904 : 0 : compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
905 : : }
906 : :
907 : 12 : Iterator* input = versions_->MakeInputIterator(compact->compaction);
908 : :
909 : : // Release mutex while we're actually doing the compaction work
910 : 12 : mutex_.Unlock();
911 : :
912 : 12 : input->SeekToFirst();
913 : 12 : Status status;
914 : 12 : ParsedInternalKey ikey;
915 : 12 : std::string current_user_key;
916 : 12 : bool has_current_user_key = false;
917 : 12 : SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
918 [ + - + + : 1628 : while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
+ + ]
919 : : // Prioritize immutable compaction work
920 [ - + ]: 1616 : if (has_imm_.load(std::memory_order_relaxed)) {
921 [ # # ]: 0 : const uint64_t imm_start = env_->NowMicros();
922 [ # # ]: 0 : mutex_.Lock();
923 [ # # ]: 0 : if (imm_ != nullptr) {
924 [ # # ]: 0 : CompactMemTable();
925 : : // Wake up MakeRoomForWrite() if necessary.
926 : 0 : background_work_finished_signal_.SignalAll();
927 : : }
928 : 0 : mutex_.Unlock();
929 [ # # ]: 0 : imm_micros += (env_->NowMicros() - imm_start);
930 : : }
931 : :
932 [ + - ]: 1616 : Slice key = input->key();
933 [ + - - + ]: 1616 : if (compact->compaction->ShouldStopBefore(key) &&
934 [ # # ]: 0 : compact->builder != nullptr) {
935 [ # # # # ]: 0 : status = FinishCompactionOutputFile(compact, input);
936 [ # # ]: 0 : if (!status.ok()) {
937 : : break;
938 : : }
939 : : }
940 : :
941 : : // Handle key/value, add to state, etc.
942 : 1616 : bool drop = false;
943 [ - + ]: 1616 : if (!ParseInternalKey(key, &ikey)) {
944 : : // Do not hide error keys
945 : 0 : current_user_key.clear();
946 : 0 : has_current_user_key = false;
947 : 0 : last_sequence_for_key = kMaxSequenceNumber;
948 : : } else {
949 [ + + + + ]: 3220 : if (!has_current_user_key ||
950 [ - + + - ]: 1604 : user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
951 : : 0) {
952 : : // First occurrence of this user key
953 [ + - ]: 1412 : current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
954 : : has_current_user_key = true;
955 : : last_sequence_for_key = kMaxSequenceNumber;
956 : : }
957 : :
958 [ + + ]: 1616 : if (last_sequence_for_key <= compact->smallest_snapshot) {
959 : : // Hidden by an newer entry for same user key
960 : : drop = true; // (A)
961 : 1423 : } else if (ikey.type == kTypeDeletion &&
962 [ + + + - : 1423 : ikey.sequence <= compact->smallest_snapshot &&
- + ]
963 [ + - ]: 11 : compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
964 : : // For this user key:
965 : : // (1) there is no data in higher levels
966 : : // (2) data in lower levels will have larger sequence numbers
967 : : // (3) data in layers that are being compacted here and have
968 : : // smaller sequence numbers will be dropped in the next
969 : : // few iterations of this loop (by rule (A) above).
970 : : // Therefore this deletion marker is obsolete and can be dropped.
971 : 11 : drop = true;
972 : : }
973 : :
974 : 1616 : last_sequence_for_key = ikey.sequence;
975 : : }
976 : : #if 0
977 : : Log(options_.info_log,
978 : : " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
979 : : "%d smallest_snapshot: %d",
980 : : ikey.user_key.ToString().c_str(),
981 : : (int)ikey.sequence, ikey.type, kTypeValue, drop,
982 : : compact->compaction->IsBaseLevelForKey(ikey.user_key),
983 : : (int)last_sequence_for_key, (int)compact->smallest_snapshot);
984 : : #endif
985 : :
986 : 1616 : if (!drop) {
987 : : // Open output file if necessary
988 [ + + ]: 1401 : if (compact->builder == nullptr) {
989 [ + - - + ]: 12 : status = OpenCompactionOutputFile(compact);
990 [ + - ]: 12 : if (!status.ok()) {
991 : : break;
992 : : }
993 : : }
994 [ + - + + ]: 1401 : if (compact->builder->NumEntries() == 0) {
995 [ - + + - ]: 12 : compact->current_output()->smallest.DecodeFrom(key);
996 : : }
997 [ - + + - ]: 1401 : compact->current_output()->largest.DecodeFrom(key);
998 [ + - + - ]: 1401 : compact->builder->Add(key, input->value());
999 : :
1000 : : // Close output file if it is big enough
1001 [ + - - + ]: 1401 : if (compact->builder->FileSize() >=
1002 [ - + ]: 1401 : compact->compaction->MaxOutputFileSize()) {
1003 [ # # # # ]: 0 : status = FinishCompactionOutputFile(compact, input);
1004 [ # # ]: 0 : if (!status.ok()) {
1005 : : break;
1006 : : }
1007 : : }
1008 : : }
1009 : :
1010 [ + - ]: 1616 : input->Next();
1011 : : }
1012 : :
1013 [ + - + + ]: 12 : if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
1014 [ + - - + ]: 1 : status = Status::IOError("Deleting DB during compaction");
1015 : : }
1016 [ + + + - ]: 12 : if (status.ok() && compact->builder != nullptr) {
1017 [ + - - + ]: 11 : status = FinishCompactionOutputFile(compact, input);
1018 : : }
1019 [ + + ]: 12 : if (status.ok()) {
1020 [ + - - + ]: 11 : status = input->status();
1021 : : }
1022 [ + - ]: 12 : delete input;
1023 : 12 : input = nullptr;
1024 : :
1025 [ + - ]: 12 : CompactionStats stats;
1026 [ + - ]: 12 : stats.micros = env_->NowMicros() - start_micros - imm_micros;
1027 [ + + ]: 36 : for (int which = 0; which < 2; which++) {
1028 [ - + + + ]: 72 : for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
1029 : 48 : stats.bytes_read += compact->compaction->input(which, i)->file_size;
1030 : : }
1031 : : }
1032 [ - + + + ]: 24 : for (size_t i = 0; i < compact->outputs.size(); i++) {
1033 : 12 : stats.bytes_written += compact->outputs[i].file_size;
1034 : : }
1035 : :
1036 [ + - ]: 12 : mutex_.Lock();
1037 [ + + ]: 12 : stats_[compact->compaction->level() + 1].Add(stats);
1038 : :
1039 [ + + ]: 12 : if (status.ok()) {
1040 [ + - - + ]: 11 : status = InstallCompactionResults(compact);
1041 : : }
1042 [ + + ]: 12 : if (!status.ok()) {
1043 [ + - ]: 1 : RecordBackgroundError(status);
1044 : : }
1045 : 12 : VersionSet::LevelSummaryStorage tmp;
1046 [ + - + - ]: 12 : Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
1047 : 12 : return status;
1048 : 12 : }
1049 : :
1050 : : namespace {
1051 : :
1052 : : struct IterState {
1053 : : port::Mutex* const mu;
1054 : : Version* const version GUARDED_BY(mu);
1055 : : MemTable* const mem GUARDED_BY(mu);
1056 : : MemTable* const imm GUARDED_BY(mu);
1057 : :
1058 : 1124 : IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
1059 : 1124 : : mu(mutex), version(version), mem(mem), imm(imm) {}
1060 : : };
1061 : :
1062 : 1124 : static void CleanupIteratorState(void* arg1, void* arg2) {
1063 : 1124 : IterState* state = reinterpret_cast<IterState*>(arg1);
1064 : 1124 : state->mu->Lock();
1065 : 1124 : state->mem->Unref();
1066 [ - + ]: 1124 : if (state->imm != nullptr) state->imm->Unref();
1067 : 1124 : state->version->Unref();
1068 : 1124 : state->mu->Unlock();
1069 [ + - ]: 1124 : delete state;
1070 : 1124 : }
1071 : :
1072 : : } // anonymous namespace
1073 : :
1074 : 1124 : Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
1075 : : SequenceNumber* latest_snapshot,
1076 : : uint32_t* seed) {
1077 : 1124 : mutex_.Lock();
1078 [ + - ]: 1124 : *latest_snapshot = versions_->LastSequence();
1079 : :
1080 : : // Collect together all needed child iterators
1081 : 1124 : std::vector<Iterator*> list;
1082 [ + - + - ]: 1124 : list.push_back(mem_->NewIterator());
1083 [ - + ]: 1124 : mem_->Ref();
1084 [ - + ]: 1124 : if (imm_ != nullptr) {
1085 [ # # # # ]: 0 : list.push_back(imm_->NewIterator());
1086 : 0 : imm_->Ref();
1087 : : }
1088 [ + - ]: 1124 : versions_->current()->AddIterators(options, &list);
1089 [ - + ]: 1124 : Iterator* internal_iter =
1090 [ + - ]: 1124 : NewMergingIterator(&internal_comparator_, &list[0], list.size());
1091 [ + - ]: 1124 : versions_->current()->Ref();
1092 : :
1093 [ + - + - ]: 1124 : IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
1094 [ + - ]: 1124 : internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1095 : :
1096 : 1124 : *seed = ++seed_;
1097 : 1124 : mutex_.Unlock();
1098 : 1124 : return internal_iter;
1099 : 1124 : }
1100 : :
1101 : 0 : Iterator* DBImpl::TEST_NewInternalIterator() {
1102 : 0 : SequenceNumber ignored;
1103 : 0 : uint32_t ignored_seed;
1104 : 0 : return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1105 : : }
1106 : :
1107 : 0 : int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1108 : 0 : MutexLock l(&mutex_);
1109 [ # # ]: 0 : return versions_->MaxNextLevelOverlappingBytes();
1110 : 0 : }
1111 : :
1112 : 4722718 : Status DBImpl::Get(const ReadOptions& options, const Slice& key,
1113 : : std::string* value) {
1114 [ + - ]: 4722718 : Status s;
1115 [ + - ]: 4722718 : MutexLock l(&mutex_);
1116 : 4722718 : SequenceNumber snapshot;
1117 [ - + ]: 4722718 : if (options.snapshot != nullptr) {
1118 : 0 : snapshot =
1119 : 0 : static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
1120 : : } else {
1121 : 4722718 : snapshot = versions_->LastSequence();
1122 : : }
1123 : :
1124 : 4722718 : MemTable* mem = mem_;
1125 : 4722718 : MemTable* imm = imm_;
1126 [ + + ]: 4722718 : Version* current = versions_->current();
1127 [ + + ]: 4722718 : mem->Ref();
1128 [ + + ]: 4722718 : if (imm != nullptr) imm->Ref();
1129 [ + - ]: 4722718 : current->Ref();
1130 : :
1131 : 4722718 : bool have_stat_update = false;
1132 : 4722718 : Version::GetStats stats;
1133 : :
1134 : : // Unlock while reading from files and memtables
1135 : 4722718 : {
1136 : 4722718 : mutex_.Unlock();
1137 : : // First look in the memtable, then in the immutable memtable (if any).
1138 [ + - ]: 4722718 : LookupKey lkey(key, snapshot);
1139 [ + - + + ]: 4722718 : if (mem->Get(lkey, value, &s)) {
1140 : : // Done
1141 [ + + + - : 4560594 : } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
+ + ]
1142 : : // Done
1143 : : } else {
1144 [ + - - + ]: 4556699 : s = current->Get(options, lkey, value, &stats);
1145 : 4556699 : have_stat_update = true;
1146 : : }
1147 [ + - ]: 4722718 : mutex_.Lock();
1148 : 4722718 : }
1149 : :
1150 [ + + + - : 4722718 : if (have_stat_update && current->UpdateStats(stats)) {
+ + ]
1151 [ + - ]: 6 : MaybeScheduleCompaction();
1152 : : }
1153 : 4722718 : mem->Unref();
1154 [ + + ]: 4722718 : if (imm != nullptr) imm->Unref();
1155 [ + - ]: 4722718 : current->Unref();
1156 : 4722718 : return s;
1157 : 4722718 : }
1158 : :
1159 : 1124 : Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1160 : 1124 : SequenceNumber latest_snapshot;
1161 : 1124 : uint32_t seed;
1162 : 1124 : Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1163 : 1124 : return NewDBIterator(this, user_comparator(), iter,
1164 [ - + ]: 1124 : (options.snapshot != nullptr
1165 : 0 : ? static_cast<const SnapshotImpl*>(options.snapshot)
1166 : 0 : ->sequence_number()
1167 : : : latest_snapshot),
1168 : 1124 : seed);
1169 : : }
1170 : :
1171 : 7 : void DBImpl::RecordReadSample(Slice key) {
1172 : 7 : MutexLock l(&mutex_);
1173 [ + - - + ]: 7 : if (versions_->current()->RecordReadSample(key)) {
1174 [ # # ]: 0 : MaybeScheduleCompaction();
1175 : : }
1176 : 7 : }
1177 : :
1178 : 0 : const Snapshot* DBImpl::GetSnapshot() {
1179 : 0 : MutexLock l(&mutex_);
1180 [ # # ]: 0 : return snapshots_.New(versions_->LastSequence());
1181 : 0 : }
1182 : :
1183 : 0 : void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
1184 : 0 : MutexLock l(&mutex_);
1185 : 0 : snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
1186 : 0 : }
1187 : :
1188 : : // Convenience methods
1189 : 0 : Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1190 : 0 : return DB::Put(o, key, val);
1191 : : }
1192 : :
1193 : 0 : Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1194 : 0 : return DB::Delete(options, key);
1195 : : }
1196 : :
1197 : 1391 : Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1198 : 1391 : Writer w(&mutex_);
1199 : 1391 : w.batch = updates;
1200 : 1391 : w.sync = options.sync;
1201 : 1391 : w.done = false;
1202 : :
1203 [ + - ]: 1391 : MutexLock l(&mutex_);
1204 [ + - ]: 1391 : writers_.push_back(&w);
1205 [ + - - + ]: 1391 : while (!w.done && &w != writers_.front()) {
1206 [ # # ]: 0 : w.cv.Wait();
1207 : : }
1208 [ - + ]: 1391 : if (w.done) {
1209 [ # # ]: 0 : return w.status;
1210 : : }
1211 : :
1212 : : // May temporarily unlock and wait.
1213 [ + - ]: 1391 : Status status = MakeRoomForWrite(updates == nullptr);
1214 [ + - ]: 1391 : uint64_t last_sequence = versions_->LastSequence();
1215 : 1391 : Writer* last_writer = &w;
1216 [ + - + - ]: 1391 : if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
1217 [ + - ]: 1391 : WriteBatch* write_batch = BuildBatchGroup(&last_writer);
1218 [ + - ]: 1391 : WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
1219 [ + - ]: 1391 : last_sequence += WriteBatchInternal::Count(write_batch);
1220 : :
1221 : : // Add to log and apply to memtable. We can release the lock
1222 : : // during this phase since &w is currently responsible for logging
1223 : : // and protects against concurrent loggers and concurrent writes
1224 : : // into mem_.
1225 : 1391 : {
1226 : 1391 : mutex_.Unlock();
1227 [ - + + - : 1391 : status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
- + ]
1228 : 1391 : bool sync_error = false;
1229 [ + - + + ]: 1391 : if (status.ok() && options.sync) {
1230 [ + - - + ]: 86 : status = logfile_->Sync();
1231 [ - + ]: 86 : if (!status.ok()) {
1232 : 0 : sync_error = true;
1233 : : }
1234 : : }
1235 [ + - ]: 1391 : if (status.ok()) {
1236 [ + - - + ]: 1391 : status = WriteBatchInternal::InsertInto(write_batch, mem_);
1237 : : }
1238 [ + - ]: 1391 : mutex_.Lock();
1239 [ - + ]: 1391 : if (sync_error) {
1240 : : // The state of the log file is indeterminate: the log record we
1241 : : // just added may or may not show up when the DB is re-opened.
1242 : : // So we force the DB into a mode where all future writes fail.
1243 [ # # ]: 0 : RecordBackgroundError(status);
1244 : : }
1245 : : }
1246 [ - + - - ]: 1391 : if (write_batch == tmp_batch_) tmp_batch_->Clear();
1247 : :
1248 : 1391 : versions_->SetLastSequence(last_sequence);
1249 : : }
1250 : :
1251 : 1391 : while (true) {
1252 : 1391 : Writer* ready = writers_.front();
1253 : 1391 : writers_.pop_front();
1254 [ - + ]: 1391 : if (ready != &w) {
1255 [ # # ]: 0 : ready->status = status;
1256 : 0 : ready->done = true;
1257 : 0 : ready->cv.Signal();
1258 : : }
1259 [ - + ]: 1391 : if (ready == last_writer) break;
1260 : : }
1261 : :
1262 : : // Notify new head of write queue
1263 [ - + ]: 1391 : if (!writers_.empty()) {
1264 : 0 : writers_.front()->cv.Signal();
1265 : : }
1266 : :
1267 : 1391 : return status;
1268 : 2782 : }
1269 : :
1270 : : // REQUIRES: Writer list must be non-empty
1271 : : // REQUIRES: First writer must have a non-null batch
1272 : 1391 : WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
1273 : 1391 : mutex_.AssertHeld();
1274 [ - + ]: 1391 : assert(!writers_.empty());
1275 [ - + ]: 1391 : Writer* first = writers_.front();
1276 : 1391 : WriteBatch* result = first->batch;
1277 [ - + ]: 1391 : assert(result != nullptr);
1278 : :
1279 [ - + ]: 1391 : size_t size = WriteBatchInternal::ByteSize(first->batch);
1280 : :
1281 : : // Allow the group to grow up to a maximum size, but if the
1282 : : // original write is small, limit the growth so we do not slow
1283 : : // down the small write too much.
1284 : 1391 : size_t max_size = 1 << 20;
1285 [ + + ]: 1391 : if (size <= (128 << 10)) {
1286 : 1390 : max_size = size + (128 << 10);
1287 : : }
1288 : :
1289 : 1391 : *last_writer = first;
1290 : 1391 : std::deque<Writer*>::iterator iter = writers_.begin();
1291 : 1391 : ++iter; // Advance past "first"
1292 [ - + ]: 1391 : for (; iter != writers_.end(); ++iter) {
1293 : 0 : Writer* w = *iter;
1294 [ # # # # ]: 0 : if (w->sync && !first->sync) {
1295 : : // Do not include a sync write into a batch handled by a non-sync write.
1296 : : break;
1297 : : }
1298 : :
1299 [ # # ]: 0 : if (w->batch != nullptr) {
1300 [ # # ]: 0 : size += WriteBatchInternal::ByteSize(w->batch);
1301 [ # # ]: 0 : if (size > max_size) {
1302 : : // Do not make batch too big
1303 : : break;
1304 : : }
1305 : :
1306 : : // Append to *result
1307 [ # # ]: 0 : if (result == first->batch) {
1308 : : // Switch to temporary batch instead of disturbing caller's batch
1309 : 0 : result = tmp_batch_;
1310 [ # # ]: 0 : assert(WriteBatchInternal::Count(result) == 0);
1311 : 0 : WriteBatchInternal::Append(result, first->batch);
1312 : : }
1313 : 0 : WriteBatchInternal::Append(result, w->batch);
1314 : : }
1315 : 0 : *last_writer = w;
1316 : : }
1317 : 1391 : return result;
1318 : : }
1319 : :
1320 : : // REQUIRES: mutex_ is held
1321 : : // REQUIRES: this thread is currently at the front of the writer queue
1322 : 1391 : Status DBImpl::MakeRoomForWrite(bool force) {
1323 : 1391 : mutex_.AssertHeld();
1324 [ - + ]: 1391 : assert(!writers_.empty());
1325 : 1391 : bool allow_delay = !force;
1326 : 1391 : Status s;
1327 : 1392 : while (true) {
1328 : 1392 : if (!bg_error_.ok()) {
1329 : : // Yield previous error
1330 [ # # ]: 0 : s = bg_error_;
1331 : : break;
1332 [ + - + - : 1392 : } else if (allow_delay && versions_->NumLevelFiles(0) >=
- + ]
1333 : : config::kL0_SlowdownWritesTrigger) {
1334 : : // We are getting close to hitting a hard limit on the number of
1335 : : // L0 files. Rather than delaying a single write by several
1336 : : // seconds when we hit the hard limit, start delaying each
1337 : : // individual write by 1ms to reduce latency variance. Also,
1338 : : // this delay hands over some CPU to the compaction thread in
1339 : : // case it is sharing the same core as the writer.
1340 : 0 : mutex_.Unlock();
1341 [ # # ]: 0 : env_->SleepForMicroseconds(1000);
1342 : 0 : allow_delay = false; // Do not delay a single write more than once
1343 [ - + - - ]: 1392 : mutex_.Lock();
1344 [ + - ]: 1392 : } else if (!force &&
1345 [ + - + + ]: 1392 : (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
1346 : : // There is room in current memtable
1347 : : break;
1348 [ - + ]: 1 : } else if (imm_ != nullptr) {
1349 : : // We have filled up the current memtable, but the previous
1350 : : // one is still being compacted, so we wait.
1351 [ # # ]: 0 : Log(options_.info_log, "Current memtable full; waiting...\n");
1352 [ # # ]: 0 : background_work_finished_signal_.Wait();
1353 [ + - - + ]: 1 : } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1354 : : // There are too many level-0 files.
1355 [ # # ]: 0 : Log(options_.info_log, "Too many L0 files; waiting...\n");
1356 [ # # ]: 0 : background_work_finished_signal_.Wait();
1357 : : } else {
1358 : : // Attempt to switch to a new memtable and trigger compaction of old
1359 [ - + ]: 1 : assert(versions_->PrevLogNumber() == 0);
1360 [ + - ]: 1 : uint64_t new_log_number = versions_->NewFileNumber();
1361 : 1 : WritableFile* lfile = nullptr;
1362 [ + - + - : 1 : s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
- + ]
1363 [ - + ]: 1 : if (!s.ok()) {
1364 : : // Avoid chewing through file number space in a tight loop.
1365 [ # # ]: 0 : versions_->ReuseFileNumber(new_log_number);
1366 : : break;
1367 : : }
1368 [ + - ]: 1 : delete log_;
1369 [ + - ]: 1 : delete logfile_;
1370 : 1 : logfile_ = lfile;
1371 : 1 : logfile_number_ = new_log_number;
1372 [ + - + - ]: 1 : log_ = new log::Writer(lfile);
1373 : 1 : imm_ = mem_;
1374 [ + - ]: 1 : has_imm_.store(true, std::memory_order_release);
1375 [ + - + - ]: 1 : mem_ = new MemTable(internal_comparator_);
1376 [ + - ]: 1 : mem_->Ref();
1377 : 1 : force = false; // Do not force another compaction if have room
1378 [ + - ]: 1 : MaybeScheduleCompaction();
1379 : : }
1380 : : }
1381 : 1391 : return s;
1382 : 0 : }
1383 : :
1384 : 0 : bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1385 : 0 : value->clear();
1386 : :
1387 : 0 : MutexLock l(&mutex_);
1388 : 0 : Slice in = property;
1389 [ # # ]: 0 : Slice prefix("leveldb.");
1390 [ # # ]: 0 : if (!in.starts_with(prefix)) return false;
1391 : 0 : in.remove_prefix(prefix.size());
1392 : :
1393 [ # # ]: 0 : if (in.starts_with("num-files-at-level")) {
1394 : 0 : in.remove_prefix(strlen("num-files-at-level"));
1395 : 0 : uint64_t level;
1396 [ # # # # : 0 : bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
# # ]
1397 [ # # ]: 0 : if (!ok || level >= config::kNumLevels) {
1398 : : return false;
1399 : : } else {
1400 : 0 : char buf[100];
1401 [ # # ]: 0 : snprintf(buf, sizeof(buf), "%d",
1402 [ # # ]: 0 : versions_->NumLevelFiles(static_cast<int>(level)));
1403 [ # # ]: 0 : *value = buf;
1404 : : return true;
1405 : : }
1406 [ # # ]: 0 : } else if (in == "stats") {
1407 : 0 : char buf[200];
1408 [ # # ]: 0 : snprintf(buf, sizeof(buf),
1409 : : " Compactions\n"
1410 : : "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1411 : : "--------------------------------------------------\n");
1412 [ # # ]: 0 : value->append(buf);
1413 [ # # ]: 0 : for (int level = 0; level < config::kNumLevels; level++) {
1414 [ # # ]: 0 : int files = versions_->NumLevelFiles(level);
1415 [ # # # # ]: 0 : if (stats_[level].micros > 0 || files > 0) {
1416 [ # # ]: 0 : snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
1417 : 0 : files, versions_->NumLevelBytes(level) / 1048576.0,
1418 : : stats_[level].micros / 1e6,
1419 : 0 : stats_[level].bytes_read / 1048576.0,
1420 [ # # ]: 0 : stats_[level].bytes_written / 1048576.0);
1421 [ # # ]: 0 : value->append(buf);
1422 : : }
1423 : : }
1424 : : return true;
1425 [ # # ]: 0 : } else if (in == "sstables") {
1426 [ # # ]: 0 : *value = versions_->current()->DebugString();
1427 : 0 : return true;
1428 [ # # ]: 0 : } else if (in == "approximate-memory-usage") {
1429 [ # # ]: 0 : size_t total_usage = options_.block_cache->TotalCharge();
1430 [ # # ]: 0 : if (mem_) {
1431 [ # # ]: 0 : total_usage += mem_->ApproximateMemoryUsage();
1432 : : }
1433 [ # # ]: 0 : if (imm_) {
1434 [ # # ]: 0 : total_usage += imm_->ApproximateMemoryUsage();
1435 : : }
1436 : 0 : char buf[50];
1437 [ # # ]: 0 : snprintf(buf, sizeof(buf), "%llu",
1438 : : static_cast<unsigned long long>(total_usage));
1439 [ # # ]: 0 : value->append(buf);
1440 : : return true;
1441 : : }
1442 : :
1443 : : return false;
1444 : 0 : }
1445 : :
1446 : 46 : void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
1447 : : // TODO(opt): better implementation
1448 : 46 : MutexLock l(&mutex_);
1449 [ + - ]: 46 : Version* v = versions_->current();
1450 [ + - ]: 46 : v->Ref();
1451 : :
1452 [ + + ]: 92 : for (int i = 0; i < n; i++) {
1453 : : // Convert user_key into a corresponding internal key.
1454 [ + - ]: 46 : InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1455 [ + - ]: 46 : InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1456 [ + - ]: 46 : uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1457 [ + - ]: 46 : uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1458 [ + - ]: 46 : sizes[i] = (limit >= start ? limit - start : 0);
1459 : 46 : }
1460 : :
1461 [ + - ]: 46 : v->Unref();
1462 : 46 : }
1463 : :
1464 : : // Default implementations of convenience methods that subclasses of DB
1465 : : // can call if they wish
1466 : 0 : Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1467 : 0 : WriteBatch batch;
1468 [ # # ]: 0 : batch.Put(key, value);
1469 [ # # ]: 0 : return Write(opt, &batch);
1470 : 0 : }
1471 : :
1472 : 0 : Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1473 : 0 : WriteBatch batch;
1474 [ # # ]: 0 : batch.Delete(key);
1475 [ # # ]: 0 : return Write(opt, &batch);
1476 : 0 : }
1477 : :
1478 : 489 : DB::~DB() = default;
1479 : :
1480 : 489 : Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1481 : 489 : *dbptr = nullptr;
1482 : :
1483 [ + - ]: 489 : DBImpl* impl = new DBImpl(options, dbname);
1484 : 489 : impl->mutex_.Lock();
1485 : 489 : VersionEdit edit;
1486 : : // Recover handles create_if_missing, error_if_exists
1487 : 489 : bool save_manifest = false;
1488 [ + - ]: 489 : Status s = impl->Recover(&edit, &save_manifest);
1489 [ + - + - ]: 489 : if (s.ok() && impl->mem_ == nullptr) {
1490 : : // Create new log and a corresponding memtable.
1491 [ + - ]: 489 : uint64_t new_log_number = impl->versions_->NewFileNumber();
1492 : 489 : WritableFile* lfile;
1493 [ + - + - ]: 978 : s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1494 [ - + ]: 489 : &lfile);
1495 [ + - ]: 489 : if (s.ok()) {
1496 [ + - ]: 489 : edit.SetLogNumber(new_log_number);
1497 : 489 : impl->logfile_ = lfile;
1498 : 489 : impl->logfile_number_ = new_log_number;
1499 [ + - + - ]: 489 : impl->log_ = new log::Writer(lfile);
1500 [ + - + - ]: 489 : impl->mem_ = new MemTable(impl->internal_comparator_);
1501 : 489 : impl->mem_->Ref();
1502 : : }
1503 : : }
1504 [ + - + - ]: 489 : if (s.ok() && save_manifest) {
1505 [ + - ]: 489 : edit.SetPrevLogNumber(0); // No older logs needed after recovery.
1506 [ + - ]: 489 : edit.SetLogNumber(impl->logfile_number_);
1507 [ + - - + ]: 489 : s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1508 : : }
1509 [ + - ]: 489 : if (s.ok()) {
1510 [ + - ]: 489 : impl->DeleteObsoleteFiles();
1511 [ + - ]: 489 : impl->MaybeScheduleCompaction();
1512 : : }
1513 : 489 : impl->mutex_.Unlock();
1514 [ + - ]: 489 : if (s.ok()) {
1515 [ - + ]: 489 : assert(impl->mem_ != nullptr);
1516 : 489 : *dbptr = impl;
1517 : : } else {
1518 [ # # ]: 0 : delete impl;
1519 : : }
1520 : 489 : return s;
1521 : 489 : }
1522 : :
1523 : 489 : Snapshot::~Snapshot() = default;
1524 : :
1525 : 21 : Status DestroyDB(const std::string& dbname, const Options& options) {
1526 : 21 : Env* env = options.env;
1527 : 21 : std::vector<std::string> filenames;
1528 [ + - ]: 21 : Status result = env->GetChildren(dbname, &filenames);
1529 [ + + ]: 21 : if (!result.ok()) {
1530 : : // Ignore error in case directory does not exist
1531 : 3 : return Status::OK();
1532 : : }
1533 : :
1534 : 18 : FileLock* lock;
1535 [ + - ]: 18 : const std::string lockname = LockFileName(dbname);
1536 [ + - - + ]: 18 : result = env->LockFile(lockname, &lock);
1537 [ + - ]: 18 : if (result.ok()) {
1538 : : uint64_t number;
1539 : : FileType type;
1540 [ - + + + ]: 129 : for (size_t i = 0; i < filenames.size(); i++) {
1541 [ + - + + ]: 111 : if (ParseFileName(filenames[i], &number, &type) &&
1542 [ + + ]: 75 : type != kDBLockFile) { // Lock file will be deleted at end
1543 [ + - + - : 114 : Status del = env->DeleteFile(dbname + "/" + filenames[i]);
+ - ]
1544 [ + - - + ]: 57 : if (result.ok() && !del.ok()) {
1545 [ # # ]: 0 : result = del;
1546 : : }
1547 : 57 : }
1548 : : }
1549 [ + - ]: 18 : env->UnlockFile(lock); // Ignore error since state is already gone
1550 [ + - ]: 18 : env->DeleteFile(lockname);
1551 [ + - ]: 36 : env->DeleteDir(dbname); // Ignore error in case dir contains other files
1552 : : }
1553 : 18 : return result;
1554 : 39 : }
1555 : :
1556 : : } // namespace leveldb
|