Skip to content

Commit

Permalink
Release 5.15 cherrypick (#60)
Browse files Browse the repository at this point in the history
* [cherry-pick] Fix DBImpl::GetColumnFamilyHandleUnlocked race condition (facebook#4391)

Summary:
- Fix DBImpl API race condition

The timeline of execution flow is as follow:
```
timeline              user_thread1                      user_thread2
t1   |     cfh = GetColumnFamilyHandleUnlocked(0)
t2   |     id1 = cfh->GetID()
t3   |                                                GetColumnFamilyHandleUnlocked(1)
t4   |     id2 = cfh->GetID()
     V
```
The original implementation return a pointer to a stateful variable, so that the return `ColumnFamilyHandle` will be changed when another thread calls `GetColumnFamilyHandleUnlocked` with different `column family id`

- Expose ColumnFamily ID to compaction event listener

- Fix the return status of `DBImpl::GetLatestSequenceForKey`
Pull Request resolved: facebook#4391

Differential Revision: D10221243

Pulled By: yiwu-arbug

fbshipit-source-id: dec60ee9ff0c8261a2f2413a8506ec1063991993

# Conflicts:
#	db/db_test2.cc

* Fix `DBImpl::GetColumnFamilyHandleUnlocked` data race (facebook#4666)

Summary:
Hi, yiwu-arbug, I found that `DBImpl::GetColumnFamilyHandleUnlocked` still have data race condition, because `column_family_memtables_` has a stateful cache `current_` and `column_family_memtables_::Seek` maybe call without the protection of `mutex_` by a write thread

check https://github.com/facebook/rocksdb/blob/859dbda6e3cac17416aff48f1760d01707867351/db/write_batch.cc#L1188  and   https://github.com/facebook/rocksdb/blob/859dbda6e3cac17416aff48f1760d01707867351/db/write_batch.cc#L1756  and  https://github.com/facebook/rocksdb/blob/859dbda6e3cac17416aff48f1760d01707867351/db/db_impl_write.cc#L318

So it's better to use `versions_->GetColumnFamilySet()->GetColumnFamily` instead.
Pull Request resolved: facebook#4666

Differential Revision: D13027117

Pulled By: yiwu-arbug

fbshipit-source-id: 4e3778eaf8e7f7c8577bbd78129b6a5fd7ce79fb
(cherry picked from commit 09426ae)

* Fix `CompactFiles` bug (facebook#4665)

Summary:
`CompactFiles` gets `SuperVersion` before `WaitForIngestFile`, while `IngestExternalFile` may add files that overlap with `input_file_names`

The timeline of execution flow is as follow:

Let's say that level N has two file [1,2] and [5,6]
```
timeline              user_thread1                             user_thread2
t0   |      CompactFiles([1, 2], [5, 6]) begin
t1   |         GetReferencedSuperVersion()
t2   |                                              IngestExternalFile([3,4]) to level N begin
t3   |             CompactFiles resume
     V
```
Pull Request resolved: facebook#4665

Differential Revision: D13030674

Pulled By: ajkr

fbshipit-source-id: 8be19477fd6e505032267a979d32f3097cc3be51
(cherry picked from commit 0f88160)

* Fix RocksDB Lite build (facebook#4675)

Summary:
Our internal CI test caught RocksDB Lite build failures. The failures are due to a new test introduced in facebook#4665 using `SSTFileWriter` and `IngestExternalFile`, but these is not exposed under lite mode. Fixed by #ifdef'ing out the test.

```
db/db_test2.cc: In member function ‘virtual void rocksdb::DBTest2_TestCompactFiles_Test::TestBody()’:
db/db_test2.cc:2907:3: error: ‘SstFileWriter’ is not a member of ‘rocksdb’
   rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options};
   ^
In file included from ./util/testharness.h:15:0,
                 from ./table/mock_table.h:23,
                 from ./db/db_test_util.h:44,
                 from db/db_test2.cc:13:
db/db_test2.cc:2912:13: error: ‘sst_file_writer’ was not declared in this scope
   ASSERT_OK(sst_file_writer.Open(external_file1));
```
Pull Request resolved: facebook#4675

Differential Revision: D13035984

Pulled By: sagar0

fbshipit-source-id: c1ceac550dfac1a85eeea436693dc7dd467519a6
(cherry picked from commit 2993cd2)
  • Loading branch information
DorianZheng authored and huachaohuang committed Nov 13, 2018
1 parent d7bab10 commit adf83d4
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 12 deletions.
8 changes: 4 additions & 4 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2056,16 +2056,16 @@ ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
// REQUIRED: mutex is NOT held.
std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
uint32_t column_family_id) {
ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();

InstrumentedMutexLock l(&mutex_);

if (!cf_memtables->Seek(column_family_id)) {
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
if (cfd == nullptr) {
return nullptr;
}

return std::unique_ptr<ColumnFamilyHandleImpl>(
new ColumnFamilyHandleImpl(cf_memtables->current(), this, &mutex_));
new ColumnFamilyHandleImpl(cfd, this, &mutex_));
}

void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
Expand Down
17 changes: 9 additions & 8 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,23 +460,24 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
immutable_db_options_.info_log.get());

// Perform CompactFiles
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
{
InstrumentedMutexLock l(&mutex_);

// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile();

s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names,
// We need to get current after `WaitForIngestFile`, because
// `IngestExternalFile` may add files that overlap with `input_file_names`
auto* current = cfd->current();
current->Ref();

s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
output_file_names, output_level, output_path_id,
&job_context, &log_buffer);
}
if (sv->Unref()) {
mutex_.Lock();
sv->Cleanup();
mutex_.Unlock();
delete sv;

current->Unref();
}

// Find and delete obsolete files
Expand Down
61 changes: 61 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2771,6 +2771,67 @@ TEST_F(DBTest2, TestGetColumnFamilyHandleUnlocked) {
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

#ifndef ROCKSDB_LITE
TEST_F(DBTest2, TestCompactFiles) {
// Setup sync point dependency to reproduce the race condition of
// DBImpl::GetColumnFamilyHandleUnlocked
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"TestCompactFiles::IngestExternalFile1",
"TestCompactFiles::IngestExternalFile2"},
});
SyncPoint::GetInstance()->EnableProcessing();

Options options;
options.num_levels = 2;
options.disable_auto_compactions = true;
Reopen(options);
auto* handle = db_->DefaultColumnFamily();
ASSERT_EQ(db_->NumberLevels(handle), 2);

rocksdb::SstFileWriter sst_file_writer{rocksdb::EnvOptions(), options};
std::string external_file1 = dbname_ + "/test_compact_files1.sst_t";
std::string external_file2 = dbname_ + "/test_compact_files2.sst_t";
std::string external_file3 = dbname_ + "/test_compact_files3.sst_t";

ASSERT_OK(sst_file_writer.Open(external_file1));
ASSERT_OK(sst_file_writer.Put("1", "1"));
ASSERT_OK(sst_file_writer.Put("2", "2"));
ASSERT_OK(sst_file_writer.Finish());

ASSERT_OK(sst_file_writer.Open(external_file2));
ASSERT_OK(sst_file_writer.Put("3", "3"));
ASSERT_OK(sst_file_writer.Put("4", "4"));
ASSERT_OK(sst_file_writer.Finish());

ASSERT_OK(sst_file_writer.Open(external_file3));
ASSERT_OK(sst_file_writer.Put("5", "5"));
ASSERT_OK(sst_file_writer.Put("6", "6"));
ASSERT_OK(sst_file_writer.Finish());

ASSERT_OK(db_->IngestExternalFile(handle, {external_file1, external_file3},
IngestExternalFileOptions()));
ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
std::vector<std::string> files;
GetSstFiles(env_, dbname_, &files);
ASSERT_EQ(files.size(), 2);

port::Thread user_thread1(
[&]() { db_->CompactFiles(CompactionOptions(), handle, files, 1); });

port::Thread user_thread2([&]() {
ASSERT_OK(db_->IngestExternalFile(handle, {external_file2},
IngestExternalFileOptions()));
TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile1");
});

user_thread1.join();
user_thread2.join();

rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // ROCKSDB_LITE

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down

0 comments on commit adf83d4

Please sign in to comment.