Skip to content

Commit

Permalink
Fix DBTest.SoftLimit flakyness (#4658)
Browse files Browse the repository at this point in the history
Summary:
The flakyness can be reproduced with the following patch:
```
 --- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -2013,6 +2013,9 @@ void DBImpl::BackgroundCallFlush() {
       if (job_context.HaveSomethingToDelete()) {
         PurgeObsoleteFiles(job_context);
       }
+      static int f_count = 0;
+      printf("clean flush job context %d\n", ++f_count);
+      env_->SleepForMicroseconds(1000000);
       job_context.Clean();
       mutex_.Lock();
     }
```
The issue is that FlushMemtable with opt.wait=true does not wait for `OnStallConditionsChanged` being called. The event listener is triggered on `JobContext::Clean`, which happens after flush result is installed. At the time we check for stall condition after flushing memtable, the job context cleanup may not be finished.

To fix the flaykyness, we use sync point to create a custom WaitForFlush that waits for context cleanup.
Pull Request resolved: #4658

Differential Revision: D13007301

Pulled By: yiwu-arbug

fbshipit-source-id: d98395ee7b0ad4c62e83e8d0e9b6028058c61712
  • Loading branch information
Yi Wu authored and facebook-github-bot committed Nov 10, 2018
1 parent 7a2f98a commit 859dbda
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 27 deletions.
1 change: 1 addition & 0 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,7 @@ void DBImpl::BackgroundCallFlush() {
job_context.Clean();
mutex_.Lock();
}
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");

assert(num_running_flushes_ > 0);
num_running_flushes_--;
Expand Down
70 changes: 43 additions & 27 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5649,41 +5649,18 @@ TEST_F(DBTest, HardLimit) {
#if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
class WriteStallListener : public EventListener {
public:
WriteStallListener()
: cond_(&mutex_),
condition_(WriteStallCondition::kNormal),
expected_(WriteStallCondition::kNormal),
expected_set_(false) {}
WriteStallListener() : condition_(WriteStallCondition::kNormal) {}
void OnStallConditionsChanged(const WriteStallInfo& info) override {
MutexLock l(&mutex_);
condition_ = info.condition.cur;
if (expected_set_ && condition_ == expected_) {
cond_.Signal();
expected_set_ = false;
}
}
bool CheckCondition(WriteStallCondition expected) {
MutexLock l(&mutex_);
if (expected != condition_) {
expected_ = expected;
expected_set_ = true;
while (expected != condition_) {
// We bail out on timeout 500 milliseconds
const uint64_t timeout_us = 500000;
if (cond_.TimedWait(timeout_us)) {
expected_set_ = false;
return false;
}
}
}
return true;
return expected == condition_;
}
private:
port::Mutex mutex_;
port::CondVar cond_;
WriteStallCondition condition_;
WriteStallCondition expected_;
bool expected_set_;
};

TEST_F(DBTest, SoftLimit) {
Expand All @@ -5704,6 +5681,41 @@ TEST_F(DBTest, SoftLimit) {
WriteStallListener* listener = new WriteStallListener();
options.listeners.emplace_back(listener);

// FlushMemtable with opt.wait=true does not wait for
// `OnStallConditionsChanged` being called. The event listener is triggered
// on `JobContext::Clean`, which happens after flush result is installed.
// We use sync point to create a custom WaitForFlush that waits for
// context cleanup.
port::Mutex flush_mutex;
port::CondVar flush_cv(&flush_mutex);
bool flush_finished = false;
auto InstallFlushCallback = [&]() {
{
MutexLock l(&flush_mutex);
flush_finished = false;
}
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:ContextCleanedUp", [&](void*) {
{
MutexLock l(&flush_mutex);
flush_finished = true;
}
flush_cv.SignalAll();
});
};
auto WaitForFlush = [&]() {
{
MutexLock l(&flush_mutex);
while (!flush_finished) {
flush_cv.Wait();
}
}
SyncPoint::GetInstance()->ClearCallBack(
"DBImpl::BackgroundCallFlush:ContextCleanedUp");
};

rocksdb::SyncPoint::GetInstance()->EnableProcessing();

Reopen(options);

// Generating 360KB in Level 3
Expand Down Expand Up @@ -5739,7 +5751,9 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(i), std::string(5000, 'x'));
Put(Key(100 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
InstallFlushCallback();
dbfull()->TEST_FlushMemTable(true, true);
WaitForFlush();
}
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
Expand All @@ -5764,8 +5778,6 @@ TEST_F(DBTest, SoftLimit) {
&sleeping_task_low, Env::Priority::LOW);
});

rocksdb::SyncPoint::GetInstance()->EnableProcessing();

env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
sleeping_task_low.WaitUntilSleeping();
Expand All @@ -5774,7 +5786,9 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(10 + i), std::string(5000, 'x'));
Put(Key(90 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
InstallFlushCallback();
dbfull()->TEST_FlushMemTable(true, true);
WaitForFlush();
}

// Wake up sleep task to enable compaction to run and waits
Expand All @@ -5795,7 +5809,9 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(20 + i), std::string(5000, 'x'));
Put(Key(80 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
InstallFlushCallback();
dbfull()->TEST_FlushMemTable(true, true);
WaitForFlush();
}
// Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction
Expand Down

0 comments on commit 859dbda

Please sign in to comment.