Skip to content

Commit

Permalink
Generate mixed workload with Get, Put, Seek in db_bench (#4788)
Browse files Browse the repository at this point in the history
Summary:
Based on the specific workload models (key access distribution, value size distribution, and iterator scan length distribution, the QPS variation), the MixGraph benchmark generate the synthetic workload according to these distributions which can reflect the real-world workload characteristics.

After user enable the tracing function, they will get the trace file. By analyzing the trace file with the trace_analyzer tool, user can generate a set of statistic data files including. The *_accessed_key_stats.txt,  *-accessed_value_size_distribution.txt, *-iterator_length_distribution.txt, and *-qps_stats.txt are mainly used to fit the Matlab model fitting. After that, user can get the parameters of the workload distributions (the modeling details are described: [here](https://github.com/facebook/rocksdb/wiki/RocksDB-Trace%2C-Replay%2C-and-Analyzer))

The key access distribution follows the The two-term power model. The probability density function is: `f(x) = ax^{b}+c`. The corresponding parameters are key_dist_a, key_dist_b, and key_dist_c in db_bench

For the value size distribution and iterator scan length distribution, they both follow the Generalized Pareto Distribution. The probability density function is `f(x) = (1/sigma)(1+k*(x-theta)/sigma))^{-1-1/k)`. The parameters are: value_k, value_theta, value_sigma and iter_k, iter_theta, iter_sigma. For more information about the Generalized Pareto Distribution, users can find the [wiki](https://en.wikipedia.org/wiki/Generalized_Pareto_distribution) and [Matalb page](https://www.mathworks.com/help/stats/generalized-pareto-distribution.html)

As for the QPS, it follows the diurnal pattern. So Sine is a good model to fit it. `F(x) = sine_a*sin(sine_b*x + sine_c) + sine_d`. The trace_will tell you the average QPS in the print out resutls, which is sine_d. After user fit the "*-qps_stats.txt" to the Matlab model, user can get the sine_a, sine_b, and sine_c. By using the 4 parameters, user can control the QPS variation including the period, average, changes.

To use the bench mark, user can indicate the following parameters as examples:
```
-benchmarks="mixgraph" -key_dist_a=0.002312 -key_dist_b=0.3467 -value_k=0.9233 -value_sigma=226.4092 -iter_k=2.517 -iter_sigma=14.236 -mix_get_ratio=0.7 -mix_put_ratio=0.25 -mix_seek_ratio=0.05 -sine_mix_rate_interval_milliseconds=500 -sine_a=15000 -sine_b=1 -sine_d=20000
```
Pull Request resolved: #4788

Differential Revision: D13573940

Pulled By: sagar0

fbshipit-source-id: e184c27e07b4f1bc0b436c2be36c5090c1fb0222
  • Loading branch information
zhichao-cao authored and facebook-github-bot committed Jan 22, 2019
1 parent 16a5ac5 commit ce8e88d
Showing 1 changed file with 293 additions and 0 deletions.
293 changes: 293 additions & 0 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#endif
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
Expand Down Expand Up @@ -102,6 +103,7 @@ DEFINE_string(
"compact,"
"compactall,"
"multireadrandom,"
"mixgraph,"
"readseq,"
"readtocache,"
"readreverse,"
Expand Down Expand Up @@ -932,6 +934,52 @@ DEFINE_uint64(
"If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
"is the global rate in bytes/second.");

// the parameters of mix_graph
DEFINE_double(key_dist_a, 0.0,
"The parameter 'a' of key access distribution model "
"f(x)=a*x^b");
DEFINE_double(key_dist_b, 0.0,
"The parameter 'b' of key access distribution model "
"f(x)=a*x^b");
DEFINE_double(value_theta, 0.0,
"The parameter 'theta' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_k, 0.0,
"The parameter 'k' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_sigma, 0.0,
"The parameter 'theta' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_theta, 0.0,
"The parameter 'theta' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_k, 0.0,
"The parameter 'k' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_sigma, 0.0,
"The parameter 'sigma' of Generized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(mix_get_ratio, 1.0,
"The ratio of Get queries of mix_graph workload");
DEFINE_double(mix_put_ratio, 0.0,
"The ratio of Put queries of mix_graph workload");
DEFINE_double(mix_seek_ratio, 0.0,
"The ratio of Seek queries of mix_graph workload");
DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
DEFINE_int64(mix_ave_kv_size, 512,
"The average key-value size of this workload");
DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
DEFINE_double(
sine_mix_rate_noise, 0.0,
"Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
DEFINE_bool(sine_mix_rate, false,
"Enable the sine QPS control on the mix workload");
DEFINE_uint64(
sine_mix_rate_interval_milliseconds, 10000,
"Interval of which the sine wave read_rate_limit is recalculated");
DEFINE_int64(mix_accesses, -1,
"The total query accesses of mix_graph workload");

DEFINE_uint64(
benchmark_read_rate_limit, 0,
"If non-zero, db_bench will rate-limit the reads from RocksDB. This "
Expand Down Expand Up @@ -2627,6 +2675,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
} else if (name == "mixgraph") {
method = &Benchmark::MixGraph;
} else if (name == "readmissing") {
++key_size_;
method = &Benchmark::ReadRandom;
Expand Down Expand Up @@ -4536,6 +4586,249 @@ void VerifyDBFromDB(std::string& truth_db_name) {
thread->stats.AddMessage(msg);
}

// THe reverse function of Pareto function
int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
double ret;
if (k == 0.0) {
ret = theta - sigma * std::log(u);
} else {
ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
}
return static_cast<int64_t>(ceil(ret));
}
// inversion of y=ax^b
int64_t PowerCdfInversion(double u, double a, double b) {
double ret;
ret = std::pow((u / a), (1 / b));
return static_cast<int64_t>(ceil(ret));
}

// Add the noice to the QPS
double AddNoise(double origin, double noise_ratio) {
if (noise_ratio < 0.0 || noise_ratio > 1.0) {
return origin;
}
int band_int = static_cast<int>(FLAGS_sine_a);
double delta = (rand() % band_int - band_int / 2) * noise_ratio;
if (origin + delta < 0) {
return origin;
} else {
return (origin + delta);
}
}

// decide the query type
// 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
class QueryDecider {
public:
std::vector<int> type_;
std::vector<double> ratio_;
int range_;

QueryDecider() {}
~QueryDecider() {}

Status Initiate(std::vector<double> ratio_input) {
int range_max = 1000;
double sum = 0.0;
for (auto& ratio : ratio_input) {
sum += ratio;
}
range_ = 0;
for (auto& ratio : ratio_input) {
range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
type_.push_back(range_);
ratio_.push_back(ratio / sum);
}
return Status::OK();
}

int GetType(int64_t rand_num) {
if (rand_num < 0) {
rand_num = rand_num * (-1);
}
int pos = static_cast<int>(rand_num % range_);
for (int i = 0; i < static_cast<int>(type_.size()); i++) {
if (pos < type_[i]) {
return i;
}
}
return 0;
}
};

// The graph wokrload mixed with Get, Put, Iterator
void MixGraph(ThreadState* thread) {
int64_t read = 0; // including single gets and Next of iterators
int64_t gets = 0;
int64_t puts = 0;
int64_t found = 0;
int64_t seek = 0;
int64_t seek_found = 0;
int64_t bytes = 0;
int64_t value_max = FLAGS_mix_max_value_size;
int64_t scan_len_max = FLAGS_mix_max_scan_len;
double write_rate = 1000000.0;
double read_rate = 1000000.0;
std::vector<double> ratio;
char value_buffer[2 * value_max];
QueryDecider query;
RandomGenerator gen;
Status s;

ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
PinnableSlice pinnable_val;
ratio.push_back(FLAGS_mix_get_ratio);
ratio.push_back(FLAGS_mix_put_ratio);
ratio.push_back(FLAGS_mix_seek_ratio);
query.Initiate(ratio);

// the limit of qps initiation
if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
read_rate, 100000 /* refill_period_us */, 10 /* fairness */,
RateLimiter::Mode::kReadsOnly));
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(write_rate));
}

Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
int64_t rand_v, key_rand, key_seed;
rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
double u = static_cast<double>(rand_v) / FLAGS_num;
key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
Random64 rand(key_seed);
key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
int query_type = query.GetType(rand_v);

// change the qps
uint64_t now = FLAGS_env->NowMicros();
uint64_t usecs_since_last;
if (now > thread->stats.GetSineInterval()) {
usecs_since_last = now - thread->stats.GetSineInterval();
} else {
usecs_since_last = 0;
}

if (usecs_since_last >
(FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
double usecs_since_start =
static_cast<double>(now - thread->stats.GetStart());
thread->stats.ResetSineInterval();
double mix_rate_with_noise = AddNoise(
SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
write_rate =
mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;

thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(write_rate));
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
read_rate,
FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
RateLimiter::Mode::kReadsOnly));
}
// Start the query
if (query_type == 0) {
// the Get query
gets++;
read++;
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
&pinnable_val);
} else {
pinnable_val.Reset();
s = db_with_cfh->db->Get(options,
db_with_cfh->db->DefaultColumnFamily(), key,
&pinnable_val);
}

if (s.ok()) {
found++;
bytes += key.size() + pinnable_val.size();
} else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort();
}

if (thread->shared->read_rate_limiter.get() != nullptr &&
read % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
}

} else if (query_type == 1) {
// the Put query
puts++;
int64_t value_size = ParetoCdfInversion(
u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
if (value_size < 0) {
value_size = 10;
} else if (value_size > value_max) {
value_size = value_size % value_max;
}
s = db_with_cfh->db->Put(write_options_, key, gen.Generate(value_size));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}

if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}

} else if (query_type == 2) {
// Seek query
if (db_with_cfh->db != nullptr) {
Iterator* single_iter = nullptr;
single_iter = db_with_cfh->db->NewIterator(options);
if (single_iter != nullptr) {
single_iter->Seek(key);
seek++;
read++;
if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
seek_found++;
}
int64_t scan_length =
ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
FLAGS_iter_sigma) %
scan_len_max;
for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
Slice value = single_iter->value();
memcpy(value_buffer, value.data(),
std::min(value.size(), sizeof(value_buffer)));
bytes += single_iter->key().size() + single_iter->value().size();
single_iter->Next();
assert(single_iter->status().ok());
}
}
delete single_iter;
}
}
}
char msg[100];
snprintf(msg, sizeof(msg),
"( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
" in %" PRIu64 " found)\n",
gets, puts, seek, found, read);

thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);

if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
}

void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
Expand Down

0 comments on commit ce8e88d

Please sign in to comment.