Skip to content

Commit

Permalink
YDB FQ: handle exception in YQL Generic Provider when IAM service is …
Browse files Browse the repository at this point in the history
…not available (#9186)
  • Loading branch information
vitalyisaev2 authored Sep 13, 2024
1 parent 678534f commit 592841b
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 57 deletions.
77 changes: 61 additions & 16 deletions ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@ namespace NYql::NDq {

void Bootstrap() {
Become(&TGenericReadActor::StateFunc);
InitSplitsListing();
auto issue = InitSplitsListing();
if (issue) {
return NotifyComputeActorWithIssue(
TActivationContext::ActorSystem(),
ComputeActorId_,
InputIndex_,
std::move(*issue));
};
}

static constexpr char ActorName[] = "GENERIC_READ_ACTOR";
Expand All @@ -139,13 +146,18 @@ namespace NYql::NDq {

// ListSplits

void InitSplitsListing() {
TMaybe<TIssue> InitSplitsListing() {
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits listing";

// Prepare request
NConnector::NApi::TListSplitsRequest request;
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());

auto error = TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
if (error) {
return TIssue(error);
}

*request.mutable_selects()->Add() = std::move(select);

// Initialize stream
Expand All @@ -160,6 +172,8 @@ namespace NYql::NDq {
TEvPrivate::TEvListSplitsIterator>(
actorSystem, selfId, computeActorId, inputIndex, future);
});

return Nothing();
}

void Handle(TEvPrivate::TEvListSplitsIterator::TPtr& ev) {
Expand Down Expand Up @@ -205,7 +219,16 @@ namespace NYql::NDq {
// Server sent EOF, now we are ready to start splits reading
if (NConnector::GrpcStatusEndOfStream(status)) {
YQL_CLOG(DEBUG, ProviderGeneric) << "Handle :: EvListSplitsFinished :: last message was reached, start data reading";
return InitSplitsReading();
auto issue = InitSplitsReading();
if (issue) {
return NotifyComputeActorWithIssue(
TActivationContext::ActorSystem(),
ComputeActorId_,
InputIndex_,
std::move(*issue));
}

return;
}

// Server temporary failure
Expand All @@ -223,27 +246,31 @@ namespace NYql::NDq {
}

// ReadSplits
void InitSplitsReading() {
TMaybe<TIssue> InitSplitsReading() {
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits reading";

if (Splits_.empty()) {
YQL_CLOG(WARN, ProviderGeneric) << "Accumulated empty list of splits";
ReadSplitsFinished_ = true;
return NotifyComputeActorWithData();
NotifyComputeActorWithData();
return Nothing();
}

// Prepare request
NConnector::NApi::TReadSplitsRequest request;
request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
request.mutable_splits()->Reserve(Splits_.size());

std::for_each(
Splits_.cbegin(), Splits_.cend(),
[&](const NConnector::NApi::TSplit& split) {
NConnector::NApi::TSplit splitCopy = split;
TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
*request.mutable_splits()->Add() = std::move(split);
});
for (const auto& split : Splits_) {
NConnector::NApi::TSplit splitCopy = split;

auto error = TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
if (error) {
return TIssue(std::move(error));
}

*request.mutable_splits()->Add() = std::move(splitCopy);
}

// Start streaming
Client_->ReadSplits(request).Subscribe(
Expand All @@ -257,6 +284,8 @@ namespace NYql::NDq {
TEvPrivate::TEvReadSplitsIterator>(
actorSystem, selfId, computeActorId, inputIndex, future);
});

return Nothing();
}

void Handle(TEvPrivate::TEvReadSplitsIterator::TPtr& ev) {
Expand Down Expand Up @@ -371,8 +400,8 @@ namespace NYql::NDq {

static void NotifyComputeActorWithError(
TActorSystem* actorSystem,
const NActors::TActorId computeActorId,
const ui64 inputIndex,
NActors::TActorId computeActorId,
ui64 inputIndex,
const NConnector::NApi::TError& error) {
actorSystem->Send(computeActorId,
new TEvAsyncInputError(
Expand All @@ -382,6 +411,19 @@ namespace NYql::NDq {
return;
}

static void NotifyComputeActorWithIssue(
TActorSystem* actorSystem,
NActors::TActorId computeActorId,
ui64 inputIndex,
TIssue issue) {
actorSystem->Send(computeActorId,
new TEvAsyncInputError(
inputIndex,
TIssues{std::move(issue)},
NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR));
return;
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer,
TMaybe<TInstant>&,
bool& finished,
Expand Down Expand Up @@ -546,7 +588,10 @@ namespace NYql::NDq {
part << ';';
*/

auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory);
auto tokenProvider = CreateGenericTokenProvider(
source.GetToken(),
source.GetServiceAccountId(), source.GetServiceAccountIdSignature(),
credentialsFactory);

const auto actor = new TGenericReadActor(
inputIndex,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,67 +1,73 @@
#include "yql_generic_token_provider.h"

#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/utils/log/log.h>

namespace NYql::NDq {
TGenericTokenProvider::TGenericTokenProvider(
const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory)
: Source_(source)
, StaticIAMToken_(source.GetToken())
, CredentialsProvider_(nullptr)
TGenericTokenProvider::TGenericTokenProvider(const TString& staticIamToken)
: StaticIAMToken_(staticIamToken)
{
// 1. User has provided IAM-token itself.
// This token will be used during the whole lifetime of a read actor.
if (!StaticIAMToken_.empty()) {
return;
}
}

// 2. User has provided service account creds.
// We create token accessor client that will renew token accessor by demand.
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");
TGenericTokenProvider::TGenericTokenProvider(
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
Y_ENSURE(!serviceAccountId.Empty(), "No service account provided");
Y_ENSURE(!ServiceAccountIdSignature.Empty(), "No service account signature provided");
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON =
TStructuredTokenBuilder()
.SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
.ToJson();
auto structuredTokenJSON =
TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, ServiceAccountIdSignature).ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
}

// 3. If we reached this point, it means that user doesn't need token auth.
auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
}

void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
TString TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
// 1. Don't need tokens if basic auth is set
if (dsi.credentials().has_basic()) {
return;
return {};
}

*dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM";

// 2. If static IAM-token has been provided, use it
if (!StaticIAMToken_.empty()) {
*dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_;
return;
return {};
}

// 3. Otherwise use credentials provider to get token
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");

auto iamToken = CredentialsProvider_->GetAuthInfo();
TString iamToken;
try {
iamToken = CredentialsProvider_->GetAuthInfo();
} catch (const std::exception& e) {
YQL_CLOG(ERROR, ProviderGeneric) << "MaybeFillToken: " << e.what();
return TString(e.what());
}

Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token");

*dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken);
return {};
}

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
CreateGenericTokenProvider(const TString& staticIamToken, const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
return std::make_unique<TGenericTokenProvider>(source, credentialsFactory);
if (!staticIamToken.Empty()) {
return std::make_unique<TGenericTokenProvider>(staticIamToken);
}
if (!serviceAccountId.Empty()) {
return std::make_unique<TGenericTokenProvider>(serviceAccountId, serviceAccountIdSignature,
credentialsFactory);
}
return std::make_unique<TGenericTokenProvider>();
}
} //namespace NYql::NDq
} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NYql::NDq {
// When accessing external data sources using authentication via tokens,
Expand All @@ -12,19 +13,25 @@ namespace NYql::NDq {
class TGenericTokenProvider {
public:
using TPtr = std::unique_ptr<TGenericTokenProvider>;
TGenericTokenProvider() = default; // No auth required
TGenericTokenProvider(const TString& staticIamToken);
TGenericTokenProvider(
const TString& serviceAccountId,
const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);

TGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);

void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;
// MaybeFillToken sets IAM-token within DataSourceInstance.
// Returns string containing error, if it happened.
TString MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;

private:
NYql::Generic::TSource Source_;
TString StaticIAMToken_;
NYdb::TCredentialsProviderPtr CredentialsProvider_;
};

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} //namespace NYql::NDq
CreateGenericTokenProvider(
const TString& staticIamToken,
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} // namespace NYql::NDq

0 comments on commit 592841b

Please sign in to comment.