Skip to content

Commit

Permalink
Merge dd4a6a0 into 22bee29
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 18, 2025
2 parents 22bee29 + dd4a6a0 commit c55f540
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 19 deletions.
76 changes: 76 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3);
}

Y_UNIT_TEST(InsertIntoBucketValuesCast) {
const TString writeDataSourceName = "/Root/write_data_source";
const TString writeTableName = "/Root/write_binding";
const TString writeBucket = "test_bucket_values_cast";
const TString writeObject = "test_object_write/";
{
Aws::S3::S3Client s3Client = MakeS3Client();
CreateBucket(writeBucket, s3Client);
}

auto kikimr = NTestUtils::MakeKikimrRunner();

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
{
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{write_location}",
AUTH_METHOD="NONE"
);
CREATE EXTERNAL TABLE `{write_table}` (
key Uint64 NOT NULL,
value String NOT NULL
) WITH (
DATA_SOURCE="{write_source}",
LOCATION="{write_object}",
FORMAT="tsv_with_names"
);
)",
"write_source"_a = writeDataSourceName,
"write_table"_a = writeTableName,
"write_location"_a = GetBucketLocation(writeBucket),
"write_object"_a = writeObject);

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());
}

auto db = kikimr->GetQueryClient();
{
const TString query = fmt::format(R"(
INSERT INTO `{write_table}`
(key, value)
VALUES
(1, "#######"),
(4294967295u, "#######");
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
(key, value)
VALUES
(1, "#######"),
(4294967295u, "#######");
INSERT INTO `{write_table}` SELECT * FROM AS_TABLE([
<|key: 1, value: "#####"|>,
<|key: 4294967295u, value: "#####"|>
]);
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
SELECT * FROM AS_TABLE([
<|key: 1, value: "#####"|>,
<|key: 4294967295u, value: "#####"|>
]);
)",
"write_source"_a = writeDataSourceName,
"write_table"_a = writeTableName,
"write_object"_a = writeObject);

const auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());
}

UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 4);
}

Y_UNIT_TEST(UpdateExternalTable) {
const TString readDataSourceName = "/Root/read_data_source";
const TString readTableName = "/Root/read_binding";
Expand Down
66 changes: 48 additions & 18 deletions ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,47 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

auto source = input->Child(TS3WriteObject::idx_Input);
const auto targetNode = input->Child(TS3WriteObject::idx_Target);
if (!TS3Target::Match(targetNode)) {
ctx.AddError(TIssue(ctx.GetPosition(targetNode->Pos()), "Expected S3 target."));
return TStatus::Error;
}

const TTypeAnnotationNode* targetType = nullptr;
if (const TS3Target target(targetNode); const auto settings = target.Settings()) {
if (const auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
}
}

const auto source = input->ChildPtr(TS3WriteObject::idx_Input);
if (const auto maybeTuple = TMaybeNode<TExprList>(source)) {
const auto tuple = maybeTuple.Cast();

TVector<TExprBase> convertedValues;
convertedValues.reserve(tuple.Size());
for (const auto& value : tuple) {
if (!EnsureStructType(input->Pos(), *value.Ref().GetTypeAnn(), ctx)) {
return TStatus::Error;
}

TExprNode::TPtr node = value.Ptr();
if (targetType && TryConvertTo(node, *targetType, ctx) == TStatus::Error) {
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Failed to convert input columns types to scheme types"));
return TStatus::Error;
}

convertedValues.emplace_back(std::move(node));
}

const auto list = Build<TCoAsList>(ctx, input->Pos())
.Add(std::move(convertedValues))
.Done();

input->ChildRef(TS3WriteObject::idx_Input) = list.Ptr();
return TStatus::Repeat;
}

if (!EnsureListType(*source, ctx)) {
return TStatus::Error;
}
Expand All @@ -80,23 +120,13 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

auto target = input->Child(TS3WriteObject::idx_Target);
if (!TS3Target::Match(target)) {
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
return TStatus::Error;
}

TS3Target tgt(target);
if (auto settings = tgt.Settings()) {
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
if (!IsSameAnnotation(*targetType, *sourceType)) {
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
TStringBuilder() << "Type mismatch between schema type: " << *targetType
<< " and actual data type: " << *sourceType << ", diff is: "
<< GetTypeDiff(*targetType, *sourceType)));
return TStatus::Error;
}
if (targetType) {
const auto status = TryConvertTo(input->ChildRef(TS3WriteObject::idx_Input), *ctx.MakeType<TListExprType>(targetType), ctx);
if (status == TStatus::Error) {
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Row type mismatch for S3 external table"));
return TStatus::Error;
} else if (status != TStatus::Ok) {
return status;
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/s3/test_bindings_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):

describe_result = client.describe_query(query_id).result
describe_string = "{}".format(describe_result)
assert "Type mismatch between schema type" in describe_string, describe_string
assert "Row type mismatch for S3 external table" in describe_string, describe_string

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down

0 comments on commit c55f540

Please sign in to comment.