Skip to content

Commit

Permalink
Merge 1832742 into 3de9c00
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 16, 2024
2 parents 3de9c00 + 1832742 commit 1abaac1
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 53 deletions.
19 changes: 11 additions & 8 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>

Expand Down Expand Up @@ -284,12 +286,13 @@ struct TObjectStorageExternalSource : public IExternalSource {
return NThreading::MakeFuture(std::move(meta));
}

NYql::TS3Credentials::TAuthInfo authInfo{};
NYql::TStructuredTokenBuilder structuredTokenBuilder;
if (std::holds_alternative<NAuth::TAws>(meta->Auth)) {
auto& awsAuth = std::get<NAuth::TAws>(meta->Auth);
authInfo.AwsAccessKey = awsAuth.AccessKey;
authInfo.AwsAccessSecret = awsAuth.SecretAccessKey;
authInfo.AwsRegion = awsAuth.Region;
NYql::NS3::TAwsParams params;
params.SetAwsAccessKey(awsAuth.AccessKey);
params.SetAwsRegion(awsAuth.Region);
structuredTokenBuilder.SetBasicAuth(params.SerializeAsString(), awsAuth.SecretAccessKey);
} else if (std::holds_alternative<NAuth::TServiceAccount>(meta->Auth)) {
if (!CredentialsFactory) {
try {
Expand All @@ -299,15 +302,15 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
}
auto& saAuth = std::get<NAuth::TServiceAccount>(meta->Auth);
NYql::GetAuthInfo(CredentialsFactory, "");
authInfo.Token = CredentialsFactory->Create(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature)->CreateProvider()->GetAuthInfo();
structuredTokenBuilder.SetServiceAccountIdAuth(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature);
}

const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
auto httpGateway = NYql::IHTTPGateway::Make();
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
.Url = meta->DataSourceLocation,
.AuthInfo = authInfo,
.Credentials = credentials,
.Pattern = meta->TableLocation,
}, Nothing(), false);
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
Expand All @@ -332,7 +335,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
meta->DataSourceLocation,
httpGateway,
NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
std::move(authInfo)
std::move(credentials.GetAuthInfo())
));

meta->Attributes.erase("withinfer");
Expand Down
21 changes: 11 additions & 10 deletions ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand All @@ -69,7 +69,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
, RetryPolicy(retryPolicy)
, ActorSystem(NActors::TActivationContext::ActorSystem())
, Url(url)
, AuthInfo(authInfo)
, Credentials(credentials)
, Pattern(pattern)
, PatternVariant(patternVariant)
, Paths(std::move(paths))
Expand Down Expand Up @@ -113,7 +113,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
Gateway,
RetryPolicy,
Url,
AuthInfo,
Credentials,
Pattern,
PatternVariant,
NYql::NS3Lister::ES3PatternType::Wildcard));
Expand Down Expand Up @@ -164,10 +164,11 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
auto url = Url + object.GetPath();
auto id = object.GetPathIndex();
const TString requestId = CreateGuidAsString();
const auto& authInfo = Credentials.GetAuthInfo();
LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]");
Gateway->Download(
UrlEscapeRet(url, true),
IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()),
IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
0U,
std::min(object.GetSize(), SizeLimit),
std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id, object.GetPath()),
Expand Down Expand Up @@ -456,7 +457,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
NActors::TActorSystem* const ActorSystem;

const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const NYql::NS3Lister::ES3PatternVariant PatternVariant;
NYql::NS3Details::TPathList Paths;
Expand Down Expand Up @@ -503,7 +504,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand All @@ -527,14 +528,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
statsLevel,
txId,
std::move(gateway),
holderFactory,
url,
authInfo,
holderFactory,
url,
credentials,
pattern,
patternVariant,
std::move(paths),
addPathIndex,
computeActorId,
computeActorId,
sizeLimit,
retryPolicy,
readActorFactoryCfg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand Down
17 changes: 9 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
IHTTPGateway::TPtr gateway,
const THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
ES3PatternVariant patternVariant,
TPathList&& paths,
Expand Down Expand Up @@ -1230,7 +1230,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, ComputeActorId(computeActorId)
, RetryPolicy(retryPolicy)
, Url(url)
, AuthInfo(authInfo)
, Credentials(credentials)
, Pattern(pattern)
, PatternVariant(patternVariant)
, Paths(std::move(paths))
Expand Down Expand Up @@ -1321,7 +1321,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
Gateway,
RetryPolicy,
Url,
AuthInfo,
Credentials,
Pattern,
PatternVariant,
ES3PatternType::Wildcard));
Expand Down Expand Up @@ -1385,10 +1385,11 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
<< pathIndex);

TActorId actorId;
const auto& authInfo = Credentials.GetAuthInfo();
auto stuff = std::make_shared<TRetryStuff>(
Gateway,
Url + object.GetPath(),
IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()),
IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
object.GetSize(),
TxId,
requestId,
Expand Down Expand Up @@ -1786,7 +1787,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;

const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const ES3PatternVariant PatternVariant;
TPathList Paths;
Expand Down Expand Up @@ -2000,7 +2001,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
ReadPathsList(params, taskParams, readRanges, paths);

const auto token = secureParams.Value(params.GetToken(), TString{});
const auto authInfo = GetAuthInfo(credentialsFactory, token);
const TS3Credentials credentials(credentialsFactory, token);

const auto& settings = params.GetSettings();
TString pathPattern = "*";
Expand Down Expand Up @@ -2178,7 +2179,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
sizeLimit = FromString<ui64>(it->second);
}

const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
Expand All @@ -2190,7 +2191,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (const auto it = settings.find("sizeLimit"); settings.cend() != it)
sizeLimit = FromString<ui64>(it->second);

return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TS3Credentials credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TS3Credentials credentials,
TString pattern,
NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType)
Expand All @@ -189,7 +189,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, Gateway(std::move(gateway))
, RetryPolicy(std::move(retryPolicy))
, Url(std::move(url))
, AuthInfo(std::move(authInfo))
, Credentials(std::move(credentials))
, Pattern(std::move(pattern))
, PatternVariant(patternVariant)
, PatternType(patternType) {
Expand Down Expand Up @@ -493,7 +493,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
RetryPolicy,
NS3Lister::TListingRequest{
Url,
AuthInfo,
Credentials,
PatternVariant == NS3Lister::ES3PatternVariant::PathPattern
? Pattern
: TStringBuilder{} << object.GetPath() << Pattern,
Expand Down Expand Up @@ -616,7 +616,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
const IHTTPGateway::TPtr Gateway;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const NS3Lister::ES3PatternVariant PatternVariant;
const NS3Lister::ES3PatternType PatternType;
Expand All @@ -638,7 +638,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TS3Credentials credentials,
TString pattern,
NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType) {
Expand All @@ -655,7 +655,7 @@ NActors::IActor* CreateS3FileQueueActor(
gateway,
retryPolicy,
url,
authInfo,
credentials,
pattern,
patternVariant,
patternType
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
TS3Credentials credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
Expand Down
21 changes: 18 additions & 3 deletions ydb/library/yql/providers/s3/credentials/credentials.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace NYql {

TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken)
: StructuredTokenJson(structuredTokenJson)
{
if (NYql::IsStructuredTokenJson(structuredTokenJson)) {
NYql::TStructuredTokenParser parser = NYql::CreateStructuredTokenParser(structuredTokenJson);
Expand All @@ -23,17 +24,31 @@ TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr fa
}
}

auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(factory, structuredTokenJson, addBearerToToken);
CredentialsProvider = providerFactory->CreateProvider();
CredentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(factory, structuredTokenJson, addBearerToToken);
}

TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const {
if (CredentialsProvider) {
if (CredentialsProviderFactory) {
if (Y_UNLIKELY(!CredentialsProvider)) {
// Heavy operation, BLOCKs thread until TA reply
CredentialsProvider = CredentialsProviderFactory->CreateProvider();
}
return TS3Credentials::TAuthInfo{.Token = CredentialsProvider->GetAuthInfo()};
}
return AuthInfo;
}

bool TS3Credentials::operator<(const TS3Credentials& other) const {
return StructuredTokenJson < other.StructuredTokenJson;
}

IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials) {
const auto& authInfo = credentials.AuthInfo;
return stream << "TS3Credentials{.ServiceAccountAuth=" << static_cast<bool>(credentials.CredentialsProviderFactory)
<< ",.AwsUserPwd=<some token with length" << authInfo.GetAwsUserPwd().length() << ">"
<< ",.AwsSigV4=<some sig with length" << authInfo.GetAwsSigV4().length() << ">}";
}

// string value after AWS prefix should be suitable for passing it to curl as CURLOPT_USERPWD, see details here:
// https://curl.se/libcurl/c/CURLOPT_AWS_SIGV4.html
// CURLOPT_USERPWD = "MY_ACCESS_KEY:MY_SECRET_KEY"
Expand Down
10 changes: 9 additions & 1 deletion ydb/library/yql/providers/s3/credentials/credentials.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,23 @@ struct TS3Credentials {
TString AwsRegion;
};

TS3Credentials() = default;
TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken = false);

TAuthInfo GetAuthInfo() const;

bool operator<(const TS3Credentials& other) const;
friend IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials);

private:
NYdb::TCredentialsProviderPtr CredentialsProvider;
TString StructuredTokenJson;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
mutable NYdb::TCredentialsProviderPtr CredentialsProvider;
TS3Credentials::TAuthInfo AuthInfo;
};

IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials);

TS3Credentials::TAuthInfo GetAuthInfo(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson);

}
7 changes: 3 additions & 4 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request)
<< ",.Prefix=" << request.Prefix
<< ",.Pattern=" << request.Pattern
<< ",.PatternType=" << request.PatternType
<< ",.AwsUserPwd=<some token with length" << request.AuthInfo.GetAwsUserPwd().length() << ">"
<< ",.AwsSigV4=" << request.AuthInfo.GetAwsSigV4().length()
<< ",.Token=<some token with length " << request.AuthInfo.GetToken().length() << ">}";
<< ",.Credentials=" << request.Credentials << "}";
}

namespace {
Expand Down Expand Up @@ -287,7 +285,8 @@ class TS3Lister : public IS3Lister {
~TS3Lister() override = default;
private:
static void SubmitRequestIntoGateway(TListingContext& ctx) {
IHTTPGateway::THeaders headers = IHTTPGateway::MakeYcHeaders(ctx.RequestId, ctx.ListingRequest.AuthInfo.GetToken(), {}, ctx.ListingRequest.AuthInfo.GetAwsUserPwd(), ctx.ListingRequest.AuthInfo.GetAwsSigV4());
const auto& authInfo = ctx.ListingRequest.Credentials.GetAuthInfo();
IHTTPGateway::THeaders headers = IHTTPGateway::MakeYcHeaders(ctx.RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4());

// We have to sort the cgi parameters for the correct aws signature
// This requirement will be fixed in the curl library
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ using TListResult = std::variant<TListEntries, TListError>;

struct TListingRequest {
TString Url;
TS3Credentials::TAuthInfo AuthInfo;
TS3Credentials Credentials;
TString Pattern;
ES3PatternType PatternType = ES3PatternType::Wildcard;
TString Prefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
State_->Gateway,
State_->GatewayRetryPolicy,
connect.Url,
GetAuthInfo(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)),
TS3Credentials(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)),
pathPattern,
pathPatternVariant,
NS3Lister::ES3PatternType::Wildcard
Expand Down
Loading

0 comments on commit 1abaac1

Please sign in to comment.