Skip to content

Commit

Permalink
column chunks v1 schema (#11161)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 1, 2024
1 parent 846a92b commit f6eeebf
Show file tree
Hide file tree
Showing 32 changed files with 673 additions and 232 deletions.
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,8 @@ message TColumnShardConfig {
optional uint32 LagForCompactionBeforeTieringsMs = 22 [default = 3600000];
optional uint32 OptimizerFreshnessCheckDurationMs = 23 [default = 300000];
optional uint32 SmallPortionDetectSizeLimit = 24 [default = 1048576]; // 1 << 20
optional bool ColumnChunksV0Usage = 25 [default = true];
optional bool ColumnChunksV1Usage = 26 [default = true];
}

message TSchemeShardConfig {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,10 +1033,11 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvSendDataFromSource::T
}

THashMap<ui64, NOlap::NDataSharing::NEvents::TPathIdData> dataByPathId;
TBlobGroupSelector dsGroupSelector(Info());
for (auto&& i : ev->Get()->Record.GetPathIdData()) {
auto schema = TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>().GetVersionedIndex().GetLastSchema();
AFL_VERIFY(schema);
auto data = NOlap::NDataSharing::NEvents::TPathIdData::BuildFromProto(i, schema->GetIndexInfo());
auto data = NOlap::NDataSharing::NEvents::TPathIdData::BuildFromProto(i, schema->GetIndexInfo(), dsGroupSelector);
AFL_VERIFY(data.IsSuccess())("error", data.GetErrorMessage());
AFL_VERIFY(dataByPathId.emplace(i.GetPathId(), data.DetachResult()).second);
}
Expand Down
70 changes: 64 additions & 6 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@

#include <type_traits>

namespace NKikimr::NOlap {
class TColumnChunkLoadContext;
}

namespace NKikimr::NColumnShard {

using NOlap::TInsertWriteId;
Expand Down Expand Up @@ -59,7 +55,8 @@ struct Schema : NIceDb::Schema {
ShardingInfoTableId,
RepairsTableId,
NormalizersTableId,
NormalizerEventsTableId
NormalizerEventsTableId,
ColumnsV1TableId
};

enum class ETierTables: ui32 {
Expand Down Expand Up @@ -558,6 +555,20 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<NormalizerId, EventId, Instant, EventType, Description>;
};

// Local-database table describing column chunk records (table id ColumnsV1TableId).
// Each row is identified by (PathId, PortionId, SSColumnId, ChunkIdx); the remaining
// columns carry the serialized chunk metadata and the location of the chunk data
// expressed as a blob index plus offset/size.
struct IndexColumnsV1: Table<ColumnsV1TableId> {
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
struct SSColumnId: Column<3, NScheme::NTypeIds::Uint32> {};
struct ChunkIdx: Column<4, NScheme::NTypeIds::Uint32> {};
struct Metadata: Column<5, NScheme::NTypeIds::String> {}; // NKikimrTxColumnShard.TIndexColumnMeta
// BlobIdx, Offset and Size are read together to construct a TBlobRangeLink16
// (see TColumnChunkLoadContextV1).
struct BlobIdx: Column<6, NScheme::NTypeIds::Uint32> {};
struct Offset: Column<7, NScheme::NTypeIds::Uint32> {};
struct Size: Column<8, NScheme::NTypeIds::Uint32> {};

// Primary key: one row per chunk of a column within a portion of a path.
using TKey = TableKey<PathId, PortionId, SSColumnId, ChunkIdx>;
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
};

using TTables = SchemaTables<
Value,
TxInfo,
Expand Down Expand Up @@ -595,7 +606,8 @@ struct Schema : NIceDb::Schema {
InFlightSnapshots,
TxDependencies,
TxStates,
TxEvents
TxEvents,
IndexColumnsV1
>;

//
Expand Down Expand Up @@ -932,6 +944,10 @@ class TColumnChunkLoadContext {
return Address;
}

// Combines the path id and portion id of this context with the entity id and
// chunk index of its chunk address into a single TFullChunkAddress.
TFullChunkAddress GetFullChunkAddress() const {
    const auto entityId = Address.GetEntityId();
    const auto chunkIdx = Address.GetChunkIdx();
    return TFullChunkAddress(PathId, PortionId, entityId, chunkIdx);
}

TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange,
const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
: BlobRange(bRange)
Expand Down Expand Up @@ -966,6 +982,48 @@ class TColumnChunkLoadContext {
}
};

class TColumnChunkLoadContextV1 {
private:
TChunkAddress Address;
YDB_READONLY_DEF(TBlobRangeLink16, BlobRange);
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);

public:
TFullChunkAddress GetFullChunkAddress() const {
return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx());
}

const TChunkAddress& GetAddress() const {
return Address;
}

TColumnChunkLoadContextV1(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRangeLink16& bRange,
const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
: Address(address)
, BlobRange(bRange)
, PathId(pathId)
, PortionId(portionId)
, MetaProto(metaProto) {
}

template <class TSource>
TColumnChunkLoadContextV1(const TSource& rowset)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::SSColumnId>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::ChunkIdx>())
, BlobRange(rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::BlobIdx>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::Offset>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::Size>())
{
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV1::Metadata>();
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}
};

class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/common/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class IBlobGroupSelector {
virtual ui32 GetGroup(const TLogoBlobID& blobId) const = 0;
};

// IBlobGroupSelector implementation that ignores the blob id and always reports group 1.
class TFakeGroupSelector: public IBlobGroupSelector {
public:
    ui32 GetGroup(const TLogoBlobID& /*blobId*/) const override {
        constexpr ui32 fakeGroupId = 1;
        return fakeGroupId;
    }
};

class TUnifiedBlobId {
// Id of a blob in YDB distributed storage
struct TDsBlobId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ class TPathIdData {

TPathIdData() = default;

TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TIndexInfo& indexInfo) {
TConclusionStatus DeserializeFromProto(
const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TIndexInfo& indexInfo, const IBlobGroupSelector& groupSelector) {
if (!proto.HasPathId()) {
return TConclusionStatus::Fail("no path id in proto");
}
PathId = proto.GetPathId();
for (auto&& portionProto : proto.GetPortions()) {
TConclusion<TPortionDataAccessor> portion = TPortionDataAccessor::BuildFromProto(portionProto, indexInfo);
TConclusion<TPortionDataAccessor> portion = TPortionDataAccessor::BuildFromProto(portionProto, indexInfo, groupSelector);
if (!portion) {
return portion.GetError();
}
Expand Down Expand Up @@ -69,9 +70,10 @@ class TPathIdData {
}
};

static TConclusion<TPathIdData> BuildFromProto(const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TIndexInfo& indexInfo) {
static TConclusion<TPathIdData> BuildFromProto(
const NKikimrColumnShardDataSharingProto::TPathIdData& proto, const TIndexInfo& indexInfo, const IBlobGroupSelector& groupSelector) {
TPathIdData result;
auto resultParsing = result.DeserializeFromProto(proto, indexInfo);
auto resultParsing = result.DeserializeFromProto(proto, indexInfo, groupSelector);
if (!resultParsing) {
return resultParsing;
} else {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
TMemoryProfileGuard g("TTxInit/LoadColumns/Portions");
if (!db.LoadPortions([&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo();
AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo));
AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo, db.GetDsGroupSelectorVerified()));
AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
})) {
timer.AddLoadingFail();
Expand All @@ -216,7 +216,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
NColumnShard::TLoadTimeSignals::TLoadTimer timer = SignalCounters.ColumnsLoadingTimeCounters.StartGuard();
TMemoryProfileGuard g("TTxInit/LoadColumns/Records");
TPortionInfo::TSchemaCursor schema(VersionedIndex);
if (!db.LoadColumns([&](const TColumnChunkLoadContext& loadContext) {
if (!db.LoadColumns([&](const TColumnChunkLoadContextV1& loadContext) {
auto* constructor = constructors.GetConstructorVerified(loadContext.GetPathId(), loadContext.GetPortionId());
constructor->LoadRecord(loadContext);
})) {
Expand Down
48 changes: 30 additions & 18 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "db_wrapper.h"
#include "portions/constructor.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/sharding/sharding.h>

Expand Down Expand Up @@ -45,22 +46,33 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable,

// Persists one column chunk record of `portion` into the local database.
// Depending on the ColumnShardConfig flags the record is written to the
// IndexColumnsV1 table (ColumnChunksV1Usage), to the IndexColumns table
// (ColumnChunksV0Usage), or to both; at least one flag must be enabled.
//   portion         - portion owning the chunk (source of path id, portion id, snapshots, blob ids)
//   row             - chunk record (column id, chunk index, blob range, meta)
//   firstPKColumnId - column id whose chunk 0 additionally carries the portion meta in the IndexColumns row
// NOTE(review): this text comes from a rendered diff in which removed and added lines are
// interleaved. The unguarded `if (row.GetChunkIdx() == 0 ...)` right after `rowProto` is built and
// the trailing unconditional `db.Table<IndexColumns>()...` statement look like the pre-change
// version of the code that now lives inside the ColumnChunksV0Usage branch - confirm against the
// actual file before relying on them.
void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) {
NIceDb::TNiceDb db(Database);
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
auto rowProto = row.GetMeta().SerializeToProto();
if (row.GetChunkIdx() == 0 && row.GetColumnId() == firstPKColumnId) {
*rowProto.MutablePortionMeta() = portion.GetMeta().SerializeToProto();
// Writing nothing at all is treated as a configuration error.
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage() || AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage());
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()) {
// V1 row: keyed by (path id, portion id, column id, chunk); stores the blob index
// instead of a serialized blob id.
db.Table<IndexColumnsV1>()
.Key(portion.GetPathId(), portion.GetPortionId(), row.ColumnId, row.Chunk)
.Update(NIceDb::TUpdate<IndexColumnsV1::BlobIdx>(row.GetBlobRange().GetBlobIdxVerified()),
NIceDb::TUpdate<IndexColumnsV1::Metadata>(rowProto.SerializeAsString()),
NIceDb::TUpdate<IndexColumnsV1::Offset>(row.BlobRange.Offset),
NIceDb::TUpdate<IndexColumnsV1::Size>(row.BlobRange.Size));
}
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
// Only chunk 0 of the first PK column embeds the portion meta into the row metadata.
if (row.GetChunkIdx() == 0 && row.GetColumnId() == firstPKColumnId) {
*rowProto.MutablePortionMeta() = portion.GetMeta().SerializeToProto();
}
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
// V0 row: the first two key components are always 0; a missing remove snapshot is stored as (0, 0).
db.Table<IndexColumns>()
.Key(0, 0, row.ColumnId, portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(),
portion.GetPortionId(), row.Chunk)
.Update(NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
NIceDb::TUpdate<IndexColumns::Blob>(portion.GetBlobId(row.GetBlobRange().GetBlobIdxVerified()).SerializeBinary()),
NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()),
NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset), NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size),
NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId()));
}
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
db.Table<IndexColumns>().Key(0, 0, row.ColumnId,
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Update(
NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
NIceDb::TUpdate<IndexColumns::Blob>(portion.GetBlobId(row.GetBlobRange().GetBlobIdxVerified()).SerializeBinary()),
NIceDb::TUpdate<IndexColumns::Metadata>(rowProto.SerializeAsString()),
NIceDb::TUpdate<IndexColumns::Offset>(row.BlobRange.Offset),
NIceDb::TUpdate<IndexColumns::Size>(row.BlobRange.Size),
NIceDb::TUpdate<IndexColumns::PathId>(portion.GetPathId())
);
}

void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {
Expand Down Expand Up @@ -98,16 +110,16 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Delete();
}

bool TDbWrapper::LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) {
bool TDbWrapper::LoadColumns(const std::function<void(const TColumnChunkLoadContextV1&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
auto rowset = db.Table<IndexColumnsV1>().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset);
callback(chunkLoadContext);

if (!rowset.Next()) {
Expand Down
17 changes: 14 additions & 3 deletions ydb/core/tx/columnshard/engines/db_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TDatabase;

namespace NKikimr::NOlap {

class TColumnChunkLoadContext;
class TColumnChunkLoadContextV1;
class TIndexChunkLoadContext;
class TInsertedData;
class TCommittedData;
Expand All @@ -30,6 +30,13 @@ class IDbWrapper {
public:
virtual ~IDbWrapper() = default;

virtual const IBlobGroupSelector* GetDsGroupSelector() const = 0;
// Reference-returning counterpart of GetDsGroupSelector(): the pointer it returns
// is checked with AFL_VERIFY before being dereferenced.
const IBlobGroupSelector& GetDsGroupSelectorVerified() const {
    const IBlobGroupSelector* const result = GetDsGroupSelector();
    AFL_VERIFY(result);
    return *result;
}

virtual void Insert(const TInsertedData& data) = 0;
virtual void Commit(const TCommittedData& data) = 0;
virtual void Abort(const TInsertedData& data) = 0;
Expand All @@ -41,7 +48,7 @@ class IDbWrapper {

virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0;
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) = 0;
virtual bool LoadColumns(const std::function<void(const TColumnChunkLoadContextV1&)>& callback) = 0;

virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0;
virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0;
Expand Down Expand Up @@ -78,7 +85,7 @@ class TDbWrapper : public IDbWrapper {

void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override;
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) override;
bool LoadColumns(const std::function<void(const TColumnChunkLoadContextV1&)>& callback) override;

virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
Expand All @@ -89,6 +96,10 @@ class TDbWrapper : public IDbWrapper {

virtual TConclusion<THashMap<ui64, std::map<TSnapshot, TGranuleShardingInfo>>> LoadGranulesShardingInfo() override;

// Exposes the group selector pointer stored in this wrapper (DsGroupSelector).
const IBlobGroupSelector* GetDsGroupSelector() const override {
    const IBlobGroupSelector* const selector = DsGroupSelector;
    return selector;
}

private:
NTable::TDatabase& Database;
const IBlobGroupSelector* DsGroupSelector;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/portions/column_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TConclusionStatus TChunkMeta::DeserializeFromProto(const NKikimrTxColumnShard::T
return TConclusionStatus::Success();
}

TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context) {
// Initializes the chunk meta from the metadata proto held by the load context.
// The status returned by DeserializeFromProto is passed through Validate()
// (presumably aborts on a failed status - TODO confirm against TConclusionStatus).
TChunkMeta::TChunkMeta(const TColumnChunkLoadContextV1& context) {
DeserializeFromProto(context.GetMetaProto()).Validate();
}

Expand All @@ -33,11 +33,11 @@ NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
return meta;
}

TColumnRecord::TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext)
// Builds a column record from a loaded chunk context: the meta is constructed from the
// context itself, the column id and chunk index come from its address, and the blob range
// is taken from the context's blob range.
// NOTE(review): this text comes from a rendered diff; the `BuildLink(blobLinkId)` initializer
// below references a parameter this signature does not have and looks like the pre-change
// line that the following `BlobRange(loadContext.GetBlobRange())` replaces - confirm.
TColumnRecord::TColumnRecord(const TColumnChunkLoadContextV1& loadContext)
: Meta(loadContext)
, ColumnId(loadContext.GetAddress().GetColumnId())
, Chunk(loadContext.GetAddress().GetChunk())
, BlobRange(loadContext.GetBlobRange().BuildLink(blobLinkId)) {
, BlobRange(loadContext.GetBlobRange()) {
}

TColumnRecord::TColumnRecord(const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column)
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/portions/column_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TColumnRecord;
}

namespace NKikimr::NOlap {
class TColumnChunkLoadContext;
class TColumnChunkLoadContextV1;
struct TIndexInfo;
class TColumnRecord;

Expand Down Expand Up @@ -59,7 +59,7 @@ struct TChunkMeta: public TSimpleChunkMeta {
}
};

TChunkMeta(const TColumnChunkLoadContext& context);
TChunkMeta(const TColumnChunkLoadContextV1& context);

TChunkMeta(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column);
};
Expand Down Expand Up @@ -160,7 +160,7 @@ class TColumnRecord {
}

TColumnRecord(const TChunkAddress& address, const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& column);
TColumnRecord(const TBlobRangeLink16::TLinkId blobLinkId, const TColumnChunkLoadContext& loadContext);
TColumnRecord(const TColumnChunkLoadContextV1& loadContext);

friend IOutputStream& operator<<(IOutputStream& out, const TColumnRecord& rec) {
out << '{';
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ TString TChunkAddress::DebugString() const {
return TStringBuilder() << "(column_id=" << ColumnId << ";chunk=" << Chunk << ";)";
}

// Renders the address as "(path_id=...;portion_id=...;column_id=...;chunk=...;)".
TString TFullChunkAddress::DebugString() const {
    TStringBuilder text;
    text << "(path_id=" << PathId << ";portion_id=" << PortionId;
    text << ";column_id=" << ColumnId << ";chunk=" << Chunk << ";)";
    return text;
}

}
Loading

0 comments on commit f6eeebf

Please sign in to comment.