Skip to content

Commit

Permalink
store data in uni ps
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Feb 7, 2023
1 parent 584c992 commit ec66818
Show file tree
Hide file tree
Showing 29 changed files with 1,027 additions and 180 deletions.
45 changes: 45 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand Down Expand Up @@ -164,6 +165,10 @@ struct ContextShared
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalStoragePoolPtr global_storage_pool;

/// The PS instance available on Write Node.
UniversalPageStorageServicePtr ps_write;

TiFlashSecurityConfigPtr security_config;

/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
Expand Down Expand Up @@ -1594,6 +1599,11 @@ void Context::initializePageStorageMode(const PathPool & path_pool, UInt64 stora
shared->storage_run_mode = isPageStorageV2Existed(path_pool) ? PageStorageRunMode::MIX_MODE : PageStorageRunMode::ONLY_V3;
return;
}
case PageFormat::V4:
{
shared->storage_run_mode = PageStorageRunMode::UNI_PS;
return;
}
default:
throw Exception(fmt::format("Can't detect the format version of Page [page_version={}]", storage_page_format_version),
ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -1648,6 +1658,41 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
return shared->global_storage_pool;
}

void Context::initializeWriteNodePageStorageIfNeed(const PathPool & path_pool, const FileProviderPtr & file_provider)
{
auto lock = getLock();
if (shared->storage_run_mode == PageStorageRunMode::UNI_PS)
{
if (shared->ps_write)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalUniversalPageStorage(WriteNode) has already been initialized.");
}
PageStorageConfig config;
shared->ps_write = UniversalPageStorageService::create( //
*this,
"write",
path_pool.getPSDiskDelegatorGlobalMulti("write"),
config,
file_provider);
shared->ps_write->restore();
LOG_INFO(shared->log, "initialized GlobalUniversalPageStorage(WriteNode)");
}
}

UniversalPageStoragePtr Context::getWriteNodePageStorage() const
{
auto lock = getLock();
if (shared->ps_write)
{
return shared->ps_write->getUniversalPageStorage();
}
else
{
return nullptr;
}
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ using Dependencies = std::vector<DatabaseAndTableName>;
using TableAndCreateAST = std::pair<StoragePtr, ASTPtr>;
using TableAndCreateASTs = std::map<String, TableAndCreateAST>;

class UniversalPageStorage;
using UniversalPageStoragePtr = std::shared_ptr<UniversalPageStorage>;

/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
Expand Down Expand Up @@ -427,6 +430,9 @@ class Context
bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

void initializeWriteNodePageStorageIfNeed(const PathPool & path_pool, const FileProviderPtr & file_provider);
UniversalPageStoragePtr getWriteNodePageStorage() const;

/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));

global_context->initializeWriteNodePageStorageIfNeed(global_context->getPathPool(), global_context->getFileProvider());

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ SegmentPtr Segment::restoreSegment( //
return segment;
}

void Segment::serialize(WriteBatch & wb)
void Segment::serialize(WriteBatchWrapper & wb)
{
MemoryWriteBuffer buf(0, SEGMENT_BUFFER_SIZE);
writeIntBinary(STORAGE_FORMAT_CURRENT.segment, buf);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class Segment

static SegmentPtr restoreSegment(const LoggerPtr & parent_log, DMContext & context, PageIdU64 segment_id);

void serialize(WriteBatch & wb);
void serialize(WriteBatchWrapper & wb);

/// Attach a new ColumnFile into the Segment. The ColumnFile will be added to MemFileSet and flushed to disk later.
/// The block data of the passed in ColumnFile should be placed on disk before calling this function.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
this->files = files_;
}

void StableValueSpace::saveMeta(WriteBatch & meta_wb)
void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb)
{
MemoryWriteBuffer buf(0, 8192);
writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
void setFiles(const DMFiles & files_, const RowKeyRange & range, DMContext * dm_context = nullptr);

PageIdU64 getId() const { return id; }
void saveMeta(WriteBatch & meta_wb);
void saveMeta(WriteBatchWrapper & meta_wb);

size_t getRows() const;
size_t getBytes() const;
Expand Down
Loading

0 comments on commit ec66818

Please sign in to comment.