Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable speedb features: Constrain the i/f of SharedOptions (make immutable) #742

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Based on RocksDB 8.1.1
* sst_dump: display metaindex_handle and the index_handle's offset and size in footer information (#404).
* Static Pinning: Set the default for mid-percent capacity threshold in scoped pinning policy to 70 (#689).
* db_bench: Add support for individual scoped pinning policy parameters (#687).
* Enable speedb features: Constrain the interface of SharedOptions (make immutable) (#740).

### Bug Fixes
* Fix RepeatableThread to work properly with on thread start callback feature (https://github.com/speedb-io/speedb/pull/667).
Expand Down
9 changes: 9 additions & 0 deletions examples/enable_speedb_features_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"

using namespace ROCKSDB_NAMESPACE;

Expand Down Expand Up @@ -55,11 +56,17 @@ int main() {
// as listed in the definition of SpeedbSharedOptiopns in options.h
op1.create_if_missing = true;
op1.compression = rocksdb::kNoCompression;
// NOT having a prefix-extractor (the deafult) will result in the
// memtable_factory==HashSpdbRepFactory
//...
op1.EnableSpeedbFeatures(so1);

op2.create_if_missing = true;
op2.compression = rocksdb::kZlibCompression;
// Having a prefix-extractor will result in the
// memtable_factory==SkipListRepFactory
op2.prefix_extractor.reset(NewFixedPrefixTransform(4));

//...
op2.EnableSpeedbFeatures(so1);

Expand Down Expand Up @@ -124,6 +131,8 @@ int main() {
}
std::cout << "new_cf was created in db3" << std::endl;

// Cleanup

s = db3->DropColumnFamily(cf);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
Expand Down
71 changes: 52 additions & 19 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2241,39 +2241,72 @@ struct LiveFilesStorageInfoOptions {
// more info and use example can be found in enable_speedb_features_example.cc
class SharedOptions {
public:
SharedOptions();
static constexpr size_t kDefaultDelayedWriteRate = 256 * 1024 * 1024ul;
static constexpr size_t kDefaultBucketSize = 1000000;
static constexpr bool kDeafultUseMerge = true;

static constexpr size_t kWbmPerCfSizeIncrease = 512 * 1024 * 1024ul;

public:
SharedOptions(size_t total_ram_size_bytes, size_t total_threads,
size_t delayed_write_rate = 256 * 1024 * 1024ul,
size_t bucket_size = 1000000, bool use_merge = true);
size_t GetTotalThreads() { return total_threads_; }
size_t GetTotalRamSizeBytes() { return total_ram_size_bytes_; }
size_t GetDelayedWriteRate() { return delayed_write_rate_; }
size_t GetBucketSize() { return bucket_size_; }
size_t IsMergeMemtableSupported() { return use_merge_; }
size_t delayed_write_rate = kDefaultDelayedWriteRate,
size_t bucket_size = kDefaultBucketSize,
bool use_merge = kDeafultUseMerge);

public:
size_t GetMaxWriteBufferManagerSize() const;

size_t GetTotalThreads() const { return total_threads_; }
size_t GetTotalRamSizeBytes() const { return total_ram_size_bytes_; }
size_t GetDelayedWriteRate() const { return delayed_write_rate_; }
size_t GetBucketSize() const { return bucket_size_; }
size_t IsMergeMemtableSupported() const { return use_merge_; }

const Cache* GetCache() const { return cache_.get(); }
const WriteController* GetWriteController() const {
return write_controller_.get();
};
const WriteBufferManager* GetWriteBufferManager() const {
return write_buffer_manager_.get();
}
const TablePinningPolicy* GetPinningPolicy() const {
return pinning_policy_.get();
}

private:
void CreateWriteBufferManager();
void CreatePinningPolicy();

// this function will increase write buffer manager by increased_by amount
// as long as the result is not bigger than the maximum size of
// total_ram_size_ /4
void IncreaseWriteBufferSize(size_t increase_by);
void CreatePinningPolicy();
size_t GetMaxWriteBufferManagerSize() const;

std::shared_ptr<Cache> cache = nullptr;
std::shared_ptr<WriteController> write_controller = nullptr;
std::shared_ptr<WriteBufferManager> write_buffer_manager = nullptr;
private:
std::shared_ptr<Cache> cache_ = nullptr;
std::shared_ptr<WriteController> write_controller_ = nullptr;
std::shared_ptr<WriteBufferManager> write_buffer_manager_ = nullptr;
std::shared_ptr<TablePinningPolicy> pinning_policy_ = nullptr;

private:
size_t total_ram_size_bytes_ = 0;
size_t total_threads_ = 0;
size_t delayed_write_rate_ = kDefaultBucketSize;
size_t bucket_size_ = kDefaultBucketSize;
bool use_merge_ = kDeafultUseMerge;

private:
// For Future Use
Env* env = Env::Default();
std::shared_ptr<RateLimiter> rate_limiter = nullptr;
std::shared_ptr<SstFileManager> sst_file_manager = nullptr;
std::shared_ptr<Logger> info_log = nullptr;
std::vector<std::shared_ptr<EventListener>> listeners;
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;
std::shared_ptr<TablePinningPolicy> pinning_policy = nullptr;

private:
size_t total_threads_ = 0;
size_t total_ram_size_bytes_ = 0;
size_t delayed_write_rate_ = 0;
size_t bucket_size_ = 1000000;
bool use_merge_ = true;
friend struct DBOptions;
friend struct ColumnFamilyOptions;
};

} // namespace ROCKSDB_NAMESPACE
63 changes: 36 additions & 27 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,33 +551,33 @@ Options* Options::OldDefaults(int rocksdb_major_version,
Options* Options::EnableSpeedbFeatures(SharedOptions& shared_options) {
EnableSpeedbFeaturesDB(shared_options);
EnableSpeedbFeaturesCF(shared_options);
if (memtable_factory->IsInsertConcurrentlySupported() == false) {
assert(allow_concurrent_memtable_write == false);
allow_concurrent_memtable_write = false;
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
}
return this;
}

SharedOptions::SharedOptions(size_t total_ram_size_bytes, size_t total_threads,
size_t delayed_write_rate, size_t bucket_size,
bool use_merge) {
total_threads_ = total_threads;
total_ram_size_bytes_ = total_ram_size_bytes;
delayed_write_rate_ = delayed_write_rate;
bucket_size_ = bucket_size;
use_merge_ = use_merge;
// initial_write_buffer_size_ is initialized to 1 to avoid from empty memory
// which might cause some problems
int initial_write_buffer_size_ = 1;
cache = NewLRUCache(total_ram_size_bytes_);
write_controller.reset(
bool use_merge)
: total_ram_size_bytes_(total_ram_size_bytes),
total_threads_(total_threads),
delayed_write_rate_(delayed_write_rate),
bucket_size_(bucket_size),
use_merge_(use_merge) {
cache_ = NewLRUCache(total_ram_size_bytes_);
write_controller_.reset(
new WriteController(true /*dynamic_delay*/, delayed_write_rate_));
write_buffer_manager.reset(new WriteBufferManager(
initial_write_buffer_size_, cache, true /*allow_stall*/));

CreateWriteBufferManager();
CreatePinningPolicy();
}

void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) {
// Max write_buffer_manager->buffer_size()
size_t wbm_max_buf_size = GetMaxWriteBufferManagerSize();
size_t current_buffer_size = write_buffer_manager->buffer_size();
size_t current_buffer_size = write_buffer_manager_->buffer_size();
size_t set_buf_res = 0;

if (current_buffer_size == 1 && increase_by > 1) {
Expand All @@ -591,14 +591,25 @@ void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) {
set_buf_res = wbm_max_buf_size;
}
if (set_buf_res != 0) {
write_buffer_manager->SetBufferSize(set_buf_res);
write_buffer_manager_->SetBufferSize(set_buf_res);
}
}

void SharedOptions::CreateWriteBufferManager() {
// initial_write_buffer_size_ is initialized to 1 to avoid from empty memory
// which might cause some problems
size_t initial_write_buffer_size_ = 1U;

write_buffer_manager_.reset(new WriteBufferManager(
initial_write_buffer_size_, cache_, true /*allow_stall*/,
true /* initiate_fluses */, WriteBufferManager::FlushInitiationOptions(),
WriteBufferManager::kDfltStartDelayPercentThreshold));
}

void SharedOptions::CreatePinningPolicy() {
// Calculate the size of the clean memory
auto clean_memory_capacity = cache->GetCapacity();
if (write_buffer_manager->cost_to_cache()) {
auto clean_memory_capacity = cache_->GetCapacity();
if (write_buffer_manager_->cost_to_cache()) {
// The WBM's size is increased on every call to EnableSpeedbFeaturesCF()
// up to a max size. For simplicity, calculate the space for pinning
// as if wbm is at its max size. Otherwise we would have to update the
Expand All @@ -622,7 +633,7 @@ void SharedOptions::CreatePinningPolicy() {
std::ostringstream oss;
oss << "id=speedb_scoped_pinning_policy; capacity=" << pinning_capacity;
auto s = TablePinningPolicy::CreateFromString(config_options, oss.str(),
&pinning_policy);
&pinning_policy_);
assert(s.ok());
}

Expand All @@ -631,13 +642,12 @@ size_t SharedOptions::GetMaxWriteBufferManagerSize() const {
}

DBOptions* DBOptions::EnableSpeedbFeaturesDB(SharedOptions& shared_options) {
env = shared_options.env;
IncreaseParallelism((int)shared_options.GetTotalThreads());
delayed_write_rate = shared_options.GetDelayedWriteRate();
bytes_per_sync = 1ul << 20;
use_dynamic_delay = true;
write_buffer_manager = shared_options.write_buffer_manager;
write_controller = shared_options.write_controller;
write_buffer_manager = shared_options.write_buffer_manager_;
write_controller = shared_options.write_controller_;
return this;
}

Expand Down Expand Up @@ -665,8 +675,8 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF(
// to disable flush due to write buffer full
// each new column family will ask the write buffer manager to increase the
// write buffer size by 512 * 1024 * 1024ul
shared_options.IncreaseWriteBufferSize(512 * 1024 * 1024ul);
auto db_wbf_size = shared_options.write_buffer_manager->buffer_size();
shared_options.IncreaseWriteBufferSize(SharedOptions::kWbmPerCfSizeIncrease);
auto db_wbf_size = shared_options.write_buffer_manager_->buffer_size();
// cf write_buffer_size
write_buffer_size = std::min<size_t>(db_wbf_size / 4, 64ul << 20);
max_write_buffer_number = 4;
Expand All @@ -682,15 +692,14 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF(
&block_based_table_options.filter_policy);
assert(s.ok());
block_based_table_options.cache_index_and_filter_blocks = true;
block_based_table_options.block_cache = shared_options.cache;
block_based_table_options.block_cache = shared_options.cache_;
block_based_table_options.cache_index_and_filter_blocks_with_high_priority =
true;
block_based_table_options.pinning_policy = shared_options.pinning_policy;
block_based_table_options.pinning_policy = shared_options.pinning_policy_;
table_factory.reset(NewBlockBasedTableFactory(block_based_table_options));
}
if (prefix_extractor) {
memtable_factory.reset(
NewHashSkipListRepFactory(shared_options.GetBucketSize()));
memtable_factory.reset(new SkipListFactory());
} else {
memtable_factory.reset(
NewHashSpdbRepFactory(shared_options.GetBucketSize(),
Expand Down
Loading