From 4ba1f3d99a3e5499a3a0fa5c22a4628fe41f06f4 Mon Sep 17 00:00:00 2001 From: koryabkin Date: Thu, 28 Nov 2024 14:09:40 +0300 Subject: [PATCH 1/8] Remove warnings for newer C++ standard features on Xtensa toolchains commit_hash:f9018bf14e08601272186d3e525f6fe686475c88 --- build/conf/compilers/gnu_compiler.conf | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/build/conf/compilers/gnu_compiler.conf b/build/conf/compilers/gnu_compiler.conf index 64be422797e8..8dc3e376c7fc 100644 --- a/build/conf/compilers/gnu_compiler.conf +++ b/build/conf/compilers/gnu_compiler.conf @@ -84,6 +84,13 @@ when ($MSAN_TRACK_ORIGIN == "yes") { when ($ARCH_XTENSA == "yes") { FSTACK= + CFLAGS+=-Wno-c++14-extensions + when ($ARCH_XTENSA_HIFI4 == "yes") { + CFLAGS+=-Wno-c++1z-extensions + } + otherwise { + CFLAGS+=-Wno-c++17-extensions + } } when ($OS_EMSCRIPTEN == "yes") { From 667d7a8073070f148e1adc7650b55e7a7ef33439 Mon Sep 17 00:00:00 2001 From: ermolovd Date: Thu, 28 Nov 2024 15:34:32 +0300 Subject: [PATCH 2/8] YT-23645: mapreduce client can use yt/core log manager commit_hash:7007a3c7f56a6f271073811160f00f03162aaaa6 --- yt/cpp/mapreduce/client/init.cpp | 22 +++++++------ yt/cpp/mapreduce/interface/config.cpp | 1 + yt/cpp/mapreduce/interface/config.h | 12 +++++++ yt/cpp/mapreduce/interface/logging/logger.cpp | 7 ++++- yt/cpp/mapreduce/interface/logging/logger.h | 4 +++ yt/cpp/mapreduce/interface/logging/yt_log.cpp | 31 +++++++++++++------ 6 files changed, 57 insertions(+), 20 deletions(-) diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp index 242bbcb2bb19..6121952f868b 100644 --- a/yt/cpp/mapreduce/client/init.cpp +++ b/yt/cpp/mapreduce/client/init.cpp @@ -166,27 +166,32 @@ NLogging::ELogLevel ToCoreLogLevel(ILogger::ELevel level) Y_ABORT(); } -void CommonInitialize(int, const char**) +void CommonInitialize(TGuard& g) { auto logLevelStr = to_lower(TConfig::Get()->LogLevel); ILogger::ELevel logLevel; if (!TryFromString(logLevelStr, logLevel)) { Cerr 
<< "Invalid log level: " << TConfig::Get()->LogLevel << Endl; + g.Release(); exit(1); } auto logPath = TConfig::Get()->LogPath; - ILoggerPtr logger; if (logPath.empty()) { - logger = CreateStdErrLogger(logLevel); + if (TConfig::Get()->LogUseCore) { + auto coreLoggingConfig = NLogging::TLogManagerConfig::CreateStderrLogger(ToCoreLogLevel(logLevel)); + NLogging::TLogManager::Get()->Configure(coreLoggingConfig); + SetUseCoreLog(); + } else { + auto logger = CreateStdErrLogger(logLevel); + SetLogger(logger); + } } else { - logger = CreateFileLogger(logLevel, logPath, /*append*/ true); - auto coreLoggingConfig = NLogging::TLogManagerConfig::CreateLogFile(logPath, ToCoreLogLevel(logLevel)); NLogging::TLogManager::Get()->Configure(coreLoggingConfig); + SetUseCoreLog(); } - SetLogger(logger); } void NonJobInitialize(const TInitializeOptions& options) @@ -281,8 +286,7 @@ void JoblessInitialize(const TInitializeOptions& options) { auto g = Guard(InitializeLock); - static const char* fakeArgv[] = {"unknown..."}; - NDetail::CommonInitialize(1, fakeArgv); + NDetail::CommonInitialize(g); NDetail::NonJobInitialize(options); NDetail::ElevateInitStatus(NDetail::EInitStatus::JoblessInitialization); } @@ -291,7 +295,7 @@ void Initialize(int argc, const char* argv[], const TInitializeOptions& options) { auto g = Guard(InitializeLock); - NDetail::CommonInitialize(argc, argv); + NDetail::CommonInitialize(g); NDetail::ElevateInitStatus(NDetail::EInitStatus::FullInitialization); diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp index b1f546f7a4e8..0ed5cb57f8ea 100644 --- a/yt/cpp/mapreduce/interface/config.cpp +++ b/yt/cpp/mapreduce/interface/config.cpp @@ -194,6 +194,7 @@ void TConfig::Reset() ApiVersion = GetEnv("YT_VERSION", "v3"); LogLevel = GetEnv("YT_LOG_LEVEL", "error"); LogPath = GetEnv("YT_LOG_PATH"); + LogUseCore = GetBool("YT_LOG_USE_CORE", false); ContentEncoding = GetEncoding("YT_CONTENT_ENCODING"); AcceptEncoding = 
GetEncoding("YT_ACCEPT_ENCODING"); diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h index 05c4c473e5c7..4d1fd49e8c4e 100644 --- a/yt/cpp/mapreduce/interface/config.h +++ b/yt/cpp/mapreduce/interface/config.h @@ -81,6 +81,18 @@ struct TConfig TString LogLevel; TString LogPath; + /// + /// For historical reasons mapreduce client uses its own logging system. + /// + /// If this option is set to true, the library switches to yt/yt/core logging by default. + /// But if the user calls @ref NYT::SetLogger, the library switches back to the logger provided by the user + /// (except for messages from yt/yt/core). + /// + /// This is a temporary option. In the future it will be true by default, and then removed. + /// + /// https://st.yandex-team.ru/YT-23645 + bool LogUseCore = false; + // Compression for data that is sent to YT cluster. EEncoding ContentEncoding; diff --git a/yt/cpp/mapreduce/interface/logging/logger.cpp b/yt/cpp/mapreduce/interface/logging/logger.cpp index bfa56b94f6d5..5878c960ff8d 100644 --- a/yt/cpp/mapreduce/interface/logging/logger.cpp +++ b/yt/cpp/mapreduce/interface/logging/logger.cpp @@ -182,7 +182,12 @@ ILoggerPtr GetLogger() return Logger; } +void SetUseCoreLog() +{ + auto guard = TWriteGuard(LoggerMutex); + Logger = nullptr; +} + //////////////////////////////////////////////////////////////////////////////// } - diff --git a/yt/cpp/mapreduce/interface/logging/logger.h b/yt/cpp/mapreduce/interface/logging/logger.h index 2b5aae87d144..780327961d19 100644 --- a/yt/cpp/mapreduce/interface/logging/logger.h +++ b/yt/cpp/mapreduce/interface/logging/logger.h @@ -30,6 +30,8 @@ using ILoggerPtr = ::TIntrusivePtr; void SetLogger(ILoggerPtr logger); ILoggerPtr GetLogger(); +void SetUseCoreLog(); + ILoggerPtr CreateStdErrLogger(ILogger::ELevel cutLevel); ILoggerPtr CreateFileLogger(ILogger::ELevel cutLevel, const TString& path, bool append = false); @@ -40,4 +42,6 @@ ILoggerPtr CreateFileLogger(ILogger::ELevel cutLevel, const TString& path, bool 
*/ ILoggerPtr CreateBufferedFileLogger(ILogger::ELevel cutLevel, const TString& path, bool append = false); +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT diff --git a/yt/cpp/mapreduce/interface/logging/yt_log.cpp b/yt/cpp/mapreduce/interface/logging/yt_log.cpp index d41bfa559f10..6c464d48ac41 100644 --- a/yt/cpp/mapreduce/interface/logging/yt_log.cpp +++ b/yt/cpp/mapreduce/interface/logging/yt_log.cpp @@ -26,23 +26,34 @@ class TLogManager ::TSourceLocation sourceLocation, TStringBuf anchorMessage) override { + if (auto* defaultLogManager = GetDefaultLogManager()) { + defaultLogManager->RegisterStaticAnchor(anchor, sourceLocation, anchorMessage); + } auto guard = Guard(Mutex_); anchor->SourceLocation = sourceLocation; anchor->AnchorMessage = anchorMessage; } - void UpdateAnchor(TLoggingAnchor* /*position*/) override - { } + void UpdateAnchor(TLoggingAnchor* anchor) override + { + if (auto* defaultLogManager = GetDefaultLogManager()) { + defaultLogManager->UpdateAnchor(anchor); + } + } void Enqueue(TLogEvent&& event) override { - auto message = TString(event.MessageRef.ToStringBuf()); - LogMessage( - ToImplLevel(event.Level), - ::TSourceLocation(event.SourceFile, event.SourceLine), - "%.*s", - event.MessageRef.size(), - event.MessageRef.begin()); + if (auto logger = GetLogger()) { + LogMessage( + logger, + ToImplLevel(event.Level), + ::TSourceLocation(event.SourceFile, event.SourceLine), + "%.*s", + event.MessageRef.size(), + event.MessageRef.begin()); + } else if (auto* defaultLogManager = GetDefaultLogManager()) { + defaultLogManager->Enqueue(std::move(event)); + } } const TLoggingCategory* GetCategory(TStringBuf categoryName) override @@ -81,7 +92,7 @@ class TLogManager } } - static void LogMessage(ILogger::ELevel level, const ::TSourceLocation& sourceLocation, const char* format, ...) 
+ static void LogMessage(const ILoggerPtr& logger, ILogger::ELevel level, const ::TSourceLocation& sourceLocation, const char* format, ...) { va_list args; va_start(args, format); From d914da33a8058c17411fe1c33b6deed930f29450 Mon Sep 17 00:00:00 2001 From: Maxim Akhmedov Date: Thu, 28 Nov 2024 15:52:58 +0300 Subject: [PATCH 3/8] Introduce YAML format support. * Changelog entry Type: feature Component: proxy Support YAML format for structured data. See more details in RFC: https://github.com/ytsaurus/ytsaurus/wiki/%5BRFC%5D-YAML-format-support --- Pull Request resolved: https://github.com/ytsaurus/ytsaurus/pull/938 commit_hash:2c6c1fbd1e3d1b83182a430b537c802eb8c6b79d --- yt/yt/client/formats/config.cpp | 8 + yt/yt/client/formats/config.h | 20 + yt/yt/client/formats/public.h | 2 + yt/yt/library/formats/format.cpp | 37 ++ yt/yt/library/formats/ya.make | 4 + yt/yt/library/formats/yaml_helpers.cpp | 209 +++++++++ yt/yt/library/formats/yaml_helpers.h | 114 +++++ yt/yt/library/formats/yaml_parser.cpp | 562 +++++++++++++++++++++++++ yt/yt/library/formats/yaml_parser.h | 20 + yt/yt/library/formats/yaml_writer.cpp | 388 +++++++++++++++++ yt/yt/library/formats/yaml_writer.h | 18 + 11 files changed, 1382 insertions(+) create mode 100644 yt/yt/library/formats/yaml_helpers.cpp create mode 100644 yt/yt/library/formats/yaml_helpers.h create mode 100644 yt/yt/library/formats/yaml_parser.cpp create mode 100644 yt/yt/library/formats/yaml_parser.h create mode 100644 yt/yt/library/formats/yaml_writer.cpp create mode 100644 yt/yt/library/formats/yaml_writer.h diff --git a/yt/yt/client/formats/config.cpp b/yt/yt/client/formats/config.cpp index 71c115206171..ed4ee65a73d0 100644 --- a/yt/yt/client/formats/config.cpp +++ b/yt/yt/client/formats/config.cpp @@ -352,4 +352,12 @@ void TSkiffFormatConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TYamlFormatConfig::Register(TRegistrar registrar) +{ + 
registrar.Parameter("write_uint_tag", &TThis::WriteUintTag) + .Default(false); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NFormats diff --git a/yt/yt/client/formats/config.h b/yt/yt/client/formats/config.h index aec3f30db4a8..84b4164b129a 100644 --- a/yt/yt/client/formats/config.h +++ b/yt/yt/client/formats/config.h @@ -415,4 +415,24 @@ DEFINE_REFCOUNTED_TYPE(TSkiffFormatConfig) //////////////////////////////////////////////////////////////////////////////// +class TYamlFormatConfig + : public NYTree::TYsonStruct +{ +public: + //! Write explicit tag "!yt/uint64" for uint64 data type. + //! Use this option if you want to preserve information about + //! the original YT type (without it, numbers in range [0, 2^63-1] + //! will always be written as integers). + //! Option has no effect for parsing. + bool WriteUintTag; + + REGISTER_YSON_STRUCT(TYamlFormatConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TYamlFormatConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NFormats diff --git a/yt/yt/client/formats/public.h b/yt/yt/client/formats/public.h index 0ac2a234719b..753456699b55 100644 --- a/yt/yt/client/formats/public.h +++ b/yt/yt/client/formats/public.h @@ -58,6 +58,7 @@ DEFINE_ENUM(EFormatType, (WebJson) (Skiff) (Arrow) + (Yaml) ); //////////////////////////////////////////////////////////////////////////////// @@ -76,6 +77,7 @@ DECLARE_REFCOUNTED_CLASS(TProtobufTableConfig) DECLARE_REFCOUNTED_CLASS(TProtobufFormatConfig) DECLARE_REFCOUNTED_CLASS(TWebJsonFormatConfig) DECLARE_REFCOUNTED_CLASS(TSkiffFormatConfig) +DECLARE_REFCOUNTED_CLASS(TYamlFormatConfig) DECLARE_REFCOUNTED_STRUCT(IYamrConsumer) diff --git a/yt/yt/library/formats/format.cpp b/yt/yt/library/formats/format.cpp index 6da0985abf6a..f551b9136ed0 100644 --- a/yt/yt/library/formats/format.cpp +++ b/yt/yt/library/formats/format.cpp @@ -12,6 
+12,8 @@ #include "schemaless_writer_adapter.h" #include "skiff_parser.h" #include "skiff_writer.h" +#include "yaml_parser.h" +#include "yaml_writer.h" #include "yamred_dsv_parser.h" #include "yamred_dsv_writer.h" #include "yamr_parser.h" @@ -108,6 +110,18 @@ std::unique_ptr CreateConsumerForDsv( }; } +std::unique_ptr CreateConsumerForYaml( + EDataType dataType, + const IAttributeDictionary& attributes, + IZeroCopyOutput* output) +{ + if (dataType != EDataType::Structured) { + THROW_ERROR_EXCEPTION("YAML is supported only for structured data"); + } + auto config = ConvertTo(&attributes); + return CreateYamlWriter(output, DataTypeToYsonType(dataType), config); +} + class TTableParserAdapter : public IParser { @@ -161,6 +175,8 @@ std::unique_ptr CreateConsumerForFormat( return CreateConsumerForJson(dataType, format.Attributes(), output); case EFormatType::Dsv: return CreateConsumerForDsv(dataType, format.Attributes(), output); + case EFormatType::Yaml: + return CreateConsumerForYaml(dataType, format.Attributes(), output); default: THROW_ERROR_EXCEPTION("Unsupported output format %Qlv", format.GetType()); @@ -408,6 +424,21 @@ TYsonProducer CreateProducerForJson( }); } +TYsonProducer CreateProducerForYaml( + EDataType dataType, + const IAttributeDictionary& attributes, + IInputStream* input) +{ + if (dataType != EDataType::Structured) { + THROW_ERROR_EXCEPTION("YAML is supported only for structured data"); + } + auto ysonType = DataTypeToYsonType(dataType); + auto config = ConvertTo(&attributes); + return BIND([=] (IYsonConsumer* consumer) { + ParseYaml(input, consumer, config, ysonType); + }); +} + TYsonProducer CreateProducerForYson(EDataType dataType, IInputStream* input) { auto ysonType = DataTypeToYsonType(dataType); @@ -429,6 +460,8 @@ TYsonProducer CreateProducerForFormat(const TFormat& format, EDataType dataType, return CreateProducerForYamredDsv(dataType, format.Attributes(), input); case EFormatType::SchemafulDsv: return 
CreateProducerForSchemafulDsv(dataType, format.Attributes(), input); + case EFormatType::Yaml: + return CreateProducerForYaml(dataType, format.Attributes(), input); default: THROW_ERROR_EXCEPTION("Unsupported input format %Qlv", format.GetType()); @@ -489,6 +522,10 @@ std::unique_ptr CreateParserForFormat(const TFormat& format, EDataType auto config = ConvertTo(&format.Attributes()); return CreateParserForSchemafulDsv(consumer, config); } + case EFormatType::Yaml: + // We can only get here with EDataType::Tabular, so throw specific error about supporting + // only structured data in YAML. + THROW_ERROR_EXCEPTION("YAML is supported only for structured data"); default: THROW_ERROR_EXCEPTION("Unsupported input format %Qlv", format.GetType()); diff --git a/yt/yt/library/formats/ya.make b/yt/yt/library/formats/ya.make index 58c8f28f9de4..b15a6a04e5a4 100644 --- a/yt/yt/library/formats/ya.make +++ b/yt/yt/library/formats/ya.make @@ -22,6 +22,9 @@ SRCS( skiff_yson_converter.cpp unversioned_value_yson_writer.cpp web_json_writer.cpp + yaml_helpers.cpp + yaml_parser.cpp + yaml_writer.cpp yamred_dsv_parser.cpp yamred_dsv_writer.cpp yamr_parser_base.cpp @@ -40,6 +43,7 @@ PEERDIR( yt/yt/library/column_converters contrib/libs/apache/arrow + contrib/libs/yaml ) END() diff --git a/yt/yt/library/formats/yaml_helpers.cpp b/yt/yt/library/formats/yaml_helpers.cpp new file mode 100644 index 000000000000..1797514767d3 --- /dev/null +++ b/yt/yt/library/formats/yaml_helpers.cpp @@ -0,0 +1,209 @@ +#include "yaml_helpers.h" + +#include + +#include + +namespace NYT::NFormats { + +using namespace NYTree; + +//////////////////////////////////////////////////////////////////////////////// + +template +TLibYamlTypeWrapper::TLibYamlTypeWrapper() +{ + // Just in case if we are allocated on stack and the destructor is called before + // the object is initialized. 
+ memset(this, 0, sizeof(*this)); +} + +template +void TLibYamlTypeWrapper::Reset() +{ + Deleter(this); + memset(this, 0, sizeof(*this)); +} + +template +TLibYamlTypeWrapper::~TLibYamlTypeWrapper() +{ + Reset(); +} + +// Explicitly instantiate the wrappers for the types we use. +template struct TLibYamlTypeWrapper; +template struct TLibYamlTypeWrapper; +template struct TLibYamlTypeWrapper; + +//////////////////////////////////////////////////////////////////////////////// + +static THashMap YTTypeMap = { + {"!", EYamlScalarType::String}, + {YAML_INT_TAG, EYamlScalarType::Int}, + {YAML_FLOAT_TAG, EYamlScalarType::Float}, + {YAML_BOOL_TAG, EYamlScalarType::Bool}, + {YAML_NULL_TAG, EYamlScalarType::Null}, + {YAML_STR_TAG, EYamlScalarType::String}, + {YTUintTag, EYamlScalarType::Uint}, +}; + +EYamlScalarType DeduceScalarTypeFromTag(const std::string_view& tag) +{ + auto it = YTTypeMap.find(tag); + if (it != YTTypeMap.end()) { + return it->second; + } + return EYamlScalarType::String; +} + +EYamlScalarType DeduceScalarTypeFromValue(const std::string_view& value) +{ + // We conform to YAML 1.2 Core Schema: + // https://yaml.org/spec/1.2.2/#103-core-schema + static const re2::RE2 NullRE = "null|Null|NULL|~|"; + static const re2::RE2 BoolRE = "true|True|TRUE|false|False|FALSE"; + static const re2::RE2 IntRE = "[+-]?[0-9]+"; + // In YAML 1.2 there are also octal and hexadecimal integers, but they are always positive. + // Therefore, we treat them separately and represent as a uint scalar type. 
+ static const re2::RE2 UintRE = "0o[0-7]+|0x[0-9a-fA-F]+"; + static const re2::RE2 FloatRE = + "[-+]?(\\.[0-9]+|[0-9]+(\\.[0-9]*)?)([eE][-+]?[0-9]+)?|" + "[-+]?(\\.inf|\\.Inf|\\.INF)|" + "\\.nan|\\.NaN|\\.NAN"; + if (re2::RE2::FullMatch(value, NullRE)) { + return EYamlScalarType::Null; + } else if (re2::RE2::FullMatch(value, BoolRE)) { + return EYamlScalarType::Bool; + } else if (re2::RE2::FullMatch(value, IntRE)) { + return EYamlScalarType::Int; + } else if (re2::RE2::FullMatch(value, UintRE)) { + return EYamlScalarType::Uint; + } else if (re2::RE2::FullMatch(value, FloatRE)) { + return EYamlScalarType::Float; + } + return EYamlScalarType::String; +} + +bool ParseAndValidateYamlBool(const std::string_view& value) +{ + if (value == "true" || value == "True" || value == "TRUE") { + return true; + } else if (value == "false" || value == "False" || value == "FALSE") { + return false; + } else { + THROW_ERROR_EXCEPTION("Value %Qv is not a boolean", value); + } +} + +std::pair ParseAndValidateYamlInteger(const std::string_view& value, EYamlScalarType yamlType) +{ + // First, detect the base and prepare a string for calling the TryIntFromString function by + // optionally removing the 0x/0o prefix. + int base; + std::string_view adjustedValue; + if (value.starts_with("0x")) { + base = 16; + adjustedValue = value.substr(2); + } else if (value.starts_with("0o")) { + base = 8; + adjustedValue = value.substr(2); + } else { + base = 10; + adjustedValue = value; + } + i64 i64Value; + ui64 ui64Value; + + auto tryFromString = [&] (auto& result) -> bool { + if (base == 10) { + return TryIntFromString<10>(adjustedValue, result); + } else if (base == 16) { + return TryIntFromString<16>(adjustedValue, result); + } else if (base == 8) { + return TryIntFromString<8>(adjustedValue, result); + } else { + YT_ABORT(); + } + }; + + // For untagged or int-tagged values (EYamlScalarType::Int) we first try to fit the value into int64, then into uint64. 
+ // For uint-tagged values (EYamlScalarType::Uint) we try to fit the value only into uint64. + if (yamlType == EYamlScalarType::Int && tryFromString(i64Value)) { + return {ENodeType::Int64, {.Int64 = i64Value}}; + } else if (tryFromString(ui64Value)) { + return {ENodeType::Uint64, {.Uint64 = ui64Value}}; + } else { + std::string requiredDomain = (yamlType == EYamlScalarType::Int) ? "either int64 or uint64" : "uint64"; + THROW_ERROR_EXCEPTION("Value %Qv is not an integer or does not fit into %v", value, requiredDomain); + } +} + +double ParseAndValidateYamlDouble(const std::string_view& value) +{ + double doubleValue; + if (value == ".inf" || value == ".Inf" || value == ".INF" || + value == "+.inf" || value == "+.Inf" || value == "+.INF") + { + doubleValue = std::numeric_limits::infinity(); + } else if (value == "-.inf" || value == "-.Inf" || value == "-.INF") { + doubleValue = -std::numeric_limits::infinity(); + } else if (value == ".nan" || value == ".NaN" || value == ".NAN") { + doubleValue = std::numeric_limits::quiet_NaN(); + } else if (!TryFromString(value, doubleValue)) { + THROW_ERROR_EXCEPTION("Value %Qv is not a floating point integer or does not fit into double", value); + } + return doubleValue; +} + +std::pair ParseScalarValue(const std::string_view& value, EYamlScalarType yamlType) +{ + switch (yamlType) { + case EYamlScalarType::String: + return {ENodeType::String, {}}; + case EYamlScalarType::Null: + return {ENodeType::Entity, {}}; + case EYamlScalarType::Bool: { + bool boolValue = ParseAndValidateYamlBool(value); + return {ENodeType::Boolean, {.Boolean = boolValue}}; + } + case EYamlScalarType::Int: + case EYamlScalarType::Uint: { + return ParseAndValidateYamlInteger(value, yamlType); + } + case EYamlScalarType::Float: { + auto doubleValue = ParseAndValidateYamlDouble(value); + return {ENodeType::Double, {.Double = doubleValue}}; + } + } + YT_ABORT(); +} + +//////////////////////////////////////////////////////////////////////////////// + 
+std::string_view YamlLiteralToStringView(const yaml_char_t* literal, size_t length) +{ + return literal + ? std::string_view(reinterpret_cast(literal), length) + : std::string_view(); +} + +std::string_view YamlLiteralToStringView(const yaml_char_t* literal) +{ + return literal + ? std::string_view(reinterpret_cast(literal)) + : std::string_view(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats + +void Serialize(const yaml_mark_t& mark, NYT::NYson::IYsonConsumer* consumer) +{ + NYT::NYTree::BuildYsonFluently(consumer) + .BeginMap() + .Item("position").Value(NYT::Format("%v:%v", mark.line, mark.column)) + .Item("index").Value(static_cast(mark.index)) + .EndMap(); +} diff --git a/yt/yt/library/formats/yaml_helpers.h b/yt/yt/library/formats/yaml_helpers.h new file mode 100644 index 000000000000..f9b3300ffc4e --- /dev/null +++ b/yt/yt/library/formats/yaml_helpers.h @@ -0,0 +1,114 @@ +#pragma once + +#include "private.h" + +#include + +#include + +#include + +namespace NYT::NFormats { + +//////////////////////////////////////////////////////////////////////////////// + +template +struct TLibYamlTypeWrapper + : public TLibYamlType + , public TNonCopyable +{ + TLibYamlTypeWrapper(); + void Reset(); + ~TLibYamlTypeWrapper(); +}; + +using TLibYamlParser = TLibYamlTypeWrapper; +using TLibYamlEmitter = TLibYamlTypeWrapper; +using TLibYamlEvent = TLibYamlTypeWrapper; + +//////////////////////////////////////////////////////////////////////////////// + +// These enums are counterparts of the enums in the Yaml library. +// Keep them in sync with the library. 
+ +DEFINE_ENUM(EYamlErrorType, + ((NoError) (YAML_NO_ERROR)) + ((Memory) (YAML_MEMORY_ERROR)) + ((Reader) (YAML_READER_ERROR)) + ((Scanner) (YAML_SCANNER_ERROR)) + ((Parser) (YAML_PARSER_ERROR)) + ((Composer) (YAML_COMPOSER_ERROR)) + ((Writer) (YAML_WRITER_ERROR)) + ((Emitter) (YAML_EMITTER_ERROR)) +); + +DEFINE_ENUM(EYamlEventType, + ((NoEvent) (YAML_NO_EVENT)) + ((StreamStart) (YAML_STREAM_START_EVENT)) + ((StreamEnd) (YAML_STREAM_END_EVENT)) + ((DocumentStart) (YAML_DOCUMENT_START_EVENT)) + ((DocumentEnd) (YAML_DOCUMENT_END_EVENT)) + ((Alias) (YAML_ALIAS_EVENT)) + ((Scalar) (YAML_SCALAR_EVENT)) + ((SequenceStart) (YAML_SEQUENCE_START_EVENT)) + ((SequenceEnd) (YAML_SEQUENCE_END_EVENT)) + ((MappingStart) (YAML_MAPPING_START_EVENT)) + ((MappingEnd) (YAML_MAPPING_END_EVENT)) +); + +//! This tag is used for denoting 2-element sequences that represent a YSON node with attributes. +static constexpr std::string_view YTAttrNodeTag = "!yt/attrnode"; + +//! This tag is used upon parsing to denote an integer scalar which should be +//! represented by YT uint64 type. Writer by default omits this tag, but may be +//! configured to force this tag on all uint64 values. +static constexpr std::string_view YTUintTag = "!yt/uint64"; + +//////////////////////////////////////////////////////////////////////////////// + +//! We support: +//! - YAML 1.2 Core schema types +//! - YT-specific uint type, for which we introduce a special tag "!yt/uint64". +DEFINE_ENUM(EYamlScalarType, + (String) + (Int) + (Float) + (Bool) + (Null) + (Uint) +); + +union TNonStringScalar +{ + i64 Int64; + ui64 Uint64; + double Double; + bool Boolean; +}; + +//! Extracts a recognized YAML scalar type from a tag. +EYamlScalarType DeduceScalarTypeFromTag(const std::string_view& tag); +//! Guesses a recognized YAML scalar type from a value. +EYamlScalarType DeduceScalarTypeFromValue(const std::string_view& value); +//! Given a recognized YAML type, transforms it into a YT type and, +//! 
in case of a non-string result, parses a scalar value. +std::pair ParseScalarValue( + const std::string_view& value, + EYamlScalarType yamlType); + +//////////////////////////////////////////////////////////////////////////////// + +// Convenience helpers for transforming a weirdly represented (yaml_char_t* ~ unsigned char*) +// YAML string into string_view, also handling the case of a null pointer. + +std::string_view YamlLiteralToStringView(const yaml_char_t* literal, size_t length); +std::string_view YamlLiteralToStringView(const yaml_char_t* literal); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats + +// Note that ADL requires to put this function in the global namespace since +// yaml_mark_t is defined in the global namespace from C++ POV + +void Serialize(const yaml_mark_t& mark, NYT::NYson::IYsonConsumer* consumer); diff --git a/yt/yt/library/formats/yaml_parser.cpp b/yt/yt/library/formats/yaml_parser.cpp new file mode 100644 index 000000000000..5e930f250eea --- /dev/null +++ b/yt/yt/library/formats/yaml_parser.cpp @@ -0,0 +1,562 @@ +#include "yaml_parser.h" + +#include "yaml_helpers.h" + +#include + +#include + +#include + +#include + +#include + +namespace NYT::NFormats { + +using namespace NYson; +using namespace NYTree; + +//////////////////////////////////////////////////////////////////////////////// + +//! A helper class that takes care of the repeated parts of a YAML document that +//! are expressed as anchors and aliases. Under the hood, materializes a YSON +//! string for each anchor and emits it to the underlying consumer via +//! OnRaw when needed. +/*! + * Implementation notes: + * - Conforming to YAML 1.2, alias may refer only to a previously defined anchor. + * - Aliasing an anchor to an ancestor node is not supported as the resulting document + * cannot be represented as a finite YSON (even though some implementations with tree + * representations support that, e.g. PyYAML). 
+ * - According to the YAML spec, alias may be "overridden" by a later definition. + * This feature is considered error-prone, will probably be removed in next + * versions of YAML spec (https://github.com/yaml/yaml-spec/pull/65) and is not + * supported by us. + * - Using an alias to a scalar anchor as a map key or anchoring a map key are not + * supported for the sake of simpler implementation (and are considered a weird thing + * to do by an author of this code). + */ +class TAnchorRecordingConsumer + : public IYsonConsumer +{ +public: + explicit TAnchorRecordingConsumer(IYsonConsumer* underlyingConsumer) + : UnderlyingConsumer_(underlyingConsumer) + , RunListWriter_(&RunListStream_, EYsonType::ListFragment) + { } + + void OnStringScalar(TStringBuf value) override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnStringScalar(value); }); + MaybeFinishAnchor(); + } + + void OnInt64Scalar(i64 value) override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnInt64Scalar(value); }); + MaybeFinishAnchor(); + } + + void OnUint64Scalar(ui64 value) override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnUint64Scalar(value); }); + MaybeFinishAnchor(); + } + + void OnDoubleScalar(double value) override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnDoubleScalar(value); }); + MaybeFinishAnchor(); + } + + void OnBooleanScalar(bool value) override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnBooleanScalar(value); }); + MaybeFinishAnchor(); + } + + void OnEntity() override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnEntity(); }); + MaybeFinishAnchor(); + } + + void OnBeginList() override + { + ++CurrentDepth_; + ForAllConsumers([=] (auto* consumer) { consumer->OnBeginList(); }); + } + + void OnListItem() override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnListItem(); }); + } + + void OnEndList() override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnEndList(); }); + --CurrentDepth_; + 
MaybeFinishAnchor(); + } + + void OnBeginMap() override + { + ++CurrentDepth_; + ForAllConsumers([] (auto* consumer) { consumer->OnBeginMap(); }); + } + + void OnKeyedItem(TStringBuf key) override + { + ForAllConsumers([=] (auto* consumer) { consumer->OnKeyedItem(key); }); + } + + void OnEndMap() override + { + ForAllConsumers([] (auto* consumer) { consumer->OnEndMap(); }); + --CurrentDepth_; + MaybeFinishAnchor(); + } + + void OnBeginAttributes() override + { + ++CurrentDepth_; + ForAllConsumers([] (auto* consumer) { consumer->OnBeginAttributes(); }); + } + + void OnEndAttributes() override + { + ForAllConsumers([] (auto* consumer) { consumer->OnEndAttributes(); }); + --CurrentDepth_; + // NB: do not call MaybeFinishAnchor here, as we do not want to record only + // attribute map part of the node. + } + + void OnRaw(TStringBuf yson, EYsonType type) override + { + // The only caller for this OnRaw is ourselves in case of aliases, and aliases + // always point to YSON node. + YT_VERIFY(type == EYsonType::Node); + ForAllConsumers([=] (auto* consumer) { consumer->OnRaw(yson, type); }); + MaybeFinishAnchor(); + } + + void OnAnchor(const std::string& anchor) + { + StartRun(); + auto inserted = KnownAnchorNames_.insert(anchor).second; + if (!inserted) { + THROW_ERROR_EXCEPTION("Anchor %Qv is already defined", anchor); + } + auto& currentAnchor = ConstructingAnchors_.emplace_back(); + currentAnchor = { + anchor, + CurrentDepth_, + GetCurrentRunOffset(), + }; + } + + void OnAlias(const std::string& alias) + { + auto it = FinishedAnchors_.find(alias); + if (it == FinishedAnchors_.end()) { + THROW_ERROR_EXCEPTION("Alias %Qv refers to an undefined or unfinished anchor", alias); + } + auto& anchor = it->second; + + RunListWriter_.Flush(); + std::string_view yson = RunListStream_.Str(); + yson = yson.substr(anchor.StartOffset, anchor.EndOffset - anchor.StartOffset); + // NB: TBufferedBinaryYsonWriter writes ';' in a bit different way than you would expect from + // 
IYsonConsumer interface -- not as a reaction to OnListItem or OnKeyedItem, but rather when a node + // is finished. This leads to yson string above always containing extra trailing ';'. + // We strip it off, as our YSON consumers expect a YSON node to be serialized during OnAlias call. + YT_VERIFY(yson.ends_with(NYson::NDetail::ItemSeparatorSymbol)); + yson.remove_suffix(1); + OnRaw(yson, EYsonType::Node); + } + +private: + IYsonConsumer* UnderlyingConsumer_; + //! Whenever there is at least one anchor being recorded, the stream is used to + //! record the YSON representation of the outermost anchor. We call the representation + //! of such an outermost anchor a run. Conveniently, we represent runs as elements of + //! a fictional YSON list, making each anchor a substring of that YSON list. + TStringStream RunListStream_; + TBufferedBinaryYsonWriter RunListWriter_; + + struct TAnchor + { + std::string Name; + int Depth; + ssize_t StartOffset; + ssize_t EndOffset = -1; + }; + //! A stack of all anchors currently being constructed. + std::vector ConstructingAnchors_; + //! A set of all anchors that are currently constructed. + THashSet KnownAnchorNames_; + + //! A map containing YSON representations of anchors that have been finished. + THashMap FinishedAnchors_; + + int CurrentDepth_ = 0; + + void ForAllConsumers(auto&& action) + { + action(UnderlyingConsumer_); + if (IsInRun()) { + action(&RunListWriter_); + } + } + + void StartRun() + { + if (!IsInRun()) { + RunListWriter_.OnListItem(); + } + } + + bool IsInRun() const + { + return !ConstructingAnchors_.empty(); + } + + ssize_t GetCurrentRunOffset() const + { + YT_VERIFY(IsInRun()); + return RunListWriter_.GetTotalWrittenSize(); + } + + //! Checks current depth, maybe finalizes the innermost anchor, and if + //! the anchor stack vanishes, finalizes the outermost anchor. 
+ void MaybeFinishAnchor() + { + if (!ConstructingAnchors_.empty() && CurrentDepth_ == ConstructingAnchors_.back().Depth) { + // Finalize the innermost (stack topmost) anchor. + YT_VERIFY(IsInRun()); + auto& anchor = ConstructingAnchors_.back(); + + anchor.EndOffset = GetCurrentRunOffset(); + auto inserted = FinishedAnchors_.emplace(anchor.Name, std::move(anchor)).second; + // Insertion is ensured by the checks in OnAnchor. + YT_VERIFY(inserted); + ConstructingAnchors_.pop_back(); + } + } +}; + +class TYamlParser +{ +public: + TYamlParser(IInputStream* input, IYsonConsumer* consumer, TYamlFormatConfigPtr config, EYsonType ysonType) + : Input_(input) + , Consumer_(consumer) + , Config_(std::move(config)) + , YsonType_(ysonType) + { + yaml_parser_initialize(&Parser_); + yaml_parser_set_input(&Parser_, &ReadHandler, this); + } + + void Parse() + { + VisitStream(); + } + +private: + IInputStream* Input_; + TAnchorRecordingConsumer Consumer_; + TYamlFormatConfigPtr Config_; + EYsonType YsonType_; + + TLibYamlParser Parser_; + + TError ReadError_; + + TLibYamlEvent Event_; + + //! Convenience helper to get rid of the ugly casts. + EYamlEventType GetEventType() const + { + return static_cast(Event_.type); + } + + static int ReadHandler(void* data, unsigned char* buffer, size_t size, size_t* sizeRead) + { + auto* yamlParser = reinterpret_cast(data); + auto* input = yamlParser->Input_; + + try { + // IInputStream is similar to yaml_read_handler_t interface + // in EOF case: former returns 0 from Read(), and latter + // expects handler to set size_read to 0 and return 1 + *sizeRead = input->Read(buffer, size); + return 1; + } catch (const std::exception& ex) { + // We do not expect the read handler to be called after an error. + YT_ASSERT(yamlParser->ReadError_.IsOK()); + yamlParser->ReadError_ = TError(ex); + // Not really used by libyaml, but let's set it to 0 just in case. + *sizeRead = 0; + return 0; + } + } + + //! 
A wrapper around C-style libyaml API calls that return 0 on error which + //! throws an exception in case of an error. + int SafeInvoke(auto* method, auto... args) + { + int result = method(args...); + if (result == 0) { + ThrowError(); + } + return result; + } + + //! Throw an exception formed from the parser state and possibly the exception + //! caught in the last read handler call. + [[noreturn]] void ThrowError() + { + // Unfortunately, libyaml may sometimes set error = YAML_NO_ERROR. This may lead + // to unclear exceptions during parsing. + auto yamlErrorType = static_cast(Parser_.error); + auto error = TError("YAML parser error: %v", Parser_.problem) + << TErrorAttribute("yaml_error_type", yamlErrorType) + << TErrorAttribute("problem_offset", Parser_.problem_offset) + << TErrorAttribute("problem_value", Parser_.problem_value) + << TErrorAttribute("problem_mark", Parser_.problem_mark); + if (Parser_.context) { + error <<= TErrorAttribute("context", Parser_.context); + error <<= TErrorAttribute("context_mark", Parser_.context_mark); + } + if (!ReadError_.IsOK()) { + error <<= ReadError_; + } + + THROW_ERROR error; + } + + //! Pull the next event from the parser into Event_ and check that it is one of the expected types. + void PullEvent(std::initializer_list expectedTypes) + { + Event_.Reset(); + SafeInvoke(yaml_parser_parse, &Parser_, &Event_); + for (const auto expectedType : expectedTypes) { + if (GetEventType() == expectedType) { + return; + } + } + // TODO(max42): stack and position!
+ THROW_ERROR_EXCEPTION( + "Unexpected event type %Qlv, expected one of %Qlv", + GetEventType(), + std::vector(expectedTypes)); + } + + void VisitStream() + { + PullEvent({EYamlEventType::StreamStart}); + while (true) { + PullEvent({EYamlEventType::DocumentStart, EYamlEventType::StreamEnd}); + if (GetEventType() == EYamlEventType::StreamEnd) { + break; + } + if (YsonType_ == EYsonType::ListFragment) { + Consumer_.OnListItem(); + } + VisitDocument(); + } + } + + void VisitDocument() + { + PullEvent({ + EYamlEventType::Scalar, + EYamlEventType::SequenceStart, + EYamlEventType::MappingStart, + EYamlEventType::Alias, + }); + VisitNode(); + PullEvent({EYamlEventType::DocumentEnd}); + } + + void VisitNode() + { + auto maybeOnAnchor = [&] (yaml_char_t* anchor) { + if (anchor) { + Consumer_.OnAnchor(std::string(YamlLiteralToStringView(anchor))); + } + }; + switch (GetEventType()) { + case EYamlEventType::Scalar: + maybeOnAnchor(Event_.data.scalar.anchor); + VisitScalar(); + break; + case EYamlEventType::SequenceStart: + maybeOnAnchor(Event_.data.sequence_start.anchor); + VisitSequence(); + break; + case EYamlEventType::MappingStart: + maybeOnAnchor(Event_.data.mapping_start.anchor); + VisitMapping(/*isAttributes*/ false); + break; + case EYamlEventType::Alias: + Consumer_.OnAlias(std::string(YamlLiteralToStringView(Event_.data.alias.anchor))); + break; + default: + YT_ABORT(); + } + } + + void VisitScalar() + { + auto scalar = Event_.data.scalar; + auto yamlValue = YamlLiteralToStringView(scalar.value, scalar.length); + + // According to YAML spec, there are two non-specific tags "!" and "?", and all other + // tags are specific. + // + // If the tag is missing, parser should assign tag "!" to non-plain (quoted) scalars, + // and "?" to plain scalars and collection nodes. For some reason, libyaml does not + // do that for us. + // + // Then, "!"-tagged scalars should always be treated as strings, i.e. "!" -> YT string.
+ // + // Specific tags are either recognized by us, in which case we deduce a corresponding YT type, + // or we assign a string type otherwise. + // + // For the "?"-tagged scalars we perform the type deduction based on the scalar value + // (which is the most often case, as almost nobody uses type tags in YAML). + // + // Cf. https://yaml.org/spec/1.2.2/#332-resolved-tags + std::string_view tag; + if (scalar.tag) { + tag = YamlLiteralToStringView(scalar.tag); + } else if (scalar.style != YAML_PLAIN_SCALAR_STYLE) { + tag = "!"; + } else { + tag = "?"; + } + + EYamlScalarType yamlType; + if (tag != "?") { + yamlType = DeduceScalarTypeFromTag(tag); + } else { + yamlType = DeduceScalarTypeFromValue(yamlValue); + } + auto [ytType, nonStringScalar] = ParseScalarValue(yamlValue, yamlType); + switch (ytType) { + case ENodeType::String: + Consumer_.OnStringScalar(yamlValue); + break; + case ENodeType::Int64: + Consumer_.OnInt64Scalar(nonStringScalar.Int64); + break; + case ENodeType::Uint64: + Consumer_.OnUint64Scalar(nonStringScalar.Uint64); + break; + case ENodeType::Double: + Consumer_.OnDoubleScalar(nonStringScalar.Double); + break; + case ENodeType::Boolean: + Consumer_.OnBooleanScalar(nonStringScalar.Boolean); + break; + case ENodeType::Entity: + Consumer_.OnEntity(); + break; + default: + YT_ABORT(); + } + } + + void VisitSequence() + { + // NB: YSON node with attributes is represented as a yt/attrnode-tagged YAML sequence, + // so handle it as a special case. 
+ if (YamlLiteralToStringView(Event_.data.mapping_start.tag) == YTAttrNodeTag) { + VisitNodeWithAttributes(); + return; + } + + Consumer_.OnBeginList(); + while (true) { + PullEvent({ + EYamlEventType::SequenceEnd, + EYamlEventType::SequenceStart, + EYamlEventType::MappingStart, + EYamlEventType::Scalar, + EYamlEventType::Alias + }); + if (GetEventType() == EYamlEventType::SequenceEnd) { + break; + } + Consumer_.OnListItem(); + VisitNode(); + } + Consumer_.OnEndList(); + } + + void VisitNodeWithAttributes() + { + PullEvent({EYamlEventType::MappingStart}); + VisitMapping(/*isAttributes*/ true); + + PullEvent({ + EYamlEventType::Scalar, + EYamlEventType::SequenceStart, + EYamlEventType::MappingStart, + EYamlEventType::Alias, + }); + VisitNode(); + + PullEvent({EYamlEventType::SequenceEnd}); + } + + void VisitMapping(bool isAttributes) + { + isAttributes ? Consumer_.OnBeginAttributes() : Consumer_.OnBeginMap(); + while (true) { + PullEvent({ + EYamlEventType::MappingEnd, + EYamlEventType::Scalar, + // Yes, YAML is weird enough to support aliases as keys! + EYamlEventType::Alias, + }); + if (GetEventType() == EYamlEventType::MappingEnd) { + break; + } else if (GetEventType() == EYamlEventType::Alias) { + THROW_ERROR_EXCEPTION("Using alias as a map key is not supported"); + } else { + if (Event_.data.scalar.anchor) { + THROW_ERROR_EXCEPTION("Putting anchors on map keys is not supported"); + } + auto key = YamlLiteralToStringView(Event_.data.scalar.value, Event_.data.scalar.length); + Consumer_.OnKeyedItem(key); + } + + PullEvent({ + EYamlEventType::Scalar, + EYamlEventType::SequenceStart, + EYamlEventType::MappingStart, + EYamlEventType::Alias, + }); + VisitNode(); + } + isAttributes ? 
Consumer_.OnEndAttributes() : Consumer_.OnEndMap(); + } +}; + +void ParseYaml( + IInputStream* input, + IYsonConsumer* consumer, + TYamlFormatConfigPtr config, + EYsonType ysonType) +{ + TYamlParser parser(input, consumer, config, ysonType); + parser.Parse(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats diff --git a/yt/yt/library/formats/yaml_parser.h b/yt/yt/library/formats/yaml_parser.h new file mode 100644 index 000000000000..6cdfc4dab63b --- /dev/null +++ b/yt/yt/library/formats/yaml_parser.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +#include + +namespace NYT::NFormats { + +//////////////////////////////////////////////////////////////////////////////// + +//! Parses a YAML stream in pull mode (may be used for structured driver commands). +void ParseYaml( + IInputStream* input, + NYson::IYsonConsumer* consumer, + TYamlFormatConfigPtr config, + NYson::EYsonType ysonType); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats diff --git a/yt/yt/library/formats/yaml_writer.cpp b/yt/yt/library/formats/yaml_writer.cpp new file mode 100644 index 000000000000..3d2961d46a66 --- /dev/null +++ b/yt/yt/library/formats/yaml_writer.cpp @@ -0,0 +1,388 @@ +#include "yaml_writer.h" + +#include "helpers.h" +#include "yaml_helpers.h" + +#include + +#include + +namespace NYT::NFormats { + +using namespace NYson; + +//////////////////////////////////////////////////////////////////////////// + +class TYamlWriter + : public TFormatsConsumerBase +{ +public: + TYamlWriter( + IOutputStream* output, + NYson::EYsonType /*type*/, + TYamlFormatConfigPtr config) + : Output_(output) + , Config_(config) + { + SafeInvoke(yaml_emitter_initialize, &Emitter_); + yaml_emitter_set_output(&Emitter_, &WriteHandler, this); + EmitEvent(yaml_stream_start_event_initialize, YAML_ANY_ENCODING); + } + + void Flush() override + { + SafeInvoke(yaml_emitter_flush, 
&Emitter_); + } + + void OnStringScalar(TStringBuf value) override + { + OnNodeEnter(); + // We try to emit a plain (unquoted) scalar if possible. It may be not possible + // either because of YAML syntax restrictions (which will be handled by libyaml switching + // to a quoted style automatically), or because the plain style would produce a scalar + // which belongs to one of Core YAML schema regexps, making reasonable parsers interpret it as + // int/float/bool/null instead of string. + // + // PyYAML and Go YAML parsers handle this issue by checking the type that would be deduced from + // an unquoted representation and quoting the scalar if it would be interpreted as a non-string type. + // We utilize the same approach here. + + auto plainYamlType = DeduceScalarTypeFromValue(value); + auto desiredScalarStyle = YAML_ANY_SCALAR_STYLE; + if (plainYamlType != EYamlScalarType::String) { + desiredScalarStyle = YAML_DOUBLE_QUOTED_SCALAR_STYLE; + } + + EmitEvent( + yaml_scalar_event_initialize, + /*anchor*/ nullptr, + /*tag*/ nullptr, + reinterpret_cast(value.Data()), + value.Size(), + /*plain_implicit*/ 1, + /*quoted_implicit*/ 1, + desiredScalarStyle); + OnNodeLeave(); + } + + void OnInt64Scalar(i64 value) override + { + OnNodeEnter(); + // Int64 scalars are always represented as plain (unquoted) YAML scalars. + // Core YAML schema regexps ensures that they will be interpreted as integers + // by all reasonable YAML parsers. + // Cf. 
https://yaml.org/spec/1.2.2/#1032-tag-resolution + char buf[64]; + auto length = IntToString<10>(value, buf, sizeof(buf)); + EmitEvent( + yaml_scalar_event_initialize, + /*anchor*/ nullptr, + /*tag*/ nullptr, + reinterpret_cast(buf), + length, + /*plain_implicit*/ 1, + /*quoted_implicit*/ 0, + YAML_PLAIN_SCALAR_STYLE); + OnNodeLeave(); + } + + void OnUint64Scalar(ui64 value) override + { + OnNodeEnter(); + // Uint64 scalars are by default represented as plain (unquoted) YAML scalars, + // similar to Int64 scalars (see the comment in OnInt64Scalar). + // However, we optionally support a custom "!yt/uint64" tag to preserve the + // information that the value is unsigned, which may be useful to control + // signedness upon writing and for YT -> YAML -> YT roundtrip consistency. + char buf[64]; + auto length = IntToString<10>(value, buf, sizeof(buf)); + + // In libyaml API plainImplicit defines whether the writer omits the tag. + bool plainImplicit = !Config_->WriteUintTag; + + EmitEvent( + yaml_scalar_event_initialize, + /*anchor*/ nullptr, + /*tag*/ reinterpret_cast(YTUintTag.data()), + reinterpret_cast(buf), + length, + /*plain_implicit*/ plainImplicit, + /*quoted_implicit*/ 0, + YAML_PLAIN_SCALAR_STYLE); + OnNodeLeave(); + } + + void OnDoubleScalar(double value) override + { + OnNodeEnter(); + // Double scalars are by default represented as plain (unquoted) YAML scalars, + // similar to Int64 scalars (see the comment in OnInt64Scalar). 
+ + char buf[512]; + auto length = DoubleToYamlString(value, buf, sizeof(buf)); + EmitEvent( + yaml_scalar_event_initialize, + /*anchor*/ nullptr, + /*tag*/ reinterpret_cast(YAML_FLOAT_TAG), + reinterpret_cast(buf), + length, + /*plain_implicit*/ 1, + /*quoted_implicit*/ 0, + YAML_PLAIN_SCALAR_STYLE); + OnNodeLeave(); + } + + void OnBooleanScalar(bool value) override + { + OnNodeEnter(); + static const std::string_view trueLiteral = "true"; + static const std::string_view falseLiteral = "false"; + const std::string_view& literal = value ? trueLiteral : falseLiteral; + EmitEvent( + yaml_scalar_event_initialize, + /*anchor*/ nullptr, + /*tag*/ reinterpret_cast(YAML_BOOL_TAG), + reinterpret_cast(const_cast(literal.data())), + literal.size(), + /*plain_implicit*/ 1, + /*quoted_implicit*/ 0, + YAML_PLAIN_SCALAR_STYLE); + OnNodeLeave(); + } + + virtual void OnEntity() override + { + OnNodeEnter(); + static const std::string_view nullLiteral = "null"; + EmitEvent( + yaml_scalar_event_initialize, + /*anchor*/ nullptr, + /*tag*/ reinterpret_cast(YAML_NULL_TAG), + reinterpret_cast(const_cast(nullLiteral.data())), + nullLiteral.size(), + /*plain_implicit*/ 1, + /*quoted_implicit*/ 0, + YAML_PLAIN_SCALAR_STYLE); + OnNodeLeave(); + } + + virtual void OnBeginList() override + { + OnNodeEnter(); + EmitEvent( + yaml_sequence_start_event_initialize, + /*anchor*/ nullptr, + /*tag*/ nullptr, + /*implicit*/ 1, + YAML_ANY_SEQUENCE_STYLE); + } + + virtual void OnListItem() override + { } + + virtual void OnEndList() override + { + EmitEvent(yaml_sequence_end_event_initialize); + OnNodeLeave(); + } + + virtual void OnBeginMap() override + { + OnNodeEnter(); + EmitEvent( + yaml_mapping_start_event_initialize, + /*anchor*/ nullptr, + /*tag*/ nullptr, + /*implicit*/ 1, + YAML_ANY_MAPPING_STYLE); + } + + virtual void OnKeyedItem(TStringBuf key) override + { + OnStringScalar(key); + } + + virtual void OnEndMap() override + { + EmitEvent(yaml_mapping_end_event_initialize); + OnNodeLeave(); + } 
+ + virtual void OnBeginAttributes() override + { + // NB: node with attributes in YAML is represented as a yt/attrnode-tagged 2-item sequence. + OnNodeEnter(); + EmitEvent( + yaml_sequence_start_event_initialize, + /*anchor*/ nullptr, + /*tag*/ reinterpret_cast(YTAttrNodeTag.data()), + /*implicit*/ 0, + YAML_ANY_SEQUENCE_STYLE); + EmitEvent( + yaml_mapping_start_event_initialize, + /*anchor*/ nullptr, + /*tag*/ nullptr, + /*implicit*/ 1, + YAML_ANY_MAPPING_STYLE); + } + + virtual void OnEndAttributes() override + { + EmitEvent(yaml_mapping_end_event_initialize); + ImmediatelyAfterAttributes_ = true; + OnNodeLeave(/*isAttributes*/ true); + DepthsWithPendingValueClosure_.push_back(CurrentDepth_); + } + +private: + using TEmitterPtr = std::unique_ptr; + using TEventPtr = std::unique_ptr; + + IOutputStream* Output_; + TYamlFormatConfigPtr Config_; + TLibYamlEmitter Emitter_; + TError WriteError_; + + // Utilities for tracking the current depth and the stack of depths at which + // we must perform extra sequence closure due to yt/attrnode-tagged 2-item sequence convention. + + //! The depth of the current node in the YSON tree. + int CurrentDepth_ = 0; + //! A stack of depths at which attributes are present. + std::vector DepthsWithPendingValueClosure_ = {-1}; + //! A flag indicating that we are immediately after the OnEndAttributes() event. + bool ImmediatelyAfterAttributes_ = false; + + static int WriteHandler(void* data, unsigned char* buffer, size_t size) + { + auto* yamlWriter = reinterpret_cast(data); + auto* output = yamlWriter->Output_; + + try { + output->Write(buffer, size); + } catch (const std::exception& ex) { + // We do not expect the write handler to be called after an error. + YT_ASSERT(yamlWriter->WriteError_.IsOK()); + yamlWriter->WriteError_ = TError(ex); + return 0; + } + return 1; + } + + //! A wrapper around C-style libyaml API calls that return 0 on error which + //! throws an exception in case of an error.
+ int SafeInvoke(auto* method, auto... args) + { + int result = method(args...); + if (result == 0) { + ThrowError(); + } + return result; + } + + //! Throw an exception formed from the emitter state and possibly the exception + //! caught in the last write handler call. + void ThrowError() + { + // Unfortunately, libyaml may sometimes YAML_NO_ERROR. This may lead + // to unclear exceptions during parsing. + auto yamlErrorType = static_cast(Emitter_.error); + auto error = TError("YAML emitter error: %v", Emitter_.problem) + << TErrorAttribute("yaml_error_type", yamlErrorType); + + if (!WriteError_.IsOK()) { + error <<= WriteError_; + } + + THROW_ERROR error; + } + + void EmitEvent(auto* eventInitializer, auto... args) + { + yaml_event_t event; + // Event initializer is guaranteed to release all resources in case of an error. + SafeInvoke(eventInitializer, &event, args...); + SafeInvoke(yaml_emitter_emit, &Emitter_, &event); + } + + void OnNodeEnter() + { + // If we are at the depth 0 and it is not a break between the root node attributes and the root node, + // emit the document start event. + if (CurrentDepth_ == 0 && !ImmediatelyAfterAttributes_) { + EmitEvent( + yaml_document_start_event_initialize, + /*version_directive*/ nullptr, + /*tag_directives_start*/ nullptr, + /*tag_directives_end*/ nullptr, + /*implicit*/ 1); + } + ++CurrentDepth_; + ImmediatelyAfterAttributes_ = false; + } + + void OnNodeLeave(bool isAttributes = false) + { + --CurrentDepth_; + if (CurrentDepth_ == DepthsWithPendingValueClosure_.back()) { + EmitEvent(yaml_sequence_end_event_initialize); + DepthsWithPendingValueClosure_.pop_back(); + } + if (isAttributes) { + ImmediatelyAfterAttributes_ = true; + } + // If we are leaving the root node and it is not a break between the root node attributes and the root node, + // emit the document end event. 
+ if (CurrentDepth_ == 0 && !isAttributes) { + EmitEvent(yaml_document_end_event_initialize, /*implicit*/ 1); + } + } + + size_t DoubleToYamlString(double value, char* buf, size_t size) + { + // Extra care must be taken to handle non-finite values (NaN, Inf, -Inf), + // and also to ensure that the resulting value cannot be parsed as an integer. + // Both things are done similarly to the corresponding logic in the YSON writer. + // Cf. NYson::TUncheckedYsonTokenWriter::WriteTextDouble. + + if (std::isfinite(value)) { + auto length = FloatToString(value, buf, size); + std::string_view str(buf, length); + if (str.find('.') == std::string::npos && str.find('e') == std::string::npos) { + YT_VERIFY(length + 1 <= size); + buf[length++] = '.'; + } + return length; + } else { + static const std::string_view nanLiteral = ".nan"; + static const std::string_view infLiteral = ".inf"; + static const std::string_view negativeInfLiteral = "-.inf"; + + std::string_view str; + if (std::isnan(value)) { + str = nanLiteral; + } else if (std::isinf(value) && value > 0) { + str = infLiteral; + } else { + str = negativeInfLiteral; + } + YT_VERIFY(str.size() + 1 <= size); + ::memcpy(buf, str.data(), str.size() + 1); + return str.size(); + } + } +}; + +std::unique_ptr CreateYamlWriter( + IZeroCopyOutput* output, + NYson::EYsonType type, + TYamlFormatConfigPtr config) +{ + // Note that output gets narrowed to IOutputStream* as the currently used yaml library + // interface is not zero-copy by its nature. 
+ return std::make_unique(output, type, config); +} + +//////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats diff --git a/yt/yt/library/formats/yaml_writer.h b/yt/yt/library/formats/yaml_writer.h new file mode 100644 index 000000000000..c3e7fb76fd4a --- /dev/null +++ b/yt/yt/library/formats/yaml_writer.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +#include + +namespace NYT::NFormats { + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr CreateYamlWriter( + IZeroCopyOutput* output, + NYson::EYsonType type, + TYamlFormatConfigPtr config); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats From 796e6186c6652f49958e68c7eb0f06c52827e702 Mon Sep 17 00:00:00 2001 From: udovichenko-r Date: Thu, 28 Nov 2024 17:44:07 +0300 Subject: [PATCH 4/8] YQL-19309 Remove IDqIntegration dependency on TDqSettings commit_hash:46cec1b389108c3cf83c120f92b37b2ebd38a8e3 --- .../core/dq_integration/yql_dq_integration.h | 21 +++++++++++++++---- .../common/dq/yql_dq_integration_impl.cpp | 5 ++--- .../common/dq/yql_dq_integration_impl.h | 5 ++--- .../pg/provider/yql_pg_dq_integration.cpp | 2 +- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h index 4f08726cf23f..d1aaa655a643 100644 --- a/yql/essentials/core/dq_integration/yql_dq_integration.h +++ b/yql/essentials/core/dq_integration/yql_dq_integration.h @@ -21,7 +21,6 @@ class TJsonValue; namespace NYql { -struct TDqSettings; class TTransformationPipeline; namespace NCommon { @@ -45,12 +44,26 @@ class IDqIntegration { public: virtual ~IDqIntegration() {} - virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, - TVector& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0; + struct 
TPartitionSettings { + TMaybe DataSizePerJob; + size_t MaxPartitions = 0; + TMaybe EnableComputeActor; + bool CanFallback = false; + }; + + virtual ui64 Partition(const TExprNode& node, TVector& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) = 0; virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0; virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; virtual TMaybe EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector& nodes, TExprContext& ctx) = 0; - virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + struct TWrapReadSettings { + TMaybe WatermarksMode; + TMaybe WatermarksGranularityMs; + TMaybe WatermarksLateArrivalDelayMs; + TMaybe WatermarksEnableIdlePartitions; + }; + + virtual TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) = 0; virtual TMaybe ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0; virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp index 600e3c72e25f..1209b53c77b5 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp @@ -2,8 +2,7 @@ namespace NYql { -ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&, - TVector&, TString*, TExprContext&, bool) { +ui64 TDqIntegrationBase::Partition(const TExprNode&, TVector&, TString*, TExprContext&, const TPartitionSettings& ) { return 0; } @@ -22,7 +21,7 @@ TMaybe TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override; + ui64 Partition(const 
TExprNode& node, TVector& partitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) override; bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override; bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override; TMaybe EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector& nodes, TExprContext& ctx) override; - TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& settings) override; TMaybe ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override; TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override; void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; diff --git a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp index 2e8a9b434f48..263e0fe3105c 100644 --- a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp +++ b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp @@ -25,7 +25,7 @@ class TPgDqIntegration: public TDqIntegrationBase { return Nothing(); } - ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector& partitions, TString*, TExprContext&, bool) override { + ui64 Partition(const TExprNode&, TVector& partitions, TString*, TExprContext&, const TPartitionSettings&) override { partitions.clear(); partitions.emplace_back(); return 0ULL; From 9abfdacf4ebece9c8666e859a6cb92fa4ea782b9 Mon Sep 17 00:00:00 2001 From: Alexander Smirnov Date: Thu, 28 Nov 2024 15:03:17 +0000 Subject: [PATCH 5/8] Import libraries 241128-1502 --- ydb/ci/rightlib.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/ci/rightlib.txt b/ydb/ci/rightlib.txt index c740b6d99812..272506c34f44 100644 --- a/ydb/ci/rightlib.txt +++ 
b/ydb/ci/rightlib.txt @@ -1 +1 @@ -23e9865bb938b83e7e32b670ba055c407f75494b +796e6186c6652f49958e68c7eb0f06c52827e702 From 335f07a9e90ca2c5b14652a58d0499bbda37e217 Mon Sep 17 00:00:00 2001 From: Roman Udovichenko Date: Thu, 28 Nov 2024 18:17:55 +0300 Subject: [PATCH 6/8] Apply arcadia changes (#12106) --- ydb/core/kqp/opt/kqp_opt_kql.cpp | 2 +- .../kqp/query_compiler/kqp_query_compiler.cpp | 5 +++- ydb/library/yql/dq/opt/dq_opt_log.cpp | 4 +-- ydb/library/yql/dq/opt/dq_opt_log.h | 4 +-- .../yql_clickhouse_dq_integration.cpp | 4 +-- .../dq/planner/execution_planner.cpp | 10 +++++-- .../dq/provider/yql_dq_recapture.cpp | 8 ++++- .../provider/ut/pushdown/pushdown_ut.cpp | 2 +- .../provider/yql_generic_dq_integration.cpp | 5 ++-- .../pq/provider/yql_pq_dq_integration.cpp | 30 +++++++++---------- .../s3/provider/yql_s3_dq_integration.cpp | 9 +++--- .../provider/yql_solomon_dq_integration.cpp | 5 ++-- .../ydb/provider/yql_ydb_dq_integration.cpp | 8 ++--- .../provider/ut/yql_yt_dq_integration_ut.cpp | 19 ++++-------- .../yt/provider/yql_yt_dq_integration.cpp | 16 +++++----- 15 files changed, 68 insertions(+), 63 deletions(-) diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index f02c81c1f6ed..d201edaf0285 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -1017,7 +1017,7 @@ TMaybe BuildKqlQuery(TKiDataQueryBlocks dataQueryBlocks, const TK auto dataSource = typesCtx.DataSourceMap.FindPtr(dataSourceName); YQL_ENSURE(dataSource); if (auto dqIntegration = (*dataSource)->GetDqIntegration()) { - auto newRead = dqIntegration->WrapRead(NYql::TDqSettings(), input.Cast().Ptr(), ctx); + auto newRead = dqIntegration->WrapRead(input.Cast().Ptr(), ctx, {}); if (newRead.Get() != input.Raw()) { return newRead; } diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index f4e5465cc19c..b10e68dabea6 100644 --- 
a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1092,7 +1092,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { // We prepare a lot of partitions and distribute them between these tasks // Constraint of 1 task per partition is NOT valid anymore auto maxTasksPerStage = Config->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage); - dqIntegration->Partition(NYql::TDqSettings(), maxTasksPerStage, source.Ref(), partitionParams, &clusterName, ctx, false); + IDqIntegration::TPartitionSettings pSettings; + pSettings.MaxPartitions = maxTasksPerStage; + pSettings.CanFallback = false; + dqIntegration->Partition(source.Ref(), partitionParams, &clusterName, ctx, pSettings); externalSource.SetTaskParamKey(TString(dataSourceCategory)); for (const TString& partitionParam : partitionParams) { externalSource.AddPartitionedTaskParams(partitionParam); diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp index 94cc0f728e1b..6c9a4abbf811 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp @@ -336,7 +336,7 @@ NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& c return node; } -IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config) { +IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings) { TOptimizeExprSettings settings{&typesCtx}; auto status = OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) { if (auto maybeRead = TMaybeNode(node).Input()) { @@ -345,7 +345,7 @@ IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPt auto dataSource = 
typesCtx.DataSourceMap.FindPtr(dataSourceName); YQL_ENSURE(dataSource); if (auto dqIntegration = (*dataSource)->GetDqIntegration()) { - auto newRead = dqIntegration->WrapRead(config, maybeRead.Cast().Ptr(), ctx); + auto newRead = dqIntegration->WrapRead(maybeRead.Cast().Ptr(), ctx, wrSettings); if (newRead.Get() != maybeRead.Raw()) { return newRead; } diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h index e33642f6d569..4bde75d88320 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.h +++ b/ydb/library/yql/dq/opt/dq_opt_log.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -11,7 +12,6 @@ namespace NYql { class IOptimizationContext; struct TTypeAnnotationContext; - struct TDqSettings; struct IProviderContext; struct TRelOptimizerNode; struct TOptimizerStatistics; @@ -38,7 +38,7 @@ NNodes::TExprBase DqSqlInDropCompact(NNodes::TExprBase node, TExprContext& ctx); NNodes::TExprBase DqReplicateFieldSubset(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents); -IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const TDqSettings& config); +IGraphTransformer::TStatus DqWrapIO(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const IDqIntegration::TWrapReadSettings& wrSettings); NNodes::TExprBase DqExpandMatchRecognize(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typeAnnCtx); diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp index 6fc36b0bfce8..8481141a3088 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp @@ -33,7 +33,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase { 
return Nothing(); } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& ) override { if (const auto maybeClReadTable = TMaybeNode(read)) { const auto clReadTable = maybeClReadTable.Cast(); const auto token = TString("cluster:default_") += clReadTable.DataSource().Cluster().StringValue(); @@ -66,7 +66,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase { return read; } - ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector& partitions, TString*, TExprContext&, bool) override { + ui64 Partition(const TExprNode&, TVector& partitions, TString*, TExprContext&, const TPartitionSettings&) override { partitions.clear(); NCH::TRange range; // range.SetRange("limit 42 offset 42 order by ...."); // Possible set range like this. diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 4b60df1d9dad..d3a6a2989da4 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -544,7 +544,13 @@ namespace NYql::NDqs { TVector parts; if (auto dqIntegration = (*datasource)->GetDqIntegration()) { TString clusterName; - _MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*Settings, maxPartitions, *read, parts, &clusterName, ExprContext, canFallback)); + IDqIntegration::TPartitionSettings settings { + .DataSizePerJob = Settings->DataSizePerJob.Get(), + .MaxPartitions = maxPartitions, + .EnableComputeActor = Settings->EnableComputeActor.Get(), + .CanFallback = canFallback + }; + _MaxDataSizePerJob = Max(_MaxDataSizePerJob, dqIntegration->Partition(*read, parts, &clusterName, ExprContext, settings)); TMaybe<::google::protobuf::Any> sourceSettings; TString sourceType; if (dqSource) { @@ -585,7 +591,7 @@ namespace NYql::NDqs { 
YQL_ENSURE(dataSource); auto dqIntegration = (*dataSource)->GetDqIntegration(); YQL_ENSURE(dqIntegration); - + google::protobuf::Any providerSpecificLookupSourceSettings; TString sourceType; dqIntegration->FillLookupSourceSettings(*rightInput.Raw(), providerSpecificLookupSourceSettings, sourceType); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index 2829f729c68c..705c49669fa3 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -97,7 +97,13 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase { State_->TypeCtx->DqFallbackPolicy = State_->Settings->FallbackPolicy.Get().GetOrElse(EFallbackPolicy::Default); - IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, *State_->Settings); + IDqIntegration::TWrapReadSettings wrSettings { + .WatermarksMode = State_->Settings->WatermarksMode.Get(), + .WatermarksGranularityMs = State_->Settings->WatermarksGranularityMs.Get(), + .WatermarksLateArrivalDelayMs = State_->Settings->WatermarksLateArrivalDelayMs.Get(), + .WatermarksEnableIdlePartitions = State_->Settings->WatermarksEnableIdlePartitions.Get() + }; + IGraphTransformer::TStatus status = NDq::DqWrapIO(input, output, ctx, *State_->TypeCtx, wrSettings); if (input != output) { YQL_CLOG(INFO, ProviderDq) << "DqsRecapture"; // TODO: Add before/after recapture transformers diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp index 5ee69b5e9b80..73e538e024a8 100644 --- a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp +++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp @@ -162,7 +162,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { UNIT_ASSERT(genericDataSource != 
Types->DataSourceMap.end()); auto dqIntegration = genericDataSource->second->GetDqIntegration(); UNIT_ASSERT(dqIntegration); - auto newRead = dqIntegration->WrapRead(TDqSettings(), input.Ptr(), ctx); + auto newRead = dqIntegration->WrapRead(input.Ptr(), ctx, IDqIntegration::TWrapReadSettings{}); BuildSettings(newRead, dqIntegration, ctx); return newRead; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 39ed3bc50bb2..c7a92f783037 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -63,7 +63,7 @@ namespace NYql { return Nothing(); } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override { if (const auto maybeGenReadTable = TMaybeNode(read)) { const auto genReadTable = maybeGenReadTable.Cast(); YQL_ENSURE(genReadTable.Ref().GetTypeAnn(), "No type annotation for node " << genReadTable.Ref().Content()); @@ -106,8 +106,7 @@ namespace NYql { return read; } - ui64 Partition(const TDqSettings&, size_t, const TExprNode&, TVector& partitions, TString*, TExprContext&, - bool) override { + ui64 Partition(const TExprNode&, TVector& partitions, TString*, TExprContext&, const TPartitionSettings&) override { partitions.clear(); Generic::TRange range; partitions.emplace_back(); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index cfa5da4d243e..80685f51cf00 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -59,20 +59,20 @@ class TPqDqIntegration: public TDqIntegrationBase { return 0; } - ui64 
Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector& partitions, TString*, TExprContext&, bool) override { + ui64 Partition(const TExprNode& node, TVector& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override { if (auto maybePqRead = TMaybeNode(&node)) { - return PartitionTopicRead(maybePqRead.Cast().Topic(), maxPartitions, partitions); + return PartitionTopicRead(maybePqRead.Cast().Topic(), settings.MaxPartitions, partitions); } if (auto maybeDqSource = TMaybeNode(&node)) { - auto settings = maybeDqSource.Cast().Settings(); - if (auto topicSource = TMaybeNode(settings.Raw())) { - return PartitionTopicRead(topicSource.Cast().Topic(), maxPartitions, partitions); + auto srcSettings = maybeDqSource.Cast().Settings(); + if (auto topicSource = TMaybeNode(srcSettings.Raw())) { + return PartitionTopicRead(topicSource.Cast().Topic(), settings.MaxPartitions, partitions); } } return 0; } - TExprNode::TPtr WrapRead(const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& wrSettings) override { if (const auto& maybePqReadTopic = TMaybeNode(read)) { const auto& pqReadTopic = maybePqReadTopic.Cast(); YQL_ENSURE(pqReadTopic.Ref().GetTypeAnn(), "No type annotation for node " << pqReadTopic.Ref().Content()); @@ -127,7 +127,7 @@ class TPqDqIntegration: public TDqIntegrationBase { const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetItems(); const auto pos = read->Pos(); - + TExprNode::TListType colNames; colNames.reserve(typeItems.size()); std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colNames), @@ -146,7 +146,7 @@ class TPqDqIntegration: public TDqIntegrationBase { .World(pqReadTopic.World()) .Topic(pqReadTopic.Topic()) .Columns(std::move(columnNames)) - .Settings(BuildTopicReadSettings(clusterName, dqSettings, 
read->Pos(), format, ctx)) + .Settings(BuildTopicReadSettings(clusterName, wrSettings, read->Pos(), format, ctx)) .Token() .Name().Build(token) .Build() @@ -325,7 +325,7 @@ class TPqDqIntegration: public TDqIntegrationBase { NNodes::TCoNameValueTupleList BuildTopicReadSettings( const TString& cluster, - const TDqSettings& dqSettings, + const IDqIntegration::TWrapReadSettings& wrSettings, TPositionHandle pos, std::string_view format, TExprContext& ctx) const @@ -349,7 +349,7 @@ class TPqDqIntegration: public TDqIntegrationBase { Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx); Add(props, Format, format, pos, ctx); - + if (clusterConfiguration->UseSsl) { Add(props, UseSslSetting, "1", pos, ctx); } @@ -358,23 +358,21 @@ class TPqDqIntegration: public TDqIntegrationBase { Add(props, AddBearerToTokenSetting, "1", pos, ctx); } - if (dqSettings.WatermarksMode.Get().GetOrElse("") == "default") { + if (wrSettings.WatermarksMode.GetOrElse("") == "default") { Add(props, WatermarksEnableSetting, ToString(true), pos, ctx); - const auto granularity = TDuration::MilliSeconds(dqSettings + const auto granularity = TDuration::MilliSeconds(wrSettings .WatermarksGranularityMs - .Get() .GetOrElse(TDqSettings::TDefault::WatermarksGranularityMs)); Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx); - const auto lateArrivalDelay = TDuration::MilliSeconds(dqSettings + const auto lateArrivalDelay = TDuration::MilliSeconds(wrSettings .WatermarksLateArrivalDelayMs - .Get() .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs)); Add(props, WatermarksLateArrivalDelayUsSetting, ToString(lateArrivalDelay.MicroSeconds()), pos, ctx); } - if (dqSettings.WatermarksEnableIdlePartitions.Get().GetOrElse(false)) { + if (wrSettings.WatermarksEnableIdlePartitions.GetOrElse(false)) { Add(props, WatermarksIdlePartitionsSetting, ToString(true), pos, ctx); } diff --git 
a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index f9e5d0e1d566..feb7730a9023 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -82,7 +82,7 @@ class TS3DqIntegration: public TDqIntegrationBase { { } - ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector& partitions, TString*, TExprContext&, bool) override { + ui64 Partition(const TExprNode& node, TVector& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override { std::vector> parts; std::optional mbLimitHint; bool hasDirectories = false; @@ -108,6 +108,7 @@ class TS3DqIntegration: public TDqIntegrationBase { } constexpr ui64 maxTaskRatio = 20; + auto maxPartitions = settings.MaxPartitions; if (!maxPartitions || (mbLimitHint && maxPartitions > *mbLimitHint / maxTaskRatio)) { maxPartitions = std::max(*mbLimitHint / maxTaskRatio, ui64{1}); YQL_CLOG(TRACE, ProviderS3) << "limited max partitions to " << maxPartitions; @@ -223,7 +224,7 @@ class TS3DqIntegration: public TDqIntegrationBase { } rows = size / 1024; // magic estimate - return primaryKey + return primaryKey ? 
TOptimizerStatistics(BaseTable, rows, cols, size, size, TIntrusivePtr(new TOptimizerStatistics::TKeyColumns(*primaryKey))) : TOptimizerStatistics(BaseTable, rows, cols, size, size); } else { @@ -231,7 +232,7 @@ class TS3DqIntegration: public TDqIntegrationBase { } } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& ) override { if (const auto& maybeS3ReadObject = TMaybeNode(read)) { const auto& s3ReadObject = maybeS3ReadObject.Cast(); YQL_ENSURE(s3ReadObject.Ref().GetTypeAnn(), "No type annotation for node " << s3ReadObject.Ref().Content()); @@ -394,7 +395,7 @@ class TS3DqIntegration: public TDqIntegrationBase { TExprContext ctx; srcDesc.SetRowType(NCommon::WriteTypeToYson(ctx.MakeType(rowTypeItems), NYT::NYson::EYsonFormat::Text)); } - + if (auto predicate = parseSettings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) { TStringBuilder err; if (!SerializeFilterPredicate(predicate, srcDesc.mutable_predicate(), err)) { diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index cca10cbdd9d0..56b96e68dd98 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -79,8 +79,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase { { } - ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector& partitions, TString*, TExprContext&, bool) override { - Y_UNUSED(maxPartitions); + ui64 Partition(const TExprNode& node, TVector& partitions, TString*, TExprContext&, const TPartitionSettings&) override { Y_UNUSED(node); Y_UNUSED(partitions); partitions.push_back("zz_partition"); @@ -95,7 +94,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase { 
YQL_ENSURE(false, "Unimplemented"); } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override { if (const auto& maybeSoReadObject = TMaybeNode(read)) { const auto& soReadObject = maybeSoReadObject.Cast(); YQL_ENSURE(soReadObject.Ref().GetTypeAnn(), "No type annotation for node " << soReadObject.Ref().Content()); diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp index 2e35d34d4b30..0c9be9b9393b 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp @@ -23,8 +23,7 @@ class TYdbDqIntegration: public TDqIntegrationBase { { } - ui64 Partition(const TDqSettings& settings, size_t maxPartitions, const TExprNode& node, - TVector& partitions, TString*, TExprContext&, bool) override { + ui64 Partition(const TExprNode& node, TVector& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override { TString cluster, table; if (const TMaybeNode source = &node) { cluster = source.Cast().DataSource().Cast().Cluster().Value(); @@ -35,9 +34,10 @@ class TYdbDqIntegration: public TDqIntegrationBase { } auto& meta = State_->Tables[std::make_pair(cluster, table)]; - meta.ReadAsync = settings.EnableComputeActor.Get().GetOrElse(false); // TODO: Use special method for get settings. + meta.ReadAsync = settings.EnableComputeActor.GetOrElse(false); // TODO: Use special method for get settings. 
auto parts = meta.Partitions; + auto maxPartitions = settings.MaxPartitions; if (maxPartitions && parts.size() > maxPartitions) { if (const auto extraParts = parts.size() - maxPartitions; extraParts > maxPartitions) { const auto dropsPerTask = (parts.size() - 1ULL) / maxPartitions; @@ -80,7 +80,7 @@ class TYdbDqIntegration: public TDqIntegrationBase { return Nothing(); } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override { if (const auto& maybeYdbReadTable = TMaybeNode(read)) { const auto& ydbReadTable = maybeYdbReadTable.Cast(); YQL_ENSURE(ydbReadTable.Ref().GetTypeAnn(), "No type annotation for node " << ydbReadTable.Ref().Content()); diff --git a/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp b/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp index 5e2e931d5219..cf7225074f88 100644 --- a/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp +++ b/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp @@ -1,5 +1,4 @@ #include -#include #include #include @@ -24,10 +23,8 @@ struct TTestSetup { Y_UNIT_TEST_SUITE(TSchedulerTest) { Y_UNIT_TEST(Ranges_table4_table5_test0) { TTestSetup setup; - TDqSettings settings; - settings.DataSizePerJob = 1000; + IDqIntegration::TPartitionSettings settings {.DataSizePerJob = 1000, .MaxPartitions = 4, .EnableComputeActor = {}}; TVector partitions; - size_t maxTasks = 4; auto astStr = "(\n" "(let $4 (Void))\n" "(let $5 (YtMeta '('\"CanWrite\" '\"1\") '('\"DoesExist\" '\"1\") '('\"YqlCompatibleScheme\" '\"1\") '('\"InferredScheme\" '\"0\") '('\"IsDynamic\" '\"0\") '('\"Attrs\" '('('\"optimize_for\" '\"lookup\")))))\n" @@ -49,7 +46,7 @@ Y_UNIT_TEST_SUITE(TSchedulerTest) { TExprNode::TPtr exprRoot_; UNIT_ASSERT(CompileExpr(*astRes.Root, exprRoot_, exprCtx_, nullptr, nullptr)); TString cluster; - const 
auto result = setup.State->DqIntegration_->Partition(settings, maxTasks, *exprRoot_, partitions, &cluster, exprCtx_, false); + const auto result = setup.State->DqIntegration_->Partition(*exprRoot_, partitions, &cluster, exprCtx_, settings); const auto expected = 428; UNIT_ASSERT_VALUES_EQUAL(result, expected); UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 3); @@ -57,10 +54,8 @@ Y_UNIT_TEST_SUITE(TSchedulerTest) { Y_UNIT_TEST(Ranges_table4_table7_test1) { TTestSetup setup; - TDqSettings settings; - settings.DataSizePerJob = 1000; + IDqIntegration::TPartitionSettings settings {.DataSizePerJob = 1000, .MaxPartitions = 2, .EnableComputeActor = {}}; TVector partitions; - size_t maxTasks = 2; auto astStr = "(\n" "(let $4 (Void))\n" "(let $5 (YtMeta '('\"CanWrite\" '\"1\") '('\"DoesExist\" '\"1\") '('\"YqlCompatibleScheme\" '\"1\") '('\"InferredScheme\" '\"0\") '('\"IsDynamic\" '\"0\") '('\"Attrs\" '('('\"optimize_for\" '\"lookup\")))))\n" @@ -89,7 +84,7 @@ Y_UNIT_TEST_SUITE(TSchedulerTest) { TExprNode::TPtr exprRoot_; UNIT_ASSERT(CompileExpr(*astRes.Root, exprRoot_, exprCtx_, nullptr, nullptr)); TString cluster; - const auto result = setup.State->DqIntegration_->Partition(settings, maxTasks, *exprRoot_, partitions, &cluster, exprCtx_, false); + const auto result = setup.State->DqIntegration_->Partition(*exprRoot_, partitions, &cluster, exprCtx_, settings); const auto expected = 642; UNIT_ASSERT_VALUES_EQUAL(result, expected); UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 2); @@ -97,10 +92,8 @@ Y_UNIT_TEST_SUITE(TSchedulerTest) { Y_UNIT_TEST(Ranges_table4_table7_test2) { TTestSetup setup; - TDqSettings settings; - settings.DataSizePerJob = 1000; + IDqIntegration::TPartitionSettings settings {.DataSizePerJob = 1000, .MaxPartitions = 10, .EnableComputeActor = {}}; TVector partitions; - size_t maxTasks = 10; auto astStr = "(\n" "(let $4 (Void))\n" "(let $5 (YtMeta '('\"CanWrite\" '\"1\") '('\"DoesExist\" '\"1\") '('\"YqlCompatibleScheme\" '\"1\") '('\"InferredScheme\" '\"0\") 
'('\"IsDynamic\" '\"0\") '('\"Attrs\" '('('\"optimize_for\" '\"lookup\")))))\n" @@ -129,7 +122,7 @@ Y_UNIT_TEST_SUITE(TSchedulerTest) { TExprNode::TPtr exprRoot_; UNIT_ASSERT(CompileExpr(*astRes.Root, exprRoot_, exprCtx_, nullptr, nullptr)); TString cluster; - const auto result = setup.State->DqIntegration_->Partition(settings, maxTasks, *exprRoot_, partitions, &cluster, exprCtx_, false); + const auto result = setup.State->DqIntegration_->Partition(*exprRoot_, partitions, &cluster, exprCtx_, settings); const auto expected = 214; UNIT_ASSERT_VALUES_EQUAL(result, expected); UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 6); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 082b038ba90f..badb6420ced2 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -197,10 +197,9 @@ class TYtDqIntegration: public TDqIntegrationBase { return groupIdColumnarStats; } - ui64 Partition(const TDqSettings& config, size_t maxTasks, const TExprNode& node, - TVector& serializedPartitions, TString* clusterName, TExprContext& ctx, bool canFallback) override + ui64 Partition(const TExprNode& node, TVector& serializedPartitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) override { - auto dataSizePerJob = config.DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob); + auto dataSizePerJob = settings.DataSizePerJob.GetOrElse(TDqSettings::TDefault::DataSizePerJob); if (!TMaybeNode(&node).IsValid()) { return 0; } @@ -228,7 +227,7 @@ class TYtDqIntegration: public TDqIntegrationBase { groupIdPathInfos.push_back(pathInfos); } - if (auto maxChunks = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); canFallback && chunksCount > maxChunks) { + if (auto maxChunks = 
State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); settings.CanFallback && chunksCount > maxChunks) { throw TFallbackError() << DqFallbackErrorMessageWrap( TStringBuilder() << "table with too many chunks: " << chunksCount << " > " << maxChunks); } @@ -240,6 +239,7 @@ class TYtDqIntegration: public TDqIntegrationBase { } } + auto maxTasks = settings.MaxPartitions; ui64 maxDataSizePerJob = 0; if (State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) { TVector paths; @@ -269,7 +269,7 @@ class TYtDqIntegration: public TDqIntegrationBase { .Cluster(cluster) .MaxPartitions(maxTasks) .DataSizePerJob(dataSizePerJob) - .AdjustDataWeightPerPartition(!canFallback) + .AdjustDataWeightPerPartition(!settings.CanFallback) .Config(State_->Configuration->Snapshot()) .Paths(std::move(paths))); if (!res.Success()) { @@ -280,7 +280,7 @@ class TYtDqIntegration: public TDqIntegrationBase { issue.AddSubIssue(MakeIntrusive(subIssue)); } - if (canFallback) { + if (settings.CanFallback) { throw TFallbackError(MakeIntrusive(std::move(issue))) << message; } else { ctx.IssueManager.RaiseIssue(issue); @@ -303,7 +303,7 @@ class TYtDqIntegration: public TDqIntegrationBase { ui64 sumAllTableSizes = 0; TVector> groupIdColumnarStats = EstimateColumnStats(ctx, cluster, {groupIdPathInfos}, sumAllTableSizes); ui64 parts = (sumAllTableSizes + dataSizePerJob - 1) / dataSizePerJob; - if (canFallback && hasErasure && parts > maxTasks) { + if (settings.CanFallback && hasErasure && parts > maxTasks) { auto message = DqFallbackErrorMessageWrap("too big table with erasure codec"); YQL_CLOG(INFO, ProviderDq) << message; throw TFallbackError() << message; @@ -642,7 +642,7 @@ class TYtDqIntegration: public TDqIntegrationBase { ctx.AddError(YqlIssue(ctx.GetPosition(where), TIssuesIds::DQ_OPTIMIZE_ERROR, DqFallbackErrorMessageWrap(cause))); } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + 
TExprNode::TPtr WrapRead(const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings&) override { if (auto maybeYtReadTable = TMaybeNode(read)) { TMaybeNode secParams; const auto cluster = maybeYtReadTable.Cast().DataSource().Cluster(); From 8f7c3ef03c06bff3bd0ee0f2d690694754ccd26d Mon Sep 17 00:00:00 2001 From: udovichenko-r Date: Thu, 28 Nov 2024 22:28:55 +0300 Subject: [PATCH 7/8] YQL-19309 Remove yt provider dependecy on dq internals commit_hash:94de9654226b766f9fd4f8fff607b59e747a6f96 --- .../core/dq_integration/yql_dq_helper.cpp | 1 + .../core/dq_integration/yql_dq_helper.h | 29 +++++++++++++++++++ .../ut/extract_predicate_ut.cpp | 2 +- yql/essentials/core/ut/yql_execution_ut.cpp | 2 +- yql/essentials/core/ut/yql_qplayer_ut.cpp | 2 +- yql/essentials/tools/pgrun/pgrun.cpp | 2 +- 6 files changed, 34 insertions(+), 4 deletions(-) create mode 100644 yql/essentials/core/dq_integration/yql_dq_helper.cpp create mode 100644 yql/essentials/core/dq_integration/yql_dq_helper.h diff --git a/yql/essentials/core/dq_integration/yql_dq_helper.cpp b/yql/essentials/core/dq_integration/yql_dq_helper.cpp new file mode 100644 index 000000000000..e083986833da --- /dev/null +++ b/yql/essentials/core/dq_integration/yql_dq_helper.cpp @@ -0,0 +1 @@ +#include "yql_dq_helper.h" diff --git a/yql/essentials/core/dq_integration/yql_dq_helper.h b/yql/essentials/core/dq_integration/yql_dq_helper.h new file mode 100644 index 000000000000..9876c3389bc8 --- /dev/null +++ b/yql/essentials/core/dq_integration/yql_dq_helper.h @@ -0,0 +1,29 @@ +#pragma once + +#include + +#include +#include +#include +#include + + +namespace NYql { + +class IOptimizationContext; + +class IDqHelper { +public: + using TPtr = std::shared_ptr; + + virtual ~IDqHelper() {} + + virtual bool IsSingleConsumerConnection(const TExprNode::TPtr& node, const TParentsMap& parentsMap) = 0; + virtual TExprNode::TPtr PushLambdaAndCreateCnResult(const TExprNode::TPtr& dcUnionAll, const TExprNode::TPtr& lambda, 
TPositionHandle pos, + TExprContext& ctx, IOptimizationContext& optCtx) = 0; + virtual TExprNode::TPtr CreateDqStageSettings(bool singleTask, TExprContext& ctx, TPositionHandle pos) = 0; + virtual TExprNode::TListType RemoveVariadicDqStageSettings(const TExprNode& settings) = 0; +}; + + +} // namespace NYql diff --git a/yql/essentials/core/extract_predicate/ut/extract_predicate_ut.cpp b/yql/essentials/core/extract_predicate/ut/extract_predicate_ut.cpp index ec4a49a65ce8..5c5511790167 100644 --- a/yql/essentials/core/extract_predicate/ut/extract_predicate_ut.cpp +++ b/yql/essentials/core/extract_predicate/ut/extract_predicate_ut.cpp @@ -155,7 +155,7 @@ Y_UNIT_TEST_SUITE(TYqlExtractPredicate) { auto ytGateway = CreateYtFileGateway(yqlNativeServices); TVector dataProvidersInit; - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory(), {})); TProgramFactory factory(true, funcReg, 0ULL, dataProvidersInit, "ut"); TProgramPtr program = factory.Create("-stdin-", Src); diff --git a/yql/essentials/core/ut/yql_execution_ut.cpp b/yql/essentials/core/ut/yql_execution_ut.cpp index 312d5db913c4..8860188a543a 100644 --- a/yql/essentials/core/ut/yql_execution_ut.cpp +++ b/yql/essentials/core/ut/yql_execution_ut.cpp @@ -59,7 +59,7 @@ namespace NYql { auto ytGateway = CreateYtFileGateway(yqlNativeServices); TVector dataProvidersInit; - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory(), {})); TProgramFactory factory(true, funcReg, 0ULL, dataProvidersInit, "ut"); TProgramPtr program = factory.Create("-stdin-", Src); diff --git a/yql/essentials/core/ut/yql_qplayer_ut.cpp b/yql/essentials/core/ut/yql_qplayer_ut.cpp index 8fa8eb9d84f8..89d27699d86d 100644 --- 
a/yql/essentials/core/ut/yql_qplayer_ut.cpp +++ b/yql/essentials/core/ut/yql_qplayer_ut.cpp @@ -83,7 +83,7 @@ bool RunProgram(bool replay, const TString& query, const TQContext& qContext, co auto ytGateway = CreateYtFileGateway(yqlNativeServices); TVector dataProvidersInit; - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytGateway, MakeSimpleCBOOptimizerFactory(), {})); TExprContext modulesCtx; IModuleResolver::TPtr moduleResolver; diff --git a/yql/essentials/tools/pgrun/pgrun.cpp b/yql/essentials/tools/pgrun/pgrun.cpp index dfcf373f0af1..404f1297174c 100644 --- a/yql/essentials/tools/pgrun/pgrun.cpp +++ b/yql/essentials/tools/pgrun/pgrun.cpp @@ -1170,7 +1170,7 @@ int Main(int argc, char* argv[]) } TVector dataProvidersInit; - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {})); dataProvidersInit.push_back(GetPgDataProviderInitializer()); TExprContext ctx; From c069549e798236746be04f8c87bc6ca287fd8397 Mon Sep 17 00:00:00 2001 From: Roman Udovichenko Date: Thu, 28 Nov 2024 19:53:12 +0000 Subject: [PATCH 8/8] Apply: Remove yt provider dependecy on dq internals --- ydb/core/kqp/host/kqp_host.cpp | 3 +- ydb/core/kqp/host/ya.make | 1 + ydb/library/yql/providers/dq/helper/ya.make | 14 +++ .../dq/helper/yql_dq_helper_impl.cpp | 93 +++++++++++++++++ .../providers/dq/helper/yql_dq_helper_impl.h | 12 +++ .../dq/planner/execution_planner.cpp | 2 +- ydb/library/yql/providers/dq/ya.make | 1 + .../provider/phy_opt/yql_yt_phy_opt_write.cpp | 99 +++---------------- .../yql/providers/yt/provider/ut/ya.make | 2 - .../yt/provider/ut/yql_yt_epoch_ut.cpp | 15 +-- ydb/library/yql/providers/yt/provider/ya.make | 4 +- .../yt/provider/yql_yt_datasink_exec.cpp | 3 - 
.../yt/provider/yql_yt_datasink_type_ann.cpp | 1 - .../yt/provider/yql_yt_dq_hybrid.cpp | 28 +++--- .../yt/provider/yql_yt_dq_integration.cpp | 4 +- .../providers/yt/provider/yql_yt_op_hash.cpp | 44 ++++----- .../providers/yt/provider/yql_yt_provider.cpp | 9 +- .../providers/yt/provider/yql_yt_provider.h | 8 +- ydb/library/yql/tools/dqrun/dqrun.cpp | 9 +- ydb/library/yql/tools/dqrun/ya.make | 1 + ydb/library/yql/tools/mrrun/mrrun.cpp | 5 +- ydb/library/yql/tools/mrrun/ya.make | 1 + ydb/library/yql/tools/yqlrun/http/ya.make | 1 + .../yql/tools/yqlrun/http/yql_server.cpp | 5 +- ydb/library/yql/tools/yqlrun/ya.make | 1 + ydb/library/yql/tools/yqlrun/yqlrun.cpp | 5 +- ydb/library/yql/yt/native/plugin.cpp | 3 +- ydb/library/yql/yt/native/ya.make | 1 + 28 files changed, 211 insertions(+), 164 deletions(-) create mode 100644 ydb/library/yql/providers/dq/helper/ya.make create mode 100644 ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.cpp create mode 100644 ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 42a48eb59d54..39bccbab419c 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -1818,7 +1819,7 @@ class TKqpHost : public IKqpHost { } TString sessionId = CreateGuidAsString(); - auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx, NDq::MakeCBOOptimizerFactory()); + auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, sessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx, NDq::MakeCBOOptimizerFactory(), MakeDqHelper()); ytState->PassiveExecution = true; ytState->Gateway->OpenSession( diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make index c1d03eacd33c..a8b0dfb59c88 100644 --- a/ydb/core/kqp/host/ya.make +++ 
b/ydb/core/kqp/host/ya.make @@ -25,6 +25,7 @@ PEERDIR( yql/essentials/core yql/essentials/providers/common/codec ydb/library/yql/dq/opt + ydb/library/yql/providers/dq/helper ydb/library/yql/providers/common/http_gateway yql/essentials/providers/common/udf_resolve yql/essentials/providers/config diff --git a/ydb/library/yql/providers/dq/helper/ya.make b/ydb/library/yql/providers/dq/helper/ya.make new file mode 100644 index 000000000000..d43dffdeb896 --- /dev/null +++ b/ydb/library/yql/providers/dq/helper/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +PEERDIR( + yql/essentials/core/dq_integration + ydb/library/yql/dq/expr_nodes + ydb/library/yql/dq/opt + ydb/library/yql/dq/type_ann +) + +SRCS( + yql_dq_helper_impl.cpp +) + +END() diff --git a/ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.cpp b/ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.cpp new file mode 100644 index 000000000000..519845845e82 --- /dev/null +++ b/ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.cpp @@ -0,0 +1,93 @@ +#include "yql_dq_helper_impl.h" + +#include +#include +#include +#include + + +namespace NYql { + +using namespace NNodes; + +class TDqHelperImpl: public IDqHelper { +public: + bool IsSingleConsumerConnection(const TExprNode::TPtr& node, const TParentsMap& parentsMap) final { + YQL_ENSURE(TDqCnUnionAll::Match(node.Get())); + return NDq::IsSingleConsumerConnection(TDqCnUnionAll(node), parentsMap); + } + + TExprNode::TPtr PushLambdaAndCreateCnResult(const TExprNode::TPtr& conn, const TExprNode::TPtr& lambda, TPositionHandle pos, + TExprContext& ctx, IOptimizationContext& optCtx) final + { + YQL_ENSURE(TDqCnUnionAll::Match(conn.Get())); + YQL_ENSURE(lambda->IsLambda()); + + auto dqUnion = TDqCnUnionAll(conn); + TMaybeNode result; + if (NDq::GetStageOutputsCount(dqUnion.Output().Stage()) > 1) { + result = Build(ctx, pos) + .Output() + .Stage() + .Inputs() + .Add(dqUnion) + .Build() + .Program(lambda) + .Settings(NDq::TDqStageSettings().BuildNode(ctx, pos)) + .Build() + 
.Index().Build("0") + .Build() + .Done().Ptr(); + } else { + result = NDq::DqPushLambdaToStageUnionAll(dqUnion, TCoLambda(lambda), {}, ctx, optCtx); + if (!result) { + return {}; + } + } + + return Build(ctx, pos) + .Output() + .Stage() + .Inputs() + .Add(result.Cast()) + .Build() + .Program() + .Args({"row"}) + .Body("row") + .Build() + .Settings(NDq::TDqStageSettings().BuildNode(ctx, pos)) + .Build() + .Index().Build("0") + .Build() + .ColumnHints() // TODO: set column hints + .Build() + .Done().Ptr(); + } + + TExprNode::TPtr CreateDqStageSettings(bool singleTask, TExprContext& ctx, TPositionHandle pos) final { + NDq::TDqStageSettings settings; + settings.PartitionMode = singleTask ? NDq::TDqStageSettings::EPartitionMode::Single : NDq::TDqStageSettings::EPartitionMode::Default; + return settings.BuildNode(ctx, pos).Ptr(); + } + + TExprNode::TListType RemoveVariadicDqStageSettings(const TExprNode& settings) final { + TExprNode::TListType res; + + for (auto n: settings.Children()) { + if (n->Type() == TExprNode::List + && n->ChildrenSize() > 0 + && n->Child(0)->Content() == NDq::TDqStageSettings::LogicalIdSettingName) { + continue; + } + res.push_back(n); + } + return res; + } +}; + + +IDqHelper::TPtr MakeDqHelper() { + return std::make_shared(); +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h b/ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h new file mode 100644 index 000000000000..80ee82342d81 --- /dev/null +++ b/ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h @@ -0,0 +1,12 @@ +#pragma once + +#include +#include + +#include + +namespace NYql { + +IDqHelper::TPtr MakeDqHelper(); + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index d3a6a2989da4..bf0a0cba842a 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp 
@@ -545,7 +545,7 @@ namespace NYql::NDqs { if (auto dqIntegration = (*datasource)->GetDqIntegration()) { TString clusterName; IDqIntegration::TPartitionSettings settings { - .DataSizePerJob = Settings->DataSizePerJob.Get(), + .DataSizePerJob = Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), .MaxPartitions = maxPartitions, .EnableComputeActor = Settings->EnableComputeActor.Get(), .CanFallback = canFallback diff --git a/ydb/library/yql/providers/dq/ya.make b/ydb/library/yql/providers/dq/ya.make index b3bd4e2f7592..fcde4bce3f81 100644 --- a/ydb/library/yql/providers/dq/ya.make +++ b/ydb/library/yql/providers/dq/ya.make @@ -4,6 +4,7 @@ RECURSE( counters expr_nodes global_worker_manager + helper interface metrics mkql diff --git a/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_write.cpp b/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_write.cpp index d7f8f734099b..dda1b76b4d3c 100644 --- a/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_write.cpp +++ b/ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_write.cpp @@ -9,9 +9,6 @@ #include #include -#include -#include -#include #include #include @@ -22,7 +19,7 @@ using namespace NNodes; using namespace NPrivate; TMaybeNode TYtPhysicalOptProposalTransformer::DqWrite(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { - if (State_->PassiveExecution) { + if (State_->PassiveExecution || !State_->DqHelper) { return node; } @@ -38,7 +35,7 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqWrite(TExprBase node, return node; } - if (!NDq::IsSingleConsumerConnection(write.Content().Cast(), *getParents())) { + if (!State_->DqHelper->IsSingleConsumerConnection(write.Content().Ptr(), *getParents())) { return node; } @@ -106,46 +103,12 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqWrite(TExprBase node, .Done(); } - TMaybeNode result; - if (NDq::GetStageOutputsCount(dqUnion.Output().Stage()) > 1) { 
- result = Build(ctx, write.Pos()) - .Output() - .Stage() - .Inputs() - .Add(dqUnion) - .Build() - .Program(writeLambda) - .Settings(NDq::TDqStageSettings().BuildNode(ctx, write.Pos())) - .Build() - .Index().Build("0") - .Build() - .Done().Ptr(); - } else { - result = NDq::DqPushLambdaToStageUnionAll(dqUnion, writeLambda, {}, ctx, optCtx); - if (!result) { - return {}; - } + auto result = State_->DqHelper->PushLambdaAndCreateCnResult(dqUnion.Ptr(), writeLambda.Ptr(), write.Pos(), ctx, optCtx); + if (!result) { + return {}; } - result = CleanupWorld(result.Cast(), ctx); - - auto dqCnResult = Build(ctx, write.Pos()) - .Output() - .Stage() - .Inputs() - .Add(result.Cast()) - .Build() - .Program() - .Args({"row"}) - .Body("row") - .Build() - .Settings(NDq::TDqStageSettings().BuildNode(ctx, write.Pos())) - .Build() - .Index().Build("0") - .Build() - .ColumnHints() // TODO: set column hints - .Build() - .Done().Ptr(); + result = YtCleanupWorld(result, ctx, State_); auto writeOp = Build(ctx, write.Pos()) .World(ApplySyncListToWorld(ctx.NewWorld(write.Pos()), syncList, ctx)) @@ -153,7 +116,7 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqWrite(TExprBase node, .Output() .Add(outTable.ToExprNode(ctx, write.Pos()).Cast()) .Build() - .Input(dqCnResult) + .Input(result) .Done().Ptr(); auto writeOutput = Build(ctx, write.Pos()) @@ -218,7 +181,7 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqWrite(TExprBase node, } TMaybeNode TYtPhysicalOptProposalTransformer::DqMaterialize(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { - if (State_->PassiveExecution) { + if (State_->PassiveExecution || !State_->DqHelper) { return node; } @@ -235,7 +198,7 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqMaterialize(TExprBase return node; } - if (!NDq::IsSingleConsumerConnection(materialize.Input().Cast(), *getParents())) { + if (!State_->DqHelper->IsSingleConsumerConnection(materialize.Input().Ptr(), *getParents())) { return node; 
} @@ -281,46 +244,12 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqMaterialize(TExprBase const auto dqUnion = materialize.Input().Cast(); - TMaybeNode result; - if (NDq::GetStageOutputsCount(dqUnion.Output().Stage()) > 1) { - result = Build(ctx, materialize.Pos()) - .Output() - .Stage() - .Inputs() - .Add(dqUnion) - .Build() - .Program(writeLambda) - .Settings(NDq::TDqStageSettings().BuildNode(ctx, materialize.Pos())) - .Build() - .Index().Build("0") - .Build() - .Done().Ptr(); - } else { - result = NDq::DqPushLambdaToStageUnionAll(dqUnion, writeLambda, {}, ctx, optCtx); - if (!result) { - return {}; - } + auto result = State_->DqHelper->PushLambdaAndCreateCnResult(dqUnion.Ptr(), writeLambda.Ptr(), materialize.Pos(), ctx, optCtx); + if (!result) { + return {}; } - result = CleanupWorld(result.Cast(), ctx); - - auto dqCnResult = Build(ctx, materialize.Pos()) - .Output() - .Stage() - .Inputs() - .Add(result.Cast()) - .Build() - .Program() - .Args({"row"}) - .Body("row") - .Build() - .Settings(NDq::TDqStageSettings().BuildNode(ctx, materialize.Pos())) - .Build() - .Index().Build("0") - .Build() - .ColumnHints() // TODO: set column hints - .Build() - .Done().Ptr(); + result = YtCleanupWorld(result, ctx, State_); auto writeOp = Build(ctx, materialize.Pos()) .World(ApplySyncListToWorld(materialize.World().Ptr(), syncList, ctx)) @@ -328,7 +257,7 @@ TMaybeNode TYtPhysicalOptProposalTransformer::DqMaterialize(TExprBase .Output() .Add(outTable.ToExprNode(ctx, materialize.Pos()).Cast()) .Build() - .Input(dqCnResult) + .Input(result) .Done().Ptr(); return Build(ctx, materialize.Pos()) diff --git a/ydb/library/yql/providers/yt/provider/ut/ya.make b/ydb/library/yql/providers/yt/provider/ut/ya.make index aafd53a18da0..1fc93c3e3d3e 100644 --- a/ydb/library/yql/providers/yt/provider/ut/ya.make +++ b/ydb/library/yql/providers/yt/provider/ut/ya.make @@ -26,8 +26,6 @@ PEERDIR( yql/essentials/providers/common/provider yql/essentials/providers/common/config 
yql/essentials/providers/config - ydb/library/yql/providers/dq/common - ydb/library/yql/providers/dq/provider yql/essentials/providers/result/provider yql/essentials/sql yql/essentials/minikql/invoke_builtins/llvm14 diff --git a/ydb/library/yql/providers/yt/provider/ut/yql_yt_epoch_ut.cpp b/ydb/library/yql/providers/yt/provider/ut/yql_yt_epoch_ut.cpp index 5459bbaa0faf..75497f79d42b 100644 --- a/ydb/library/yql/providers/yt/provider/ut/yql_yt_epoch_ut.cpp +++ b/ydb/library/yql/providers/yt/provider/ut/yql_yt_epoch_ut.cpp @@ -7,9 +7,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -50,10 +47,6 @@ Y_UNIT_TEST_SUITE(TYqlEpoch) { typeAnnotationContext->AddDataSink(YtProviderName, CreateYtDataSink(ytState)); typeAnnotationContext->AddDataSource(YtProviderName, CreateYtDataSource(ytState)); - TDqStatePtr dqState = new TDqState(nullptr, {}, functionRegistry.Get(), {}, {}, {}, typeAnnotationContext.Get(), {}, {}, {}, nullptr, {}, {}, {}, false, {}); - typeAnnotationContext->AddDataSink(DqProviderName, CreateDqDataSink(dqState)); - typeAnnotationContext->AddDataSource(DqProviderName, CreateDqDataSource(dqState, [](const TDqStatePtr&) { return new TNullTransformer; })); - typeAnnotationContext->AddDataSource(ConfigProviderName, CreateConfigProvider(*typeAnnotationContext, nullptr, "")); auto writerFactory = [] () { return CreateYsonResultWriter(NYson::EYsonFormat::Binary); }; @@ -363,10 +356,10 @@ R"(( R"( use plato; pragma yt.Pool="1"; -pragma dq.AnalyzeQuery="1"; +pragma config.flags("LLVM_OFF"); pragma yt.Pool="2"; select * from Input; -pragma dq.AnalyzeQuery="1"; +pragma config.flags("LLVM_OFF"); pragma yt.Pool="3"; select * from Input; )", @@ -374,9 +367,9 @@ R"(( (let $1 (YtConfigure! world '"yt" '"Attr" '"pool" '"1")) (let $2 (YtConfigure! $1 '"yt" '"Attr" '"pool" '"2")) (let $3 (YtConfigure! $2 '"yt" '"Attr" '"pool" '"3")) -(let $4 (Configure! world '"dq" '"Attr" '"analyzequery" '"1")) +(let $4 (Configure! 
world '"config" '"LLVM_OFF")) (let $5 (Write! (Sync! $2 $4) 'result (Key) (Right! (Read! $2 '"yt" '((YtTable '"Input" (Void) (Void) (Void) '() (Void) (Void) '"plato")) (Void) '())) '('('type) '('autoref)))) -(let $6 (Configure! (Commit! $5 'result) '"dq" '"Attr" '"analyzequery" '"1")) +(let $6 (Configure! (Commit! $5 'result) '"config" '"LLVM_OFF")) (let $7 (Write! (Sync! $3 $6) 'result (Key) (Right! (Read! $3 '"yt" '((YtTable '"Input" (Void) (Void) (Void) '() (Void) (Void) '"plato")) (Void) '())) '('('type) '('autoref)))) (return (Commit! (Commit! $7 'result) '"yt" '('('"epoch" '"1")))) ) diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make index e04e21e542ff..35500a01c2a1 100644 --- a/ydb/library/yql/providers/yt/provider/ya.make +++ b/ydb/library/yql/providers/yt/provider/ya.make @@ -85,8 +85,6 @@ PEERDIR( yql/essentials/core/file_storage yql/essentials/core/url_lister/interface yql/essentials/core/dq_integration - ydb/library/yql/dq/opt - ydb/library/yql/dq/type_ann yql/essentials/minikql yql/essentials/providers/common/codec yql/essentials/providers/common/config @@ -98,7 +96,7 @@ PEERDIR( yql/essentials/providers/common/schema/expr yql/essentials/providers/common/structured_token yql/essentials/providers/common/transform - ydb/library/yql/providers/dq/common + ydb/library/yql/dq/expr_nodes ydb/library/yql/providers/dq/expr_nodes yql/essentials/providers/result/expr_nodes ydb/library/yql/providers/stat/expr_nodes diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 1cddefb54581..d680b9d5900c 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -4,7 +4,6 @@ #include "yql_yt_helpers.h" #include "yql_yt_optimize.h" -#include #include #include #include @@ -21,8 +20,6 @@ #include #include -#include -#include #include diff 
--git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp index 93627ffc5293..e58a453d117e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp @@ -2,7 +2,6 @@ #include "yql_yt_op_settings.h" #include "yql_yt_helpers.h" -#include #include #include #include diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index f8064ca6b312..aad68af8127e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -17,8 +17,6 @@ #include #include #include -#include -#include #include #include #include @@ -42,12 +40,14 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { , State_(std::move(state)), Finalizer_(std::move(finalizer)) { #define HNDL(name) "YtDqHybrid-"#name, Hndl(&TYtDqHybridTransformer::name) - AddHandler(0, &TYtFill::Match, HNDL(TryYtFillByDq)); - AddHandler(0, &TYtSort::Match, HNDL(TryYtSortByDq)); - AddHandler(0, &TYtMap::Match, HNDL(TryYtMapByDq)); - AddHandler(0, &TYtReduce::Match, HNDL(TryYtReduceByDq)); - AddHandler(0, &TYtMapReduce::Match, HNDL(TryYtMapReduceByDq)); - AddHandler(0, &TYtMerge::Match, HNDL(TryYtMergeByDq)); + if (State_->DqHelper) { + AddHandler(0, &TYtFill::Match, HNDL(TryYtFillByDq)); + AddHandler(0, &TYtSort::Match, HNDL(TryYtSortByDq)); + AddHandler(0, &TYtMap::Match, HNDL(TryYtMapByDq)); + AddHandler(0, &TYtReduce::Match, HNDL(TryYtReduceByDq)); + AddHandler(0, &TYtMapReduce::Match, HNDL(TryYtMapReduceByDq)); + AddHandler(0, &TYtMerge::Match, HNDL(TryYtMergeByDq)); + } #undef HNDL } private: @@ -160,7 +160,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { PushSkipStat("OverLimits", nodeName); return false; } - + return true; } @@ -228,7 +228,7 @@ class 
TYtDqHybridTransformer : public TOptimizeTransformerBase { .Settings().Build() .Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos())) + .Settings(State_->DqHelper->CreateDqStageSettings(true, ctx, fill.Pos())) .Build() .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() @@ -315,7 +315,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Settings().Build() .Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos())) + .Settings(State_->DqHelper->CreateDqStageSettings(true, ctx, sort.Pos())) .Build() .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() @@ -401,7 +401,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Settings().Build() .Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = ordered ? TDqStageSettings::EPartitionMode::Single : TDqStageSettings::EPartitionMode::Default}.BuildNode(ctx, map.Pos())) + .Settings(State_->DqHelper->CreateDqStageSettings(ordered, ctx, map.Pos())) .Done(); if (!ordered) { @@ -415,7 +415,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Build() .Build() .Program().Args({"pass"}).Body("pass").Build() - .Settings(TDqStageSettings().BuildNode(ctx, map.Pos())) + .Settings(State_->DqHelper->CreateDqStageSettings(false, ctx, map.Pos())) .Done(); } @@ -628,7 +628,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .template Settings().Build() .Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos())) + .Settings(State_->DqHelper->CreateDqStageSettings(true, ctx, reduce.Pos())) .Build() .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp 
b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index badb6420ced2..3cf9d99cf69e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -199,7 +198,8 @@ class TYtDqIntegration: public TDqIntegrationBase { ui64 Partition(const TExprNode& node, TVector& serializedPartitions, TString* clusterName, TExprContext& ctx, const TPartitionSettings& settings) override { - auto dataSizePerJob = settings.DataSizePerJob.GetOrElse(TDqSettings::TDefault::DataSizePerJob); + YQL_ENSURE(settings.DataSizePerJob.Defined()); + ui64 dataSizePerJob = *settings.DataSizePerJob; if (!TMaybeNode(&node).IsValid()) { return 0; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_op_hash.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_op_hash.cpp index b2d7f7f08616..7d498af7a731 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_op_hash.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_op_hash.cpp @@ -2,9 +2,9 @@ #include "yql_yt_op_hash.h" #include "yql_yt_op_settings.h" -#include #include #include +#include #include #include @@ -49,36 +49,34 @@ TYtNodeHashCalculator::TYtNodeHashCalculator(const TYtState::TPtr& state, const return TString(); }; - Hashers[TDqStage::CallableName()] = [this] (const TExprNode& node, TArgIndex& argIndex, ui32 frameLevel) { - THashBuilder builder; - builder << node.Content(); - for (size_t i = 0; i < node.ChildrenSize(); ++i) { - // skip _logical_id setting from hashing - if (i == TDqStageBase::idx_Settings) { - for (size_t j = 0; j < node.Child(i)->ChildrenSize(); ++j) { - if((node.Child(i)->Child(j)->Type() == TExprNode::List) - && node.Child(i)->Child(j)->ChildrenSize() > 0 - && (node.Child(i)->Child(j)->Child(0)->Content() = NDq::TDqStageSettings::LogicalIdSettingName)) { - continue; + if (State->DqHelper) { + Hashers[TDqStage::CallableName()] = 
[this] (const TExprNode& node, TArgIndex& argIndex, ui32 frameLevel) { + THashBuilder builder; + builder << node.Content(); + for (size_t i = 0; i < node.ChildrenSize(); ++i) { + if (i == TDqStageBase::idx_Settings) { + // skip _logical_id setting from hashing + const auto& settings = State->DqHelper->RemoveVariadicDqStageSettings(*node.Child(i)); + for (const auto& s: settings) { + if (auto partHash = GetHashImpl(*s, argIndex, frameLevel)) { + builder << partHash; + } + else { + return TString(); + } } - if (auto partHash = GetHashImpl(*node.Child(i)->Child(j), argIndex, frameLevel)) { + } else { + if (auto partHash = GetHashImpl(*node.Child(i), argIndex, frameLevel)) { builder << partHash; } else { return TString(); } } - } else { - if (auto partHash = GetHashImpl(*node.Child(i), argIndex, frameLevel)) { - builder << partHash; - } - else { - return TString(); - } } - } - return builder.Finish(); - }; + return builder.Finish(); + }; + } Hashers[TYtOutput::CallableName()] = [this] (const TExprNode& node, TArgIndex& argIndex, ui32 frameLevel) { return GetOutputHash(node, argIndex, frameLevel); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp index 563601d2c212..9cfd2416b797 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp @@ -338,13 +338,14 @@ void TYtState::LeaveEvaluation(ui64 id) { std::pair, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr typeCtx, - const IOptimizerFactory::TPtr& optFactory) + const IOptimizerFactory::TPtr& optFactory, const IDqHelper::TPtr& helper) { auto ytState = MakeIntrusive(typeCtx.Get()); ytState->SessionId = sessionId; ytState->Gateway = gateway; ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get()); ytState->OptimizerFactory_ = optFactory; + 
ytState->DqHelper = helper; if (ytGatewayConfig) { std::unordered_set groups; @@ -378,8 +379,8 @@ std::pair, TStatWriter> CreateYtNativeState(IYtGateway:: return {ytState, statWriter}; } -TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gateway, IOptimizerFactory::TPtr optFactory, ui32 planLimits) { - return [originalGateway = gateway, optFactory, planLimits] ( +TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gateway, IOptimizerFactory::TPtr optFactory, IDqHelper::TPtr helper, ui32 planLimits) { + return [originalGateway = gateway, optFactory, helper, planLimits] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -408,7 +409,7 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat const TYtGatewayConfig* ytGatewayConfig = gatewaysConfig ? &gatewaysConfig->GetYt() : nullptr; TIntrusivePtr ytState; TStatWriter statWriter; - std::tie(ytState, statWriter) = CreateYtNativeState(gateway, userName, sessionId, ytGatewayConfig, typeCtx, optFactory); + std::tie(ytState, statWriter) = CreateYtNativeState(gateway, userName, sessionId, ytGatewayConfig, typeCtx, optFactory, helper); ytState->PlanLimits = planLimits; info.Names.insert({TString{YtProviderName}}); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h index a08cb54b5159..c78e37f28663 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -90,7 +91,7 @@ struct TYtState : public TThrRefBase { bool IsHybridEnabled() const; bool IsHybridEnabledForCluster(const std::string_view& cluster) const; bool HybridTakesTooLong() const; - + TYtState(TTypeAnnotationContext* types) { Types = types; Configuration = MakeIntrusive(*types); @@ -121,6 +122,7 @@ struct TYtState : 
public TThrRefBase { ui32 PlanLimits = 10; i32 FlowDependsOnId = 0; IOptimizerFactory::TPtr OptimizerFactory_; + IDqHelper::TPtr DqHelper; private: std::unordered_map ConfigurationEvalStates_; std::unordered_map EpochEvalStates_; @@ -130,11 +132,11 @@ struct TYtState : public TThrRefBase { class TYtGatewayConfig; std::pair, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr typeCtx, - const IOptimizerFactory::TPtr& optFactory); + const IOptimizerFactory::TPtr& optFactory, const IDqHelper::TPtr& helper); TIntrusivePtr CreateYtDataSource(TYtState::TPtr state); TIntrusivePtr CreateYtDataSink(TYtState::TPtr state); -TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gateway, IOptimizerFactory::TPtr optFactory, ui32 planLimits = 10); +TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gateway, IOptimizerFactory::TPtr optFactory, IDqHelper::TPtr helper, ui32 planLimits = 10); const THashSet& YtDataSourceFunctions(); const THashSet& YtDataSinkFunctions(); diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 866b50390250..050df4c617cd 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -297,7 +298,7 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory( NYql::NConnector::IClient::TPtr genericClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, - size_t HTTPmaxTimeSeconds, + size_t HTTPmaxTimeSeconds, size_t maxRetriesCount, IPqGateway::TPtr pqGateway) { auto factory = MakeIntrusive(); @@ -964,7 +965,7 @@ int RunMain(int argc, const char* argv[]) factories.push_back(GetYtFileFactory(ytFileServices)); clusters["plato"] = YtProviderName; auto ytNativeGateway = 
CreateYtFileGateway(ytFileServices, &emulateOutputForMultirun); - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory(), MakeDqHelper())); } else if (gatewaysConfig.HasYt()) { TYtNativeServices ytServices; ytServices.FunctionRegistry = funcRegistry.Get(); @@ -975,14 +976,14 @@ int RunMain(int argc, const char* argv[]) for (auto& cluster: gatewaysConfig.GetYt().GetClusterMapping()) { clusters.emplace(to_lower(cluster.GetName()), TString{YtProviderName}); } - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory(), MakeDqHelper())); } ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory; if (tokenAccessorEndpoint) { TVector ss = StringSplitter(tokenAccessorEndpoint).SplitByString("://"); - YQL_ENSURE(ss.size() == 2, "Invalid tokenAccessorEndpoint: " << tokenAccessorEndpoint); + YQL_ENSURE(ss.size() == 2, "Invalid tokenAccessorEndpoint: " << tokenAccessorEndpoint); credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(ss[1], ss[0] == "grpcs", ""); } diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make index ccda8a5dff56..9bfbc9f1649d 100644 --- a/ydb/library/yql/tools/dqrun/ya.make +++ b/ydb/library/yql/tools/dqrun/ya.make @@ -52,6 +52,7 @@ ENDIF() ydb/library/yql/providers/dq/local_gateway ydb/library/yql/providers/dq/provider ydb/library/yql/providers/dq/provider/exec + ydb/library/yql/providers/dq/helper ydb/library/yql/providers/pq/async_io ydb/library/yql/providers/pq/gateway/dummy ydb/library/yql/providers/pq/gateway/native diff --git a/ydb/library/yql/tools/mrrun/mrrun.cpp b/ydb/library/yql/tools/mrrun/mrrun.cpp index 
68d3f2adb0c2..61b66e651fee 100644 --- a/ydb/library/yql/tools/mrrun/mrrun.cpp +++ b/ydb/library/yql/tools/mrrun/mrrun.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include @@ -236,7 +237,7 @@ bool FillUsedUrls( NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway) { auto factory = MakeIntrusive(); - + TPqGatewayServices pqServices( driver, nullptr, @@ -658,7 +659,7 @@ int RunMain(int argc, const char* argv[]) auto ytNativeGateway = CreateYtNativeGateway(services); gateways.emplace_back(ytNativeGateway); FillClusterMapping(clusters, gatewaysConfig.GetYt(), TString{YtProviderName}); - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory(), MakeDqHelper())); } if (gatewayTypes.contains(ClickHouseProviderName) && gatewaysConfig.HasClickHouse()) { diff --git a/ydb/library/yql/tools/mrrun/ya.make b/ydb/library/yql/tools/mrrun/ya.make index aef16123e5c2..88149e7c18c6 100644 --- a/ydb/library/yql/tools/mrrun/ya.make +++ b/ydb/library/yql/tools/mrrun/ya.make @@ -49,6 +49,7 @@ PEERDIR( ydb/library/yql/providers/dq/local_gateway ydb/library/yql/providers/dq/provider ydb/library/yql/providers/dq/provider/exec + ydb/library/yql/providers/dq/helper ydb/library/yql/providers/pq/async_io ydb/library/yql/providers/pq/gateway/native ydb/library/yql/providers/s3/actors diff --git a/ydb/library/yql/tools/yqlrun/http/ya.make b/ydb/library/yql/tools/yqlrun/http/ya.make index 3bc08d0f6e2f..fa9416391db1 100644 --- a/ydb/library/yql/tools/yqlrun/http/ya.make +++ b/ydb/library/yql/tools/yqlrun/http/ya.make @@ -24,6 +24,7 @@ PEERDIR( yql/essentials/core/facade yql/essentials/core/type_ann ydb/library/yql/dq/opt + ydb/library/yql/providers/dq/helper ydb/library/yql/providers/dq/provider yql/essentials/providers/result/provider 
yql/essentials/parser/pg_wrapper diff --git a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp index ae058fa0a9c6..34572aafc7cd 100644 --- a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp +++ b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -174,7 +175,7 @@ struct TTableFileHolder { }; TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer, - const THashMap& tables, const THashMap, + const THashMap& tables, const THashMap, TVector>>& rtmrTableAttributes, const TString& tmpDir) { TVector dataProvidersInit; @@ -190,7 +191,7 @@ TProgramPtr MakeFileProgram(const TString& program, TYqlServer& yqlServer, dataProvidersInit.push_back(GetDqDataProviderInitializer([](const TDqStatePtr&){ return new TNullTransformer; }, {}, dqCompFactory, {}, yqlServer.FileStorage)); - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory(), MakeDqHelper())); dataProvidersInit.push_back(GetPgDataProviderInitializer()); ExtProviderSpecific(yqlServer.FunctionRegistry, dataProvidersInit, rtmrTableAttributes); diff --git a/ydb/library/yql/tools/yqlrun/ya.make b/ydb/library/yql/tools/yqlrun/ya.make index b5e4ad29caff..76e9fc14ea8a 100644 --- a/ydb/library/yql/tools/yqlrun/ya.make +++ b/ydb/library/yql/tools/yqlrun/ya.make @@ -38,6 +38,7 @@ PEERDIR( yql/essentials/providers/common/udf_resolve ydb/library/yql/dq/opt ydb/library/yql/providers/dq/provider + ydb/library/yql/providers/dq/helper ydb/library/yql/providers/yt/gateway/file ydb/library/yql/providers/yt/codec/codegen ydb/library/yql/providers/yt/comp_nodes/llvm14 diff --git a/ydb/library/yql/tools/yqlrun/yqlrun.cpp b/ydb/library/yql/tools/yqlrun/yqlrun.cpp index b601ee913674..42e04a386b55 100644 --- 
a/ydb/library/yql/tools/yqlrun/yqlrun.cpp +++ b/ydb/library/yql/tools/yqlrun/yqlrun.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -702,7 +703,7 @@ int Main(int argc, const char *argv[]) if (gatewayTypes.contains(YtProviderName) || res.Has("opt-collision")) { auto yqlNativeServices = NFile::TYtFileServices::Make(funcRegistry.Get(), tablesMapping, fileStorage, tmpDir, res.Has("keep-temp"), tablesDirMapping); auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices, &emulateOutputForMultirun); - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory(), MakeDqHelper())); } } @@ -985,7 +986,7 @@ int RunUI(int argc, const char* argv[]) Y_ABORT_UNLESS(NKikimr::ParsePBFromFile(pgExtConfig, &config)); TVector extensions; PgExtensionsFromProto(config, extensions); - NPg::RegisterExtensions(extensions, false, + NPg::RegisterExtensions(extensions, false, *NSQLTranslationPG::CreateExtensionSqlParser(), NKikimr::NMiniKQL::CreateExtensionLoader().get()); } diff --git a/ydb/library/yql/yt/native/plugin.cpp b/ydb/library/yql/yt/native/plugin.cpp index f231895aa3f8..b6013f2029b7 100644 --- a/ydb/library/yql/yt/native/plugin.cpp +++ b/ydb/library/yql/yt/native/plugin.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -354,7 +355,7 @@ class TYqlPlugin } auto ytNativeGateway = CreateYtNativeGateway(ytServices); - dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory())); + dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway, NDq::MakeCBOOptimizerFactory(), MakeDqHelper())); ProgramFactory_ = std::make_unique( false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded"); diff --git a/ydb/library/yql/yt/native/ya.make b/ydb/library/yql/yt/native/ya.make index 
c470cff46b59..142bed0ac112 100644 --- a/ydb/library/yql/yt/native/ya.make +++ b/ydb/library/yql/yt/native/ya.make @@ -46,6 +46,7 @@ PEERDIR( ydb/library/yql/dq/opt ydb/library/yql/providers/dq/actors/yt ydb/library/yql/providers/dq/global_worker_manager + ydb/library/yql/providers/dq/helper ydb/library/yql/providers/dq/provider ydb/library/yql/providers/dq/provider/exec ydb/library/yql/providers/dq/service