Skip to content

Commit

Permalink
ActorSystem (AS) has been moved to the constructor of the S3 lister
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg committed Aug 21, 2024
1 parent dd86240 commit 03d9d0b
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 27 deletions.
3 changes: 1 addition & 2 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ struct TObjectStorageExternalSource : public IExternalSource {
};

virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
Y_UNUSED(ActorSystem);
auto format = meta->Attributes.FindPtr("format");
if (!format || !meta->Attributes.contains("withinfer")) {
return NThreading::MakeFuture(std::move(meta));
Expand Down Expand Up @@ -322,7 +321,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
.Url = meta->DataSourceLocation,
.Credentials = credentials,
.Pattern = effectiveFilePattern,
}, Nothing(), AllowLocalFiles);
}, Nothing(), AllowLocalFiles, ActorSystem);
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
auto& listRes = listResFut.GetValue();
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
}

{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ class TKqpHost : public IKqpHost {
state->Gateway = FederatedQuerySetup->HttpGateway;
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
state->ExecutorPoolId = AppData()->UserPoolId;
state->ActorSystem = ActorSystem;

auto dataSource = NYql::CreateS3DataSource(state);
auto dataSink = NYql::CreateS3DataSink(state);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
PatternType,
object.GetPath()},
Nothing(),
AllowLocalFiles);
AllowLocalFiles,
NActors::TActivationContext::ActorSystem());
Fetch();
return true;
}
Expand Down
35 changes: 19 additions & 16 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class TS3Lister : public IS3Lister {
const TMaybe<TString> ContinuationToken;
const ui64 MaxKeys;
const std::pair<TString, TString> CurrentLogContextPath;
const NActors::TActorSystem* ActorSystem;
};

TS3Lister(
Expand All @@ -247,7 +248,8 @@ class TS3Lister : public IS3Lister {
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
size_t maxFilesPerQuery,
TSharedListingContextPtr sharedCtx)
TSharedListingContextPtr sharedCtx,
NActors::TActorSystem* actorSystem)
: MaxFilesPerQuery(maxFilesPerQuery) {
Y_ENSURE(
listingRequest.Url.substr(0, 7) != "file://",
Expand All @@ -272,7 +274,8 @@ class TS3Lister : public IS3Lister {
delimiter,
Nothing(),
MaxFilesPerQuery,
NLog::CurrentLogContextPath()};
NLog::CurrentLogContextPath(),
actorSystem};

YQL_CLOG(TRACE, ProviderS3)
<< "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
Expand Down Expand Up @@ -338,14 +341,10 @@ class TS3Lister : public IS3Lister {
retryPolicy);
}

static NActors::TActorSystem* GetActorSystem() {
return NActors::TlsActivationContext ? NActors::TActivationContext::ActorSystem() : nullptr;
}

static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
return [c = std::move(listingContext), actorSystem = GetActorSystem()](IHTTPGateway::TResult&& result) {
if (actorSystem) {
NDq::TYqlLogScope logScope(actorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
if (c.ActorSystem) {
NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
OnDiscovery(c, std::move(result));
} else {
/*
Expand Down Expand Up @@ -468,9 +467,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;

explicit TS3ParallelLimitedListerFactory(
size_t maxParallelOps, TSharedListingContextPtr sharedCtx)
size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem)
: SharedCtx(std::move(sharedCtx))
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps)))
, ActorSystem(actorSystem) { }

TFuture<NS3Lister::IS3Lister::TPtr> Make(
const IHTTPGateway::TPtr& httpGateway,
Expand All @@ -480,10 +480,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
bool allowLocalFiles) override {
auto acquired = Semaphore->AcquireAsync();
return acquired.Apply(
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) {
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
NS3Lister::MakeS3Lister(
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx),
std::make_unique<TAsyncSemaphore::TAutoRelease>(
f.GetValue()->MakeAutoRelease())});
});
Expand Down Expand Up @@ -519,6 +519,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
private:
TSharedListingContextPtr SharedCtx;
const TAsyncSemaphore::TPtr Semaphore;
NActors::TActorSystem* ActorSystem;
};

} // namespace
Expand All @@ -529,10 +530,11 @@ IS3Lister::TPtr MakeS3Lister(
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
NActors::TActorSystem* actorSystem,
TSharedListingContextPtr sharedCtx) {
if (listingRequest.Url.substr(0, 7) != "file://") {
return std::make_shared<TS3Lister>(
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem);
}

if (!allowLocalFiles) {
Expand All @@ -546,13 +548,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t maxParallelOps,
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize) {
size_t regexpCacheSize,
NActors::TActorSystem* actorSystem) {
std::shared_ptr<TSharedListingContext> sharedCtx = nullptr;
if (callbackThreadCount != 0 || regexpCacheSize != 0) {
sharedCtx = std::make_shared<TSharedListingContext>(
callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
}
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx);
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx, actorSystem);
}

} // namespace NYql::NS3Lister
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <library/cpp/cache/cache.h>
#include <library/cpp/threading/future/future.h>
#include <util/thread/pool.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>

Expand Down Expand Up @@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister(
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
bool allowLocalFiles,
NActors::TActorSystem* actorSystem,
TSharedListingContextPtr sharedCtx = nullptr);

class IS3ListerFactory {
Expand All @@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t maxParallelOps,
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize);
size_t regexpCacheSize,
NActors::TActorSystem* actorSystem);

} // namespace NS3Lister
} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
State_->Configuration->MaxInflightListsPerQuery,
State_->Configuration->ListingCallbackThreadCount,
State_->Configuration->ListingCallbackPerThreadQueueSize,
State_->Configuration->RegexpCacheSize))
State_->Configuration->RegexpCacheSize,
State_->ActorSystem))
, ListingStrategy_(MakeS3ListingStrategy(
State_->Gateway,
State_->GatewayRetryPolicy,
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace NYql {

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) {
return [gateway, credentialsFactory, allowLocalFiles] (
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) {
return [gateway, credentialsFactory, allowLocalFiles, actorSystem] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
Expand All @@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
state->Types = typeCtx.Get();
state->FunctionRegistry = functionRegistry;
state->CredentialsFactory = credentialsFactory;
state->ActorSystem = actorSystem;
if (gatewaysConfig) {
state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/provider/yql_s3_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ struct TS3State : public TThrRefBase
IHTTPGateway::TRetryPolicy::TPtr GatewayRetryPolicy = GetHTTPDefaultRetryPolicy();
ui32 ExecutorPoolId = 0;
std::list<TVector<TString>> PrimaryKeys;
NActors::TActorSystem* ActorSystem = nullptr;
};

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false);
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr);

TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state);
TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ int RunMain(int argc, const char* argv[])
if (!httpGateway) {
httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
}
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true));
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, actorSystemManager->GetActorSystem()));
}

if (gatewaysConfig.HasPq()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/s3/test_s3_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix):
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
assert sum(kikimr.control_plane.get_metering(1)) == 21

# it looks like the runtime_listing for v1 doesn't work in case of
# it looks like the runtime_listing for v1 doesn't work in case of
# restart of query because the v1 keeps the compiled query in the cache
@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
Expand Down

0 comments on commit 03d9d0b

Please sign in to comment.