forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b2c8ae6
commit 21540f0
Showing
55 changed files
with
5,848 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
#include "utilities/titandb/blob_file_builder.h" | ||
|
||
#include "util/crc32c.h" | ||
#include "utilities/titandb/util.h" | ||
|
||
namespace rocksdb { | ||
namespace titandb { | ||
|
||
// HEADER: 8 bytes length | ||
// BODY: variable length | ||
// TAIL: 5 bytes length | ||
void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) { | ||
if (!ok()) return; | ||
|
||
buffer_.clear(); | ||
assert(!record.key.empty()); | ||
assert(!record.value.empty()); | ||
record.EncodeTo(&buffer_); | ||
|
||
CompressionType compression = options_.blob_file_compression; | ||
auto output = Compress(&compression, buffer_, &compressed_buffer_); | ||
|
||
uint64_t body_length = output.size(); | ||
status_ = file_->Append( | ||
Slice{reinterpret_cast<const char*>(&body_length), kBlobHeaderSize}); | ||
if (!ok()) return; | ||
|
||
handle->offset = file_->GetFileSize(); | ||
handle->size = output.size(); | ||
|
||
status_ = file_->Append(output); | ||
if (ok()) { | ||
char tailer[kBlobTailerSize]; | ||
tailer[0] = compression; | ||
EncodeFixed32(tailer+1, crc32c::Value(output.data(), output.size())); | ||
status_ = file_->Append(Slice(tailer, sizeof(tailer))); | ||
} | ||
} | ||
|
||
Status BlobFileBuilder::Finish() { | ||
if (!ok()) return status(); | ||
|
||
BlobFileFooter footer; | ||
buffer_.clear(); | ||
footer.EncodeTo(&buffer_); | ||
|
||
status_ = file_->Append(buffer_); | ||
if (ok()) { | ||
status_ = file_->Flush(); | ||
} | ||
return status(); | ||
} | ||
|
||
void BlobFileBuilder::Abandon() {} | ||
|
||
} // namespace titandb | ||
} // namespace rocksdb |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
#pragma once | ||
|
||
#include "util/file_reader_writer.h" | ||
#include "utilities/titandb/blob_format.h" | ||
#include "utilities/titandb/options.h" | ||
|
||
namespace rocksdb { | ||
namespace titandb { | ||
|
||
// Blob file format: | ||
// | ||
// <begin> | ||
// [blob record 1] | ||
// [blob record 2] | ||
// ... | ||
// [blob record N] | ||
// [meta block 1] | ||
// [meta block 2] | ||
// ... | ||
// [meta block K] | ||
// [meta index block] | ||
// [footer] | ||
// <end> | ||
// | ||
// 1. The sequence of blob records in the file are stored in sorted | ||
// order. These records come one after another at the beginning of the | ||
// file, and are compressed according to the compression options. | ||
// | ||
// 2. After the blob records we store a bunch of meta blocks, and a | ||
// meta index block with block handles pointed to the meta blocks. The | ||
// meta block and the meta index block are formatted the same as the | ||
// BlockBasedTable. | ||
|
||
class BlobFileBuilder { | ||
public: | ||
// Constructs a builder that will store the contents of the file it | ||
// is building in "*file". Does not close the file. It is up to the | ||
// caller to sync and close the file after calling Finish(). | ||
BlobFileBuilder(const TitanCFOptions& options, WritableFileWriter* file) | ||
: options_(options), file_(file) {} | ||
|
||
// Adds the record to the file and points the handle to it. | ||
void Add(const BlobRecord& record, BlobHandle* handle); | ||
|
||
// Returns non-ok iff some error has been detected. | ||
Status status() const { return status_; } | ||
|
||
// Finishes building the table. | ||
// REQUIRES: Finish(), Abandon() have not been called. | ||
Status Finish(); | ||
|
||
// Abandons building the table. If the caller is not going to call | ||
// Finish(), it must call Abandon() before destroying this builder. | ||
// REQUIRES: Finish(), Abandon() have not been called. | ||
void Abandon(); | ||
|
||
private: | ||
bool ok() const { return status().ok(); } | ||
|
||
TitanCFOptions options_; | ||
WritableFileWriter* file_; | ||
|
||
Status status_; | ||
std::string buffer_; | ||
std::string compressed_buffer_; | ||
}; | ||
|
||
} // namespace titandb | ||
} // namespace rocksdb |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
#include "utilities/titandb/blob_file_cache.h" | ||
|
||
#include "util/filename.h" | ||
#include "util/file_reader_writer.h" | ||
#include "utilities/titandb/util.h" | ||
|
||
namespace rocksdb { | ||
namespace titandb { | ||
|
||
namespace { | ||
|
||
Slice EncodeFileNumber(const uint64_t* number) { | ||
return Slice(reinterpret_cast<const char*>(number), sizeof(*number)); | ||
} | ||
|
||
} // namespace | ||
|
||
BlobFileCache::BlobFileCache(const TitanDBOptions& db_options, | ||
const TitanCFOptions& cf_options, | ||
std::shared_ptr<Cache> cache) | ||
: env_(db_options.env), | ||
env_options_(db_options), | ||
db_options_(db_options), | ||
cf_options_(cf_options), | ||
cache_(cache) {} | ||
|
||
Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number, | ||
uint64_t file_size, const BlobHandle& handle, | ||
BlobRecord* record, PinnableSlice* buffer) { | ||
Cache::Handle* cache_handle = nullptr; | ||
Status s = FindFile(file_number, file_size, &cache_handle); | ||
if (!s.ok()) return s; | ||
|
||
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle)); | ||
s = reader->Get(options, handle, record, buffer); | ||
cache_->Release(cache_handle); | ||
return s; | ||
} | ||
|
||
Status BlobFileCache::NewPrefetcher( | ||
uint64_t file_number, | ||
uint64_t file_size, | ||
std::unique_ptr<BlobFilePrefetcher>* result) { | ||
Cache::Handle* cache_handle = nullptr; | ||
Status s = FindFile(file_number, file_size, &cache_handle); | ||
if (!s.ok()) return s; | ||
|
||
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle)); | ||
auto prefetcher = new BlobFilePrefetcher(reader); | ||
prefetcher->RegisterCleanup(&UnrefCacheHandle, cache_.get(), cache_handle); | ||
result->reset(prefetcher); | ||
return s; | ||
} | ||
|
||
void BlobFileCache::Evict(uint64_t file_number) { | ||
cache_->Erase(EncodeFileNumber(&file_number)); | ||
} | ||
|
||
Status BlobFileCache::FindFile(uint64_t file_number, | ||
uint64_t file_size, | ||
Cache::Handle** handle) { | ||
Status s; | ||
Slice cache_key = EncodeFileNumber(&file_number); | ||
*handle = cache_->Lookup(cache_key); | ||
if (*handle) return s; | ||
|
||
std::unique_ptr<RandomAccessFileReader> file; | ||
{ | ||
std::unique_ptr<RandomAccessFile> f; | ||
auto file_name = BlobFileName(db_options_.dirname, file_number); | ||
s = env_->NewRandomAccessFile(file_name, &f, env_options_); | ||
if (!s.ok()) return s; | ||
if (db_options_.advise_random_on_open) { | ||
f->Hint(RandomAccessFile::RANDOM); | ||
} | ||
file.reset(new RandomAccessFileReader(std::move(f), file_name)); | ||
} | ||
|
||
std::unique_ptr<BlobFileReader> reader; | ||
s = BlobFileReader::Open(cf_options_, std::move(file), file_size, &reader); | ||
if (!s.ok()) return s; | ||
|
||
cache_->Insert(cache_key, reader.release(), 1, | ||
&DeleteCacheValue<BlobFileReader>, handle); | ||
return s; | ||
} | ||
|
||
} // namespace titandb | ||
} // namespace rocksdb |
Oops, something went wrong.