diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp index c2a77bbadd16..445493b986b7 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp @@ -116,10 +116,11 @@ namespace NActors { Metrics->SetClockSkewMicrosec(0); - Context->UpdateState = EUpdateState::NONE; + Context->UpdateInFlight = false; + Context->NextUpdatePending = false; // ensure that we do not spawn new session while the previous one is still alive - TAtomicBase sessions = AtomicIncrement(Context->NumInputSessions); + auto sessions = ++Context->NumInputSessions; Y_ABORT_UNLESS(sessions == 1, "sessions# %" PRIu64, ui64(sessions)); // calculate number of bytes to catch @@ -162,7 +163,7 @@ namespace NActors { } if (termEv) { - AtomicDecrement(Context->NumInputSessions); + --Context->NumInputSessions; Send(SessionId, termEv.release()); PassAway(); Socket.Reset(); @@ -285,41 +286,12 @@ namespace NActors { UpdateFromInputSession->Ping = Min(UpdateFromInputSession->Ping, ping); } - for (;;) { - EUpdateState state = Context->UpdateState; - EUpdateState next; - - // calculate next state - switch (state) { - case EUpdateState::NONE: - case EUpdateState::CONFIRMING: - // we have no inflight messages to session actor, we will issue one a bit later - next = EUpdateState::INFLIGHT; - break; - - case EUpdateState::INFLIGHT: - case EUpdateState::INFLIGHT_AND_PENDING: - // we already have inflight message, so we will keep pending message and session actor will issue - // TEvConfirmUpdate to kick processing - next = EUpdateState::INFLIGHT_AND_PENDING; - break; - } - - if (Context->UpdateState.compare_exchange_weak(state, next)) { - switch (next) { - case EUpdateState::INFLIGHT: - Send(SessionId, UpdateFromInputSession.Release()); - break; - - case EUpdateState::INFLIGHT_AND_PENDING: - Y_ABORT_UNLESS(UpdateFromInputSession); - break; - - default: - Y_ABORT("unexpected state"); - } - break; - } + if (Context->UpdateInFlight || Context->NextUpdatePending) { + Context->NextUpdatePending = true; + Y_ABORT_UNLESS(UpdateFromInputSession); + } else { + Context->UpdateInFlight = true; + Send(SessionId, UpdateFromInputSession.Release()); } for (size_t channel = 0; channel < InputTrafficArray.size(); ++channel) { @@ -703,22 +675,12 @@ namespace NActors { } void TInputSessionTCP::HandleConfirmUpdate() { - for (;;) { - switch (EUpdateState state = Context->UpdateState) { - case EUpdateState::NONE: - case EUpdateState::INFLIGHT: - case EUpdateState::INFLIGHT_AND_PENDING: - // here we may have a race - return; - - case EUpdateState::CONFIRMING: - Y_ABORT_UNLESS(UpdateFromInputSession); - if (Context->UpdateState.compare_exchange_weak(state, EUpdateState::INFLIGHT)) { - Send(SessionId, UpdateFromInputSession.Release()); - return; - } - } - } + Y_ABORT_UNLESS(UpdateFromInputSession); + Y_ABORT_UNLESS(!Context->UpdateInFlight); + Y_ABORT_UNLESS(Context->NextUpdatePending); + Context->UpdateInFlight = true; + Context->NextUpdatePending = false; + Send(SessionId, UpdateFromInputSession.Release()); } ssize_t TInputSessionTCP::Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp index 001f7e69ce9e..355b97c0e497 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp @@ -358,28 +358,10 @@ namespace NActors { GenerateTraffic(); - for (;;) { - switch (EUpdateState state = ReceiveContext->UpdateState) { - case EUpdateState::NONE: - case EUpdateState::CONFIRMING: - Y_ABORT("unexpected state"); - - case EUpdateState::INFLIGHT: - // this message we are processing was the only one in flight, so we can reset state to NONE here - if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) { - return; - } - break; - - case EUpdateState::INFLIGHT_AND_PENDING: - // there is more messages pending from the input session actor, so we have to inform it to release - // that message - if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) { - Send(ev->Sender, new TEvConfirmUpdate); - return; - } - break; - } + Y_ABORT_UNLESS(ReceiveContext->UpdateInFlight); + ReceiveContext->UpdateInFlight = false; + if (ReceiveContext->NextUpdatePending) { + Send(ev->Sender, new TEvConfirmUpdate); } } } @@ -1060,7 +1042,7 @@ namespace NActors { flagState == EFlag::YELLOW, flagState == EFlag::ORANGE, flagState == EFlag::RED, - ReceiveContext->ClockSkew_us.load(), + ReceiveContext->ClockSkew_us, reportClockSkew}); } diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.h b/ydb/library/actors/interconnect/interconnect_tcp_session.h index 0109f3a9b4af..d58ef897a505 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.h +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.h @@ -104,19 +104,19 @@ namespace NActors { ui64 ControlPacketId = 0; // last processed packet by input session - std::atomic_uint64_t LastPacketSerialToConfirm = 0; + ui64 LastPacketSerialToConfirm = 0; static constexpr uint64_t LastPacketSerialToConfirmLockBit = uint64_t(1) << 63; // for hardened checks - TAtomic NumInputSessions = 0; + ui32 NumInputSessions = 0; NHPTimer::STime StartTime; - std::atomic PingRTT_us = 0; - std::atomic ClockSkew_us = 0; + ui64 PingRTT_us = 0; + i64 ClockSkew_us = 0; - std::atomic UpdateState; - static_assert(std::atomic::is_always_lock_free); + bool UpdateInFlight = false; + bool NextUpdatePending = false; bool MainWriteBlocked = false; bool XdcWriteBlocked = false; @@ -159,28 +159,17 @@ namespace NActors { // returns false if sessions needs to be terminated bool AdvanceLastPacketSerialToConfirm(ui64 nextValue) { - for (;;) { - uint64_t value = LastPacketSerialToConfirm.load(); - if (value & LastPacketSerialToConfirmLockBit) { - return false; - } - Y_DEBUG_ABORT_UNLESS(value + 1 == nextValue); - if (LastPacketSerialToConfirm.compare_exchange_weak(value, nextValue)) { - return true; - } + if (LastPacketSerialToConfirm & LastPacketSerialToConfirmLockBit) { + return false; } + Y_DEBUG_ABORT_UNLESS(LastPacketSerialToConfirm + 1 == nextValue); + LastPacketSerialToConfirm = nextValue; + return true; } ui64 LockLastPacketSerialToConfirm() { - for (;;) { - uint64_t value = LastPacketSerialToConfirm.load(); - if (value & LastPacketSerialToConfirmLockBit) { - return value & ~LastPacketSerialToConfirmLockBit; - } - if (LastPacketSerialToConfirm.compare_exchange_strong(value, value | LastPacketSerialToConfirmLockBit)) { - return value; - } - } + LastPacketSerialToConfirm |= LastPacketSerialToConfirmLockBit; + return GetLastPacketSerialToConfirm(); } void UnlockLastPacketSerialToConfirm() { @@ -188,7 +177,7 @@ namespace NActors { } ui64 GetLastPacketSerialToConfirm() { - return LastPacketSerialToConfirm.load() & ~LastPacketSerialToConfirmLockBit; + return LastPacketSerialToConfirm & ~LastPacketSerialToConfirmLockBit; } };