Skip to content

Commit

Permalink
Merge 16aa7a9 into 3c0eb8e
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 authored Feb 28, 2024
2 parents 3c0eb8e + 16aa7a9 commit db2ebe9
Show file tree
Hide file tree
Showing 30 changed files with 299 additions and 164 deletions.
15 changes: 6 additions & 9 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,14 @@ void AddClustersFromConnections(
switch (conn.content().setting().connection_case()) {
case FederatedQuery::ConnectionSetting::kYdbDatabase: {
const auto& db = conn.content().setting().ydb_database();
auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping();
auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB);
clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
clusterCfg->SetName(connectionName);
clusterCfg->SetId(db.database_id());
if (db.database())
clusterCfg->SetDatabase(db.database());
if (db.endpoint())
clusterCfg->SetEndpoint(db.endpoint());
clusterCfg->SetSecure(db.secure());
clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb());
clusterCfg->SetDatabaseId(db.database_id());
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
clusters.emplace(connectionName, YdbProviderName);
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kClickhouseCluster: {
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "database_resolver.h"

#include <util/string/split.h>
#include <ydb/core/fq/libs/common/cache.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/core/fq/libs/events/events.h>
Expand Down Expand Up @@ -136,7 +137,7 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
const auto requestIter = Requests.find(ev->Get()->Request);
HandledIds++;

LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status);
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status);

try {
HandleResponse(ev, requestIter, errorMessage, result);
Expand Down Expand Up @@ -312,7 +313,12 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

Y_ENSURE(endpoint);
return TDatabaseDescription{endpoint, "", 0, database, secure};

TVector<TString> split = StringSplitter(endpoint).Split(':');

Y_ENSURE(split.size() == 2);

return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure};
};
Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](
Expand All @@ -327,9 +333,11 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
ret.Host[2] = 's';
}
if (isDedicatedDb) {
ret.Endpoint = "u-" + ret.Endpoint;
ret.Host = "u-" + ret.Host;
}
return ret;
};
Expand Down Expand Up @@ -486,6 +494,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
try {
TString url;
if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) {
YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint");
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
Expand All @@ -497,7 +506,6 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
.AddPathComponent("hosts")
.Build();
}
LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url);

NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);

Expand All @@ -507,6 +515,8 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
httpRequest->Set("Authorization", token);
}

LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL);

requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
} catch (const std::exception& e) {
const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1939,12 +1939,13 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, NFq::CreateEmptyGateway(SelfId()), Params.DqCompFactory, {}, nullptr));
}

// FIXME: remove YDB provider support?
{
dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}

{
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver));
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
}

{
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"ydb.serverless.yandexcloud.net:2135"},
TString{""},
0,
TString{"ydb.serverless.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
Expand All @@ -196,8 +196,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"yds.serverless.yandexcloud.net:2135"},
TString{""},
0,
TString{"yds.serverless.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
Expand All @@ -218,8 +218,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
TString{""},
0,
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ void Init(
&protoConfig.GetGateways().GetHttpGateway(),
yqCounters->GetSubgroup("subcomponent", "http_gateway"));

const auto connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
NYql::NConnector::IClient::TPtr connectorClient = nullptr;
if (protoConfig.GetGateways().GetGeneric().HasConnector()) {
connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
}

if (protoConfig.GetTokenAccessor().GetEnabled()) {
const auto& tokenAccessorConfig = protoConfig.GetTokenAccessor();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,7 @@ class TKqpHost : public IKqpHost {
TypesCtx.Get(),
FuncRegistry,
FederatedQuerySetup->DatabaseAsyncResolver,
nullptr,
FederatedQuerySetup->ConnectorClient,
FederatedQuerySetup->GenericGatewayConfig
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ namespace NKikimr::NKqp {
// step 3: ReadSplits
std::vector<ui16> colData = {10, 20, 30, 40, 50};
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select()
Expand Down Expand Up @@ -208,7 +207,6 @@ namespace NKikimr::NKqp {

// step 3: ReadSplits
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select()
Expand Down Expand Up @@ -304,7 +302,6 @@ namespace NKikimr::NKqp {

// step 3: ReadSplits
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select()
Expand Down Expand Up @@ -413,7 +410,6 @@ namespace NKikimr::NKqp {
std::vector<i32> filterColumnData = {42, 24};
// clang-format off
clientMock->ExpectReadSplits()
.DataSourceInstance(dataSourceInstance)
.Split()
.Description("some binary description")
.Select(select)
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/common/proto/gateways_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ message TGenericGatewayConfig {
/////////////////////////////// Db Resolver ///////////////////////////////////

message TDbResolverConfig {
// Ydb / Yds mvp endpoint
// Ydb / Yds mvp endpoint, for example:
// https://ydbc.ydb.cloud.yandex.net:8789/ydbc/cloud-prod/
optional string YdbMvpEndpoint = 2;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/generic/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ SRCS(
PEERDIR(
ydb/library/yql/dq/actors/compute
ydb/library/yql/minikql/computation
ydb/library/yql/providers/common/structured_token
ydb/library/yql/providers/common/token_accessor/client
ydb/library/yql/providers/generic/proto
ydb/library/yql/public/types
Expand Down
76 changes: 49 additions & 27 deletions ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
Expand Down Expand Up @@ -102,16 +103,16 @@ namespace NYql::NDq {
ui64 inputIndex,
TCollectStatsLevel statsLevel,
NConnector::IClient::TPtr client,
const NConnector::NApi::TSelect& select,
const NConnector::NApi::TDataSourceInstance& dataSourceInstance,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NConnector::TSource&& source,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
: InputIndex_(inputIndex)
, ComputeActorId_(computeActorId)
, Client_(std::move(client))
, CredentialsFactory_(std::move(credentialsFactory))
, HolderFactory_(holderFactory)
, Select_(select)
, DataSourceInstance_(dataSourceInstance)
, Source_(source)
{
IngressStats_.Level = statsLevel;
}
Expand Down Expand Up @@ -143,7 +144,9 @@ namespace NYql::NDq {

// Prepare request
NConnector::NApi::TListSplitsRequest request;
*request.mutable_selects()->Add() = Select_;
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
MaybeRefreshToken(select.mutable_data_source_instance());
*request.mutable_selects()->Add() = std::move(select);

// Initialize stream
Client_->ListSplits(request).Subscribe(
Expand Down Expand Up @@ -236,8 +239,11 @@ namespace NYql::NDq {

std::for_each(
Splits_.cbegin(), Splits_.cend(),
[&](const NConnector::NApi::TSplit& split) { request.mutable_splits()->Add()->CopyFrom(split); });
request.mutable_data_source_instance()->CopyFrom(DataSourceInstance_);
[&](const NConnector::NApi::TSplit& split) {
NConnector::NApi::TSplit splitCopy = split;
MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance());
*request.mutable_splits()->Add() = std::move(split);
});

// Start streaming
Client_->ReadSplits(request).Subscribe(
Expand Down Expand Up @@ -403,8 +409,8 @@ namespace NYql::NDq {
// It's very important to fill UV columns in the alphabet order,
// paying attention to the scalar field containing block length.
TVector<TString> fieldNames;
std::transform(Select_.what().items().cbegin(),
Select_.what().items().cend(),
std::transform(Source_.select().what().items().cbegin(),
Source_.select().what().items().cend(),
std::back_inserter(fieldNames),
[](const auto& item) { return item.column().name(); });

Expand Down Expand Up @@ -452,6 +458,33 @@ namespace NYql::NDq {
return total;
}

void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const {
if (!dsi->credentials().has_token()) {
return;
}

// Token may have expired. Refresh it.
Y_ENSURE(CredentialsFactory_, "CredentialsFactory is not initialized");

auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
Source_.GetServiceAccountId(),
Source_.GetServiceAccountIdSignature())
.ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
CredentialsFactory_,
structuredTokenJSON,
false);
auto iamToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
Y_ENSURE(iamToken, "empty IAM token");

*dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
*dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
}

// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats"
Expand Down Expand Up @@ -484,6 +517,7 @@ namespace NYql::NDq {
const NActors::TActorId ComputeActorId_;

NConnector::IClient::TPtr Client_;
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory_;
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
Expand All @@ -492,40 +526,28 @@ namespace NYql::NDq {

NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
const NYql::NConnector::NApi::TSelect Select_;
const NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance_;
NConnector::TSource Source_;
};

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
CreateGenericReadActor(NConnector::IClient::TPtr genericClient,
Generic::TSource&& params,
NConnector::TSource&& source,
ui64 inputIndex,
TCollectStatsLevel statsLevel,
const THashMap<TString, TString>& /*secureParams*/,
const THashMap<TString, TString>& /*taskParams*/,
const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr /*credentialsFactory*/,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
{
const auto dsi = params.select().data_source_instance();
const auto dsi = source.select().data_source_instance();
YQL_CLOG(INFO, ProviderGeneric) << "Creating read actor with params:"
<< " kind=" << NYql::NConnector::NApi::EDataSourceKind_Name(dsi.kind())
<< ", endpoint=" << dsi.endpoint().ShortDebugString()
<< ", database=" << dsi.database()
<< ", use_tls=" << ToString(dsi.use_tls())
<< ", protocol=" << NYql::NConnector::NApi::EProtocol_Name(dsi.protocol());

// FIXME: strange piece of logic - authToken is created but not used:
// https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp?rev=r11550199#L140
/*
const auto token = secureParams.Value(params.token(), TString{});
const auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
const auto one = token.find('#'), two = token.rfind('#');
YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token);
*/

// TODO: partitioning is not implemented now, but this code will be useful for the further research:
/*
TStringBuilder part;
Expand All @@ -543,8 +565,8 @@ namespace NYql::NDq {
inputIndex,
statsLevel,
genericClient,
params.select(),
dsi,
credentialsFactory,
std::move(source),
computeActorId,
holderFactory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace NYql::NDq {

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*>
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex,
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, NConnector::TSource&& params, ui64 inputIndex,
TCollectStatsLevel statsLevel, const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ namespace NYql::NDq {
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::NConnector::IClient::TPtr genericClient) {
auto genericFactory = [credentialsFactory, genericClient](
Generic::TSource&& settings,
NConnector::TSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
};

for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) {
factory.RegisterSource<Generic::TSource>(sourceName, genericFactory);
factory.RegisterSource<NConnector::TSource>(sourceName, genericFactory);
}
}

Expand Down
Loading

0 comments on commit db2ebe9

Please sign in to comment.