Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YDB FQ: handle exception in YQL Generic Provider when IAM service is not available #9186

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@ namespace NYql::NDq {

void Bootstrap() {
Become(&TGenericReadActor::StateFunc);
InitSplitsListing();
auto issue = InitSplitsListing();
if (issue) {
return NotifyComputeActorWithIssue(
TActivationContext::ActorSystem(),
ComputeActorId_,
InputIndex_,
std::move(*issue));
};
}

static constexpr char ActorName[] = "GENERIC_READ_ACTOR";
Expand All @@ -139,13 +146,18 @@ namespace NYql::NDq {

// ListSplits

void InitSplitsListing() {
TMaybe<TIssue> InitSplitsListing() {
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits listing";

// Prepare request
NConnector::NApi::TListSplitsRequest request;
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());

auto error = TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
if (error) {
return TIssue(error);
}

*request.mutable_selects()->Add() = std::move(select);

// Initialize stream
Expand All @@ -160,6 +172,8 @@ namespace NYql::NDq {
TEvPrivate::TEvListSplitsIterator>(
actorSystem, selfId, computeActorId, inputIndex, future);
});

return Nothing();
}

void Handle(TEvPrivate::TEvListSplitsIterator::TPtr& ev) {
Expand Down Expand Up @@ -205,7 +219,16 @@ namespace NYql::NDq {
// Server sent EOF, now we are ready to start splits reading
if (NConnector::GrpcStatusEndOfStream(status)) {
YQL_CLOG(DEBUG, ProviderGeneric) << "Handle :: EvListSplitsFinished :: last message was reached, start data reading";
return InitSplitsReading();
auto issue = InitSplitsReading();
if (issue) {
return NotifyComputeActorWithIssue(
TActivationContext::ActorSystem(),
ComputeActorId_,
InputIndex_,
std::move(*issue));
}

return;
}

// Server temporary failure
Expand All @@ -223,27 +246,31 @@ namespace NYql::NDq {
}

// ReadSplits
void InitSplitsReading() {
TMaybe<TIssue> InitSplitsReading() {
YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits reading";

if (Splits_.empty()) {
YQL_CLOG(WARN, ProviderGeneric) << "Accumulated empty list of splits";
ReadSplitsFinished_ = true;
return NotifyComputeActorWithData();
NotifyComputeActorWithData();
return Nothing();
}

// Prepare request
NConnector::NApi::TReadSplitsRequest request;
request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
request.mutable_splits()->Reserve(Splits_.size());

std::for_each(
Splits_.cbegin(), Splits_.cend(),
[&](const NConnector::NApi::TSplit& split) {
NConnector::NApi::TSplit splitCopy = split;
TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
*request.mutable_splits()->Add() = std::move(split);
});
for (const auto& split : Splits_) {
NConnector::NApi::TSplit splitCopy = split;

auto error = TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
if (error) {
return TIssue(std::move(error));
}

*request.mutable_splits()->Add() = std::move(splitCopy);
}

// Start streaming
Client_->ReadSplits(request).Subscribe(
Expand All @@ -257,6 +284,8 @@ namespace NYql::NDq {
TEvPrivate::TEvReadSplitsIterator>(
actorSystem, selfId, computeActorId, inputIndex, future);
});

return Nothing();
}

void Handle(TEvPrivate::TEvReadSplitsIterator::TPtr& ev) {
Expand Down Expand Up @@ -371,8 +400,8 @@ namespace NYql::NDq {

static void NotifyComputeActorWithError(
TActorSystem* actorSystem,
const NActors::TActorId computeActorId,
const ui64 inputIndex,
NActors::TActorId computeActorId,
ui64 inputIndex,
const NConnector::NApi::TError& error) {
actorSystem->Send(computeActorId,
new TEvAsyncInputError(
Expand All @@ -382,6 +411,19 @@ namespace NYql::NDq {
return;
}

static void NotifyComputeActorWithIssue(
TActorSystem* actorSystem,
NActors::TActorId computeActorId,
ui64 inputIndex,
TIssue issue) {
actorSystem->Send(computeActorId,
new TEvAsyncInputError(
inputIndex,
TIssues{std::move(issue)},
NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_INTERNAL_ERROR));
return;
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer,
TMaybe<TInstant>&,
bool& finished,
Expand Down Expand Up @@ -546,7 +588,10 @@ namespace NYql::NDq {
part << ';';
*/

auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory);
auto tokenProvider = CreateGenericTokenProvider(
source.GetToken(),
source.GetServiceAccountId(), source.GetServiceAccountIdSignature(),
credentialsFactory);

const auto actor = new TGenericReadActor(
inputIndex,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,67 +1,73 @@
#include "yql_generic_token_provider.h"

#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/utils/log/log.h>

namespace NYql::NDq {
TGenericTokenProvider::TGenericTokenProvider(
const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory)
: Source_(source)
, StaticIAMToken_(source.GetToken())
, CredentialsProvider_(nullptr)
TGenericTokenProvider::TGenericTokenProvider(const TString& staticIamToken)
: StaticIAMToken_(staticIamToken)
{
// 1. User has provided IAM-token itself.
// This token will be used during the whole lifetime of a read actor.
if (!StaticIAMToken_.empty()) {
return;
}
}

// 2. User has provided service account creds.
// We create token accessor client that will renew token accessor by demand.
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");
TGenericTokenProvider::TGenericTokenProvider(
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
Y_ENSURE(!serviceAccountId.Empty(), "No service account provided");
Y_ENSURE(!ServiceAccountIdSignature.Empty(), "No service account signature provided");
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON =
TStructuredTokenBuilder()
.SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
.ToJson();
auto structuredTokenJSON =
TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, ServiceAccountIdSignature).ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
}

// 3. If we reached this point, it means that user doesn't need token auth.
auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
}

void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
TString TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
// 1. Don't need tokens if basic auth is set
if (dsi.credentials().has_basic()) {
return;
return {};
}

*dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM";

// 2. If static IAM-token has been provided, use it
if (!StaticIAMToken_.empty()) {
*dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_;
return;
return {};
}

// 3. Otherwise use credentials provider to get token
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");

auto iamToken = CredentialsProvider_->GetAuthInfo();
TString iamToken;
try {
iamToken = CredentialsProvider_->GetAuthInfo();
} catch (const std::exception& e) {
YQL_CLOG(ERROR, ProviderGeneric) << "MaybeFillToken: " << e.what();
return TString(e.what());
}

Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token");

*dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken);
return {};
}

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
CreateGenericTokenProvider(const TString& staticIamToken, const TString& serviceAccountId,
const TString& serviceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
return std::make_unique<TGenericTokenProvider>(source, credentialsFactory);
if (!staticIamToken.Empty()) {
return std::make_unique<TGenericTokenProvider>(staticIamToken);
}
if (!serviceAccountId.Empty()) {
return std::make_unique<TGenericTokenProvider>(serviceAccountId, serviceAccountIdSignature,
credentialsFactory);
}
return std::make_unique<TGenericTokenProvider>();
}
} //namespace NYql::NDq
} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NYql::NDq {
// When accessing external data sources using authentication via tokens,
Expand All @@ -12,19 +13,25 @@ namespace NYql::NDq {
class TGenericTokenProvider {
public:
using TPtr = std::unique_ptr<TGenericTokenProvider>;
TGenericTokenProvider() = default; // No auth required
TGenericTokenProvider(const TString& staticIamToken);
TGenericTokenProvider(
const TString& serviceAccountId,
const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);

TGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);

void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;
// MaybeFillToken sets IAM-token within DataSourceInstance.
// Returns string containing error, if it happened.
TString MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;

private:
NYql::Generic::TSource Source_;
TString StaticIAMToken_;
NYdb::TCredentialsProviderPtr CredentialsProvider_;
};

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} //namespace NYql::NDq
CreateGenericTokenProvider(
const TString& staticIamToken,
const TString& serviceAccountId, const TString& ServiceAccountIdSignature,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} // namespace NYql::NDq
Loading