Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account memory of big memory users in BlockBasedTableReader in global memory limit #9748

Closed
wants to merge 9 commits into from
Prev Previous commit
Next Next commit
Feedback: two CRM handles; on-the-fly table properties mem approx
  • Loading branch information
hx235 committed Apr 6, 2022
commit a37970b6660bfcf9d9b09409a2118a5d49e67370
18 changes: 11 additions & 7 deletions cache/cache_reservation_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@

namespace ROCKSDB_NAMESPACE {

CacheReservationManager::CacheReservationHandle::CacheReservationHandle(
template <CacheEntryRole R>
CacheReservationManagerImpl<R>::CacheReservationHandle::CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManager> cache_res_mgr)
std::shared_ptr<CacheReservationManagerImpl> cache_res_mgr)
: incremental_memory_used_(incremental_memory_used) {
assert(cache_res_mgr);
cache_res_mgr_ = cache_res_mgr;
}

CacheReservationManager::CacheReservationHandle::~CacheReservationHandle() {
assert(cache_res_mgr_);
template <CacheEntryRole R>
CacheReservationManagerImpl<
R>::CacheReservationHandle::~CacheReservationHandle() {
Status s = cache_res_mgr_->ReleaseCacheReservation(incremental_memory_used_);
s.PermitUncheckedError();
}
Expand Down Expand Up @@ -86,12 +88,14 @@ Status CacheReservationManagerImpl<R>::UpdateCacheReservation(
template <CacheEntryRole R>
Status CacheReservationManagerImpl<R>::MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationHandle>* handle) {
std::unique_ptr<CacheReservationManager::CacheReservationHandle>* handle) {
assert(handle);
Status s =
UpdateCacheReservation(GetTotalMemoryUsed() + incremental_memory_used);
(*handle).reset(new CacheReservationHandle(incremental_memory_used,
this->shared_from_this()));
(*handle).reset(new CacheReservationManagerImpl::CacheReservationHandle(
incremental_memory_used,
std::enable_shared_from_this<
ajkr marked this conversation as resolved.
Show resolved Hide resolved
CacheReservationManagerImpl<R>>::shared_from_this()));
return s;
}

Expand Down
98 changes: 63 additions & 35 deletions cache/cache_reservation_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,13 @@
namespace ROCKSDB_NAMESPACE {
// CacheReservationManager is an interface for reserving cache space for the
// memory used
class CacheReservationManager
: public std::enable_shared_from_this<CacheReservationManager> {
class CacheReservationManager {
public:
// CacheReservationHandle is for managing the lifetime of a cache reservation
// for an incremental amount of memory used (i.e, incremental_memory_used)
class CacheReservationHandle {
public:
// REQUIRES: cache_res_mgr != nullptr
explicit CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManager> cache_res_mgr);

~CacheReservationHandle();

std::size_t GetIncrementalMemoryUsed() { return incremental_memory_used_; }

private:
std::size_t incremental_memory_used_;
std::shared_ptr<CacheReservationManager> cache_res_mgr_;
virtual ~CacheReservationHandle() {}
};
virtual ~CacheReservationManager() {}
virtual Status UpdateCacheReservation(std::size_t new_memory_used) = 0;
Expand All @@ -57,7 +45,9 @@ class CacheReservationManager

protected:
virtual Status ReleaseCacheReservation(
std::size_t incremental_memory_used) = 0;
std::size_t /* incremental_memory_used */) {
return Status::NotSupported();
}
};

// CacheReservationManagerImpl implements interface CacheReservationManager
Expand All @@ -67,8 +57,23 @@ class CacheReservationManager
// This class is NOT thread-safe, except that GetTotalReservedCacheSize()
// can be called without external synchronization.
template <CacheEntryRole R>
class CacheReservationManagerImpl : public CacheReservationManager {
class CacheReservationManagerImpl
: public CacheReservationManager,
public std::enable_shared_from_this<CacheReservationManagerImpl<R>> {
public:
class CacheReservationHandle
: public CacheReservationManager::CacheReservationHandle {
public:
CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManagerImpl> cache_res_mgr);
~CacheReservationHandle() override;

private:
std::size_t incremental_memory_used_;
std::shared_ptr<CacheReservationManagerImpl> cache_res_mgr_;
};

// Construct a CacheReservationManagerImpl
// @param cache The cache where dummy entries are inserted and released for
// reserving cache space
Expand Down Expand Up @@ -168,7 +173,8 @@ class CacheReservationManagerImpl : public CacheReservationManager {
// REQUIRES: handle != nullptr
Status MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationHandle> *handle) override;
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
override;

// Return the size of the cache (which is a multiple of kSizeDummyEntry)
// successfully reserved by calling UpdateCacheReservation().
Expand Down Expand Up @@ -208,11 +214,36 @@ class CacheReservationManagerImpl : public CacheReservationManager {
CacheKey cache_key_;
};

class ConcurrentCacheReservationManager : public CacheReservationManager {
class ConcurrentCacheReservationManager
: public CacheReservationManager,
public std::enable_shared_from_this<ConcurrentCacheReservationManager> {
public:
class CacheReservationHandle
: public CacheReservationManager::CacheReservationHandle {
public:
CacheReservationHandle(
std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr,
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
cache_res_handle) {
assert(cache_res_mgr && cache_res_handle);
cache_res_mgr_ = cache_res_mgr;
cache_res_handle_ = std::move(cache_res_handle);
}

~CacheReservationHandle() override {
std::lock_guard<std::mutex> lock(cache_res_mgr_->cache_res_mgr_mu_);
cache_res_handle_.reset();
}

private:
std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr_;
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
cache_res_handle_;
};

explicit ConcurrentCacheReservationManager(
std::shared_ptr<CacheReservationManager> cache_res_mgr) {
cache_res_mgr_ = cache_res_mgr;
cache_res_mgr_ = std::move(cache_res_mgr);
}
ConcurrentCacheReservationManager(const ConcurrentCacheReservationManager &) =
delete;
Expand All @@ -233,12 +264,19 @@ class ConcurrentCacheReservationManager : public CacheReservationManager {
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
override {
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
Status s = cache_res_mgr_->UpdateCacheReservation(
cache_res_mgr_->GetTotalMemoryUsed() + incremental_memory_used);
(*handle).reset(new CacheReservationHandle(incremental_memory_used,
this->shared_from_this()));

std::unique_ptr<CacheReservationManager::CacheReservationHandle>
wrapped_handle;
Status s;
{
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
s = cache_res_mgr_->MakeCacheReservation(incremental_memory_used,
&wrapped_handle);
}
(*handle).reset(
new ConcurrentCacheReservationManager::CacheReservationHandle(
std::enable_shared_from_this<
ajkr marked this conversation as resolved.
Show resolved Hide resolved
ConcurrentCacheReservationManager>::shared_from_this(),
std::move(wrapped_handle)));
return s;
}
inline std::size_t GetTotalReservedCacheSize() override {
Expand All @@ -252,15 +290,5 @@ class ConcurrentCacheReservationManager : public CacheReservationManager {
private:
std::mutex cache_res_mgr_mu_;
std::shared_ptr<CacheReservationManager> cache_res_mgr_;

inline Status ReleaseCacheReservation(
std::size_t incremental_memory_used) override {
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
assert(cache_res_mgr_->GetTotalMemoryUsed() >= incremental_memory_used);
std::size_t updated_total_mem_used =
cache_res_mgr_->GetTotalMemoryUsed() - incremental_memory_used;
Status s = cache_res_mgr_->UpdateCacheReservation(updated_total_mem_used);
return s;
}
};
} // namespace ROCKSDB_NAMESPACE
26 changes: 26 additions & 0 deletions db/db_table_properties_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <memory>
#include <unordered_set>
#include <vector>

Expand All @@ -18,6 +19,7 @@
#include "rocksdb/utilities/table_properties_collectors.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_properties_internal.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/random.h"
Expand Down Expand Up @@ -409,6 +411,30 @@ TEST_F(DBTablePropertiesTest, GetDbIdentifiersProperty) {
}
}

TEST_F(DBTablePropertiesTest, GetUint64TAndStringPropStartEndPosition) {
TableProperties tp;
tp.orig_file_number = 123;
tp.data_size = 456;
tp.db_id = "123";
tp.db_session_id = "456";
tp.user_collected_properties = {{"some_user_prop", "value"}};

std::pair<const uint64_t*, const uint64_t*> u_props =
TEST_GetUint64TPropStartEndPosition(&tp);
std::pair<const std::string*, const std::string*> s_props =
TEST_GetStringPropStartEndPosition(&tp);

EXPECT_TRUE(u_props.first == &tp.orig_file_number);
EXPECT_TRUE(*u_props.first == tp.orig_file_number);
EXPECT_TRUE(static_cast<const void*>(u_props.second) ==
static_cast<const void*>(s_props.first));

EXPECT_TRUE(s_props.first == &tp.db_id);
EXPECT_TRUE(*s_props.first == tp.db_id);
EXPECT_TRUE(static_cast<const void*>(s_props.second) ==
static_cast<const void*>(&tp.user_collected_properties));
}

class DBTableHostnamePropertyTest
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<int, std::string>> {
Expand Down
5 changes: 0 additions & 5 deletions include/rocksdb/table_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ struct TableProperties {
std::map<std::string, uint64_t> GetAggregatablePropertiesAsMap() const;

std::size_t ApproximateMemoryUsage() const;

void IncreaseApproximateMemoryUsage(std::size_t delta);

private:
std::size_t approx_mem_usage_ = 0;
};

// Extra properties
Expand Down
16 changes: 0 additions & 16 deletions table/meta_blocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ Status ReadTablePropertiesHelper(
// Insert in user-collected properties for API backwards compatibility
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
new_table_properties->IncreaseApproximateMemoryUsage(key.size() +
raw_val.size());
}
// handle predefined rocksdb properties
uint64_t val;
Expand All @@ -349,46 +347,32 @@ Status ReadTablePropertiesHelper(
continue;
}
*(pos->second) = val;
new_table_properties->IncreaseApproximateMemoryUsage(sizeof(val));
} else if (key == TablePropertiesNames::kDbId) {
new_table_properties->db_id = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kDbSessionId) {
new_table_properties->db_session_id = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kDbHostId) {
new_table_properties->db_host_id = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kFilterPolicy) {
new_table_properties->filter_policy_name = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kColumnFamilyName) {
new_table_properties->column_family_name = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kComparator) {
new_table_properties->comparator_name = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kMergeOperator) {
new_table_properties->merge_operator_name = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kPrefixExtractorName) {
new_table_properties->prefix_extractor_name = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kPropertyCollectors) {
new_table_properties->property_collectors_names = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kCompression) {
new_table_properties->compression_name = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else if (key == TablePropertiesNames::kCompressionOptions) {
new_table_properties->compression_options = raw_val.ToString();
new_table_properties->IncreaseApproximateMemoryUsage(raw_val.size());
} else {
// handle user-collected properties
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
new_table_properties->IncreaseApproximateMemoryUsage(key.size() +
raw_val.size());
}
}

Expand Down
Loading