Skip to content

Commit

Permalink
signals about long_tx writing (ydb-platform#8407)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Sep 9, 2024
1 parent 3ed7f54 commit c42a9a3
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 15 deletions.
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
Self->ProgressTxController->FinishPlannedTx(txId, txc);
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED);
}
Self->ProgressTxInFlight = false;
if (!!Self->ProgressTxController->GetPlannedTx()) {
Self->EnqueueProgressTx(ctx);
}
return true;
}

Expand All @@ -76,10 +80,6 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
}
Self->ProgressTxInFlight = false;
if (!!Self->ProgressTxController->GetPlannedTx()) {
Self->EnqueueProgressTx(ctx);
}
Self->SetupIndexation();
}
};
Expand Down
7 changes: 0 additions & 7 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,6 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt

LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
TablesManager.DropTable(pathId, version, db);

// TODO: Allow to read old snapshots after DROP
TBlobGroupSelector dsGroupSelector(Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);

TryAbortWrites(db, dbTable, std::move(writesToAbort));
}

void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
}
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) {
auto& portionInfo = item.GetPortionResult();
if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) {
if (!!self && !self->TablesManager.HasTable(portionInfo.GetPathId(), false)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId());
return true;
} else {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ namespace NKikimr::NEvWrite {

void TWritersController::OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId) {
WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId, writePartId);
Counters->OnCSReply(TMonotonic::Now() - StartInstant);
if (!WritesCount.Dec()) {
Counters->OnFullReply(TMonotonic::Now() - StartInstant);
auto req = MakeHolder<NLongTxService::TEvLongTxService::TEvAttachColumnShardWrites>(LongTxId);
for (auto&& i : WriteIds) {
req->AddWrite(i.GetShardId(), i.GetWriteId());
Expand All @@ -28,6 +30,7 @@ namespace NKikimr::NEvWrite {
}

void TWritersController::OnFail(const Ydb::StatusIds::StatusCode code, const TString& message) {
Counters->OnCSFailed(code);
NYql::TIssues issues;
issues.AddIssue(message);
LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(code, issues));
Expand Down
52 changes: 49 additions & 3 deletions ydb/core/tx/data_events/shard_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/wilson/wilson_profile_span.h>
#include <ydb/core/tx/columnshard/counters/common/owner.h>


namespace NKikimr::NEvWrite {
Expand All @@ -22,19 +23,64 @@ class TWriteIdForShard {
// Builds a write identifier from the shard id, the write id and the write part id
// supplied by the caller; each argument is stored in the member of the same name.
TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId)
    : ShardId(shardId)
    , WriteId(writeId)
    , WritePartId(writePartId) {
    // Each member is initialized exactly once: a repeated mem-initializer for
    // WritePartId (and a second opening brace) would not compile.
}
};

class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
private:
using TBase = NColumnShard::TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
NMonitoring::THistogramPtr CSReplyDuration;
NMonitoring::THistogramPtr FullReplyDuration;
NMonitoring::THistogramPtr BytesDistribution;
NMonitoring::THistogramPtr RowsDistribution;
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::TDynamicCounters::TCounterPtr BytesCount;
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
public:
TCSUploadCounters()
: TBase("CSUpload")
, RequestsCount(TBase::GetDeriviative("Requests"))
, CSReplyDuration(TBase::GetHistogram("Replies/Shard/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 1)))
, FullReplyDuration(TBase::GetHistogram("Replies/Full/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 1)))
, BytesDistribution(TBase::GetHistogram("Requests/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024)))
, RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16)))
, RowsCount(TBase::GetDeriviative("Rows"))
, BytesCount(TBase::GetDeriviative("Bytes"))
, FailsCount(TBase::GetDeriviative("Fails")) {

}

void OnRequest(const ui64 rows, const ui64 bytes) const {
BytesDistribution->Collect(bytes);
RowsDistribution->Collect(rows);
BytesCount->Add(bytes);
RowsCount->Add(rows);
}

void OnCSFailed(const Ydb::StatusIds::StatusCode /*code*/) {
FailsCount->Add(1);
}
};

void OnCSReply(const TDuration d) const {
CSReplyDuration->Collect(d.MilliSeconds());
}

void OnFullReply(const TDuration d) const {
FullReplyDuration->Collect(d.MilliSeconds());
}
};
// External transaction controller class
class TWritersController {
private:
TAtomicCounter WritesCount = 0;
TAtomicCounter WritesIndex = 0;
NActors::TActorIdentity LongTxActorId;
std::vector<TWriteIdForShard> WriteIds;
const TMonotonic StartInstant = TMonotonic::Now();
YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId);
YDB_READONLY(std::shared_ptr<TCSUploadCounters>, Counters, std::make_shared<TCSUploadCounters>());
public:
using TPtr = std::shared_ptr<TWritersController>;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TLongTxWriteBase : public TActorBootstrapped<TLongTxWriteImpl> {
ui32 writeIdx = 0;
for (auto& [shard, infos] : splittedData.GetShardsInfo()) {
for (auto&& shardInfo : infos) {
InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes());
sumBytes += shardInfo->GetBytes();
rowsCount += shardInfo->GetRowsCount();
this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), DedupId, shardInfo, ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace));
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ namespace NKikimr {
RowsCount = TBase::GetDeriviative("Rows/Count");
PackageSize = TBase::GetHistogram("Rows/PackageSize", NMonitoring::ExponentialHistogram(15, 2, 10));

DurationToStartCommit = TBase::GetHistogram("ToStartCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToFinishCommit = TBase::GetHistogram("ToFinishCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToStartWriting = TBase::GetHistogram("ToStartWriting/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToTxStarted = TBase::GetHistogram("ToTxStarted/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));

const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor();
for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) {
auto vDescription = descriptor->value(i);
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,31 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::THistogramPtr PackageSize;

NMonitoring::THistogramPtr DurationToStartCommit;
NMonitoring::THistogramPtr DurationToFinishCommit;
NMonitoring::THistogramPtr DurationToStartWriting;
NMonitoring::THistogramPtr DurationToTxStarted;

THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> CodesCount;
public:
TUploadCounters();

// Adds the duration `d`, expressed in milliseconds, to the DurationToTxStarted histogram.
void OnTxStarted(const TDuration d) const {
    const auto elapsedMs = d.MilliSeconds();
    DurationToTxStarted->Collect(elapsedMs);
}

// Adds the duration `d`, expressed in milliseconds, to the DurationToStartWriting histogram.
void OnWritingStarted(const TDuration d) const {
    const auto elapsedMs = d.MilliSeconds();
    DurationToStartWriting->Collect(elapsedMs);
}

// Adds the duration `d`, expressed in milliseconds, to the DurationToStartCommit histogram.
void OnStartCommit(const TDuration d) const {
    const auto elapsedMs = d.MilliSeconds();
    DurationToStartCommit->Collect(elapsedMs);
}

// Adds the duration `d`, expressed in milliseconds, to the DurationToFinishCommit histogram.
void OnFinishCommit(const TDuration d) const {
    const auto elapsedMs = d.MilliSeconds();
    DurationToFinishCommit->Collect(elapsedMs);
}

void OnRequest(const ui64 rowsCount) const {
RequestsCount->Add(1);
RowsCount->Add(rowsCount);
Expand Down Expand Up @@ -741,6 +762,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void WriteToColumnTable(const NActors::TActorContext& ctx) {
UploadCounters.OnWritingStarted(TAppData::TimeProvider->Now() - StartTime);
TString accessCheckError;
if (!CheckAccess(accessCheckError)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx);
Expand All @@ -765,6 +787,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

void Handle(NLongTxService::TEvLongTxService::TEvBeginTxResult::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
UploadCounters.OnTxStarted(TAppData::TimeProvider->Now() - StartTime);

if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) {
NYql::TIssues issues;
Expand Down Expand Up @@ -894,6 +917,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void CommitLongTx(const TActorContext& ctx) {
UploadCounters.OnStartCommit(TAppData::TimeProvider->Now() - StartTime);
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitCommitLongTx);
Expand All @@ -908,6 +932,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void Handle(NLongTxService::TEvLongTxService::TEvCommitTxResult::TPtr& ev, const NActors::TActorContext& ctx) {
UploadCounters.OnFinishCommit(TAppData::TimeProvider->Now() - StartTime);
const auto* msg = ev->Get();

if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down

0 comments on commit c42a9a3

Please sign in to comment.