Skip to content

Commit

Permalink
Fix CompactFiles bug (facebook#4665)
Browse files Browse the repository at this point in the history
Summary:
`CompactFiles` gets `SuperVersion` before `WaitForIngestFile`, while `IngestExternalFile` may add files that overlap with `input_file_names`

The timeline of the execution flow is as follows:

Let's say that level N has two files, [1,2] and [5,6]:
```
timeline              user_thread1                             user_thread2
t0   |      CompactFiles([1, 2], [5, 6]) begin
t1   |         GetReferencedSuperVersion()
t2   |                                              IngestExternalFile([3,4]) to level N begin
t3   |             CompactFiles resume
     V
```
Pull Request resolved: facebook#4665

Differential Revision: D13030674

Pulled By: ajkr

fbshipit-source-id: 8be19477fd6e505032267a979d32f3097cc3be51
  • Loading branch information
DorianZheng authored and facebook-github-bot committed Nov 12, 2018
1 parent 05dec0c commit 0f88160
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 8 deletions.
17 changes: 9 additions & 8 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -705,23 +705,24 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
immutable_db_options_.info_log.get());

// Perform CompactFiles
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
{
InstrumentedMutexLock l(&mutex_);

// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile();

s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names,
// We need to get current after `WaitForIngestFile`, because
// `IngestExternalFile` may add files that overlap with `input_file_names`
auto* current = cfd->current();
current->Ref();

s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
output_file_names, output_level, output_path_id,
&job_context, &log_buffer);
}
if (sv->Unref()) {
mutex_.Lock();
sv->Cleanup();
mutex_.Unlock();
delete sv;

current->Unref();
}

// Find and delete obsolete files
Expand Down
59 changes: 59 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2888,6 +2888,65 @@ TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Regression test for the race between DBImpl::CompactFiles and
// IngestExternalFile: a file is ingested after the CompactFiles input file
// names were collected but before the compaction itself proceeds, and the
// compaction must still succeed.
TEST_F(DBTest2, TestCompactFiles) {
  // Sync point dependency: the ingestion in user_thread2 (followed by
  // "TestCompactFiles::IngestExternalFile1") must finish before CompactFiles
  // in user_thread1 continues past "TestCompactFiles::IngestExternalFile2".
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"TestCompactFiles::IngestExternalFile1",
       "TestCompactFiles::IngestExternalFile2"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  Options options;
  // Use the fixture's Env so the DB writes to the same file system that
  // GetSstFiles(env_, ...) inspects below.
  options.env = env_;
  options.num_levels = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  auto* handle = db_->DefaultColumnFamily();
  ASSERT_EQ(db_->NumberLevels(handle), 2);

  rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options};
  std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
  std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
  std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";

  // external_file1 covers keys ["1", "2"].
  ASSERT_OK(sst_file_writer.Open(external_file1));
  ASSERT_OK(sst_file_writer.Put("1", "1"));
  ASSERT_OK(sst_file_writer.Put("2", "2"));
  ASSERT_OK(sst_file_writer.Finish());

  // external_file2 covers keys ["3", "4"], i.e. the gap between the other
  // two files.
  ASSERT_OK(sst_file_writer.Open(external_file2));
  ASSERT_OK(sst_file_writer.Put("3", "3"));
  ASSERT_OK(sst_file_writer.Put("4", "4"));
  ASSERT_OK(sst_file_writer.Finish());

  // external_file3 covers keys ["5", "6"].
  ASSERT_OK(sst_file_writer.Open(external_file3));
  ASSERT_OK(sst_file_writer.Put("5", "5"));
  ASSERT_OK(sst_file_writer.Put("6", "6"));
  ASSERT_OK(sst_file_writer.Finish());

  // Start with the two non-adjacent files and remember their names as the
  // CompactFiles input.
  ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
                                    IngestExternalFileOptions()));
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
  std::vector<std::string> files;
  GetSstFiles(env_, dbname_, &files);
  ASSERT_EQ(files.size(), 2);

  // Check the compaction status instead of discarding it, so a failed
  // compaction fails the test.
  port::Thread user_thread1([&]() {
    ASSERT_OK(db_->CompactFiles(CompactionOptions(), handle, files, 1));
  });

  // Ingest the file covering the gap while CompactFiles is held at its sync
  // point, then release it.
  port::Thread user_thread2([&]() {
    ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
                                      IngestExternalFileOptions()));
    TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
  });

  user_thread1.join();
  user_thread2.join();

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down

0 comments on commit 0f88160

Please sign in to comment.