Skip to content

Commit

Permalink
Split portion and chunks (#11386)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 12, 2024
1 parent de53af3 commit 7e9ca33
Show file tree
Hide file tree
Showing 127 changed files with 2,890 additions and 1,325 deletions.
6 changes: 2 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-Streaming
ydb/core/tx/columnshard/engines/ut TColumnEngineTestLogs.IndexTtl
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restarts
Expand All @@ -31,10 +32,7 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesModificationError
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
ydb/core/kqp/ut/olap [*/*] chunk chunk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,24 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm
}
}

if (const auto className = features.Extract<TString>("METADATA_MEMORY_MANAGER.CLASS_NAME")) {
if (!MetadataManagerConstructor.Initialize(*className)) {
return TConclusionStatus::Fail("incorrect class name for metadata manager:" + *className);
}

NJson::TJsonValue jsonData = NJson::JSON_MAP;
auto fValue = features.Extract("METADATA_MEMORY_MANAGER.FEATURES");
if (fValue) {
if (!NJson::ReadJsonFastTree(*fValue, &jsonData)) {
return TConclusionStatus::Fail("incorrect json in request METADATA_MEMORY_MANAGER.FEATURES parameter");
}
}
auto result = MetadataManagerConstructor->DeserializeFromJson(jsonData);
if (result.IsFail()) {
return result;
}
}

return TConclusionStatus::Success();
}

Expand All @@ -40,6 +58,9 @@ void TUpsertOptionsOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTab
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableCompactionPlannerConstructor());
}
if (MetadataManagerConstructor.HasObject()) {
MetadataManagerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableMetadataManagerConstructor());
}
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
#include <ydb/core/tx/columnshard/data_accessor/abstract/constructor.h>

namespace NKikimr::NKqp {

class TUpsertOptionsOperation : public ITableStoreOperation {
class TUpsertOptionsOperation: public ITableStoreOperation {
private:
static TString GetTypeName() {
return "UPSERT_OPTIONS";
Expand All @@ -14,6 +15,7 @@ class TUpsertOptionsOperation : public ITableStoreOperation {
bool SchemeNeedActualization = false;
std::optional<bool> ExternalGuaranteeExclusivePK;
NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer CompactionPlannerConstructor;
NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer MetadataManagerConstructor;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ PEERDIR(
ydb/services/metadata/manager
ydb/core/formats/arrow/serializer
ydb/core/tx/columnshard/engines/storage/optimizer/abstract
ydb/core/tx/columnshard/data_accessor/abstract
ydb/core/kqp/gateway/utils
ydb/core/protos
)
Expand Down
58 changes: 58 additions & 0 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2719,6 +2719,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

}

Y_UNIT_TEST(MetadataMemoryManager) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

// Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
{
auto it = tableClient
.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
)")
.GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
CompareYson(result, R"([[20000u;]])");
}
{
auto alterQuery =
TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `METADATA_MEMORY_MANAGER.CLASS_NAME`=`local_db`,
`METADATA_MEMORY_MANAGER.FEATURES`=`{"memory_cache_size" : 0}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto it = tableClient
.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
)")
.GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
CompareYson(result, R"([[20000u;]])");
}
}

Y_UNIT_TEST(NormalizeAbsentColumn) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,5 @@ enum ETxTypes {
TXTYPE_APPLY_NORMALIZER = 35 [(TxTypeOpts) = {Name: "TxApplyNormalizer"}];
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
}
19 changes: 19 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,28 @@ message TCompactionPlannerConstructorContainer {
}
}

message TMetadataManagerConstructorContainer {
optional string ClassName = 1;

message TInMem {
}

message TLocalDB {
optional uint64 MemoryCacheSize = 1 [default = 128000000];
optional bool FetchOnStart = 2 [default = false];
}

oneof Implementation {
TInMem InMem = 20;
TLocalDB LocalDB = 21;
}
}

message TColumnTableSchemeOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2 [default = false];
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
}

message TColumnTableSchema {
Expand Down Expand Up @@ -508,6 +526,7 @@ message TColumnTableRequestedOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2;
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
}

message TAlterColumnTableSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
for (auto&& portion : pack.MutablePortions()) {
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
portion.GetPortionInfoConstructor()->SetInsertWriteId((TInsertWriteId)Counter.Inc());
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId((TInsertWriteId)Counter.Inc());
} else {
portion.GetPortionInfoConstructor()->SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
}
pack.AddInsertWriteId(portion.GetPortionInfoConstructor()->GetInsertWriteIdVerified());
pack.AddInsertWriteId(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
portion.Finalize(Self, txc);
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
Expand Down Expand Up @@ -99,7 +99,7 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
}
granule.InsertPortionOnComplete(portion.GetPortionInfo().MutablePortionInfoPtr());
granule.InsertPortionOnComplete(portion.GetPortionInfo(), index);
}
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
AFL_VERIFY(CommitSnapshot);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
void TColumnShard::SwitchToWork(const TActorContext& ctx) {
{
const TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId());
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");

for (auto&& i : TablesManager.GetTables()) {
Expand Down Expand Up @@ -109,7 +109,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId()));
BufferizationWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId()));
DataAccessorsControlActorId = ctx.Register(new NOlap::NDataAccessorControl::TActor(TabletID(), SelfId()));
DataAccessorsManager = std::make_shared<NOlap::NDataAccessorControl::TActorAccessorsManager>(DataAccessorsControlActorId),
DataAccessorsManager = std::make_shared<NOlap::NDataAccessorControl::TActorAccessorsManager>(DataAccessorsControlActorId, SelfId()),

PrioritizationClientId = NPrioritiesQueue::TCompServiceOperator::RegisterClient();
Execute(CreateTxInitSchema(), ctx);
Expand Down
Loading

0 comments on commit 7e9ca33

Please sign in to comment.