Skip to content

Commit

Permalink
Merge 19e1352 into f9c6964
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 17, 2025
2 parents f9c6964 + 19e1352 commit b80bdfd
Showing 1 changed file with 48 additions and 18 deletions.
66 changes: 48 additions & 18 deletions ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,47 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

auto source = input->Child(TS3WriteObject::idx_Input);
const auto targetNode = input->Child(TS3WriteObject::idx_Target);
if (!TS3Target::Match(targetNode)) {
ctx.AddError(TIssue(ctx.GetPosition(targetNode->Pos()), "Expected S3 target."));
return TStatus::Error;
}

const TTypeAnnotationNode* targetType;
if (const TS3Target target(targetNode); const auto settings = target.Settings()) {
if (const auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
}
}

const auto source = input->ChildPtr(TS3WriteObject::idx_Input);
if (const auto maybeTuple = TMaybeNode<TExprList>(source)) {
const auto tuple = maybeTuple.Cast();

TVector<TExprBase> tupleValues;
tupleValues.reserve(tuple.Size());
for (const auto& value : tuple) {
if (!EnsureStructType(input->Pos(), *value.Ref().GetTypeAnn(), ctx)) {
return TStatus::Error;
}

TExprNode::TPtr node = value.Ptr();
if (targetType && TryConvertTo(node, *targetType, ctx) == TStatus::Error) {
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Failed to convert input columns types to scheme types"));
return TStatus::Error;
}

tupleValues.emplace_back(std::move(node));
}

const auto list = Build<TCoAsList>(ctx, input->Pos())
.Add(std::move(tupleValues))
.Done();

input->ChildRef(TS3WriteObject::idx_Input) = list.Ptr();
return TStatus::Repeat;
}

if (!EnsureListType(*source, ctx)) {
return TStatus::Error;
}
Expand All @@ -80,23 +120,13 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

auto target = input->Child(TS3WriteObject::idx_Target);
if (!TS3Target::Match(target)) {
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
return TStatus::Error;
}

TS3Target tgt(target);
if (auto settings = tgt.Settings()) {
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
if (!IsSameAnnotation(*targetType, *sourceType)) {
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
TStringBuilder() << "Type mismatch between schema type: " << *targetType
<< " and actual data type: " << *sourceType << ", diff is: "
<< GetTypeDiff(*targetType, *sourceType)));
return TStatus::Error;
}
if (targetType) {
const auto status = TryConvertTo(input->ChildRef(TS3WriteObject::idx_Input), *ctx.MakeType<TListExprType>(targetType), ctx);
if (status == TStatus::Error) {
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Row type mismatch for S3 external table"));
return TStatus::Error;
} else if (status != TStatus::Ok) {
return status;
}
}

Expand Down

0 comments on commit b80bdfd

Please sign in to comment.