From 859dbda6e3cac17416aff48f1760d01707867351 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 9 Nov 2018 16:43:08 -0800 Subject: [PATCH] Fix DBTest.SoftLimit flakyness (#4658) Summary: The flakyness can be reproduced with the following patch: ``` --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -2013,6 +2013,9 @@ void DBImpl::BackgroundCallFlush() { if (job_context.HaveSomethingToDelete()) { PurgeObsoleteFiles(job_context); } + static int f_count = 0; + printf("clean flush job context %d\n", ++f_count); + env_->SleepForMicroseconds(1000000); job_context.Clean(); mutex_.Lock(); } ``` The issue is that FlushMemtable with opt.wait=true does not wait for `OnStallConditionsChanged` being called. The event listener is triggered on `JobContext::Clean`, which happens after flush result is installed. At the time we check for stall condition after flushing memtable, the job context cleanup may not be finished. To fix the flaykyness, we use sync point to create a custom WaitForFlush that waits for context cleanup. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4658 Differential Revision: D13007301 Pulled By: yiwu-arbug fbshipit-source-id: d98395ee7b0ad4c62e83e8d0e9b6028058c61712 --- db/db_impl_compaction_flush.cc | 1 + db/db_test.cc | 70 +++++++++++++++++++++------------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 63c329aa8cb..dd292c0902a 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -2016,6 +2016,7 @@ void DBImpl::BackgroundCallFlush() { job_context.Clean(); mutex_.Lock(); } + TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp"); assert(num_running_flushes_ > 0); num_running_flushes_--; diff --git a/db/db_test.cc b/db/db_test.cc index a0a32f6cd45..ba196d9b2e5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5649,41 +5649,18 @@ TEST_F(DBTest, HardLimit) { #if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) class WriteStallListener : public EventListener { public: - WriteStallListener() - : cond_(&mutex_), - condition_(WriteStallCondition::kNormal), - expected_(WriteStallCondition::kNormal), - expected_set_(false) {} + WriteStallListener() : condition_(WriteStallCondition::kNormal) {} void OnStallConditionsChanged(const WriteStallInfo& info) override { MutexLock l(&mutex_); condition_ = info.condition.cur; - if (expected_set_ && condition_ == expected_) { - cond_.Signal(); - expected_set_ = false; - } } bool CheckCondition(WriteStallCondition expected) { MutexLock l(&mutex_); - if (expected != condition_) { - expected_ = expected; - expected_set_ = true; - while (expected != condition_) { - // We bail out on timeout 500 milliseconds - const uint64_t timeout_us = 500000; - if (cond_.TimedWait(timeout_us)) { - expected_set_ = false; - return false; - } - } - } - return true; + return expected == condition_; } private: port::Mutex mutex_; - port::CondVar cond_; WriteStallCondition condition_; - WriteStallCondition expected_; - bool expected_set_; }; TEST_F(DBTest, SoftLimit) { @@ -5704,6 +5681,41 @@ TEST_F(DBTest, SoftLimit) { WriteStallListener* listener = new WriteStallListener(); options.listeners.emplace_back(listener); + // FlushMemtable with opt.wait=true does not wait for + // `OnStallConditionsChanged` being called. The event listener is triggered + // on `JobContext::Clean`, which happens after flush result is installed. + // We use sync point to create a custom WaitForFlush that waits for + // context cleanup. + port::Mutex flush_mutex; + port::CondVar flush_cv(&flush_mutex); + bool flush_finished = false; + auto InstallFlushCallback = [&]() { + { + MutexLock l(&flush_mutex); + flush_finished = false; + } + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:ContextCleanedUp", [&](void*) { + { + MutexLock l(&flush_mutex); + flush_finished = true; + } + flush_cv.SignalAll(); + }); + }; + auto WaitForFlush = [&]() { + { + MutexLock l(&flush_mutex); + while (!flush_finished) { + flush_cv.Wait(); + } + } + SyncPoint::GetInstance()->ClearCallBack( + "DBImpl::BackgroundCallFlush:ContextCleanedUp"); + }; + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); // Generating 360KB in Level 3 @@ -5739,7 +5751,9 @@ TEST_F(DBTest, SoftLimit) { Put(Key(i), std::string(5000, 'x')); Put(Key(100 - i), std::string(5000, 'x')); // Flush the file. File size is around 30KB. + InstallFlushCallback(); dbfull()->TEST_FlushMemTable(true, true); + WaitForFlush(); } ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); @@ -5764,8 +5778,6 @@ TEST_F(DBTest, SoftLimit) { &sleeping_task_low, Env::Priority::LOW); }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); sleeping_task_low.WaitUntilSleeping(); @@ -5774,7 +5786,9 @@ TEST_F(DBTest, SoftLimit) { Put(Key(10 + i), std::string(5000, 'x')); Put(Key(90 - i), std::string(5000, 'x')); // Flush the file. File size is around 30KB. + InstallFlushCallback(); dbfull()->TEST_FlushMemTable(true, true); + WaitForFlush(); } // Wake up sleep task to enable compaction to run and waits @@ -5795,7 +5809,9 @@ TEST_F(DBTest, SoftLimit) { Put(Key(20 + i), std::string(5000, 'x')); Put(Key(80 - i), std::string(5000, 'x')); // Flush the file. File size is around 30KB. + InstallFlushCallback(); dbfull()->TEST_FlushMemTable(true, true); + WaitForFlush(); } // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction