Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24-3: Add ListNodes cache in DynamicNameserver (#12694) #12747

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
Send(SelfId(), ev.release());

// and subscribe for the node list too
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,13 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
try {
const auto& status = ev->Get()->Status.GetStatus();
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
if (!ev->Get()->Status.IsSuccess()) {
ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
}
const auto& res = ev->Get()->Result;

auto& nodesInfo = nameServiceUpdateReq->Nodes;
nodesInfo.reserve(res.nodes().size());
auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
nodesInfo->reserve(res.nodes().size());

Peers.clear();
std::set<ui32> nodeIds; // may be not unique
Expand All @@ -281,7 +280,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});

if (node.interconnect_port()) {
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
node.node_id(),
node.node_address(),
node.hostname(), // host
Expand All @@ -297,8 +296,9 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size());
ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc();

LOG_T("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver");
if (!nodesInfo.empty()) {
LOG_T("Send NodeInfo with size: " << nodesInfo->size() << " to DynamicNameserver");
if (!nodesInfo->empty()) {
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo(nodesInfo));
Send(GetNameserviceActorId(), nameServiceUpdateReq.Release());
}
} catch (yexception &e) {
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/health_check/health_check_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,15 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
}

void SetLongHostValue(TEvInterconnect::TEvNodesInfo::TPtr* ev) {
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>((*ev)->Get()->Nodes);
TString host(1000000, 'a');
auto& pbRecord = (*ev)->Get()->Nodes;
for (auto itIssue = pbRecord.begin(); itIssue != pbRecord.end(); ++itIssue) {
itIssue->Host = host;
for (auto it = nodes->begin(); it != nodes->end(); ++it) {
it->Host = host;
}
auto newEv = IEventHandle::Downcast<TEvInterconnect::TEvNodesInfo>(
new IEventHandle((*ev)->Recipient, (*ev)->Sender, new TEvInterconnect::TEvNodesInfo(nodes))
);
ev->Swap(newEv);
}

Ydb::Monitoring::SelfCheckResult RequestHc(size_t const groupNumber, size_t const vdiscPerGroupNumber, bool const isMergeRecords = false, bool const largeSizeVdisksIssues = false) {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/mind/bscontroller/impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1412,9 +1412,9 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
TNodeId NodeId;
TNodeLocation Location;

THostRecord(TEvInterconnect::TNodeInfo&& nodeInfo)
THostRecord(const TEvInterconnect::TNodeInfo& nodeInfo)
: NodeId(nodeInfo.NodeId)
, Location(std::move(nodeInfo.Location))
, Location(nodeInfo.Location)
{}
};

Expand All @@ -1423,11 +1423,13 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
THashMap<TNodeId, THostId> NodeIdToHostId;

public:
THostRecordMapImpl() = default;

THostRecordMapImpl(TEvInterconnect::TEvNodesInfo *msg) {
for (TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
for (const TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
const THostId hostId(nodeInfo.Host, nodeInfo.Port);
NodeIdToHostId.emplace(nodeInfo.NodeId, hostId);
HostIdToRecord.emplace(hostId, std::move(nodeInfo));
HostIdToRecord.emplace(hostId, nodeInfo);
}
}

Expand Down Expand Up @@ -1805,8 +1807,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa

// For test purposes, required for self heal actor
void CreateEmptyHostRecordsMap() {
TEvInterconnect::TEvNodesInfo nodes;
HostRecords = std::make_shared<THostRecordMapImpl>(&nodes);
HostRecords = std::make_shared<THostRecordMapImpl>();
}

ui64 NextConfigTxSeqNo = 1;
Expand Down
74 changes: 58 additions & 16 deletions ydb/core/mind/dynamic_nameserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,

if (rec.GetStatus().GetCode() != NKikimrNodeBroker::TStatus::OK) {
// Reset proxy if node expired.
if (exists)
if (exists) {
ResetInterconnectProxyConfig(NodeId, ctx);
ListNodesCache->Invalidate(); // node was erased
}
ReplyWithErrorAndDie(ctx);
return;
}

TDynamicConfig::TDynamicNodeInfo node(rec.GetNode());
if (!exists || !oldNode.EqualExceptExpire(node)) {
ListNodesCache->Invalidate();
}

// If ID is re-used by another node then proxy has to be reset.
if (exists && !oldNode.EqualExceptExpire(node))
Expand Down Expand Up @@ -202,31 +207,40 @@ void TDynamicNameserver::ResolveDynamicNode(ui32 nodeId,
reply->NodeId = nodeId;
ctx.Send(ev->Sender, reply);
} else {
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain], ev, deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain],
ListNodesCache, ev, deadline));
}
}

void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
{
{
auto now = ctx.Now();
for (auto &sender : ListNodesQueue) {
THolder<TEvInterconnect::TEvNodesInfo> reply(new TEvInterconnect::TEvNodesInfo);
if (ListNodesCache->NeedUpdate(now)) {
auto newNodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
auto newExpire = now;

for (const auto &pr : StaticConfig->StaticNodeTable) {
reply->Nodes.emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
newNodes->emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
}

for (auto &config : DynamicConfigs) {
for (auto &pr : config->DynamicNodes) {
if (pr.second.Expire > now)
reply->Nodes.emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
if (pr.second.Expire > now) {
newNodes->emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
newExpire = std::min(newExpire, pr.second.Expire);
}
}
}

ctx.Send(sender, reply.Release());
ListNodesCache->Update(newNodes, newExpire);
}

for (auto &sender : ListNodesQueue) {
ctx.Send(sender, new TEvInterconnect::TEvNodesInfo(ListNodesCache->GetNodes()));
}
ListNodesQueue.clear();
}
Expand Down Expand Up @@ -282,15 +296,18 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
config->ExpiredNodes.emplace(node.GetNodeId(), info);
}

ListNodesCache->Invalidate();
config->Epoch = rec.GetEpoch();
ctx.Schedule(config->Epoch.End - ctx.Now(),
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
} else {
// Note: this update may be optimized to only include new nodes
for (auto &node : rec.GetNodes()) {
auto nodeId = node.GetNodeId();
if (!config->DynamicNodes.contains(nodeId))
if (!config->DynamicNodes.contains(nodeId)) {
config->DynamicNodes.emplace(nodeId, node);
ListNodesCache->Invalidate();
}
}
config->Epoch = rec.GetEpoch();
}
Expand Down Expand Up @@ -377,8 +394,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
ctx.Send(ev->Sender, reply.Release());
} else {
const TInstant deadline = ev->Get()->Deadline;
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain], ev.Release(),
deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain],
ListNodesCache, ev.Release(), deadline));
}
}
}
Expand Down Expand Up @@ -437,6 +454,7 @@ void TDynamicNameserver::Handle(NConsole::TEvConsole::TEvConfigNotificationReque
auto newStaticConfig = BuildNameserverTable(config.GetNameserviceConfig());
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
StaticConfig = std::move(newStaticConfig);
ListNodesCache->Invalidate();
for (const auto& subscriber : StaticNodeChangeSubscribers) {
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
}
Expand Down Expand Up @@ -478,5 +496,29 @@ TIntrusivePtr<TTableNameserverSetup> BuildNameserverTable(const NKikimrConfig::T
return table;
}

TListNodesCache::TListNodesCache()
: Nodes(nullptr)
, Expire(TInstant::Zero())
{}


void TListNodesCache::Update(TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr newNodes, TInstant newExpire) {
Nodes = newNodes;
Expire = newExpire;
}

void TListNodesCache::Invalidate() {
Nodes = nullptr;
Expire = TInstant::Zero();
}

bool TListNodesCache::NeedUpdate(TInstant now) const {
return Nodes == nullptr || now > Expire;
}

TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr TListNodesCache::GetNodes() const {
return Nodes;
}

} // NNodeBroker
} // NKikimr
25 changes: 23 additions & 2 deletions ydb/core/mind/dynamic_nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ struct TDynamicConfig : public TThrRefBase {

using TDynamicConfigPtr = TIntrusivePtr<TDynamicConfig>;

// Reference-counted holder of a node-list snapshot (a const pointer to a
// vector of TEvInterconnect::TNodeInfo) paired with an expiration instant.
class TListNodesCache : public TSimpleRefCount<TListNodesCache> {
public:
    TListNodesCache();

    // Read side: whether the snapshot must be rebuilt as of `now`,
    // and access to the stored snapshot pointer.
    bool NeedUpdate(TInstant now) const;
    TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr GetNodes() const;

    // Write side: store a new snapshot with its expiration, or drop it.
    void Update(TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr newNodes, TInstant newExpire);
    void Invalidate();

private:
    TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr Nodes; // stored snapshot
    TInstant Expire;                                               // expiration paired with Nodes
};

class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverBase> {
public:
using TBase = TActorBootstrapped<TDynamicNodeResolverBase>;
Expand All @@ -77,10 +90,12 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
}

TDynamicNodeResolverBase(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
TIntrusivePtr<TListNodesCache> listNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: Owner(owner)
, NodeId(nodeId)
, Config(config)
, ListNodesCache(listNodesCache)
, OrigRequest(origRequest)
, Deadline(deadline)
{
Expand Down Expand Up @@ -117,6 +132,7 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
TActorId Owner;
ui32 NodeId;
TDynamicConfigPtr Config;
TIntrusivePtr<TListNodesCache> ListNodesCache;
TAutoPtr<IEventHandle> OrigRequest;
const TInstant Deadline;

Expand All @@ -127,8 +143,9 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
class TDynamicNodeResolver : public TDynamicNodeResolverBase {
public:
TDynamicNodeResolver(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
TIntrusivePtr<TListNodesCache> listNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, listNodesCache, origRequest, deadline)
{
}

Expand All @@ -139,8 +156,9 @@ class TDynamicNodeResolver : public TDynamicNodeResolverBase {
class TDynamicNodeSearcher : public TDynamicNodeResolverBase {
public:
TDynamicNodeSearcher(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
TIntrusivePtr<TListNodesCache> listNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, listNodesCache, origRequest, deadline)
{
}

Expand Down Expand Up @@ -179,6 +197,7 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {

TDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 resolvePoolId)
: StaticConfig(setup)
, ListNodesCache(MakeIntrusive<TListNodesCache>())
, ResolvePoolId(resolvePoolId)
{
Y_ABORT_UNLESS(StaticConfig->IsEntriesUnique());
Expand Down Expand Up @@ -254,6 +273,8 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
TIntrusivePtr<TTableNameserverSetup> StaticConfig;
std::array<TDynamicConfigPtr, DOMAINS_COUNT> DynamicConfigs;
TVector<TActorId> ListNodesQueue;
TIntrusivePtr<TListNodesCache> ListNodesCache;

std::array<TActorId, DOMAINS_COUNT> NodeBrokerPipes;
// When ListNodes requests are sent to NodeBroker tablets this
// bitmap indicates domains which didn't answer yet.
Expand Down
Loading
Loading