Skip to content

Commit

Permalink
Pass a timeout to FileSystem for random reads (facebook#6751)
Browse files Browse the repository at this point in the history
Summary:
Calculate ```IOOptions::timeout``` using ```ReadOptions::deadline``` and pass it to ```FileSystem::Read/FileSystem::MultiRead```. This allows us to impose a tighter bound on the time taken by Get/MultiGet on FileSystem/Envs that support IO timeouts. Even on those that don't support, check in ```RandomAccessFileReader::Read``` and ```MultiRead``` and return ```Status::TimedOut()``` if the deadline is exceeded.

For now, TableReader creation, which might do file opens and reads, are not covered. It will be implemented in another PR.

Tests:
Update existing unit tests to verify the correct timeout value is being passed
Pull Request resolved: facebook#6751

Reviewed By: riversand963

Differential Revision: D21285631

Pulled By: anand1976

fbshipit-source-id: d89af843e5a91ece866e87aa29438b52a65a8567
  • Loading branch information
anand76 authored and facebook-github-bot committed Apr 30, 2020
1 parent eecd8fb commit ab13d43
Show file tree
Hide file tree
Showing 24 changed files with 196 additions and 113 deletions.
71 changes: 47 additions & 24 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2387,17 +2387,22 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {

class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
public:
DeadlineRandomAccessFile(DeadlineFS& fs,
DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env,
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileWrapper(file.get()),
fs_(fs),
file_(std::move(file)) {}
file_(std::move(file)),
env_(env) {}

IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
Slice* result, char* scratch, IODebugContext* dbg) const override {
int delay;
const std::chrono::microseconds deadline = fs_.GetDeadline();
if (deadline.count()) {
AssertDeadline(deadline, opts);
}
if (fs_.ShouldDelay(&delay)) {
Env::Default()->SleepForMicroseconds(delay);
env_->SleepForMicroseconds(delay);
}
return FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
dbg);
Expand All @@ -2406,22 +2411,37 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override {
int delay;
const std::chrono::microseconds deadline = fs_.GetDeadline();
if (deadline.count()) {
AssertDeadline(deadline, options);
}
if (fs_.ShouldDelay(&delay)) {
Env::Default()->SleepForMicroseconds(delay);
env_->SleepForMicroseconds(delay);
}
return FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
}

private:
void AssertDeadline(const std::chrono::microseconds deadline,
const IOOptions& opts) const {
// Give a leeway of +- 10us as it can take some time for the Get/
// MultiGet call to reach here, in order to avoid false alarms
std::chrono::microseconds now =
std::chrono::microseconds(env_->NowMicros());
ASSERT_EQ(deadline - now, opts.timeout);
}
DeadlineFS& fs_;
std::unique_ptr<FSRandomAccessFile> file_;
SpecialEnv* env_;
};

class DeadlineFS : public FileSystemWrapper {
public:
DeadlineFS()
: FileSystemWrapper(FileSystem::Default()),
delay_idx_(0) {}
DeadlineFS(SpecialEnv* env)
: FileSystemWrapper(FileSystem::Default()),
delay_idx_(0),
deadline_(std::chrono::microseconds::zero()),
env_(env) {}
~DeadlineFS() = default;

IOStatus NewRandomAccessFile(const std::string& fname,
Expand All @@ -2432,13 +2452,14 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
IOStatus s;

s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(new DeadlineRandomAccessFile(*this, file));
result->reset(new DeadlineRandomAccessFile(*this, env_, file));
return s;
}

// Set a vector of {IO counter, delay in microseconds} pairs that control
// when to inject a delay and duration of the delay
void SetDelaySequence(const std::vector<std::pair<int, int>>&& seq) {
void SetDelaySequence(const std::chrono::microseconds deadline,
const std::vector<std::pair<int, int>>&& seq) {
int total_delay = 0;
for (auto& seq_iter : seq) {
// Ensure no individual delay is > 500ms
Expand All @@ -2451,6 +2472,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
delay_seq_ = seq;
delay_idx_ = 0;
io_count_ = 0;
deadline_ = deadline;
}

// Increment the IO counter and return a delay in microseconds
Expand All @@ -2464,10 +2486,14 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
return false;
}

const std::chrono::microseconds GetDeadline() { return deadline_; }

private:
std::vector<std::pair<int, int>> delay_seq_;
size_t delay_idx_;
int io_count_;
std::chrono::microseconds deadline_;
SpecialEnv* env_;
};

inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
Expand All @@ -2483,8 +2509,10 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {

TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
std::shared_ptr<DBBasicTestMultiGetDeadline::DeadlineFS> fs(
new DBBasicTestMultiGetDeadline::DeadlineFS());
std::unique_ptr<Env> env = NewCompositeEnv(fs);
new DBBasicTestMultiGetDeadline::DeadlineFS(env_));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
env_->no_slowdown_ = true;
env_->time_elapse_only_sleep_.store(true);
Options options = CurrentOptions();

std::shared_ptr<Cache> cache = NewLRUCache(1048576);
Expand All @@ -2509,13 +2537,13 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
cfs[i] = handles_[i];
keys[i] = Slice(key_str[i].data(), key_str[i].size());
}
// Delay the first IO by 200ms
fs->SetDelaySequence({{0, 200000}});

ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
// Delay the first IO by 200ms
fs->SetDelaySequence(ro.deadline, {{0, 20000}});

std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
std::cout << "Non-batched MultiGet";
// The first key is successful because we check after the lookup, but
// subsequent keys fail due to deadline exceeded
CheckStatus(statuses, 1);
Expand All @@ -2537,10 +2565,9 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
cfs[i] = handles_[i / 2];
keys[i] = Slice(key_str[i].data(), key_str[i].size());
}
fs->SetDelaySequence({{1, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(ro.deadline, {{1, 20000}});
statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
std::cout << "Non-batched 2";
CheckStatus(statuses, 3);

// Test batched MultiGet with an IO delay in the first data block read.
Expand All @@ -2552,11 +2579,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
cache->SetCapacity(1048576);
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{0, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(ro.deadline, {{0, 20000}});
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 1";
CheckStatus(statuses, 2);

// Similar to the previous one, but an IO delay in the third CF data block
Expand All @@ -2568,11 +2594,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
cache->SetCapacity(1048576);
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{2, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(ro.deadline, {{2, 20000}});
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 2";
CheckStatus(statuses, 6);

// Similar to the previous one, but an IO delay in the last but one CF
Expand All @@ -2583,11 +2608,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
cache->SetCapacity(1048576);
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{3, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(ro.deadline, {{3, 20000}});
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 3";
CheckStatus(statuses, 8);

// Test batched MultiGet with single CF and lots of keys. Inject delay
Expand All @@ -2610,11 +2634,10 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
}
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{1, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(ro.deadline, {{1, 20000}});
dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched single CF";
CheckStatus(statuses, 64);
Close();
}
Expand Down
5 changes: 0 additions & 5 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1527,11 +1527,6 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);
// We will eventually support deadline for Get requests too, but safeguard
// for now
if (read_options.deadline != std::chrono::microseconds::zero()) {
return Status::NotSupported("ReadOptions deadline is not supported");
}

#ifndef NDEBUG
assert(get_impl_options.column_family);
Expand Down
3 changes: 1 addition & 2 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,

Slice result;
size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
s = reader->Read(rounddown_offset + chunk_len,
read_len, &result,
s = reader->Read(IOOptions(), rounddown_offset + chunk_len, read_len, &result,
buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
#ifndef NDEBUG
if (!s.ok() || result.size() < read_len) {
Expand Down
16 changes: 16 additions & 0 deletions file/file_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,20 @@ extern Status DeleteDBFile(const ImmutableDBOptions* db_options,

extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);

inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env,
IOOptions& opts) {
if (!env) {
env = Env::Default();
}

if (ro.deadline.count()) {
std::chrono::microseconds now = std::chrono::microseconds(env->NowMicros());
if (now > ro.deadline) {
return IOStatus::TimedOut("Deadline exceeded");
}
opts.timeout = ro.deadline - now;
}
return IOStatus::OK();
}

} // namespace ROCKSDB_NAMESPACE
30 changes: 23 additions & 7 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {

Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
size_t n, Slice* result, char* scratch,
AlignedBuf* aligned_buf,
bool for_compaction) const {
(void)aligned_buf;
Status s;
Expand Down Expand Up @@ -62,10 +64,16 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
start_ts = std::chrono::system_clock::now();
orig_offset = aligned_offset + buf.CurrentSize();
}

{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
IOOptions(), &tmp, buf.Destination(), nullptr);
// Only user reads are expected to specify a timeout. And user reads
// are not subjected to rate_limiter and should go through only
// one iteration of this loop, so we don't need to check and adjust
// the opts.timeout before calling file_->Read
assert(!opts.timeout.count() || allowed == read_size);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
&tmp, buf.Destination(), nullptr);
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
Expand Down Expand Up @@ -116,9 +124,15 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
start_ts = std::chrono::system_clock::now();
}
#endif

{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
// Only user reads are expected to specify a timeout. And user reads
// are not subjected to rate_limiter and should go through only
// one iteration of this loop, so we don't need to check and adjust
// the opts.timeout before calling file_->Read
assert(!opts.timeout.count() || allowed == n);
s = file_->Read(offset + pos, allowed, opts, &tmp_result,
scratch + pos, nullptr);
}
#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -186,7 +200,8 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
return true;
}

Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
FSReadRequest* read_reqs,
size_t num_reqs,
AlignedBuf* aligned_buf) const {
(void)aligned_buf; // suppress warning of unused variable in LITE mode
Expand Down Expand Up @@ -244,9 +259,10 @@ Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
start_ts = std::chrono::system_clock::now();
}
#endif // ROCKSDB_LITE

{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->MultiRead(fs_reqs, num_fs_reqs, IOOptions(), nullptr);
s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
}

#ifndef ROCKSDB_LITE
Expand Down
9 changes: 6 additions & 3 deletions file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,16 @@ class RandomAccessFileReader {
// 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
// the internally allocated buffer on return, and the result refers to a
// region in aligned_buf.
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch,
AlignedBuf* aligned_buf, bool for_compaction = false) const;
Status Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
bool for_compaction = false) const;

// REQUIRES:
// num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
// In non-direct IO mode, aligned_buf should be null;
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
// MultiRead, the result Slices in reqs refer to aligned_buf.
Status MultiRead(FSReadRequest* reqs, size_t num_reqs,
Status MultiRead(const IOOptions& opts, FSReadRequest* reqs, size_t num_reqs,
AlignedBuf* aligned_buf) const;

Status Prefetch(uint64_t offset, size_t n) const {
Expand All @@ -134,5 +135,7 @@ class RandomAccessFileReader {
std::string file_name() const { return file_name_; }

bool use_direct_io() const { return file_->use_direct_io(); }

Env* env() const { return env_; }
};
} // namespace ROCKSDB_NAMESPACE
15 changes: 10 additions & 5 deletions file/random_access_file_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
Slice result;
AlignedBuf buf;
for (bool for_compaction : {true, false}) {
ASSERT_OK(r->Read(offset, len, &result, nullptr, &buf, for_compaction));
ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf,
for_compaction));
ASSERT_EQ(result.ToString(), content.substr(offset, len));
}
}
Expand Down Expand Up @@ -152,7 +153,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down Expand Up @@ -189,7 +191,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down Expand Up @@ -226,7 +229,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down Expand Up @@ -255,7 +259,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(reqs.data(), reqs.size(), &aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down
Loading

0 comments on commit ab13d43

Please sign in to comment.