From 062f109fafe8de6572f286fa1f6e8266921b5f43 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Fri, 13 Sep 2024 06:13:48 +0000 Subject: [PATCH 1/2] YDB FQ: handle exception in YQL Generic Provider when IAM service is not available (#9092) --- .../generic/actors/yql_generic_read_actor.cpp | 72 +++++++++++++++---- .../actors/yql_generic_token_provider.cpp | 72 ++++++++++--------- .../actors/yql_generic_token_provider.h | 5 +- 3 files changed, 100 insertions(+), 49 deletions(-) diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 51c02bb40456..fb9c61343980 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -119,7 +119,14 @@ namespace NYql::NDq { void Bootstrap() { Become(&TGenericReadActor::StateFunc); - InitSplitsListing(); + auto issue = InitSplitsListing(); + if (issue) { + return NotifyComputeActorWithIssue( + TActivationContext::ActorSystem(), + ComputeActorId_, + InputIndex_, + std::move(*issue)); + }; } static constexpr char ActorName[] = "GENERIC_READ_ACTOR"; @@ -139,13 +146,18 @@ namespace NYql::NDq { // ListSplits - void InitSplitsListing() { + TMaybe InitSplitsListing() { YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits listing"; // Prepare request NConnector::NApi::TListSplitsRequest request; NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source - TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance()); + + auto error = TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance()); + if (error) { + return TIssue(error); + } + *request.mutable_selects()->Add() = std::move(select); // Initialize stream @@ -160,6 +172,8 @@ namespace NYql::NDq { TEvPrivate::TEvListSplitsIterator>( actorSystem, selfId, computeActorId, inputIndex, future); }); + + return Nothing(); } void Handle(TEvPrivate::TEvListSplitsIterator::TPtr& ev) { @@ -205,7 +219,16 @@ namespace NYql::NDq { // Server sent EOF, now we are ready to start splits reading if (NConnector::GrpcStatusEndOfStream(status)) { YQL_CLOG(DEBUG, ProviderGeneric) << "Handle :: EvListSplitsFinished :: last message was reached, start data reading"; - return InitSplitsReading(); + auto issue = InitSplitsReading(); + if (issue) { + return NotifyComputeActorWithIssue( + TActivationContext::ActorSystem(), + ComputeActorId_, + InputIndex_, + std::move(*issue)); + } + + return; } // Server temporary failure @@ -223,13 +246,14 @@ namespace NYql::NDq { } // ReadSplits - void InitSplitsReading() { + TMaybe InitSplitsReading() { YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits reading"; if (Splits_.empty()) { YQL_CLOG(WARN, ProviderGeneric) << "Accumulated empty list of splits"; ReadSplitsFinished_ = true; - return NotifyComputeActorWithData(); + NotifyComputeActorWithData(); + return Nothing(); } // Prepare request @@ -237,13 +261,16 @@ namespace NYql::NDq { request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING); request.mutable_splits()->Reserve(Splits_.size()); - std::for_each( - Splits_.cbegin(), Splits_.cend(), - [&](const NConnector::NApi::TSplit& split) { - NConnector::NApi::TSplit splitCopy = split; - TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance()); - *request.mutable_splits()->Add() = std::move(split); - }); + for (const auto& split : Splits_) { + NConnector::NApi::TSplit splitCopy = split; + + auto error = TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance()); + if (error) { + return TIssue(std::move(error)); + } + + *request.mutable_splits()->Add() = std::move(splitCopy); + } // Start streaming Client_->ReadSplits(request).Subscribe( @@ -257,6 +284,8 @@ namespace NYql::NDq { TEvPrivate::TEvReadSplitsIterator>( actorSystem, selfId, computeActorId, inputIndex, future); }); + + return Nothing(); } void Handle(TEvPrivate::TEvReadSplitsIterator::TPtr& ev) { @@ -371,8 +400,8 @@ namespace NYql::NDq { static void NotifyComputeActorWithError( TActorSystem* actorSystem, - const NActors::TActorId computeActorId, - const ui64 inputIndex, + NActors::TActorId computeActorId, + ui64 inputIndex, const NConnector::NApi::TError& error) { actorSystem->Send(computeActorId, new TEvAsyncInputError( @@ -382,6 +411,19 @@ namespace NYql::NDq { return; } + static void NotifyComputeActorWithIssue( + TActorSystem* actorSystem, + NActors::TActorId computeActorId, + ui64 inputIndex, + TIssue issue) { + actorSystem->Send(computeActorId, + new TEvAsyncInputError( + inputIndex, + TIssues{std::move(issue)}, + NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR)); + return; + } + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe&, bool& finished, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp index e8430b87e9ec..bbb6e1555c5f 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp @@ -1,45 +1,35 @@ #include "yql_generic_token_provider.h" #include +#include namespace NYql::NDq { - TGenericTokenProvider::TGenericTokenProvider( - const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) - : Source_(source) - , StaticIAMToken_(source.GetToken()) - , CredentialsProvider_(nullptr) + TGenericTokenProvider::TGenericTokenProvider(const TString& staticIamToken) + : StaticIAMToken_(staticIamToken) { - // 1. User has provided IAM-token itself. - // This token will be used during the whole lifetime of a read actor. - if (!StaticIAMToken_.empty()) { - return; - } + } - // 2. User has provided service account creds. - // We create token accessor client that will renew token accessor by demand. - if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) { - Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized"); + TGenericTokenProvider::TGenericTokenProvider( + const TString& serviceAccountId, const TString& ServiceAccountIdSignature, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { + Y_ENSURE(!serviceAccountId.Empty(), "No service account provided"); + Y_ENSURE(!ServiceAccountIdSignature.Empty(), "No service account signature provided"); + Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized"); - auto structuredTokenJSON = - TStructuredTokenBuilder() - .SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature()) - .ToJson(); + auto structuredTokenJSON = + TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, ServiceAccountIdSignature).ToJson(); - // If service account is provided, obtain IAM-token - Y_ENSURE(structuredTokenJSON, "empty structured token"); + Y_ENSURE(structuredTokenJSON, "empty structured token"); - auto credentialsProviderFactory = - CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false); - CredentialsProvider_ = credentialsProviderFactory->CreateProvider(); - } - - // 3. If we reached this point, it means that user doesn't need token auth. + auto credentialsProviderFactory = + CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false); + CredentialsProvider_ = credentialsProviderFactory->CreateProvider(); } - void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const { + TString TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const { // 1. Don't need tokens if basic auth is set if (dsi.credentials().has_basic()) { - return; + return {}; } *dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM"; @@ -47,21 +37,37 @@ namespace NYql::NDq { // 2. If static IAM-token has been provided, use it if (!StaticIAMToken_.empty()) { *dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_; - return; + return {}; } // 3. Otherwise use credentials provider to get token Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized"); - auto iamToken = CredentialsProvider_->GetAuthInfo(); + TString iamToken; + try { + iamToken = CredentialsProvider_->GetAuthInfo(); + } catch (const std::exception& e) { + YQL_CLOG(ERROR, ProviderGeneric) << "MaybeFillToken: " << e.what(); + return TString(e.what()); + } + Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token"); *dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken); + return {}; } TGenericTokenProvider::TPtr - CreateGenericTokenProvider(const NYql::Generic::TSource& source, + CreateGenericTokenProvider(const TString& staticIamToken, const TString& serviceAccountId, + const TString& serviceAccountIdSignature, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { - return std::make_unique(source, credentialsFactory); + if (!staticIamToken.Empty()) { + return std::make_unique(staticIamToken); + } + if (!serviceAccountId.Empty()) { + return std::make_unique(serviceAccountId, serviceAccountIdSignature, + credentialsFactory); + } + return std::make_unique(); } -} //namespace NYql::NDq +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h index 495a44c15e57..34acbc0cf670 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace NYql::NDq { // When accessing external data sources using authentication via tokens, @@ -16,7 +17,9 @@ namespace NYql::NDq { TGenericTokenProvider(const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); - void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const; + // MaybeFillToken sets IAM-token within DataSourceInstance. + // Returns string containing error, if it happened. + TString MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const; private: NYql::Generic::TSource Source_; From d29163cfbb46f29ff680ddb93e791bca146f88f2 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Fri, 13 Sep 2024 06:50:22 +0000 Subject: [PATCH 2/2] Fix missing code --- .../generic/actors/yql_generic_read_actor.cpp | 5 ++++- .../actors/yql_generic_token_provider.h | 18 +++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index fb9c61343980..c1abf504ed5c 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -588,7 +588,10 @@ namespace NYql::NDq { part << ';'; */ - auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory); + auto tokenProvider = CreateGenericTokenProvider( + source.GetToken(), + source.GetServiceAccountId(), source.GetServiceAccountIdSignature(), + credentialsFactory); const auto actor = new TGenericReadActor( inputIndex, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h index 34acbc0cf670..c656e3a38daf 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h @@ -13,21 +13,25 @@ namespace NYql::NDq { class TGenericTokenProvider { public: using TPtr = std::unique_ptr; - - TGenericTokenProvider(const NYql::Generic::TSource& source, - const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); + TGenericTokenProvider() = default; // No auth required + TGenericTokenProvider(const TString& staticIamToken); + TGenericTokenProvider( + const TString& serviceAccountId, + const TString& ServiceAccountIdSignature, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); // MaybeFillToken sets IAM-token within DataSourceInstance. // Returns string containing error, if it happened. TString MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const; private: - NYql::Generic::TSource Source_; TString StaticIAMToken_; NYdb::TCredentialsProviderPtr CredentialsProvider_; }; TGenericTokenProvider::TPtr - CreateGenericTokenProvider(const NYql::Generic::TSource& source, - const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); -} //namespace NYql::NDq + CreateGenericTokenProvider( + const TString& staticIamToken, + const TString& serviceAccountId, const TString& ServiceAccountIdSignature, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); +} // namespace NYql::NDq