Skip to content

Commit

Permalink
listing strategy has been fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg committed Aug 21, 2024
1 parent b331d70 commit 7e161fe
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 17 deletions.
37 changes: 27 additions & 10 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/url_builder.h>
#include <ydb/library/yql/utils/yql_panic.h>
Expand All @@ -25,7 +26,7 @@
namespace NYql::NS3Lister {

IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) {
return stream << "TListingRequest{.url=" << request.Url
return stream << "[TS3Lister] TListingRequest{.url=" << request.Url
<< ",.Prefix=" << request.Prefix
<< ",.Pattern=" << request.Pattern
<< ",.PatternType=" << request.PatternType
Expand All @@ -50,7 +51,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,

const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(DEBUG, ProviderS3)
<< "Got regex: '" << regex << "' with " << numGroups << " capture groups ";
<< "[TS3Lister] Got regex: '" << regex << "' with " << numGroups << " capture groups ";

auto groups = std::make_shared<std::vector<std::string>>(numGroups);
auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
Expand Down Expand Up @@ -100,7 +101,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& patt
}

const auto regex = NS3::RegexFromWildcards(pattern);
YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '"
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Got prefix: '" << regexPatternPrefix << "', regex: '"
<< regex << "' from original pattern '" << pattern << "'";

return MakeFilterRegexp(regex, sharedCtx);
Expand Down Expand Up @@ -237,6 +238,7 @@ class TS3Lister : public IS3Lister {
const TMaybe<TString> Delimiter;
const TMaybe<TString> ContinuationToken;
const ui64 MaxKeys;
const std::pair<TString, TString> CurrentLogContextPath;
};

TS3Lister(
Expand Down Expand Up @@ -269,7 +271,8 @@ class TS3Lister : public IS3Lister {
std::move(request),
delimiter,
Nothing(),
MaxFilesPerQuery};
MaxFilesPerQuery,
NLog::CurrentLogContextPath()};

YQL_CLOG(TRACE, ProviderS3)
<< "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
Expand Down Expand Up @@ -334,9 +337,23 @@ class TS3Lister : public IS3Lister {
/*data=*/"",
retryPolicy);
}

// Returns the actor system of the current activation context, or nullptr
// when NActors::TlsActivationContext is not set for the calling thread.
static NActors::TActorSystem* GetActorSystem() {
    if (!NActors::TlsActivationContext) {
        // No activation context is available here, so there is no actor system to report.
        return nullptr;
    }
    return NActors::TActivationContext::ActorSystem();
}

static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
OnDiscovery(c, std::move(result));
return [c = std::move(listingContext), actorSystem = GetActorSystem()](IHTTPGateway::TResult&& result) {
if (actorSystem) {
NDq::TYqlLogScope logScope(actorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
OnDiscovery(c, std::move(result));
} else {
/*
If the subsystem doesn't use the actor system,
it has to set up its own YqlLoggerScope at the top level
*/
OnDiscovery(c, std::move(result));
}
};
}

Expand All @@ -350,7 +367,7 @@ class TS3Lister : public IS3Lister {
const NXml::TDocument xml(xmlString, NXml::TDocument::String);
auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId);
YQL_CLOG(DEBUG, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size()
<< " entries, got another " << parsedResponse.KeyCount
<< " entries, request id: [" << ctx.RequestId << "]";
Expand Down Expand Up @@ -379,7 +396,7 @@ class TS3Lister : public IS3Lister {
}

if (parsedResponse.IsTruncated && !earlyStop) {
YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix
<< ": got truncated flag, will continue";

Expand Down Expand Up @@ -408,14 +425,14 @@ class TS3Lister : public IS3Lister {
TStringBuilder{} << "request id: [" << ctx.RequestId << "]",
std::move(result.Issues));
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< ": got error from http gateway: " << issues.ToString(true);
ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)});
ctx.NextRequestPromise.SetValue(Nothing());
}
} catch (const std::exception& ex) {
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< " : got exception: " << ex.what();
ctx.Promise.SetException(std::current_exception());
ctx.NextRequestPromise.SetValue(Nothing());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,14 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
});
return NextDirectoryListeningChunk;
}

// Derives the base path that is reported as a directory for `path`.
//
// The candidate is TStringBuf::RBefore('/') applied to `path`.
// NOTE(review): RBefore presumably yields the text preceding the last '/'
// and the whole input when no '/' is present - confirm against TStringBuf.
//
// Returns an empty string when the candidate is identical to `path` and does
// not end with '/'; otherwise returns the candidate unchanged.
static TString ParseBasePath(const TString& path) {
    TString candidate{TStringBuf{path}.RBefore('/')};
    const bool sameAsInput = (candidate == path);
    if (sameAsInput && !candidate.EndsWith('/')) {
        // Nothing was cut off and there is no trailing slash, so the base path is empty.
        return TString{};
    }
    return candidate;
}

void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
result.Directories.push_back({.Path = sourcePrefix});
result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)});
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
}
Expand Down
22 changes: 17 additions & 5 deletions ydb/tests/fq/s3/test_s3_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix):
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
assert sum(kikimr.control_plane.get_metering(1)) == 21

# It looks like runtime_listing for v1 doesn't work when a query is
# restarted, because v1 keeps the compiled query in its cache.
@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("runtime_listing", ["false", "true"])
Expand All @@ -502,11 +504,11 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre
Banana,3,100
Apple,2,22
Pear,15,33'''
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='/2024-08-09.csv', ContentType='text/plain')
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='/2024-08-08.csv', ContentType='text/plain')
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-08-09.csv', ContentType='text/plain')
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='2024-08-08.csv', ContentType='text/plain')

kikimr.control_plane.wait_bootstrap(1)
storage_connection_name = unique_prefix + "fruitbucket"
storage_connection_name = unique_prefix + "test_top_level_listing"
client.create_storage_connection(storage_connection_name, "fbucket")

sql = f'''
Expand All @@ -518,7 +520,8 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre
Fruit String NOT NULL,
Price Int NOT NULL,
Weight Int NOT NULL
));
)
);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
Expand All @@ -534,7 +537,7 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre
assert result_set.columns[1].type.type_id == ydb.Type.INT32
assert result_set.columns[2].name == "Weight"
assert result_set.columns[2].type.type_id == ydb.Type.INT32
assert len(result_set.rows) == 3
assert len(result_set.rows) == 6
assert result_set.rows[0].items[0].bytes_value == b"Banana"
assert result_set.rows[0].items[1].int32_value == 3
assert result_set.rows[0].items[2].int32_value == 100
Expand All @@ -544,4 +547,13 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre
assert result_set.rows[2].items[0].bytes_value == b"Pear"
assert result_set.rows[2].items[1].int32_value == 15
assert result_set.rows[2].items[2].int32_value == 33
assert result_set.rows[3].items[0].bytes_value == b"Banana"
assert result_set.rows[3].items[1].int32_value == 3
assert result_set.rows[3].items[2].int32_value == 100
assert result_set.rows[4].items[0].bytes_value == b"Apple"
assert result_set.rows[4].items[1].int32_value == 2
assert result_set.rows[4].items[2].int32_value == 22
assert result_set.rows[5].items[0].bytes_value == b"Pear"
assert result_set.rows[5].items[1].int32_value == 15
assert result_set.rows[5].items[2].int32_value == 33
assert sum(kikimr.control_plane.get_metering(1)) == 10
2 changes: 1 addition & 1 deletion ydb/tests/tools/fq_runner/kikimr_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def fill_config(self, control_plane):
self.config_generator.yaml_config['grpc_config']['skip_scheme_check'] = True
self.config_generator.yaml_config['grpc_config']['services'] = ["local_discovery", "yq", "yq_private"]
# yq services
fq_config['control_plane_storage']['task_lease_ttl'] = "10s"
fq_config['control_plane_storage']['task_lease_ttl'] = "20s"
self.fill_storage_config(fq_config['control_plane_storage']['storage'], "DbPoolStorage_" + self.uuid)
else:
self.config_generator.yaml_config.pop('grpc_config', None)
Expand Down

0 comments on commit 7e161fe

Please sign in to comment.