Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add UniversalPageStorage #6723

Merged
merged 29 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ add_headers_and_sources(dbms src/Storages/Page/V3/WAL)
add_headers_and_sources(dbms src/Storages/Page/V3/spacemap)
add_headers_and_sources(dbms src/Storages/Page/V3/PageDirectory)
add_headers_and_sources(dbms src/Storages/Page/V3/Blob)
add_headers_and_sources(dbms src/Storages/Page/V3/Universal)
add_headers_and_sources(dbms src/Storages/Page/)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/IO/ReadBufferFromString.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <IO/ReadBufferFromMemory.h>
#include <common/types.h>


namespace DB
Expand All @@ -31,4 +32,14 @@ class ReadBufferFromString : public ReadBufferFromMemory
{}
};

/// A read buffer that owns the string it reads from.
///
/// The `String` base is listed (and therefore constructed) first, so it is
/// fully initialized by the time `ReadBufferFromString` is constructed from
/// `*this`. Do not reorder the base classes.
///
/// The read-buffer base is constructed from this object's own string storage.
/// Copying or moving the object would leave the new buffer referring to the
/// source object's storage, so copy and move operations are disabled.
class ReadBufferFromOwnString : public String
    , public ReadBufferFromString
{
public:
    /// Copies `s_` into the owned string and sets up the buffer to read from that copy.
    explicit ReadBufferFromOwnString(std::string_view s_)
        : String(s_)
        , ReadBufferFromString(*this)
    {}

    // Declaring the copy operations as deleted also suppresses the implicit move operations.
    ReadBufferFromOwnString(const ReadBufferFromOwnString &) = delete;
    ReadBufferFromOwnString & operator=(const ReadBufferFromOwnString &) = delete;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ColumnFileBig : public ColumnFilePersisted

auto getFile() const { return file; }

PageId getDataPageId() { return file->pageId(); }
/// Returns the page id reported by the underlying file (`file->pageId()`).
/// Declared const: it does not modify this object.
PageIdU64 getDataPageId() const { return file->pageId(); }

/// Returns `valid_rows`.
size_t getRows() const override { return valid_rows; }
/// Returns `valid_bytes`.
size_t getBytes() const override { return valid_bytes; }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Columns ColumnFileTiny::readFromDisk(const PageReader & page_reader, //

// Read the columns from disk and apply DDL cast if needed
auto page_map = page_reader.read({fields});
Page page = page_map[data_page_id];
Page page = page_map.at(data_page_id);
for (size_t index = col_start; index < col_end; ++index)
{
const size_t index_in_read_columns = index - col_start;
Expand Down Expand Up @@ -160,7 +160,7 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(const DMContext & con
if (unlikely(!schema))
throw Exception("Cannot deserialize DeltaPackBlock's schema", ErrorCodes::LOGICAL_ERROR);

PageId data_page_id;
PageIdU64 data_page_id;
size_t rows, bytes;

readIntBinary(data_page_id, buf);
Expand Down Expand Up @@ -213,7 +213,7 @@ ColumnFileTinyPtr ColumnFileTiny::writeColumnFile(const DMContext & context, con
return std::make_shared<ColumnFileTiny>(schema, limit, bytes, page_id, cache);
}

PageId ColumnFileTiny::writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
PageIdU64 ColumnFileTiny::writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
{
auto page_id = context.storage_pool.newLogPageId();

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ColumnFileTiny : public ColumnFilePersisted
UInt64 bytes = 0;

/// The id of data page which stores the data of this pack.
PageId data_page_id;
PageIdU64 data_page_id;

/// The members below are not serialized.

Expand All @@ -62,7 +62,7 @@ class ColumnFileTiny : public ColumnFilePersisted
}

public:
ColumnFileTiny(const ColumnFileSchemaPtr & schema_, UInt64 rows_, UInt64 bytes_, PageId data_page_id_, const CachePtr & cache_ = nullptr)
ColumnFileTiny(const ColumnFileSchemaPtr & schema_, UInt64 rows_, UInt64 bytes_, PageIdU64 data_page_id_, const CachePtr & cache_ = nullptr)
: schema(schema_)
, rows(rows_)
, bytes(bytes_)
Expand All @@ -81,7 +81,7 @@ class ColumnFileTiny : public ColumnFilePersisted
/// The schema of this pack. Could be empty, i.e. a DeleteRange does not have a schema.
ColumnFileSchemaPtr getSchema() const { return schema; }

ColumnFileTinyPtr cloneWith(PageId new_data_page_id)
ColumnFileTinyPtr cloneWith(PageIdU64 new_data_page_id)
{
auto new_tiny_file = std::make_shared<ColumnFileTiny>(*this);
new_tiny_file->data_page_id = new_data_page_id;
Expand All @@ -98,13 +98,13 @@ class ColumnFileTiny : public ColumnFilePersisted

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;

PageId getDataPageId() const { return data_page_id; }
PageIdU64 getDataPageId() const { return data_page_id; }

Block readBlockForMinorCompaction(const PageReader & page_reader) const;

static ColumnFileTinyPtr writeColumnFile(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs, const CachePtr & cache = nullptr);

static PageId writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs);
static PageIdU64 writeColumnFileData(const DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs);

static ColumnFilePersistedPtr deserializeMetadata(const DMContext & context, ReadBuffer & buf, ColumnFileSchemaPtr & last_schema);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct ColumnFileV2
UInt64 bytes = 0;
BlockPtr schema;
RowKeyRange delete_range;
PageId data_page_id = 0;
PageIdU64 data_page_id = 0;

bool isDeleteRange() const { return !delete_range.none(); }
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>
#include <common/logger_useful.h>

namespace DB
Expand All @@ -47,7 +47,7 @@ class ColumnFileFlushTask
ColumnFilePtr column_file;

Block block_data;
PageId data_page = 0;
PageIdU64 data_page = 0;

bool sorted = false;
size_t rows_offset = 0;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DB
{
namespace DM
{
inline void serializeColumnFilePersisteds(WriteBatches & wbs, PageId id, const ColumnFilePersisteds & persisted_files)
inline void serializeColumnFilePersisteds(WriteBatches & wbs, PageIdU64 id, const ColumnFilePersisteds & persisted_files)
{
MemoryWriteBuffer buf(0, COLUMN_FILE_SERIALIZE_BUFFER_SIZE);
serializeSavedColumnFiles(buf, persisted_files);
Expand Down Expand Up @@ -76,7 +76,7 @@ void ColumnFilePersistedSet::checkColumnFiles(const ColumnFilePersisteds & new_c
}

ColumnFilePersistedSet::ColumnFilePersistedSet( //
PageId metadata_id_,
PageIdU64 metadata_id_,
const ColumnFilePersisteds & persisted_column_files)
: metadata_id(metadata_id_)
, persisted_files(persisted_column_files)
Expand All @@ -88,7 +88,7 @@ ColumnFilePersistedSet::ColumnFilePersistedSet( //
ColumnFilePersistedSetPtr ColumnFilePersistedSet::restore( //
DMContext & context,
const RowKeyRange & segment_range,
PageId id)
PageIdU64 id)
{
Page page = context.storage_pool.metaReader()->read(id);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>
#include <fmt/format.h>

namespace DB
Expand All @@ -49,7 +49,7 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
, private boost::noncopyable
{
private:
PageId metadata_id;
PageIdU64 metadata_id;
ColumnFilePersisteds persisted_files;
// TODO: check the proper memory_order when using this atomic variable
std::atomic<size_t> persisted_files_count = 0;
Expand All @@ -70,11 +70,11 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
void checkColumnFiles(const ColumnFilePersisteds & new_column_files);

public:
explicit ColumnFilePersistedSet(PageId metadata_id_, const ColumnFilePersisteds & persisted_column_files = {});
explicit ColumnFilePersistedSet(PageIdU64 metadata_id_, const ColumnFilePersisteds & persisted_column_files = {});

/// Restore the metadata of this instance.
/// Only called after reboot.
static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageId id);
static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id);

/**
* Resets the logger by using the one from the segment.
Expand Down Expand Up @@ -123,7 +123,7 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
ColumnFilePersisteds diffColumnFiles(const ColumnFiles & previous_column_files) const;

/// Thread safe part start
PageId getId() const { return metadata_id; }
PageIdU64 getId() const { return metadata_id; }

size_t getColumnFileCount() const { return persisted_files_count.load(); }
size_t getRows() const { return rows.load(); }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace DM
// ================================================
// Public methods
// ================================================
DeltaValueSpace::DeltaValueSpace(PageId id_, const ColumnFilePersisteds & persisted_files, const ColumnFiles & in_memory_files)
DeltaValueSpace::DeltaValueSpace(PageIdU64 id_, const ColumnFilePersisteds & persisted_files, const ColumnFiles & in_memory_files)
: persisted_file_set(std::make_shared<ColumnFilePersistedSet>(id_, persisted_files))
, mem_table_set(std::make_shared<MemTableSet>(in_memory_files))
, delta_index(std::make_shared<DeltaIndex>())
Expand All @@ -55,7 +55,7 @@ void DeltaValueSpace::abandon(DMContext & context)
manager->deleteRef(delta_index);
}

DeltaValueSpacePtr DeltaValueSpace::restore(DMContext & context, const RowKeyRange & segment_range, PageId id)
DeltaValueSpacePtr DeltaValueSpace::restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id)
{
auto persisted_file_set = ColumnFilePersistedSet::restore(context, segment_range, id);
return std::make_shared<DeltaValueSpace>(std::move(persisted_file_set));
Expand Down Expand Up @@ -114,7 +114,7 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
else if (auto * t = column_file->tryToTinyFile(); t)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.storage_pool.newLogPageId();
PageIdU64 new_data_page_id = context.storage_pool.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
auto new_column_file = t->cloneWith(new_data_page_id);
cloned.push_back(new_column_file);
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
namespace DM
{
using GenPageId = std::function<PageId()>;
using GenPageId = std::function<PageIdU64()>;
class DeltaValueSpace;
class DeltaValueSnapshot;

Expand Down Expand Up @@ -100,13 +100,13 @@ class DeltaValueSpace
LoggerPtr log;

public:
explicit DeltaValueSpace(PageId id_, const ColumnFilePersisteds & persisted_files = {}, const ColumnFiles & in_memory_files = {});
explicit DeltaValueSpace(PageIdU64 id_, const ColumnFilePersisteds & persisted_files = {}, const ColumnFiles & in_memory_files = {});

explicit DeltaValueSpace(ColumnFilePersistedSetPtr && persisted_file_set_);

/// Restore the metadata of this instance.
/// Only called after reboot.
static DeltaValueSpacePtr restore(DMContext & context, const RowKeyRange & segment_range, PageId id);
static DeltaValueSpacePtr restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id);

/**
* Resets the logger by using the one from the segment.
Expand Down Expand Up @@ -167,7 +167,7 @@ class DeltaValueSpace
const RowKeyRange & target_range,
WriteBatches & wbs) const;

PageId getId() const { return persisted_file_set->getId(); }
PageIdU64 getId() const { return persisted_file_set->getId(); }

size_t getColumnFileCount() const { return persisted_file_set->getColumnFileCount() + mem_table_set->getColumnFileCount(); }
size_t getRows(bool use_unsaved = true) const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>

namespace DB
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
{
std::unique_lock lock(read_write_mutex);
auto segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
std::stack<PageId> segment_ids;
std::stack<PageIdU64> segment_ids;
while (segment_id != 0)
{
segment_ids.push(segment_id);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace tests
class DeltaMergeStoreTest;
}

inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1;
inline static const PageIdU64 DELTA_MERGE_FIRST_SEGMENT_ID = 1;

struct SegmentStats
{
Expand Down Expand Up @@ -168,7 +168,7 @@ class DeltaMergeStore : private boost::noncopyable
static Settings EMPTY_SETTINGS;

using SegmentSortedMap = std::map<RowKeyValueRef, SegmentPtr, std::less<>>;
using SegmentMap = std::unordered_map<PageId, SegmentPtr>;
using SegmentMap = std::unordered_map<PageIdU64, SegmentPtr>;

enum ThreadType
{
Expand Down Expand Up @@ -269,9 +269,9 @@ class DeltaMergeStore : private boost::noncopyable

void deleteRange(const Context & db_context, const DB::Settings & db_settings, const RowKeyRange & delete_range);

std::tuple<String, PageId> preAllocateIngestFile();
std::tuple<String, PageIdU64> preAllocateIngestFile();

void preIngestFile(const String & parent_path, PageId file_id, size_t file_size);
void preIngestFile(const String & parent_path, PageIdU64 file_id, size_t file_size);

/// You must ensure external files are ordered and do not overlap. Otherwise exceptions will be thrown.
/// You must ensure all of the external files are contained by the range. Otherwise exceptions will be thrown.
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ extern const char force_ingest_via_replace[];

namespace DM
{

std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()
std::tuple<String, PageIdU64> DeltaMergeStore::preAllocateIngestFile()
{
if (shutdown_called.load(std::memory_order_relaxed))
return {};
Expand All @@ -59,7 +58,7 @@ std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()
return {parent_path, new_id};
}

void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId file_id, size_t file_size)
void DeltaMergeStore::preIngestFile(const String & parent_path, const PageIdU64 file_id, size_t file_size)
{
if (shutdown_called.load(std::memory_order_relaxed))
return;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class LocalDMFileGcScanner final
options.only_list_can_gc = true;
for (auto & root_path : delegate.listPaths())
{
std::set<PageId> ids_under_path;
std::set<PageIdU64> ids_under_path;
auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options);
path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path));
}
Expand All @@ -104,7 +104,7 @@ class LocalDMFileGcRemover final
, logger(std::move(log))
{}

void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids)
void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageIdU64> & valid_ids)
{
// If the StoragePathPool is invalid or shutdown flag is set,
// meaning we call `remover` after shutting down or dropping the table,
Expand Down Expand Up @@ -177,7 +177,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
// that the callbacks are called after the `DeltaMergeStore` is shut down or dropped,
// we must make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.prefix = storage_pool->getNamespaceId();
callbacks.scanner = LocalDMFileGcScanner(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider());
callbacks.remover = LocalDMFileGcRemover(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider(), log);
// remember to unregister it when shutdown
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/ExternalDTFileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/PageDefinesBase.h>
#include <fmt/core.h>

namespace DB::DM
Expand All @@ -26,7 +26,7 @@ struct ExternalDTFileInfo
/**
* The allocated PageId of the file.
*/
PageId id;
PageIdU64 id;

/**
* The handle range of contained data.
Expand Down
Loading