Skip to content

Commit

Permalink
Review fixes pt. 1
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 committed Feb 29, 2024
1 parent 16aa7a9 commit 8616d9d
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 48 deletions.
6 changes: 0 additions & 6 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
Expand Down Expand Up @@ -1939,11 +1938,6 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, NFq::CreateEmptyGateway(SelfId()), Params.DqCompFactory, {}, nullptr));
}

// FIXME: remove YDB provider support?
{
dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}

{
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/fq/libs/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ PEERDIR(
ydb/library/yql/providers/pq/provider
ydb/library/yql/providers/pq/task_meta
ydb/library/yql/providers/s3/provider
ydb/library/yql/providers/ydb/provider
ydb/library/yql/public/issue
ydb/library/yql/public/issue/protos
ydb/library/yql/sql/settings
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/providers/common/proto/gateways_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,8 @@ message TGenericGatewayConfig {
/////////////////////////////// Db Resolver ///////////////////////////////////

// Configuration of the Db Resolver.
message TDbResolverConfig {
// Ydb / Yds MVP endpoint.
// Expected format:
// [http|https]://host:port/ydbc/cloud-prod/
// For example:
// https://ydbc.ydb.cloud.yandex.net:8789/ydbc/cloud-prod/
optional string YdbMvpEndpoint = 2;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/generic/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ PEERDIR(
ydb/library/yql/providers/generic/proto
ydb/library/yql/public/types
ydb/library/yql/providers/generic/connector/libcpp
ydb/public/sdk/cpp/client/ydb_types/credentials
)

YQL_LAST_ABI_VERSION()
Expand Down
56 changes: 37 additions & 19 deletions ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>

namespace NYql::NDq {

Expand Down Expand Up @@ -103,14 +104,14 @@ namespace NYql::NDq {
ui64 inputIndex,
TCollectStatsLevel statsLevel,
NConnector::IClient::TPtr client,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYdb::TCredentialsProviderPtr credentialsProvider,
NConnector::TSource&& source,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
: InputIndex_(inputIndex)
, ComputeActorId_(computeActorId)
, Client_(std::move(client))
, CredentialsFactory_(std::move(credentialsFactory))
, CredentialsProvider_(std::move(credentialsProvider))
, HolderFactory_(holderFactory)
, Source_(source)
{
Expand Down Expand Up @@ -464,21 +465,8 @@ namespace NYql::NDq {
}

// Token may have expired. Refresh it.
Y_ENSURE(CredentialsFactory_, "CredentialsFactory is not initialized");

auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
Source_.GetServiceAccountId(),
Source_.GetServiceAccountIdSignature())
.ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
CredentialsFactory_,
structuredTokenJSON,
false);
auto iamToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
auto iamToken = CredentialsProvider_->GetAuthInfo();
Y_ENSURE(iamToken, "empty IAM token");

*dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
Expand Down Expand Up @@ -517,7 +505,7 @@ namespace NYql::NDq {
const NActors::TActorId ComputeActorId_;

NConnector::IClient::TPtr Client_;
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory_;
NYdb::TCredentialsProviderPtr CredentialsProvider_;
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
Expand Down Expand Up @@ -548,6 +536,36 @@ namespace NYql::NDq {
<< ", use_tls=" << ToString(dsi.use_tls())
<< ", protocol=" << NYql::NConnector::NApi::EProtocol_Name(dsi.protocol());

// FIXME: strange piece of logic - authToken is created but not used:
// https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp?rev=r11550199#L140
/*
const auto token = secureParams.Value(params.token(), TString{});
const auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
const auto one = token.find('#'), two = token.rfind('#');
YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token);
*/

// Create a credentials provider for the remote data source if a service account is configured
NYdb::TCredentialsProviderPtr credentialProvider;
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
.ToJson();

// The structured token must be non-empty to build a credentials provider factory
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
credentialsFactory,
structuredTokenJSON,
false);
credentialProvider = credentialsProviderFactory->CreateProvider();
}

// TODO: partitioning is not implemented now, but this code will be useful for the further research:
/*
TStringBuilder part;
Expand All @@ -565,7 +583,7 @@ namespace NYql::NDq {
inputIndex,
statsLevel,
genericClient,
credentialsFactory,
std::move(credentialProvider),
std::move(source),
computeActorId,
holderFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ namespace NYql::NConnector::NTest {

// gMock matcher parameterized by `expected`: matches when the actual protobuf
// message `arg` compares equal to `expected` via
// google::protobuf::util::MessageDifferencer::Equals.
// On mismatch the description "request does not match" is reported.
MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") {
    return google::protobuf::util::MessageDifferencer::Equals(arg, expected);
}

#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/generic/provider/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ PEERDIR(
ydb/library/yql/providers/generic/connector/api/common
ydb/library/yql/providers/generic/connector/libcpp
ydb/library/yql/utils/plan
ydb/public/sdk/cpp/client/ydb_types/credentials
)

END()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,14 @@ namespace NYql {
return ValidationError(
clusterConfig,
context,
"For YDB clusters you must set either database name or database id");
"For YDB clusters you must set either database name or database id, but you have set both of them");
}

if (!clusterConfig.HasDatabaseName() && !clusterConfig.HasDatabaseId()) {
return ValidationError(
clusterConfig,
context,
"For YDB clusters you must set either database name or database id");
"For YDB clusters you must set either database name or database id, but you have set none of them");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,12 @@ namespace NYql {
}
}

// copy service account ids to enable work with tokens during runtime phase
source.SetServiceAccountId(clusterConfig.GetServiceAccountId());
source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature());
// Managed YDB supports access via IAM token.
// Copy service account ids to obtain tokens during request execution phase.
if (clusterConfig.kind() == NConnector::NApi::EDataSourceKind::YDB) {
source.SetServiceAccountId(clusterConfig.GetServiceAccountId());
source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature());
}

// preserve source description for read actor
protoSettings.PackFrom(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ namespace NYql {
for (const auto& r : reads) {
const TGenRead read(r);
if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) {
ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key"));
ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key"));
return TStatus::Error;
}

const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe<TCoKey>();
if (!maybeKey) {
ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key"));
ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key"));
return TStatus::Error;
}

const auto& keyArg = maybeKey.Cast().Ref().Head();
if (!keyArg.IsList() || keyArg.ChildrenSize() != 2U || !keyArg.Head().IsAtom("table") ||
!keyArg.Tail().IsCallable(TCoString::CallableName())) {
ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), TStringBuilder() << "Expected single table name"));
ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), "Expected single table name"));
return TStatus::Error;
}

Expand Down Expand Up @@ -200,7 +200,7 @@ namespace NYql {
}
} else {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder()
<< "Not found result for " << clusterName << '.' << tableName));
<< "Not found result for " << clusterName << '.' << tableName));
hasErrors = true;
break;
}
Expand Down Expand Up @@ -262,27 +262,36 @@ namespace NYql {
void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) {
auto dsi = request.mutable_data_source_instance();

// If login/password is provided, copy them into request
// If login/password is provided, just copy them into request
if (clusterConfig.GetCredentials().Hasbasic()) {
*dsi->mutable_credentials() = clusterConfig.GetCredentials();
return;
}

Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized");

// If service account is provided, prepare to obtain IAM-token

auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
clusterConfig.GetServiceAccountId(),
clusterConfig.GetServiceAccountIdSignature())
.ToJson();

// If service account is provided, obtain IAM-token
.ToJson();
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
State_->CredentialsFactory,
structuredTokenJSON,
false);
auto iamToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
// Create provider or get existing one.
// It's crucial to reuse providers because their construction implies synchronous IO.
auto providersIt = State_->CredentialProviders.find(clusterConfig.name());
if (providersIt == State_->CredentialProviders.end()) {
auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
State_->CredentialsFactory,
structuredTokenJSON,
false);

providersIt = State_->CredentialProviders.emplace(
std::make_pair(clusterConfig.name(), credentialsProviderFactory->CreateProvider())).first;
}

auto iamToken = providersIt->second->GetAuthInfo();
Y_ENSURE(iamToken, "empty IAM token");

*dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
Expand Down
10 changes: 8 additions & 2 deletions ydb/library/yql/providers/generic/provider/yql_generic_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#include "yql_generic_settings.h"

#include <ydb/library/yql/core/yql_data_provider.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>

namespace NKikimr::NMiniKQL {
class IFunctionRegistry;
Expand Down Expand Up @@ -52,11 +53,16 @@ namespace NYql {
TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;

// key - (database id, database type), value - credentials to access MDB API
// key - (database id, database type), value - credentials to access managed APIs
IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth;
std::shared_ptr<IDatabaseAsyncResolver> DatabaseResolver;

// key - cluster name, value - TCredentialsProviderPtr
// It's important to cache credentials providers, because their construction
// performs I/O (a synchronous call via the Token Accessor client).
std::unordered_map<TString, NYdb::TCredentialsProviderPtr> CredentialProviders;
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;

NConnector::IClient::TPtr GenericClient;

private:
Expand Down

0 comments on commit 8616d9d

Please sign in to comment.