diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 19450533eca9..cb3cebe24aa7 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -120,6 +120,7 @@ struct TObjectStorageExternalSource : public IExternalSource { static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit) { NYql::TIssues issues; issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting())); + issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by())); if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) { try { TVector partitionedBy{objectStorage.partitioned_by().begin(), objectStorage.partitioned_by().end()}; @@ -223,6 +224,37 @@ struct TObjectStorageExternalSource : public IExternalSource { return issues; } + template + static NYql::TIssues ValidateRawFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField& partitionedBy) { + NYql::TIssues issues; + if (format != "raw"sv) { + return issues; + } + + ui64 realSchemaColumnsCount = 0; + Ydb::Column lastColumn; + TSet partitionedBySet{partitionedBy.begin(), partitionedBy.end()}; + + for (const auto& column: schema.column()) { + if (partitionedBySet.contains(column.name())) { + continue; + } + if (!ValidateStringType(column.type())) { + issues.AddIssue(MakeErrorIssue( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder{} << TStringBuilder() << "Only string type column in schema supported in raw format (you have '" + << column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)")); + } + ++realSchemaColumnsCount; + } + + if (realSchemaColumnsCount != 1) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, TStringBuilder{} << TStringBuilder() << "Only one column in schema supported in raw format (you have " + << realSchemaColumnsCount << " fields)")); + } + return issues; + } + private: static bool IsValidIntervalUnit(const TString& unit) { static constexpr std::array IntervalUnits = { @@ -416,6 +448,29 @@ struct TObjectStorageExternalSource : public IExternalSource { return dataSlotColumns; } + static std::vector GetStringTypes() { + NYdb::TType stringType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::String).Build(); + NYdb::TType utf8Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Utf8).Build(); + NYdb::TType ysonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Yson).Build(); + NYdb::TType jsonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Json).Build(); + const std::vector result { + stringType, + utf8Type, + ysonType, + jsonType, + NYdb::TTypeBuilder{}.Optional(stringType).Build(), + NYdb::TTypeBuilder{}.Optional(utf8Type).Build(), + NYdb::TTypeBuilder{}.Optional(ysonType).Build(), + NYdb::TTypeBuilder{}.Optional(jsonType).Build() + }; + return result; + } + + static bool ValidateStringType(const NYdb::TType& columnType) { + static const std::vector availableTypes = GetStringTypes(); + return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) != availableTypes.end(); + } + private: const std::vector HostnamePatterns; const size_t PathsLimit; @@ -435,4 +490,8 @@ NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map& partitionedBy) { + return TObjectStorageExternalSource::ValidateRawFormat(format, schema, partitionedBy); +} + } diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h index 9387b3617d58..e357be02d994 100644 --- a/ydb/core/external_sources/object_storage.h +++ b/ydb/core/external_sources/object_storage.h @@ -14,4 +14,6 @@ NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuer NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map& formatSetting, bool matchAllSettings = false); +NYql::TIssues ValidateRawFormat(const TString& format, const FederatedQuery::Schema& schema, const google::protobuf::RepeatedPtrField& partitionedBy); + } diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.h b/ydb/core/fq/libs/control_plane_storage/request_validators.h index d28023daa6a5..c17ac4d41a16 100644 --- a/ydb/core/fq/libs/control_plane_storage/request_validators.h +++ b/ydb/core/fq/libs/control_plane_storage/request_validators.h @@ -110,6 +110,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet())); break; } case FederatedQuery::BindingSetting::BINDING_NOT_SET: { diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp index 6dd57a930d62..6ccac3ca764d 100644 --- a/ydb/library/yql/providers/common/mkql/parser.cpp +++ b/ydb/library/yql/providers/common/mkql/parser.cpp @@ -215,6 +215,7 @@ TRuntimeNode BuildParseCall( if (parseItemStructType->GetMembersCount() == 0) { return ctx.ProgramBuilder.NewStruct(parseItemType, {}); } + MKQL_ENSURE(parseItemStructType->GetMembersCount() == 1, "Only one column in schema supported in raw format"); bool isOptional; const auto schemeType = UnpackOptionalData( diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index a58824e07686..24a2406e7341 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -1378,13 +1378,49 @@ bool ValidateCompressionForOutput(std::string_view format, std::string_view comp return false; } -bool ValidateFormatForInput(std::string_view format, TExprContext& ctx) { - if (format.empty() || IsIn(FormatsForInput, format)) { +bool ValidateFormatForInput( + std::string_view format, + const TStructExprType* schemaStructRowType, + const std::function& excludeFields, + TExprContext& ctx) { + if (format.empty()) { return true; } - ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format - << ". Use one of: " << JoinSeq(", ", FormatsForInput))); - return false; + + if (!IsIn(FormatsForInput, format)) { + ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format + << ". Use one of: " << JoinSeq(", ", FormatsForInput))); + return false; + } + + if (schemaStructRowType && format == TStringBuf("raw")) { + ui64 realSchemaColumnsCount = 0; + + for (const TItemExprType* item : schemaStructRowType->GetItems()) { + if (excludeFields && excludeFields(item->GetName())) { + continue; + } + const TTypeAnnotationNode* rowType = item->GetItemType(); + if (rowType->GetKind() == ETypeAnnotationKind::Optional) { + rowType = rowType->Cast()->GetItemType(); + } + + if (rowType->GetKind() != ETypeAnnotationKind::Data + || !IsDataTypeString(rowType->Cast()->GetSlot())) { + ctx.AddError(TIssue(TStringBuilder() << "Only string type column in schema supported in raw format (you have '" + << item->GetName() << " " << FormatType(rowType) << "' field)")); + return false; + } + ++realSchemaColumnsCount; + } + + if (realSchemaColumnsCount != 1) { + ctx.AddError(TIssue(TStringBuilder() << "Only one column in schema supported in raw format (you have " + << realSchemaColumnsCount << " fields)")); + return false; + } + } + return true; } bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) { diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index b7553c38ad3c..51ed31dc4dd0 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -187,7 +187,7 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx); bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx); -bool ValidateFormatForInput(std::string_view format, TExprContext& ctx); +bool ValidateFormatForInput(std::string_view format, const TStructExprType* schemaStructRowType, const std::function& excludeFields, TExprContext& ctx); bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx); bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index f3a1051802cd..86c0e1746bfe 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -99,23 +99,27 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { return TStatus::Error; } - auto format = read.Format().Ref().Content(); - if (!NCommon::ValidateFormatForInput(format, ctx)) { + TPqTopic topic = read.Topic(); + if (!EnsureCallable(topic.Ref(), ctx)) { return TStatus::Error; } - if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) { + TVector columnOrder; + auto schema = GetReadTopicSchema(topic, read.Columns().Maybe(), ctx, columnOrder); + if (!schema) { return TStatus::Error; } - TPqTopic topic = read.Topic(); - if (!EnsureCallable(topic.Ref(), ctx)) { + auto format = read.Format().Ref().Content(); + if (!NCommon::ValidateFormatForInput( + format, + schema->Cast()->GetItemType()->Cast(), + [](TStringBuf fieldName) {return FindPqMetaFieldDescriptorBySysColumn(TString(fieldName)); }, + ctx)) { return TStatus::Error; } - TVector columnOrder; - auto schema = GetReadTopicSchema(topic, read.Columns().Maybe(), ctx, columnOrder); - if (!schema) { + if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index ce0bd022f439..6921d2898660 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -357,7 +357,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) || - !NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx)) + !NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), nullptr, nullptr, ctx)) { return TStatus::Error; } @@ -438,13 +438,15 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { std::vector partitionedBy; TString projection; { - THashSet columns; + TS3Object s3Object(input->Child(TS3ReadObject::idx_Object)); + auto format = s3Object.Format().Ref().Content(); const TStructExprType* structRowType = rowType->Cast(); + + THashSet columns; for (const TItemExprType* item : structRowType->GetItems()) { columns.emplace(item->GetName()); } - - TS3Object s3Object(input->Child(TS3ReadObject::idx_Object)); + if (TMaybeNode settings = s3Object.Settings()) { for (auto& settingNode : settings.Raw()->ChildrenList()) { const TStringBuf name = settingNode->Head().Content(); @@ -461,13 +463,21 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "Table contains no columns except partitioning columns")); return TStatus::Error; } - } if (name == "projection"sv) { projection = settingNode->Tail().Content(); } } } + + TSet partitionedBySet{partitionedBy.begin(), partitionedBy.end()}; + if (!NCommon::ValidateFormatForInput( + format, + structRowType, + [partitionedBySet](TStringBuf fieldName) {return partitionedBySet.contains(fieldName); }, + ctx)) { + return TStatus::Error; + } } if (!ValidateProjectionTypes( @@ -550,7 +560,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } const auto format = input->Child(TS3Object::idx_Format)->Content(); - if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, ctx)) { + if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, nullptr, nullptr, ctx)) { return TStatus::Error; } diff --git a/ydb/tests/fq/s3/test_bindings.py b/ydb/tests/fq/s3/test_bindings.py index f3e7f21b5fbb..a0ab22c01591 100644 --- a/ydb/tests/fq/s3/test_bindings.py +++ b/ydb/tests/fq/s3/test_bindings.py @@ -619,3 +619,17 @@ def test_ast_in_failed_query_compilation(self, kikimr, s3, client): ast = client.describe_query(query_id).result.query.ast.data assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast" + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_raw_empty_schema_binding(self, kikimr, client, unique_prefix): + kikimr.control_plane.wait_bootstrap(1) + connection_response = client.create_storage_connection(unique_prefix + "fruitbucket", "fbucket") + binding_response = client.create_object_storage_binding(name=unique_prefix + "my_binding", + path="fruits.csv", + format="raw", + connection_id=connection_response.result.connection_id, + columns=[], + check_issues=False) + assert "Only one column in schema supported in raw format" in str(binding_response.issues), str( + binding_response.issues) \ No newline at end of file diff --git a/ydb/tests/fq/s3/test_formats.py b/ydb/tests/fq/s3/test_formats.py index 5ff364095224..608a49e04bce 100644 --- a/ydb/tests/fq/s3/test_formats.py +++ b/ydb/tests/fq/s3/test_formats.py @@ -478,3 +478,20 @@ def test_precompute(self, kikimr, s3, client): assert len(result_set.rows) == 1 assert result_set.rows[0].items[0].bytes_value == b"Pear" assert result_set.rows[0].items[1].int32_value == 15 + + @yq_all + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_raw_empty_schema_query(self, kikimr, s3, client, unique_prefix): + self.create_bucket_and_upload_file("test.parquet", s3, kikimr) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + sql = f''' + SELECT * FROM `{storage_connection_name}`.`*` + WITH (format=raw, SCHEMA ()); + ''' + + query_id = client.create_query("test_raw_empty_schema", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + describe_result = client.describe_query(query_id).result + describe_string = "{}".format(describe_result) + assert r"Only one column in schema supported in raw format" in describe_string diff --git a/ydb/tests/fq/yds/test_yds_bindings.py b/ydb/tests/fq/yds/test_yds_bindings.py index 4b64cd648297..ab9974cae5ee 100644 --- a/ydb/tests/fq/yds/test_yds_bindings.py +++ b/ydb/tests/fq/yds/test_yds_bindings.py @@ -55,3 +55,18 @@ def test_yds_insert(self, client): assert result_set.rows[0].items[1].text_value == 'xxx' assert result_set.rows[1].items[0].int32_value == 456 assert result_set.rows[1].items[1].text_value == 'yyy' + + @yq_v1 + def test_raw_empty_schema_binding(self, kikimr, client, yq_version): + self.init_topics(f"pq_test_raw_empty_schema_binding_{yq_version}") + connection_response = client.create_yds_connection("myyds2", os.getenv("YDB_DATABASE"), + os.getenv("YDB_ENDPOINT")) + assert not connection_response.issues, str(connection_response.issues) + binding_response = client.create_yds_binding(name="my_binding", + stream=self.input_topic, + format="raw", + connection_id=connection_response.result.connection_id, + columns=[], + check_issues=False) + assert "Only one column in schema supported in raw format" in str(binding_response.issues), str( + binding_response.issues)