diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index bf53e324cdb4..07d331a3fe9c 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -814,6 +814,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true); IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true); + /* sink writes */ + WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true); + WriteActorsCount = KqpGroup->GetCounter("SinkWrites/WriteActorsCount", false); + WriteActorImmediateWrites = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWrites", true); + WriteActorImmediateWritesRetries = KqpGroup->GetCounter("SinkWrites/WriteActorImmediateWritesRetries", true); + WriteActorWritesSizeHistogram = + KqpGroup->GetHistogram("SinkWrites/WriteActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1)); + WriteActorWritesOperationsHistogram = + KqpGroup->GetHistogram("SinkWrites/WriteActorWritesOperations", NMonitoring::ExponentialHistogram(20, 2, 1)); + WriteActorWritesLatencyHistogram = + KqpGroup->GetHistogram("SinkWrites/WriteActorWritesLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 1)); + /* sequencers */ SequencerActorsCount = KqpGroup->GetCounter("Sequencer/ActorCount", false); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index fe1cd34ea358..6a707b0337b7 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -409,6 +409,15 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages; ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems; + // Sink write counters + ::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsShardResolve; + ::NMonitoring::TDynamicCounters::TCounterPtr WriteActorsCount; + ::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWrites; + ::NMonitoring::TDynamicCounters::TCounterPtr WriteActorImmediateWritesRetries; + NMonitoring::THistogramPtr WriteActorWritesSizeHistogram; + NMonitoring::THistogramPtr WriteActorWritesOperationsHistogram; + NMonitoring::THistogramPtr WriteActorWritesLatencyHistogram; + // Scheduler signals ::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled; ::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity; diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index a0888ae65b5e..8085609e395d 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -134,10 +135,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu , InconsistentTx( Settings.GetInconsistentTx()) , MemoryLimit(MessageSettings.InFlightMemoryLimitPerActorBytes) + , WriteActorSpan(TWilsonKqp::WriteActor, NWilson::TTraceId(args.TraceId), "WriteActor") { YQL_ENSURE(std::holds_alternative(TxId)); YQL_ENSURE(!ImmediateTx); EgressStats.Level = args.StatsLevel; + + Counters->WriteActorsCount->Inc(); } void Bootstrap() { @@ -244,6 +248,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu } void ResolveTable() { + Counters->WriteActorsShardResolve->Inc(); SchemeEntry.reset(); SchemeRequest.reset(); @@ -267,8 +272,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu entry.ShowPrivatePath = true; request->ResultSet.emplace_back(entry); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request)); + WriteActorStateSpan = NWilson::TSpan(TWilsonKqp::WriteActorTableNavigate, WriteActorSpan.GetTraceId(), + "WaitForShardsResolve", NWilson::EFlags::AUTO_END); + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, WriteActorSpan.GetTraceId()); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, WriteActorSpan.GetTraceId()); } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { @@ -327,7 +335,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu request->ResultSet.emplace_back(std::move(keyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); - Send(MakeSchemeCacheID(), resolveReq.Release(), 0, 0); + Send(MakeSchemeCacheID(), resolveReq.Release(), 0, 0, WriteActorSpan.GetTraceId()); } void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { @@ -368,6 +376,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu }() << ", Cookie=" << ev->Cookie); + + switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { CA_LOG_E("Got UNSPECIFIED for table `" @@ -557,6 +567,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu EgressStats.Chunks++; EgressStats.Splits++; EgressStats.Resume(); + + if (auto it = SendTime.find(shardId); it != std::end(SendTime)) { + Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds()); + SendTime.erase(it); + } } resumeNotificator.CheckMemory(); } @@ -594,7 +609,6 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu NYql::NDqProto::StatusIds::UNAVAILABLE); return; } - auto evWrite = std::make_unique( NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); @@ -628,6 +642,16 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu ShardedWriteController->GetDataFormat()); } + if (metadata->SendAttempts == 0) { + Counters->WriteActorImmediateWrites->Inc(); + Counters->WriteActorWritesSizeHistogram->Collect(serializationResult.TotalDataSize); + Counters->WriteActorWritesOperationsHistogram->Collect(metadata->OperationsCount); + + SendTime[shardId] = TInstant::Now(); + } else { + Counters->WriteActorImmediateWritesRetries->Inc(); + } + CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << evWrite->Record.GetTxId() << ", TxMode=" << evWrite->Record.GetTxMode() << ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId() @@ -723,6 +747,13 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu NYql::TIssues issues; issues.AddIssue(std::move(issue)); + if (WriteActorStateSpan) { + WriteActorStateSpan.EndError(issues.ToOneLineString()); + } + if (WriteActorSpan) { + WriteActorSpan.EndError(issues.ToOneLineString()); + } + Callbacks->OnAsyncOutputError(OutputIndex, std::move(issues), statusCode); } @@ -732,6 +763,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu } void Prepare() { + WriteActorStateSpan.EndOk(); + YQL_ENSURE(SchemeEntry); ResolveAttempts = 0; @@ -803,12 +836,16 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu std::optional SchemeRequest; ui64 ResolveAttempts = 0; + THashMap SendTime; THashMap LocksInfo; bool Finished = false; const i64 MemoryLimit; IShardedWriteControllerPtr ShardedWriteController = nullptr; + + NWilson::TSpan WriteActorSpan; + NWilson::TSpan WriteActorStateSpan; }; void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr counters) { diff --git a/ydb/library/wilson_ids/wilson.h b/ydb/library/wilson_ids/wilson.h index 9c1f3bc9bd3b..d8a176ccfa9e 100644 --- a/ydb/library/wilson_ids/wilson.h +++ b/ydb/library/wilson_ids/wilson.h @@ -79,6 +79,9 @@ namespace NKikimr { LookupActor = TComponentTracingLevels::TQueryProcessor::Basic, LookupActorShardsResolve = TComponentTracingLevels::TQueryProcessor::Detailed, + WriteActor = TComponentTracingLevels::TQueryProcessor::Basic, + WriteActorTableNavigate = TComponentTracingLevels::TQueryProcessor::Detailed, + BulkUpsertActor = TComponentTracingLevels::TQueryProcessor::TopLevel, }; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index e4a8754cb485..57aab6e42ca5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -287,6 +287,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const NKikimr::NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr Alloc; IRandomProvider *const RandomProvider; + NWilson::TTraceId TraceId; }; struct TInputTransformArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3d1b8b284761..46540ba528fa 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1394,7 +1394,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TypeEnv = typeEnv, .HolderFactory = holderFactory, .Alloc = Alloc, - .RandomProvider = randomProvider + .RandomProvider = randomProvider, + .TraceId = ComputeActorSpan.GetTraceId(), }); } catch (const std::exception& ex) { throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what();