Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

listing fix #7643

Merged
merged 4 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ struct TObjectStorageExternalSource : public IExternalSource {
};

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
Y_UNUSED(ActorSystem);
auto format = meta->Attributes.FindPtr("format");
if (!format || !meta->Attributes.contains("withinfer")) {
return NThreading::MakeFuture(std::move(meta));
Expand Down Expand Up @@ -322,7 +321,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
.Url = meta->DataSourceLocation,
.Credentials = credentials,
.Pattern = effectiveFilePattern,
}, Nothing(), AllowLocalFiles);
}, Nothing(), AllowLocalFiles, ActorSystem);
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto& listRes = listResFut.GetValue();
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
}

{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ class TKqpHost : public IKqpHost {
state->Gateway = FederatedQuerySetup->HttpGateway;
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
state->ExecutorPoolId = AppData()->UserPoolId;
state->ActorSystem = ActorSystem;

auto dataSource = NYql::CreateS3DataSource(state);
auto dataSink = NYql::CreateS3DataSink(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
PatternType,
object.GetPath()},
Nothing(),
AllowLocalFiles);
AllowLocalFiles,
NActors::TActivationContext::ActorSystem());
Fetch();
return true;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/object_listers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ PEERDIR(
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/s3/credentials
ydb/library/yql/utils
ydb/library/yql/utils/actor_log
ydb/library/yql/utils/threading
)

Expand Down
54 changes: 37 additions & 17 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/url_builder.h>
#include <ydb/library/yql/utils/yql_panic.h>
Expand All @@ -25,7 +26,7 @@
namespace NYql::NS3Lister {

IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) {
return stream << "TListingRequest{.url=" << request.Url
return stream << "[TS3Lister] TListingRequest{.url=" << request.Url
<< ",.Prefix=" << request.Prefix
<< ",.Pattern=" << request.Pattern
<< ",.PatternType=" << request.PatternType
Expand All @@ -50,7 +51,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex,

const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(DEBUG, ProviderS3)
<< "Got regex: '" << regex << "' with " << numGroups << " capture groups ";
<< "[TS3Lister] Got regex: '" << regex << "' with " << numGroups << " capture groups ";

auto groups = std::make_shared<std::vector<std::string>>(numGroups);
auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
Expand Down Expand Up @@ -100,7 +101,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& patt
}

const auto regex = NS3::RegexFromWildcards(pattern);
YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '"
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Got prefix: '" << regexPatternPrefix << "', regex: '"
<< regex << "' from original pattern '" << pattern << "'";

return MakeFilterRegexp(regex, sharedCtx);
Expand Down Expand Up @@ -237,6 +238,8 @@ class TS3Lister : public IS3Lister {
const TMaybe<TString> Delimiter;
const TMaybe<TString> ContinuationToken;
const ui64 MaxKeys;
const std::pair<TString, TString> CurrentLogContextPath;
const NActors::TActorSystem* ActorSystem;
};

TS3Lister(
Expand All @@ -245,7 +248,8 @@ class TS3Lister : public IS3Lister {
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
size_t maxFilesPerQuery,
TSharedListingContextPtr sharedCtx)
TSharedListingContextPtr sharedCtx,
NActors::TActorSystem* actorSystem)
: MaxFilesPerQuery(maxFilesPerQuery) {
Y_ENSURE(
listingRequest.Url.substr(0, 7) != "file://",
Expand All @@ -269,7 +273,9 @@ class TS3Lister : public IS3Lister {
std::move(request),
delimiter,
Nothing(),
MaxFilesPerQuery};
MaxFilesPerQuery,
NLog::CurrentLogContextPath(),
actorSystem};

YQL_CLOG(TRACE, ProviderS3)
<< "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
Expand Down Expand Up @@ -334,9 +340,19 @@ class TS3Lister : public IS3Lister {
/*data=*/"",
retryPolicy);
}

static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
OnDiscovery(c, std::move(result));
if (c.ActorSystem) {
NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
OnDiscovery(c, std::move(result));
} else {
/*
If the subsystem doesn't use the actor system
then there is a need to use an own YqlLoggerScope on the top level
*/
OnDiscovery(c, std::move(result));
}
};
}

Expand All @@ -350,7 +366,7 @@ class TS3Lister : public IS3Lister {
const NXml::TDocument xml(xmlString, NXml::TDocument::String);
auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId);
YQL_CLOG(DEBUG, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size()
<< " entries, got another " << parsedResponse.KeyCount
<< " entries, request id: [" << ctx.RequestId << "]";
Expand Down Expand Up @@ -379,7 +395,7 @@ class TS3Lister : public IS3Lister {
}

if (parsedResponse.IsTruncated && !earlyStop) {
YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister] Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix
<< ": got truncated flag, will continue";

Expand Down Expand Up @@ -408,14 +424,14 @@ class TS3Lister : public IS3Lister {
TStringBuilder{} << "request id: [" << ctx.RequestId << "]",
std::move(result.Issues));
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< ": got error from http gateway: " << issues.ToString(true);
ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)});
ctx.NextRequestPromise.SetValue(Nothing());
}
} catch (const std::exception& ex) {
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< "[TS3Lister] Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< " : got exception: " << ex.what();
ctx.Promise.SetException(std::current_exception());
ctx.NextRequestPromise.SetValue(Nothing());
Expand Down Expand Up @@ -451,9 +467,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;

explicit TS3ParallelLimitedListerFactory(
size_t maxParallelOps, TSharedListingContextPtr sharedCtx)
size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem)
: SharedCtx(std::move(sharedCtx))
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps)))
, ActorSystem(actorSystem) { }

TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
Expand All @@ -463,10 +480,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
bool allowLocalFiles) override {
auto acquired = Semaphore->AcquireAsync();
return acquired.Apply(
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) {
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
NS3Lister::MakeS3Lister(
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx),
std::make_unique<TAsyncSemaphore::TAutoRelease>(
f.GetValue()->MakeAutoRelease())});
});
Expand Down Expand Up @@ -502,6 +519,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
private:
TSharedListingContextPtr SharedCtx;
const TAsyncSemaphore::TPtr Semaphore;
NActors::TActorSystem* ActorSystem;
};

} // namespace
Expand All @@ -512,10 +530,11 @@ IS3Lister::TPtr MakeS3Lister(
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
NActors::TActorSystem* actorSystem,
TSharedListingContextPtr sharedCtx) {
if (listingRequest.Url.substr(0, 7) != "file://") {
return std::make_shared<TS3Lister>(
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem);
}

if (!allowLocalFiles) {
Expand All @@ -529,13 +548,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t maxParallelOps,
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize) {
size_t regexpCacheSize,
NActors::TActorSystem* actorSystem) {
std::shared_ptr<TSharedListingContext> sharedCtx = nullptr;
if (callbackThreadCount != 0 || regexpCacheSize != 0) {
sharedCtx = std::make_shared<TSharedListingContext>(
callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
}
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx);
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx, actorSystem);
}

} // namespace NYql::NS3Lister
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <library/cpp/cache/cache.h>
#include <library/cpp/threading/future/future.h>
#include <util/thread/pool.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>

Expand Down Expand Up @@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister(
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
NActors::TActorSystem* actorSystem,
TSharedListingContextPtr sharedCtx = nullptr);

class IS3ListerFactory {
Expand All @@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t maxParallelOps,
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize);
size_t regexpCacheSize,
NActors::TActorSystem* actorSystem);

} // namespace NS3Lister
} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
State_->Configuration->MaxInflightListsPerQuery,
State_->Configuration->ListingCallbackThreadCount,
State_->Configuration->ListingCallbackPerThreadQueueSize,
State_->Configuration->RegexpCacheSize))
State_->Configuration->RegexpCacheSize,
State_->ActorSystem))
, ListingStrategy_(MakeS3ListingStrategy(
State_->Gateway,
State_->GatewayRetryPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,14 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
});
return NextDirectoryListeningChunk;
}

static TString ParseBasePath(const TString& path) {
TString basePath = TString{TStringBuf{path}.RBefore('/')};
return basePath == path && !basePath.EndsWith('/') ? TString{} : basePath;
}

void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
result.Directories.push_back({.Path = sourcePrefix});
result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)});
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace NYql {

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) {
return [gateway, credentialsFactory, allowLocalFiles] (
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) {
return [gateway, credentialsFactory, allowLocalFiles, actorSystem] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
Expand All @@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
state->CredentialsFactory = credentialsFactory;
state->ActorSystem = actorSystem;
if (gatewaysConfig) {
state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/provider/yql_s3_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ struct TS3State : public TThrRefBase
IHTTPGateway::TRetryPolicy::TPtr GatewayRetryPolicy = GetHTTPDefaultRetryPolicy();
ui32 ExecutorPoolId = 0;
std::list<TVector<TString>> PrimaryKeys;
NActors::TActorSystem* ActorSystem = nullptr;
};

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false);
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr);

TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state);
TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ int RunMain(int argc, const char* argv[])
if (!httpGateway) {
httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
}
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true));
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, actorSystemManager->GetActorSystem()));
}

if (gatewaysConfig.HasPq()) {
Expand Down
Loading
Loading