Skip to content

Commit

Permalink
Merge 1da63a3 into dffc12a
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Dec 17, 2024
2 parents dffc12a + 1da63a3 commit 83459a8
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 74 deletions.
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");

auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
}
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
Send(SelfId(), ev.release());

// and subscribe for the node list too
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,14 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
try {
const auto& status = ev->Get()->Status.GetStatus();
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
if (!ev->Get()->Status.IsSuccess()) {
ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
}
const auto& res = ev->Get()->Result;

auto& nodesInfo = nameServiceUpdateReq->Nodes;
nodesInfo.reserve(res.nodes().size());
auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
nodesInfo->reserve(res.nodes().size());
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo(nodesInfo));

Peers.clear();
std::set<ui32> nodeIds; // may be not unique
Expand All @@ -340,7 +340,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});

if (node.interconnect_port()) {
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
node.node_id(),
node.node_address(),
node.hostname(), // host
Expand All @@ -356,8 +356,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size());
ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc();

LOG_T("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver");
if (!nodesInfo.empty()) {
LOG_T("Send NodeInfo with size: " << nodesInfo->size() << " to DynamicNameserver");
if (!nodesInfo->empty()) {
Send(GetNameserviceActorId(), nameServiceUpdateReq.Release());
}
} catch (yexception &e) {
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/mind/bscontroller/impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
TNodeId NodeId;
TNodeLocation Location;

THostRecord(TEvInterconnect::TNodeInfo&& nodeInfo)
THostRecord(TEvInterconnect::TNodeInfo nodeInfo)
: NodeId(nodeInfo.NodeId)
, Location(std::move(nodeInfo.Location))
{}
Expand All @@ -1432,11 +1432,13 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
THashMap<TNodeId, THostId> NodeIdToHostId;

public:
THostRecordMapImpl() = default;

THostRecordMapImpl(TEvInterconnect::TEvNodesInfo *msg) {
for (TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
for (const TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
const THostId hostId(nodeInfo.Host, nodeInfo.Port);
NodeIdToHostId.emplace(nodeInfo.NodeId, hostId);
HostIdToRecord.emplace(hostId, std::move(nodeInfo));
HostIdToRecord.emplace(hostId, nodeInfo);
}
}

Expand Down Expand Up @@ -1824,8 +1826,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa

// For test purposes, required for self heal actor
void CreateEmptyHostRecordsMap() {
TEvInterconnect::TEvNodesInfo nodes;
HostRecords = std::make_shared<THostRecordMapImpl>(&nodes);
HostRecords = std::make_shared<THostRecordMapImpl>();
}

ui64 NextConfigTxSeqNo = 1;
Expand Down
47 changes: 32 additions & 15 deletions ydb/core/mind/dynamic_nameserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,18 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,

if (rec.GetStatus().GetCode() != NKikimrNodeBroker::TStatus::OK) {
// Reset proxy if node expired.
if (exists)
if (exists) {
ResetInterconnectProxyConfig(NodeId, ctx);
*NeedUpdateListNodesCache = true; // node was erased
}
ReplyWithErrorAndDie(ctx);
return;
}

TDynamicConfig::TDynamicNodeInfo node(rec.GetNode());
if (!exists || !oldNode.EqualExceptExpire(node)) {
*NeedUpdateListNodesCache = true;
}

// If ID is re-used by another node then proxy has to be reset.
if (exists && !oldNode.EqualExceptExpire(node))
Expand Down Expand Up @@ -145,6 +150,7 @@ void TDynamicNameserver::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
auto newStaticConfig = BuildNameserverTable(config);
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
StaticConfig = std::move(newStaticConfig);
*NeedUpdateListNodesCache = true;
for (const auto& subscriber : StaticNodeChangeSubscribers) {
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
}
Expand Down Expand Up @@ -220,31 +226,38 @@ void TDynamicNameserver::ResolveDynamicNode(ui32 nodeId,
reply->NodeId = nodeId;
ctx.Send(ev->Sender, reply);
} else {
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain], ev, deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain],
NeedUpdateListNodesCache, ev, deadline));
}
}

void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
{
auto now = ctx.Now();
for (auto &sender : ListNodesQueue) {
THolder<TEvInterconnect::TEvNodesInfo> reply(new TEvInterconnect::TEvNodesInfo);
if (*NeedUpdateListNodesCache) {
auto newListNodesCache = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();

auto now = ctx.Now();
for (const auto &pr : StaticConfig->StaticNodeTable) {
reply->Nodes.emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
newListNodesCache->emplace_back(pr.first,
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, true);
}

for (auto &config : DynamicConfigs) {
for (auto &pr : config->DynamicNodes) {
if (pr.second.Expire > now)
reply->Nodes.emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
newListNodesCache->emplace_back(pr.first, pr.second.Address,
pr.second.Host, pr.second.ResolveHost,
pr.second.Port, pr.second.Location, false);
}
}

ctx.Send(sender, reply.Release());
ListNodesCache = newListNodesCache;
*NeedUpdateListNodesCache = false;
}

for (auto &sender : ListNodesQueue) {
ctx.Send(sender, new TEvInterconnect::TEvNodesInfo(ListNodesCache));
}
ListNodesQueue.clear();
}
Expand Down Expand Up @@ -300,15 +313,18 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
config->ExpiredNodes.emplace(node.GetNodeId(), info);
}

*NeedUpdateListNodesCache = true;
config->Epoch = rec.GetEpoch();
ctx.Schedule(config->Epoch.End - ctx.Now(),
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
} else {
// Note: this update may be optimized to only include new nodes
for (auto &node : rec.GetNodes()) {
auto nodeId = node.GetNodeId();
if (!config->DynamicNodes.contains(nodeId))
if (!config->DynamicNodes.contains(nodeId)) {
config->DynamicNodes.emplace(nodeId, node);
*NeedUpdateListNodesCache = true;
}
}
config->Epoch = rec.GetEpoch();
}
Expand Down Expand Up @@ -395,8 +411,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
ctx.Send(ev->Sender, reply.Release());
} else {
const TInstant deadline = ev->Get()->Deadline;
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain], ev.Release(),
deadline));
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain],
NeedUpdateListNodesCache, ev.Release(), deadline));
}
}
}
Expand Down Expand Up @@ -455,6 +471,7 @@ void TDynamicNameserver::Handle(NConsole::TEvConsole::TEvConfigNotificationReque
auto newStaticConfig = BuildNameserverTable(config.GetNameserviceConfig());
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
StaticConfig = std::move(newStaticConfig);
*NeedUpdateListNodesCache = true;
for (const auto& subscriber : StaticNodeChangeSubscribers) {
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
}
Expand Down
15 changes: 13 additions & 2 deletions ydb/core/mind/dynamic_nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
}

TDynamicNodeResolverBase(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
std::shared_ptr<bool> needUpdateListNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: Owner(owner)
, NodeId(nodeId)
, Config(config)
, NeedUpdateListNodesCache(needUpdateListNodesCache)
, OrigRequest(origRequest)
, Deadline(deadline)
{
Expand Down Expand Up @@ -118,6 +120,7 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
TActorId Owner;
ui32 NodeId;
TDynamicConfigPtr Config;
std::shared_ptr<bool> NeedUpdateListNodesCache;
TAutoPtr<IEventHandle> OrigRequest;
const TInstant Deadline;

Expand All @@ -128,8 +131,9 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
class TDynamicNodeResolver : public TDynamicNodeResolverBase {
public:
TDynamicNodeResolver(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
std::shared_ptr<bool> needUpdateListNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, needUpdateListNodesCache, origRequest, deadline)
{
}

Expand All @@ -140,8 +144,9 @@ class TDynamicNodeResolver : public TDynamicNodeResolverBase {
class TDynamicNodeSearcher : public TDynamicNodeResolverBase {
public:
TDynamicNodeSearcher(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
std::shared_ptr<bool> needUpdateListNodesCache,
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
: TDynamicNodeResolverBase(owner, nodeId, config, needUpdateListNodesCache, origRequest, deadline)
{
}

Expand Down Expand Up @@ -180,6 +185,8 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {

TDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 resolvePoolId)
: StaticConfig(setup)
, ListNodesCache(nullptr)
, NeedUpdateListNodesCache(std::make_shared<bool>(true))
, ResolvePoolId(resolvePoolId)
{
Y_ABORT_UNLESS(StaticConfig->IsEntriesUnique());
Expand Down Expand Up @@ -258,6 +265,10 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
TIntrusivePtr<TTableNameserverSetup> StaticConfig;
std::array<TDynamicConfigPtr, DOMAINS_COUNT> DynamicConfigs;
TVector<TActorId> ListNodesQueue;

TIntrusiveVector<TEvInterconnect::TNodeInfo>::TPtr ListNodesCache;
std::shared_ptr<bool> NeedUpdateListNodesCache;

std::array<TActorId, DOMAINS_COUNT> NodeBrokerPipes;
// When ListNodes requests are sent to NodeBroker tablets this
// bitmap indicates domains which didn't answer yet.
Expand Down
Loading

0 comments on commit 83459a8

Please sign in to comment.