Support YDB in YQL Generic Provider (YQv1) #2300

Merged
merged 4 commits into from
Mar 1, 2024
15 changes: 6 additions & 9 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
@@ -216,17 +216,14 @@ void AddClustersFromConnections(
switch (conn.content().setting().connection_case()) {
case FederatedQuery::ConnectionSetting::kYdbDatabase: {
const auto& db = conn.content().setting().ydb_database();
auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping();
auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB);
clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
clusterCfg->SetName(connectionName);
clusterCfg->SetId(db.database_id());
if (db.database())
clusterCfg->SetDatabase(db.database());
if (db.endpoint())
clusterCfg->SetEndpoint(db.endpoint());
clusterCfg->SetSecure(db.secure());
clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb());
Collaborator

Bearer is an important topic, and it would be good to implement it too. It is needed for working inside Yandex with an IAM token instead of an OAuth token.

Member Author

Do you mean adding the AddBearerToToken option here, https://github.com/ydb-platform/ydb/blob/main/ydb/library/yql/providers/common/proto/gateways_config.proto#L547-L590, so that it can be set for each cluster individually?

Currently it is unconditionally true during resolving. Is that wrong? https://github.com/ydb-platform/ydb/blob/main/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp#L52

Collaborator

Let it stay true, that's fine.

Member Author

@vitalyisaev2 Feb 29, 2024

I want to go over once more, point by point, what needs to be done with bearer, because the logic is quite tangled and bearer is used in several different contexts.

Requests to the MDB API and to YDB CP (for resolving)

Bearer is always true. This stays as it is.

Requests to YDB CP (for metadata/data)

Currently Bearer is always false. We need to:

  1. Add an AddBearerToToken field to TGenericClusterConfig.
  2. Set the value of this field in YQv1 in clusters_from_connections.cpp, taking it from the YQ config.
  3. Set the value of this field in YQv2 in the external source factory, and pass it from there to the provider via TGenericProvider::AddCluster. This means that AddBearerToToken will leak into the DSL (at least into the parameters of CREATE EXTERNAL DATA SOURCE for YDB).
  4. In YQ configs like this one, change true to false.

Is that all correct?

Collaborator

Seems correct. It is an additional flag for authentication in the external system.
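The flag discussed in this thread can be illustrated with a minimal sketch. It assumes the flag only controls whether a "Bearer " prefix is prepended to the token before it is sent to the external system. `ClusterAuthSettings` and `MakeAuthToken` are hypothetical names introduced for illustration and are not part of the PR; only the field name `AddBearerToToken` comes from the discussion.

```cpp
#include <string>

// Hypothetical stand-in for a per-cluster setting. In the thread above the
// proposed field is TGenericClusterConfig::AddBearerToToken.
struct ClusterAuthSettings {
    bool AddBearerToToken = false;
};

// Returns the token in the form it would be sent to the external system.
// The "Bearer " prefix is added only when the flag is set and the token
// does not already start with it.
std::string MakeAuthToken(const std::string& rawToken, const ClusterAuthSettings& settings) {
    static const std::string prefix = "Bearer ";
    if (!settings.AddBearerToToken) {
        return rawToken;
    }
    if (rawToken.compare(0, prefix.size(), prefix) == 0) {
        return rawToken;  // already prefixed, do not add it twice
    }
    return prefix + rawToken;
}
```

Keeping the decision in one per-cluster setting lets the resolving path keep its fixed behaviour while the metadata/data path is configured separately.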

clusterCfg->SetDatabaseId(db.database_id());
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
clusters.emplace(connectionName, YdbProviderName);
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kClickhouseCluster: {
16 changes: 13 additions & 3 deletions ydb/core/fq/libs/actors/database_resolver.cpp
@@ -1,5 +1,6 @@
#include "database_resolver.h"

#include <util/string/split.h>
#include <ydb/core/fq/libs/common/cache.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/core/fq/libs/events/events.h>
@@ -136,7 +137,7 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
const auto requestIter = Requests.find(ev->Get()->Request);
HandledIds++;

LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status);
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status);

try {
HandleResponse(ev, requestIter, errorMessage, result);
@@ -312,7 +313,12 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

Y_ENSURE(endpoint);
return TDatabaseDescription{endpoint, "", 0, database, secure};

TVector<TString> split = StringSplitter(endpoint).Split(':');

Y_ENSURE(split.size() == 2);

return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure};
};
Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](
@@ -327,9 +333,11 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
ret.Host[2] = 's';
Collaborator

This is redundant. Wouldn't it make sense to replace endpoint with two fields, host + port?

Member Author

I had that thought too, but definitely not in this review, because such a change would affect many providers in yql.
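For readers following the host + port discussion, here is a standalone sketch of splitting an endpoint string into the two parts. It uses only the C++ standard library rather than the `StringSplitter` call from the diff, and `HostPort` and `SplitEndpoint` are hypothetical names. Unlike the PR code, which requires exactly two colon-separated parts, this version splits at the last colon and also checks the port range.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

struct HostPort {
    std::string Host;
    std::uint16_t Port;
};

// Splits "host:port" at the last colon and checks that the port is a
// number in the range 1..65535. Throws std::invalid_argument otherwise.
HostPort SplitEndpoint(const std::string& endpoint) {
    const auto pos = endpoint.rfind(':');
    if (pos == std::string::npos || pos == 0 || pos + 1 == endpoint.size()) {
        throw std::invalid_argument("expected host:port, got: " + endpoint);
    }
    const std::string portStr = endpoint.substr(pos + 1);
    std::size_t consumed = 0;
    const unsigned long port = std::stoul(portStr, &consumed);
    if (consumed != portStr.size() || port == 0 || port > 65535) {
        throw std::invalid_argument("bad port in endpoint: " + endpoint);
    }
    return HostPort{endpoint.substr(0, pos), static_cast<std::uint16_t>(port)};
}
```

With such a helper the combined endpoint string could be derived from the two fields on demand, which is the direction the reviewer suggests for a later change.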

}
if (isDedicatedDb) {
ret.Endpoint = "u-" + ret.Endpoint;
ret.Host = "u-" + ret.Host;
}
return ret;
};
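The rewrite above has to be applied to both `Endpoint` and `Host` so that the two stay consistent. A minimal standalone sketch of the same idea follows, with a hypothetical `DatabaseDescription` struct and `ToDataStreams` function that only mirror the two fields involved. As an extra precaution that the PR code does not take, the host prefix is checked before it is rewritten.

```cpp
#include <string>

// Hypothetical, reduced version of the resolver's description record.
struct DatabaseDescription {
    std::string Endpoint;  // "host:port"
    std::string Host;      // host part only
};

bool StartsWith(const std::string& s, const std::string& prefix) {
    return s.compare(0, prefix.size(), prefix) == 0;
}

// Rewrites a YDB description into a DataStreams one:
//  - serverless databases: "ydb." becomes "yds."
//  - dedicated databases: the name gets a "u-" prefix
// Both fields receive the same change.
DatabaseDescription ToDataStreams(DatabaseDescription d, bool isDedicatedDb) {
    if (!isDedicatedDb && StartsWith(d.Endpoint, "ydb.")) {
        d.Endpoint.replace(0, 3, "yds");
        if (StartsWith(d.Host, "ydb.")) {
            d.Host.replace(0, 3, "yds");
        }
    }
    if (isDedicatedDb) {
        d.Endpoint = "u-" + d.Endpoint;
        d.Host = "u-" + d.Host;
    }
    return d;
}
```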
@@ -486,6 +494,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
try {
TString url;
if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) {
YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint");
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
@@ -497,7 +506,6 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
.AddPathComponent("hosts")
.Build();
}
LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url);

NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);

@@ -507,6 +515,8 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
httpRequest->Set("Authorization", token);
}

LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL);

requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
} catch (const std::exception& e) {
const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId
7 changes: 1 addition & 6 deletions ydb/core/fq/libs/actors/run_actor.cpp
@@ -30,7 +30,6 @@
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
@@ -1940,11 +1939,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
}

{
dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}

{
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver));
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
}

{
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
@@ -175,8 +175,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"ydb.serverless.yandexcloud.net:2135"},
TString{""},
0,
TString{"ydb.serverless.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
@@ -196,8 +196,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"yds.serverless.yandexcloud.net:2135"},
TString{""},
0,
TString{"yds.serverless.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
@@ -218,8 +218,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
TString{""},
0,
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
1 change: 0 additions & 1 deletion ydb/core/fq/libs/actors/ya.make
@@ -80,7 +80,6 @@ PEERDIR(
ydb/library/yql/providers/pq/provider
ydb/library/yql/providers/pq/task_meta
ydb/library/yql/providers/s3/provider
ydb/library/yql/providers/ydb/provider
ydb/library/yql/public/issue
ydb/library/yql/public/issue/protos
ydb/library/yql/sql/settings
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
@@ -177,7 +177,10 @@ void Init(
&protoConfig.GetGateways().GetHttpGateway(),
yqCounters->GetSubgroup("subcomponent", "http_gateway"));

const auto connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
NYql::NConnector::IClient::TPtr connectorClient = nullptr;
if (protoConfig.GetGateways().GetGeneric().HasConnector()) {
connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
}

if (protoConfig.GetTokenAccessor().GetEnabled()) {
const auto& tokenAccessorConfig = protoConfig.GetTokenAccessor();
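The change in this hunk creates the connector client only when the Generic gateway config has a Connector section and leaves the pointer null otherwise. A minimal standalone sketch of that pattern is shown below. All types here (`ConnectorConfig`, `GenericGatewayConfig`, `IClient`, `GrpcClient`) are hypothetical simplifications, not the real YDB classes, and `std::optional` stands in for the protobuf `HasConnector()` check.

```cpp
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, simplified configuration types.
struct ConnectorConfig {
    std::string Endpoint;
};

struct GenericGatewayConfig {
    std::optional<ConnectorConfig> Connector;  // absent when not configured
};

struct IClient {
    virtual ~IClient() = default;
    virtual std::string Target() const = 0;
};

class GrpcClient : public IClient {
public:
    explicit GrpcClient(std::string endpoint)
        : Endpoint_(std::move(endpoint))
    {
    }
    std::string Target() const override {
        return Endpoint_;
    }

private:
    std::string Endpoint_;
};

// Returns nullptr when no connector is configured, so every caller has to
// check the pointer before using it.
std::shared_ptr<IClient> MakeClientIfConfigured(const GenericGatewayConfig& cfg) {
    if (!cfg.Connector.has_value()) {
        return nullptr;
    }
    return std::make_shared<GrpcClient>(cfg.Connector->Endpoint);
}
```

The cost of this design is that code paths receiving the client need a null check; the benefit is that installations without a connector no longer construct a client from an empty config.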
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
@@ -1552,6 +1552,7 @@ class TKqpHost : public IKqpHost {
TypesCtx.Get(),
FuncRegistry,
FederatedQuerySetup->DatabaseAsyncResolver,
nullptr,
FederatedQuerySetup->ConnectorClient,
FederatedQuerySetup->GenericGatewayConfig
);
@@ -108,7 +108,6 @@ namespace NKikimr::NKqp {
// step 3: ReadSplits
std::vector<ui16> colData = {10, 20, 30, 40, 50};
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select()
@@ -208,7 +207,6 @@ namespace NKikimr::NKqp {

// step 3: ReadSplits
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select()
@@ -304,7 +302,6 @@ namespace NKikimr::NKqp {

// step 3: ReadSplits
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select()
@@ -413,7 +410,6 @@ namespace NKikimr::NKqp {
std::vector<i32> filterColumnData = {42, 24};
// clang-format off
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select(select)
@@ -620,7 +620,8 @@ message TGenericGatewayConfig {
/////////////////////////////// Db Resolver ///////////////////////////////////

message TDbResolverConfig {
// Ydb / Yds mvp endpoint
// Ydb / Yds MVP endpoint. Expected format:
// [http|https]://host:port/ydbc/cloud-prod/
optional string YdbMvpEndpoint = 2;
}
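The documented format ends with a trailing slash, while the resolver appends "/database" to the configured value. The sketch below shows one way to join the two without producing a double slash. `BuildDatabaseUrl` is a hypothetical helper and not part of the PR; whether the real `TUrlBuilder` normalizes slashes is not shown in this diff, and the host and port in the usage example are placeholders.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper, not part of the PR.
// Joins the MVP endpoint with the "database" path, dropping one trailing
// slash from the endpoint so the result has no "//" before "database".
// Assumption: databaseId is already URL-safe; no percent-encoding is done.
std::string BuildDatabaseUrl(std::string mvpEndpoint, const std::string& databaseId) {
    if (mvpEndpoint.empty()) {
        throw std::invalid_argument("empty YDB MVP Endpoint");
    }
    if (mvpEndpoint.back() == '/') {
        mvpEndpoint.pop_back();
    }
    return mvpEndpoint + "/database?databaseId=" + databaseId;
}
```

For example, `BuildDatabaseUrl("https://host:8789/ydbc/cloud-prod/", "etn021")` and the same call without the trailing slash yield the same URL.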

2 changes: 2 additions & 0 deletions ydb/library/yql/providers/generic/actors/ya.make
@@ -8,10 +8,12 @@ SRCS(
PEERDIR(
ydb/library/yql/dq/actors/compute
ydb/library/yql/minikql/computation
ydb/library/yql/providers/common/structured_token
ydb/library/yql/providers/common/token_accessor/client
ydb/library/yql/providers/generic/proto
ydb/library/yql/public/types
ydb/library/yql/providers/generic/connector/libcpp
ydb/public/sdk/cpp/client/ydb_types/credentials
)

YQL_LAST_ABI_VERSION()
@@ -9,13 +9,15 @@
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
#include <ydb/library/yql/providers/generic/proto/range.pb.h>
#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>

namespace NYql::NDq {

@@ -102,16 +104,16 @@ namespace NYql::NDq {
ui64 inputIndex,
TCollectStatsLevel statsLevel,
NConnector::IClient::TPtr client,
const NConnector::NApi::TSelect& select,
const NConnector::NApi::TDataSourceInstance& dataSourceInstance,
NYdb::TCredentialsProviderPtr credentialsProvider,
NConnector::TSource&& source,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
: InputIndex_(inputIndex)
, ComputeActorId_(computeActorId)
, Client_(std::move(client))
, CredentialsProvider_(std::move(credentialsProvider))
, HolderFactory_(holderFactory)
, Select_(select)
, DataSourceInstance_(dataSourceInstance)
, Source_(std::move(source)) // source is an rvalue reference; move it instead of copying
{
IngressStats_.Level = statsLevel;
}
@@ -143,7 +145,9 @@ namespace NYql::NDq {

// Prepare request
NConnector::NApi::TListSplitsRequest request;
*request.mutable_selects()->Add() = Select_;
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
MaybeRefreshToken(select.mutable_data_source_instance());
*request.mutable_selects()->Add() = std::move(select);

// Initialize stream
Client_->ListSplits(request).Subscribe(
@@ -236,8 +240,11 @@ namespace NYql::NDq {

std::for_each(
Splits_.cbegin(), Splits_.cend(),
[&](const NConnector::NApi::TSplit& split) { request.mutable_splits()->Add()->CopyFrom(split); });
request.mutable_data_source_instance()->CopyFrom(DataSourceInstance_);
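[&](const NConnector::NApi::TSplit& split) {
// Refresh the token on a copy and append that copy to the request;
// moving the const `split` would copy it and drop the refreshed token.
NConnector::NApi::TSplit splitCopy = split;
MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance());
*request.mutable_splits()->Add() = std::move(splitCopy);
});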

// Start streaming
Client_->ReadSplits(request).Subscribe(
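When credentials are refreshed per split, the refreshed copy is the object that has to end up in the request, while the stored splits stay unchanged. The following standalone sketch shows that copy, refresh, move sequence with a hypothetical `Split` struct in place of the protobuf `TSplit`; `PrepareSplits` is likewise an illustrative name.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical, reduced stand-in for a table split with embedded credentials.
struct Split {
    std::string Description;
    std::string Token;
};

// Builds the list of splits for an outgoing request.
// Each stored split is copied, the copy gets the fresh token, and the copy
// (not the original) is moved into the result.
std::vector<Split> PrepareSplits(const std::vector<Split>& stored, const std::string& freshToken) {
    std::vector<Split> out;
    out.reserve(stored.size());
    for (const Split& split : stored) {
        Split copy = split;              // stored splits stay untouched
        copy.Token = freshToken;         // refresh only the outgoing copy
        out.push_back(std::move(copy));  // move the refreshed copy
    }
    return out;
}
```

Note that calling `std::move` on a const reference yields a const rvalue, which selects the copy constructor or copy assignment, so moving the original const split would silently copy the unrefreshed object.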
@@ -403,8 +410,8 @@ namespace NYql::NDq {
// It's very important to fill UV columns in the alphabet order,
// paying attention to the scalar field containing block length.
TVector<TString> fieldNames;
std::transform(Select_.what().items().cbegin(),
Select_.what().items().cend(),
std::transform(Source_.select().what().items().cbegin(),
Source_.select().what().items().cend(),
std::back_inserter(fieldNames),
[](const auto& item) { return item.column().name(); });

@@ -452,6 +459,20 @@ namespace NYql::NDq {
return total;
}

void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const {
if (!dsi->credentials().has_token()) {
return;
}

// Token may have expired. Refresh it.
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
auto iamToken = CredentialsProvider_->GetAuthInfo();
Y_ENSURE(iamToken, "empty IAM token");

*dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
*dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
}

// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats"
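The `MaybeRefreshToken` method in this hunk replaces the token in a data source instance right before each request, because a long-running read may outlive the token it was created with. A minimal standalone sketch of the same logic follows. `Credentials`, `DataSourceInstance` and `TokenProvider` are hypothetical simplifications of the protobuf messages and of `NYdb::TCredentialsProviderPtr`, and exceptions stand in for `Y_ENSURE`.

```cpp
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical, reduced stand-ins for the protobuf messages.
struct Credentials {
    bool HasToken = false;  // false means another auth method, e.g. basic
    std::string TokenType;
    std::string TokenValue;
};

struct DataSourceInstance {
    Credentials Creds;
};

// Hypothetical provider: a callable that returns a currently valid token.
using TokenProvider = std::function<std::string()>;

// Replaces the token in `dsi` with a fresh one when token auth is in use.
// Does nothing for data sources that do not use a token.
void MaybeRefreshToken(DataSourceInstance& dsi, const TokenProvider& provider) {
    if (!dsi.Creds.HasToken) {
        return;
    }
    if (!provider) {
        throw std::runtime_error("credentials provider is not initialized");
    }
    std::string token = provider();
    if (token.empty()) {
        throw std::runtime_error("empty IAM token");
    }
    dsi.Creds.TokenValue = std::move(token);
    dsi.Creds.TokenType = "IAM";
}
```

Checking for token auth first means the provider is required only for data sources that actually use a token.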
@@ -484,6 +505,7 @@ namespace NYql::NDq {
const NActors::TActorId ComputeActorId_;

NConnector::IClient::TPtr Client_;
NYdb::TCredentialsProviderPtr CredentialsProvider_;
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
@@ -492,22 +514,21 @@ namespace NYql::NDq {

NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
const NYql::NConnector::NApi::TSelect Select_;
const NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance_;
NConnector::TSource Source_;
};

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
CreateGenericReadActor(NConnector::IClient::TPtr genericClient,
Generic::TSource&& params,
NConnector::TSource&& source,
ui64 inputIndex,
TCollectStatsLevel statsLevel,
const THashMap<TString, TString>& /*secureParams*/,
const THashMap<TString, TString>& /*taskParams*/,
const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr /*credentialsFactory*/,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
{
const auto dsi = params.select().data_source_instance();
const auto dsi = source.select().data_source_instance();
YQL_CLOG(INFO, ProviderGeneric) << "Creating read actor with params:"
<< " kind=" << NYql::NConnector::NApi::EDataSourceKind_Name(dsi.kind())
<< ", endpoint=" << dsi.endpoint().ShortDebugString()
@@ -526,6 +547,25 @@ namespace NYql::NDq {
YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token);
*/

// Obtain token to access remote data source if necessary
NYdb::TCredentialsProviderPtr credentialProvider;
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
.ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
credentialsFactory,
structuredTokenJSON,
false);
credentialProvider = credentialsProviderFactory->CreateProvider();
}

// TODO: partitioning is not implemented now, but this code will be useful for the further research:
/*
TStringBuilder part;
@@ -543,8 +583,8 @@ namespace NYql::NDq {
inputIndex,
statsLevel,
genericClient,
params.select(),
dsi,
std::move(credentialProvider),
std::move(source),
computeActorId,
holderFactory);
