Skip to content

Commit

Permalink
YQ-3850 Shared reading: fix interconnect subscribing (#11506)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Nov 12, 2024
1 parent a2e65ee commit a431c94
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 31 deletions.
135 changes: 105 additions & 30 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ struct TEvPrivate {
EvCoordinatorPing = EvBegin + 20,
EvUpdateMetrics,
EvPrintStateToLog,
EvTryConnect,
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {};
// Local event carrying the id of the node a connection attempt should be made to.
struct TEvTryConnect : public NActors::TEventLocal<TEvTryConnect, EvTryConnect> {
    TEvTryConnect(ui32 nodeId = 0)
        : NodeId{nodeId}
    {}

    ui32 NodeId = 0; // Target node of the connection attempt.
};
};

struct TQueryStat {
Expand Down Expand Up @@ -119,6 +125,92 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
}
};

// Keeps per-node interconnect connection state and schedules (re)connection
// attempts with a growing, randomized delay.
//
// Usage visible in this file: the owning actor calls Init() with its own id,
// AddNode() when a request arrives from a node, forwards node
// connected/disconnected notifications to HandleNodeConnected() /
// HandleNodeDisconnected(), and calls TryConnect() when the self-scheduled
// TEvPrivate::TEvTryConnect fires.
struct TNodesTracker {
    // Produces the sequence of delays between connection attempts.
    class TRetryState {
    public:
        // Returns the delay to wait before the next attempt and advances the
        // internal state: the stored delay is doubled and clamped to
        // [MinDelay, MaxDelay]. The first call returns zero; every non-zero
        // delay is randomized before being returned.
        TDuration GetNextDelay() {
            constexpr TDuration MaxDelay = TDuration::Seconds(10);
            constexpr TDuration MinDelay = TDuration::MilliSeconds(100); // from second retry
            const TDuration ret = Delay; // The first delay is zero
            Delay = ClampVal(Delay * 2, MinDelay, MaxDelay);
            return ret ? RandomizeDelay(ret) : ret;
        }

    private:
        // Returns half of baseDelay plus a random addition bounded by that half
        // (presumably uniform in [half, baseDelay) -- depends on RandomNumber semantics).
        static TDuration RandomizeDelay(TDuration baseDelay) {
            const TDuration::TValue half = baseDelay.GetValue() / 2;
            return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
        }

    private:
        TDuration Delay; // The first time retry will be done instantly.
    };

    // State kept for every tracked node.
    struct TNodeState {
        bool Connected = false;         // Last connection status reported for the node.
        bool RetryScheduled = false;    // True while a TEvTryConnect for the node is pending.
        TMaybe<TRetryState> RetryState; // Backoff state; empty while the node is connected.
    };

public:
    // Remembers the id of the owning actor; it is used both as the sender of
    // connect requests and as the recipient of scheduled TEvTryConnect events.
    void Init(const NActors::TActorId& selfId) {
        SelfId = selfId;
    }

    // Starts tracking nodeId. Does nothing if the node is already tracked;
    // otherwise the node enters through the "disconnected" path, which
    // schedules the first connection attempt.
    void AddNode(ui32 nodeId) {
        if (Nodes.contains(nodeId)) {
            return;
        }
        HandleNodeDisconnected(nodeId);
    }

    // Performs a scheduled connection attempt: clears the pending-retry flag
    // (so that a later disconnect can schedule a new attempt) and, unless the
    // node is already connected, sends TEvConnectNode to the node's
    // interconnect proxy on behalf of the owning actor.
    void TryConnect(ui32 nodeId) {
        auto& state = Nodes[nodeId];
        state.RetryScheduled = false;
        if (state.Connected) {
            return;
        }
        auto connectEvent = MakeHolder<NActors::TEvInterconnect::TEvConnectNode>();
        auto proxyId = NActors::TActivationContext::InterconnectProxy(nodeId);
        NActors::TActivationContext::Send(
            new NActors::IEventHandle(proxyId, SelfId, connectEvent.Release(), 0, 0));
    }

    // Returns true if nodeId is tracked and currently marked as connected.
    // Uses a non-inserting lookup on purpose: creating an entry here would make
    // a later AddNode() treat the node as already tracked and skip scheduling
    // its first connection attempt.
    bool GetNodeConnected(ui32 nodeId) const {
        const auto it = Nodes.find(nodeId);
        return it != Nodes.end() && it->second.Connected;
    }

    // Marks the node as connected and drops its backoff state, so the next
    // disconnect starts again from a zero delay.
    void HandleNodeConnected(ui32 nodeId) {
        auto& state = Nodes[nodeId];
        state.Connected = true;
        state.RetryState = Nothing();
    }

    // Marks the node as disconnected and, unless an attempt is already
    // pending, schedules TEvTryConnect to the owning actor after the next
    // backoff delay.
    void HandleNodeDisconnected(ui32 nodeId) {
        auto& state = Nodes[nodeId];
        state.Connected = false;
        if (state.RetryScheduled) {
            return;
        }
        state.RetryScheduled = true;
        if (!state.RetryState) {
            state.RetryState.ConstructInPlace();
        }
        auto ev = MakeHolder<TEvPrivate::TEvTryConnect>(nodeId);
        auto delay = state.RetryState->GetNextDelay();
        NActors::TActivationContext::Schedule(delay, new NActors::IEventHandle(SelfId, SelfId, ev.Release()));
    }

    // Appends one line per tracked node (id, connected flag, retry flag) to stream.
    void PrintInternalState(TStringStream& stream) const {
        stream << "Nodes states: \n";
        for (const auto& [nodeId, state] : Nodes) {
            stream << " id " << nodeId << " connected " << state.Connected << " retry scheduled " << state.RetryScheduled << "\n";
        }
    }

private:
    TMap<ui32, TNodeState> Nodes;       // Tracked nodes keyed by node id.
    NActors::TActorId SelfId;           // Owning actor, set by Init().
    TString LogPrefix = "RowDispatcher: ";
};


NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
Expand All @@ -134,8 +226,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const ::NMonitoring::TDynamicCounterPtr Counters;
TRowDispatcherMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;
THashSet<TActorId> InterconnectSessions;
TMap<ui32, bool> NodeConnected;
TNodesTracker NodesTracker;

struct ConsumerCounters {
ui64 NewDataArrived = 0;
Expand Down Expand Up @@ -222,15 +313,14 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);

void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
void Handle(const NMon::TEvHttpInfo::TPtr&);

void DeleteConsumer(const ConsumerSessionKey& key);
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
void UpdateMetrics();
TString GetInternalState();

Expand All @@ -250,7 +340,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle);
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(TEvPrivate::TEvTryConnect, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
Expand Down Expand Up @@ -300,6 +390,7 @@ void TRowDispatcher::Bootstrap() {
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
NodesTracker.Init(SelfId());
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
Expand All @@ -317,15 +408,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr&

// Handles TEvInterconnect::TEvNodeConnected: records the node as connected and
// passes the node id to the events queue of every consumer (no filtering by
// node is done here).
void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("EvNodeConnected, node id " << ev->Get()->NodeId);
NodeConnected[ev->Get()->NodeId] = true;
NodesTracker.HandleNodeConnected(ev->Get()->NodeId);
for (auto& [actorId, consumer] : Consumers) {
consumer->EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
}
}

void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvNodeDisconnected, node id " << ev->Get()->NodeId);
NodeConnected[ev->Get()->NodeId] = false;
NodesTracker.HandleNodeDisconnected(ev->Get()->NodeId);
for (auto& [actorId, consumer] : Consumers) {
consumer->EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
}
Expand Down Expand Up @@ -353,7 +444,7 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvCoordinatorChangesSubscribe from " << ev->Sender);
UpdateInterconnectSessions(ev->InterconnectSession);
NodesTracker.AddNode(ev->Sender.NodeId());
CoordinatorChangedSubscribers.insert(ev->Sender);
if (!CoordinatorActorId) {
return;
Expand Down Expand Up @@ -387,6 +478,7 @@ void TRowDispatcher::UpdateMetrics() {

TString TRowDispatcher::GetInternalState() {
TStringStream str;
NodesTracker.PrintInternalState(str);
str << "Statistics:\n";
for (auto& [key, sessionsInfo] : TopicSessions) {
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
Expand All @@ -410,7 +502,7 @@ TString TRowDispatcher::GetInternalState() {
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
" partitionId " << ev->Get()->Record.GetPartitionId());
UpdateInterconnectSessions(ev->InterconnectSession);
NodesTracker.AddNode(ev->Sender.NodeId());
TMaybe<ui64> readOffset;
if (ev->Get()->Record.HasOffset()) {
readOffset = ev->Get()->Record.GetOffset();
Expand All @@ -430,7 +522,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("Topic session count " << topicSessionInfo.Sessions.size());
Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);

auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodeConnected[ev->Sender.NodeId()]);
auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()));
Consumers[key] = consumerInfo;
ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo;
if (!consumerInfo->EventsQueue.OnEventReceived(ev)) {
Expand Down Expand Up @@ -575,14 +667,9 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClo
}
}

void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("TEvRetry " << ev->Get()->EventQueueId);
auto it = ConsumersByEventQueueId.find(ev->Get()->EventQueueId);
if (it == ConsumersByEventQueueId.end()) {
LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << ev->Get()->EventQueueId);
return;
}
it->second->EventsQueue.Retry();
// Handles TEvPrivate::TEvTryConnect: asks the nodes tracker to attempt a
// connection to the node named in the event.
void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) {
    const auto targetNodeId = ev->Get()->NodeId;
    LOG_ROW_DISPATCHER_TRACE("TEvTryConnect to node id " << targetNodeId);
    NodesTracker.TryConnect(targetNodeId);
}

void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
Expand Down Expand Up @@ -705,18 +792,6 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
}
}

// Subscribes (with delivery tracking) to an interconnect session that has not
// been seen before and remembers it. Empty ids and already known sessions are
// ignored.
void TRowDispatcher::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
    const bool nothingToDo = !interconnectSession
        || InterconnectSessions.find(interconnectSession) != InterconnectSessions.end();
    if (nothingToDo) {
        return;
    }

    Send(interconnectSession, new NActors::TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
    InterconnectSessions.insert(interconnectSession);
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down
7 changes: 6 additions & 1 deletion ydb/library/yql/dq/actors/common/retry_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ TDuration TRetryEventsQueue::TRetryState::RandomizeDelay(TDuration baseDelay) {
}

// Appends a one-line, human-readable summary of this queue to `stream`.
void TRetryEventsQueue::PrintInternalState(TStringStream& stream) const {
stream << "id " << EventQueueId << ", NextSeqNo "
stream << "id " << EventQueueId;
// With a local recipient only the queue id and a marker are printed.
if (LocalRecipient) {
stream << ", LocalRecipient\n";
return;
}
// Otherwise print sequence numbers, buffered event counts and the connection flag.
stream << ", NextSeqNo "
<< NextSeqNo << ", MyConfSeqNo " << MyConfirmedSeqNo << ", SeqNos " << ReceivedEventsSeqNos.size() << ", events size " << Events.size() << ", connected " << Connected << "\n";
}

Expand Down

0 comments on commit a431c94

Please sign in to comment.