diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index 4f7eb80bd335..834dd624d307 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -159,6 +159,8 @@ class TPendingFetcher : public NActors::TActorBootstrapped { if (Monitoring) { Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"), "fetcher", "Pending Fetcher", false, TActivationContext::ActorSystem(), SelfId()); + Monitoring->RegisterActorPage(Monitoring->RegisterIndexPage("fq_diag", "Federated Query diagnostics"), + "local_worker_manager", "Local Worker Manager", false, TActivationContext::ActorSystem(), NYql::NDqs::MakeWorkerManagerActorID(SelfId().NodeId())); } Become(&TPendingFetcher::StateFunc); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 9bb9b48064d5..df47796f5da5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -181,11 +181,179 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseCommon"; + html << "Cookie: " << Cookie << "
"; + html << "MkqlMemoryLimit: " << MkqlMemoryLimit << "
"; + html << "SentStatsRequest: " << SentStatsRequest << "
"; + + html << "

State

"; + html << "
" << ComputeActorState.DebugString() << "
"; + + html << "

Watermarks

"; + for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) { + html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "
"; + } + + html << "

CPU Quota

"; + html << "QuoterServiceActorId: " << QuoterServiceActorId.ToString() << "
"; + if (ContinueRunEvent) { + html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->AskFreeSpace << "
"; + html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->CheckpointOnly << "
"; + html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->CheckpointRequest.Defined() << "
"; + html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->WatermarkRequest.Defined() << "
"; + html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->CheckpointOnly << "
"; + html << "ContinueRunEvent.AskFreeSpace: " << ContinueRunEvent->MemLimit << "
"; + for (const auto& sinkId: ContinueRunEvent->SinkIds) { + html << "ContinueRunEvent.SinkIds: " << sinkId << "
"; + } + + for (const auto& inputTransformId: ContinueRunEvent->InputTransformIds) { + html << "ContinueRunEvent.InputTransformIds: " << inputTransformId << "
"; + } + } + + html << "ContinueRunStartWaitTime: " << ContinueRunStartWaitTime.ToString() << "
"; + html << "ContinueRunInflight: " << ContinueRunInflight << "
"; + html << "CpuTimeSpent: " << CpuTimeSpent.ToString() << "
"; + html << "CpuTimeQuotaAsked: " << CpuTimeQuotaAsked.ToString() << "
"; + html << "UseCpuQuota: " << UseCpuQuota() << "
"; + + html << "

Checkpoints

"; + html << "ReadyToCheckpoint: " << ReadyToCheckpoint() << "
"; + html << "CheckpointRequestedFromTaskRunner: " << CheckpointRequestedFromTaskRunner << "
"; + + html << "

InputChannels

"; + for (const auto& [id, info]: InputChannelsMap) { + html << "

Input Channel Id: " << id << "

"; + html << "LogPrefix: " << info.LogPrefix << "
"; + html << "ChannelId: " << info.ChannelId << "
"; + html << "SrcStageId: " << info.SrcStageId << "
"; + html << "HasPeer: " << info.HasPeer << "
"; + html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "
"; + html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "
"; + html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "
"; + html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "
"; + html << "FreeSpace: " << info.FreeSpace << "
"; + html << "IsPaused: " << info.IsPaused() << "
"; + if (info.Channel) { + html << "DqInputChannel.ChannelId: " << info.Channel->GetChannelId() << "
"; + html << "DqInputChannel.FreeSpace: " << info.Channel->GetFreeSpace() << "
"; + html << "DqInputChannel.StoredBytes: " << info.Channel->GetStoredBytes() << "
"; + html << "DqInputChannel.Empty: " << info.Channel->Empty() << "
"; + html << "DqInputChannel.InputType: " << (info.Channel->GetInputType() ? info.Channel->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "
"; + html << "DqInputChannel.InputWidth: " << (info.Channel->GetInputWidth() ? ToString(*info.Channel->GetInputWidth()) : TString{"unknown"}) << "
"; + html << "DqInputChannel.IsFinished: " << info.Channel->IsFinished() << "
"; + + const auto& pushStats = info.Channel->GetPushStats(); + html << "DqInputChannel.PushStats.ChannelId: " << pushStats.ChannelId << "
"; + html << "DqInputChannel.PushStats.SrcStageId: " << pushStats.SrcStageId << "
"; + html << "DqInputChannel.PushStats.RowsInMemory: " << pushStats.RowsInMemory << "
"; + html << "DqInputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "
"; + html << "DqInputChannel.PushStats.DeserializationTime: " << pushStats.DeserializationTime.ToString() << "
"; + html << "DqInputChannel.PushStats.Level: " << static_cast(pushStats.Level) << "
"; + html << "DqInputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "
"; + html << "DqInputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "
"; + html << "DqInputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "
"; + html << "DqInputChannel.PushStats.Bytes: " << pushStats.Bytes << "
"; + html << "DqInputChannel.PushStats.Rows: " << pushStats.Rows << "
"; + html << "DqInputChannel.PushStats.Chunks: " << pushStats.Chunks << "
"; + html << "DqInputChannel.PushStats.Splits: " << pushStats.Splits << "
"; + html << "DqInputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "
"; + html << "DqInputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "
"; + html << "DqInputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "
"; + html << "DqInputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "
"; + html << "DqInputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "
"; + + const auto& popStats = info.Channel->GetPopStats(); + html << "DqInputChannel.PopStats.Bytes: " << popStats.Bytes << "
"; + html << "DqInputChannel.PopStats.Rows: " << popStats.Rows << "
"; + html << "DqInputChannel.PopStats.Chunks: " << popStats.Chunks << "
"; + html << "DqInputChannel.PopStats.Splits: " << popStats.Splits << "
"; + html << "DqInputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "
"; + html << "DqInputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "
"; + html << "DqInputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "
"; + html << "DqInputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "
"; + html << "DqInputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "
"; + } + } + + html << "

OutputChannels

"; + for (const auto& [id, info]: OutputChannelsMap) { + html << "

Input Channel Id: " << id << "

"; + html << "ChannelId: " << info.ChannelId << "
"; + html << "DstStageId: " << info.DstStageId << "
"; + html << "HasPeer: " << info.HasPeer << "
"; + html << "Finished: " << info.Finished << "
"; + html << "EarlyFinish: " << info.EarlyFinish << "
"; + html << "PopStarted: " << info.PopStarted << "
"; + html << "IsTransformOutput: " << info.IsTransformOutput << "
"; + html << "EWatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "
"; + + if (info.AsyncData) { + html << "AsyncData.DataSize: " << info.AsyncData->Data.size() << "
"; + html << "AsyncData.Changed: " << info.AsyncData->Changed << "
"; + html << "AsyncData.Checkpoint: " << info.AsyncData->Checkpoint << "
"; + html << "AsyncData.Finished: " << info.AsyncData->Finished << "
"; + html << "AsyncData.Watermark: " << info.AsyncData->Watermark << "
"; + } + + if (info.Channel) { + html << "DqOutputChannel.ChannelId: " << info.Channel->GetChannelId() << "
"; + html << "DqOutputChannel.ValuesCount: " << info.Channel->GetValuesCount() << "
"; + html << "DqOutputChannel.IsFull: " << info.Channel->IsFull() << "
"; + html << "DqOutputChannel.HasData: " << info.Channel->HasData() << "
"; + html << "DqOutputChannel.IsFinished: " << info.Channel->IsFinished() << "
"; + html << "DqInputChannel.OutputType: " << (info.Channel->GetOutputType() ? info.Channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "
"; + + const auto& pushStats = info.Channel->GetPushStats(); + html << "DqOutputChannel.PushStats.MaxRowsInMemory: " << pushStats.MaxRowsInMemory << "
"; + html << "DqOutputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "
"; + html << "DqOutputChannel.PushStats.Level: " << static_cast(pushStats.Level) << "
"; + html << "DqOutputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "
"; + html << "DqOutputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "
"; + html << "DqOutputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "
"; + html << "DqOutputChannel.PushStats.Bytes: " << pushStats.Bytes << "
"; + html << "DqOutputChannel.PushStats.Rows: " << pushStats.Rows << "
"; + html << "DqOutputChannel.PushStats.Chunks: " << pushStats.Chunks << "
"; + html << "DqOutputChannel.PushStats.Splits: " << pushStats.Splits << "
"; + html << "DqOutputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "
"; + html << "DqOutputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "
"; + html << "DqOutputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "
"; + html << "DqOutputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "
"; + html << "DqOutputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "
"; + + const auto& popStats = info.Channel->GetPopStats(); + html << "DqOutputChannel.PopStats.ChannelId: " << popStats.ChannelId << "
"; + html << "DqOutputChannel.PopStats.DstStageId: " << popStats.DstStageId << "
"; + html << "DqOutputChannel.PopStats.MaxMemoryUsage: " << popStats.MaxMemoryUsage << "
"; + html << "DqOutputChannel.PopStats.MaxRowsInMemory: " << popStats.MaxRowsInMemory << "
"; + html << "DqOutputChannel.PopStats.SerializationTime: " << popStats.SerializationTime.ToString() << "
"; + html << "DqOutputChannel.PopStats.SpilledBytes: " << popStats.SpilledBytes << "
"; + html << "DqOutputChannel.PopStats.SpilledRows: " << popStats.SpilledRows << "
"; + html << "DqOutputChannel.PopStats.SpilledBlobs: " << popStats.SpilledBlobs << "
"; + html << "DqOutputChannel.PopStats.Bytes: " << popStats.Bytes << "
"; + html << "DqOutputChannel.PopStats.Rows: " << popStats.Rows << "
"; + html << "DqOutputChannel.PopStats.Chunks: " << popStats.Chunks << "
"; + html << "DqOutputChannel.PopStats.Splits: " << popStats.Splits << "
"; + html << "DqOutputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "
"; + html << "DqOutputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "
"; + html << "DqOutputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "
"; + html << "DqOutputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "
"; + html << "DqOutputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "
"; + } + } + + Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str())); + } + void OnStateRequest(TEvDqCompute::TEvStateRequest::TPtr& ev) { CA_LOG_T("Got TEvStateRequest from actor " << ev->Sender << " PingCookie: " << ev->Cookie); if (!SentStatsRequest) { @@ -230,18 +398,18 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseGet()->Stats); } - auto record = NDqProto::TEvComputeActorState(); - record.SetState(NDqProto::COMPUTE_STATE_EXECUTING); - record.SetStatusCode(NYql::NDqProto::StatusIds::SUCCESS); - record.SetTaskId(Task.GetId()); + ComputeActorState = NDqProto::TEvComputeActorState(); + ComputeActorState.SetState(NDqProto::COMPUTE_STATE_EXECUTING); + ComputeActorState.SetStatusCode(NYql::NDqProto::StatusIds::SUCCESS); + ComputeActorState.SetTaskId(Task.GetId()); NYql::TIssues issues; FillIssues(issues); - IssuesToMessage(issues, record.MutableIssues()); - FillStats(record.MutableStats(), /* last */ false); + IssuesToMessage(issues, ComputeActorState.MutableIssues()); + FillStats(ComputeActorState.MutableStats(), /* last */ false); for (const auto& [actorId, cookie] : WaitingForStateResponse) { auto state = MakeHolder(); - state->Record = record; + state->Record = ComputeActorState; Send(actorId, std::move(state), NActors::IEventHandle::FlagTrackDelivery, cookie); } WaitingForStateResponse.clear(); @@ -976,6 +1144,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase { hFunc(TEvConfigureFailureInjectorRequest, OnConfigureFailureInjector) HFunc(TEvRoutesRequest, OnRoutesRequest) hFunc(TEvQueryStatus, OnQueryStatus) + hFunc(NActors::NMon::TEvHttpInfo, OnMonitoringPage) }) TAutoPtr AfterRegister(const TActorId& self, const TActorId& parentId) override { @@ -110,6 +111,92 @@ class TLocalWorkerManager: public TWorkerManagerCommon { Send(SelfId(), new TEvents::TEvWakeup()); } + void OnMonitoringPage(NActors::NMon::TEvHttpInfo::TPtr& ev) { + TStringStream html; + const auto& params = ev->Get()->Request.GetParams(); + if (params.Has("get")) { + TString txId = params.Get("tx_id"); + ui64 taskId = 0; + try { + taskId = std::stoull(params.Get("task_id")); + } catch (...) { + // ¯\_(ツ)_/¯ + } + + for (const auto& [_, workersInfo]: AllocatedWorkers) { + auto* traceId = std::get_if(&workersInfo.TxId); + if (traceId && *traceId != txId) { + continue; + } + + for (size_t i = 0; i < workersInfo.WorkerActors.ActorIds.size(); i++) { + auto workerTaskId = workersInfo.WorkerActors.TaskIds[i]; + auto actorId = workersInfo.WorkerActors.ActorIds[i]; + if (workerTaskId == taskId) { + Send(ev->Forward(actorId)); + return; + } + } + } + + html << "
Couldn't find the worker with parameters: TxId = " << txId << ", TaskId: " << taskId << "
"; + } + + html << "
"; + html << "

TxId:

"; + html << "

TaskId:

"; + html << ""; + html << "
"; + + // Table with known workers info + html << "
"; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + + for (const auto& [ResourceId, workersInfo]: AllocatedWorkers) { + auto* traceId = std::get_if(&workersInfo.TxId); + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << "\n"; + + for (size_t i = 0; i < workersInfo.WorkerActors.ActorIds.size(); i++) { + auto workerTaskId = workersInfo.WorkerActors.TaskIds[i]; + auto actorId = workersInfo.WorkerActors.ActorIds[i]; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << ""; + html << "\n"; + } + } + + html << "
ResourceIdSenderDeadlineTxIdWorkerActorTaskIdLink
" << ResourceId << "" << workersInfo.Sender.ToString() << "" << workersInfo.Deadline.ToString() << "" << (traceId ? *traceId : "") << "
" << ResourceId << "" << workersInfo.Sender.ToString() << "" << workersInfo.Deadline.ToString() << "" << (traceId ? *traceId : "") << "" << actorId.ToString() << "" << workerTaskId <<""; + html << "
"; + html << ""; + html << ""; + html << ""; + html << "
"; + html <<"
"; + + Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str())); + } + void WakeUp() { auto currentRusage = TRusage::Get(); TRusage delta; @@ -261,8 +348,9 @@ class TLocalWorkerManager: public TWorkerManagerCommon { auto& allocationInfo = AllocatedWorkers[resourceId]; allocationInfo.TxId = traceId; - if (allocationInfo.WorkerActors.empty()) { - allocationInfo.WorkerActors.reserve(count); + if (allocationInfo.WorkerActors.ActorIds.empty()) { + allocationInfo.WorkerActors.ActorIds.reserve(count); + allocationInfo.WorkerActors.TaskIds.reserve(count); allocationInfo.Sender = ev->Sender; if (ev->Get()->Record.GetFreeWorkerAfterMs()) { allocationInfo.Deadline = @@ -283,10 +371,12 @@ class TLocalWorkerManager: public TWorkerManagerCommon { for (ui32 i = 0; i < count; i++) { THolder actor; - + ui64 taskId = 0; if (createComputeActor) { YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType; + NYql::NDqProto::TDqTask* taskPtr = &(tasks[i]); + taskId = taskPtr->id(); actor.Reset(NYql::CreateComputeActor( Options, std::make_shared(MemoryQuoter, allocationInfo.TxId, quotas[i]), @@ -304,16 +394,17 @@ class TLocalWorkerManager: public TWorkerManagerCommon { Options.TaskRunnerActorFactory, Options.AsyncIoFactory)); } - allocationInfo.WorkerActors.emplace_back(RegisterChild( + allocationInfo.WorkerActors.ActorIds.emplace_back(RegisterChild( actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr )); + allocationInfo.WorkerActors.TaskIds.emplace_back(taskId); } Options.Counters.ActiveWorkers->Add(count); } Send(ev->Sender, - MakeHolder(resourceId, allocationInfo.WorkerActors), + MakeHolder(resourceId, allocationInfo.WorkerActors.ActorIds), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie); Subscribe(ev->Sender.NodeId()); @@ -333,13 +424,13 @@ class TLocalWorkerManager: public TWorkerManagerCommon { void DropTaskCounters(const auto& info) { auto traceId = std::get(info.TxId); if (auto it = TaskCountersMap.find(traceId); it != TaskCountersMap.end()) { - if (it->second.ReferenceCount <= info.WorkerActors.size()) { + if (it->second.ReferenceCount <= info.WorkerActors.ActorIds.size()) { if (TaskCounters) { TaskCounters->RemoveSubgroup("operation", traceId); } TaskCountersMap.erase(it); } else { - it->second.ReferenceCount -= info.WorkerActors.size(); + it->second.ReferenceCount -= info.WorkerActors.ActorIds.size(); } } } @@ -348,7 +439,7 @@ class TLocalWorkerManager: public TWorkerManagerCommon { YQL_CLOG(DEBUG, ProviderDq) << "Free Group " << id; auto it = AllocatedWorkers.find(id); if (it != AllocatedWorkers.end()) { - for (const auto& actorId : it->second.WorkerActors) { + for (const auto& actorId : it->second.WorkerActors.ActorIds) { UnregisterChild(actorId); } @@ -361,7 +452,7 @@ class TLocalWorkerManager: public TWorkerManagerCommon { DropTaskCounters(it->second); } - Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.size()); + Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.ActorIds.size()); AllocatedWorkers.erase(it); } } @@ -384,7 +475,12 @@ class TLocalWorkerManager: public TWorkerManagerCommon { NMonitoring::TDynamicCounterPtr TaskCounters; struct TAllocationInfo { - TVector WorkerActors; + struct TWorkerInfo { + TVector ActorIds; + TVector TaskIds; + }; + + TWorkerInfo WorkerActors; NActors::TActorId Sender; TInstant Deadline; NDq::TTxId TxId;