Skip to content

Commit

Permalink
Fix AddFile() conflict with compaction output [WaitForAddFile()]
Browse files Browse the repository at this point in the history
Summary:
Since AddFile unlock/lock the mutex inside LogAndApply() we need to ensure that during this period other compactions cannot run since such compactions are not aware of the file we are ingesting and could create a compaction that overlap wit this file

this diff add
- WaitForAddFile() call that will ensure that no AddFile() calls are being processed right now
- Call `WaitForAddFile()` in 3 locations
-- When doing manual Compaction
-- When starting automatic Compaction
-- When  doing CompactFiles()

Test Plan: unit test

Reviewers: lightmark, yiwu, andrewkr, sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64383
  • Loading branch information
IslamAbdelRahman committed Sep 27, 2016
1 parent 9e9f5a0 commit 5c64fb6
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 3 deletions.
16 changes: 14 additions & 2 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
next_job_id_(1),
has_unpersisted_data_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_addfile_(0),
addfile_cv_(&mutex_),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_),
#endif // ROCKSDB_LITE
Expand Down Expand Up @@ -894,8 +896,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
// If the log file is already in the log recycle list , don't put
// it in the candidate list.
if (std::find(log_recycle_files.begin(), log_recycle_files.end(),number) !=
log_recycle_files.end()) {
if (std::find(log_recycle_files.begin(), log_recycle_files.end(),
number) != log_recycle_files.end()) {
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Log %" PRIu64 " Already added in the recycle list, skipping.\n",
number);
Expand Down Expand Up @@ -2031,6 +2033,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
int max_level_with_files = 0;
{
InstrumentedMutexLock l(&mutex_);
WaitForAddFile();
Version* base = cfd->current();
for (int level = 1; level < base->storage_info()->num_non_empty_levels();
level++) {
Expand Down Expand Up @@ -2146,6 +2149,10 @@ Status DBImpl::CompactFiles(
{
InstrumentedMutexLock l(&mutex_);

// This call will unlock/lock the mutex to wait for current running
// AddFile() calls to finish.
WaitForAddFile();

s = CompactFilesImpl(compact_options, cfd, sv->current,
input_file_names, output_level,
output_path_id, &job_context, &log_buffer);
Expand Down Expand Up @@ -3226,6 +3233,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
immutable_db_options_.info_log.get());
{
InstrumentedMutexLock l(&mutex_);

// This call will unlock/lock the mutex to wait for current running
// AddFile() calls to finish.
WaitForAddFile();

num_running_compactions_++;

auto pending_outputs_inserted_elem =
Expand Down
16 changes: 16 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ class DBImpl : public DB {
int PickLevelForIngestedFile(ColumnFamilyData* cfd,
const ExternalSstFileInfo& file_info);

// Wait for current AddFile() calls to finish.
// REQUIRES: mutex_ held
void WaitForAddFile();

Status CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names,
Expand All @@ -660,6 +664,10 @@ class DBImpl : public DB {
const std::string& file_path,
ExternalSstFileInfo* file_info);

#else
// AddFile is not supported in ROCKSDB_LITE so this function
// will be no-op
void WaitForAddFile() {}
#endif // ROCKSDB_LITE

ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
Expand Down Expand Up @@ -974,6 +982,14 @@ class DBImpl : public DB {
// REQUIRES: mutex held
std::unordered_set<Compaction*> running_compactions_;

// Number of running AddFile() calls.
// REQUIRES: mutex held
int num_running_addfile_;

// A condition variable that will be signaled whenever
// num_running_addfile_ goes to 0.
InstrumentedCondVar addfile_cv_;

#ifndef ROCKSDB_LITE
WalManager wal_manager_;
#endif // ROCKSDB_LITE
Expand Down
19 changes: 18 additions & 1 deletion db/db_impl_add_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,

{
InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");

const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();

WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);

num_running_addfile_++;

if (!skip_snapshot_check && !snapshots_.empty()) {
// Check that no snapshots are being held
status =
Expand Down Expand Up @@ -333,7 +337,13 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
ReleaseFileNumberFromPendingOutputs(
pending_outputs_inserted_elem_list[i]);
}
}

num_running_addfile_--;
if (num_running_addfile_ == 0) {
addfile_cv_.SignalAll();
}
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
} // mutex_ is unlocked here;

if (!status.ok()) {
// We failed to add the files to the database
Expand Down Expand Up @@ -412,6 +422,13 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,

return target_level;
}

void DBImpl::WaitForAddFile() {
mutex_.AssertHeld();
while (num_running_addfile_ > 0) {
addfile_cv_.Wait();
}
}
#endif // ROCKSDB_LITE

} // namespace rocksdb
81 changes: 81 additions & 0 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,87 @@ TEST_F(ExternalSSTFileTest, PickedLevel) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(ExternalSSTFileTest, PickedLevelBug) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 3;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);

std::vector<int> file_keys;

// file #1 in L0
file_keys = {0, 5, 7};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());

// file #2 in L0
file_keys = {4, 6, 8, 9};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());

// We have 2 overlapping files in L0
EXPECT_EQ(FilesPerLevel(), "2");

rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
});

std::atomic<bool> bg_compact_started(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Start",
[&](void* arg) { bg_compact_started.store(true); });

rocksdb::SyncPoint::GetInstance()->EnableProcessing();

// Start a thread that will ingest a new file
std::thread bg_addfile([&]() {
file_keys = {1, 2, 3};
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
});

// Wait for AddFile to start picking levels and writing MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");

// While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});

// We need to verify that no compactions can run while AddFile is
// ingesting the files into the levels it find suitable. So we will
// wait for 2 seconds to give a chance for compactions to run during
// this period, and then make sure that no compactions where able to run
env_->SleepForMicroseconds(1000000 * 2);
ASSERT_FALSE(bg_compact_started.load());

// Hold AddFile from finishing writing the MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");

bg_addfile.join();
bg_compact.join();

dbfull()->TEST_WaitForCompact();

int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
ASSERT_EQ(total_keys, 10);

delete iter;

rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
Expand Down

0 comments on commit 5c64fb6

Please sign in to comment.