diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 64e359be13ef..05cc85065298 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -117,8 +117,9 @@ const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) { class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase { using TBase = NTxProxy::TUploadRowsBase; public: - explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded) - : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded) + explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded, const char* name) + : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded, + NWilson::TSpan(TWilsonKqp::BulkUpsertActor, request->GetWilsonTraceId(), name)) , Request(request) {} @@ -517,7 +518,7 @@ void DoBulkUpsertRequest(std::unique_ptr p, const IFacilityProvid } else if (GetProtoRequest(p.get())->has_csv_settings()) { f.RegisterActor(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded)); } else { - f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded)); + f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded, "BulkRowsUpsertActor")); } } @@ -530,7 +531,7 @@ IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt } else if (GetProtoRequest(msg)->has_csv_settings()) { return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded); } else { - return new TUploadRowsRPCPublic(msg, diskQuotaExceeded); + return new TUploadRowsRPCPublic(msg, diskQuotaExceeded, "BulkRowsUpsertActor"); } } diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index 5cee5d62acab..0372e649947e 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -16,7 +16,7 @@ class TTxDirectBase : public TTransactionBase { public: TTxDirectBase(TDataShard* ds, TEvRequest ev) - : TBase(ds) + : TBase(ds, std::move(ev->TraceId)) , Ev(ev) { } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 16f4964299b5..0826905f9069 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include @@ -29,6 +28,8 @@ #undef INCLUDE_YDB_INTERNAL_H #include +#include +#include #include #include @@ -208,18 +209,21 @@ class TUploadRowsBase : public TActorBootstrapped Batch; float RuCost = 0.0; + NWilson::TSpan Span; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return DerivedActivityType; } - explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false) + explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false, NWilson::TSpan span = {}) : TBase() , SchemeCache(MakeSchemeCacheID()) , LeaderPipeCache(MakePipePeNodeCacheID(false)) , Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT) , Status(Ydb::StatusIds::SUCCESS) , DiskQuotaExceeded(diskQuotaExceeded) + , Span(std::move(span)) {} void Bootstrap(const NActors::TActorContext& ctx) { @@ -232,10 +236,10 @@ class TUploadRowsBase : public TActorBootstrappedResultSet.emplace_back(entry); - ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request)); + ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, Span.GetTraceId()); TimeoutTimerActorId = CreateLongTimer(ctx, Timeout, new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup())); @@ -743,7 +748,7 @@ class TUploadRowsBase : public TActorBootstrappedResultSet.emplace_back(std::move(keyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); - ctx.Send(SchemeCache, resolveReq.Release()); + ctx.Send(SchemeCache, resolveReq.Release(), 0, 0, Span.GetTraceId()); TBase::Become(&TThis::StateWaitResolveShards); } @@ -1027,7 +1032,7 @@ class TUploadRowsBase : public TActorBootstrappedRecord.SetOverloadSubscribe(seqNo); state->SentOverloadSeqNo = seqNo; - ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery); + ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId()); } void MakeShardRequests(const NActors::TActorContext& ctx) { @@ -1109,7 +1114,7 @@ class TUploadRowsBase : public TActorBootstrappedRecord.SetOverloadSubscribe(seqNo); uploadRetryStates[idx]->SentOverloadSeqNo = seqNo; - ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery); + ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId()); auto res = ShardRepliesLeft.insert(shardId); if (!res.second) { @@ -1133,7 +1138,7 @@ class TUploadRowsBase : public TActorBootstrappedTableId, TActorId())); + ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId()); SetError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId)); ShardRepliesLeft.erase(ev->Get()->TabletId); @@ -1170,7 +1175,7 @@ class TUploadRowsBase : public TActorBootstrappedTableId, TActorId())); + ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId()); status = Ydb::StatusIds::OVERLOADED; break; case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED: @@ -1203,7 +1208,7 @@ class TUploadRowsBase : public TActorBootstrapped