From 657f89bd37d75828feb5858ff80eae933677740e Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> Date: Mon, 8 Apr 2024 13:03:00 +0300 Subject: [PATCH] YQ-3011 fix parquet type validation (#3502) --- .../provider/yql_s3_datasource_type_ann.cpp | 28 ++++++++++++++++++- .../s3/provider/yql_s3_dq_integration.cpp | 9 ------ 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index bccfdca0e121..ce0bd022f439 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -282,6 +282,18 @@ bool ExtractSettingValue(const TExprNode& value, TStringBuf settingName, TString } +bool EnsureParquetTypeSupported(TPositionHandle position, const TTypeAnnotationNode* type, TExprContext& ctx, const IArrowResolver::TPtr& arrowResolver) { + auto resolveStatus = arrowResolver->AreTypesSupported(ctx.GetPosition(position), { type }, ctx); + YQL_ENSURE(resolveStatus != IArrowResolver::ERROR); + + if (resolveStatus != IArrowResolver::OK) { + ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Type " << *type << " is not supported for parquet")); + return false; + } + + return true; +} + class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { public: TS3DataSourceTypeAnnotationTransformer(TS3State::TPtr state) @@ -407,7 +419,8 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { return TStatus::Error; } - if (!TS3Object::Match(input->Child(TS3ReadObject::idx_Object))) { + const auto& objectNode = input->Child(TS3ReadObject::idx_Object); + if (!TS3Object::Match(objectNode)) { ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3ReadObject::idx_Object)->Pos()), "Expected S3 object.")); return TStatus::Error; } @@ -467,6 +480,19 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { return TStatus::Error; } + if (objectNode->Child(TS3Object::idx_Format)->Content() == "parquet") { + YQL_ENSURE(State_->Types->ArrowResolver); + bool allTypesSupported = true; + for (const auto& item : rowType->Cast()->GetItems()) { + if (!EnsureParquetTypeSupported(input->Pos(), item->GetItemType(), ctx, State_->Types->ArrowResolver)) { + allTypesSupported = false; + } + } + if (!allTypesSupported) { + return TStatus::Error; + } + } + input->SetTypeAnn(ctx.MakeType(TTypeAnnotationNode::TListType{ input->Child(TS3ReadObject::idx_World)->GetTypeAnn(), ctx.MakeType(rowType) diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index dcb4c1dc9933..94e93ee2b1f5 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -259,15 +259,6 @@ class TS3DqIntegration: public TDqIntegrationBase { auto format = s3ReadObject.Object().Format().Ref().Content(); if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") { - if (format == "parquet") { - YQL_ENSURE(State_->Types->ArrowResolver); - TVector allTypes; - for (const auto& x : rowType->Cast()->GetItems()) { - allTypes.push_back(x->GetItemType()); - } - auto resolveStatus = State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(read->Pos()), allTypes, ctx); - YQL_ENSURE(resolveStatus == IArrowResolver::OK); - } return Build(ctx, read->Pos()) .Input() .Paths(s3ReadObject.Object().Paths())