Skip to content

Commit

Permalink
fetcher accessors signals (#13038)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 27, 2024
1 parent 24c8b75 commit 094be61
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/data_accessor/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ void TLocalManager::DrainQueue() {
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
Counters.ResultFromCache->Add(1);
if (!i->IsFetched() && !i->IsAborted()) {
i->AddAccessor(accessor);
}
Expand All @@ -66,11 +67,14 @@ void TLocalManager::DrainQueue() {
--countToFlight;
}
if (dataAnalyzed.GetPortionsToAsk().size()) {
Counters.ResultAskDirectly->Add(dataAnalyzed.GetPortionsToAsk().size());
it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE");
}
}
}
PortionsAskInFlight += countToFlight;
Counters.FetchingCount->Set(PortionsAskInFlight);
Counters.QueueSize->Set(PortionsAsk.size());
}

void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) {
Expand All @@ -82,8 +86,10 @@ void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& requ
if (itRequest == RequestsByPortion.end()) {
AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector<std::shared_ptr<TDataAccessorsRequest>>({request})).second);
PortionsAsk.emplace_back(i, request->GetAbortionFlag());
Counters.AskNew->Add(1);
} else {
itRequest->second.emplace_back(request);
Counters.AskDuplication->Add(1);
}
}
}
Expand Down
33 changes: 27 additions & 6 deletions ydb/core/tx/columnshard/data_accessor/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,29 @@

namespace NKikimr::NOlap::NDataAccessorControl {

class TAccessorSignals: public NColumnShard::TCommonCountersOwner {
private:
using TBase = NColumnShard::TCommonCountersOwner;

public:
const NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
const NMonitoring::TDynamicCounters::TCounterPtr FetchingCount;
const NMonitoring::TDynamicCounters::TCounterPtr AskNew;
const NMonitoring::TDynamicCounters::TCounterPtr AskDuplication;
const NMonitoring::TDynamicCounters::TCounterPtr ResultFromCache;
const NMonitoring::TDynamicCounters::TCounterPtr ResultAskDirectly;

TAccessorSignals()
: TBase("AccessorsFetching")
, QueueSize(TBase::GetValue("Queue/Count"))
, FetchingCount(TBase::GetValue("Fetching/Count"))
, AskNew(TBase::GetDeriviative("Ask/Fault/Count"))
, AskDuplication(TBase::GetDeriviative("Ask/Duplication/Count"))
, ResultFromCache(TBase::GetDeriviative("ResultFromCache/Count"))
, ResultAskDirectly(TBase::GetDeriviative("ResultAskDirectly/Count")) {
}
};

class IDataAccessorsManager {
private:
virtual void DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) = 0;
Expand Down Expand Up @@ -80,10 +103,8 @@ class TActorAccessorsManager: public IDataAccessorsManager {
public:
TActorAccessorsManager(const NActors::TActorId& actorId, const NActors::TActorId& tabletActorId)
: TBase(tabletActorId)
, ActorId(actorId)
, AccessorsCallback(std::make_shared<TActorAccessorsCallback>(ActorId))
{

, ActorId(actorId)
, AccessorsCallback(std::make_shared<TActorAccessorsCallback>(ActorId)) {
AFL_VERIFY(!!tabletActorId);
}
};
Expand All @@ -93,6 +114,7 @@ class TLocalManager: public IDataAccessorsManager {
using TBase = IDataAccessorsManager;
THashMap<ui64, std::unique_ptr<IGranuleDataAccessor>> Managers;
THashMap<ui64, std::vector<std::shared_ptr<TDataAccessorsRequest>>> RequestsByPortion;
TAccessorSignals Counters;
const std::shared_ptr<IAccessorCallback> AccessorCallback;

class TPortionToAsk {
Expand Down Expand Up @@ -157,8 +179,7 @@ class TLocalManager: public IDataAccessorsManager {

TLocalManager(const std::shared_ptr<IAccessorCallback>& callback)
: TBase(NActors::TActorId())
, AccessorCallback(callback)
{
, AccessorCallback(callback) {
}
};

Expand Down

0 comments on commit 094be61

Please sign in to comment.