From 03d9d0b662af71453197d35db2cdd480c221d9e8 Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Wed, 21 Aug 2024 10:41:35 +0000 Subject: [PATCH] AS has been moved to consturctor of s3 lister --- ydb/core/external_sources/object_storage.cpp | 3 +- ydb/core/fq/libs/actors/run_actor.cpp | 2 +- ydb/core/kqp/host/kqp_host.cpp | 1 + .../s3/actors/yql_s3_source_queue.cpp | 3 +- .../s3/object_listers/yql_s3_list.cpp | 35 ++++++++++--------- .../providers/s3/object_listers/yql_s3_list.h | 5 ++- .../s3/provider/yql_s3_io_discovery.cpp | 3 +- .../providers/s3/provider/yql_s3_provider.cpp | 5 +-- .../providers/s3/provider/yql_s3_provider.h | 3 +- ydb/library/yql/tools/dqrun/dqrun.cpp | 2 +- ydb/tests/fq/s3/test_s3_1.py | 2 +- 11 files changed, 37 insertions(+), 27 deletions(-) diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index fc42c5479f58..a25bbe3d6f2e 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -279,7 +279,6 @@ struct TObjectStorageExternalSource : public IExternalSource { }; virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { - Y_UNUSED(ActorSystem); auto format = meta->Attributes.FindPtr("format"); if (!format || !meta->Attributes.contains("withinfer")) { return NThreading::MakeFuture(std::move(meta)); @@ -322,7 +321,7 @@ struct TObjectStorageExternalSource : public IExternalSource { .Url = meta->DataSourceLocation, .Credentials = credentials, .Pattern = effectiveFilePattern, - }, Nothing(), AllowLocalFiles); + }, Nothing(), AllowLocalFiles, ActorSystem); auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture& listResFut) { auto& listRes = listResFut.GetValue(); if (std::holds_alternative(listRes)) { diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 82b984a27ead..f66a7eee4ee1 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1971,7 +1971,7 @@ class TRunActor : public NActors::TActorBootstrapped { { dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, - Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig + Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig } { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index d10ad0f6bd1b..da4df55f7d54 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1700,6 +1700,7 @@ class TKqpHost : public IKqpHost { state->Gateway = FederatedQuerySetup->HttpGateway; state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); state->ExecutorPoolId = AppData()->UserPoolId; + state->ActorSystem = ActorSystem; auto dataSource = NYql::CreateS3DataSource(state); auto dataSink = NYql::CreateS3DataSink(state); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp index e308a5c65adc..918953ad5b8d 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp @@ -502,7 +502,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped PatternType, object.GetPath()}, Nothing(), - AllowLocalFiles); + AllowLocalFiles, + NActors::TActivationContext::ActorSystem()); Fetch(); return true; } diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index c76fe0577505..2f3bf284c498 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -239,6 +239,7 @@ class TS3Lister : public IS3Lister { const TMaybe ContinuationToken; const ui64 MaxKeys; const std::pair CurrentLogContextPath; + const NActors::TActorSystem* ActorSystem; }; TS3Lister( @@ -247,7 +248,8 @@ class TS3Lister : public IS3Lister { const TListingRequest& listingRequest, const TMaybe& delimiter, size_t maxFilesPerQuery, - TSharedListingContextPtr sharedCtx) + TSharedListingContextPtr sharedCtx, + NActors::TActorSystem* actorSystem) : MaxFilesPerQuery(maxFilesPerQuery) { Y_ENSURE( listingRequest.Url.substr(0, 7) != "file://", @@ -272,7 +274,8 @@ class TS3Lister : public IS3Lister { delimiter, Nothing(), MaxFilesPerQuery, - NLog::CurrentLogContextPath()}; + NLog::CurrentLogContextPath(), + actorSystem}; YQL_CLOG(TRACE, ProviderS3) << "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url @@ -338,14 +341,10 @@ class TS3Lister : public IS3Lister { retryPolicy); } - static NActors::TActorSystem* GetActorSystem() { - return NActors::TlsActivationContext ? NActors::TActivationContext::ActorSystem() : nullptr; - } - static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) { - return [c = std::move(listingContext), actorSystem = GetActorSystem()](IHTTPGateway::TResult&& result) { - if (actorSystem) { - NDq::TYqlLogScope logScope(actorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second); + return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) { + if (c.ActorSystem) { + NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second); OnDiscovery(c, std::move(result)); } else { /* @@ -468,9 +467,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { using TPtr = std::shared_ptr; explicit TS3ParallelLimitedListerFactory( - size_t maxParallelOps, TSharedListingContextPtr sharedCtx) + size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem) : SharedCtx(std::move(sharedCtx)) - , Semaphore(TAsyncSemaphore::Make(std::max(1, maxParallelOps))) { } + , Semaphore(TAsyncSemaphore::Make(std::max(1, maxParallelOps))) + , ActorSystem(actorSystem) { } TFuture Make( const IHTTPGateway::TPtr& httpGateway, @@ -480,10 +480,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { bool allowLocalFiles) override { auto acquired = Semaphore->AcquireAsync(); return acquired.Apply( - [ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) { + [ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) { return std::shared_ptr(new TListerLockReleaseWrapper{ NS3Lister::MakeS3Lister( - httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx), + httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx), std::make_unique( f.GetValue()->MakeAutoRelease())}); }); @@ -519,6 +519,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory { private: TSharedListingContextPtr SharedCtx; const TAsyncSemaphore::TPtr Semaphore; + NActors::TActorSystem* ActorSystem; }; } // namespace @@ -529,10 +530,11 @@ IS3Lister::TPtr MakeS3Lister( const TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles, + NActors::TActorSystem* actorSystem, TSharedListingContextPtr sharedCtx) { if (listingRequest.Url.substr(0, 7) != "file://") { return std::make_shared( - httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx)); + httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem); } if (!allowLocalFiles) { @@ -546,13 +548,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory( size_t maxParallelOps, size_t callbackThreadCount, size_t callbackPerThreadQueueSize, - size_t regexpCacheSize) { + size_t regexpCacheSize, + NActors::TActorSystem* actorSystem) { std::shared_ptr sharedCtx = nullptr; if (callbackThreadCount != 0 || regexpCacheSize != 0) { sharedCtx = std::make_shared( callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize); } - return std::make_shared(maxParallelOps, sharedCtx); + return std::make_shared(maxParallelOps, sharedCtx, actorSystem); } } // namespace NYql::NS3Lister diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index 93fafae19057..3419ec3fd462 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister( const TListingRequest& listingRequest, const TMaybe& delimiter, bool allowLocalFiles, + NActors::TActorSystem* actorSystem, TSharedListingContextPtr sharedCtx = nullptr); class IS3ListerFactory { @@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory( size_t maxParallelOps, size_t callbackThreadCount, size_t callbackPerThreadQueueSize, - size_t regexpCacheSize); + size_t regexpCacheSize, + NActors::TActorSystem* actorSystem); } // namespace NS3Lister } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 2c1ee3313622..bf8e033d32d9 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { State_->Configuration->MaxInflightListsPerQuery, State_->Configuration->ListingCallbackThreadCount, State_->Configuration->ListingCallbackPerThreadQueueSize, - State_->Configuration->RegexpCacheSize)) + State_->Configuration->RegexpCacheSize, + State_->ActorSystem)) , ListingStrategy_(MakeS3ListingStrategy( State_->Gateway, State_->GatewayRetryPolicy, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp index 85707a21f16a..c283c53e8cab 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp @@ -4,8 +4,8 @@ namespace NYql { -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) { - return [gateway, credentialsFactory, allowLocalFiles] ( +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) { + return [gateway, credentialsFactory, allowLocalFiles, actorSystem] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway state->Types = typeCtx.Get(); state->FunctionRegistry = functionRegistry; state->CredentialsFactory = credentialsFactory; + state->ActorSystem = actorSystem; if (gatewaysConfig) { state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index 0bcf96290c7a..f5eaa96630c4 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -32,9 +32,10 @@ struct TS3State : public TThrRefBase IHTTPGateway::TRetryPolicy::TPtr GatewayRetryPolicy = GetHTTPDefaultRetryPolicy(); ui32 ExecutorPoolId = 0; std::list> PrimaryKeys; + NActors::TActorSystem* ActorSystem = nullptr; }; -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false); +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr); TIntrusivePtr CreateS3DataSource(TS3State::TPtr state); TIntrusivePtr CreateS3DataSink(TS3State::TPtr state); diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index c69af2f6c7cd..f8f6a2a4d0a1 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -931,7 +931,7 @@ int RunMain(int argc, const char* argv[]) if (!httpGateway) { httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr); } - dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true)); + dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, actorSystemManager->GetActorSystem())); } if (gatewaysConfig.HasPq()) { diff --git a/ydb/tests/fq/s3/test_s3_1.py b/ydb/tests/fq/s3/test_s3_1.py index 20a65d213d9b..0f4260bdf4cc 100644 --- a/ydb/tests/fq/s3/test_s3_1.py +++ b/ydb/tests/fq/s3/test_s3_1.py @@ -482,7 +482,7 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix): # 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc assert sum(kikimr.control_plane.get_metering(1)) == 21 - # it looks like the runtime_listing for v1 doesn't work in case of + # it looks like the runtime_listing for v1 doesn't work in case of # restart of query because the v1 keeps the compiled query in the cache @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)