Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ListNodes cache in DynamicNameserver #12694

Merged
merged 9 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
Send(SelfId(), ev.release());

// and subscribe for the node list too
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,14 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
try {
const auto& status = ev->Get()->Status.GetStatus();
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
if (!ev->Get()->Status.IsSuccess()) {
ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
}
const auto& res = ev->Get()->Result;

auto& nodesInfo = nameServiceUpdateReq->Nodes;
nodesInfo.reserve(res.nodes().size());
auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
nodesInfo->reserve(res.nodes().size());
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo(nodesInfo));

Peers.clear();
std::set<ui32> nodeIds; // may be not unique
Expand All @@ -340,7 +340,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});

if (node.interconnect_port()) {
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
node.node_id(),
node.node_address(),
node.hostname(), // host
Expand All @@ -356,8 +356,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size());
ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc();

LOG_T("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver");
if (!nodesInfo.empty()) {
LOG_T("Send NodeInfo with size: " << nodesInfo->size() << " to DynamicNameserver");
if (!nodesInfo->empty()) {
Send(GetNameserviceActorId(), nameServiceUpdateReq.Release());
}
} catch (yexception &e) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/health_check/health_check_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {

void SetLongHostValue(TEvInterconnect::TEvNodesInfo::TPtr* ev) {
TString host(1000000, 'a');
auto& pbRecord = (*ev)->Get()->Nodes;
auto& pbRecord = const_cast<TVector<TEvInterconnect::TNodeInfo>&>((*ev)->Get()->Nodes);
for (auto itIssue = pbRecord.begin(); itIssue != pbRecord.end(); ++itIssue) {
itIssue->Host = host;
}
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/mind/bscontroller/impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
TNodeId NodeId;
TNodeLocation Location;

THostRecord(TEvInterconnect::TNodeInfo&& nodeInfo)
THostRecord(TEvInterconnect::TNodeInfo nodeInfo)
: NodeId(nodeInfo.NodeId)
, Location(std::move(nodeInfo.Location))
{}
Expand All @@ -1432,11 +1432,13 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
THashMap<TNodeId, THostId> NodeIdToHostId;

public:
THostRecordMapImpl() = default;

THostRecordMapImpl(TEvInterconnect::TEvNodesInfo *msg) {
for (TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
for (const TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
const THostId hostId(nodeInfo.Host, nodeInfo.Port);
NodeIdToHostId.emplace(nodeInfo.NodeId, hostId);
HostIdToRecord.emplace(hostId, std::move(nodeInfo));
HostIdToRecord.emplace(hostId, nodeInfo);
}
}

Expand Down Expand Up @@ -1824,8 +1826,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa

// For test purposes, required for self heal actor
void CreateEmptyHostRecordsMap() {
TEvInterconnect::TEvNodesInfo nodes;
HostRecords = std::make_shared<THostRecordMapImpl>(&nodes);
HostRecords = std::make_shared<THostRecordMapImpl>();
}

ui64 NextConfigTxSeqNo = 1;
Expand Down
47 changes: 32 additions & 15 deletions ydb/core/mind/dynamic_nameserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,18 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,

if (rec.GetStatus().GetCode() != NKikimrNodeBroker::TStatus::OK) {
// Reset proxy if node expired.
if (exists)
if (exists) {
ResetInterconnectProxyConfig(NodeId, ctx);
*NeedUpdateListNodesCache = true; // node was erased
}
ReplyWithErrorAndDie(ctx);
return;
}

TDynamicConfig::TDynamicNodeInfo node(rec.GetNode());
if (!exists || !oldNode.EqualExceptExpire(node)) {
*NeedUpdateListNodesCache = true;
}

// If ID is re-used by another node then proxy has to be reset.
if (exists && !oldNode.EqualExceptExpire(node))
Expand Down Expand Up @@ -145,6 +150,7 @@ void TDynamicNameserver::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
auto newStaticConfig = BuildNameserverTable(config);
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
StaticConfig = std::move(newStaticConfig);
*NeedUpdateListNodesCache = true;
for (const auto& subscriber : StaticNodeChangeSubscribers) {
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
}
Expand Down Expand Up @@ -220,31 +226,38 @@ void TDynamicNameserver::ResolveDynamicNode(ui32 nodeId,
reply->NodeId = nodeId;
ctx.Send(ev->Sender, reply);
} else {
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain], ev, deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain],
NeedUpdateListNodesCache, ev, deadline));
}
}

void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
{
auto now = ctx.Now();
for (auto &sender : ListNodesQueue) {
THolder<TEvInterconnect::TEvNodesInfo> reply(new TEvInterconnect::TEvNodesInfo);
if (*NeedUpdateListNodesCache) {
auto newListNodesCache = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();

auto now = ctx.Now();
for (const auto &pr : StaticConfig->StaticNodeTable) {
reply->Nodes.emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
newListNodesCache->emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
}

for (auto &config : DynamicConfigs) {
for (auto &pr : config->DynamicNodes) {
if (pr.second.Expire > now)
reply->Nodes.emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
newListNodesCache->emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
}
}

ctx.Send(sender, reply.Release());
ListNodesCache = newListNodesCache;
*NeedUpdateListNodesCache = false;
}

for (auto &sender : ListNodesQueue) {
ctx.Send(sender, new TEvInterconnect::TEvNodesInfo(ListNodesCache));
}
ListNodesQueue.clear();
}
Expand Down Expand Up @@ -300,15 +313,18 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
config->ExpiredNodes.emplace(node.GetNodeId(), info);
}

*NeedUpdateListNodesCache = true;
config->Epoch = rec.GetEpoch();
ctx.Schedule(config->Epoch.End - ctx.Now(),
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
} else {
// Note: this update may be optimized to only include new nodes
for (auto &node : rec.GetNodes()) {
auto nodeId = node.GetNodeId();
if (!config->DynamicNodes.contains(nodeId))
if (!config->DynamicNodes.contains(nodeId)) {
config->DynamicNodes.emplace(nodeId, node);
*NeedUpdateListNodesCache = true;
}
}
config->Epoch = rec.GetEpoch();
}
Expand Down Expand Up @@ -395,8 +411,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
ctx.Send(ev->Sender, reply.Release());
} else {
const TInstant deadline = ev->Get()->Deadline;
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain], ev.Release(),
deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain],
NeedUpdateListNodesCache, ev.Release(), deadline));
}
}
}
Expand Down Expand Up @@ -455,6 +471,7 @@ void TDynamicNameserver::Handle(NConsole::TEvConsole::TEvConfigNotificationReque
auto newStaticConfig = BuildNameserverTable(config.GetNameserviceConfig());
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
StaticConfig = std::move(newStaticConfig);
*NeedUpdateListNodesCache = true;
for (const auto& subscriber : StaticNodeChangeSubscribers) {
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
}
Expand Down
15 changes: 13 additions & 2 deletions ydb/core/mind/dynamic_nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
}

TDynamicNodeResolverBase(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
std::shared_ptr<bool> needUpdateListNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: Owner(owner)
, NodeId(nodeId)
, Config(config)
, NeedUpdateListNodesCache(needUpdateListNodesCache)
, OrigRequest(origRequest)
, Deadline(deadline)
{
Expand Down Expand Up @@ -118,6 +120,7 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
TActorId Owner;
ui32 NodeId;
TDynamicConfigPtr Config;
std::shared_ptr<bool> NeedUpdateListNodesCache;
TAutoPtr<IEventHandle> OrigRequest;
const TInstant Deadline;

Expand All @@ -128,8 +131,9 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
class TDynamicNodeResolver : public TDynamicNodeResolverBase {
public:
TDynamicNodeResolver(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
std::shared_ptr<bool> needUpdateListNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, needUpdateListNodesCache, origRequest, deadline)
{
}

Expand All @@ -140,8 +144,9 @@ class TDynamicNodeResolver : public TDynamicNodeResolverBase {
class TDynamicNodeSearcher : public TDynamicNodeResolverBase {
public:
TDynamicNodeSearcher(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
std::shared_ptr<bool> needUpdateListNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, needUpdateListNodesCache, origRequest, deadline)
{
}

Expand Down Expand Up @@ -180,6 +185,8 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {

TDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 resolvePoolId)
: StaticConfig(setup)
, ListNodesCache(nullptr)
, NeedUpdateListNodesCache(std::make_shared<bool>(true))
, ResolvePoolId(resolvePoolId)
{
Y_ABORT_UNLESS(StaticConfig->IsEntriesUnique());
Expand Down Expand Up @@ -258,6 +265,10 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
TIntrusivePtr<TTableNameserverSetup> StaticConfig;
std::array<TDynamicConfigPtr, DOMAINS_COUNT> DynamicConfigs;
TVector<TActorId> ListNodesQueue;

TIntrusiveVector<TEvInterconnect::TNodeInfo>::TPtr ListNodesCache;
std::shared_ptr<bool> NeedUpdateListNodesCache;

std::array<TActorId, DOMAINS_COUNT> NodeBrokerPipes;
// When ListNodes requests are sent to NodeBroker tablets this
// bitmap indicates domains which didn't answer yet.
Expand Down
Loading
Loading