Skip to content

Commit

Permalink
scan policy sql control (#12400)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 10, 2024
1 parent d299ffe commit 456154d
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm
return TConclusionStatus::Fail("Incorrect value for SCHEME_NEED_ACTUALIZATION: cannot parse as boolean");
}
SchemeNeedActualization = *value;
ExternalGuaranteeExclusivePK = features.Extract<bool>("EXTERNAL_GUARANTEE_EXCLUSIVE_PK");
ScanReaderPolicyName = features.Extract<TString>("SCAN_READER_POLICY_NAME");
if (ScanReaderPolicyName) {
if (*ScanReaderPolicyName != "PLAIN" && *ScanReaderPolicyName != "SIMPLE") {
return TConclusionStatus::Fail("SCAN_READER_POLICY_NAME have to be in ['PLAIN', 'SIMPLE']");
}
}
if (const auto className = features.Extract<TString>("COMPACTION_PLANNER.CLASS_NAME")) {
if (!CompactionPlannerConstructor.Initialize(*className)) {
return TConclusionStatus::Fail("incorrect class name for compaction planner:" + *className);
Expand Down Expand Up @@ -52,8 +57,8 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm

void TUpsertOptionsOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
schemaData.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
if (ExternalGuaranteeExclusivePK) {
schemaData.MutableOptions()->SetExternalGuaranteeExclusivePK(*ExternalGuaranteeExclusivePK);
if (ScanReaderPolicyName) {
schemaData.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
}
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableCompactionPlannerConstructor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TUpsertOptionsOperation: public ITableStoreOperation {
static inline const auto Registrator = TFactory::TRegistrator<TUpsertOptionsOperation>(GetTypeName());
private:
bool SchemeNeedActualization = false;
std::optional<bool> ExternalGuaranteeExclusivePK;
std::optional<TString> ScanReaderPolicyName;
NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer CompactionPlannerConstructor;
NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer MetadataManagerConstructor;
public:
Expand Down
27 changes: 27 additions & 0 deletions ydb/core/kqp/ut/olap/aggregations_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
Cout << result << Endl;
CompareYson(result, R"([[23000u;]])");
}

{
auto alterQuery = TStringBuilder() <<
R"(
ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`)
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

{
auto it = tableClient
.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
)")
.GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
CompareYson(result, R"([[23000u;]])");
}
}

Y_UNIT_TEST(AggregationCountPushdown) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,9 @@ message TMetadataManagerConstructorContainer {

message TColumnTableSchemeOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2 [default = false];
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
optional string ScanReaderPolicyName = 5;
}

message TColumnTableSchema {
Expand Down Expand Up @@ -560,9 +560,9 @@ message TColumnTableSchemaDiff {

message TColumnTableRequestedOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2;
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
optional string ScanReaderPolicyName = 5;
}

message TAlterColumnTableSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,6 @@ class TReadMetadataBase {
return ResultIndexSchema;
}

bool HasGuaranteeExclusivePK() const {
return GetIndexInfo().GetExternalGuaranteeExclusivePK();
}

ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;

const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
break;
}
}
if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) &&
if ((MergingContext->IsExclusiveInterval()) &&
sourcesInMemory) {
TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
auto& container = Sources.begin()->second->GetStageResult().GetBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
}

TConclusionStatus TScanHead::Start() {
const bool guaranteeExclusivePK = Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK();
TScanContext context;
for (auto itPoint = BorderPoints.begin(); itPoint != BorderPoints.end(); ++itPoint) {
auto& point = itPoint->second;
Expand All @@ -82,8 +81,7 @@ TConclusionStatus TScanHead::Start() {
}
const bool isExclusive = context.GetCurrentSources().size() == 1;
for (auto&& i : context.GetCurrentSources()) {
i.second->SetExclusiveIntervalOnly(
(isExclusive && i.second->GetExclusiveIntervalOnly() && !context.GetIsSpecialPoint()) || guaranteeExclusivePK);
i.second->SetExclusiveIntervalOnly((isExclusive && i.second->GetExclusiveIntervalOnly() && !context.GetIsSpecialPoint()));
}

for (auto&& i : point.GetFinishSources()) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,22 @@ void TTxScan::Complete(const TActorContext& ctx) {
read.PathId = request.GetLocalPathId();
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
read.TableName = table;

const TString defaultReader =
AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
[&]() {
const TString defGlobal =
AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
if (Self->HasIndex()) {
return Self->GetIndexAs<TColumnEngineForLogs>()
.GetVersionedIndex()
.GetLastSchema()
->GetIndexInfo()
.GetScanReaderPolicyName()
.value_or(defGlobal);
} else {
return defGlobal;
}
}();
std::unique_ptr<IScannerConstructor> scannerConstructor = [&]() {
auto sysViewPolicy = NSysView::NAbstract::ISysViewPolicy::BuildByPath(read.TableName);
if (!sysViewPolicy) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId)
void TIndexInfo::DeserializeOptionsFromProto(const NKikimrSchemeOp::TColumnTableSchemeOptions& optionsProto) {
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Options");
SchemeNeedActualization = optionsProto.GetSchemeNeedActualization();
ExternalGuaranteeExclusivePK = optionsProto.GetExternalGuaranteeExclusivePK();
if (optionsProto.HasScanReaderPolicyName()) {
ScanReaderPolicyName = optionsProto.GetScanReaderPolicyName();
}
if (optionsProto.HasCompactionPlannerConstructor()) {
auto container =
NStorageOptimizer::TOptimizerPlannerConstructorContainer::BuildFromProto(optionsProto.GetCompactionPlannerConstructor());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct TIndexInfo: public IIndexInfo {
bool SchemeNeedActualization = false;
std::shared_ptr<NStorageOptimizer::IOptimizerPlannerConstructor> CompactionPlannerConstructor;
std::shared_ptr<NDataAccessorControl::IManagerConstructor> MetadataManagerConstructor;
bool ExternalGuaranteeExclusivePK = false;
std::optional<TString> ScanReaderPolicyName;

ui64 Version = 0;
std::vector<ui32> SchemaColumnIds;
Expand Down Expand Up @@ -215,8 +215,8 @@ struct TIndexInfo: public IIndexInfo {
std::shared_ptr<arrow::Scalar> GetColumnExternalDefaultValueVerified(const ui32 colId) const;
std::shared_ptr<arrow::Scalar> GetColumnExternalDefaultValueByIndexVerified(const ui32 colIndex) const;

bool GetExternalGuaranteeExclusivePK() const {
return ExternalGuaranteeExclusivePK;
const std::optional<TString>& GetScanReaderPolicyName() const {
return ScanReaderPolicyName;
}

const TColumnFeatures& GetColumnFeaturesVerified(const ui32 columnId) const {
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/schemeshard/olap/options/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ namespace NKikimr::NSchemeShard {

bool TOlapOptionsDescription::ApplyUpdate(const TOlapOptionsUpdate& schemaUpdate, IErrorCollector& /*errors*/) {
SchemeNeedActualization = schemaUpdate.GetSchemeNeedActualization();
if (!!schemaUpdate.GetExternalGuaranteeExclusivePK()) {
ExternalGuaranteeExclusivePK = *schemaUpdate.GetExternalGuaranteeExclusivePK();
if (!!schemaUpdate.GetScanReaderPolicyName()) {
ScanReaderPolicyName = *schemaUpdate.GetScanReaderPolicyName();
}
if (schemaUpdate.GetCompactionPlannerConstructor().HasObject()) {
CompactionPlannerConstructor = schemaUpdate.GetCompactionPlannerConstructor();
Expand All @@ -18,8 +18,8 @@ bool TOlapOptionsDescription::ApplyUpdate(const TOlapOptionsUpdate& schemaUpdate

void TOlapOptionsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) {
SchemeNeedActualization = tableSchema.GetOptions().GetSchemeNeedActualization();
if (tableSchema.GetOptions().HasExternalGuaranteeExclusivePK()) {
ExternalGuaranteeExclusivePK = tableSchema.GetOptions().GetExternalGuaranteeExclusivePK();
if (tableSchema.GetOptions().HasScanReaderPolicyName()) {
ScanReaderPolicyName = tableSchema.GetOptions().GetScanReaderPolicyName();
}
if (tableSchema.GetOptions().HasCompactionPlannerConstructor()) {
AFL_VERIFY(CompactionPlannerConstructor.DeserializeFromProto(tableSchema.GetOptions().GetCompactionPlannerConstructor()));
Expand All @@ -31,8 +31,8 @@ void TOlapOptionsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& t

void TOlapOptionsDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const {
tableSchema.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
if (ExternalGuaranteeExclusivePK) {
tableSchema.MutableOptions()->SetExternalGuaranteeExclusivePK(ExternalGuaranteeExclusivePK);
if (ScanReaderPolicyName) {
tableSchema.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
}
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*tableSchema.MutableOptions()->MutableCompactionPlannerConstructor());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/olap/options/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class TOlapSchema;
class TOlapOptionsDescription {
private:
YDB_READONLY(bool, SchemeNeedActualization, false);
YDB_READONLY(bool, ExternalGuaranteeExclusivePK, false);
YDB_READONLY_DEF(std::optional<TString>, ScanReaderPolicyName);
YDB_READONLY_DEF(NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer, CompactionPlannerConstructor);
YDB_READONLY_DEF(NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer, MetadataManagerConstructor);
public:
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/schemeshard/olap/options/update.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ namespace NKikimr::NSchemeShard {
class TOlapOptionsUpdate {
private:
YDB_ACCESSOR(bool, SchemeNeedActualization, false);
YDB_ACCESSOR_DEF(std::optional<bool>, ExternalGuaranteeExclusivePK);
YDB_ACCESSOR_DEF(std::optional<TString>, ScanReaderPolicyName);
YDB_ACCESSOR_DEF(NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer, CompactionPlannerConstructor);
YDB_ACCESSOR_DEF(NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer, MetadataManagerConstructor);
public:
bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) {
SchemeNeedActualization = alterRequest.GetOptions().GetSchemeNeedActualization();
if (alterRequest.GetOptions().HasExternalGuaranteeExclusivePK()) {
ExternalGuaranteeExclusivePK = alterRequest.GetOptions().GetExternalGuaranteeExclusivePK();
if (alterRequest.GetOptions().HasScanReaderPolicyName()) {
ScanReaderPolicyName = alterRequest.GetOptions().GetScanReaderPolicyName();
}
if (alterRequest.GetOptions().HasMetadataManagerConstructor()) {
auto container = NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer::BuildFromProto(alterRequest.GetOptions().GetMetadataManagerConstructor());
Expand All @@ -41,8 +41,8 @@ class TOlapOptionsUpdate {
}
void SerializeToProto(NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest) const {
alterRequest.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
if (ExternalGuaranteeExclusivePK) {
alterRequest.MutableOptions()->SetExternalGuaranteeExclusivePK(*ExternalGuaranteeExclusivePK);
if (ScanReaderPolicyName) {
alterRequest.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
}
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*alterRequest.MutableOptions()->MutableCompactionPlannerConstructor());
Expand Down

0 comments on commit 456154d

Please sign in to comment.