Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Library import 241128-1502 #12104

Merged
merged 9 commits into from
Nov 29, 2024
7 changes: 7 additions & 0 deletions build/conf/compilers/gnu_compiler.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ when ($MSAN_TRACK_ORIGIN == "yes") {

# Compiler settings applied only when building for the Xtensa architecture.
when ($ARCH_XTENSA == "yes") {
# FSTACK is reset to an empty value for this architecture.
FSTACK=
# Silence warnings about use of C++14 language extensions.
CFLAGS+=-Wno-c++14-extensions
# The C++17-extensions warning is silenced under a different flag spelling
# depending on the Xtensa variant.
# NOTE(review): presumably the HiFi4 toolchain only recognizes the
# pre-standard "c++1z" name of this warning group - confirm.
when ($ARCH_XTENSA_HIFI4 == "yes") {
CFLAGS+=-Wno-c++1z-extensions
}
otherwise {
CFLAGS+=-Wno-c++17-extensions
}
}

when ($OS_EMSCRIPTEN == "yes") {
Expand Down
2 changes: 1 addition & 1 deletion ydb/ci/rightlib.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
23e9865bb938b83e7e32b670ba055c407f75494b
796e6186c6652f49958e68c7eb0f06c52827e702
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ TMaybe<TKqlQueryList> BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK
auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName);
YQL_ENSURE(dataSource);
if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
auto newRead = dqIntegration->WrapRead(NYql::TDqSettings(), input.Cast().Ptr(), ctx);
auto newRead = dqIntegration->WrapRead(input.Cast().Ptr(), ctx, {});
if (newRead.Get() != input.Raw()) {
return newRead;
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
// We prepare a lot of partitions and distribute them between these tasks
// Constraint of 1 task per partition is NOT valid anymore
auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
dqIntegration->Partition(NYql::TDqSettings(), maxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false);
IDqIntegration::TPartitionSettings pSettings;
pSettings.MaxPartitions = maxTasksPerStage;
pSettings.CanFallback = false;
dqIntegration->Partition(source.Ref(), partitionParams, &clusterName, ctx, pSettings);
externalSource.SetTaskParamKey(TString(dataSourceCategory));
for (const TString& partitionParam : partitionParams) {
externalSource.AddPartitionedTaskParams(partitionParam);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& c
return node;
}

IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) {
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings) {
TOptimizeExprSettings settings{&typesCtx};
auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) {
if (auto maybeRead = TMaybeNode<TCoRight>(node).Input()) {
Expand All @@ -345,7 +345,7 @@ IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPt
auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName);
YQL_ENSURE(dataSource);
if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
auto newRead = dqIntegration->WrapRead(config, maybeRead.Cast().Ptr(), ctx);
auto newRead = dqIntegration->WrapRead(maybeRead.Cast().Ptr(), ctx, wrSettings);
if (newRead.Get() != maybeRead.Raw()) {
return newRead;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_log.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
#include <yql/essentials/core/expr_nodes_gen/yql_expr_nodes_gen.h>
Expand All @@ -11,7 +12,6 @@
namespace NYql {
class IOptimizationContext;
struct TTypeAnnotationContext;
struct TDqSettings;
struct IProviderContext;
struct TRelOptimizerNode;
struct TOptimizerStatistics;
Expand All @@ -38,7 +38,7 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx);

NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);

IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config);
IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings);

NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return Nothing();
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& ) override {
if (const auto maybeClReadTable = TMaybeNode<TClReadTable>(read)) {
const auto clReadTable = maybeClReadTable.Cast();
const auto token = TString("cluster:default_") += clReadTable.DataSource().Cluster().StringValue();
Expand Down Expand Up @@ -66,7 +66,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return read;
}

ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
partitions.clear();
NCH::TRange range;
// range.SetRange("limit 42 offset 42 order by ...."); // Possible set range like this.
Expand Down
10 changes: 8 additions & 2 deletions ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,13 @@ namespace NYql::NDqs {
TVector<TString> parts;
if (auto dqIntegration = (*datasource)->GetDqIntegration()) {
TString clusterName;
_MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*Settings, maxPartitions, *read, parts, &clusterName, ExprContext, canFallback));
IDqIntegration::TPartitionSettings settings {
.DataSizePerJob = Settings->DataSizePerJob.Get(),
.MaxPartitions = maxPartitions,
.EnableComputeActor = Settings->EnableComputeActor.Get(),
.CanFallback = canFallback
};
_MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*read, parts, &clusterName, ExprContext, settings));
TMaybe<::google::protobuf::Any> sourceSettings;
TString sourceType;
if (dqSource) {
Expand Down Expand Up @@ -585,7 +591,7 @@ namespace NYql::NDqs {
YQL_ENSURE(dataSource);
auto dqIntegration = (*dataSource)->GetDqIntegration();
YQL_ENSURE(dqIntegration);

google::protobuf::Any providerSpecificLookupSourceSettings;
TString sourceType;
dqIntegration->FillLookupSourceSettings(*rightInput.Raw(), providerSpecificLookupSourceSettings, sourceType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase {

State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default);

IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, *State_->Settings);
IDqIntegration::TWrapReadSettings wrSettings {
.WatermarksMode = State_->Settings->WatermarksMode.Get(),
.WatermarksGranularityMs = State_->Settings->WatermarksGranularityMs.Get(),
.WatermarksLateArrivalDelayMs = State_->Settings->WatermarksLateArrivalDelayMs.Get(),
.WatermarksEnableIdlePartitions = State_->Settings->WatermarksEnableIdlePartitions.Get()
};
IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, wrSettings);
if (input != output) {
YQL_CLOG(INFO, ProviderDq) << "DqsRecapture";
// TODO: Add before/after recapture transformers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
UNIT_ASSERT(genericDataSource != Types->DataSourceMap.end());
auto dqIntegration = genericDataSource->second->GetDqIntegration();
UNIT_ASSERT(dqIntegration);
auto newRead = dqIntegration->WrapRead(TDqSettings(), input.Ptr(), ctx);
auto newRead = dqIntegration->WrapRead(input.Ptr(), ctx, IDqIntegration::TWrapReadSettings{});
BuildSettings(newRead, dqIntegration, ctx);
return newRead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace NYql {
return Nothing();
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override {
if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) {
const auto genReadTable = maybeGenReadTable.Cast();
YQL_ENSURE(genReadTable.Ref().GetTypeAnn(), "No type annotation for node " << genReadTable.Ref().Content());
Expand Down Expand Up @@ -106,8 +106,7 @@ namespace NYql {
return read;
}

ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&,
bool) override {
ui64 Partition(const TExprNode&, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
partitions.clear();
Generic::TRange range;
partitions.emplace_back();
Expand Down
30 changes: 14 additions & 16 deletions ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ class TPqDqIntegration: public TDqIntegrationBase {
return 0;
}

ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override {
if (auto maybePqRead = TMaybeNode<TPqReadTopic>(&node)) {
return PartitionTopicRead(maybePqRead.Cast().Topic(), maxPartitions, partitions);
return PartitionTopicRead(maybePqRead.Cast().Topic(), settings.MaxPartitions, partitions);
}
if (auto maybeDqSource = TMaybeNode<TDqSource>(&node)) {
auto settings = maybeDqSource.Cast().Settings();
if (auto topicSource = TMaybeNode<TDqPqTopicSource>(settings.Raw())) {
return PartitionTopicRead(topicSource.Cast().Topic(), maxPartitions, partitions);
auto srcSettings = maybeDqSource.Cast().Settings();
if (auto topicSource = TMaybeNode<TDqPqTopicSource>(srcSettings.Raw())) {
return PartitionTopicRead(topicSource.Cast().Topic(), settings.MaxPartitions, partitions);
}
}
return 0;
}

TExprNode::TPtr WrapRead(const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& wrSettings) override {
if (const auto& maybePqReadTopic = TMaybeNode<TPqReadTopic>(read)) {
const auto& pqReadTopic = maybePqReadTopic.Cast();
YQL_ENSURE(pqReadTopic.Ref().GetTypeAnn(), "No type annotation for node " << pqReadTopic.Ref().Content());
Expand Down Expand Up @@ -127,7 +127,7 @@ class TPqDqIntegration: public TDqIntegrationBase {

const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems();
const auto pos = read->Pos();

TExprNode::TListType colNames;
colNames.reserve(typeItems.size());
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colNames),
Expand All @@ -146,7 +146,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
.World(pqReadTopic.World())
.Topic(pqReadTopic.Topic())
.Columns(std::move(columnNames))
.Settings(BuildTopicReadSettings(clusterName, dqSettings, read->Pos(), format, ctx))
.Settings(BuildTopicReadSettings(clusterName, wrSettings, read->Pos(), format, ctx))
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
Expand Down Expand Up @@ -325,7 +325,7 @@ class TPqDqIntegration: public TDqIntegrationBase {

NNodes::TCoNameValueTupleList BuildTopicReadSettings(
const TString& cluster,
const TDqSettings& dqSettings,
const IDqIntegration::TWrapReadSettings& wrSettings,
TPositionHandle pos,
std::string_view format,
TExprContext& ctx) const
Expand All @@ -349,7 +349,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx);
Add(props, Format, format, pos, ctx);


if (clusterConfiguration->UseSsl) {
Add(props, UseSslSetting, "1", pos, ctx);
}
Expand All @@ -358,23 +358,21 @@ class TPqDqIntegration: public TDqIntegrationBase {
Add(props, AddBearerToTokenSetting, "1", pos, ctx);
}

if (dqSettings.WatermarksMode.Get().GetOrElse("") == "default") {
if (wrSettings.WatermarksMode.GetOrElse("") == "default") {
Add(props, WatermarksEnableSetting, ToString(true), pos, ctx);

const auto granularity = TDuration::MilliSeconds(dqSettings
const auto granularity = TDuration::MilliSeconds(wrSettings
.WatermarksGranularityMs
.Get()
.GetOrElse(TDqSettings::TDefault::WatermarksGranularityMs));
Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx);

const auto lateArrivalDelay = TDuration::MilliSeconds(dqSettings
const auto lateArrivalDelay = TDuration::MilliSeconds(wrSettings
.WatermarksLateArrivalDelayMs
.Get()
.GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs));
Add(props, WatermarksLateArrivalDelayUsSetting, ToString(lateArrivalDelay.MicroSeconds()), pos, ctx);
}

if (dqSettings.WatermarksEnableIdlePartitions.Get().GetOrElse(false)) {
if (wrSettings.WatermarksEnableIdlePartitions.GetOrElse(false)) {
Add(props, WatermarksIdlePartitionsSetting, ToString(true), pos, ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
{
}

ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override {
std::vector<std::vector<TPath>> parts;
std::optional<ui64> mbLimitHint;
bool hasDirectories = false;
Expand All @@ -108,6 +108,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
}

constexpr ui64 maxTaskRatio = 20;
auto maxPartitions = settings.MaxPartitions;
if (!maxPartitions || (mbLimitHint && maxPartitions > *mbLimitHint / maxTaskRatio)) {
maxPartitions = std::max(*mbLimitHint / maxTaskRatio, ui64{1});
YQL_CLOG(TRACE, ProviderS3) << "limited max partitions to " << maxPartitions;
Expand Down Expand Up @@ -223,15 +224,15 @@ class TS3DqIntegration: public TDqIntegrationBase {
}

rows = size / 1024; // magic estimate
return primaryKey
return primaryKey
? TOptimizerStatistics(BaseTable, rows, cols, size, size, TIntrusivePtr<TOptimizerStatistics::TKeyColumns>(new TOptimizerStatistics::TKeyColumns(*primaryKey)))
: TOptimizerStatistics(BaseTable, rows, cols, size, size);
} else {
return Nothing();
}
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& ) override {
if (const auto& maybeS3ReadObject = TMaybeNode<TS3ReadObject>(read)) {
const auto& s3ReadObject = maybeS3ReadObject.Cast();
YQL_ENSURE(s3ReadObject.Ref().GetTypeAnn(), "No type annotation for node " << s3ReadObject.Ref().Content());
Expand Down Expand Up @@ -394,7 +395,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
TExprContext ctx;
srcDesc.SetRowType(NCommon::WriteTypeToYson(ctx.MakeType<TStructExprType>(rowTypeItems), NYT::NYson::EYsonFormat::Text));
}

if (auto predicate = parseSettings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) {
TStringBuilder err;
if (!SerializeFilterPredicate(predicate, srcDesc.mutable_predicate(), err)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
{
}

ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
Y_UNUSED(maxPartitions);
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
Y_UNUSED(node);
Y_UNUSED(partitions);
partitions.push_back("zz_partition");
Expand All @@ -95,7 +94,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
YQL_ENSURE(false, "Unimplemented");
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override {
if (const auto& maybeSoReadObject = TMaybeNode<TSoReadObject>(read)) {
const auto& soReadObject = maybeSoReadObject.Cast();
YQL_ENSURE(soReadObject.Ref().GetTypeAnn(), "No type annotation for node " << soReadObject.Ref().Content());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ class TYdbDqIntegration: public TDqIntegrationBase {
{
}

ui64 Partition(const TDqSettings& settings, size_t maxPartitions, const TExprNode& node,
TVector<TString>& partitions, TString*, TExprContext&, bool) override {
ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override {
TString cluster, table;
if (const TMaybeNode<TDqSource> source = &node) {
cluster = source.Cast().DataSource().Cast<TYdbDataSource>().Cluster().Value();
Expand All @@ -35,9 +34,10 @@ class TYdbDqIntegration: public TDqIntegrationBase {
}

auto& meta = State_->Tables[std::make_pair(cluster, table)];
meta.ReadAsync = settings.EnableComputeActor.Get().GetOrElse(false); // TODO: Use special method for get settings.
meta.ReadAsync = settings.EnableComputeActor.GetOrElse(false); // TODO: Use special method for get settings.
auto parts = meta.Partitions;

auto maxPartitions = settings.MaxPartitions;
if (maxPartitions && parts.size() > maxPartitions) {
if (const auto extraParts = parts.size() - maxPartitions; extraParts > maxPartitions) {
const auto dropsPerTask = (parts.size() - 1ULL) / maxPartitions;
Expand Down Expand Up @@ -80,7 +80,7 @@ class TYdbDqIntegration: public TDqIntegrationBase {
return Nothing();
}

TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override {
if (const auto& maybeYdbReadTable = TMaybeNode<TYdbReadTable>(read)) {
const auto& ydbReadTable = maybeYdbReadTable.Cast();
YQL_ENSURE(ydbReadTable.Ref().GetTypeAnn(), "No type annotation for node " << ydbReadTable.Ref().Content());
Expand Down
Loading
Loading