Skip to content

Commit

Permalink
Properly report IO errors when IndexType::kBinarySearchWithFirstKey i…
Browse files Browse the repository at this point in the history
…s used (#6621)

Summary:
Context: Index type `kBinarySearchWithFirstKey` added the ability for sst file iterator to sometimes report a key from index without reading the corresponding data block. This is useful when sst blocks are cut at some meaningful boundaries (e.g. one block per key prefix), and many seeks land between blocks (e.g. for each prefix, the ranges of keys in different sst files are nearly disjoint, so a typical seek needs to read a data block from only one file even if all files have the prefix). But this added a new error condition, which rocksdb code was really not equipped to deal with: `InternalIterator::value()` may fail with an IO error or Status::Incomplete, but it's just a method returning a Slice, with no way to report error instead. Before this PR, this type of error wasn't handled at all (an empty slice was returned), and kBinarySearchWithFirstKey implementation was considered a prototype.

Now that we (LogDevice) have experimented with kBinarySearchWithFirstKey for a while and confirmed that it's really useful, this PR is adding the missing error handling.

It's a pretty inconvenient situation implementation-wise. The error needs to be reported from InternalIterator when trying to access value. But there are ~700 call sites of `InternalIterator::value()`, most of which either can't hit the error condition (because the iterator is reading from memtable or from index or something) or wouldn't benefit from the deferred loading of the value (e.g. compaction iterator that reads all values anyway). Adding error handling to all these call sites would needlessly bloat the code. So instead I made the deferred value loading optional: only the call sites that may use deferred loading have to call the new method `PrepareValue()` before calling `value()`. The feature is enabled with a new bool argument `allow_unprepared_value` to a bunch of methods that create iterators (it wouldn't make sense to put it in ReadOptions because it's completely internal to iterators, with virtually no user-visible effect). Lmk if you have better ideas.

Note that the deferred value loading only happens for *internal* iterators. The user-visible iterator (DBIter) always prepares the value before returning from Seek/Next/etc. We could go further and add an API to defer that value loading too, but that's most likely not useful for LogDevice, so it doesn't seem worth the complexity for now.
Pull Request resolved: #6621

Test Plan: make -j5 check . Will also deploy to some logdevice test clusters and look at stats.

Reviewed By: siying

Differential Revision: D20786930

Pulled By: al13n321

fbshipit-source-id: 6da77d918bad3780522e918f17f4d5513d3e99ee
  • Loading branch information
al13n321 authored and facebook-github-bot committed Apr 16, 2020
1 parent 610a09c commit e45673d
Show file tree
Hide file tree
Showing 34 changed files with 295 additions and 95 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## Unreleased
### Bug Fixes
* Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced.
* Finish implementation of BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey. It's now ready for use. Significantly reduces read amplification in some setups, especially for iterator seeks.

### Public API Change
* Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications.
Expand Down
2 changes: 1 addition & 1 deletion db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Status ArenaWrappedDBIter::Refresh() {

InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
latest_seq);
latest_seq, /* allow_unprepared_value */ true);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
Expand Down
3 changes: 2 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ Status BuildTable(
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
/*skip_filter=*/false, level, /*smallest_compaction_key=*/nullptr,
/*largest_compaction_key*/ nullptr));
/*largest_compaction_key*/ nullptr,
/*allow_unprepared_value*/ false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ Status CompactionJob::Run() {
TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
/*skip_filters=*/false, compact_->compaction->output_level(),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr);
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false);
auto s = iter->status();

if (s.ok() && paranoid_file_checks_) {
Expand Down
19 changes: 12 additions & 7 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {

InternalIterator* DBImpl::NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family) {
ColumnFamilyHandle* column_family, bool allow_unprepared_value) {
ColumnFamilyData* cfd;
if (column_family == nullptr) {
cfd = default_cf_handle_->cfd();
Expand All @@ -1327,7 +1327,7 @@ InternalIterator* DBImpl::NewInternalIterator(
mutex_.Unlock();
ReadOptions roptions;
return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
sequence);
sequence, allow_unprepared_value);
}

void DBImpl::SchedulePurge() {
Expand Down Expand Up @@ -1450,7 +1450,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
SuperVersion* super_version,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence) {
SequenceNumber sequence,
bool allow_unprepared_value) {
InternalIterator* internal_iter;
assert(arena != nullptr);
assert(range_del_agg != nullptr);
Expand Down Expand Up @@ -1482,7 +1483,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
// Collect iterators for files in L0 - Ln
if (read_options.read_tier != kMemtableTier) {
super_version->current->AddIterators(read_options, file_options_,
&merge_iter_builder, range_del_agg);
&merge_iter_builder, range_del_agg,
allow_unprepared_value);
}
internal_iter = merge_iter_builder.Finish();
IterState* cleanup =
Expand Down Expand Up @@ -2548,7 +2550,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,

#else
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
auto iter = new ForwardIterator(this, read_options, cfd, sv,
/* allow_unprepared_value */ true);
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
Expand Down Expand Up @@ -2625,7 +2628,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,

InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot);
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);

return db_iter;
Expand Down Expand Up @@ -2653,7 +2657,8 @@ Status DBImpl::NewIterators(
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv);
auto iter = new ForwardIterator(this, read_options, cfd, sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
Expand Down
10 changes: 8 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,14 @@ class DBImpl : public DB {
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
// If allow_unprepared_value is true, the returned iterator may defer reading
// the value and so will require PrepareValue() to be called before value();
// allow_unprepared_value = false is convenient when this optimization is not
// useful, e.g. when reading the whole column family.
InternalIterator* NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family = nullptr);
ColumnFamilyHandle* column_family = nullptr,
bool allow_unprepared_value = false);

LogsWithPrepTracker* logs_with_prep_tracker() {
return &logs_with_prep_tracker_;
Expand Down Expand Up @@ -709,7 +714,8 @@ class DBImpl : public DB {

InternalIterator* NewInternalIterator(
const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version,
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence);
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
bool allow_unprepared_value);

// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
super_version->version_number, read_callback);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq);
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
Expand Down Expand Up @@ -118,7 +119,8 @@ Status DBImplReadOnly::NewIterators(
sv->version_number, read_callback);
auto* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq);
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
super_version->version_number, read_callback);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot);
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
Expand Down
35 changes: 31 additions & 4 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
} else {
assert(!skipping_saved_key ||
CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0);
if (!iter_.PrepareValue()) {
assert(!iter_.status().ok());
valid_ = false;
return false;
}
num_skipped = 0;
reseek_done = false;
switch (ikey_.type) {
Expand Down Expand Up @@ -452,6 +457,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// Scan from the newer entries to older entries.
// PRE: iter_.key() points to the first merge type entry
// saved_key_ stores the user key
// iter_.PrepareValue() has been called
// POST: saved_value_ has the merged value for the user key
// iter_ points to the next entry (or invalid)
bool DBIter::MergeValuesNewToOld() {
Expand Down Expand Up @@ -481,14 +487,21 @@ bool DBIter::MergeValuesNewToOld() {
if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
// hit the next user key, stop right here
break;
} else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
}
if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal)) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_.Next();
break;
} else if (kTypeValue == ikey.type) {
}
if (!iter_.PrepareValue()) {
valid_ = false;
return false;
}

if (kTypeValue == ikey.type) {
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
const Slice val = iter_.value();
Expand Down Expand Up @@ -760,6 +773,11 @@ bool DBIter::FindValueForCurrentKey() {
return FindValueForCurrentKeyUsingSeek();
}

if (!iter_.PrepareValue()) {
valid_ = false;
return false;
}

last_key_entry_type = ikey.type;
switch (last_key_entry_type) {
case kTypeValue:
Expand Down Expand Up @@ -937,6 +955,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
valid_ = false;
return false;
}
if (!iter_.PrepareValue()) {
valid_ = false;
return false;
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
Expand Down Expand Up @@ -968,12 +990,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
break;
}

if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal)) {
break;
} else if (ikey.type == kTypeValue) {
}
if (!iter_.PrepareValue()) {
valid_ = false;
return false;
}

if (ikey.type == kTypeValue) {
const Slice val = iter_.value();
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), &val,
Expand Down
50 changes: 40 additions & 10 deletions db/db_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1167,32 +1167,62 @@ TEST_P(DBIteratorTest, IndexWithFirstKey) {
ropt.tailing = tailing;
std::unique_ptr<Iterator> iter(NewIterator(ropt));

ropt.read_tier = ReadTier::kBlockCacheTier;
std::unique_ptr<Iterator> nonblocking_iter(NewIterator(ropt));

iter->Seek("b10");
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b2", iter->key().ToString());
EXPECT_EQ("y2", iter->value().ToString());
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));

// The cache-only iterator should succeed too, using the blocks pulled into
// the cache by the previous iterator.
nonblocking_iter->Seek("b10");
ASSERT_TRUE(nonblocking_iter->Valid());
EXPECT_EQ("b2", nonblocking_iter->key().ToString());
EXPECT_EQ("y2", nonblocking_iter->value().ToString());
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));

// ... but it shouldn't be able to step forward since the next block is
// not in cache yet.
nonblocking_iter->Next();
ASSERT_FALSE(nonblocking_iter->Valid());
ASSERT_TRUE(nonblocking_iter->status().IsIncomplete());

// ... nor should a seek to the next key succeed.
nonblocking_iter->Seek("b20");
ASSERT_FALSE(nonblocking_iter->Valid());
ASSERT_TRUE(nonblocking_iter->status().IsIncomplete());

iter->Next();
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b3", iter->key().ToString());
EXPECT_EQ("y3", iter->value().ToString());
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));

// After the blocking iterator loaded the next block, the nonblocking
// iterator's seek should succeed.
nonblocking_iter->Seek("b20");
ASSERT_TRUE(nonblocking_iter->Valid());
EXPECT_EQ("b3", nonblocking_iter->key().ToString());
EXPECT_EQ("y3", nonblocking_iter->value().ToString());
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));

iter->Seek("c0");
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("c0", iter->key().ToString());
EXPECT_EQ("z1,z2", iter->value().ToString());
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(6, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));

iter->Next();
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("c3", iter->key().ToString());
EXPECT_EQ("z3", iter->value().ToString());
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(5, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));

iter.reset();

Expand All @@ -1207,13 +1237,13 @@ TEST_P(DBIteratorTest, IndexWithFirstKey) {
ASSERT_TRUE(iter->Valid());
EXPECT_EQ("b2", iter->key().ToString());
EXPECT_EQ("y2", iter->value().ToString());
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(5, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));

iter->Next();
ASSERT_FALSE(iter->Valid());
EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(5, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
}
}

Expand Down
Loading

0 comments on commit e45673d

Please sign in to comment.