Skip to content

Commit

Permalink
Implemented tracing for bulk row upsert (#2218)
Browse files: browse the repository at this point in the history
  • Loading branch information
domwst authored Feb 26, 2024
1 parent 23b3575 commit a2fdf6a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
9 changes: 5 additions & 4 deletions ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) {
class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded)
: TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded)
explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded, const char* name)
: TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded,
NWilson::TSpan(TWilsonKqp::BulkUpsertActor, request->GetWilsonTraceId(), name))
, Request(request)
{}

Expand Down Expand Up @@ -517,7 +518,7 @@ void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvid
} else if (GetProtoRequest(p.get())->has_csv_settings()) {
f.RegisterActor(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded));
} else {
f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded));
f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded, "BulkRowsUpsertActor"));
}
}

Expand All @@ -530,7 +531,7 @@ IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt
} else if (GetProtoRequest(msg)->has_csv_settings()) {
return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded);
} else {
return new TUploadRowsRPCPublic(msg, diskQuotaExceeded);
return new TUploadRowsRPCPublic(msg, diskQuotaExceeded, "BulkRowsUpsertActor");
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__op_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class TTxDirectBase : public TTransactionBase<TDataShard> {

public:
TTxDirectBase(TDataShard* ds, TEvRequest ev)
: TBase(ds)
: TBase(ds, std::move(ev->TraceId))
, Ev(ev)
{
}
Expand Down
34 changes: 20 additions & 14 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/io_formats/arrow/csv_arrow.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/base/path.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/scheme/scheme_tablecell.h>
Expand All @@ -29,6 +28,8 @@
#undef INCLUDE_YDB_INTERNAL_H

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/library/ydb_issue/issue_helpers.h>

#include <util/string/join.h>
#include <util/string/vector.h>
Expand Down Expand Up @@ -208,18 +209,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
std::shared_ptr<arrow::RecordBatch> Batch;
float RuCost = 0.0;

NWilson::TSpan Span;

public:
// Compile-time accessor for this actor's activity type: returns DerivedActivityType unchanged.
// NOTE(review): DerivedActivityType is presumably the class template's non-type parameter
// (the template header lies outside this hunk) — confirm against the full file.
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return DerivedActivityType;
}

explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false)
explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false, NWilson::TSpan span = {})
: TBase()
, SchemeCache(MakeSchemeCacheID())
, LeaderPipeCache(MakePipePeNodeCacheID(false))
, Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT)
, Status(Ydb::StatusIds::SUCCESS)
, DiskQuotaExceeded(diskQuotaExceeded)
, Span(std::move(span))
{}

void Bootstrap(const NActors::TActorContext& ctx) {
Expand All @@ -232,10 +236,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
for (auto& pr : ShardUploadRetryStates) {
if (pr.second.SentOverloadSeqNo) {
auto* msg = new TEvDataShard::TEvOverloadUnsubscribe(pr.second.SentOverloadSeqNo);
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false));
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false), 0, 0, Span.GetTraceId());
}
}
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0), 0, 0, Span.GetTraceId());
if (TimeoutTimerActorId) {
ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill());
}
Expand Down Expand Up @@ -330,6 +334,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
private:
// Handler for TEvents::TEvPoison. The event payload itself is unused (unnamed parameter).
// Steps, in order: run the OnBeforePoison(ctx) hook, end the tracing span with the
// error message "poison", then call Die(ctx).
void Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
OnBeforePoison(ctx);
// Span is ended explicitly, with an error status, before Die(ctx) is invoked.
Span.EndError("poison");
Die(ctx);
}

Expand Down Expand Up @@ -595,7 +600,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
entry.SyncVersion = true;
entry.ShowPrivatePath = AllowWriteToPrivateTable;
request->ResultSet.emplace_back(entry);
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, Span.GetTraceId());

TimeoutTimerActorId = CreateLongTimer(ctx, Timeout,
new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
Expand Down Expand Up @@ -743,7 +748,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
// Begin Long Tx for writing a batch into OLAP table
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
NKikimrLongTxService::TEvBeginTx::EMode mode = NKikimrLongTxService::TEvBeginTx::MODE_WRITE_ONLY;
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvBeginTx(GetDatabase(), mode));
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvBeginTx(GetDatabase(), mode), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitBeginLongTx);
}

Expand Down Expand Up @@ -861,7 +866,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
LogPrefix() << "rolling back LongTx '" << LongTxId.ToString() << "'");

TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId));
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId), 0, 0, Span.GetTraceId());
}

STFUNC(StateWaitWriteBatchResult) {
Expand All @@ -887,7 +892,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

void CommitLongTx(const TActorContext& ctx) {
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId));
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitCommitLongTx);
}

Expand Down Expand Up @@ -966,7 +971,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
request->ResultSet.emplace_back(std::move(keyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
ctx.Send(SchemeCache, resolveReq.Release());
ctx.Send(SchemeCache, resolveReq.Release(), 0, 0, Span.GetTraceId());

TBase::Become(&TThis::StateWaitResolveShards);
}
Expand Down Expand Up @@ -1027,7 +1032,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
ev->Record.SetOverloadSubscribe(seqNo);
state->SentOverloadSeqNo = seqNo;

ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
}

void MakeShardRequests(const NActors::TActorContext& ctx) {
Expand Down Expand Up @@ -1109,7 +1114,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
ev->Record.SetOverloadSubscribe(seqNo);
uploadRetryStates[idx]->SentOverloadSeqNo = seqNo;

ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());

auto res = ShardRepliesLeft.insert(shardId);
if (!res.second) {
Expand All @@ -1133,7 +1138,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()));
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());

SetError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId));
ShardRepliesLeft.erase(ev->Get()->TabletId);
Expand Down Expand Up @@ -1170,7 +1175,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
switch (shardResponse.GetStatus()) {
case NKikimrTxDataShard::TError::WRONG_SHARD_STATE:
case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED:
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()));
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());
status = Ydb::StatusIds::OVERLOADED;
break;
case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED:
Expand Down Expand Up @@ -1203,7 +1208,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

// Notify the cache that we are done with the pipe
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId));
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId), 0, 0, Span.GetTraceId());

ShardRepliesLeft.erase(shardId);
ShardUploadRetryStates.erase(shardId);
Expand Down Expand Up @@ -1266,6 +1271,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
Y_DEBUG_ABORT_UNLESS(status != ::Ydb::StatusIds::SUCCESS);
RollbackLongTx(ctx);
}
Span.EndOk();

Die(ctx);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace NKikimr {

LookupActor = 9,
LookupActorShardsResolve = 10,

BulkUpsertActor = 9,
};
};

Expand Down

0 comments on commit a2fdf6a

Please sign in to comment.