Skip to content

Commit

Permalink
YQ-3855 Add cookie to events RD <-> read_actor (#11562)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Nov 18, 2024
1 parent b8d3c86 commit c6b53a8
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 82 deletions.
74 changes: 45 additions & 29 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,16 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
ui64 eventQueueId,
NFq::NRowDispatcherProto::TEvStartSession& proto,
TActorId topicSessionId,
bool alreadyConnected)
bool alreadyConnected,
ui64 generation)
: ReadActorId(readActorId)
, SourceParams(proto.GetSource())
, PartitionId(proto.GetPartitionId())
, EventQueueId(eventQueueId)
, Proto(proto)
, TopicSessionId(topicSessionId)
, QueryId(proto.GetQueryId()) {
, QueryId(proto.GetQueryId())
, Generation(generation) {
EventsQueue.Init("txId", selfId, selfId, eventQueueId, /* KeepAlive */ true, /* UseConnect */ false);
EventsQueue.OnNewRecipientId(readActorId, true, alreadyConnected);
}
Expand All @@ -267,6 +269,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
bool PendingGetNextBatch = false;
bool PendingNewDataArrived = false;
TopicSessionClientStatistic Stat;
ui64 Generation;
};

struct SessionInfo {
Expand Down Expand Up @@ -325,6 +328,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
void DeleteConsumer(const ConsumerSessionKey& key);
void UpdateMetrics();
TString GetInternalState();
template <class TEventPtr>
bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev);

STRICT_STFUNC(
StateFunc, {
Expand Down Expand Up @@ -496,7 +501,8 @@ TString TRowDispatcher::GetInternalState() {
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
<< " get " << consumer->Counters.GetNextBatch
<< " arr " << consumer->Counters.NewDataArrived << " btc " << consumer->Counters.MessageBatch
<< " pend get " << consumer->PendingGetNextBatch << " pend new " << consumer->PendingNewDataArrived << " ";
<< " pend get " << consumer->PendingGetNextBatch << " pend new " << consumer->PendingNewDataArrived
<< " conn id " << consumer->Generation << " ";
str << " retry queue: ";
consumer->EventsQueue.PrintInternalState(str);
}
Expand All @@ -507,7 +513,7 @@ TString TRowDispatcher::GetInternalState() {

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_DEBUG("TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
" partitionId " << ev->Get()->Record.GetPartitionId());
" partitionId " << ev->Get()->Record.GetPartitionId() << " cookie " << ev->Cookie);
NodesTracker.AddNode(ev->Sender.NodeId());
TMaybe<ui64> readOffset;
if (ev->Get()->Record.HasOffset()) {
Expand All @@ -517,24 +523,27 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
auto it = Consumers.find(key);
if (it != Consumers.end()) {
LOG_ROW_DISPATCHER_ERROR("Consumer already exists, ignore StartSession");
return;
if (ev->Cookie <= it->second->Generation) {
LOG_ROW_DISPATCHER_WARN("Consumer already exists, ignore StartSession");
return;
}
LOG_ROW_DISPATCHER_WARN("Consumer already exists, new consumer with new generation (" << ev->Cookie << ", current "
<< it->second->Generation << "), remove old consumer, sender " << ev->Sender << ", topicPath "
<< ev->Get()->Record.GetSource().GetTopicPath() <<" partitionId " << ev->Get()->Record.GetPartitionId() << " cookie " << ev->Cookie);
DeleteConsumer(key);
}
const auto& source = ev->Get()->Record.GetSource();

TActorId sessionActorId;
TopicSessionKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), ev->Get()->Record.GetPartitionId()};
TopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
LOG_ROW_DISPATCHER_DEBUG("Topic session count " << topicSessionInfo.Sessions.size());
Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);

auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()));
auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodesTracker.GetNodeConnected(ev->Sender.NodeId()), ev->Cookie);
Consumers[key] = consumerInfo;
ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo;
if (!consumerInfo->EventsQueue.OnEventReceived(ev)) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
const ui64 seqNo = meta.GetSeqNo();
LOG_ROW_DISPATCHER_ERROR("TEvStartSession: wrong seq num from " << ev->Sender.ToString() << ", seqNo " << seqNo << ", ignore message");
if (!CheckSession(consumerInfo, ev)) {
return;
}

if (topicSessionInfo.Sessions.empty()) {
Expand Down Expand Up @@ -564,7 +573,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
sessionActorId = sessionIt->first;
}
consumerInfo->TopicSessionId = sessionActorId;
consumerInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvStartSessionAck(consumerInfo->Proto));
consumerInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvStartSessionAck(consumerInfo->Proto), consumerInfo->Generation);

Forward(ev, sessionActorId);
Metrics.ClientsCount->Set(Consumers.size());
Expand All @@ -581,10 +590,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
LOG_ROW_DISPATCHER_WARN("Ignore TEvGetNextBatch, no such session");
return;
}
if (!it->second->EventsQueue.OnEventReceived(ev)) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
const ui64 seqNo = meta.GetSeqNo();
LOG_ROW_DISPATCHER_ERROR("TEvGetNextBatch: wrong seq num from " << ev->Sender.ToString() << ", seqNo " << seqNo << ", ignore message");
if (!CheckSession(it->second, ev)) {
return;
}
it->second->PendingNewDataArrived = false;
Expand All @@ -599,10 +605,24 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
auto it = Consumers.find(key);
if (it == Consumers.end()) {
LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << ev->Cookie);
LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId());
return;
}
it->second->EventsQueue.OnEventReceived(ev);
CheckSession(it->second, ev);
}

// Decides whether an event received from a read actor may be processed for the given consumer.
//
// The event is rejected (and a warning is logged) when either:
//   * its cookie differs from the generation stored in the consumer, or
//   * the consumer's retry queue refuses it (OnEventReceived returns false).
//
// Returns true only when both checks pass. The retry queue is consulted only after the
// generation check has succeeded, so a message of a foreign generation never reaches it.
template <class TEventPtr>
bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev) {
    const bool sameGeneration = (ev->Cookie == consumer->Generation);
    if (!sameGeneration) {
        LOG_ROW_DISPATCHER_WARN("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation);
        return false;
    }

    if (consumer->EventsQueue.OnEventReceived(ev)) {
        return true;
    }

    // The retry queue rejected the event: report the offending sequence number and drop it.
    const ui64 seqNo = ev->Get()->Record.GetTransportMeta().GetSeqNo();
    LOG_ROW_DISPATCHER_WARN("Wrong seq num ignore message (" << typeid(TEventPtr).name() << ") seqNo " << seqNo << " from " << ev->Sender.ToString());
    return false;
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
Expand All @@ -614,11 +634,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
LOG_ROW_DISPATCHER_WARN("Wrong consumer, sender " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId());
return;
}
if (!it->second->EventsQueue.OnEventReceived(ev)) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
const ui64 seqNo = meta.GetSeqNo();

LOG_ROW_DISPATCHER_ERROR("TEvStopSession: wrong seq num from " << ev->Sender.ToString() << ", seqNo " << seqNo << ", ignore message");
if (!CheckSession(it->second, ev)) {
return;
}
DeleteConsumer(key);
Expand Down Expand Up @@ -689,7 +705,7 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbea

bool needSend = sessionInfo->EventsQueue.Heartbeat();
if (needSend) {
sessionInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo->PartitionId));
sessionInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo->PartitionId), sessionInfo->Generation);
}
}

Expand All @@ -704,7 +720,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev)
LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived to " << ev->Get()->ReadActorId);
it->second->PendingNewDataArrived = true;
it->second->Counters.NewDataArrived++;
it->second->EventsQueue.Send(ev->Release().Release());
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
Expand All @@ -719,7 +735,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch to " << ev->Get()->ReadActorId);
it->second->PendingGetNextBatch = false;
it->second->Counters.MessageBatch++;
it->second->EventsQueue.Send(ev->Release().Release());
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
Expand All @@ -732,7 +748,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {
}
Metrics.ErrorsCount->Inc();
LOG_ROW_DISPATCHER_TRACE("Forward TEvSessionError to " << ev->Get()->ReadActorId);
it->second->EventsQueue.Send(ev->Release().Release());
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
DeleteConsumer(key);
}

Expand All @@ -745,7 +761,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev) {
return;
}
LOG_ROW_DISPATCHER_TRACE("Forward TEvStatus to " << ev->Get()->ReadActorId);
it->second->EventsQueue.Send(ev->Release().Release());
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
}

void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
Expand Down
28 changes: 22 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,22 @@ class TFixture : public NUnitTest::TBaseFixture {
return settings;
}

void MockAddSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId) {
void MockAddSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId, ui64 generation = 1) {
auto event = new NFq::TEvRowDispatcher::TEvStartSession(
source,
partitionId, // partitionId
"Token",
Nothing(), // readOffset,
0, // StartingMessageTimestamp;
"QueryId");
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event));
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event, 0, generation));
}

void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
event->Record.MutableSource()->CopyFrom(source);
event->Record.SetPartitionId(partitionId);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release()));
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
}

void MockNewDataArrived(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) {
Expand All @@ -148,7 +148,7 @@ class TFixture : public NUnitTest::TBaseFixture {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvMessageBatch>();
event->Record.SetPartitionId(partitionId);
event->ReadActorId = readActorId;
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release()));
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release(), 0, 1));
}

void MockSessionError(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) {
Expand All @@ -161,7 +161,7 @@ class TFixture : public NUnitTest::TBaseFixture {
void MockGetNextBatch(ui64 partitionId, TActorId readActorId) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
event->Record.SetPartitionId(partitionId);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release()));
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
}

void ExpectStartSession(NActors::TActorId actorId) {
Expand All @@ -187,9 +187,10 @@ class TFixture : public NUnitTest::TBaseFixture {
UNIT_ASSERT(eventHolder->Get()->Record.GetPartitionId() == partitionId);
}

void ExpectStartSessionAck(NActors::TActorId readActorId) {
void ExpectStartSessionAck(NActors::TActorId readActorId, ui64 expectedGeneration = 1) {
auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvStartSessionAck>(readActorId);
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT(eventHolder->Cookie == expectedGeneration);
}

void ExpectMessageBatch(NActors::TActorId readActorId) {
Expand Down Expand Up @@ -351,6 +352,21 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
// Ignore data after StopSession
MockMessageBatch(PartitionId1, topicSession4, ReadActorId2);
}

// Checks generation handling for repeated TEvStartSession events from the same read actor
// and partition: a repeat with the same generation (event cookie) is ignored, while a repeat
// with a larger generation is answered with a TEvStartSessionAck carrying the new generation.
Y_UNIT_TEST_F(ReinitConsumerIfNewGeneration, TFixture) {
// Open the initial session; the StartSession event is sent with cookie (generation) 1.
MockAddSession(Source1, PartitionId0, ReadActorId1, 1);
auto topicSessionId = ExpectRegisterTopicSession();
// No explicit generation passed: the ack cookie is compared with the default expected value 1.
ExpectStartSessionAck(ReadActorId1);
ExpectStartSession(topicSessionId);
// NOTE(review): ProcessData is defined outside this view; presumably it runs one data exchange round — confirm.
ProcessData(ReadActorId1, PartitionId0, topicSessionId);

// A StartSession with the same generation must be ignored, so no ack is awaited for it.
MockAddSession(Source1, PartitionId0, ReadActorId1, 1);

// A StartSession with a higher generation re-creates the consumer; its ack must carry generation 2.
MockAddSession(Source1, PartitionId0, ReadActorId1, 2);
ExpectStartSessionAck(ReadActorId1, 2);
}
}

}
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/common/retry_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TRetryEventsQueue {
void Send(THolder<T> ev, ui64 cookie = 0) {
if (LocalRecipient) {
LastSentDataTime = TInstant::Now();
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), cookie));
NActors::TActivationContext::Send(new NActors::IEventHandle(RecipientId, SenderId, ev.Release(), /* flags */ 0, cookie));
return;
}

Expand Down
Loading

0 comments on commit c6b53a8

Please sign in to comment.