diff --git a/HISTORY.md b/HISTORY.md index cabf93b73e2..d13fa01f98e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### New Features * Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property. * RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions. +* [Experimental] FIFO compaction with TTL support. It can be enabled by setting CompactionOptionsFIFO.ttl > 0. ## 5.6.0 (06/06/2017) ### Public API Change diff --git a/db/builder.cc b/db/builder.cc index 0c0bbb236b6..6d34c9efe96 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -47,9 +47,9 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, - int level, - const std::string* compression_dict, const bool skip_filters) { + const CompressionOptions& compression_opts, int level, + const std::string* compression_dict, const bool skip_filters, + const uint64_t creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -57,7 +57,7 @@ TableBuilder* NewTableBuilder( TableBuilderOptions(ioptions, internal_comparator, int_tbl_prop_collector_factories, compression_type, compression_opts, compression_dict, skip_filters, - column_family_name, level), + column_family_name, level, creation_time), column_family_id, file); } @@ -76,7 +76,8 @@ Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level) { + TableProperties* table_properties, int level, + const uint64_t creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -125,7 +126,8 @@ Status BuildTable( builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, - compression_opts, level); + compression_opts, level, nullptr /* compression_dict */, + false /* skip_filters */, creation_time); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/builder.h b/db/builder.h index b438aad8f9e..1f8102df1d9 100644 --- a/db/builder.h +++ b/db/builder.h @@ -50,10 +50,9 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, - int level, + const CompressionOptions& compression_opts, int level, const std::string* compression_dict = nullptr, - const bool skip_filters = false); + const bool skip_filters = false, const uint64_t creation_time = 0); // Build a Table file from the contents of *iter. The generated file // will be named according to number specified in meta. On success, the rest of @@ -79,6 +78,7 @@ extern Status BuildTable( InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr, int level = -1); + TableProperties* table_properties = nullptr, int level = -1, + const uint64_t creation_time = 0); } // namespace rocksdb diff --git a/db/compaction.cc b/db/compaction.cc index 5c382d16324..bb2384a3598 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -461,4 +461,17 @@ bool Compaction::ShouldFormSubcompactions() const { } } +uint64_t Compaction::MaxInputFileCreationTime() const { + uint64_t max_creation_time = 0; + for (const auto& file : inputs_[0].files) { + if (file->fd.table_reader != nullptr && + file->fd.table_reader->GetTableProperties() != nullptr) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = std::max(max_creation_time, creation_time); + } + } + return max_creation_time; +} + } // namespace rocksdb diff --git a/db/compaction.h b/db/compaction.h index 457c2cd075d..0167b16f4c4 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -243,6 +243,8 @@ class Compaction { uint64_t max_compaction_bytes() const { return max_compaction_bytes_; } + uint64_t MaxInputFileCreationTime() const; + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 41765f19420..af83532a1e8 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1025,7 +1025,6 @@ Status CompactionJob::FinishCompactionOutputFile( uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber(); assert(output_number != 0); - TableProperties table_properties; // Check for iterator errors Status s = input_status; auto meta = &sub_compact->current_output()->meta; @@ -1263,14 +1262,25 @@ Status CompactionJob::OpenCompactionOutputFile( // data is going to be found bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; + + uint64_t output_file_creation_time = + sub_compact->compaction->MaxInputFileCreationTime(); + if (output_file_creation_time == 0) { + int64_t _current_time; + auto status = db_options_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + output_file_creation_time = static_cast(_current_time); + } + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, - sub_compact->compaction->output_level(), - &sub_compact->compression_dict, - skip_filters)); + sub_compact->compaction->output_level(), &sub_compact->compression_dict, + skip_filters, output_file_creation_time)); LogFlush(db_options_.info_log); return s; } diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index c5d2d94c029..fc6a8a8da86 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1405,17 +1405,88 @@ bool FIFOCompactionPicker::NeedsCompaction( return vstorage->CompactionScore(kLevel0) >= 1; } -Compaction* FIFOCompactionPicker::PickCompaction( +namespace { +uint64_t GetTotalFilesSize( + const std::vector& files) { + uint64_t total_size = 0; + for (const auto& f : files) { + total_size += f->fd.file_size; + } + return total_size; +} +} // anonymous namespace + +Compaction* FIFOCompactionPicker::PickTTLCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, LogBuffer* log_buffer) { - assert(vstorage->num_levels() == 1); + assert(ioptions_.compaction_options_fifo.ttl > 0); + const int kLevel0 = 0; const std::vector& level_files = vstorage->LevelFiles(kLevel0); - uint64_t total_size = 0; - for (const auto& file : level_files) { - total_size += file->fd.file_size; + uint64_t total_size = GetTotalFilesSize(level_files); + + int64_t _current_time; + auto status = ioptions_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Not doing compactions based on TTL. ", + cf_name.c_str(), status.ToString().c_str()); + return nullptr; + } + const uint64_t current_time = static_cast(_current_time); + + std::vector inputs; + inputs.emplace_back(); + inputs[0].level = 0; + + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + if (f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time == 0 || + creation_time >= + (current_time - ioptions_.compaction_options_fifo.ttl)) { + break; + } + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + } + } + + // Return a nullptr and proceed to size-based FIFO compaction if: + // 1. there are no files older than ttl OR + // 2. there are a few files older than ttl, but deleting them will not bring + // the total size to be less than max_table_files_size threshold. + if (inputs[0].files.empty() || + total_size > ioptions_.compaction_options_fifo.max_table_files_size) { + return nullptr; } + for (const auto& f : inputs[0].files) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), + f->fd.table_reader->GetTableProperties()->creation_time); + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, + kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), + /* is deletion compaction */ true, CompactionReason::kFIFOTtl); + return c; +} + +Compaction* FIFOCompactionPicker::PickSizeCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + const int kLevel0 = 0; + const std::vector& level_files = vstorage->LevelFiles(kLevel0); + uint64_t total_size = GetTotalFilesSize(level_files); + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || level_files.size() == 0) { // total size not exceeded @@ -1435,7 +1506,6 @@ Compaction* FIFOCompactionPicker::PickCompaction( /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kFIFOReduceNumFiles); - RegisterCompaction(c); return c; } } @@ -1460,24 +1530,41 @@ Compaction* FIFOCompactionPicker::PickCompaction( std::vector inputs; inputs.emplace_back(); inputs[0].level = 0; - // delete old files (FIFO) + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { auto f = *ritr; total_size -= f->compensated_file_size; inputs[0].files.push_back(f); char tmp_fsize[16]; AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); - ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 - " with size %s for deletion", + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { break; } } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + return c; +} + +Compaction* FIFOCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + assert(vstorage->num_levels() == 1); + + Compaction* c = nullptr; + if (ioptions_.compaction_options_fifo.ttl > 0) { + c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } + if (c == nullptr) { + c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } RegisterCompaction(c); return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index ae829cb4961..eb5f06819b6 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -244,6 +244,17 @@ class FIFOCompactionPicker : public CompactionPicker { virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; + + private: + Compaction* PickTTLCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); + + Compaction* PickSizeCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); }; class NullCompactionPicker : public CompactionPicker { diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 995b329bfa6..9641d492dbc 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -18,6 +18,7 @@ #include "db/builder.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" +#include "table/block_based_table_factory.h" #include "util/rate_limiter.h" #include "util/sst_file_manager_impl.h" #include "util/sync_point.h" @@ -164,6 +165,19 @@ static Status ValidateOptions( "universal and level compaction styles. "); } } + if (cfd.options.compaction_options_fifo.ttl > 0) { + if (db_options.max_open_files != -1) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported when files are always " + "kept open (set max_open_files = -1). "); + } + if (cfd.options.table_factory->Name() != + BlockBasedTableFactory().Name()) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported in " + "Block-Based Table format. "); + } + } } if (db_options.db_paths.size() > 4) { @@ -832,6 +846,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, *cfd->GetLatestMutableCFOptions(); bool paranoid_file_checks = cfd->GetLatestMutableCFOptions()->paranoid_file_checks; + + int64_t _current_time; + s = env_->GetCurrentTime(&_current_time); + if (!s.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + { mutex_.Unlock(); @@ -851,7 +873,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, - &event_logger_, job_id); + &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, + -1 /* level */, current_time); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" diff --git a/db/db_test.cc b/db/db_test.cc index 52ee7306842..2a3546defd0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2792,7 +2792,7 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { ASSERT_EQ(NumTableFilesAtLevel(0), 10); for (int i = 0; i < 60; i++) { - // Generate and flush a file about 10KB. + // Generate and flush a file about 20KB. for (int j = 0; j < 20; j++) { ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980))); } @@ -2807,6 +2807,245 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { ASSERT_LE(SizeAtLevel(0), options.compaction_options_fifo.max_table_files_size); } + +// Check that FIFO-with-TTL is not supported with max_open_files != -1. +TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.create_if_missing = true; + options.compaction_options_fifo.ttl = 600; // seconds + + // Check that it is not supported with max_open_files != -1. + options.max_open_files = 100; + options = CurrentOptions(options); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + options.max_open_files = -1; + ASSERT_OK(TryReopen(options)); +} + +// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory. +TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.create_if_missing = true; + options.compaction_options_fifo.ttl = 600; // seconds + + options = CurrentOptions(options); + options.table_factory.reset(NewBlockBasedTableFactory()); + ASSERT_OK(TryReopen(options)); + + Destroy(options); + options.table_factory.reset(NewPlainTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + Destroy(options); + options.table_factory.reset(NewCuckooTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + Destroy(options); + options.table_factory.reset(NewAdaptiveTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); +} + +TEST_F(DBTest, FIFOCompactionWithTTLTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.write_buffer_size = 10 << 10; // 10KB + options.arena_block_size = 4096; + options.compression = kNoCompression; + options.create_if_missing = true; + + // Test to make sure that all files with expired ttl are deleted on next + // manual compaction. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 600; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // sleep for 5 seconds + env_->SleepForMicroseconds(5 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // change ttl to 1 sec. So all files should be deleted on next compaction. + options.compaction_options_fifo.ttl = 1; + Reopen(options); + + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + } + + // Test to make sure that all files with expired ttl are deleted on next + // automatic compaction. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 5; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // Create 1 more file to trigger TTL compaction. The old files are dropped. + for (int i = 0; i < 1; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // Only the new 10 files remain. + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test that shows the fall back to size-based FIFO compaction if TTL-based + // deletion doesn't move the total size to be less than max_table_files_size. + { + options.write_buffer_size = 110 << 10; // 10KB + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 5; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 3; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 140; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // Size limit is still guaranteed. + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test with TTL + Intra-L0 compactions. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = true; + options.compaction_options_fifo.ttl = 5; // seconds + options.level0_file_num_compaction_trigger = 6; + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1 + // (due to level0_file_num_compaction_trigger = 6). + // So total files = 1 + remaining 4 = 5. + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + + // Sleep for a little over ttl time. + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + + // Create 10 more files. The old 5 files are dropped as their ttl expired. + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test with large TTL + Intra-L0 compactions. + // Files dropped based on size, as ttl doesn't kick in. + { + options.write_buffer_size = 20 << 10; // 20K + options.compaction_options_fifo.max_table_files_size = 1500 << 10; // 1.5MB + options.compaction_options_fifo.allow_compaction = true; + options.compaction_options_fifo.ttl = 60 * 60; // 1 hour + options.level0_file_num_compaction_trigger = 6; + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 60; i++) { + // Generate and flush a file about 20KB. + for (int j = 0; j < 20; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // It should be compacted to 10 files. + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + for (int i = 0; i < 60; i++) { + // Generate and flush a file about 20KB. + for (int j = 0; j < 20; j++) { + ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // It should be compacted to no more than 20 files. + ASSERT_GT(NumTableFilesAtLevel(0), 10); + ASSERT_LT(NumTableFilesAtLevel(0), 18); + // Size limit is still guaranteed. + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } +} #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE diff --git a/db/flush_job.cc b/db/flush_job.cc index 5c4645eeb42..d93e2c64e81 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -298,6 +298,14 @@ Status FlushJob::WriteLevel0Table() { &output_compression_); EnvOptions optimized_env_options = db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); + + int64_t _current_time; + auto status = db_options_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, optimized_env_options, cfd_->table_cache(), iter.get(), @@ -308,7 +316,7 @@ Status FlushJob::WriteLevel0Table() { cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */); + Env::IO_HIGH, &table_properties_, 0 /* level */, current_time); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/flush_job.h b/db/flush_job.h index b06654d7731..6a685c09f89 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -70,7 +70,7 @@ class FlushJob { ~FlushJob(); // Require db_mutex held. - // Once PickMemTable() is called, either Run() or Cancel() has to be call. + // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); Status Run(FileMetaData* file_meta = nullptr); void Cancel(); diff --git a/db/repair.cc b/db/repair.cc index 1f9e344e130..da6e0f958d0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -382,6 +382,14 @@ class Repairer { ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); EnvOptions optimized_env_options = env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); + + int64_t _current_time; + status = env_->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), optimized_env_options, table_cache_, iter.get(), @@ -389,7 +397,9 @@ class Repairer { &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, - nullptr /* internal_stats */, TableFileCreationReason::kRecovery); + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/version_set.cc b/db/version_set.cc index 6c220b5ef82..9251cbd6d14 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1318,6 +1318,32 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded( } } +namespace { +uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, + const std::vector& files) { + uint32_t ttl_expired_files_count = 0; + + int64_t _current_time; + auto status = ioptions.env->GetCurrentTime(&_current_time); + if (status.ok()) { + const uint64_t current_time = static_cast(_current_time); + for (auto f : files) { + if (!f->being_compacted && f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time > 0 && + creation_time < + (current_time - ioptions.compaction_options_fifo.ttl)) { + ttl_expired_files_count++; + } + } + } + } + return ttl_expired_files_count; +} +} // anonymous namespace + void VersionStorageInfo::ComputeCompactionScore( const ImmutableCFOptions& immutable_cf_options, const MutableCFOptions& mutable_cf_options) { @@ -1364,6 +1390,11 @@ void VersionStorageInfo::ComputeCompactionScore( mutable_cf_options.level0_file_num_compaction_trigger, score); } + if (immutable_cf_options.compaction_options_fifo.ttl > 0) { + score = std::max(static_cast(GetExpiredTtlFilesCount( + immutable_cf_options, files_[level])), + score); + } } else { score = static_cast(num_sorted_runs) / diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 1e271483628..701bcb320a9 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -62,6 +62,13 @@ struct CompactionOptionsFIFO { // Default: 1GB uint64_t max_table_files_size; + // Drop files older than TTL. TTL based deletion will take precedence over + // size based deletion if ttl > 0. + // delete if sst_file_creation_time < (current_time - ttl) + // unit: seconds. Ex: 1 day = 1 * 24 * 60 * 60 + // Default: 0 (disabled) + uint64_t ttl = 0; + // If true, try to do compaction to compact smaller files into larger ones. // Minimum files to compact follows options.level0_file_num_compaction_trigger // and compaction won't trigger if average compact bytes per del file is @@ -71,9 +78,10 @@ struct CompactionOptionsFIFO { bool allow_compaction = false; CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} - CompactionOptionsFIFO(uint64_t _max_table_files_size, - bool _allow_compaction) + CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction, + uint64_t _ttl = 0) : max_table_files_size(_max_table_files_size), + ttl(_ttl), allow_compaction(_allow_compaction) {} }; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 17fee59844d..40d318e0941 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -71,6 +71,8 @@ enum class CompactionReason { kFIFOMaxSize, // [FIFO] reduce number of files. kFIFOReduceNumFiles, + // [FIFO] files with creation time < (current_time - interval) + kFIFOTtl, // Manual compaction kManualCompaction, // DB::SuggestCompactRange() marked files for compaction diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 6559b1f3add..08360d1794a 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -48,6 +48,7 @@ struct TablePropertiesNames { static const std::string kPrefixExtractorName; static const std::string kPropertyCollectors; static const std::string kCompression; + static const std::string kCreationTime; }; extern const std::string kPropertiesBlock; @@ -158,6 +159,9 @@ struct TableProperties { // by column_family_name. uint64_t column_family_id = rocksdb::TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + // The time when the SST file was created. + // Since SST files are immutable, this is equivalent to last modified time. + uint64_t creation_time = 0; // Name of the column family with which this SST file is associated. // If column family is unknown, `column_family_name` will be an empty string. diff --git a/options/options.cc b/options/options.cc index 4aaedefda73..3f9fb3027e0 100644 --- a/options/options.cc +++ b/options/options.cc @@ -367,6 +367,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, "Options.compaction_options_fifo.allow_compaction: %d", compaction_options_fifo.allow_compaction); + ROCKS_LOG_HEADER(log, "Options.compaction_options_fifo.ttl: %" PRIu64, + compaction_options_fifo.ttl); std::string collector_names; for (const auto& collector_factory : table_properties_collector_factories) { collector_names.append(collector_factory->Name()); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 910a70fb251..88258994c33 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -269,6 +269,7 @@ struct BlockBasedTableBuilder::Rep { std::unique_ptr flush_block_policy; uint32_t column_family_id; const std::string& column_family_name; + uint64_t creation_time = 0; std::vector> table_properties_collectors; @@ -281,7 +282,7 @@ struct BlockBasedTableBuilder::Rep { const CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, const bool skip_filters, - const std::string& _column_family_name) + const std::string& _column_family_name, const uint64_t _creation_time) : ioptions(_ioptions), table_options(table_opt), internal_comparator(icomparator), @@ -297,7 +298,8 @@ struct BlockBasedTableBuilder::Rep { table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options, data_block)), column_family_id(_column_family_id), - column_family_name(_column_family_name) { + column_family_name(_column_family_name), + creation_time(_creation_time) { if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -336,7 +338,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name) { + const std::string& column_family_name, const uint64_t creation_time) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { @@ -352,7 +354,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, compression_type, compression_opts, compression_dict, - skip_filters, column_family_name); + skip_filters, column_family_name, creation_time); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); @@ -728,6 +730,7 @@ Status BlockBasedTableBuilder::Finish() { r->props.top_level_index_size = r->p_index_builder_->EstimateTopLevelIndexSize(r->offset); } + r->props.creation_time = r->creation_time; // Add basic properties property_block_builder.AddTableProperty(r->props); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 6f7f494c62d..3b351443acd 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -17,6 +17,7 @@ #include #include "rocksdb/flush_block_policy.h" +#include "rocksdb/listener.h" #include "rocksdb/options.h" #include "rocksdb/status.h" #include "table/table_builder.h" @@ -48,7 +49,7 @@ class BlockBasedTableBuilder : public TableBuilder { const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name); + const std::string& column_family_name, const uint64_t creation_time = 0); // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 8db76ea38ed..8745027a6d1 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -72,7 +72,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.compression_opts, table_builder_options.compression_dict, table_builder_options.skip_filters, - table_builder_options.column_family_name); + table_builder_options.column_family_name, + table_builder_options.creation_time); return table_builder; } diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 6af536fbcd4..229b7a7cfaa 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -77,6 +77,7 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { Add(TablePropertiesNames::kFormatVersion, props.format_version); Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); + Add(TablePropertiesNames::kCreationTime, props.creation_time); if (!props.filter_policy_name.empty()) { Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); @@ -208,6 +209,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, &new_table_properties->fixed_key_len}, {TablePropertiesNames::kColumnFamilyId, &new_table_properties->column_family_id}, + {TablePropertiesNames::kCreationTime, + &new_table_properties->creation_time}, }; std::string last_key; diff --git a/table/table_builder.h b/table/table_builder.h index cacb2a65dd8..4e413b41110 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -56,7 +56,8 @@ struct TableBuilderOptions { CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, bool _skip_filters, - const std::string& _column_family_name, int _level) + const std::string& _column_family_name, int _level, + const uint64_t _creation_time = 0) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), @@ -65,7 +66,8 @@ struct TableBuilderOptions { compression_dict(_compression_dict), skip_filters(_skip_filters), column_family_name(_column_family_name), - level(_level) {} + level(_level), + creation_time(_creation_time) {} const ImmutableCFOptions& ioptions; const InternalKeyComparator& internal_comparator; const std::vector>* @@ -77,6 +79,7 @@ struct TableBuilderOptions { bool skip_filters; // only used by BlockBasedTableBuilder const std::string& column_family_name; int level; // what level this table/file is on, -1 for "not set, don't know" + const uint64_t creation_time; }; // TableBuilder provides the interface used to build a Table diff --git a/table/table_properties.cc b/table/table_properties.cc index b03928e8868..f3373ba539d 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -139,6 +139,8 @@ std::string TableProperties::ToString( compression_name.empty() ? std::string("N/A") : compression_name, prop_delim, kv_delim); + AppendProperty(result, "creation time", creation_time, prop_delim, kv_delim); + return result; } @@ -190,6 +192,7 @@ const std::string TablePropertiesNames::kPrefixExtractorName = const std::string TablePropertiesNames::kPropertyCollectors = "rocksdb.property.collectors"; const std::string TablePropertiesNames::kCompression = "rocksdb.compression"; +const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time"; extern const std::string kPropertiesBlock = "rocksdb.properties"; // Old property block name for backward compatibility diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 1ecae9a49d6..6193e603fd4 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -638,6 +638,7 @@ DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0, "The limit of total table file sizes to trigger FIFO compaction"); DEFINE_bool(fifo_compaction_allow_compaction, true, "Allow compaction in FIFO compaction."); +DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds."); #endif // ROCKSDB_LITE DEFINE_bool(report_bg_io_stats, false, @@ -2864,7 +2865,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { #ifndef ROCKSDB_LITE options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024, - FLAGS_fifo_compaction_allow_compaction); + FLAGS_fifo_compaction_allow_compaction, FLAGS_fifo_compaction_ttl); #endif // ROCKSDB_LITE if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset(