Skip to content

Commit

Permalink
Add a temporary option for user to opt-out enforcement of SingleDelet…
Browse files Browse the repository at this point in the history
…e contract (facebook#9983)

Summary:
PR facebook#9888 started to enforce the contract of single delete described in https://github.com/facebook/rocksdb/wiki/Single-Delete.

For some of existing use cases, it is desirable to have a transition during which compaction will not fail
if the contract is violated. Therefore, we add a temporary option `enforce_single_del_contracts` to allow
application to opt out from this new strict behavior. Once transition completes, the flag can be set to `true` again.

In a future release, the option will be removed.

Pull Request resolved: facebook#9983

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D36333672

Pulled By: riversand963

fbshipit-source-id: dcb703ea0ed08076a1422f1bfb9914afe3c2caa2
  • Loading branch information
riversand963 authored and facebook-github-bot committed May 16, 2022
1 parent e66e6d2 commit 3f263ef
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 12 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* CompactionFilter::Decision has a new value: kRemoveWithSingleDelete. If CompactionFilter returns this decision, then CompactionIterator will use `SingleDelete` to mark a key as removed.
* Renamed CompactionFilter::Decision::kRemoveWithSingleDelete to kPurge since the latter sounds more general and hides the implementation details of how compaction iterator handles keys.
* Added ability to specify functions for Prepare and Validate to OptionsTypeInfo. Added methods to OptionTypeInfo to set the functions via an API. These methods are intended for RocksDB plugin developers for configuration management.
* Added a new immutable db options, enforce_single_del_contracts. If set to false (default is true), compaction will NOT fail due to a single delete followed by a delete for the same key. The purpose of this temporay option is to help existing use cases migrate.

### Bug Fixes
* RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.
Expand Down
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ Status BuildTable(
ShouldReportDetailedTime(env, ioptions.stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
blob_file_builder.get(), ioptions.allow_data_in_errors,
ioptions.enforce_single_del_contracts,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*manual_compaction_paused=*/nullptr,
Expand Down
20 changes: 14 additions & 6 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ CompactionIterator::CompactionIterator(
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
const Compaction* compaction, const CompactionFilter* compaction_filter,
bool enforce_single_del_contracts, const Compaction* compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
Expand All @@ -38,7 +39,7 @@ CompactionIterator::CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
blob_file_builder, allow_data_in_errors,
blob_file_builder, allow_data_in_errors, enforce_single_del_contracts,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, manual_compaction_paused,
Expand All @@ -52,6 +53,7 @@ CompactionIterator::CompactionIterator(
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
Expand Down Expand Up @@ -80,6 +82,7 @@ CompactionIterator::CompactionIterator(
manual_compaction_canceled_(manual_compaction_canceled),
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors),
enforce_single_del_contracts_(enforce_single_del_contracts),
timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
full_history_ts_low_(full_history_ts_low),
current_user_key_sequence_(0),
Expand Down Expand Up @@ -657,10 +660,15 @@ void CompactionIterator::NextFromInput() {
"TransactionDBOptions::rollback_deletion_type_callback is "
"configured properly. Mixing SD and DEL can lead to "
"undefined behaviors";
ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
valid_ = false;
status_ = Status::Corruption(oss.str());
return;
++iter_stats_.num_record_drop_obsolete;
++iter_stats_.num_single_del_mismatch;
if (enforce_single_del_contracts_) {
ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
valid_ = false;
status_ = Status::Corruption(oss.str());
return;
}
ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str());
} else if (!is_timestamp_eligible_for_gc) {
// We cannot drop the SingleDelete as timestamp is enabled, and
// timestamp of this key is greater than or equal to
Expand Down
5 changes: 4 additions & 1 deletion db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class CompactionIterator {
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
const Compaction* compaction = nullptr,
bool enforce_single_del_contracts, const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::atomic<int>* manual_compaction_paused = nullptr,
Expand All @@ -193,6 +193,7 @@ class CompactionIterator {
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
Expand Down Expand Up @@ -332,6 +333,8 @@ class CompactionIterator {

bool allow_data_in_errors_;

const bool enforce_single_del_contracts_;

// Comes from comparator.
const size_t timestamp_size_;

Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
snapshot_checker_.get(), Env::Default(),
false /* report_detailed_time */, false, range_del_agg_.get(),
nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
std::move(compaction), filter, &shutting_down_,
true /*enforce_single_del_contracts*/, std::move(compaction), filter,
&shutting_down_,
/*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr,
full_history_ts_low));
Expand Down
6 changes: 3 additions & 3 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1479,9 +1479,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
/*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_,
manual_compaction_paused_, manual_compaction_canceled_,
db_options_.info_log, full_history_ts_low));
db_options_.enforce_single_del_contracts, sub_compact->compaction,
compaction_filter, shutting_down_, manual_compaction_paused_,
manual_compaction_canceled_, db_options_.info_log, full_history_ts_low));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
Expand Down
15 changes: 15 additions & 0 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,21 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
/* expected_oldest_blob_file_number */ 19);
}

TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) {
db_options_.enforce_single_del_contracts = false;
NewDB();

auto file =
mock::MakeMockFile({{KeyStr("a", 4U, kTypeSingleDeletion), ""},
{KeyStr("a", 3U, kTypeDeletion), "dontcare"}});
AddMockFile(file);
SetLastSequence(4U);

auto expected_results = mock::MakeMockFile();
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}

TEST_F(CompactionJobTest, InputSerialization) {
// Setup a random CompactionServiceInput
CompactionServiceInput input;
Expand Down
1 change: 1 addition & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ Status FlushJob::MemPurge() {
env, ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
ioptions->enforce_single_del_contracts,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*manual_compaction_paused=*/nullptr,
Expand Down
13 changes: 13 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,19 @@ struct DBOptions {
//
// Default: kNonVolatileBlockTier
CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier;

// If set to false, when compaction or flush sees a SingleDelete followed by
// a Delete for the same user key, compaction job will not fail.
// Otherwise, compaction job will fail.
// This is a temporary option to help existing use cases migrate, and
// will be removed in a future release.
// Warning: do not set to false unless you are trying to migrate existing
// data in which the contract of single delete
// (https://github.com/facebook/rocksdb/wiki/Single-Delete) is not enforced,
// thus has Delete mixed with SingleDelete for the same user key. Violation
// of the contract leads to undefined behaviors with high possibility of data
// inconsistency, e.g. deleted old data become visible again, etc.
bool enforce_single_del_contracts = true;
};

// Options to control the behavior of a database (passed to DB::Open)
Expand Down
9 changes: 8 additions & 1 deletion options/db_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionTypeInfo::Enum<CacheTier>(
offsetof(struct ImmutableDBOptions, lowest_used_cache_tier),
&cache_tier_string_map, OptionTypeFlags::kNone)},
{"enforce_single_del_contracts",
{offsetof(struct ImmutableDBOptions, enforce_single_del_contracts),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};

const std::string OptionsHelper::kDBOptionsName = "DBOptions";
Expand Down Expand Up @@ -750,7 +754,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
db_host_id(options.db_host_id),
checksum_handoff_file_types(options.checksum_handoff_file_types),
lowest_used_cache_tier(options.lowest_used_cache_tier),
compaction_service(options.compaction_service) {
compaction_service(options.compaction_service),
enforce_single_del_contracts(options.enforce_single_del_contracts) {
fs = env->GetFileSystem();
clock = env->GetSystemClock().get();
logger = info_log.get();
Expand Down Expand Up @@ -921,6 +926,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
allow_data_in_errors);
ROCKS_LOG_HEADER(log, " Options.db_host_id: %s",
db_host_id.c_str());
ROCKS_LOG_HEADER(log, " Options.enforce_single_del_contracts: %s",
enforce_single_del_contracts ? "true" : "false");
}

bool ImmutableDBOptions::IsWalDirSameAsDBPath() const {
Expand Down
1 change: 1 addition & 0 deletions options/db_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ struct ImmutableDBOptions {
Statistics* stats;
Logger* logger;
std::shared_ptr<CompactionService> compaction_service;
bool enforce_single_del_contracts;

bool IsWalDirSameAsDBPath() const;
bool IsWalDirSameAsDBPath(const std::string& path) const;
Expand Down

0 comments on commit 3f263ef

Please sign in to comment.