Skip to content

Commit

Permalink
24-3: Add ListNodes cache in DynamicNameserver (#12694) (#12747)
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Dec 19, 2024
1 parent ee4ad94 commit 11eb58f
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 105 deletions.
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
Send(SelfId(), ev.release());

// and subscribe for the node list too
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,13 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
try {
const auto& status = ev->Get()->Status.GetStatus();
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
if (!ev->Get()->Status.IsSuccess()) {
ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
}
const auto& res = ev->Get()->Result;

auto& nodesInfo = nameServiceUpdateReq->Nodes;
nodesInfo.reserve(res.nodes().size());
auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
nodesInfo->reserve(res.nodes().size());

Peers.clear();
std::set<ui32> nodeIds; // may be not unique
Expand All @@ -281,7 +280,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});

if (node.interconnect_port()) {
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
node.node_id(),
node.node_address(),
node.hostname(), // host
Expand All @@ -297,8 +296,9 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size());
ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc();

LOG_T("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver");
if (!nodesInfo.empty()) {
LOG_T("Send NodeInfo with size: " << nodesInfo->size() << " to DynamicNameserver");
if (!nodesInfo->empty()) {
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo(nodesInfo));
Send(GetNameserviceActorId(), nameServiceUpdateReq.Release());
}
} catch (yexception &e) {
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/health_check/health_check_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,15 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}

// Test helper: makes every node in the intercepted TEvNodesInfo event carry a
// 1,000,000-character host name.
//
// ev - pointer to the event handle under test; on return it refers to a newly
//      built handle (see the Swap at the end).
//
// NOTE(review): this text is a rendered diff, so the loop over
// `(*ev)->Get()->Nodes` and the loop over `nodes` look like the pre-change and
// post-change versions of the same statement - confirm against the real file.
void SetLongHostValue(TEvInterconnect::TEvNodesInfo::TPtr* ev) {
// Copy of the event's node list that is modified below.
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>((*ev)->Get()->Nodes);
TString host(1000000, 'a');
auto& pbRecord = (*ev)->Get()->Nodes;
for (auto itIssue = pbRecord.begin(); itIssue != pbRecord.end(); ++itIssue) {
itIssue->Host = host;
for (auto it = nodes->begin(); it != nodes->end(); ++it) {
it->Host = host;
}
// Wrap the modified list in a fresh event, built from the original handle's
// Recipient and Sender, and put it in place of the intercepted one.
auto newEv = IEventHandle::Downcast<TEvInterconnect::TEvNodesInfo>(
new IEventHandle((*ev)->Recipient, (*ev)->Sender, new TEvInterconnect::TEvNodesInfo(nodes))
);
ev->Swap(newEv);
}

Ydb::Monitoring::SelfCheckResult RequestHc(size_t const groupNumber, size_t const vdiscPerGroupNumber, bool const isMergeRecords = false, bool const largeSizeVdisksIssues = false) {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/mind/bscontroller/impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1412,9 +1412,9 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
TNodeId NodeId;
TNodeLocation Location;

// Builds a host record from a node description, keeping its NodeId and Location.
// NOTE(review): this text is a rendered diff; the rvalue-reference signature
// with std::move(nodeInfo.Location) and the const-reference signature with a
// plain copy look like the pre-change and post-change versions - confirm.
THostRecord(TEvInterconnect::TNodeInfo&& nodeInfo)
THostRecord(const TEvInterconnect::TNodeInfo& nodeInfo)
: NodeId(nodeInfo.NodeId)
, Location(std::move(nodeInfo.Location))
, Location(nodeInfo.Location)
{}
};

Expand All @@ -1423,11 +1423,13 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
THashMap<TNodeId, THostId> NodeIdToHostId;

public:
THostRecordMapImpl() = default;

// Fills both lookup maps from the nodes listed in `msg`:
//   NodeIdToHostId : NodeId -> THostId built from (Host, Port)
//   HostIdToRecord : THostId -> record constructed from the node info
// emplace() is used for both, so a key that is already present keeps its
// first value.
// NOTE(review): this text is a rendered diff; the non-const loop with
// std::move(nodeInfo) and the const loop with a copy look like the pre-change
// and post-change versions of the same statements - confirm.
THostRecordMapImpl(TEvInterconnect::TEvNodesInfo *msg) {
for (TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
for (const TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
const THostId hostId(nodeInfo.Host, nodeInfo.Port);
NodeIdToHostId.emplace(nodeInfo.NodeId, hostId);
HostIdToRecord.emplace(hostId, std::move(nodeInfo));
HostIdToRecord.emplace(hostId, nodeInfo);
}
}

Expand Down Expand Up @@ -1805,8 +1807,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa

// For test purposes, required for self heal actor.
// Sets HostRecords to a newly allocated THostRecordMapImpl that contains no
// hosts.
// NOTE(review): this text is a rendered diff; the variant that passes a
// default-constructed TEvNodesInfo and the variant that uses the default
// constructor look like the pre-change and post-change versions - confirm.
void CreateEmptyHostRecordsMap() {
TEvInterconnect::TEvNodesInfo nodes;
HostRecords = std::make_shared<THostRecordMapImpl>(&nodes);
HostRecords = std::make_shared<THostRecordMapImpl>();
}

ui64 NextConfigTxSeqNo = 1;
Expand Down
74 changes: 58 additions & 16 deletions ydb/core/mind/dynamic_nameserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,

if (rec.GetStatus().GetCode() != NKikimrNodeBroker::TStatus::OK) {
// Reset proxy if node expired.
if (exists)
if (exists) {
ResetInterconnectProxyConfig(NodeId, ctx);
ListNodesCache->Invalidate(); // node was erased
}
ReplyWithErrorAndDie(ctx);
return;
}

TDynamicConfig::TDynamicNodeInfo node(rec.GetNode());
if (!exists || !oldNode.EqualExceptExpire(node)) {
ListNodesCache->Invalidate();
}

// If ID is re-used by another node then proxy has to be reset.
if (exists && !oldNode.EqualExceptExpire(node))
Expand Down Expand Up @@ -202,31 +207,40 @@ void TDynamicNameserver::ResolveDynamicNode(ui32 nodeId,
reply->NodeId = nodeId;
ctx.Send(ev->Sender, reply);
} else {
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain], ev, deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain],
ListNodesCache, ev, deadline));
}
}

void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
{
{
auto now = ctx.Now();
for (auto &sender : ListNodesQueue) {
THolder<TEvInterconnect::TEvNodesInfo> reply(new TEvInterconnect::TEvNodesInfo);
if (ListNodesCache->NeedUpdate(now)) {
auto newNodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
auto newExpire = now;

for (const auto &pr : StaticConfig->StaticNodeTable) {
reply->Nodes.emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
newNodes->emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
}

for (auto &config : DynamicConfigs) {
for (auto &pr : config->DynamicNodes) {
if (pr.second.Expire > now)
reply->Nodes.emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
if (pr.second.Expire > now) {
newNodes->emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
newExpire = std::min(newExpire, pr.second.Expire);
}
}
}

ctx.Send(sender, reply.Release());
ListNodesCache->Update(newNodes, newExpire);
}

for (auto &sender : ListNodesQueue) {
ctx.Send(sender, new TEvInterconnect::TEvNodesInfo(ListNodesCache->GetNodes()));
}
ListNodesQueue.clear();
}
Expand Down Expand Up @@ -282,15 +296,18 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
config->ExpiredNodes.emplace(node.GetNodeId(), info);
}

ListNodesCache->Invalidate();
config->Epoch = rec.GetEpoch();
ctx.Schedule(config->Epoch.End - ctx.Now(),
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
} else {
// Note: this update may be optimized to only include new nodes
for (auto &node : rec.GetNodes()) {
auto nodeId = node.GetNodeId();
if (!config->DynamicNodes.contains(nodeId))
if (!config->DynamicNodes.contains(nodeId)) {
config->DynamicNodes.emplace(nodeId, node);
ListNodesCache->Invalidate();
}
}
config->Epoch = rec.GetEpoch();
}
Expand Down Expand Up @@ -377,8 +394,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
ctx.Send(ev->Sender, reply.Release());
} else {
const TInstant deadline = ev->Get()->Deadline;
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain], ev.Release(),
deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain],
ListNodesCache, ev.Release(), deadline));
}
}
}
Expand Down Expand Up @@ -437,6 +454,7 @@ void TDynamicNameserver::Handle(NConsole::TEvConsole::TEvConfigNotificationReque
auto newStaticConfig = BuildNameserverTable(config.GetNameserviceConfig());
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
StaticConfig = std::move(newStaticConfig);
ListNodesCache->Invalidate();
for (const auto& subscriber : StaticNodeChangeSubscribers) {
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
}
Expand Down Expand Up @@ -478,5 +496,29 @@ TIntrusivePtr<TTableNameserverSetup> BuildNameserverTable(const NKikimrConfig::T
return table;
}

TListNodesCache::TListNodesCache()
: Nodes(nullptr)
, Expire(TInstant::Zero())
{}


// Stores a new node list together with the instant after which NeedUpdate()
// reports the cache as stale.
//
// newNodes  - list to cache; taken by value and moved into the member so the
//             hand-over does not cost an extra reference-count round trip.
// newExpire - stored as the new expiration instant.
void TListNodesCache::Update(TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr newNodes, TInstant newExpire) {
    Nodes = std::move(newNodes);
    Expire = newExpire;
}

void TListNodesCache::Invalidate() {
Nodes = nullptr;
Expire = TInstant::Zero();
}

// Tells whether the cached list has to be rebuilt: true when no list is stored
// or when `now` is strictly later than the stored expiration instant.
bool TListNodesCache::NeedUpdate(TInstant now) const {
    if (Nodes == nullptr) {
        return true;
    }
    return Expire < now;
}

// Returns the currently stored node list pointer; it is null after
// construction or Invalidate() until Update() stores a list.
TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr TListNodesCache::GetNodes() const {
return Nodes;
}

} // NNodeBroker
} // NKikimr
25 changes: 23 additions & 2 deletions ydb/core/mind/dynamic_nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ struct TDynamicConfig : public TThrRefBase {

using TDynamicConfigPtr = TIntrusivePtr<TDynamicConfig>;

// Reference-counted holder for a node list plus the instant after which that
// list is considered stale.
class TListNodesCache : public TSimpleRefCount<TListNodesCache> {
public:
// Starts with no list stored and a zero expiration instant.
TListNodesCache();

// Replaces the stored list and expiration instant.
void Update(TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr newNodes, TInstant newExpire);
// Drops the stored list and resets the expiration instant to zero.
void Invalidate();
// True when no list is stored or `now` is later than the expiration instant.
bool NeedUpdate(TInstant now) const;
// Returns the stored list pointer (null when nothing is stored).
TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr GetNodes() const;
private:
// Cached node list; null means "nothing cached".
TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr Nodes;
// Instant after which the cached list must be rebuilt.
TInstant Expire;
};

class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverBase> {
public:
using TBase = TActorBootstrapped<TDynamicNodeResolverBase>;
Expand All @@ -77,10 +90,12 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
}

TDynamicNodeResolverBase(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
TIntrusivePtr<TListNodesCache> listNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: Owner(owner)
, NodeId(nodeId)
, Config(config)
, ListNodesCache(listNodesCache)
, OrigRequest(origRequest)
, Deadline(deadline)
{
Expand Down Expand Up @@ -117,6 +132,7 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
TActorId Owner;
ui32 NodeId;
TDynamicConfigPtr Config;
TIntrusivePtr<TListNodesCache> ListNodesCache;
TAutoPtr<IEventHandle> OrigRequest;
const TInstant Deadline;

Expand All @@ -127,8 +143,9 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
class TDynamicNodeResolver : public TDynamicNodeResolverBase {
public:
// Forwards all constructor arguments to TDynamicNodeResolverBase.
// NOTE(review): this text is a rendered diff; the base-initializer line
// without listNodesCache looks like the pre-change version of the line that
// follows it - confirm against the real file.
TDynamicNodeResolver(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
TIntrusivePtr<TListNodesCache> listNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, listNodesCache, origRequest, deadline)
{
}

Expand All @@ -139,8 +156,9 @@ class TDynamicNodeResolver : public TDynamicNodeResolverBase {
class TDynamicNodeSearcher : public TDynamicNodeResolverBase {
public:
// Forwards all constructor arguments to TDynamicNodeResolverBase.
// NOTE(review): this text is a rendered diff; the base-initializer line
// without listNodesCache looks like the pre-change version of the line that
// follows it - confirm against the real file.
TDynamicNodeSearcher(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
TIntrusivePtr<TListNodesCache> listNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, listNodesCache, origRequest, deadline)
{
}

Expand Down Expand Up @@ -179,6 +197,7 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {

TDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 resolvePoolId)
: StaticConfig(setup)
, ListNodesCache(MakeIntrusive<TListNodesCache>())
, ResolvePoolId(resolvePoolId)
{
Y_ABORT_UNLESS(StaticConfig->IsEntriesUnique());
Expand Down Expand Up @@ -254,6 +273,8 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
TIntrusivePtr<TTableNameserverSetup> StaticConfig;
std::array<TDynamicConfigPtr, DOMAINS_COUNT> DynamicConfigs;
TVector<TActorId> ListNodesQueue;
TIntrusivePtr<TListNodesCache> ListNodesCache;

std::array<TActorId, DOMAINS_COUNT> NodeBrokerPipes;
// When ListNodes requests are sent to NodeBroker tablets this
// bitmap indicates domains which didn't answer yet.
Expand Down
Loading

0 comments on commit 11eb58f

Please sign in to comment.