diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 37e5eaa0fa1b..287e2ea64309 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -307,15 +307,20 @@ struct TObjectStorageExternalSource : public IExternalSource { structuredTokenBuilder.SetNoAuth(); } + auto effectiveFilePattern = meta->TableLocation; + if (meta->TableLocation.EndsWith('/')) { + effectiveFilePattern += '*'; + } + const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson()); auto httpGateway = NYql::IHTTPGateway::Make(); auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{ .Url = meta->DataSourceLocation, .Credentials = credentials, - .Pattern = meta->TableLocation, + .Pattern = effectiveFilePattern, }, Nothing(), false); - auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture& listResFut) { + auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture& listResFut) { auto& listRes = listResFut.GetValue(); if (std::holds_alternative(listRes)) { auto& error = std::get(listRes); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 804bed00e89a..97496c3068e7 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -281,7 +281,6 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc tableMeta->ExternalSource.DataSourceAuth = description.GetAuth(); tableMeta->ExternalSource.Properties = description.GetProperties(); tableMeta->ExternalSource.DataSourcePath = tableName; - tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path); return result; } @@ -831,14 +830,14 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta switch (entry.Kind) { case EKind::KindExternalDataSource: { - if (externalPath) { - entry.Path = SplitPath(*externalPath); - } auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table); if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) { promise.SetValue(externalDataSourceMetadata); return; } + if (externalPath) { + externalDataSourceMetadata.Metadata->ExternalSource.TableLocation = *externalPath; + } LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem) .Subscribe([promise, externalDataSourceMetadata, settings](const TFuture& result) mutable { diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py index 232513cc4d71..923a16f5fa1d 100644 --- a/ydb/tests/fq/s3/test_s3_0.py +++ b/ydb/tests/fq/s3/test_s3_0.py @@ -252,6 +252,64 @@ def test_inference_optional_types(self, kikimr, s3, client, unique_prefix): assert result_set.rows[2].items[2].int64_value == 15 assert result_set.rows[2].items[3].int64_value == 33 + @yq_v2 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_inference_multiple_files(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + read_data_1 = '''a,b,c +1,2,3 +1,2,3''' + read_data_2 = '''a,b,c +1,2,3''' + + s3_client.put_object(Body=read_data_1, Bucket='fbucket', Key='/1.csv', ContentType='text/plain') + s3_client.put_object(Body=read_data_2, Bucket='fbucket', Key='/2.csv', ContentType='text/plain') + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "multiple_files_bucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + SELECT * + FROM `{storage_connection_name}`.`/` + WITH (format=csv_with_names, with_infer='true'); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 3 + assert result_set.columns[0].name == "a" + assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64 + assert result_set.columns[1].name == "b" + assert result_set.columns[1].type.optional_type.item.type_id == ydb.Type.INT64 + assert result_set.columns[2].name == "c" + assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.INT64 + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].int64_value == 1 + assert result_set.rows[0].items[1].int64_value == 2 + assert result_set.rows[0].items[2].int64_value == 3 + assert result_set.rows[1].items[0].int64_value == 1 + assert result_set.rows[1].items[1].int64_value == 2 + assert result_set.rows[1].items[2].int64_value == 3 + assert result_set.rows[2].items[0].int64_value == 1 + assert result_set.rows[2].items[1].int64_value == 2 + assert result_set.rows[2].items[2].int64_value == 3 + assert sum(kikimr.control_plane.get_metering(1)) == 10 + @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):