diff --git a/HISTORY.md b/HISTORY.md index 09bd895927..9c1db9b190 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -28,6 +28,7 @@ Based on RocksDB 8.1.1 * sst_dump: display metaindex_handle and the index_handle's offset and size in footer information (#404). * Static Pinning: Set the default for mid-percent capacity threshold in scoped pinning policy to 70 (#689). * db_bench: Add support for individual scoped pinning policy parameters (#687). +* Enable speedb features: Constrain the interface of SharedOptions (make immutable) (#740). ### Bug Fixes * Fix RepeatableThread to work properly with on thread start callback feature (https://github.com/speedb-io/speedb/pull/667). diff --git a/examples/enable_speedb_features_example.cc b/examples/enable_speedb_features_example.cc index 02ce8346f9..85b31a262d 100644 --- a/examples/enable_speedb_features_example.cc +++ b/examples/enable_speedb_features_example.cc @@ -20,6 +20,7 @@ #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" using namespace ROCKSDB_NAMESPACE; @@ -55,11 +56,17 @@ int main() { // as listed in the definition of SpeedbSharedOptiopns in options.h op1.create_if_missing = true; op1.compression = rocksdb::kNoCompression; + // NOT having a prefix-extractor (the deafult) will result in the + // memtable_factory==HashSpdbRepFactory //... op1.EnableSpeedbFeatures(so1); op2.create_if_missing = true; op2.compression = rocksdb::kZlibCompression; + // Having a prefix-extractor will result in the + // memtable_factory==SkipListRepFactory + op2.prefix_extractor.reset(NewFixedPrefixTransform(4)); + //... op2.EnableSpeedbFeatures(so1); @@ -124,6 +131,8 @@ int main() { } std::cout << "new_cf was created in db3" << std::endl; + // Cleanup + s = db3->DropColumnFamily(cf); if (!s.ok()) { std::cerr << s.ToString() << std::endl; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c548881e93..5351ff1726 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2241,39 +2241,72 @@ struct LiveFilesStorageInfoOptions { // more info and use example can be found in enable_speedb_features_example.cc class SharedOptions { public: - SharedOptions(); + static constexpr size_t kDefaultDelayedWriteRate = 256 * 1024 * 1024ul; + static constexpr size_t kDefaultBucketSize = 1000000; + static constexpr bool kDeafultUseMerge = true; + + static constexpr size_t kWbmPerCfSizeIncrease = 512 * 1024 * 1024ul; + + public: SharedOptions(size_t total_ram_size_bytes, size_t total_threads, - size_t delayed_write_rate = 256 * 1024 * 1024ul, - size_t bucket_size = 1000000, bool use_merge = true); - size_t GetTotalThreads() { return total_threads_; } - size_t GetTotalRamSizeBytes() { return total_ram_size_bytes_; } - size_t GetDelayedWriteRate() { return delayed_write_rate_; } - size_t GetBucketSize() { return bucket_size_; } - size_t IsMergeMemtableSupported() { return use_merge_; } + size_t delayed_write_rate = kDefaultDelayedWriteRate, + size_t bucket_size = kDefaultBucketSize, + bool use_merge = kDeafultUseMerge); + + public: + size_t GetMaxWriteBufferManagerSize() const; + + size_t GetTotalThreads() const { return total_threads_; } + size_t GetTotalRamSizeBytes() const { return total_ram_size_bytes_; } + size_t GetDelayedWriteRate() const { return delayed_write_rate_; } + size_t GetBucketSize() const { return bucket_size_; } + size_t IsMergeMemtableSupported() const { return use_merge_; } + + const Cache* GetCache() const { return cache_.get(); } + const WriteController* GetWriteController() const { + return write_controller_.get(); + }; + const WriteBufferManager* GetWriteBufferManager() const { + return write_buffer_manager_.get(); + } + const TablePinningPolicy* GetPinningPolicy() const { + return pinning_policy_.get(); + } + + private: + void CreateWriteBufferManager(); + void CreatePinningPolicy(); + // this function will increase write buffer manager by increased_by amount // as long as the result is not bigger than the maximum size of // total_ram_size_ /4 void IncreaseWriteBufferSize(size_t increase_by); - void CreatePinningPolicy(); - size_t GetMaxWriteBufferManagerSize() const; - std::shared_ptr cache = nullptr; - std::shared_ptr write_controller = nullptr; - std::shared_ptr write_buffer_manager = nullptr; + private: + std::shared_ptr cache_ = nullptr; + std::shared_ptr write_controller_ = nullptr; + std::shared_ptr write_buffer_manager_ = nullptr; + std::shared_ptr pinning_policy_ = nullptr; + + private: + size_t total_ram_size_bytes_ = 0; + size_t total_threads_ = 0; + size_t delayed_write_rate_ = kDefaultBucketSize; + size_t bucket_size_ = kDefaultBucketSize; + bool use_merge_ = kDeafultUseMerge; + + private: + // For Future Use Env* env = Env::Default(); std::shared_ptr rate_limiter = nullptr; std::shared_ptr sst_file_manager = nullptr; std::shared_ptr info_log = nullptr; std::vector> listeners; std::shared_ptr file_checksum_gen_factory = nullptr; - std::shared_ptr pinning_policy = nullptr; private: - size_t total_threads_ = 0; - size_t total_ram_size_bytes_ = 0; - size_t delayed_write_rate_ = 0; - size_t bucket_size_ = 1000000; - bool use_merge_ = true; + friend struct DBOptions; + friend struct ColumnFamilyOptions; }; } // namespace ROCKSDB_NAMESPACE diff --git a/options/options.cc b/options/options.cc index ff6a7a04f2..dae523abed 100644 --- a/options/options.cc +++ b/options/options.cc @@ -551,33 +551,33 @@ Options* Options::OldDefaults(int rocksdb_major_version, Options* Options::EnableSpeedbFeatures(SharedOptions& shared_options) { EnableSpeedbFeaturesDB(shared_options); EnableSpeedbFeaturesCF(shared_options); + if (memtable_factory->IsInsertConcurrentlySupported() == false) { + assert(allow_concurrent_memtable_write == false); + allow_concurrent_memtable_write = false; + } return this; } SharedOptions::SharedOptions(size_t total_ram_size_bytes, size_t total_threads, size_t delayed_write_rate, size_t bucket_size, - bool use_merge) { - total_threads_ = total_threads; - total_ram_size_bytes_ = total_ram_size_bytes; - delayed_write_rate_ = delayed_write_rate; - bucket_size_ = bucket_size; - use_merge_ = use_merge; - // initial_write_buffer_size_ is initialized to 1 to avoid from empty memory - // which might cause some problems - int initial_write_buffer_size_ = 1; - cache = NewLRUCache(total_ram_size_bytes_); - write_controller.reset( + bool use_merge) + : total_ram_size_bytes_(total_ram_size_bytes), + total_threads_(total_threads), + delayed_write_rate_(delayed_write_rate), + bucket_size_(bucket_size), + use_merge_(use_merge) { + cache_ = NewLRUCache(total_ram_size_bytes_); + write_controller_.reset( new WriteController(true /*dynamic_delay*/, delayed_write_rate_)); - write_buffer_manager.reset(new WriteBufferManager( - initial_write_buffer_size_, cache, true /*allow_stall*/)); + CreateWriteBufferManager(); CreatePinningPolicy(); } void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) { // Max write_buffer_manager->buffer_size() size_t wbm_max_buf_size = GetMaxWriteBufferManagerSize(); - size_t current_buffer_size = write_buffer_manager->buffer_size(); + size_t current_buffer_size = write_buffer_manager_->buffer_size(); size_t set_buf_res = 0; if (current_buffer_size == 1 && increase_by > 1) { @@ -591,14 +591,25 @@ void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) { set_buf_res = wbm_max_buf_size; } if (set_buf_res != 0) { - write_buffer_manager->SetBufferSize(set_buf_res); + write_buffer_manager_->SetBufferSize(set_buf_res); } } +void SharedOptions::CreateWriteBufferManager() { + // initial_write_buffer_size_ is initialized to 1 to avoid from empty memory + // which might cause some problems + size_t initial_write_buffer_size_ = 1U; + + write_buffer_manager_.reset(new WriteBufferManager( + initial_write_buffer_size_, cache_, true /*allow_stall*/, + true /* initiate_fluses */, WriteBufferManager::FlushInitiationOptions(), + WriteBufferManager::kDfltStartDelayPercentThreshold)); +} + void SharedOptions::CreatePinningPolicy() { // Calculate the size of the clean memory - auto clean_memory_capacity = cache->GetCapacity(); - if (write_buffer_manager->cost_to_cache()) { + auto clean_memory_capacity = cache_->GetCapacity(); + if (write_buffer_manager_->cost_to_cache()) { // The WBM's size is increased on every call to EnableSpeedbFeaturesCF() // up to a max size. For simplicity, calculate the space for pinning // as if wbm is at its max size. Otherwise we would have to update the @@ -622,7 +633,7 @@ void SharedOptions::CreatePinningPolicy() { std::ostringstream oss; oss << "id=speedb_scoped_pinning_policy; capacity=" << pinning_capacity; auto s = TablePinningPolicy::CreateFromString(config_options, oss.str(), - &pinning_policy); + &pinning_policy_); assert(s.ok()); } @@ -631,13 +642,12 @@ size_t SharedOptions::GetMaxWriteBufferManagerSize() const { } DBOptions* DBOptions::EnableSpeedbFeaturesDB(SharedOptions& shared_options) { - env = shared_options.env; IncreaseParallelism((int)shared_options.GetTotalThreads()); delayed_write_rate = shared_options.GetDelayedWriteRate(); bytes_per_sync = 1ul << 20; use_dynamic_delay = true; - write_buffer_manager = shared_options.write_buffer_manager; - write_controller = shared_options.write_controller; + write_buffer_manager = shared_options.write_buffer_manager_; + write_controller = shared_options.write_controller_; return this; } @@ -665,8 +675,8 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF( // to disable flush due to write buffer full // each new column family will ask the write buffer manager to increase the // write buffer size by 512 * 1024 * 1024ul - shared_options.IncreaseWriteBufferSize(512 * 1024 * 1024ul); - auto db_wbf_size = shared_options.write_buffer_manager->buffer_size(); + shared_options.IncreaseWriteBufferSize(SharedOptions::kWbmPerCfSizeIncrease); + auto db_wbf_size = shared_options.write_buffer_manager_->buffer_size(); // cf write_buffer_size write_buffer_size = std::min(db_wbf_size / 4, 64ul << 20); max_write_buffer_number = 4; @@ -682,15 +692,14 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF( &block_based_table_options.filter_policy); assert(s.ok()); block_based_table_options.cache_index_and_filter_blocks = true; - block_based_table_options.block_cache = shared_options.cache; + block_based_table_options.block_cache = shared_options.cache_; block_based_table_options.cache_index_and_filter_blocks_with_high_priority = true; - block_based_table_options.pinning_policy = shared_options.pinning_policy; + block_based_table_options.pinning_policy = shared_options.pinning_policy_; table_factory.reset(NewBlockBasedTableFactory(block_based_table_options)); } if (prefix_extractor) { - memtable_factory.reset( - NewHashSkipListRepFactory(shared_options.GetBucketSize())); + memtable_factory.reset(new SkipListFactory()); } else { memtable_factory.reset( NewHashSpdbRepFactory(shared_options.GetBucketSize(), diff --git a/options/options_test.cc b/options/options_test.cc index a1a4a50c89..2cbbcfa5cc 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -5200,137 +5200,227 @@ class SharedOptionsTest : public testing::Test {}; TEST_F(SharedOptionsTest, SharedOptionsTest) { size_t total_ram_size_bytes = 100 * 1024 * 1024 * 1024ul; - size_t delayed_write_rate = 256 * 1024 * 1024ul; size_t total_threads = 8; - SharedOptions so(total_ram_size_bytes, total_threads, delayed_write_rate); - - ASSERT_TRUE(so.GetTotalThreads() == total_threads); - ASSERT_TRUE(so.GetDelayedWriteRate() == delayed_write_rate); - ASSERT_TRUE(so.GetTotalRamSizeBytes() == total_ram_size_bytes); - - ASSERT_TRUE(so.write_buffer_manager->buffer_size() == 1); - ASSERT_TRUE(so.cache->GetCapacity() == total_ram_size_bytes); - ASSERT_TRUE(so.write_buffer_manager->IsInitiatingFlushes() == true); - ASSERT_TRUE(so.write_controller->max_delayed_write_rate() == - delayed_write_rate); - ASSERT_TRUE(so.write_controller->is_dynamic_delay()); - ASSERT_TRUE(so.rate_limiter == nullptr); - ASSERT_TRUE(so.sst_file_manager == nullptr); - ASSERT_TRUE(so.info_log == nullptr); - ASSERT_TRUE(so.file_checksum_gen_factory == nullptr); - - ASSERT_TRUE(so.pinning_policy.get() != nullptr); - ASSERT_STREQ(so.pinning_policy->Name(), "speedb_scoped_pinning_policy") - << so.pinning_policy->Name(); - - size_t expected_pinning_capacity = 0.8 * (0.75 * total_ram_size_bytes); - std::string actual_pinning_capacity_str; - ASSERT_OK(so.pinning_policy->GetOption(ConfigOptions(), "capacity", - &actual_pinning_capacity_str)); - ASSERT_EQ(actual_pinning_capacity_str, - std::to_string(expected_pinning_capacity)); + size_t delayed_write_rate = 256 * 1024 * 1024ul; + size_t bucket_size = 50000; + bool use_merge = false; + + // + // Test default values for SharedOptions's ctor + // + + SharedOptions so_with_dflts(total_ram_size_bytes, total_threads); + + ASSERT_EQ(so_with_dflts.GetMaxWriteBufferManagerSize(), + total_ram_size_bytes / 4); + ASSERT_EQ(so_with_dflts.GetTotalRamSizeBytes(), total_ram_size_bytes); + ASSERT_EQ(so_with_dflts.GetTotalThreads(), total_threads); + ASSERT_EQ(so_with_dflts.GetDelayedWriteRate(), + SharedOptions::kDefaultDelayedWriteRate); + ASSERT_EQ(so_with_dflts.GetBucketSize(), SharedOptions::kDefaultBucketSize); + ASSERT_EQ(so_with_dflts.IsMergeMemtableSupported(), + SharedOptions::kDeafultUseMerge); + + auto so_with_dflts_cache = so_with_dflts.GetCache(); + ASSERT_TRUE(so_with_dflts_cache != nullptr); + ASSERT_STREQ(so_with_dflts_cache->Name(), "LRUCache"); + ASSERT_EQ(so_with_dflts_cache->GetCapacity(), total_ram_size_bytes); + + auto so_with_dflts_wc = so_with_dflts.GetWriteController(); + ASSERT_TRUE(so_with_dflts_wc != nullptr); + ASSERT_EQ(so_with_dflts_wc->max_delayed_write_rate(), + SharedOptions::kDefaultDelayedWriteRate); + ASSERT_TRUE(so_with_dflts_wc->is_dynamic_delay()); + + auto so_with_dflts_wbm = so_with_dflts.GetWriteBufferManager(); + ASSERT_TRUE(so_with_dflts_wbm != nullptr); + ASSERT_EQ(so_with_dflts_wbm->buffer_size(), 1U); + ASSERT_TRUE(so_with_dflts_wbm->IsInitiatingFlushes()); + + auto so_with_dflts_pp = so_with_dflts.GetPinningPolicy(); + ASSERT_TRUE(so_with_dflts_pp != nullptr); + ASSERT_STREQ(so_with_dflts_pp->Name(), "speedb_scoped_pinning_policy"); + + std::string so_dflts_capacity_str; + so_with_dflts_pp->GetOption(ConfigOptions(), "capacity", + &so_dflts_capacity_str); + auto so_dflts_expected_pinning_capacity = + 80U * + (total_ram_size_bytes - so_with_dflts.GetMaxWriteBufferManagerSize()) / + 100U; + ASSERT_EQ(so_dflts_capacity_str, + std::to_string(so_dflts_expected_pinning_capacity)); + + // + // Test construction with all values specified + // + + SharedOptions so_no_dflts(total_ram_size_bytes, total_threads, + delayed_write_rate, bucket_size, use_merge); + + ASSERT_EQ(so_no_dflts.GetTotalRamSizeBytes(), total_ram_size_bytes); + ASSERT_EQ(so_no_dflts.GetTotalThreads(), total_threads); + ASSERT_EQ(so_no_dflts.GetDelayedWriteRate(), delayed_write_rate); + ASSERT_EQ(so_no_dflts.GetBucketSize(), bucket_size); + ASSERT_EQ(so_no_dflts.IsMergeMemtableSupported(), use_merge); + + auto so_no_dflts_cache = so_no_dflts.GetCache(); + ASSERT_TRUE(so_no_dflts_cache != nullptr); + ASSERT_STREQ(so_no_dflts_cache->Name(), "LRUCache"); + ASSERT_EQ(so_no_dflts_cache->GetCapacity(), total_ram_size_bytes); + + auto so_no_dflts_wc = so_no_dflts.GetWriteController(); + ASSERT_TRUE(so_no_dflts_wc != nullptr); + ASSERT_EQ(so_no_dflts_wc->max_delayed_write_rate(), delayed_write_rate); + ASSERT_TRUE(so_no_dflts_wc->is_dynamic_delay()); + + auto so_no_dflts_wbm = so_no_dflts.GetWriteBufferManager(); + ASSERT_TRUE(so_no_dflts_wbm != nullptr); + ASSERT_EQ(so_no_dflts_wbm->buffer_size(), 1U); + ASSERT_TRUE(so_no_dflts_wbm->IsInitiatingFlushes()); + + auto so_no_dflts_pp = so_no_dflts.GetPinningPolicy(); + ASSERT_TRUE(so_no_dflts_pp != nullptr); + ASSERT_STREQ(so_with_dflts_pp->Name(), "speedb_scoped_pinning_policy"); + + std::string so_no_dflts_capacity_str; + so_with_dflts_pp->GetOption(ConfigOptions(), "capacity", + &so_no_dflts_capacity_str); + auto so_no_dflts_expected_pinning_capacity = + 80U * + (total_ram_size_bytes - so_no_dflts.GetMaxWriteBufferManagerSize()) / + 100U; + ASSERT_EQ(so_no_dflts_capacity_str, + std::to_string(so_no_dflts_expected_pinning_capacity)); } -TEST_F(SharedOptionsTest, EnableSpeedbFeatures) { - Options op1, op2, op3; - size_t total_ram_size_bytes = 100 * 1024 * 1024 * 1024ul; - size_t delayed_write_rate = 256 * 1024 * 1024ul; - int total_threads = 8; - SharedOptions so(total_ram_size_bytes, total_threads, delayed_write_rate); +namespace test_enable_speedb { - // create the DB if it's not already present - op1.create_if_missing = true; - op2.create_if_missing = true; - op3.create_if_missing = true; +void ValidateDBOptionsPart(const SharedOptions& so, const DBOptions& op, + size_t total_num_cfs) { + ASSERT_EQ(op.max_background_jobs, static_cast(so.GetTotalThreads())); + ASSERT_EQ(op.bytes_per_sync, 1 << 20); + ASSERT_TRUE(op.use_dynamic_delay); + ASSERT_EQ(op.delayed_write_rate, so.GetDelayedWriteRate()); - op1.EnableSpeedbFeatures(so); - ASSERT_TRUE(op1.write_buffer_manager->buffer_size() == - 1 * 512 * 1024 * 1024ul); - op2.EnableSpeedbFeatures(so); - ASSERT_TRUE(op2.write_buffer_manager->buffer_size() == - 2 * 512 * 1024 * 1024ul); - op3.EnableSpeedbFeatures(so); - ASSERT_TRUE(op3.write_buffer_manager->buffer_size() == - 3 * 512 * 1024 * 1024ul); + ASSERT_EQ(op.write_controller.get(), so.GetWriteController()); - ASSERT_EQ(op1.env, so.env); - ASSERT_EQ(op2.env, so.env); - ASSERT_EQ(op3.env, so.env); + ASSERT_EQ(op.write_buffer_manager.get(), so.GetWriteBufferManager()); + if (total_num_cfs > 0U) { + auto expected_wbm_size = + std::min(total_num_cfs * SharedOptions::kWbmPerCfSizeIncrease, + so.GetMaxWriteBufferManagerSize()); + ASSERT_EQ(op.write_buffer_manager->buffer_size(), expected_wbm_size); + } else { + ASSERT_EQ(op.write_buffer_manager->buffer_size(), 1U); + } +} - ASSERT_EQ(op1.max_background_jobs, (int)so.GetTotalThreads()); - ASSERT_EQ(op2.max_background_jobs, (int)so.GetTotalThreads()); - ASSERT_EQ(op3.max_background_jobs, (int)so.GetTotalThreads()); +void ValidateCFOptionsPart(const SharedOptions& so, + const ColumnFamilyOptions& op) { + ASSERT_EQ(op.max_write_buffer_number, 4); + ASSERT_EQ(op.min_write_buffer_number_to_merge, 1); + + ASSERT_TRUE(op.table_factory != nullptr); + auto* table_options = op.table_factory->GetOptions(); - ASSERT_EQ(op1.delayed_write_rate, so.GetDelayedWriteRate()); - ASSERT_EQ(op2.delayed_write_rate, so.GetDelayedWriteRate()); - ASSERT_EQ(op3.delayed_write_rate, so.GetDelayedWriteRate()); + ASSERT_TRUE(table_options->block_cache != nullptr); + ASSERT_EQ(table_options->block_cache.get(), so.GetCache()); - ASSERT_EQ(op1.write_buffer_manager, so.write_buffer_manager); - ASSERT_EQ(op2.write_buffer_manager, so.write_buffer_manager); - ASSERT_EQ(op3.write_buffer_manager, so.write_buffer_manager); + ASSERT_TRUE(table_options->filter_policy != nullptr); + ASSERT_STREQ(table_options->filter_policy->Name(), + "speedb_paired_bloom_filter"); - ASSERT_EQ(op1.write_buffer_manager->buffer_size(), 3 * 512 * 1024 * 1024ul); - ASSERT_EQ(op2.write_buffer_manager->buffer_size(), 3 * 512 * 1024 * 1024ul); - ASSERT_EQ(op3.write_buffer_manager->buffer_size(), 3 * 512 * 1024 * 1024ul); + ASSERT_TRUE(table_options->pinning_policy != nullptr); + ASSERT_EQ(table_options->pinning_policy.get(), so.GetPinningPolicy()); - const auto* sanitized_table_options = - op1.table_factory->GetOptions(); - ASSERT_EQ(sanitized_table_options->block_cache, so.cache); + ASSERT_TRUE(op.memtable_factory != nullptr); + if (op.prefix_extractor != nullptr) { + ASSERT_STREQ(op.memtable_factory->Name(), "SkipListFactory"); + } else { + ASSERT_STREQ(op.memtable_factory->Name(), "HashSpdbRepFactory"); + } } -TEST_F(SharedOptionsTest, EnableSpeedbFeaturesDB) { - DBOptions op; +} // namespace test_enable_speedb + +TEST_F(SharedOptionsTest, EnableSpeedbFeatures) { size_t total_ram_size_bytes = 100 * 1024 * 1024 * 1024ul; size_t delayed_write_rate = 256 * 1024 * 1024ul; int total_threads = 8; + SharedOptions so(total_ram_size_bytes, total_threads, delayed_write_rate); - op.EnableSpeedbFeaturesDB(so); + size_t num_cfs = 0U; - ASSERT_EQ(op.env, so.env); + Options op1; + op1.EnableSpeedbFeatures(so); + ++num_cfs; + test_enable_speedb::ValidateDBOptionsPart(so, op1, num_cfs); + test_enable_speedb::ValidateCFOptionsPart(so, op1); - ASSERT_EQ(op.max_background_jobs, (int)so.GetTotalThreads()); + Options op2; + op2.prefix_extractor.reset(NewFixedPrefixTransform(1)); + op2.allow_concurrent_memtable_write = false; + op2.EnableSpeedbFeatures(so); + ++num_cfs; + test_enable_speedb::ValidateDBOptionsPart(so, op2, num_cfs); + test_enable_speedb::ValidateCFOptionsPart(so, op2); + ASSERT_FALSE(op2.allow_concurrent_memtable_write); - ASSERT_EQ(op.delayed_write_rate, so.GetDelayedWriteRate()); + Options op3; + op3.EnableSpeedbFeatures(so); + ++num_cfs; + test_enable_speedb::ValidateDBOptionsPart(so, op3, num_cfs); + test_enable_speedb::ValidateCFOptionsPart(so, op3); +} - ASSERT_EQ(op.write_buffer_manager, so.write_buffer_manager); +TEST_F(SharedOptionsTest, EnableSpeedbFeaturesDB) { + size_t total_ram_size_bytes = 100 * 1024 * 1024 * 1024ul; + int total_threads = 8; + size_t delayed_write_rate = 256 * 1024 * 1024ul; + SharedOptions so(total_ram_size_bytes, total_threads, delayed_write_rate); - ASSERT_EQ(op.write_buffer_manager->buffer_size(), 1); + DBOptions op; + op.EnableSpeedbFeaturesDB(so); + test_enable_speedb::ValidateDBOptionsPart(so, op, 0 /* num_cfs */); } TEST_F(SharedOptionsTest, EnableSpeedbFeaturesCF) { - Options op; - ColumnFamilyOptions cfo; - - size_t total_ram_size_bytes = 100 * 1024 * 1024 * 1024ul; + size_t total_ram_size_bytes = + 4 * SharedOptions::kWbmPerCfSizeIncrease * 4 + 1; size_t delayed_write_rate = 256 * 1024 * 1024; int total_threads = 8; SharedOptions so(total_ram_size_bytes, total_threads, delayed_write_rate); + ColumnFamilyOptions cfo1; + cfo1.EnableSpeedbFeaturesCF(so); + test_enable_speedb::ValidateCFOptionsPart(so, cfo1); + + ColumnFamilyOptions cfo2; + cfo2.EnableSpeedbFeaturesCF(so); + test_enable_speedb::ValidateCFOptionsPart(so, cfo2); + // create the DB if it's not already present - op.create_if_missing = true; - op.EnableSpeedbFeatures(so); - ASSERT_EQ(op.write_buffer_manager->buffer_size(), 1 * 512 * 1024 * 1024ul); - cfo.EnableSpeedbFeaturesCF(so); - ASSERT_EQ(op.write_buffer_manager->buffer_size(), 2 * 512 * 1024 * 1024ul); - ASSERT_EQ( - op.write_buffer_size, - std::min(op.write_buffer_manager->buffer_size() / 4, 64ul << 20)); - ASSERT_EQ(op.max_write_buffer_number, 4); - ASSERT_EQ(op.min_write_buffer_number_to_merge, 1); - ASSERT_EQ(op.env, so.env); - const auto* sanitized_table_options = - op.table_factory->GetOptions(); - ASSERT_EQ(sanitized_table_options->block_cache, so.cache); - - const auto sanitized_options_overrides = - sanitized_table_options->cache_usage_options.options_overrides; - EXPECT_EQ(sanitized_options_overrides.size(), kNumCacheEntryRoles); - for (auto options_overrides_iter = sanitized_options_overrides.cbegin(); - options_overrides_iter != sanitized_options_overrides.cend(); - ++options_overrides_iter) { - } + Options op1; + op1.EnableSpeedbFeatures(so); + test_enable_speedb::ValidateDBOptionsPart(so, op1, 3 /* num_cfs */); + test_enable_speedb::ValidateCFOptionsPart(so, op1); + + // create the DB if it's not already present + Options op2; + op2.EnableSpeedbFeatures(so); + ASSERT_EQ(op2.write_buffer_manager->buffer_size(), + so.GetMaxWriteBufferManagerSize()); + test_enable_speedb::ValidateDBOptionsPart(so, op2, 4 /* num_cfs */); + + // create the DB if it's not already present + Options op3; + op3.EnableSpeedbFeatures(so); + ASSERT_EQ(op3.write_buffer_manager->buffer_size(), + so.GetMaxWriteBufferManagerSize()); + test_enable_speedb::ValidateDBOptionsPart(so, op3, 5 /* num_cfs */); } } // namespace ROCKSDB_NAMESPACE diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 8ad81a1a38..10d4293b67 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -5209,9 +5209,9 @@ class Benchmark { void OpenDb(Options options, const std::string& db_name, DBWithColumnFamilies* db) { - SharedOptions so(FLAGS_total_ram_size, options.max_background_jobs, - options.delayed_write_rate, FLAGS_hash_bucket_count, - false); + SharedOptions so(FLAGS_total_ram_size, FLAGS_max_background_jobs, + FLAGS_delayed_write_rate, FLAGS_hash_bucket_count, + false /* use_merge */); if (FLAGS_enable_speedb_features) { options.EnableSpeedbFeatures(so); }