Skip to content

Commit

Permalink
Add maintainer_job_ratio support (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
mm304321141 committed Nov 29, 2021
1 parent cea10fb commit e257ad5
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ if(NOT WITH_TERARKDB_NAMESPACE)
endif()
MESSAGE(STATUS "terarkdb namespace: " ${WITH_TERARKDB_NAMESPACE})

option(BUILD_SUFFIX "library sufix" "d")
option(BUILD_SUFFIX "library suffix" "d")
option(FORCE_TERARKDB_RELEASE_BUILD "ignore outside cmake_build_type" OFF)

SET(CMAKE_THREAD_PREFER_PTHREAD TRUE)
Expand Down
16 changes: 11 additions & 5 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2472,20 +2472,26 @@ void rocksdb_options_set_blob_gc_ratio(rocksdb_options_t* opt, double v) {
opt->rep.blob_gc_ratio = v;
}

void rocksdb_options_target_blob_file_size(rocksdb_options_t* opt, uint64_t v) {
// C API: stores `v` into the target_blob_file_size field of the options
// object wrapped by `opt`.
void rocksdb_options_set_target_blob_file_size(rocksdb_options_t* opt,
                                               uint64_t v) {
  auto& wrapped_options = opt->rep;
  wrapped_options.target_blob_file_size = v;
}

void rocksdb_options_blob_file_defragment_size(rocksdb_options_t* opt,
uint64_t v) {
// C API: stores `v` into the blob_file_defragment_size field of the options
// object wrapped by `opt`.
void rocksdb_options_set_blob_file_defragment_size(rocksdb_options_t* opt,
                                                   uint64_t v) {
  auto& wrapped_options = opt->rep;
  wrapped_options.blob_file_defragment_size = v;
}

void rocksdb_options_max_dependence_blob_overlap(rocksdb_options_t* opt,
size_t v) {
// C API: stores `v` into the max_dependence_blob_overlap field of the options
// object wrapped by `opt`.
void rocksdb_options_set_max_dependence_blob_overlap(rocksdb_options_t* opt,
                                                     size_t v) {
  auto& wrapped_options = opt->rep;
  wrapped_options.max_dependence_blob_overlap = v;
}

// C API: stores `v` into the maintainer_job_ratio field of the options object
// wrapped by `opt`. The value is written as given; no range check is done
// here.
void rocksdb_options_set_maintainer_job_ratio(rocksdb_options_t* opt,
                                              double v) {
  auto& wrapped_options = opt->rep;
  wrapped_options.maintainer_job_ratio = v;
}

void rocksdb_options_set_optimize_filters_for_hits(rocksdb_options_t* opt,
bool v) {
opt->rep.optimize_filters_for_hits = v;
Expand Down
37 changes: 24 additions & 13 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ struct VersionBuilderContextImpl : VersionBuilder::Context {
: table_cache(nullptr),
levels(nullptr),
dependence_version(0),
new_deleted_files(0) {}
new_deleted_files(0),
maintainer_job_limit(0) {}

~VersionBuilderContextImpl() {
for (auto& pair : dependence_map) {
Expand Down Expand Up @@ -148,6 +149,7 @@ struct VersionBuilderContextImpl : VersionBuilder::Context {
chash_map<uint64_t, FileMetaData*>* levels;
size_t dependence_version;
size_t new_deleted_files;
uint64_t maintainer_job_limit;
chash_map<uint64_t, DependenceItem> dependence_map;
chash_map<uint64_t, InheritanceItem> inheritance_counter;
};
Expand Down Expand Up @@ -322,7 +324,8 @@ class VersionBuilder::Rep {
}
}

void CalculateDependence(bool finish, bool is_open_db = false) {
void CalculateDependence(bool finish, bool is_open_db,
double maintainer_job_ratio) {
if (!finish && (!is_open_db || context_->new_deleted_files < 65536)) {
return;
}
Expand All @@ -348,7 +351,9 @@ class VersionBuilder::Rep {
std::priority_queue<uint64_t> old_file_queue;
constexpr size_t max_queue_size = 8;
auto push_old_file = [&](FileMetaData* f) {
if (!f->prop.is_map_sst() && !f->prop.dependence.empty()) {
if (!f->being_compacted && !f->prop.is_map_sst() &&
!f->prop.dependence.empty() &&
!f->has_marked_for_compaction(FileMetaData::kMarkedFromUpdateBlob)) {
for (auto& dependence : f->prop.dependence) {
if (TransFileNumber(dependence.file_number)->file_number !=
dependence.file_number) {
Expand Down Expand Up @@ -409,6 +414,8 @@ class VersionBuilder::Rep {
}
++it;
} else {
context_->maintainer_job_limit +=
uint64_t(item.f->fd.GetFileSize() * maintainer_job_ratio);
DelInheritance(item.f);
context_->UnrefFile(item.f);
it = dependence_map.erase(it);
Expand All @@ -421,9 +428,12 @@ class VersionBuilder::Rep {
while (old_file_queue.size() > old_file_count) {
old_file_queue.pop();
}
while (!old_file_queue.empty()) {
dependence_map[old_file_queue.top()].f->marked_for_compaction |=
FileMetaData::kMarkedFromUpdateBlob;
FileMetaData* f;
while (!old_file_queue.empty() &&
(f = dependence_map[old_file_queue.top()].f)->fd.GetFileSize() <
context_->maintainer_job_limit) {
context_->maintainer_job_limit -= f->fd.GetFileSize();
f->marked_for_compaction |= FileMetaData::kMarkedFromUpdateBlob;
old_file_queue.pop();
}
}
Expand Down Expand Up @@ -666,15 +676,15 @@ class VersionBuilder::Rep {
}

// shrink files
CalculateDependence(false, edit->is_open_db());
CalculateDependence(false, edit->is_open_db(), 0);
}

// Save the current state in *v.
// WARNING: this func will call out of mutex
void SaveTo(VersionStorageInfo* vstorage) {
void SaveTo(VersionStorageInfo* vstorage, double maintainer_job_ratio) {
Init();
CheckConsistency(vstorage, true);
CalculateDependence(true);
CalculateDependence(true, false, maintainer_job_ratio);
auto exists = [&](uint64_t file_number) {
auto find = context_->inheritance_counter.find(file_number);
assert(find != context_->inheritance_counter.end());
Expand Down Expand Up @@ -867,8 +877,9 @@ bool VersionBuilder::CheckConsistencyForNumLevels() {

// Forwards `edit` to the implementation object (rep_).
void VersionBuilder::Apply(VersionEdit* edit) {
  auto& impl = *rep_;
  impl.Apply(edit);
}

void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
// Saves the builder's current state into *vstorage by delegating to the
// implementation object (rep_).
//
// vstorage:             destination storage info.
// maintainer_job_ratio: ratio forwarded to the implementation, where it is
//                       multiplied by a file size and converted to uint64_t.
//                       A negative or NaN ratio would make that conversion
//                       undefined behaviour, so such values are treated as 0.
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage,
                            double maintainer_job_ratio) {
  // `!(x >= 0)` is true for both negative values and NaN.
  if (!(maintainer_job_ratio >= 0)) {
    maintainer_job_ratio = 0;
  }
  rep_->SaveTo(vstorage, maintainer_job_ratio);
}

void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
Expand Down Expand Up @@ -978,15 +989,15 @@ void VersionBuilderDebugger::Verify(VersionBuilder::Rep* rep,
vstorage->InternalComparator(),
vstorage->InternalComparator()->user_comparator(),
vstorage->num_levels(), kCompactionStyleNone, true);
rep_0.SaveTo(&vstorage_1);
rep_0.SaveTo(&vstorage_1, 0);
VersionBuilder::Rep rep_1(rep->env_options_, rep->info_log_,
rep->table_cache, &vstorage_1);
for (size_t j = i; j < pos.size() - 1; ++j) {
VersionEdit edit;
get_edit(j, &edit);
rep_1.Apply(&edit);
}
rep_1.SaveTo(&vstorage_0);
rep_1.SaveTo(&vstorage_0, 0);
auto err = verify(vstorage, &vstorage_0);
if (!err.empty()) {
has_err = true;
Expand Down
2 changes: 1 addition & 1 deletion db/version_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class VersionBuilder {
int level);
bool CheckConsistencyForNumLevels();
void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage);
void SaveTo(VersionStorageInfo* vstorage, double maintainer_job_ratio);
void LoadTableHandlers(InternalStats* internal_stats,
bool prefetch_index_and_filter_in_cache,
const SliceTransform* prefix_extractor,
Expand Down
14 changes: 7 additions & 7 deletions db/version_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, false);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

ASSERT_EQ(400U, new_vstorage.NumLevelBytes(2));
ASSERT_EQ(300U, new_vstorage.NumLevelBytes(3));
Expand Down Expand Up @@ -193,7 +193,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, false);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

ASSERT_EQ(0U, new_vstorage.NumLevelBytes(0));
ASSERT_EQ(100U, new_vstorage.NumLevelBytes(3));
Expand Down Expand Up @@ -243,7 +243,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, false);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

ASSERT_EQ(0U, new_vstorage.NumLevelBytes(0));
ASSERT_EQ(200U, new_vstorage.NumLevelBytes(4));
Expand Down Expand Up @@ -320,7 +320,7 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic3) {

VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, false);
version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

ASSERT_EQ(0U, new_vstorage.NumLevelBytes(1));
ASSERT_EQ(150U, new_vstorage.NumLevelBytes(2));
Expand Down Expand Up @@ -351,7 +351,7 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels,
kCompactionStyleLevel, false);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

ASSERT_EQ(500U, new_vstorage.NumLevelBytes(2));
ASSERT_TRUE(VerifyDependFiles(&new_vstorage, {666, 676, 636, 616, 606}));
Expand Down Expand Up @@ -389,7 +389,7 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
GetInternalKey("850"), 200, 200, false, {});
version_builder.Apply(&version_edit2);

version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

ASSERT_EQ(300U, new_vstorage.NumLevelBytes(2));
ASSERT_TRUE(VerifyDependFiles(&new_vstorage, {666, 676, 606}));
Expand Down Expand Up @@ -489,7 +489,7 @@ TEST_F(VersionBuilderTest, HugeLSM) {

version_builder.Apply(&version_edit);

version_builder.SaveTo(&new_vstorage);
version_builder.SaveTo(&new_vstorage, 0);

UnrefFilesInVersion(&new_vstorage);
}
Expand Down
4 changes: 4 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ struct FileMetaData {
kGarbageCollectionPermitted = 2,
};

bool has_marked_for_compaction(uint8_t flag) {
return (marked_for_compaction & flag) != 0;
}

bool is_output_to_parent_level() const {
constexpr uint8_t kFlag =
kMarkedFromUser | kMarkedFromRangeDeletion | kMarkedFromTableBuilder;
Expand Down
13 changes: 8 additions & 5 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -741,11 +741,12 @@ class BaseReferencedVersionBuilder {
versions->LogAndApplyHelper(version_->cfd(), version_builder_, version_,
edit, mu, false);
}
void DoApplyAndSaveTo(VersionStorageInfo* vstorage) {
// Applies every edit in edit_list_ to the version builder, in list order,
// then saves the result into *vstorage, passing maintainer_job_ratio through
// to VersionBuilder::SaveTo.
void DoApplyAndSaveTo(VersionStorageInfo* vstorage,
                      double maintainer_job_ratio) {
  for (const auto& pending_edit : edit_list_) {
    version_builder_->Apply(pending_edit);
  }
  version_builder_->SaveTo(vstorage, maintainer_job_ratio);
}

private:
Expand Down Expand Up @@ -3189,7 +3190,9 @@ Status VersionSet::ProcessManifestWrites(std::deque<ManifestWriter>& writers,
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
builder_guards[i]->DoApplyAndSaveTo(versions[i]->storage_info());
builder_guards[i]->DoApplyAndSaveTo(
versions[i]->storage_info(),
mutable_cf_options_ptrs[i]->maintainer_job_ratio);
}
}

Expand Down Expand Up @@ -3926,7 +3929,7 @@ Status VersionSet::Recover(
Version* v = new Version(cfd, this, env_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
builder->SaveTo(v->storage_info(), 0);

// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions());
Expand Down Expand Up @@ -4296,7 +4299,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Version* v = new Version(cfd, this, env_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
builder->SaveTo(v->storage_info(), 0);
v->PrepareApply(*cfd->GetLatestMutableCFOptions());

printf("--------------- Column family \"%s\" (ID %u) --------------\n",
Expand Down
4 changes: 3 additions & 1 deletion include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ extern "C" {
#endif

#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>

/* Exported types */

Expand Down Expand Up @@ -931,6 +931,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_blob_file_defragment_size(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_dependence_blob_overlap(
rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_maintainer_job_ratio(
rocksdb_options_t*, double);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_optimize_filters_for_hits(
rocksdb_options_t*, bool);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_optimize_range_deletion(
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// 0 to unlimited
size_t max_dependence_blob_overlap = 1024;

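// Maintainer job ratio
// Budget for marking old files for compaction (kMarkedFromUpdateBlob):
// each time the version builder drops a file from its dependence map,
// file_size * maintainer_job_ratio bytes are added to the budget, and an
// old file is only marked while its size is below the remaining budget.
// Valid range: 0 to 1 (with 0 the budget never grows, so nothing is marked)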
double maintainer_job_ratio = 0.1;

// This is a factory that provides TableFactory objects.
// Default: a block-based table factory that provides a default
// implementation of TableBuilder and TableReader with default
Expand Down
3 changes: 3 additions & 0 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ void MutableCFOptions::Dump(Logger* log) const {
blob_file_defragment_size);
ROCKS_LOG_INFO(log, " max_dependence_blob_overlap: %zu",
max_dependence_blob_overlap);
ROCKS_LOG_INFO(log, " maintainer_job_ratio: %f",
maintainer_job_ratio);
ROCKS_LOG_INFO(log, " soft_pending_compaction_bytes_limit: %" PRIu64,
soft_pending_compaction_bytes_limit);
ROCKS_LOG_INFO(log, " hard_pending_compaction_bytes_limit: %" PRIu64,
Expand Down Expand Up @@ -296,6 +298,7 @@ MutableCFOptions::MutableCFOptions(const ColumnFamilyOptions& options, Env* env)
target_blob_file_size(options.target_blob_file_size),
blob_file_defragment_size(options.blob_file_defragment_size),
max_dependence_blob_overlap(options.max_dependence_blob_overlap),
maintainer_job_ratio(options.maintainer_job_ratio),
soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit),
hard_pending_compaction_bytes_limit(
Expand Down
2 changes: 2 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ struct MutableCFOptions {
target_blob_file_size(0),
blob_file_defragment_size(0),
max_dependence_blob_overlap(0),
maintainer_job_ratio(0),
soft_pending_compaction_bytes_limit(0),
hard_pending_compaction_bytes_limit(0),
level0_file_num_compaction_trigger(0),
Expand Down Expand Up @@ -222,6 +223,7 @@ struct MutableCFOptions {
uint64_t target_blob_file_size;
uint64_t blob_file_defragment_size;
size_t max_dependence_blob_overlap;
double maintainer_job_ratio;
uint64_t soft_pending_compaction_bytes_limit;
uint64_t hard_pending_compaction_bytes_limit;
int level0_file_num_compaction_trigger;
Expand Down
2 changes: 2 additions & 0 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
blob_file_defragment_size);
ROCKS_LOG_HEADER(log, " Options.max_dependence_blob_overlap: %zu",
max_dependence_blob_overlap);
ROCKS_LOG_HEADER(log, " Options.maintainer_job_ratio: %f",
maintainer_job_ratio);
ROCKS_LOG_HEADER(log, " Options.ttl_gc_ratio: %f",
ttl_gc_ratio);
ROCKS_LOG_HEADER(log, " Options.ttl_max_scan_gap: %zd",
Expand Down
9 changes: 7 additions & 2 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
mutable_cf_options.blob_file_defragment_size;
cf_opts.max_dependence_blob_overlap =
mutable_cf_options.max_dependence_blob_overlap;
cf_opts.maintainer_job_ratio = mutable_cf_options.maintainer_job_ratio;
cf_opts.optimize_filters_for_hits =
mutable_cf_options.optimize_filters_for_hits;
cf_opts.optimize_range_deletion = mutable_cf_options.optimize_range_deletion;
Expand Down Expand Up @@ -1780,8 +1781,8 @@ std::unordered_map<std::string, OptionTypeInfo>
OptionType::kBoolean, OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, avoid_unnecessary_blocking_io)}},
{"zenfs_gc_ratio",
{offsetof(struct DBOptions, zenfs_gc_ratio),
OptionType::kDouble, OptionVerificationType::kNormal, false,
{offsetof(struct DBOptions, zenfs_gc_ratio), OptionType::kDouble,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, zenfs_gc_ratio)}}};

std::unordered_map<std::string, BlockBasedTableOptions::IndexType>
Expand Down Expand Up @@ -1936,6 +1937,10 @@ std::unordered_map<std::string, OptionTypeInfo>
{offset_of(&ColumnFamilyOptions::max_dependence_blob_overlap),
OptionType::kSizeT, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, max_dependence_blob_overlap)}},
{"maintainer_job_ratio",
{offset_of(&ColumnFamilyOptions::maintainer_job_ratio),
OptionType::kDouble, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, maintainer_job_ratio)}},
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, true,
0}},
Expand Down
1 change: 1 addition & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"target_blob_file_size=0;"
"blob_file_defragment_size=0;"
"max_dependence_blob_overlap=1024;"
"maintainer_job_ratio=0.1;"
"optimize_filters_for_hits=false;"
"optimize_range_deletion=false;"
"report_bg_io_stats=true;"
Expand Down
1 change: 1 addition & 0 deletions terark-tools/bloat-test/db.ini
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@
target_blob_file_size=0
blob_file_defragment_size=0
max_dependence_blob_overlap=1024
maintainer_job_ratio=0.1
Loading

0 comments on commit e257ad5

Please sign in to comment.