diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 557693070328..9b944b27b916 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -57,6 +57,10 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { Self->ProgressTxController->FinishPlannedTx(txId, txc); Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED); } + Self->ProgressTxInFlight = false; + if (!!Self->ProgressTxController->GetPlannedTx()) { + Self->EnqueueProgressTx(ctx); + } return true; } @@ -76,10 +80,6 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { if (LastCompletedTx) { Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx); } - Self->ProgressTxInFlight = false; - if (!!Self->ProgressTxController->GetPlannedTx()) { - Self->EnqueueProgressTx(ctx); - } Self->SetupIndexation(); } }; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 1875a14d9763..efb73296622d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -477,13 +477,6 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID()); TablesManager.DropTable(pathId, version, db); - - // TODO: Allow to read old snapshots after DROP - TBlobGroupSelector dsGroupSelector(Info()); - NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - THashSet writesToAbort = InsertTable->DropPath(dbTable, pathId); - - TryAbortWrites(db, dbTable, std::move(writesToAbort)); } void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version, diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 
e24fa9e8988c..24d44eb34587 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -19,7 +19,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, } const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) { auto& portionInfo = item.GetPortionResult(); - if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) { + if (!!self && !self->TablesManager.HasTable(portionInfo.GetPathId(), false)) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId()); return true; } else { diff --git a/ydb/core/tx/data_events/shard_writer.cpp b/ydb/core/tx/data_events/shard_writer.cpp index 0158527fee49..4fcd62f3b12f 100644 --- a/ydb/core/tx/data_events/shard_writer.cpp +++ b/ydb/core/tx/data_events/shard_writer.cpp @@ -18,7 +18,9 @@ namespace NKikimr::NEvWrite { void TWritersController::OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId) { WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId, writePartId); + Counters->OnCSReply(TMonotonic::Now() - StartInstant); if (!WritesCount.Dec()) { + Counters->OnFullReply(TMonotonic::Now() - StartInstant); auto req = MakeHolder(LongTxId); for (auto&& i : WriteIds) { req->AddWrite(i.GetShardId(), i.GetWriteId()); @@ -28,6 +30,7 @@ namespace NKikimr::NEvWrite { } void TWritersController::OnFail(const Ydb::StatusIds::StatusCode code, const TString& message) { + Counters->OnCSFailed(code); NYql::TIssues issues; issues.AddIssue(message); LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(code, issues)); diff --git a/ydb/core/tx/data_events/shard_writer.h b/ydb/core/tx/data_events/shard_writer.h index d7abe2e2a3ed..7faf74d331c3 100644 --- a/ydb/core/tx/data_events/shard_writer.h +++ 
b/ydb/core/tx/data_events/shard_writer.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace NKikimr::NEvWrite { @@ -22,11 +23,55 @@ class TWriteIdForShard { TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId) : ShardId(shardId) , WriteId(writeId) - , WritePartId(writePartId) - { + , WritePartId(writePartId) { + } +}; + +/** Counters owner constructed with the name "CSUpload": request/row/byte totals, size histograms, reply-duration histograms and a failure counter. */ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner { +private: + using TBase = NColumnShard::TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; + NMonitoring::THistogramPtr CSReplyDuration; + NMonitoring::THistogramPtr FullReplyDuration; + NMonitoring::THistogramPtr BytesDistribution; + NMonitoring::THistogramPtr RowsDistribution; + NMonitoring::TDynamicCounters::TCounterPtr RowsCount; + NMonitoring::TDynamicCounters::TCounterPtr BytesCount; + NMonitoring::TDynamicCounters::TCounterPtr FailsCount; +public: + TCSUploadCounters() + : TBase("CSUpload") + , RequestsCount(TBase::GetDeriviative("Requests")) + , CSReplyDuration(TBase::GetHistogram("Replies/Shard/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 1))) + , FullReplyDuration(TBase::GetHistogram("Replies/Full/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 1))) + , BytesDistribution(TBase::GetHistogram("Requests/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024))) + , RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16))) + , RowsCount(TBase::GetDeriviative("Rows")) + , BytesCount(TBase::GetDeriviative("Bytes")) + , FailsCount(TBase::GetDeriviative("Fails")) { + + } + + /** Accounts one request: bumps the "Requests" counter (previously registered but never incremented) and records rows/bytes in both the histograms and the running totals. */ void OnRequest(const ui64 rows, const ui64 bytes) const { + RequestsCount->Add(1); + BytesDistribution->Collect(bytes); + RowsDistribution->Collect(rows); + BytesCount->Add(bytes); + RowsCount->Add(rows); + } + + /** Counts one failure; the status code is accepted but not used. Const like the other recorders in this class. */ void OnCSFailed(const Ydb::StatusIds::StatusCode /*code*/) const { + FailsCount->Add(1); } -}; + /** Collects d, in milliseconds, into the "Replies/Shard/DurationMs" histogram. */ void OnCSReply(const TDuration d) const { + CSReplyDuration->Collect(d.MilliSeconds()); + } + + /** Collects d, in milliseconds, into the "Replies/Full/DurationMs" histogram. */ void 
OnFullReply(const TDuration d) const { + FullReplyDuration->Collect(d.MilliSeconds()); + } +}; // External transaction controller class class TWritersController { private: @@ -34,7 +79,9 @@ class TWritersController { TAtomicCounter WritesIndex = 0; NActors::TActorIdentity LongTxActorId; std::vector WriteIds; + const TMonotonic StartInstant = TMonotonic::Now(); YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId); + YDB_READONLY(std::shared_ptr, Counters, std::make_shared()); public: using TPtr = std::shared_ptr; diff --git a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp index f5b7c6b07cd5..29e14d3fb00e 100644 --- a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp +++ b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp @@ -85,6 +85,7 @@ class TLongTxWriteBase : public TActorBootstrapped { ui32 writeIdx = 0; for (auto& [shard, infos] : splittedData.GetShardsInfo()) { for (auto&& shardInfo : infos) { + InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes()); sumBytes += shardInfo->GetBytes(); rowsCount += shardInfo->GetRowsCount(); this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), DedupId, shardInfo, ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace)); diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp index 6d487a26016b..6a7911593cea 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp @@ -12,6 +12,11 @@ namespace NKikimr { RowsCount = TBase::GetDeriviative("Rows/Count"); PackageSize = TBase::GetHistogram("Rows/PackageSize", NMonitoring::ExponentialHistogram(15, 2, 10)); + DurationToStartCommit = TBase::GetHistogram("ToStartCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + DurationToFinishCommit = TBase::GetHistogram("ToFinishCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + DurationToStartWriting = 
TBase::GetHistogram("ToStartWriting/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + DurationToTxStarted = TBase::GetHistogram("ToTxStarted/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10)); + const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor(); for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) { auto vDescription = descriptor->value(i); diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 599f9984b8ab..1e0e073c39d1 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -45,10 +45,31 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr RowsCount; NMonitoring::THistogramPtr PackageSize; + /* Duration histograms; each is fed by exactly one of the four timing recorders declared in this class. */ NMonitoring::THistogramPtr DurationToStartCommit; + NMonitoring::THistogramPtr DurationToFinishCommit; + NMonitoring::THistogramPtr DurationToStartWriting; + NMonitoring::THistogramPtr DurationToTxStarted; + THashMap CodesCount; public: TUploadCounters(); + /** Collects d, in milliseconds, into the DurationToTxStarted histogram. */ void OnTxStarted(const TDuration d) const { + DurationToTxStarted->Collect(d.MilliSeconds()); + } + + /** Collects d, in milliseconds, into the DurationToStartWriting histogram. */ void OnWritingStarted(const TDuration d) const { + DurationToStartWriting->Collect(d.MilliSeconds()); + } + + /** Collects d, in milliseconds, into the DurationToStartCommit histogram. */ void OnStartCommit(const TDuration d) const { + DurationToStartCommit->Collect(d.MilliSeconds()); + } + + /** Collects d, in milliseconds, into the DurationToFinishCommit histogram. */ void OnFinishCommit(const TDuration d) const { + DurationToFinishCommit->Collect(d.MilliSeconds()); + } + void OnRequest(const ui64 rowsCount) const { RequestsCount->Add(1); RowsCount->Add(rowsCount); @@ -741,6 +762,7 @@ class TUploadRowsBase : public TActorBootstrappedNow() - StartTime); TString accessCheckError; if (!CheckAccess(accessCheckError)) { return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx); @@ -765,6 +787,7 @@ class TUploadRowsBase : public TActorBootstrappedGet(); 
UploadCounters.OnTxStarted(TAppData::TimeProvider->Now() - StartTime); if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) { NYql::TIssues issues; @@ -894,6 +917,7 @@ class TUploadRowsBase : public TActorBootstrappedNow() - StartTime); TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()); ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId()); TBase::Become(&TThis::StateWaitCommitLongTx); @@ -908,6 +932,7 @@ class TUploadRowsBase : public TActorBootstrappedNow() - StartTime); const auto* msg = ev->Get(); if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) {