From 7e161fec2ae19a993f005a6894d5f7dcf597527b Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Sun, 18 Aug 2024 18:36:52 +0000 Subject: [PATCH] listing strategy has been fixed --- .../s3/object_listers/yql_s3_list.cpp | 37 ++++++++++++++----- .../s3/provider/yql_s3_listing_strategy.cpp | 8 +++- ydb/tests/fq/s3/test_s3_1.py | 22 ++++++++--- ydb/tests/tools/fq_runner/kikimr_runner.py | 2 +- 4 files changed, 52 insertions(+), 17 deletions(-) diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index be5fd6134c2e..c76fe0577505 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -25,7 +26,7 @@ namespace NYql::NS3Lister { IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) { - return stream << "TListingRequest{.url=" << request.Url + return stream << "[TS3Lister] TListingRequest{.url=" << request.Url << ",.Prefix=" << request.Prefix << ",.Pattern=" << request.Pattern << ",.PatternType=" << request.PatternType @@ -50,7 +51,7 @@ std::pair MakeFilterRegexp(const TString& regex, const size_t numGroups = re->NumberOfCapturingGroups(); YQL_CLOG(DEBUG, ProviderS3) - << "Got regex: '" << regex << "' with " << numGroups << " capture groups "; + << "[TS3Lister] Got regex: '" << regex << "' with " << numGroups << " capture groups "; auto groups = std::make_shared>(numGroups); auto reArgs = std::make_shared>(numGroups); @@ -100,7 +101,7 @@ std::pair MakeFilterWildcard(const TString& patt } const auto regex = NS3::RegexFromWildcards(pattern); - YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '" + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Got prefix: '" << regexPatternPrefix << "', regex: '" << regex << "' from original pattern '" << pattern << "'"; return MakeFilterRegexp(regex, 
sharedCtx); @@ -237,6 +238,7 @@ class TS3Lister : public IS3Lister { const TMaybe Delimiter; const TMaybe ContinuationToken; const ui64 MaxKeys; + const std::pair CurrentLogContextPath; }; TS3Lister( @@ -269,7 +271,8 @@ class TS3Lister : public IS3Lister { std::move(request), delimiter, Nothing(), - MaxFilesPerQuery}; + MaxFilesPerQuery, + NLog::CurrentLogContextPath()}; YQL_CLOG(TRACE, ProviderS3) << "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url @@ -334,9 +337,23 @@ class TS3Lister : public IS3Lister { /*data=*/"", retryPolicy); } + + static NActors::TActorSystem* GetActorSystem() { + return NActors::TlsActivationContext ? NActors::TActivationContext::ActorSystem() : nullptr; + } + static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) { - return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) { - OnDiscovery(c, std::move(result)); + return [c = std::move(listingContext), actorSystem = GetActorSystem()](IHTTPGateway::TResult&& result) { + if (actorSystem) { + NDq::TYqlLogScope logScope(actorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second); + OnDiscovery(c, std::move(result)); + } else { + /* + If the subsystem doesn't use the actor system, + then it needs to set up its own YqlLoggerScope at the top level + */ + OnDiscovery(c, std::move(result)); + } }; } @@ -350,7 +367,7 @@ class TS3Lister : public IS3Lister { const NXml::TDocument xml(xmlString, NXml::TDocument::String); auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId); YQL_CLOG(DEBUG, ProviderS3) - << "Listing of " << ctx.ListingRequest.Url + << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size() << " entries, got another " << parsedResponse.KeyCount << " entries, request id: [" << ctx.RequestId << "]"; @@ -379,7 +396,7 @@ class TS3Lister : public IS3Lister { } if (parsedResponse.IsTruncated && !earlyStop) { - YQL_CLOG(DEBUG, 
ProviderS3) << "Listing of " << ctx.ListingRequest.Url + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": got truncated flag, will continue"; @@ -408,14 +425,14 @@ class TS3Lister : public IS3Lister { TStringBuilder{} << "request id: [" << ctx.RequestId << "]", std::move(result.Issues)); YQL_CLOG(INFO, ProviderS3) - << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix + << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": got error from http gateway: " << issues.ToString(true); ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)}); ctx.NextRequestPromise.SetValue(Nothing()); } } catch (const std::exception& ex) { YQL_CLOG(INFO, ProviderS3) - << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix + << "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << " : got exception: " << ex.what(); ctx.Promise.SetException(std::current_exception()); ctx.NextRequestPromise.SetValue(Nothing()); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp index 1c582018e157..18c374425eb3 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -492,8 +492,14 @@ class TBFSDirectoryResolverIterator : public IS3Lister { }); return NextDirectoryListeningChunk; } + + static TString ParseBasePath(const TString& path) { + TString basePath = TString{TStringBuf{path}.RBefore('/')}; + return basePath == path && !basePath.EndsWith('/') ? 
TString{} : basePath; + } + void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) { - result.Directories.push_back({.Path = sourcePrefix}); + result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)}); for (auto& directoryPrefix : DirectoryPrefixQueue) { result.Directories.push_back({.Path = directoryPrefix}); } diff --git a/ydb/tests/fq/s3/test_s3_1.py b/ydb/tests/fq/s3/test_s3_1.py index dc503797a69c..20a65d213d9b 100644 --- a/ydb/tests/fq/s3/test_s3_1.py +++ b/ydb/tests/fq/s3/test_s3_1.py @@ -482,6 +482,8 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix): # 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc assert sum(kikimr.control_plane.get_metering(1)) == 21 + # It looks like runtime_listing does not work for v1 when a query is + # restarted, because v1 keeps the compiled query in the cache @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) @pytest.mark.parametrize("runtime_listing", ["false", "true"]) @@ -502,11 +504,11 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre Banana,3,100 Apple,2,22 Pear,15,33''' - s3_client.put_object(Body=fruits, Bucket='fbucket', Key='/2024-08-09.csv', ContentType='text/plain') - s3_client.put_object(Body=fruits, Bucket='fbucket', Key='/2024-08-08.csv', ContentType='text/plain') + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-08-09.csv', ContentType='text/plain') + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-08-08.csv', ContentType='text/plain') kikimr.control_plane.wait_bootstrap(1) - storage_connection_name = unique_prefix + "fruitbucket" + storage_connection_name = unique_prefix + "test_top_level_listing" client.create_storage_connection(storage_connection_name, "fbucket") sql = f''' @@ -518,7 +520,8 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre Fruit String NOT NULL, Price Int NOT NULL, Weight 
Int NOT NULL - )); + ) + ); ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id @@ -534,7 +537,7 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre assert result_set.columns[1].type.type_id == ydb.Type.INT32 assert result_set.columns[2].name == "Weight" assert result_set.columns[2].type.type_id == ydb.Type.INT32 - assert len(result_set.rows) == 3 + assert len(result_set.rows) == 6 assert result_set.rows[0].items[0].bytes_value == b"Banana" assert result_set.rows[0].items[1].int32_value == 3 assert result_set.rows[0].items[2].int32_value == 100 @@ -544,4 +547,13 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre assert result_set.rows[2].items[0].bytes_value == b"Pear" assert result_set.rows[2].items[1].int32_value == 15 assert result_set.rows[2].items[2].int32_value == 33 + assert result_set.rows[3].items[0].bytes_value == b"Banana" + assert result_set.rows[3].items[1].int32_value == 3 + assert result_set.rows[3].items[2].int32_value == 100 + assert result_set.rows[4].items[0].bytes_value == b"Apple" + assert result_set.rows[4].items[1].int32_value == 2 + assert result_set.rows[4].items[2].int32_value == 22 + assert result_set.rows[5].items[0].bytes_value == b"Pear" + assert result_set.rows[5].items[1].int32_value == 15 + assert result_set.rows[5].items[2].int32_value == 33 assert sum(kikimr.control_plane.get_metering(1)) == 10 diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index f1ed9a18d6d9..3161591f2c78 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -484,7 +484,7 @@ def fill_config(self, control_plane): self.config_generator.yaml_config['grpc_config']['skip_scheme_check'] = True self.config_generator.yaml_config['grpc_config']['services'] = ["local_discovery", "yq", "yq_private"] # yq services - 
fq_config['control_plane_storage']['task_lease_ttl'] = "10s" + fq_config['control_plane_storage']['task_lease_ttl'] = "20s" self.fill_storage_config(fq_config['control_plane_storage']['storage'], "DbPoolStorage_" + self.uuid) else: self.config_generator.yaml_config.pop('grpc_config', None)