Skip to content

Commit

Permalink
Create timestamped snapshot for multiops txns
Browse files Browse the repository at this point in the history
  • Loading branch information
riversand963 committed Jul 11, 2022
1 parent 8448947 commit bf2d4ad
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
53 changes: 46 additions & 7 deletions db_stress_tool/multi_ops_txns_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
return s;
}

s = txn->Commit();
s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);

auto& key_gen = key_gen_for_a_.at(thread->tid);
if (s.ok()) {
Expand Down Expand Up @@ -876,7 +876,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
return s;
}

s = txn->Commit();
s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);

if (s.ok()) {
delete txn;
Expand Down Expand Up @@ -968,7 +968,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
return s;
}

s = txn->Commit();
s = CommitAndCreateTimestampedSnapshotIfNeeded(thread, *txn);

if (s.ok()) {
delete txn;
}
Expand Down Expand Up @@ -1011,8 +1012,8 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
RollbackTxn(txn).PermitUncheckedError();
});

txn->SetSnapshot();
ropts.snapshot = txn->GetSnapshot();
std::shared_ptr<const Snapshot> snapshot;
SetupSnapshot(thread, ropts, *txn, snapshot);

if (FLAGS_delay_snapshot_read_one_in > 0 &&
thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
Expand Down Expand Up @@ -1062,8 +1063,8 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
RollbackTxn(txn).PermitUncheckedError();
});

txn->SetSnapshot();
ropts.snapshot = txn->GetSnapshot();
std::shared_ptr<const Snapshot> snapshot;
SetupSnapshot(thread, ropts, *txn, snapshot);

if (FLAGS_delay_snapshot_read_one_in > 0 &&
thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) {
Expand Down Expand Up @@ -1369,6 +1370,35 @@ Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) {
return ctwb->Put(Slice(key_buf, sizeof(key_buf)),
Slice(val_buf, sizeof(val_buf)));
}

Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded(
ThreadState* thread, Transaction& txn) {
Status s;
if (FLAGS_create_timestamped_snapshot_one_in > 0 &&
thread->rand.OneInOpt(FLAGS_create_timestamped_snapshot_one_in)) {
uint64_t ts = db_stress_env->NowNanos();
std::shared_ptr<const Snapshot> snapshot;
s = txn.CommitAndTryCreateSnapshot(/*notifier=*/nullptr, ts, &snapshot);
} else {
s = txn.Commit();
}
return s;
}

void MultiOpsTxnsStressTest::SetupSnapshot(
ThreadState* thread, ReadOptions& read_opts, Transaction& txn,
std::shared_ptr<const Snapshot>& snapshot) {
if (thread->rand.OneInOpt(2)) {
snapshot = txn_db_->GetLatestTimestampedSnapshot();
}

if (snapshot) {
read_opts.snapshot = snapshot.get();
} else {
txn.SetSnapshot();
read_opts.snapshot = txn.GetSnapshot();
}
}
#endif // !ROCKSDB_LITE

std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const {
Expand Down Expand Up @@ -1734,6 +1764,15 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() {
"-key_spaces_path\n");
exit(1);
}
if (FLAGS_create_timestamped_snapshot_one_in > 0) {
if (FLAGS_txn_write_policy !=
static_cast<uint64_t>(TxnDBWritePolicy::WRITE_COMMITTED)) {
fprintf(stderr,
"Timestamped snapshot is not yet supported by "
"write-prepared/write-unprepared transactions\n");
exit(1);
}
}
#else
fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n");
exit(1);
Expand Down
7 changes: 7 additions & 0 deletions db_stress_tool/multi_ops_txns_stress.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ class MultiOpsTxnsStressTest : public StressTest {
// actual value of the metadata. Method WriteToCommitTimeWriteBatch()
// emulates this scenario.
Status WriteToCommitTimeWriteBatch(Transaction& txn);

Status CommitAndCreateTimestampedSnapshotIfNeeded(ThreadState* thread,
Transaction& txn);

void SetupSnapshot(ThreadState* thread, ReadOptions& read_opts,
Transaction& txn,
std::shared_ptr<const Snapshot>& snapshot);
#endif //! ROCKSDB_LITE

std::vector<std::unique_ptr<KeyGenerator>> key_gen_for_a_;
Expand Down
2 changes: 2 additions & 0 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ def is_direct_io_supported(dbname):
"rollback_one_in": 4,
# Re-enable once we have a compaction for MultiOpsTxnStressTest
"enable_compaction_filter": 0,
"create_timestamped_snapshot_one_in": 50,
}

multiops_wc_txn_params = {
Expand All @@ -425,6 +426,7 @@ def is_direct_io_supported(dbname):
"use_only_the_last_commit_time_batch_for_recovery": 1,
"recycle_log_file_num": 0,
"clear_wp_commit_cache_one_in": 10,
"create_timestamped_snapshot_one_in": 0,
}

def finalize_and_sanitize(src_params):
Expand Down

0 comments on commit bf2d4ad

Please sign in to comment.