Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group rocksdb.sst.read.micros stat by IOActivity flush and compaction #11288

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use thread status util instead of new thread io activity
  • Loading branch information
hx235 committed Apr 20, 2023
commit ea7ee956923d25f05edb363a6946bfc5dc4e01e0
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,6 @@ set(SOURCES
test_util/sync_point.cc
test_util/sync_point_impl.cc
test_util/testutil.cc
test_util/thread_io_activity.cc
test_util/transaction_test_util.cc
tools/block_cache_analyzer/block_cache_trace_analyzer.cc
tools/dump/db_dump_tool.cc
Expand Down
2 changes: 0 additions & 2 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"table/unique_id.cc",
"test_util/sync_point.cc",
"test_util/sync_point_impl.cc",
"test_util/thread_io_activity.cc",
"test_util/transaction_test_util.cc",
"tools/dump/db_dump_tool.cc",
"tools/io_tracer_parser_tool.cc",
Expand Down Expand Up @@ -574,7 +573,6 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"table/unique_id.cc",
"test_util/sync_point.cc",
"test_util/sync_point_impl.cc",
"test_util/thread_io_activity.cc",
"test_util/transaction_test_util.cc",
"tools/dump/db_dump_tool.cc",
"tools/io_tracer_parser_tool.cc",
Expand Down
7 changes: 0 additions & 7 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
#include "table/table_builder.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "test_util/thread_io_activity.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -619,8 +618,6 @@ void CompactionJob::GenSubcompactionBoundaries() {
Status CompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kCompaction);
TEST_SYNC_POINT("CompactionJob::Run():Start");
log_buffer_->FlushBufferToLog();
LogCompaction();
Expand Down Expand Up @@ -715,8 +712,6 @@ Status CompactionJob::Run() {
compact_->compaction->mutable_cf_options()->prefix_extractor;
std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
ThreadIOActivityGuardForTest verify_table_thread_io_activity_guard(
Env::IOActivity::kCompaction);
while (true) {
size_t file_idx = next_file_idx.fetch_add(1);
if (file_idx >= files_output.size()) {
Expand Down Expand Up @@ -1040,8 +1035,6 @@ void CompactionJob::NotifyOnSubcompactionCompleted(
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kCompaction);
if (db_options_.compaction_service) {
CompactionServiceJobStatus comp_status =
ProcessKeyValueCompactionWithCompactionService(sub_compact);
Expand Down
15 changes: 8 additions & 7 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/table.h"
#include "rocksdb/wal_filter.h"
#include "test_util/sync_point.h"
#include "test_util/thread_io_activity.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -924,8 +924,6 @@ Status DBImpl::InitPersistStatsColumnFamily() {
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
mutex_.AssertHeld();
assert(versions_->descriptor_log_ == nullptr);
ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kDBOpen);
const ReadOptions read_options(Env::IOActivity::kDBOpen);
Status s = versions_->LogAndApply(
recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_, read_options,
Expand Down Expand Up @@ -1570,8 +1568,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
assert(std::numeric_limits<uint64_t>::max() ==
cfd->imm()->GetEarliestMemTableID());

ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kDBOpen);
const uint64_t start_micros = immutable_db_options_.clock->NowMicros();

FileMetaData meta;
Expand Down Expand Up @@ -1747,8 +1743,13 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
const bool kSeqPerBatch = true;
const bool kBatchPerTxn = true;
return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!kSeqPerBatch, kBatchPerTxn);
ThreadStatusUtil::SetDB(&dbname, db_options.env,
db_options.enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
Status s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
!kSeqPerBatch, kBatchPerTxn);
ThreadStatusUtil::ResetThreadStatus();
ajkr marked this conversation as resolved.
Show resolved Hide resolved
return s;
}

// TODO: Implement the trimming in flush code path.
Expand Down
3 changes: 0 additions & 3 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "test_util/thread_io_activity.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
Expand Down Expand Up @@ -211,8 +210,6 @@ void FlushJob::PickMemTable() {
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
bool* switched_to_mempurge) {
TEST_SYNC_POINT("FlushJob::Start");
ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kFlush);
db_mutex_->AssertHeld();
assert(pick_memtable_called);
// Mempurge threshold can be dynamically changed.
Expand Down
3 changes: 0 additions & 3 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "db/version_set.h"
#include "port/port.h"
#include "table/table_reader.h"
#include "test_util/thread_io_activity.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -1316,8 +1315,6 @@ class VersionBuilder::Rep {
std::atomic<size_t> next_file_meta_idx(0);
std::function<void()> load_handlers_func([&]() {
while (true) {
ThreadIOActivityGuardForTest thread_io_activity_guard(
read_options.io_activity);
size_t file_idx = next_file_meta_idx.fetch_add(1);
if (file_idx >= files_meta.size()) {
break;
Expand Down
5 changes: 0 additions & 5 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
#include "table/two_level_iterator.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "test_util/thread_io_activity.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/coro_utils.h"
Expand Down Expand Up @@ -5708,8 +5707,6 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool no_error_if_files_missing) {
ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kDBOpen);
const ReadOptions read_options(Env::IOActivity::kDBOpen);
// Read "CURRENT" file, which contains a pointer to the current manifest
// file
Expand Down Expand Up @@ -5895,8 +5892,6 @@ Status VersionSet::TryRecoverFromOneManifest(
const std::string& manifest_path,
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool* has_missing_table_file) {
ThreadIOActivityGuardForTest thread_io_activity_guard(
Env::IOActivity::kDBOpen);
const ReadOptions read_options(Env::IOActivity::kDBOpen);
ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
manifest_path.c_str());
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ DECLARE_int32(create_timestamped_snapshot_one_in);

DECLARE_bool(allow_data_in_errors);

DECLARE_bool(enable_thread_tracking);

// Tiered storage
DECLARE_bool(enable_tiered_storage); // set last_level_temperature
DECLARE_int64(preclude_last_level_data_seconds);
Expand Down
9 changes: 7 additions & 2 deletions db_stress_tool/db_stress_driver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ void ThreadBody(void* v) {
}
}
}

bool RunStressTest(SharedState* shared) {
bool RunStressTestImpl(SharedState* shared) {
SystemClock* clock = db_stress_env->GetSystemClock().get();
StressTest* stress = shared->GetStressTest();

Expand Down Expand Up @@ -207,5 +206,11 @@ bool RunStressTest(SharedState* shared) {
}
return true;
}
// Entry point for the stress test run. Registers the calling thread with
// ThreadStatusUtil as a USER thread, delegates the actual work to
// RunStressTestImpl(), then unregisters the thread and hands back whatever
// RunStressTestImpl() returned.
bool RunStressTest(SharedState* shared) {
  ThreadStatusUtil::RegisterThread(db_stress_env, ThreadStatus::USER);
  const bool impl_result = RunStressTestImpl(shared);
  // Unregister only after the implementation has finished so the thread stays
  // registered for the whole run.
  ThreadStatusUtil::UnregisterThread();
  return impl_result;
}
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS
14 changes: 9 additions & 5 deletions db_stress_tool/db_stress_env_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#ifdef GFLAGS
#pragma once
#include "db_stress_tool/db_stress_common.h"
#include "test_util/thread_io_activity.h"
#include "monitoring/thread_status_util.h"

namespace ROCKSDB_NAMESPACE {
class DbStressRandomAccessFileWrapper : public FSRandomAccessFileOwnerWrapper {
Expand All @@ -22,10 +22,14 @@ class DbStressRandomAccessFileWrapper : public FSRandomAccessFileOwnerWrapper {
// Forwards the read to the wrapped file. In builds where NDEBUG is not
// defined it first verifies that the IOActivity carried by `options` agrees
// with the activity expected for the operation currently recorded for this
// thread by ThreadStatusUtil.
//
// offset, n        - forwarded unchanged to the wrapped file's Read().
// options          - IO options; `options.io_activity` is what gets checked.
// result, scratch  - forwarded unchanged to the wrapped file's Read().
// dbg              - forwarded unchanged to the wrapped file's Read().
// Returns the IOStatus produced by the wrapped file's Read().
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
              Slice* result, char* scratch,
              IODebugContext* dbg) const override {
#ifndef NDEBUG
  const ThreadStatus::OperationType thread_op =
      ThreadStatusUtil::GetThreadOperation();
  const Env::IOActivity io_activity =
      ThreadStatusUtil::TEST_GetExpectedIOActivity(thread_op);
  // kUnknown means there is no expectation for this thread's operation, so
  // only a known expected activity is compared against the options.
  assert(io_activity == Env::IOActivity::kUnknown ||
         io_activity == options.io_activity);
#endif
  return target()->Read(offset, n, options, result, scratch, dbg);
}
};
Expand Down
5 changes: 5 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,11 @@ DEFINE_bool(allow_data_in_errors,
ROCKSDB_NAMESPACE::Options().allow_data_in_errors,
"If true, allow logging data, e.g. key, value in LOG files.");

// Command-line flag controlling thread status tracking for the stress test.
// Its default is whatever a default-constructed Options reports for
// enable_thread_tracking.
DEFINE_bool(enable_thread_tracking,
ROCKSDB_NAMESPACE::Options().enable_thread_tracking,
"If true, the status of the threads involved in this DB will be "
"tracked and available via GetThreadList() API.");

DEFINE_int32(verify_iterator_with_expected_state_one_in, 0,
"If non-zero, when TestIterate() is to be called, there is a "
"1/verify_iterator_with_expected_state_one_in "
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3215,6 +3215,8 @@ void InitializeOptionsFromFlags(
}

options.allow_data_in_errors = FLAGS_allow_data_in_errors;

options.enable_thread_tracking = FLAGS_enable_thread_tracking;
}

void InitializeOptionsGeneral(
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/thread_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct ThreadStatus {
OP_UNKNOWN = 0,
OP_COMPACTION,
OP_FLUSH,
OP_DBOPEN,
NUM_OP_TYPES
};

Expand Down
4 changes: 4 additions & 0 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -6785,6 +6785,8 @@ class OperationTypeJni {
return 0x1;
case ROCKSDB_NAMESPACE::ThreadStatus::OperationType::OP_FLUSH:
return 0x2;
case ROCKSDB_NAMESPACE::ThreadStatus::OperationType::OP_DBOPEN:
return 0x3;
default:
return 0x7F; // undefined
}
Expand All @@ -6801,6 +6803,8 @@ class OperationTypeJni {
return ROCKSDB_NAMESPACE::ThreadStatus::OperationType::OP_COMPACTION;
case 0x2:
return ROCKSDB_NAMESPACE::ThreadStatus::OperationType::OP_FLUSH;
case 0x3:
return ROCKSDB_NAMESPACE::ThreadStatus::OperationType::OP_DBOPEN;
default:
// undefined/default
return ROCKSDB_NAMESPACE::ThreadStatus::OperationType::OP_UNKNOWN;
Expand Down
3 changes: 2 additions & 1 deletion java/src/main/java/org/rocksdb/OperationType.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
public enum OperationType {
OP_UNKNOWN((byte)0x0),
OP_COMPACTION((byte)0x1),
OP_FLUSH((byte)0x2);
OP_FLUSH((byte) 0x2),
OP_DBOPEN((byte) 0x3);

private final byte value;

Expand Down
63 changes: 42 additions & 21 deletions monitoring/thread_status_updater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,29 @@ void ThreadStatusUpdater::ResetThreadStatus() {
ClearThreadState();
ClearThreadOperation();
SetColumnFamilyInfoKey(nullptr);
SetDBInfoName(nullptr);
}

// Records `db_name` in the status data returned by Get() and recomputes the
// tracking flag from the new name together with the column family key that
// is currently stored. Does nothing when Get() returns nullptr.
void ThreadStatusUpdater::SetDBInfoName(const std::string* db_name) {
  auto* thread_data = Get();
  if (thread_data != nullptr) {
    auto* const current_cf_key =
        thread_data->cf_key.load(std::memory_order_relaxed);
    // The flag is refreshed before the name itself is published, matching the
    // order used when the column family key is updated.
    thread_data->enable_tracking =
        ThreadStatusUpdater::ShouldEnableTracking(db_name, current_cf_key);
    thread_data->db_name.store(const_cast<std::string*>(db_name),
                               std::memory_order_relaxed);
  }
}

// Records `cf_key` in the status data returned by Get() and recomputes the
// tracking flag. Does nothing when Get() returns nullptr.
//
// cf_key - opaque column family key to store; may be nullptr.
void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
  auto* data = Get();
  if (data == nullptr) {
    return;
  }

  // The tracking flag is derived from both the stored DB name and the new
  // column family key, not from the key alone, so it is computed in a single
  // step via ShouldEnableTracking().
  data->enable_tracking = ThreadStatusUpdater::ShouldEnableTracking(
      data->db_name.load(std::memory_order_relaxed), cf_key);
  data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
}

Expand Down Expand Up @@ -86,6 +98,14 @@ void ThreadStatusUpdater::SetThreadOperation(
}
}

// Reports the operation type recorded in the local thread status data, or
// OP_UNKNOWN when GetLocalThreadStatus() provides no data.
ThreadStatus::OperationType ThreadStatusUpdater::GetThreadOperation() {
  const ThreadStatusData* const local_status = GetLocalThreadStatus();
  return local_status != nullptr
             ? local_status->operation_type.load(std::memory_order_relaxed)
             : ThreadStatus::OperationType::OP_UNKNOWN;
}

void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
auto* data = GetLocalThreadStatus();
if (data == nullptr) {
Expand Down Expand Up @@ -175,31 +195,31 @@ Status ThreadStatusUpdater::GetThreadList(
// use "memory_order_relaxed" to load the cf_key.
auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);

ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
ThreadStatus::OperationType op_type =
thread_data->operation_type.load(std::memory_order_acquire);
ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
uint64_t op_elapsed_micros = 0;
uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};

auto iter = cf_info_map_.find(cf_key);
if (iter != cf_info_map_.end()) {
op_type = thread_data->operation_type.load(std::memory_order_acquire);
// display lower-level info only when higher-level info is available.
if (op_type != ThreadStatus::OP_UNKNOWN) {
op_elapsed_micros = now_micros - thread_data->op_start_time.load(
std::memory_order_relaxed);
op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
state_type = thread_data->state_type.load(std::memory_order_relaxed);
for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
op_props[i] =
thread_data->op_properties[i].load(std::memory_order_relaxed);
}
// display lower-level info only when higher-level info is available.
if (op_type != ThreadStatus::OP_UNKNOWN) {
op_elapsed_micros = now_micros - thread_data->op_start_time.load(
std::memory_order_relaxed);
op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
state_type = thread_data->state_type.load(std::memory_order_relaxed);
for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
op_props[i] =
thread_data->op_properties[i].load(std::memory_order_relaxed);
}
}

auto iter = cf_info_map_.find(cf_key);
thread_list->emplace_back(
thread_id, thread_type,
iter != cf_info_map_.end() ? iter->second.db_name : "",
iter != cf_info_map_.end()
? iter->second.db_name
: (thread_data->db_name ? *thread_data->db_name : ""),
iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
op_elapsed_micros, op_stage, op_props, state_type);
}
Expand All @@ -212,8 +232,9 @@ ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
return nullptr;
}
if (!thread_status_data_->enable_tracking) {
assert(thread_status_data_->cf_key.load(std::memory_order_relaxed) ==
nullptr);
assert(!ShouldEnableTracking(
thread_status_data_->db_name.load(std::memory_order_relaxed),
thread_status_data_->cf_key.load(std::memory_order_relaxed)));
return nullptr;
}
return thread_status_data_;
Expand Down
Loading