Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move statistics messages #5820

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions ydb/core/protos/statistics.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import "ydb/core/scheme/protos/pathid.proto";
import "ydb/core/protos/data_events.proto";

package NKikimrStat;

Expand Down Expand Up @@ -89,3 +90,33 @@ message TEvGetScanStatusResponse {
}
optional EStatus Status = 1;
}

message TEvStatisticsRequest {
optional NKikimrDataEvents.TTableId TableId = 1;
optional bytes StartKey = 2;
// list of columns to gather statistics from. Empty means asking for every column.
repeated uint32 ColumnTags = 3;
// list of statistics types requested. Empty means asking for all available.
repeated uint32 Types = 4;
}

message TEvStatisticsResponse {
message TStatistic {
optional uint32 Type = 1;
optional bytes Data = 2;
}
message TColumn {
optional uint32 Tag = 1;
repeated TStatistic Statistics = 2;
}
repeated TColumn Columns = 1;

enum EStatus {
SUCCESS = 1;
ABORTED = 2;
ERROR = 3;
}
optional EStatus Status = 2;

optional fixed64 ShardTabletId = 3;
}
1 change: 1 addition & 0 deletions ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import "ydb/core/tx/columnshard/engines/scheme/statistics/protos/data.proto";
import "ydb/core/tx/columnshard/engines/protos/portion_info.proto";
import "ydb/core/protos/flat_scheme_op.proto";
import "ydb/core/protos/long_tx_service.proto";
import "ydb/core/protos/statistics.proto";
import "ydb/core/protos/subdomains.proto";
import "ydb/core/protos/tx.proto";
import "ydb/core/formats/arrow/protos/ssa.proto";
Expand Down
29 changes: 2 additions & 27 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import "ydb/core/protos/tablet.proto";
import "ydb/core/protos/tx.proto";
import "ydb/core/protos/flat_scheme_op.proto";
import "ydb/core/protos/table_stats.proto";
import "ydb/core/protos/statistics.proto";
import "ydb/core/protos/subdomains.proto";
import "ydb/core/protos/query_stats.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
Expand Down Expand Up @@ -815,7 +816,7 @@ message TObjectStorageListingFilter {
}
optional bytes Values = 1;
repeated uint32 Columns = 2;
repeated EMatchType MatchTypes = 3;
repeated EMatchType MatchTypes = 3;
}

message TEvObjectStorageListingRequest {
Expand Down Expand Up @@ -1492,32 +1493,6 @@ message TEvCdcStreamScanResponse {
optional TStats Stats = 6;
}

message TEvStatisticsScanRequest {
optional NKikimrDataEvents.TTableId TableId = 1;
optional bytes StartKey = 2;
}

message TEvStatisticsScanResponse {
message TStatistic {
optional uint32 Type = 1;
optional bytes Data = 2;
}
message TColumn {
optional uint32 Tag = 1;
repeated TStatistic Statistics = 2;
}
repeated TColumn Columns = 1;

enum EStatus {
SUCCESS = 1;
ABORTED = 2;
ERROR = 3;
}
optional EStatus Status = 2;

optional fixed64 ShardTabletId = 3;
}

message TComputeShardingPolicy {
repeated string ColumnNames = 1;
optional uint32 ShardsCount = 2 [default = 0];
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ void TStatisticsAggregator::NextRange() {
}

auto& range = ShardRanges.front();
auto request = std::make_unique<TEvDataShard::TEvStatisticsScanRequest>();
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
auto& record = request->Record;
record.MutableTableId()->SetOwnerId(ScanTableId.PathId.OwnerId);
record.MutableTableId()->SetTableId(ScanTableId.PathId.LocalPathId);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void Handle(TEvStatistics::TEvScanTable::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev);
void Handle(TEvDataShard::TEvStatisticsScanResponse::TPtr& ev);
void Handle(NStat::TEvStatistics::TEvStatisticsResponse::TPtr& ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
void Handle(TEvStatistics::TEvStatTableCreationResponse::TPtr& ev);
void Handle(TEvStatistics::TEvSaveStatisticsQueryResponse::TPtr& ev);
Expand Down Expand Up @@ -157,7 +157,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
hFunc(TEvStatistics::TEvScanTable, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
hFunc(TEvDataShard::TEvStatisticsScanResponse, Handle);
hFunc(NStat::TEvStatistics::TEvStatisticsResponse, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(TEvStatistics::TEvStatTableCreationResponse, Handle);
hFunc(TEvStatistics::TEvSaveStatisticsQueryResponse, Handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxStatisticsScanResponse : public TTxBase {
NKikimrTxDataShard::TEvStatisticsScanResponse Record;
NKikimrStat::TEvStatisticsResponse Record;
bool IsCorrectShardId = false;

TTxStatisticsScanResponse(TSelf* self, NKikimrTxDataShard::TEvStatisticsScanResponse&& record)
TTxStatisticsScanResponse(TSelf* self, NKikimrStat::TEvStatisticsResponse&& record)
: TTxBase(self)
, Record(std::move(record))
{}
Expand Down Expand Up @@ -75,7 +75,7 @@ struct TStatisticsAggregator::TTxStatisticsScanResponse : public TTxBase {
}
};

void TStatisticsAggregator::Handle(TEvDataShard::TEvStatisticsScanResponse::TPtr& ev) {
void TStatisticsAggregator::Handle(NStat::TEvStatistics::TEvStatisticsResponse::TPtr& ev) {
auto& record = ev->Get()->Record;
Execute(new TTxStatisticsScanResponse(this, std::move(record)),
TActivationContext::AsActorContext());
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/statistics/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ struct TEvStatistics {
EvGetScanStatus,
EvGetScanStatusResponse,

EvStatisticsRequest,
EvStatisticsResponse,

EvEnd
};

Expand All @@ -87,7 +90,7 @@ struct TEvStatistics {
bool Success = true;
std::vector<TResponse> StatResponses;
};

struct TEvConfigureAggregator : public TEventPB<
TEvConfigureAggregator,
NKikimrStat::TEvConfigureAggregator,
Expand Down Expand Up @@ -202,6 +205,17 @@ struct TEvStatistics {
EvGetScanStatusResponse>
{};

struct TEvStatisticsRequest : public TEventPB<
TEvStatisticsRequest,
NKikimrStat::TEvStatisticsRequest,
EvStatisticsRequest>
{};

struct TEvStatisticsResponse : public TEventPB<
TEvStatisticsResponse,
NKikimrStat::TEvStatisticsResponse,
EvStatisticsResponse>
{};
};

} // NStat
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "blob.h"
#include "common/snapshot.h"

#include <ydb/core/protos/statistics.pb.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/core/tx/tx.h>
#include <ydb/core/tx/message_seqno.h>
Expand Down Expand Up @@ -289,7 +290,6 @@ struct TEvColumnShard {
};

using TEvScan = TEvDataShard::TEvKqpScan;

};

inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) {
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include "columnshard.h"
#include "columnshard_impl.h"

#include <ydb/core/protos/kqp.pb.h>

namespace NKikimr::NColumnShard {

void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext&) {
auto response = std::make_unique<NStat::TEvStatistics::TEvStatisticsResponse>();
auto& record = response->Record;
record.SetShardTabletId(TabletID());

record.SetStatus(NKikimrStat::TEvStatisticsResponse::SUCCESS);

Send(ev->Sender, response.release(), 0, ev->Cookie);
}

}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "subscriber/abstract/manager/manager.h"

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/statistics/events.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
Expand Down Expand Up @@ -223,6 +224,8 @@ class TColumnShard
void Handle(TEvPrivate::TEvTieringModified::TPtr& ev, const TActorContext&);
void Handle(TEvPrivate::TEvNormalizerResult::TPtr& ev, const TActorContext&);

void Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext& ctx);

void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev, const TActorContext&);

void Handle(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -377,6 +380,8 @@ class TColumnShard
HFunc(TEvPrivate::TEvGarbageCollectionFinished, Handle);
HFunc(TEvPrivate::TEvTieringModified, Handle);

HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle);

HFunc(NActors::TEvents::TEvUndelivered, Handle);

HFunc(NOlap::NBlobOperations::NEvents::TEvDeleteSharedBlobs, Handle);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SRCS(
columnshard__propose_cancel.cpp
columnshard__propose_transaction.cpp
columnshard__scan.cpp
columnshard__statistics.cpp
columnshard__write.cpp
columnshard__write_index.cpp
columnshard.cpp
Expand Down
19 changes: 1 addition & 18 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,6 @@ struct TEvDataShard {
EvOverloadReady,
EvOverloadUnsubscribe,

EvStatisticsScanRequest,
EvStatisticsScanResponse,

EvEnd
};

Expand Down Expand Up @@ -647,7 +644,7 @@ struct TEvDataShard {
TString result;
TStringOutput out(result);
for (ui32 i = 0; i < Record.ErrorSize(); ++i) {
out << Record.GetError(i).GetKind() << " ("
out << Record.GetError(i).GetKind() << " ("
<< (Record.GetError(i).HasReason() ? Record.GetError(i).GetReason() : "no reason")
<< ") |";
}
Expand Down Expand Up @@ -1704,20 +1701,6 @@ struct TEvDataShard {
Record.SetErrorDescription(error);
}
};

struct TEvStatisticsScanRequest
: public TEventPB<TEvStatisticsScanRequest,
NKikimrTxDataShard::TEvStatisticsScanRequest,
EvStatisticsScanRequest>
{
};

struct TEvStatisticsScanResponse
: public TEventPB<TEvStatisticsScanResponse,
NKikimrTxDataShard::TEvStatisticsScanResponse,
EvStatisticsScanResponse>
{
};
};

IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info);
Expand Down
20 changes: 10 additions & 10 deletions ydb/core/tx/datashard/datashard__column_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ class TStatisticsScan: public NTable::IScan {
}

TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override {
auto response = std::make_unique<TEvDataShard::TEvStatisticsScanResponse>();
auto response = std::make_unique<NStat::TEvStatistics::TEvStatisticsResponse>();
auto& record = response->Record;
record.SetShardTabletId(ShardTabletId);

if (abort != EAbort::None) {
record.SetStatus(NKikimrTxDataShard::TEvStatisticsScanResponse::ABORTED);
record.SetStatus(NKikimrStat::TEvStatisticsResponse::ABORTED);
TlsActivationContext->Send(new IEventHandle(ReplyTo, TActorId(), response.release(), 0, Cookie));
delete this;
return nullptr;
}

record.SetStatus(NKikimrTxDataShard::TEvStatisticsScanResponse::SUCCESS);
record.SetStatus(NKikimrStat::TEvStatisticsResponse::SUCCESS);
auto tags = Scheme->Tags();
for (size_t t = 0; t < tags.size(); ++t) {
auto* column = record.AddColumns();
Expand Down Expand Up @@ -100,7 +100,7 @@ class TStatisticsScan: public NTable::IScan {

class TDataShard::TTxHandleSafeStatisticsScan : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
TTxHandleSafeStatisticsScan(TDataShard* self, TEvDataShard::TEvStatisticsScanRequest::TPtr&& ev)
TTxHandleSafeStatisticsScan(TDataShard* self, NStat::TEvStatistics::TEvStatisticsRequest::TPtr&& ev)
: TTransactionBase(self)
, Ev(std::move(ev))
{}
Expand All @@ -114,10 +114,10 @@ class TDataShard::TTxHandleSafeStatisticsScan : public NTabletFlatExecutor::TTra
}

private:
TEvDataShard::TEvStatisticsScanRequest::TPtr Ev;
NStat::TEvStatistics::TEvStatisticsRequest::TPtr Ev;
};

void TDataShard::Handle(TEvDataShard::TEvStatisticsScanRequest::TPtr& ev, const TActorContext&) {
void TDataShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext&) {
Execute(new TTxHandleSafeStatisticsScan(this, std::move(ev)));
}

Expand All @@ -126,22 +126,22 @@ void TDataShard::Handle(TEvPrivate::TEvStatisticsScanFinished::TPtr&, const TAct
StatisticsScanId = 0;
}

void TDataShard::HandleSafe(TEvDataShard::TEvStatisticsScanRequest::TPtr& ev, const TActorContext&) {
void TDataShard::HandleSafe(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext&) {
const auto& record = ev->Get()->Record;

auto response = std::make_unique<TEvDataShard::TEvStatisticsScanResponse>();
auto response = std::make_unique<NStat::TEvStatistics::TEvStatisticsResponse>();
response->Record.SetShardTabletId(TabletID());

const auto& tableId = record.GetTableId();
if (PathOwnerId != tableId.GetOwnerId()) {
response->Record.SetStatus(NKikimrTxDataShard::TEvStatisticsScanResponse::ERROR);
response->Record.SetStatus(NKikimrStat::TEvStatisticsResponse::ERROR);
Send(ev->Sender, response.release(), 0, ev->Cookie);
return;
}

auto infoIt = TableInfos.find(tableId.GetTableId());
if (infoIt == TableInfos.end()) {
response->Record.SetStatus(NKikimrTxDataShard::TEvStatisticsScanResponse::ERROR);
response->Record.SetStatus(NKikimrStat::TEvStatisticsResponse::ERROR);
Send(ev->Sender, response.release(), 0, ev->Cookie);
return;
}
Expand Down
Loading
Loading