Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix btree tests #278

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .jenkins/jenkinsfile_nightly
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pipeline {
steps {
sh "conan create --build missing -o sisl:prerelease=True -o homestore:sanitize=True -o homestore:skip_testing=True -pr debug . ${PROJECT}/${VER}@"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_index_btree' -exec cp {} .jenkins/test_index_btree \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_meta_blk_mgr' -exec cp {} .jenkins/test_meta_blk_mgr\\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_log_store' -exec cp {} .jenkins/test_log_store\\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_meta_blk_mgr' -exec cp {} .jenkins/test_meta_blk_mgr \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_log_store' -exec cp {} .jenkins/test_log_store \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/scripts/btree_test.py' -exec install -Dm755 {} .jenkins/btree_test.py \\; "
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/scripts/log_meta_test.py' -exec install -Dm755 {} .jenkins/log_meta_test.py \\; "
}
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.0.3"
version = "5.0.4"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
122 changes: 79 additions & 43 deletions src/tests/btree_helpers/btree_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct BtreeTestHelper {
if (m_is_multi_threaded) {
std::mutex mtx;
m_run_time = SISL_OPTIONS["run_time"].as< uint32_t >();
iomanager.run_on_wait(iomgr::reactor_regex::all_io, [this, &mtx]() {
iomanager.run_on_wait(iomgr::reactor_regex::all_worker, [this, &mtx]() {
auto fv = iomanager.sync_io_capable_fibers();
std::unique_lock lg(mtx);
m_fibers.insert(m_fibers.end(), fv.begin(), fv.end());
Expand Down Expand Up @@ -84,27 +84,30 @@ struct BtreeTestHelper {

public:
void preload(uint32_t preload_size) {
const auto chunk_size = preload_size / m_fibers.size();
const auto last_chunk_size = preload_size % chunk_size ?: chunk_size;
auto test_count = m_fibers.size();

for (std::size_t i = 0; i < m_fibers.size(); ++i) {
const auto start_range = i * chunk_size;
const auto end_range = start_range + ((i == m_fibers.size() - 1) ? last_chunk_size : chunk_size);
iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() {
for (uint32_t i = start_range; i < end_range; i++) {
put(i, btree_put_type::INSERT);
}
{
std::unique_lock lg(m_test_done_mtx);
if (--test_count == 0) { m_test_done_cv.notify_one(); }
}
});
}
if (preload_size) {
const auto n_fibers = std::min(preload_size, (uint32_t)m_fibers.size());
const auto chunk_size = preload_size / n_fibers;
const auto last_chunk_size = preload_size % chunk_size ?: chunk_size;
auto test_count = n_fibers;

for (std::size_t i = 0; i < n_fibers; ++i) {
const auto start_range = i * chunk_size;
const auto end_range = start_range + ((i == n_fibers - 1) ? last_chunk_size : chunk_size);
iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() {
for (uint32_t i = start_range; i < end_range; i++) {
put(i, btree_put_type::INSERT);
}
{
std::unique_lock lg(m_test_done_mtx);
if (--test_count == 0) { m_test_done_cv.notify_one(); }
}
});
}

{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return test_count == 0; });
{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return test_count == 0; });
}
}
LOGINFO("Preload Done");
}
Expand Down Expand Up @@ -157,15 +160,13 @@ struct BtreeTestHelper {
auto pk = std::make_unique< K >(k);

auto rreq = BtreeSingleRemoveRequest{pk.get(), existing_v.get()};
rreq.enable_route_tracing();
bool removed = (m_bt->remove(rreq) == btree_status_t::success);

ASSERT_EQ(removed, m_shadow_map.exists(*pk))
<< "Removal of key " << pk->key() << " status doesn't match with shadow";

if (removed) {
m_shadow_map.validate_data(rreq.key(), (const V&)rreq.value());
m_shadow_map.erase(rreq.key());
}
if (removed) { m_shadow_map.remove_and_check(*pk, *existing_v); }
}

void remove_random() {
Expand Down Expand Up @@ -213,14 +214,17 @@ struct BtreeTestHelper {
auto const expected_count = std::min(remaining, batch_size);

ASSERT_EQ(out_vector.size(), expected_count) << "Received incorrect value on query pagination";
remaining -= expected_count;

if (remaining == 0) {
if (remaining < batch_size) {
ASSERT_EQ(ret, btree_status_t::success) << "Expected success on query";
} else {
} else if (remaining > batch_size) {
ASSERT_EQ(ret, btree_status_t::has_more) << "Expected query to return has_more";
} else if (remaining == batch_size) {
// we don't know, go to the next round
}

remaining -= expected_count;

for (size_t idx{0}; idx < out_vector.size(); ++idx) {
ASSERT_EQ(out_vector[idx].second, it->second)
<< "Range get doesn't return correct data for key=" << it->first << " idx=" << idx;
Expand Down Expand Up @@ -253,7 +257,7 @@ struct BtreeTestHelper {
*copy_key = key;
auto out_v = std::make_unique< V >();
auto req = BtreeSingleGetRequest{copy_key.get(), out_v.get()};

req.enable_route_tracing();
const auto ret = m_bt->get(req);
ASSERT_EQ(ret, btree_status_t::success) << "Missing key " << key << " in btree but present in shadow map";
ASSERT_EQ((const V&)req.value(), value)
Expand All @@ -265,7 +269,7 @@ struct BtreeTestHelper {
auto pk = std::make_unique< K >(k);
auto out_v = std::make_unique< V >();
auto req = BtreeSingleGetRequest{pk.get(), out_v.get()};

req.enable_route_tracing();
const auto status = m_bt->get(req);
if (status == btree_status_t::success) {
m_shadow_map.validate_data(req.key(), (const V&)req.value());
Expand All @@ -279,6 +283,7 @@ struct BtreeTestHelper {
auto out_v = std::make_unique< V >();
auto req =
BtreeGetAnyRequest< K >{BtreeKeyRange< K >{K{start_k}, true, K{end_k}, true}, out_k.get(), out_v.get()};
req.enable_route_tracing();
const auto status = m_bt->get(req);

if (status == btree_status_t::success) {
Expand Down Expand Up @@ -335,6 +340,7 @@ struct BtreeTestHelper {
auto existing_v = std::make_unique< V >();
K key = K{k};
auto sreq = BtreeSinglePutRequest{&key, &value, put_type, existing_v.get()};
sreq.enable_route_tracing();
bool done = (m_bt->put(sreq) == btree_status_t::success);

if (put_type == btree_put_type::INSERT) {
Expand All @@ -351,43 +357,73 @@ struct BtreeTestHelper {
K end_key = K{end_k};

auto rreq = BtreeRangeRemoveRequest< K >{BtreeKeyRange< K >{start_key, true, end_key, true}};
rreq.enable_route_tracing();
auto const ret = m_bt->remove(rreq);

m_shadow_map.range_erase(start_key, end_key);

if (all_existing) {
m_shadow_map.range_erase(start_key, end_key);
ASSERT_EQ((ret == btree_status_t::success), true)
<< "not a successful remove op for range " << start_k << "-" << end_k;
}

if (start_k < m_max_range_input) {
m_shadow_map.remove_keys(start_k, std::min(end_k, uint64_cast(m_max_range_input - 1)));
} else if (start_k < m_max_range_input) {
K end_range{std::min(end_k, uint64_cast(m_max_range_input - 1))};
m_shadow_map.range_erase(start_key, end_range);
}
}

protected:
void run_in_parallel(const std::vector< std::pair< std::string, int > >& op_list) {
auto test_count = m_fibers.size();
for (auto it = m_fibers.begin(); it < m_fibers.end(); ++it) {
iomanager.run_on_forget(*it, [this, &test_count, op_list]() {
const auto total_iters = SISL_OPTIONS["num_iters"].as< uint32_t >();
const auto num_iters_per_thread = total_iters / m_fibers.size();
const auto extra_iters = total_iters % num_iters_per_thread;
LOGINFO("number of fibers {} num_iters_per_thread {} extra_iters {} ", m_fibers.size(), num_iters_per_thread,
extra_iters);

for (uint32_t fiber_id = 0; fiber_id < m_fibers.size(); ++fiber_id) {
auto num_iters_this_fiber = num_iters_per_thread + (fiber_id < extra_iters ? 1 : 0);
iomanager.run_on_forget(m_fibers[fiber_id], [this, fiber_id, &test_count, op_list, num_iters_this_fiber]() {
std::random_device g_rd{};
std::default_random_engine re{g_rd()};
const auto num_iters_per_thread =
sisl::round_up(SISL_OPTIONS["num_iters"].as< uint32_t >() / m_fibers.size(), m_fibers.size());
std::vector< uint32_t > weights;
std::transform(op_list.begin(), op_list.end(), std::back_inserter(weights),
[](const auto& pair) { return pair.second; });

double progress_interval = (double)num_iters_this_fiber / 20; // 5% of the total number of iterations
double progress_thresh = progress_interval; // threshold for progress interval
double elapsed_time, progress_percent, last_progress_time = 0;

// Construct a weighted distribution based on the input frequencies
std::discrete_distribution< uint32_t > s_rand_op_generator(weights.begin(), weights.end());
auto m_start_time = Clock::now();
auto time_to_stop = [this, m_start_time]() {
return (get_elapsed_time_sec(m_start_time) > m_run_time);
};

auto time_to_stop = [this, m_start_time]() {return (get_elapsed_time_sec(m_start_time) > m_run_time);};

for (uint32_t i = 0; i < num_iters_per_thread && !time_to_stop(); i++) {
for (uint32_t i = 0; i < num_iters_this_fiber && !time_to_stop(); i++) {
uint32_t op_idx = s_rand_op_generator(re);
(this->m_operations[op_list[op_idx].first])();
m_num_ops.fetch_add(1);

if (fiber_id == 0) {
elapsed_time = get_elapsed_time_sec(m_start_time);
progress_percent = (double)i / num_iters_this_fiber * 100;

// check progress every 5% of the total number of iterations or every 30 seconds
bool print_time = false;
if (i >= progress_thresh) {
progress_thresh += progress_interval;
print_time = true;
}
if (elapsed_time - last_progress_time > 30) {
last_progress_time = elapsed_time;
print_time = true;
}
if (print_time) {
LOGINFO("Progress: iterations completed ({:.2f}%)- Elapsed time: {:.0f} seconds of total "
"{} - total entries: {}",
progress_percent, elapsed_time, m_run_time, m_shadow_map.size());
}
}
}
{
std::unique_lock lg(m_test_done_mtx);
Expand Down
44 changes: 31 additions & 13 deletions src/tests/btree_helpers/shadow_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ShadowMap {
}

std::pair< K, K > pick_existing_range(const K& start_key, uint32_t max_count) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
auto const start_it = m_map.lower_bound(start_key);
auto it = start_it;
uint32_t count = 0;
Expand All @@ -59,12 +59,12 @@ class ShadowMap {
}

bool exists(const K& key) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
return m_map.find(key) != m_map.end();
}

bool exists_in_range(const K& key, uint64_t start_k, uint64_t end_k) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
const auto itlower = m_map.lower_bound(K{start_k});
const auto itupper = m_map.upper_bound(K{end_k});
auto it = itlower;
Expand All @@ -76,7 +76,7 @@ class ShadowMap {
}

uint64_t size() const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
return m_map.size();
}

Expand All @@ -87,12 +87,21 @@ class ShadowMap {
}

void validate_data(const K& key, const V& btree_val) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
const auto r = m_map.find(key);
ASSERT_NE(r, m_map.end()) << "Key " << key.to_string() << " is not present in shadow map";
ASSERT_EQ(btree_val, r->second) << "Found value in btree doesn't return correct data for key=" << r->first;
}

void remove_and_check(const K& key, const V& btree_val) {
std::lock_guard lock{m_mutex};
const auto r = m_map.find(key);
ASSERT_NE(r, m_map.end()) << "Key " << key.to_string() << " is not present in shadow map";
ASSERT_EQ(btree_val, r->second) << "Found value in btree doesn't return correct data for key=" << r->first;
m_map.erase(key);
m_range_scheduler.remove_key(key.key());
}

void erase(const K& key) {
std::lock_guard lock{m_mutex};
m_map.erase(key);
Expand All @@ -101,7 +110,7 @@ class ShadowMap {

void range_erase(const K& start_key, uint32_t count) {
std::lock_guard lock{m_mutex};
auto const it = m_map.lower_bound(start_key);
auto it = m_map.lower_bound(start_key);
uint32_t i{0};
while ((it != m_map.cend()) && (i++ < count)) {
it = m_map.erase(it);
Expand All @@ -124,25 +133,34 @@ class ShadowMap {
const std::map< K, V >& map_const() const { return m_map; }

void foreach (std::function< void(K, V) > func) const {
std::shared_lock lock{m_mutex};
std::lock_guard lock{m_mutex};
for (const auto& [key, value] : m_map) {
func(key, value);
}
}

std::pair< uint32_t, uint32_t > pick_random_non_existing_keys(uint32_t max_keys) {
std::shared_lock lock{m_mutex};
return m_range_scheduler.pick_random_non_existing_keys(max_keys);
do {
std::lock_guard lock{m_mutex};
auto ret = m_range_scheduler.pick_random_non_existing_keys(max_keys);
if (ret.first != UINT32_MAX) { return ret; }
} while (true);
}

std::pair< uint32_t, uint32_t > pick_random_existing_keys(uint32_t max_keys) {
std::shared_lock lock{m_mutex};
return m_range_scheduler.pick_random_existing_keys(max_keys);
do {
std::lock_guard lock{m_mutex};
auto ret = m_range_scheduler.pick_random_existing_keys(max_keys);
if (ret.first != UINT32_MAX) { return ret; }
} while (true);
}

std::pair< uint32_t, uint32_t > pick_random_non_working_keys(uint32_t max_keys) {
std::shared_lock lock{m_mutex};
return m_range_scheduler.pick_random_non_working_keys(max_keys);
do {
std::lock_guard lock{m_mutex};
auto ret = m_range_scheduler.pick_random_non_working_keys(max_keys);
if (ret.first != UINT32_MAX) { return ret; }
} while (true);
}

void remove_keys_from_working(uint32_t s, uint32_t e) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class HSTestHelper {

LOGINFO("Starting iomgr with {} threads, spdk: {}", num_threads, is_spdk);
ioenvironment.with_iomgr(
iomgr::iomgr_params{.num_threads = num_threads, .is_spdk = is_spdk, .num_fibers = num_fibers});
iomgr::iomgr_params{.num_threads = num_threads, .is_spdk = is_spdk, .num_fibers = 1 + num_fibers});

auto const http_port = SISL_OPTIONS["http_port"].as< int >();
if (http_port != 0) {
Expand Down
Loading