diff --git a/db/db_impl.cc b/db/db_impl.cc index 5debc9e87d5..06ea1543b56 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -345,6 +345,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) next_job_id_(1), has_unpersisted_data_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), + num_running_addfile_(0), + addfile_cv_(&mutex_), #ifndef ROCKSDB_LITE wal_manager_(immutable_db_options_, env_options_), #endif // ROCKSDB_LITE @@ -894,8 +896,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } // If the log file is already in the log recycle list , don't put // it in the candidate list. - if (std::find(log_recycle_files.begin(), log_recycle_files.end(),number) != - log_recycle_files.end()) { + if (std::find(log_recycle_files.begin(), log_recycle_files.end(), + number) != log_recycle_files.end()) { Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log, "Log %" PRIu64 " Already added in the recycle list, skipping.\n", number); @@ -2031,6 +2033,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, int max_level_with_files = 0; { InstrumentedMutexLock l(&mutex_); + WaitForAddFile(); Version* base = cfd->current(); for (int level = 1; level < base->storage_info()->num_non_empty_levels(); level++) { @@ -2146,6 +2149,10 @@ Status DBImpl::CompactFiles( { InstrumentedMutexLock l(&mutex_); + // This call will unlock/lock the mutex to wait for currently running + // AddFile() calls to finish. + WaitForAddFile(); + s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names, output_level, output_path_id, &job_context, &log_buffer); @@ -3226,6 +3233,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) { immutable_db_options_.info_log.get()); { InstrumentedMutexLock l(&mutex_); + + // This call will unlock/lock the mutex to wait for currently running + // AddFile() calls to finish. 
+ WaitForAddFile(); + num_running_compactions_++; auto pending_outputs_inserted_elem = diff --git a/db/db_impl.h b/db/db_impl.h index b368543c7ca..4b0768ae1c2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -651,6 +651,10 @@ class DBImpl : public DB { int PickLevelForIngestedFile(ColumnFamilyData* cfd, const ExternalSstFileInfo& file_info); + // Wait for current AddFile() calls to finish. + // REQUIRES: mutex_ held + void WaitForAddFile(); + Status CompactFilesImpl( const CompactionOptions& compact_options, ColumnFamilyData* cfd, Version* version, const std::vector& input_file_names, @@ -660,6 +664,10 @@ class DBImpl : public DB { const std::string& file_path, ExternalSstFileInfo* file_info); +#else + // AddFile is not supported in ROCKSDB_LITE so this function + // will be no-op + void WaitForAddFile() {} #endif // ROCKSDB_LITE ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); @@ -974,6 +982,14 @@ class DBImpl : public DB { // REQUIRES: mutex held std::unordered_set running_compactions_; + // Number of running AddFile() calls. + // REQUIRES: mutex held + int num_running_addfile_; + + // A condition variable that will be signaled whenever + // num_running_addfile_ goes to 0. 
+ InstrumentedCondVar addfile_cv_; + #ifndef ROCKSDB_LITE WalManager wal_manager_; #endif // ROCKSDB_LITE diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc index b31c34aacbf..e90a5669449 100644 --- a/db/db_impl_add_file.cc +++ b/db/db_impl_add_file.cc @@ -237,12 +237,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, { InstrumentedMutexLock l(&mutex_); + TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); + const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); + num_running_addfile_++; + if (!skip_snapshot_check && !snapshots_.empty()) { // Check that no snapshots are being held status = @@ -333,7 +337,13 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, ReleaseFileNumberFromPendingOutputs( pending_outputs_inserted_elem_list[i]); } - } + + num_running_addfile_--; + if (num_running_addfile_ == 0) { + addfile_cv_.SignalAll(); + } + TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); + } // mutex_ is unlocked here; if (!status.ok()) { // We failed to add the files to the database @@ -412,6 +422,13 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, return target_level; } + +void DBImpl::WaitForAddFile() { + mutex_.AssertHeld(); + while (num_running_addfile_ > 0) { + addfile_cv_.Wait(); + } +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 028137d1403..d752786809b 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -984,6 +984,87 @@ TEST_F(ExternalSSTFileTest, PickedLevel) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(ExternalSSTFileTest, PickedLevelBug) { + Options options = CurrentOptions(); + options.disable_auto_compactions = false; + options.level0_file_num_compaction_trigger = 3; + options.num_levels = 2; + options.env = env_; + DestroyAndReopen(options); + + std::vector<int> file_keys; + + // file #1 in L0 + 
file_keys = {0, 5, 7}; + for (int k : file_keys) { + ASSERT_OK(Put(Key(k), Key(k))); + } + ASSERT_OK(Flush()); + + // file #2 in L0 + file_keys = {4, 6, 8, 9}; + for (int k : file_keys) { + ASSERT_OK(Put(Key(k), Key(k))); + } + ASSERT_OK(Flush()); + + // We have 2 overlapping files in L0 + EXPECT_EQ(FilesPerLevel(), "2"); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"}, + {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"}, + }); + + std::atomic<bool> bg_compact_started(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:Start", + [&](void* /*arg*/) { bg_compact_started.store(true); }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Start a thread that will ingest a new file + std::thread bg_addfile([&]() { + file_keys = {1, 2, 3}; + ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1)); + }); + + // Wait for AddFile to start picking levels and writing MANIFEST + TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0"); + + // While writing the MANIFEST start a thread that will ask for compaction + std::thread bg_compact([&]() { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + }); + + // We need to verify that no compactions can run while AddFile is + // ingesting the files into the levels it finds suitable. 
So we will + // wait for 2 seconds to give a chance for compactions to run during + // this period, and then make sure that no compactions were able to run + env_->SleepForMicroseconds(1000000 * 2); + ASSERT_FALSE(bg_compact_started.load()); + + // Hold AddFile from finishing writing the MANIFEST + TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1"); + + bg_addfile.join(); + bg_compact.join(); + + dbfull()->TEST_WaitForCompact(); + + int total_keys = 0; + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + total_keys++; + } + ASSERT_EQ(total_keys, 10); + + delete iter; + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { Options options = CurrentOptions(); options.disable_auto_compactions = false;