Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Import sst files #3822

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ set(SOURCES
db/event_helpers.cc
db/experimental.cc
db/external_sst_file_ingestion_job.cc
db/external_sst_file_import_job.cc
db/file_indexer.cc
db/flush_job.cc
db/flush_scheduler.cc
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ TESTS = \
plain_table_db_test \
comparator_db_test \
external_sst_file_test \
external_sst_file_import_test \
prefix_test \
skiplist_test \
write_buffer_manager_test \
Expand Down Expand Up @@ -529,6 +530,7 @@ PARALLEL_TEST = \
db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
external_sst_file_import_test \
fault_injection_test \
inlineskiplist_test \
manual_compaction_test \
Expand Down Expand Up @@ -1189,6 +1191,9 @@ external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.
external_sst_file_test: db/external_sst_file_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

external_sst_file_import_test: db/external_sst_file_import_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_tailing_iter_test: db/db_tailing_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
1 change: 1 addition & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ cpp_library(
"db/event_helpers.cc",
"db/experimental.cc",
"db/external_sst_file_ingestion_job.cc",
"db/external_sst_file_import_job.cc",
"db/file_indexer.cc",
"db/flush_job.cc",
"db/flush_scheduler.cc",
Expand Down
7 changes: 7 additions & 0 deletions db/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class CompactedDBImpl : public DBImpl {
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DB::ImportExternalFile;
// Importing external files is not available in compacted db mode: every
// argument is ignored and Status::NotSupported is returned unconditionally.
Status ImportExternalFile(
    ColumnFamilyHandle* /*column_family*/,
    const std::vector<LiveFileMetaData>& /*external_file_metadata*/,
    const ImportExternalFileOptions& /*import_options*/) override {
  return Status::NotSupported("Not supported in compacted db mode.");
}

private:
friend class DB;
Expand Down
146 changes: 146 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/external_sst_file_import_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/job_context.h"
Expand Down Expand Up @@ -2949,6 +2950,125 @@ Status DBImpl::IngestExternalFile(
return status;
}

// Imports the files described by `external_file_metadata` into
// `column_family` by running an ExternalSstFileImportJob and installing its
// version edit.
//
// Outline:
//   1. Under the DB mutex, bail out with bg_error_ if a background error is
//      set; otherwise reserve the current file number in the pending outputs
//      so background cleanup won't delete the files being imported.
//   2. Run import_job.Prepare() without the mutex held.
//   3. Under the DB mutex, with writes stopped: reject dropped column
//      families, flush the memtable if the job asks for it, run the job and
//      apply its edit through LogAndApply.
//   4. Release the pending-output reservation, clean up, and notify
//      listeners on success.
//
// Returns the first non-OK status encountered, or OK on success.
//
// TODO: This is similar to ingest in certain aspects and can share the code
// with some added abstraction. Keeping it a simple copy for the first version.
Status DBImpl::ImportExternalFile(
    ColumnFamilyHandle* column_family,
    const std::vector<LiveFileMetaData>& external_file_metadata,
    const ImportExternalFileOptions& import_options) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  ExternalSstFileImportJob import_job(
      env_, versions_.get(), cfd, immutable_db_options_, env_options_,
      &snapshots_, external_file_metadata, import_options);

  std::list<uint64_t>::iterator pending_output_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    if (!bg_error_.ok()) {
      // Don't import files when there is a bg_error
      return bg_error_;
    }

    // Make sure that bg cleanup won't delete the files that we are importing
    pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
  }

  status = import_job.Prepare();
  if (!status.ok()) {
    // The pending-output entry captured above must be released on this early
    // exit too; otherwise it is never removed. The release is done under the
    // mutex, like the capture above and the release on the main path below.
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  TEST_SYNC_POINT("DBImpl::ImportFile:Start");
  {
    // Lock db mutex
    InstrumentedMutexLock l(&mutex_);
    // NOTE(review): "ImporFile" looks like a typo for "ImportFile" (compare
    // the Start/MutexUnlock sync points). The name is left unchanged here
    // because tests may reference it verbatim -- confirm before renaming.
    TEST_SYNC_POINT("DBImpl::ImporFile:MutexLock");

    // Stop writes to the DB by entering both write threads
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    WriteThread::Writer nonmem_w;
    if (two_write_queues_) {
      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    }

    // Shared with IngestExternalFile(): counts running ingest/import calls.
    num_running_ingest_file_++;

    // We cannot import a file into a dropped CF
    if (cfd->IsDropped()) {
      status = Status::InvalidArgument(
          "Cannot import an external file into a dropped CF");
    }

    // Figure out if we need to flush the memtable first
    if (status.ok()) {
      bool need_flush = false;
      status = import_job.NeedsFlush(&need_flush, cfd->GetSuperVersion());
      TEST_SYNC_POINT_CALLBACK("DBImpl::ImportExternalFile:NeedFlush",
                               &need_flush);
      if (status.ok() && need_flush) {
        // The flush is issued with the mutex released; writes remain stopped
        // because both write threads were entered above.
        mutex_.Unlock();
        status = FlushMemTable(cfd, FlushOptions(),
                               FlushReason::kExternalFileIngestion,
                               true /* writes_stopped */);
        mutex_.Lock();
      }
    }

    // Run the import job
    if (status.ok()) {
      status = import_job.Run();
    }

    // Install job edit [Mutex will be unlocked here]
    auto mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (status.ok()) {
      status =
          versions_->LogAndApply(cfd, *mutable_cf_options, import_job.edit(),
                                 &mutex_, directories_.GetDbDir());
    }
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options,
                                         FlushReason::kExternalFileIngestion);
    }

    // Resume writes to the DB (exit in reverse order of entry)
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);

    // Update stats
    if (status.ok()) {
      import_job.UpdateStats();
    }

    ReleaseFileNumberFromPendingOutputs(pending_output_elem);

    num_running_ingest_file_--;
    if (num_running_ingest_file_ == 0) {
      // Wake up anyone blocked in WaitForIngestFile().
      bg_cv_.SignalAll();
    }

    TEST_SYNC_POINT("DBImpl::ImportFile:MutexUnlock");
  }
  // mutex_ is unlocked here

  // Cleanup
  sv_context.Clean();
  import_job.Cleanup(status);

  if (status.ok()) {
    NotifyOnExternalFileImported(cfd, import_job);
  }

  return status;
}

Status DBImpl::VerifyChecksum() {
Status s;
Options options;
Expand Down Expand Up @@ -3020,6 +3140,32 @@ void DBImpl::NotifyOnExternalFileIngested(
#endif
}

// Reports every file handled by `import_job` to all registered listeners.
//
// For each entry of import_job.files_to_import() an ExternalFileImportedInfo
// is filled in from that entry and from the entry at the same index in
// import_job.external_file_metadata(), then passed to
// OnExternalFileImported() of each listener in
// immutable_db_options_.listeners. Compiled to a no-op under ROCKSDB_LITE.
void DBImpl::NotifyOnExternalFileImported(
    ColumnFamilyData* cfd, const ExternalSstFileImportJob& import_job) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }

  // Fetch the job's containers once instead of on every iteration.
  const auto& imported_files = import_job.files_to_import();
  const auto& imported_metadata = import_job.external_file_metadata();

  // size_t index: matches the type returned by size(), so there is no
  // narrower-index/wider-bound comparison.
  for (size_t i = 0; i < imported_files.size(); ++i) {
    const auto& f = imported_files[i];
    const auto& import_metadata = imported_metadata[i];
    ExternalFileImportedInfo info;
    info.cf_name = cfd->GetName();
    info.external_file_path = f.external_file_path;
    info.internal_file_path = f.internal_file_path;
    info.smallest_seqnum = import_metadata.smallest_seqno;
    info.largest_seqnum = import_metadata.largest_seqno;
    info.level = import_metadata.level;
    info.table_properties = f.table_properties;
    // Iterate by const reference to avoid copying each listener element.
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnExternalFileImported(this, info);
    }
  }

#endif
}

void DBImpl::WaitForIngestFile() {
mutex_.AssertHeld();
while (num_running_ingest_file_ > 0) {
Expand Down
11 changes: 10 additions & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/external_sst_file_import_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
Expand Down Expand Up @@ -325,6 +326,12 @@ class DBImpl : public DB {
const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) override;

using DB::ImportExternalFile;
// Imports the files described by `external_file_metadata` into
// `column_family`, using `import_options`. Returns a non-OK status if the
// import could not be completed (e.g. a background error is set or the
// column family has been dropped).
virtual Status ImportExternalFile(
ColumnFamilyHandle* column_family,
const std::vector<LiveFileMetaData>& external_file_metadata,
const ImportExternalFileOptions& import_options) override;

virtual Status VerifyChecksum() override;

#endif // ROCKSDB_LITE
Expand Down Expand Up @@ -684,6 +691,8 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
// Calls OnExternalFileImported() on every registered listener, once per file
// handled by `import_job`.
void NotifyOnExternalFileImported(
ColumnFamilyData* cfd, const ExternalSstFileImportJob& import_job);
#endif // !ROCKSDB_LITE

void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
Expand Down Expand Up @@ -1315,7 +1324,7 @@ class DBImpl : public DB {
// Additional options for compaction and flush
EnvOptions env_options_for_compaction_;

// Number of running IngestExternalFile() calls.
// Number of running IngestExternalFile() or ImportExternalFile() calls.
// REQUIRES: mutex held
int num_running_ingest_file_;

Expand Down
8 changes: 8 additions & 0 deletions db/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DB::ImportExternalFile;
// Importing external files is not available in read only mode: every
// argument is ignored and Status::NotSupported is returned unconditionally.
Status ImportExternalFile(
    ColumnFamilyHandle* /*column_family*/,
    const std::vector<LiveFileMetaData>& /*external_file_metadata*/,
    const ImportExternalFileOptions& /*import_options*/) override {
  return Status::NotSupported("Not supported operation in read only mode.");
}

private:
friend class DB;

Expand Down
8 changes: 8 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2272,6 +2272,14 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented.");
}

using DB::ImportExternalFile;
// The model DB does not implement file import: every argument is ignored
// and Status::NotSupported is returned unconditionally.
Status ImportExternalFile(
    ColumnFamilyHandle* /*column_family*/,
    const std::vector<LiveFileMetaData>& /*external_file_metadata*/,
    const ImportExternalFileOptions& /*import_options*/) override {
  return Status::NotSupported("Not implemented.");
}

virtual Status VerifyChecksum() override {
return Status::NotSupported("Not implemented.");
}
Expand Down
Loading