Skip to content

Commit

Permalink
Merge 33e4f7f into 0ef6729
Browse files Browse the repository at this point in the history
  • Loading branch information
ildar-khisambeev authored Apr 12, 2024
2 parents 0ef6729 + 33e4f7f commit a7bd9a4
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 78 deletions.
24 changes: 2 additions & 22 deletions ydb/core/client/server/grpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

#include <util/string/join.h>

#include <google/protobuf/text_format.h>

#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server_builder.h>
Expand Down Expand Up @@ -266,15 +264,8 @@ class TSimpleRequest
}

void Finish(const TOut& resp, ui32 status) {
auto makeResponseString = [&] {
TString x;
google::protobuf::TextFormat::Printer printer;
printer.SetSingleLineMode(true);
printer.PrintToString(resp, &x);
return x;
};
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s data# %s peer# %s", this,
Name, makeResponseString().data(), GetPeerName().c_str());
Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeerName().c_str());
ResponseSize = resp.ByteSize();
ResponseStatus = status;
StateFunc = &TSimpleRequest::FinishDone;
Expand All @@ -300,19 +291,8 @@ class TSimpleRequest
bool RequestDone(bool ok) {
OnAfterCall();

auto makeRequestString = [&] {
TString resp;
if (ok) {
google::protobuf::TextFormat::Printer printer;
printer.SetSingleLineMode(true);
printer.PrintToString(Request, &resp);
} else {
resp = "<not ok>";
}
return resp;
};
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] received request Name# %s ok# %s data# %s peer# %s current inflight# %li", this,
Name, ok ? "true" : "false", makeRequestString().data(), GetPeerName().c_str(), Server->GetCurrentInFlight());
Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeerName().c_str(), Server->GetCurrentInFlight());

if (Context.c_call() == nullptr) {
Y_ABORT_UNLESS(!ok);
Expand Down
27 changes: 3 additions & 24 deletions ydb/core/grpc_streaming/grpc_streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <contrib/libs/grpc/include/grpcpp/support/async_stream.h>
#include <contrib/libs/grpc/include/grpcpp/support/async_unary_call.h>
#include <google/protobuf/text_format.h>

#include <atomic>

Expand Down Expand Up @@ -347,22 +346,10 @@ class TGRpcStreamingRequest final
}

void OnReadDone(NYdbGrpc::EQueueEventStatus status) {
auto dumpResultText = [&] {
TString text;
if (status == NYdbGrpc::EQueueEventStatus::OK) {
google::protobuf::TextFormat::Printer printer;
printer.SetSingleLineMode(true);
printer.PrintToString(ReadInProgress->Record, &text);
} else {
text = "<not ok>";
}
return text;
};

LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] read finished Name# %s ok# %s data# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
dumpResultText().c_str(),
NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(),
this->GetPeerName().c_str());

// Take current in-progress read first
Expand Down Expand Up @@ -400,25 +387,17 @@ class TGRpcStreamingRequest final
}

bool Write(TOut&& message, const grpc::WriteOptions& options = { }, const grpc::Status* status = nullptr) {
auto dumpMessageText = [&] {
TString text;
google::protobuf::TextFormat::Printer printer;
printer.SetSingleLineMode(true);
printer.PrintToString(message, &text);
return text;
};

if (status) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s",
this, Name,
dumpMessageText().c_str(),
NYdbGrpc::FormatMessage<TOut>(message).c_str(),
this->GetPeerName().c_str(),
static_cast<int>(status->error_code()),
status->error_message().c_str());
} else {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s",
this, Name,
dumpMessageText().c_str(),
NYdbGrpc::FormatMessage<TOut>(message).c_str(),
this->GetPeerName().c_str());
}

Expand Down
11 changes: 10 additions & 1 deletion ydb/core/persqueue/partition_monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ydb/core/protos/counters_pq.pb.h>
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <ydb/library/protobuf_printer/security_printer.h>
#include <ydb/public/lib/base/msgbus.h>
#include <library/cpp/html/pcdata/pcdata.h>
#include <library/cpp/monlib/service/pages/templates.h>
Expand All @@ -22,6 +23,14 @@

namespace NKikimr::NPQ {

TString PrintConfig(const NKikimrPQ::TPQTabletConfig& cfg) {
TSecurityTextFormatPrinter<NKikimrPQ::TPQTabletConfig> printer;
printer.SetSingleLineMode(true);
TString string;
printer.PrintToString(cfg, &string);
return string;
}

void HtmlOutput(IOutputStream& out, const TString& line, const std::deque<std::pair<TKey, ui32>>& keys) {
HTML(out) {
TABLE() {
Expand Down Expand Up @@ -109,7 +118,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
out << "AvgWriteSize per " << avg.GetDuration().ToString() << " is " << avg.GetValue() << " bytes";
res.push_back(out.Str()); out.Clear();
}
out << Config.DebugString(); res.push_back(out.Str()); out.Clear();
out << PrintConfig(Config); res.push_back(out.Str()); out.Clear();
HTML(out) {
DIV_CLASS_ID("tab-pane fade", Sprintf("partition_%u", ui32(Partition))) {
TABLE_SORTABLE_CLASS("table") {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ PEERDIR(
ydb/library/logger
ydb/library/persqueue/counter_time_keeper
ydb/library/persqueue/topic_parser
ydb/library/protobuf_printer
ydb/public/lib/base
ydb/public/sdk/cpp/client/ydb_persqueue_core
)
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/flat_tx_scheme.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "ydb/core/protos/bind_channel_storage_pool.proto";
import "ydb/core/protos/flat_scheme_op.proto";
import "ydb/public/api/protos/ydb_cms.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/annotations/sensitive.proto";

package NKikimrScheme;
option java_package = "ru.yandex.kikimr.proto";
Expand Down Expand Up @@ -53,7 +54,7 @@ message TEvModifySchemeTransaction {
optional uint64 TabletId = 3;
optional string Owner = 5;
optional bool FailOnExist = 6; // depricated, TModifyScheme.FailOnExist is recomended
optional string UserToken = 7; // serialized NACLib::TUserToken
optional string UserToken = 7 [(Ydb.sensitive) = true]; // serialized NACLib::TUserToken
optional string PeerName = 8;
}

Expand Down
8 changes: 5 additions & 3 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import "ydb/public/api/protos/draft/persqueue_error_codes.proto";
import "ydb/public/api/protos/draft/persqueue_common.proto";

import "ydb/public/api/protos/annotations/sensitive.proto";

import "ydb/core/protos/base.proto";
import "ydb/core/protos/msgbus_kv.proto";
import "ydb/core/protos/node_limits.proto";
Expand Down Expand Up @@ -208,11 +210,11 @@ message TMirrorPartitionConfig {
message TCredentials {
message IamCredentials {
optional string Endpoint = 1;
optional string ServiceAccountKey = 2;
optional string ServiceAccountKey = 2 [(Ydb.sensitive) = true];
}
oneof Credentials {
string OauthToken = 1;
string JwtParams = 2;
string OauthToken = 1 [(Ydb.sensitive) = true];
string JwtParams = 2 [(Ydb.sensitive) = true];
IamCredentials Iam = 3;
}
}
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>

#include <ydb/library/protobuf_printer/security_printer.h>

#include <util/generic/algorithm.h>

namespace NKikimr::NSchemeShard {
Expand Down Expand Up @@ -85,6 +87,14 @@ NKikimrScheme::TEvModifySchemeTransaction GetRecordForPrint(const NKikimrScheme:
return recordForPrint;
}

TString PrintSecurely(const NKikimrScheme::TEvModifySchemeTransaction& record) {
TSecurityTextFormatPrinter<NKikimrScheme::TEvModifySchemeTransaction> printer;
printer.SetSingleLineMode(true);
TString string;
printer.PrintToString(record, &string);
return string;
}

THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) {
THolder<TProposeResponse> response = nullptr;

Expand Down Expand Up @@ -183,7 +193,7 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
<< ", already accepted parts: " << operation->Parts.size()
<< ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
<< ", with reason: " << response->Record.GetReason()
<< ", tx message: " << GetRecordForPrint(record).ShortDebugString());
<< ", tx message: " << PrintSecurely(record));
}

Y_VERIFY_S(context.IsUndoChangesSafe(),
Expand All @@ -194,7 +204,7 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
<< ", already accepted parts: " << operation->Parts.size()
<< ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
<< ", with reason: " << response->Record.GetReason()
<< ", tx message: " << GetRecordForPrint(record).ShortDebugString());
<< ", tx message: " << PrintSecurely(record));

context.OnComplete = {}; // recreate
context.DbChanges = {};
Expand Down Expand Up @@ -237,7 +247,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxOperationPropose Execute"
<< ", message: " << GetRecordForPrint(Request->Get()->Record).ShortDebugString()
<< ", message: " << PrintSecurely(Request->Get()->Record)
<< ", at schemeshard: " << selfId);

txc.DB.NoMoreReadsForTx();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ PEERDIR(
ydb/library/aclib/protos
ydb/library/login
ydb/library/login/protos
ydb/library/protobuf_printer
ydb/library/yql/minikql
ydb/services/bg_tasks
)
Expand Down
1 change: 1 addition & 0 deletions ydb/library/grpc/server/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(

PEERDIR(
ydb/library/actors/core
ydb/library/grpc/server
)

END()
23 changes: 23 additions & 0 deletions ydb/library/grpc/server/logger.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#pragma once

#include <ydb/library/protobuf_printer/security_printer.h>

#include <library/cpp/logger/priority.h>

#include <util/generic/ptr.h>
#include <util/system/env.h>


namespace NYdbGrpc {

static bool LogBodyEnabled = "BODY" == GetEnv("YDB_GRPC_SERVER_LOGGING");

class TLogger: public TThrRefBase {
protected:
TLogger() = default;
Expand Down Expand Up @@ -40,4 +46,21 @@ using TLoggerPtr = TIntrusivePtr<TLogger>;
logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \
} else { }

template <typename TMsg>
inline TString FormatMessage(const TMsg& message, bool ok = true) {
if (ok) {
if (LogBodyEnabled) {
TString text;
NKikimr::TSecurityTextFormatPrinter<TMsg> printer;
printer.SetSingleLineMode(true);
printer.PrintToString(message, &text);
return text;
} else {
return "<hidden>";
}
} else {
return "<not ok>";
}
}

} // namespace NYdbGrpc
2 changes: 1 addition & 1 deletion ydb/library/grpc/server/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ SRCS(
GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)

PEERDIR(
ydb/library/protobuf_printer
contrib/libs/grpc
library/cpp/monlib/dynamic_counters/percentile
)

END()

RECURSE_FOR_TESTS(ut)

7 changes: 4 additions & 3 deletions ydb/public/api/protos/draft/persqueue_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";
import "google/protobuf/descriptor.proto";
import "ydb/public/api/protos/draft/persqueue_error_codes.proto";

import "ydb/public/api/protos/annotations/sensitive.proto";

package NPersQueueCommon;

option java_package = "com.yandex.ydb.persqueue";
Expand Down Expand Up @@ -35,8 +37,7 @@ enum ECodec {

message Credentials {
oneof credentials {
bytes tvm_service_ticket = 1;
bytes oauth_token = 2;
bytes tvm_service_ticket = 1 [(Ydb.sensitive) = true];
bytes oauth_token = 2 [(Ydb.sensitive) = true];
}
}

9 changes: 5 additions & 4 deletions ydb/public/api/protos/ydb_persqueue_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import "ydb/public/api/protos/ydb_operation.proto";
import "ydb/public/api/protos/ydb_scheme.proto";
import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/annotations/sensitive.proto";
import "ydb/public/api/protos/annotations/validation.proto";

package Ydb.PersQueue.V1;
Expand Down Expand Up @@ -38,7 +39,7 @@ message OffsetsRange {

// In-session reauthentication and reauthorization, lets user increase session lifetime. You should wait for 'update_token_response' before sending next 'update_token_request'.
message UpdateTokenRequest {
string token = 1;
string token = 1 [(Ydb.sensitive) = true];
}

message UpdateTokenResponse {
Expand Down Expand Up @@ -788,7 +789,7 @@ message MigrationStreamingReadClientMessage {
}

// User credentials if update is needed or empty string.
bytes token = 20;
bytes token = 20 [(Ydb.sensitive) = true];
}

/**
Expand Down Expand Up @@ -1073,8 +1074,8 @@ message Credentials {
string service_account_key = 2;
}
oneof credentials {
string oauth_token = 1;
string jwt_params = 2;
string oauth_token = 1 [(Ydb.sensitive) = true];
string jwt_params = 2 [(Ydb.sensitive) = true];
Iam iam = 3;
}
}
Expand Down
Loading

0 comments on commit a7bd9a4

Please sign in to comment.