diff --git a/db/db_impl.cc b/db/db_impl.cc index 9fdfbd00c81..e9de712f94f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2056,16 +2056,16 @@ ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) { // REQUIRED: mutex is NOT held. std::unique_ptr DBImpl::GetColumnFamilyHandleUnlocked( uint32_t column_family_id) { - ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get(); - InstrumentedMutexLock l(&mutex_); - if (!cf_memtables->Seek(column_family_id)) { + auto* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id); + if (cfd == nullptr) { return nullptr; } return std::unique_ptr( - new ColumnFamilyHandleImpl(cf_memtables->current(), this, &mutex_)); + new ColumnFamilyHandleImpl(cfd, this, &mutex_)); } void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 22c25e96cd0..e135632e008 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -460,7 +460,7 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, immutable_db_options_.info_log.get()); // Perform CompactFiles - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2"); { InstrumentedMutexLock l(&mutex_); @@ -468,15 +468,16 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, // IngestExternalFile() calls to finish. WaitForIngestFile(); - s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names, + // We need to get current after `WaitForIngestFile`, because + // `IngestExternalFile` may add files that overlap with `input_file_names` + auto* current = cfd->current(); + current->Ref(); + + s = CompactFilesImpl(compact_options, cfd, current, input_file_names, output_file_names, output_level, output_path_id, &job_context, &log_buffer); - } - if (sv->Unref()) { - mutex_.Lock(); - sv->Cleanup(); - mutex_.Unlock(); - delete sv; + + current->Unref(); } // Find and delete obsolete files diff --git a/db/db_test2.cc b/db/db_test2.cc index 763eeab38c1..78f1049ff44 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2771,6 +2771,67 @@ TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) { rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); } +#ifndef ROCKSDB_LITE +TEST_F(DBTest2, TestCompactFiles) { + // Setup sync point dependency to reproduce the race condition of + // DBImpl::GetColumnFamilyHandleUnlocked + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"TestCompactFiles::IngestExternalFile1", + "TestCompactFiles::IngestExternalFile2"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Options options; + options.num_levels = 2; + options.disable_auto_compactions = true; + Reopen(options); + auto* handle = db_->DefaultColumnFamily(); + ASSERT_EQ(db_->NumberLevels(handle), 2); + + rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options}; + std::string external_file1 = dbname_ + "/test_compact_files1.sst_t"; + std::string external_file2 = dbname_ + "/test_compact_files2.sst_t"; + std::string external_file3 = dbname_ + "/test_compact_files3.sst_t"; + + ASSERT_OK(sst_file_writer.Open(external_file1)); + ASSERT_OK(sst_file_writer.Put("1", "1")); + ASSERT_OK(sst_file_writer.Put("2", "2")); + ASSERT_OK(sst_file_writer.Finish()); + + ASSERT_OK(sst_file_writer.Open(external_file2)); + ASSERT_OK(sst_file_writer.Put("3", "3")); + ASSERT_OK(sst_file_writer.Put("4", "4")); + ASSERT_OK(sst_file_writer.Finish()); + + ASSERT_OK(sst_file_writer.Open(external_file3)); + ASSERT_OK(sst_file_writer.Put("5", "5")); + ASSERT_OK(sst_file_writer.Put("6", "6")); + ASSERT_OK(sst_file_writer.Finish()); + + ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3}, + IngestExternalFileOptions())); + ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2); + std::vector files; + GetSstFiles(env_, dbname_, &files); + ASSERT_EQ(files.size(), 2); + + port::Thread user_thread1( + [&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); }); + + port::Thread user_thread2([&]() { + ASSERT_OK(db_->IngestExternalFile(handle, {external_file2}, + IngestExternalFileOptions())); + TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1"); + }); + + user_thread1.join(); + user_thread2.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); +} +#endif // ROCKSDB_LITE + } // namespace rocksdb int main(int argc, char** argv) {