Skip to content

Commit

Permalink
ydb_persqueue_public: cancel previous client context on reconnect (yd…
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Apr 19, 2024
1 parent e5efcb1 commit 5a7fff3
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,18 +445,19 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
}
++ConnectionGeneration;
auto subclient = Client->GetClientForEndpoint(endpoint);
connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
auto clientContext = subclient->CreateContext();
ConnectionFactory = connectionFactory;

ClientContext = std::move(clientContext);
ServerMessage = std::make_shared<TServerMessage>();

if (!ClientContext) {
auto clientContext = subclient->CreateContext();
if (!clientContext) {
AbortImpl();
// Grpc and WriteSession is closing right now.
return;
}
auto prevClientContext = std::exchange(ClientContext, clientContext);

ServerMessage = std::make_shared<TServerMessage>();

connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
ConnectionFactory = connectionFactory;

connectContext = ClientContext->CreateContext();
if (delay)
Expand All @@ -477,8 +478,10 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
if (prevConnectDelayContext)
Cancel(prevConnectDelayContext);
Cancel(prevConnectTimeoutContext);
Cancel(prevClientContext);
Y_ASSERT(connectContext);
Y_ASSERT(connectTimeoutContext);

reqSettings = TRpcRequestSettings::Make(Settings);

connectCallback = [cbContext = SelfContext,
Expand Down

0 comments on commit 5a7fff3

Please sign in to comment.