Skip to content

Commit

Permalink
Mon page for async CA (ydb-platform#5047) (ydb-platform#5249)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg committed Jul 12, 2024
1 parent d54e24e commit 7947a77
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 17 deletions.
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/actors/pending_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
if (Monitoring) {
Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"),
"fetcher", "Pending Fetcher", false, TActivationContext::ActorSystem(), SelfId());
Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"),
"local_worker_manager", "Local Worker Manager", false, TActivationContext::ActorSystem(), NYql::NDqs::MakeWorkerManagerActorID(SelfId().NodeId()));
}

Become(&TPendingFetcher::StateFunc);
Expand Down
183 changes: 176 additions & 7 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,179 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
hFunc(TEvDqCompute::TEvInjectCheckpoint, OnInjectCheckpoint);
hFunc(TEvDqCompute::TEvRestoreFromCheckpoint, OnRestoreFromCheckpoint);
hFunc(NKikimr::TEvQuota::TEvClearance, OnCpuQuotaGiven);
hFunc(NActors::NMon::TEvHttpInfo, OnMonitoringPage)
default:
TBase::BaseStateFuncBody(ev);
};
};

// Handles a monitoring HTTP request: renders an HTML dump of this compute
// actor's internal state (common fields, last state record, watermarks,
// CPU quota, checkpoints, input and output channels) and sends it back to the
// requester as a TEvHttpInfoRes.
//
// Every label printed here names the field that is actually streamed next to
// it; keep the two in sync when adding fields, since this page is read by
// humans while debugging.
// NOTE(review): values such as LogPrefix and DebugString() are written without
// HTML escaping — confirm they cannot contain markup-significant characters.
void OnMonitoringPage(NActors::NMon::TEvHttpInfo::TPtr& ev) {
    TStringStream html;
    html << "<h3>Common</h3>";
    html << "Cookie: " << Cookie << "<br />";
    html << "MkqlMemoryLimit: " << MkqlMemoryLimit << "<br />";
    html << "SentStatsRequest: " << SentStatsRequest << "<br />";

    html << "<h3>State</h3>";
    html << "<pre>" << ComputeActorState.DebugString() << "</pre>";

    html << "<h3>Watermarks</h3>";
    for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) {
        html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "<br />";
    }

    html << "<h3>CPU Quota</h3>";
    html << "QuoterServiceActorId: " << QuoterServiceActorId.ToString() << "<br />";
    if (ContinueRunEvent) {
        // Each field gets its own label (previously all were printed as "AskFreeSpace").
        html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->AskFreeSpace << "<br />";
        html << "ContinueRunEvent.CheckpointOnly: " << ContinueRunEvent->CheckpointOnly << "<br />";
        html << "ContinueRunEvent.CheckpointRequest: " << ContinueRunEvent->CheckpointRequest.Defined() << "<br />";
        html << "ContinueRunEvent.WatermarkRequest: " << ContinueRunEvent->WatermarkRequest.Defined() << "<br />";
        html << "ContinueRunEvent.MemLimit: " << ContinueRunEvent->MemLimit << "<br />";
        for (const auto& sinkId: ContinueRunEvent->SinkIds) {
            html << "ContinueRunEvent.SinkIds: " << sinkId << "<br />";
        }

        for (const auto& inputTransformId: ContinueRunEvent->InputTransformIds) {
            html << "ContinueRunEvent.InputTransformIds: " << inputTransformId << "<br />";
        }
    }

    html << "ContinueRunStartWaitTime: " << ContinueRunStartWaitTime.ToString() << "<br />";
    html << "ContinueRunInflight: " << ContinueRunInflight << "<br />";
    html << "CpuTimeSpent: " << CpuTimeSpent.ToString() << "<br />";
    html << "CpuTimeQuotaAsked: " << CpuTimeQuotaAsked.ToString() << "<br />";
    html << "UseCpuQuota: " << UseCpuQuota() << "<br />";

    html << "<h3>Checkpoints</h3>";
    html << "ReadyToCheckpoint: " << ReadyToCheckpoint() << "<br />";
    html << "CheckpointRequestedFromTaskRunner: " << CheckpointRequestedFromTaskRunner << "<br />";

    html << "<h3>InputChannels</h3>";
    for (const auto& [id, info]: InputChannelsMap) {
        html << "<h4>Input Channel Id: " << id << "</h4>";
        html << "LogPrefix: " << info.LogPrefix << "<br />";
        html << "ChannelId: " << info.ChannelId << "<br />";
        html << "SrcStageId: " << info.SrcStageId << "<br />";
        html << "HasPeer: " << info.HasPeer << "<br />";
        // Prints whether any watermark is pending, followed by the last pending one (if any).
        html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "<br />";
        html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
        // Prints whether a checkpoint is pending, followed by its id and generation (if any).
        html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "<br />";
        html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />";
        html << "FreeSpace: " << info.FreeSpace << "<br />";
        html << "IsPaused: " << info.IsPaused() << "<br />";
        // Channel object details are only available when the channel pointer is set.
        if (info.Channel) {
            html << "DqInputChannel.ChannelId: " << info.Channel->GetChannelId() << "<br />";
            html << "DqInputChannel.FreeSpace: " << info.Channel->GetFreeSpace() << "<br />";
            html << "DqInputChannel.StoredBytes: " << info.Channel->GetStoredBytes() << "<br />";
            html << "DqInputChannel.Empty: " << info.Channel->Empty() << "<br />";
            html << "DqInputChannel.InputType: " << (info.Channel->GetInputType() ? info.Channel->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
            html << "DqInputChannel.InputWidth: " << (info.Channel->GetInputWidth() ? ToString(*info.Channel->GetInputWidth()) : TString{"unknown"}) << "<br />";
            html << "DqInputChannel.IsFinished: " << info.Channel->IsFinished() << "<br />";

            const auto& pushStats = info.Channel->GetPushStats();
            html << "DqInputChannel.PushStats.ChannelId: " << pushStats.ChannelId << "<br />";
            html << "DqInputChannel.PushStats.SrcStageId: " << pushStats.SrcStageId << "<br />";
            html << "DqInputChannel.PushStats.RowsInMemory: " << pushStats.RowsInMemory << "<br />";
            html << "DqInputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />";
            html << "DqInputChannel.PushStats.DeserializationTime: " << pushStats.DeserializationTime.ToString() << "<br />";
            html << "DqInputChannel.PushStats.Level: " << static_cast<int>(pushStats.Level) << "<br />";
            html << "DqInputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "<br />";
            html << "DqInputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
            html << "DqInputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />";
            html << "DqInputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />";
            html << "DqInputChannel.PushStats.Rows: " << pushStats.Rows << "<br />";
            html << "DqInputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />";
            html << "DqInputChannel.PushStats.Splits: " << pushStats.Splits << "<br />";
            html << "DqInputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "<br />";

            const auto& popStats = info.Channel->GetPopStats();
            html << "DqInputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />";
            html << "DqInputChannel.PopStats.Rows: " << popStats.Rows << "<br />";
            html << "DqInputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />";
            html << "DqInputChannel.PopStats.Splits: " << popStats.Splits << "<br />";
            html << "DqInputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />";
            html << "DqInputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />";
        }
    }

    html << "<h3>OutputChannels</h3>";
    for (const auto& [id, info]: OutputChannelsMap) {
        // Header names the output channel (previously mislabelled as an input channel).
        html << "<h4>Output Channel Id: " << id << "</h4>";
        html << "ChannelId: " << info.ChannelId << "<br />";
        html << "DstStageId: " << info.DstStageId << "<br />";
        html << "HasPeer: " << info.HasPeer << "<br />";
        html << "Finished: " << info.Finished << "<br />";
        html << "EarlyFinish: " << info.EarlyFinish << "<br />";
        html << "PopStarted: " << info.PopStarted << "<br />";
        html << "IsTransformOutput: " << info.IsTransformOutput << "<br />";
        html << "EWatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";

        if (info.AsyncData) {
            html << "AsyncData.DataSize: " << info.AsyncData->Data.size() << "<br />";
            html << "AsyncData.Changed: " << info.AsyncData->Changed << "<br />";
            html << "AsyncData.Checkpoint: " << info.AsyncData->Checkpoint << "<br />";
            html << "AsyncData.Finished: " << info.AsyncData->Finished << "<br />";
            html << "AsyncData.Watermark: " << info.AsyncData->Watermark << "<br />";
        }

        // Channel object details are only available when the channel pointer is set.
        if (info.Channel) {
            html << "DqOutputChannel.ChannelId: " << info.Channel->GetChannelId() << "<br />";
            html << "DqOutputChannel.ValuesCount: " << info.Channel->GetValuesCount() << "<br />";
            html << "DqOutputChannel.IsFull: " << info.Channel->IsFull() << "<br />";
            html << "DqOutputChannel.HasData: " << info.Channel->HasData() << "<br />";
            html << "DqOutputChannel.IsFinished: " << info.Channel->IsFinished() << "<br />";
            html << "DqOutputChannel.OutputType: " << (info.Channel->GetOutputType() ? info.Channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";

            const auto& pushStats = info.Channel->GetPushStats();
            html << "DqOutputChannel.PushStats.MaxRowsInMemory: " << pushStats.MaxRowsInMemory << "<br />";
            html << "DqOutputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />";
            html << "DqOutputChannel.PushStats.Level: " << static_cast<int>(pushStats.Level) << "<br />";
            html << "DqOutputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "<br />";
            html << "DqOutputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
            html << "DqOutputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />";
            html << "DqOutputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />";
            html << "DqOutputChannel.PushStats.Rows: " << pushStats.Rows << "<br />";
            html << "DqOutputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />";
            html << "DqOutputChannel.PushStats.Splits: " << pushStats.Splits << "<br />";
            html << "DqOutputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "<br />";

            const auto& popStats = info.Channel->GetPopStats();
            html << "DqOutputChannel.PopStats.ChannelId: " << popStats.ChannelId << "<br />";
            html << "DqOutputChannel.PopStats.DstStageId: " << popStats.DstStageId << "<br />";
            html << "DqOutputChannel.PopStats.MaxMemoryUsage: " << popStats.MaxMemoryUsage << "<br />";
            html << "DqOutputChannel.PopStats.MaxRowsInMemory: " << popStats.MaxRowsInMemory << "<br />";
            html << "DqOutputChannel.PopStats.SerializationTime: " << popStats.SerializationTime.ToString() << "<br />";
            html << "DqOutputChannel.PopStats.SpilledBytes: " << popStats.SpilledBytes << "<br />";
            html << "DqOutputChannel.PopStats.SpilledRows: " << popStats.SpilledRows << "<br />";
            html << "DqOutputChannel.PopStats.SpilledBlobs: " << popStats.SpilledBlobs << "<br />";
            html << "DqOutputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />";
            html << "DqOutputChannel.PopStats.Rows: " << popStats.Rows << "<br />";
            html << "DqOutputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />";
            html << "DqOutputChannel.PopStats.Splits: " << popStats.Splits << "<br />";
            html << "DqOutputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />";
            html << "DqOutputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />";
        }
    }

    // Reply to whoever requested the page with the rendered HTML.
    Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str()));
}

void OnStateRequest(TEvDqCompute::TEvStateRequest::TPtr& ev) {
CA_LOG_T("Got TEvStateRequest from actor " << ev->Sender << " PingCookie: " << ev->Cookie);
if (!SentStatsRequest) {
Expand Down Expand Up @@ -230,18 +398,18 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
CA_LOG_T("update task runner stats");
TaskRunnerStats = std::move(ev->Get()->Stats);
}
auto record = NDqProto::TEvComputeActorState();
record.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
record.SetStatusCode(NYql::NDqProto::StatusIds::SUCCESS);
record.SetTaskId(Task.GetId());
ComputeActorState = NDqProto::TEvComputeActorState();
ComputeActorState.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
ComputeActorState.SetStatusCode(NYql::NDqProto::StatusIds::SUCCESS);
ComputeActorState.SetTaskId(Task.GetId());
NYql::TIssues issues;
FillIssues(issues);

IssuesToMessage(issues, record.MutableIssues());
FillStats(record.MutableStats(), /* last */ false);
IssuesToMessage(issues, ComputeActorState.MutableIssues());
FillStats(ComputeActorState.MutableStats(), /* last */ false);
for (const auto& [actorId, cookie] : WaitingForStateResponse) {
auto state = MakeHolder<TEvDqCompute::TEvState>();
state->Record = record;
state->Record = ComputeActorState;
Send(actorId, std::move(state), NActors::IEventHandle::FlagTrackDelivery, cookie);
}
WaitingForStateResponse.clear();
Expand Down Expand Up @@ -976,6 +1144,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
NMonitoring::THistogramPtr CpuTimeGetQuotaLatency;
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
NMonitoring::TDynamicCounters::TCounterPtr CpuTime;
NDqProto::TEvComputeActorState ComputeActorState;
};


Expand Down
Loading

0 comments on commit 7947a77

Please sign in to comment.