Skip to content

Commit

Permalink
Evwrite optimizations (#11428)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Nov 11, 2024
1 parent 4790237 commit 6c7ed0d
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 87 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/common/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct TKqpBufferWriterSettings {
IKqpTransactionManagerPtr TxManager;
NWilson::TTraceId TraceId;
TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
};

NActors::IActor* CreateKqpBufferWriterActor(TKqpBufferWriterSettings&& settings);
Expand Down
30 changes: 4 additions & 26 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,23 +251,12 @@ bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables) {
bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage) {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");

const bool isOlapSink = std::any_of(
std::begin(tables),
std::end(tables),
[&](const NKqpProto::TKqpPhyTable& table) {
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_OLAP
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
});

if (isOlapSink) {
return true;
}
return settings.GetIsOlap();
}
}
return false;
Expand All @@ -276,7 +265,7 @@ bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage, const google
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
if (HasOlapTableWriteInStage(stage, tx.GetTables())) {
if (HasOlapTableWriteInStage(stage)) {
return true;
}
}
Expand Down Expand Up @@ -325,18 +314,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");

const bool isOltpSink = std::any_of(
std::begin(tx.GetTables()),
std::end(tx.GetTables()),
[&](const NKqpProto::TKqpPhyTable& table) {
return table.GetKind() == NKqpProto::EKqpPhyTableKind::TABLE_KIND_DS
&& google::protobuf::util::MessageDifferencer::Equals(table.GetId(), settings.GetTable());
});

if (isOltpSink) {
return true;
}
return !settings.GetIsOlap();
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOlapTableWriteInStage(
const NKqpProto::TKqpPhyStage& stage,
const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables);
bool HasOlapTableWriteInStage(const NKqpProto::TKqpPhyStage& stage);
bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
Expand Down
18 changes: 8 additions & 10 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

auto addLocks = [this](const ui64 taskId, const auto& data) {
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
Expand Down Expand Up @@ -256,11 +253,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
if (TxManager) {
YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
TxManager->AddAction(lock.GetDataShard(), IKqpTransactionManager::EAction::WRITE);
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
if (info.GetHasRead()) {
TxManager->AddAction(lock.GetDataShard(), IKqpTransactionManager::EAction::READ);
flags |= IKqpTransactionManager::EAction::READ;
}

TxManager->AddShard(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
TxManager->AddAction(lock.GetDataShard(), flags);
TxManager->AddLock(lock.GetDataShard(), lock);
}
}
Expand Down Expand Up @@ -337,6 +336,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void MakeResponseAndPassAway() {
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

ResponseEv->Snapshot = GetSnapshot();

if (!Locks.empty() || (TxManager && TxManager->HasLocks())) {
Expand Down Expand Up @@ -1942,10 +1944,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return false;
}

bool HasOlapSink(const NKqpProto::TKqpPhyStage& stage, const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& tables) {
return NKqp::HasOlapTableWriteInStage(stage, tables);
}

void Execute() {
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);

Expand Down
63 changes: 41 additions & 22 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,11 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
// TODO: Maybe there are better ways to initialize new shards...
for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
TxManager->AddShard(shardInfo.ShardId, IsOlap(), TablePath);
TxManager->AddAction(shardInfo.ShardId, IKqpTransactionManager::EAction::WRITE);
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
if (shardInfo.HasRead) {
TxManager->AddAction(shardInfo.ShardId, IKqpTransactionManager::EAction::READ);
flags |= IKqpTransactionManager::EAction::READ;
}
TxManager->AddAction(shardInfo.ShardId, flags);
}
}

Expand Down Expand Up @@ -540,7 +541,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

// TODO: Add new status for splits in datashard. This is tmp solution.
if (getIssues().ToOneLineString().Contains("in a pre/offline state assuming this is due to a finished split (wrong shard state)")) {
ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie);
Expand All @@ -561,13 +561,12 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

RuntimeError(
TStringBuilder() << "Disk space exhausted for table `"
<< TablePath << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
getIssues());
RuntimeError(
TStringBuilder() << "Disk space exhausted for table `"
<< TablePath << "`. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
Expand Down Expand Up @@ -670,7 +669,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
preparedInfo.Coordinator = domainCoordinators.Select(*TxId);
}

OnMessageAcknowledged(ev->Get()->Record.GetOrigin());
OnMessageReceived(ev->Get()->Record.GetOrigin());
const auto result = ShardedWriteController->OnMessageAcknowledged(
ev->Get()->Record.GetOrigin(), ev->Cookie);
if (result) {
Expand Down Expand Up @@ -713,7 +712,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
return;
}

OnMessageAcknowledged(ev->Get()->Record.GetOrigin());
OnMessageReceived(ev->Get()->Record.GetOrigin());
const auto result = ShardedWriteController->OnMessageAcknowledged(
ev->Get()->Record.GetOrigin(), ev->Cookie);
if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) {
Expand All @@ -723,7 +722,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
}
}

void OnMessageAcknowledged(const ui64 shardId) {
void OnMessageReceived(const ui64 shardId) {
if (auto it = SendTime.find(shardId); it != std::end(SendTime)) {
Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds());
SendTime.erase(it);
Expand Down Expand Up @@ -847,7 +846,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< ", Attempts=" << metadata->SendAttempts << ", Mode=" << static_cast<int>(Mode));
Send(
PipeCacheId,
new TEvPipeCache::TEvForward(evWrite.release(), shardId, true),
new TEvPipeCache::TEvForward(evWrite.release(), shardId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
metadata->Cookie,
TableWriteActorSpan.GetTraceId());
Expand Down Expand Up @@ -1270,6 +1269,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
, Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__))
, TypeEnv(*Alloc)
, Counters(settings.Counters)
, TxProxyMon(settings.TxProxyMon)
, BufferWriteActor(TWilsonKqp::BufferWriteActor, NWilson::TTraceId(settings.TraceId), "TKqpBufferWriteActor", NWilson::EFlags::AUTO_END)
, BufferWriteActorState(TWilsonKqp::BufferWriteActorState, BufferWriteActor.GetTraceId(),
"BufferWriteActorState::Writing", NWilson::EFlags::AUTO_END)
Expand Down Expand Up @@ -1552,6 +1552,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
}

SendTime[shardId] = TInstant::Now();
CA_LOG_D("Send EvWrite (external) to ShardID=" << shardId << ", isPrepare=" << !isRollback << ", isImmediateCommit=" << isRollback << ", TxId=" << evWrite->Record.GetTxId()
<< ", LockTxId=" << evWrite->Record.GetLockTxId() << ", LockNodeId=" << evWrite->Record.GetLockNodeId()
<< ", Locks= " << [&]() {
Expand All @@ -1565,10 +1566,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< ", OperationsCount=" << 0 << ", IsFinal=" << 1
<< ", Attempts=" << 0);

// TODO: Track latecy
Send(
NKikimr::MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(evWrite.release(), shardId, true),
0,
new TEvPipeCache::TEvForward(evWrite.release(), shardId, /* subscribe */ true),
IEventHandle::FlagTrackDelivery,
0);
}
}
Expand Down Expand Up @@ -1672,26 +1674,30 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

switch (res->GetStatus()) {
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAccepted:
// TODO: metrics
TxProxyMon->ClientTxStatusAccepted->Inc();
break;
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusProcessed:
TxProxyMon->ClientTxStatusProcessed->Inc();
break;
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusConfirmed:
TxProxyMon->ClientTxStatusConfirmed->Inc();
break;

case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusPlanned:
TxProxyMon->ClientTxStatusPlanned->Inc();
break;

case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusOutdated:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclined:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusDeclinedNoSpace:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting:
// TODO: CancelProposal???
TxProxyMon->ClientTxStatusCoordinatorDeclined->Inc();
ReplyErrorAndDie(TStringBuilder() << "Failed to plan transaction, status: " << res->GetStatus(), NYql::NDqProto::StatusIds::UNAVAILABLE, {});
break;

case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusUnknown:
case TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusAborted:
TxProxyMon->ClientTxStatusCoordinatorDeclined->Inc();
ReplyErrorAndDie(TStringBuilder() << "Unexpected TEvProposeTransactionStatus status: " << res->GetStatus(), NYql::NDqProto::StatusIds::INTERNAL_ERROR, {});
break;
}
Expand Down Expand Up @@ -1797,7 +1803,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

ReplyErrorAndDie(
TStringBuilder() << "Internal error for table. "
<< getIssues().ToOneLineString(),
Expand All @@ -1810,11 +1815,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

ReplyErrorAndDie(
TStringBuilder() << "Disk space exhausted for table. "
<< getIssues().ToOneLineString(),
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
}
Expand All @@ -1824,7 +1828,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
<< " Sink=" << this->SelfId() << "."
<< " Ignored this error."
<< getIssues().ToOneLineString());
// TODO: support waiting
ReplyErrorAndDie(
TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded."
<< getIssues().ToOneLineString(),
Expand Down Expand Up @@ -1886,11 +1889,23 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
}
}

void OnMessageReceived(const ui64 shardId) {
if (auto it = SendTime.find(shardId); it != std::end(SendTime)) {
Counters->WriteActorWritesLatencyHistogram->Collect((TInstant::Now() - it->second).MilliSeconds());
SendTime.erase(it);
}
}

void ProcessWritePreparedShard(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
if (State != EState::PREPARING) {
CA_LOG_D("Ignored write prepared event.");
return;
}
OnMessageReceived(ev->Get()->Record.GetOrigin());
CA_LOG_D("Got prepared result TxId=" << ev->Get()->Record.GetTxId()
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
<< ", Cookie=" << ev->Cookie);

const auto& record = ev->Get()->Record;
IKqpTransactionManager::TPrepareResult preparedInfo;
preparedInfo.ShardId = record.GetOrigin();
Expand All @@ -1912,6 +1927,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
CA_LOG_D("Ignored write completed event.");
return;
}
OnMessageReceived(ev->Get()->Record.GetOrigin());
CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId()
<< ", TabletId=" << ev->Get()->Record.GetOrigin()
<< ", Cookie=" << ev->Cookie
Expand Down Expand Up @@ -2033,6 +2049,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
IShardedWriteControllerPtr ShardedWriteController = nullptr;

TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<NTxProxy::TTxProxyMon> TxProxyMon;
THashMap<ui64, TInstant> SendTime;

NWilson::TSpan BufferWriteActor;
NWilson::TSpan BufferWriteActorState;
};
Expand Down
Loading

0 comments on commit 6c7ed0d

Please sign in to comment.