Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scan policy sql control #12400

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm
return TConclusionStatus::Fail("Incorrect value for SCHEME_NEED_ACTUALIZATION: cannot parse as boolean");
}
SchemeNeedActualization = *value;
ExternalGuaranteeExclusivePK = features.Extract<bool>("EXTERNAL_GUARANTEE_EXCLUSIVE_PK");
ScanReaderPolicyName = features.Extract<TString>("SCAN_READER_POLICY_NAME");
if (ScanReaderPolicyName) {
if (*ScanReaderPolicyName != "PLAIN" && *ScanReaderPolicyName != "SIMPLE") {
return TConclusionStatus::Fail("SCAN_READER_POLICY_NAME have to be in ['PLAIN', 'SIMPLE']");
}
}
if (const auto className = features.Extract<TString>("COMPACTION_PLANNER.CLASS_NAME")) {
if (!CompactionPlannerConstructor.Initialize(*className)) {
return TConclusionStatus::Fail("incorrect class name for compaction planner:" + *className);
Expand Down Expand Up @@ -52,8 +57,8 @@ TConclusionStatus TUpsertOptionsOperation::DoDeserialize(NYql::TObjectSettingsIm

void TUpsertOptionsOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
schemaData.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
if (ExternalGuaranteeExclusivePK) {
schemaData.MutableOptions()->SetExternalGuaranteeExclusivePK(*ExternalGuaranteeExclusivePK);
if (ScanReaderPolicyName) {
schemaData.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
}
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*schemaData.MutableOptions()->MutableCompactionPlannerConstructor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TUpsertOptionsOperation: public ITableStoreOperation {
static inline const auto Registrator = TFactory::TRegistrator<TUpsertOptionsOperation>(GetTypeName());
private:
bool SchemeNeedActualization = false;
std::optional<bool> ExternalGuaranteeExclusivePK;
std::optional<TString> ScanReaderPolicyName;
NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer CompactionPlannerConstructor;
NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer MetadataManagerConstructor;
public:
Expand Down
27 changes: 27 additions & 0 deletions ydb/core/kqp/ut/olap/aggregations_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
Cout << result << Endl;
CompareYson(result, R"([[23000u;]])");
}

{
auto alterQuery = TStringBuilder() <<
R"(
ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`)
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

{
auto it = tableClient
.StreamExecuteScanQuery(R"(
--!syntax_v1

SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
)")
.GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
CompareYson(result, R"([[23000u;]])");
}
}

Y_UNIT_TEST(AggregationCountPushdown) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,9 @@ message TMetadataManagerConstructorContainer {

message TColumnTableSchemeOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2 [default = false];
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
optional string ScanReaderPolicyName = 5;
}

message TColumnTableSchema {
Expand Down Expand Up @@ -560,9 +560,9 @@ message TColumnTableSchemaDiff {

message TColumnTableRequestedOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2;
optional TCompactionPlannerConstructorContainer CompactionPlannerConstructor = 3;
optional TMetadataManagerConstructorContainer MetadataManagerConstructor = 4;
optional string ScanReaderPolicyName = 5;
}

message TAlterColumnTableSchema {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,6 @@ struct TReadMetadataBase {
return ResultIndexSchema;
}

bool HasGuaranteeExclusivePK() const {
return GetIndexInfo().GetExternalGuaranteeExclusivePK();
}

ISnapshotSchema::TPtr GetLoadSchemaVerified(const TPortionInfo& porition) const;

const std::shared_ptr<NArrow::TSchemaLite>& GetBlobSchema(const ui64 version) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
break;
}
}
if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) &&
if ((MergingContext->IsExclusiveInterval()) &&
sourcesInMemory) {
TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
auto& container = Sources.begin()->second->GetStageResult().GetBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
}

TConclusionStatus TScanHead::Start() {
const bool guaranteeExclusivePK = Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK();
TScanContext context;
for (auto itPoint = BorderPoints.begin(); itPoint != BorderPoints.end(); ++itPoint) {
auto& point = itPoint->second;
Expand All @@ -82,8 +81,7 @@ TConclusionStatus TScanHead::Start() {
}
const bool isExclusive = context.GetCurrentSources().size() == 1;
for (auto&& i : context.GetCurrentSources()) {
i.second->SetExclusiveIntervalOnly(
(isExclusive && i.second->GetExclusiveIntervalOnly() && !context.GetIsSpecialPoint()) || guaranteeExclusivePK);
i.second->SetExclusiveIntervalOnly((isExclusive && i.second->GetExclusiveIntervalOnly() && !context.GetIsSpecialPoint()));
}

for (auto&& i : point.GetFinishSources()) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,22 @@ void TTxScan::Complete(const TActorContext& ctx) {
read.PathId = request.GetLocalPathId();
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
read.TableName = table;

const TString defaultReader =
AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
[&]() {
const TString defGlobal =
AppDataVerified().ColumnShardConfig.GetReaderClassName() ? AppDataVerified().ColumnShardConfig.GetReaderClassName() : "PLAIN";
if (Self->HasIndex()) {
return Self->GetIndexAs<TColumnEngineForLogs>()
.GetVersionedIndex()
.GetLastSchema()
->GetIndexInfo()
.GetScanReaderPolicyName()
.value_or(defGlobal);
} else {
return defGlobal;
}
}();
std::unique_ptr<IScannerConstructor> scannerConstructor = [&]() {
auto sysViewPolicy = NSysView::NAbstract::ISysViewPolicy::BuildByPath(read.TableName);
if (!sysViewPolicy) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId)
void TIndexInfo::DeserializeOptionsFromProto(const NKikimrSchemeOp::TColumnTableSchemeOptions& optionsProto) {
TMemoryProfileGuard g("TIndexInfo::DeserializeFromProto::Options");
SchemeNeedActualization = optionsProto.GetSchemeNeedActualization();
ExternalGuaranteeExclusivePK = optionsProto.GetExternalGuaranteeExclusivePK();
if (optionsProto.HasScanReaderPolicyName()) {
ScanReaderPolicyName = optionsProto.GetScanReaderPolicyName();
}
if (optionsProto.HasCompactionPlannerConstructor()) {
auto container =
NStorageOptimizer::TOptimizerPlannerConstructorContainer::BuildFromProto(optionsProto.GetCompactionPlannerConstructor());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct TIndexInfo: public IIndexInfo {
bool SchemeNeedActualization = false;
std::shared_ptr<NStorageOptimizer::IOptimizerPlannerConstructor> CompactionPlannerConstructor;
std::shared_ptr<NDataAccessorControl::IManagerConstructor> MetadataManagerConstructor;
bool ExternalGuaranteeExclusivePK = false;
std::optional<TString> ScanReaderPolicyName;

ui64 Version = 0;
std::vector<ui32> SchemaColumnIds;
Expand Down Expand Up @@ -215,8 +215,8 @@ struct TIndexInfo: public IIndexInfo {
std::shared_ptr<arrow::Scalar> GetColumnExternalDefaultValueVerified(const ui32 colId) const;
std::shared_ptr<arrow::Scalar> GetColumnExternalDefaultValueByIndexVerified(const ui32 colIndex) const;

bool GetExternalGuaranteeExclusivePK() const {
return ExternalGuaranteeExclusivePK;
const std::optional<TString>& GetScanReaderPolicyName() const {
return ScanReaderPolicyName;
}

const TColumnFeatures& GetColumnFeaturesVerified(const ui32 columnId) const {
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/schemeshard/olap/options/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ namespace NKikimr::NSchemeShard {

bool TOlapOptionsDescription::ApplyUpdate(const TOlapOptionsUpdate& schemaUpdate, IErrorCollector& /*errors*/) {
SchemeNeedActualization = schemaUpdate.GetSchemeNeedActualization();
if (!!schemaUpdate.GetExternalGuaranteeExclusivePK()) {
ExternalGuaranteeExclusivePK = *schemaUpdate.GetExternalGuaranteeExclusivePK();
if (!!schemaUpdate.GetScanReaderPolicyName()) {
ScanReaderPolicyName = *schemaUpdate.GetScanReaderPolicyName();
}
if (schemaUpdate.GetCompactionPlannerConstructor().HasObject()) {
CompactionPlannerConstructor = schemaUpdate.GetCompactionPlannerConstructor();
Expand All @@ -18,8 +18,8 @@ bool TOlapOptionsDescription::ApplyUpdate(const TOlapOptionsUpdate& schemaUpdate

void TOlapOptionsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema) {
SchemeNeedActualization = tableSchema.GetOptions().GetSchemeNeedActualization();
if (tableSchema.GetOptions().HasExternalGuaranteeExclusivePK()) {
ExternalGuaranteeExclusivePK = tableSchema.GetOptions().GetExternalGuaranteeExclusivePK();
if (tableSchema.GetOptions().HasScanReaderPolicyName()) {
ScanReaderPolicyName = tableSchema.GetOptions().GetScanReaderPolicyName();
}
if (tableSchema.GetOptions().HasCompactionPlannerConstructor()) {
AFL_VERIFY(CompactionPlannerConstructor.DeserializeFromProto(tableSchema.GetOptions().GetCompactionPlannerConstructor()));
Expand All @@ -31,8 +31,8 @@ void TOlapOptionsDescription::Parse(const NKikimrSchemeOp::TColumnTableSchema& t

void TOlapOptionsDescription::Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const {
tableSchema.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
if (ExternalGuaranteeExclusivePK) {
tableSchema.MutableOptions()->SetExternalGuaranteeExclusivePK(ExternalGuaranteeExclusivePK);
if (ScanReaderPolicyName) {
tableSchema.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
}
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*tableSchema.MutableOptions()->MutableCompactionPlannerConstructor());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/olap/options/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class TOlapSchema;
class TOlapOptionsDescription {
private:
YDB_READONLY(bool, SchemeNeedActualization, false);
YDB_READONLY(bool, ExternalGuaranteeExclusivePK, false);
YDB_READONLY_DEF(std::optional<TString>, ScanReaderPolicyName);
YDB_READONLY_DEF(NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer, CompactionPlannerConstructor);
YDB_READONLY_DEF(NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer, MetadataManagerConstructor);
public:
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/schemeshard/olap/options/update.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ namespace NKikimr::NSchemeShard {
class TOlapOptionsUpdate {
private:
YDB_ACCESSOR(bool, SchemeNeedActualization, false);
YDB_ACCESSOR_DEF(std::optional<bool>, ExternalGuaranteeExclusivePK);
YDB_ACCESSOR_DEF(std::optional<TString>, ScanReaderPolicyName);
YDB_ACCESSOR_DEF(NOlap::NStorageOptimizer::TOptimizerPlannerConstructorContainer, CompactionPlannerConstructor);
YDB_ACCESSOR_DEF(NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer, MetadataManagerConstructor);
public:
bool Parse(const NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest, IErrorCollector& errors) {
SchemeNeedActualization = alterRequest.GetOptions().GetSchemeNeedActualization();
if (alterRequest.GetOptions().HasExternalGuaranteeExclusivePK()) {
ExternalGuaranteeExclusivePK = alterRequest.GetOptions().GetExternalGuaranteeExclusivePK();
if (alterRequest.GetOptions().HasScanReaderPolicyName()) {
ScanReaderPolicyName = alterRequest.GetOptions().GetScanReaderPolicyName();
}
if (alterRequest.GetOptions().HasMetadataManagerConstructor()) {
auto container = NOlap::NDataAccessorControl::TMetadataManagerConstructorContainer::BuildFromProto(alterRequest.GetOptions().GetMetadataManagerConstructor());
Expand All @@ -41,8 +41,8 @@ class TOlapOptionsUpdate {
}
void SerializeToProto(NKikimrSchemeOp::TAlterColumnTableSchema& alterRequest) const {
alterRequest.MutableOptions()->SetSchemeNeedActualization(SchemeNeedActualization);
if (ExternalGuaranteeExclusivePK) {
alterRequest.MutableOptions()->SetExternalGuaranteeExclusivePK(*ExternalGuaranteeExclusivePK);
if (ScanReaderPolicyName) {
alterRequest.MutableOptions()->SetScanReaderPolicyName(*ScanReaderPolicyName);
}
if (CompactionPlannerConstructor.HasObject()) {
CompactionPlannerConstructor.SerializeToProto(*alterRequest.MutableOptions()->MutableCompactionPlannerConstructor());
Expand Down
Loading