Branch data Line data Source code
1 : : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 : : // Use of this source code is governed by a BSD-style license that can be
3 : : // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 : :
5 : : #include "db/log_reader.h"
6 : :
7 : : #include <stdio.h>
8 : :
9 : : #include "leveldb/env.h"
10 : : #include "util/coding.h"
11 : : #include "util/crc32c.h"
12 : :
13 : : namespace leveldb {
14 : : namespace log {
15 : :
16 : 4609 : Reader::Reporter::~Reporter() = default;
17 : :
18 : 4609 : Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
19 : 4609 : uint64_t initial_offset)
20 : 4609 : : file_(file),
21 : 4609 : reporter_(reporter),
22 : 4609 : checksum_(checksum),
23 : 4609 : backing_store_(new char[kBlockSize]),
24 : 4609 : buffer_(),
25 : 4609 : eof_(false),
26 : 4609 : last_record_offset_(0),
27 : 4609 : end_of_buffer_offset_(0),
28 : 4609 : initial_offset_(initial_offset),
29 : 4609 : resyncing_(initial_offset > 0) {}
30 : :
31 [ + - ]: 4609 : Reader::~Reader() { delete[] backing_store_; }
32 : :
33 : 0 : bool Reader::SkipToInitialBlock() {
34 : 0 : const size_t offset_in_block = initial_offset_ % kBlockSize;
35 : 0 : uint64_t block_start_location = initial_offset_ - offset_in_block;
36 : :
37 : : // Don't search a block if we'd be in the trailer
38 [ # # ]: 0 : if (offset_in_block > kBlockSize - 6) {
39 : 0 : block_start_location += kBlockSize;
40 : : }
41 : :
42 : 0 : end_of_buffer_offset_ = block_start_location;
43 : :
44 : : // Skip to start of first block that can contain the initial record
45 [ # # ]: 0 : if (block_start_location > 0) {
46 : 0 : Status skip_status = file_->Skip(block_start_location);
47 [ # # ]: 0 : if (!skip_status.ok()) {
48 [ # # ]: 0 : ReportDrop(block_start_location, skip_status);
49 [ # # ]: 0 : return false;
50 : : }
51 : 0 : }
52 : :
53 : : return true;
54 : : }
55 : :
56 : 23071 : bool Reader::ReadRecord(Slice* record, std::string* scratch) {
57 [ - + ]: 23071 : if (last_record_offset_ < initial_offset_) {
58 [ # # ]: 0 : if (!SkipToInitialBlock()) {
59 : : return false;
60 : : }
61 : : }
62 : :
63 : 23071 : scratch->clear();
64 : 23071 : record->clear();
65 : 23071 : bool in_fragmented_record = false;
66 : : // Record offset of the logical record that we're reading
67 : : // 0 is a dummy value to make compilers happy
68 : 23071 : uint64_t prospective_record_offset = 0;
69 : :
70 : 23071 : Slice fragment;
71 : 23244 : while (true) {
72 : 23244 : const unsigned int record_type = ReadPhysicalRecord(&fragment);
73 : :
74 : : // ReadPhysicalRecord may have only had an empty trailer remaining in its
75 : : // internal buffer. Calculate the offset of the next physical record now
76 : : // that it has returned, properly accounting for its header size.
77 : 23244 : uint64_t physical_record_offset =
78 [ - + ]: 23244 : end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
79 : :
80 [ - + ]: 23244 : if (resyncing_) {
81 [ # # ]: 0 : if (record_type == kMiddleType) {
82 : 0 : continue;
83 [ # # ]: 0 : } else if (record_type == kLastType) {
84 : 0 : resyncing_ = false;
85 : 0 : continue;
86 : : } else {
87 : 0 : resyncing_ = false;
88 : : }
89 : : }
90 : :
91 [ + + + + : 23244 : switch (record_type) {
+ + - ]
92 : 18372 : case kFullType:
93 [ - + ]: 18372 : if (in_fragmented_record) {
94 : : // Handle bug in earlier versions of log::Writer where
95 : : // it could emit an empty kFirstType record at the tail end
96 : : // of a block followed by a kFullType or kFirstType record
97 : : // at the beginning of the next block.
98 [ # # ]: 0 : if (!scratch->empty()) {
99 [ # # ]: 0 : ReportCorruption(scratch->size(), "partial record without end(1)");
100 : : }
101 : : }
102 : 18372 : prospective_record_offset = physical_record_offset;
103 : 18372 : scratch->clear();
104 : 18372 : *record = fragment;
105 : 18372 : last_record_offset_ = prospective_record_offset;
106 : 18372 : return true;
107 : :
108 : 90 : case kFirstType:
109 [ - + ]: 90 : if (in_fragmented_record) {
110 : : // Handle bug in earlier versions of log::Writer where
111 : : // it could emit an empty kFirstType record at the tail end
112 : : // of a block followed by a kFullType or kFirstType record
113 : : // at the beginning of the next block.
114 [ # # ]: 0 : if (!scratch->empty()) {
115 [ # # ]: 0 : ReportCorruption(scratch->size(), "partial record without end(2)");
116 : : }
117 : : }
118 : 90 : prospective_record_offset = physical_record_offset;
119 : 90 : scratch->assign(fragment.data(), fragment.size());
120 : 90 : in_fragmented_record = true;
121 : 90 : break;
122 : :
123 : 80 : case kMiddleType:
124 [ - + ]: 80 : if (!in_fragmented_record) {
125 : 0 : ReportCorruption(fragment.size(),
126 : : "missing start of fragmented record(1)");
127 : : } else {
128 : 80 : scratch->append(fragment.data(), fragment.size());
129 : : }
130 : : break;
131 : :
132 : 90 : case kLastType:
133 [ - + ]: 90 : if (!in_fragmented_record) {
134 : 0 : ReportCorruption(fragment.size(),
135 : : "missing start of fragmented record(2)");
136 : : } else {
137 : 90 : scratch->append(fragment.data(), fragment.size());
138 [ - + ]: 90 : *record = Slice(*scratch);
139 : 90 : last_record_offset_ = prospective_record_offset;
140 : 90 : return true;
141 : : }
142 : 0 : break;
143 : :
144 : 4609 : case kEof:
145 [ - + ]: 4609 : if (in_fragmented_record) {
146 : : // This can be caused by the writer dying immediately after
147 : : // writing a physical record but before completing the next; don't
148 : : // treat it as a corruption, just ignore the entire logical record.
149 : 0 : scratch->clear();
150 : : }
151 : : return false;
152 : :
153 : 3 : case kBadRecord:
154 [ + - ]: 3 : if (in_fragmented_record) {
155 [ # # ]: 0 : ReportCorruption(scratch->size(), "error in middle of record");
156 : 0 : in_fragmented_record = false;
157 : 0 : scratch->clear();
158 : : }
159 : : break;
160 : :
161 : 0 : default: {
162 : 0 : char buf[40];
163 [ # # ]: 0 : snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
164 [ # # ]: 0 : ReportCorruption(
165 [ # # ]: 0 : (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
166 : : buf);
167 : 0 : in_fragmented_record = false;
168 : 0 : scratch->clear();
169 : 0 : break;
170 : : }
171 : : }
172 : : }
173 : : return false;
174 : : }
175 : :
176 : 0 : uint64_t Reader::LastRecordOffset() { return last_record_offset_; }
177 : :
178 : 3 : void Reader::ReportCorruption(uint64_t bytes, const char* reason) {
179 [ - + + - : 6 : ReportDrop(bytes, Status::Corruption(reason, file_->GetName()));
+ - ]
180 : 3 : }
181 : :
182 : 3 : void Reader::ReportDrop(uint64_t bytes, const Status& reason) {
183 [ + - + - ]: 3 : if (reporter_ != nullptr &&
184 [ + - ]: 3 : end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
185 : 3 : reporter_->Corruption(static_cast<size_t>(bytes), reason);
186 : : }
187 : 3 : }
188 : :
189 : 23244 : unsigned int Reader::ReadPhysicalRecord(Slice* result) {
190 : 32810 : while (true) {
191 [ + + ]: 28027 : if (buffer_.size() < kHeaderSize) {
192 [ + + ]: 9392 : if (!eof_) {
193 : : // Last read was a full read, so this is a trailer to skip
194 : 4783 : buffer_.clear();
195 : 4783 : Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
196 [ - + ]: 4783 : end_of_buffer_offset_ += buffer_.size();
197 [ - + ]: 4783 : if (!status.ok()) {
198 [ # # ]: 0 : buffer_.clear();
199 [ # # ]: 0 : ReportDrop(kBlockSize, status);
200 : 0 : eof_ = true;
201 [ # # ]: 0 : return kEof;
202 [ + + ]: 4783 : } else if (buffer_.size() < kBlockSize) {
203 : 4609 : eof_ = true;
204 : : }
205 : 4783 : continue;
206 : 4783 : } else {
207 : : // Note that if buffer_ is non-empty, we have a truncated header at the
208 : : // end of the file, which can be caused by the writer crashing in the
209 : : // middle of writing the header. Instead of considering this an error,
210 : : // just report EOF.
211 : 4609 : buffer_.clear();
212 : 4609 : return kEof;
213 : : }
214 : : }
215 : :
216 : : // Parse the header
217 [ - + ]: 18635 : const char* header = buffer_.data();
218 : 18635 : const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
219 : 18635 : const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
220 : 18635 : const unsigned int type = header[6];
221 : 18635 : const uint32_t length = a | (b << 8);
222 [ - + ]: 18635 : if (kHeaderSize + length > buffer_.size()) {
223 : 0 : size_t drop_size = buffer_.size();
224 [ # # ]: 0 : buffer_.clear();
225 [ # # ]: 0 : if (!eof_) {
226 : 0 : ReportCorruption(drop_size, "bad record length");
227 : 0 : return kBadRecord;
228 : : }
229 : : // If the end of the file has been reached without reading |length| bytes
230 : : // of payload, assume the writer died in the middle of writing the record.
231 : : // Don't report a corruption.
232 : : return kEof;
233 : : }
234 : :
235 [ - + ]: 18635 : if (type == kZeroType && length == 0) {
236 : : // Skip zero length record without reporting any drops since
237 : : // such records are produced by the mmap based writing code in
238 : : // env_posix.cc that preallocates file regions.
239 : 0 : buffer_.clear();
240 : 0 : return kBadRecord;
241 : : }
242 : :
243 : : // Check crc
244 [ + - ]: 18635 : if (checksum_) {
245 : 18635 : uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
246 : 18635 : uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
247 [ + + ]: 18635 : if (actual_crc != expected_crc) {
248 : : // Drop the rest of the buffer since "length" itself may have
249 : : // been corrupted and if we trust it, we could find some
250 : : // fragment of a real log record that just happens to look
251 : : // like a valid log record.
252 : 3 : size_t drop_size = buffer_.size();
253 : 3 : buffer_.clear();
254 : 3 : ReportCorruption(drop_size, "checksum mismatch");
255 : 3 : return kBadRecord;
256 : : }
257 : : }
258 : :
259 : 18632 : buffer_.remove_prefix(kHeaderSize + length);
260 : :
261 : : // Skip physical record that started before initial_offset_
262 [ - + ]: 18632 : if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
263 [ - + ]: 18632 : initial_offset_) {
264 : 0 : result->clear();
265 : 0 : return kBadRecord;
266 : : }
267 : :
268 : 18632 : *result = Slice(header + kHeaderSize, length);
269 : 18632 : return type;
270 : : }
271 : : }
272 : :
273 : : } // namespace log
274 : : } // namespace leveldb
|