Skip to content

Commit

Permalink
Remove excessive atomicity from IC (#11027)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Oct 29, 2024
1 parent 799a8d0 commit 43363e5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 102 deletions.
70 changes: 16 additions & 54 deletions ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ namespace NActors {

Metrics->SetClockSkewMicrosec(0);

Context->UpdateState = EUpdateState::NONE;
Context->UpdateInFlight = false;
Context->NextUpdatePending = false;

// ensure that we do not spawn new session while the previous one is still alive
TAtomicBase sessions = AtomicIncrement(Context->NumInputSessions);
auto sessions = ++Context->NumInputSessions;
Y_ABORT_UNLESS(sessions == 1, "sessions# %" PRIu64, ui64(sessions));

// calculate number of bytes to catch
Expand Down Expand Up @@ -162,7 +163,7 @@ namespace NActors {
}

if (termEv) {
AtomicDecrement(Context->NumInputSessions);
--Context->NumInputSessions;
Send(SessionId, termEv.release());
PassAway();
Socket.Reset();
Expand Down Expand Up @@ -285,41 +286,12 @@ namespace NActors {
UpdateFromInputSession->Ping = Min(UpdateFromInputSession->Ping, ping);
}

for (;;) {
EUpdateState state = Context->UpdateState;
EUpdateState next;

// calculate next state
switch (state) {
case EUpdateState::NONE:
case EUpdateState::CONFIRMING:
// we have no inflight messages to session actor, we will issue one a bit later
next = EUpdateState::INFLIGHT;
break;

case EUpdateState::INFLIGHT:
case EUpdateState::INFLIGHT_AND_PENDING:
// we already have inflight message, so we will keep pending message and session actor will issue
// TEvConfirmUpdate to kick processing
next = EUpdateState::INFLIGHT_AND_PENDING;
break;
}

if (Context->UpdateState.compare_exchange_weak(state, next)) {
switch (next) {
case EUpdateState::INFLIGHT:
Send(SessionId, UpdateFromInputSession.Release());
break;

case EUpdateState::INFLIGHT_AND_PENDING:
Y_ABORT_UNLESS(UpdateFromInputSession);
break;

default:
Y_ABORT("unexpected state");
}
break;
}
if (Context->UpdateInFlight || Context->NextUpdatePending) {
Context->NextUpdatePending = true;
Y_ABORT_UNLESS(UpdateFromInputSession);
} else {
Context->UpdateInFlight = true;
Send(SessionId, UpdateFromInputSession.Release());
}

for (size_t channel = 0; channel < InputTrafficArray.size(); ++channel) {
Expand Down Expand Up @@ -703,22 +675,12 @@ namespace NActors {
}

// Releases the update that was held back in UpdateFromInputSession: marks an update as being in
// flight in the shared Context and sends the held message to SessionId.
void TInputSessionTCP::HandleConfirmUpdate() {
for (;;) {
switch (EUpdateState state = Context->UpdateState) {
case EUpdateState::NONE:
case EUpdateState::INFLIGHT:
case EUpdateState::INFLIGHT_AND_PENDING:
// here we may have a race
return;

case EUpdateState::CONFIRMING:
Y_ABORT_UNLESS(UpdateFromInputSession);
if (Context->UpdateState.compare_exchange_weak(state, EUpdateState::INFLIGHT)) {
Send(SessionId, UpdateFromInputSession.Release());
return;
}
}
}
// a held-back update must exist, nothing may be in flight, and the pending flag must be set
Y_ABORT_UNLESS(UpdateFromInputSession);
Y_ABORT_UNLESS(!Context->UpdateInFlight);
Y_ABORT_UNLESS(Context->NextUpdatePending);
// the pending update becomes the in-flight one
Context->UpdateInFlight = true;
Context->NextUpdatePending = false;
Send(SessionId, UpdateFromInputSession.Release());
}

ssize_t TInputSessionTCP::Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token,
Expand Down
28 changes: 5 additions & 23 deletions ydb/library/actors/interconnect/interconnect_tcp_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,28 +358,10 @@ namespace NActors {

GenerateTraffic();

for (;;) {
switch (EUpdateState state = ReceiveContext->UpdateState) {
case EUpdateState::NONE:
case EUpdateState::CONFIRMING:
Y_ABORT("unexpected state");

case EUpdateState::INFLIGHT:
// this message we are processing was the only one in flight, so we can reset state to NONE here
if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::NONE)) {
return;
}
break;

case EUpdateState::INFLIGHT_AND_PENDING:
// there are more messages pending from the input session actor, so we have to inform it to release
// that message
if (ReceiveContext->UpdateState.compare_exchange_weak(state, EUpdateState::CONFIRMING)) {
Send(ev->Sender, new TEvConfirmUpdate);
return;
}
break;
}
Y_ABORT_UNLESS(ReceiveContext->UpdateInFlight);
ReceiveContext->UpdateInFlight = false;
if (ReceiveContext->NextUpdatePending) {
Send(ev->Sender, new TEvConfirmUpdate);
}
}
}
Expand Down Expand Up @@ -1060,7 +1042,7 @@ namespace NActors {
flagState == EFlag::YELLOW,
flagState == EFlag::ORANGE,
flagState == EFlag::RED,
ReceiveContext->ClockSkew_us.load(),
ReceiveContext->ClockSkew_us,
reportClockSkew});
}

Expand Down
39 changes: 14 additions & 25 deletions ydb/library/actors/interconnect/interconnect_tcp_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,19 @@ namespace NActors {
ui64 ControlPacketId = 0;

// last processed packet by input session
std::atomic_uint64_t LastPacketSerialToConfirm = 0;
ui64 LastPacketSerialToConfirm = 0;
static constexpr uint64_t LastPacketSerialToConfirmLockBit = uint64_t(1) << 63;

// for hardened checks
TAtomic NumInputSessions = 0;
ui32 NumInputSessions = 0;

NHPTimer::STime StartTime;

std::atomic<ui64> PingRTT_us = 0;
std::atomic<i64> ClockSkew_us = 0;
ui64 PingRTT_us = 0;
i64 ClockSkew_us = 0;

std::atomic<EUpdateState> UpdateState;
static_assert(std::atomic<EUpdateState>::is_always_lock_free);
bool UpdateInFlight = false;
bool NextUpdatePending = false;

bool MainWriteBlocked = false;
bool XdcWriteBlocked = false;
Expand Down Expand Up @@ -159,36 +159,25 @@ namespace NActors {

// returns false if the session needs to be terminated
bool AdvanceLastPacketSerialToConfirm(ui64 nextValue) {
for (;;) {
uint64_t value = LastPacketSerialToConfirm.load();
if (value & LastPacketSerialToConfirmLockBit) {
return false;
}
Y_DEBUG_ABORT_UNLESS(value + 1 == nextValue);
if (LastPacketSerialToConfirm.compare_exchange_weak(value, nextValue)) {
return true;
}
if (LastPacketSerialToConfirm & LastPacketSerialToConfirmLockBit) {
return false;
}
Y_DEBUG_ABORT_UNLESS(LastPacketSerialToConfirm + 1 == nextValue);
LastPacketSerialToConfirm = nextValue;
return true;
}

// Sets the lock bit (LastPacketSerialToConfirmLockBit) in LastPacketSerialToConfirm and returns
// the stored serial with the lock bit masked off.
ui64 LockLastPacketSerialToConfirm() {
for (;;) {
uint64_t value = LastPacketSerialToConfirm.load();
if (value & LastPacketSerialToConfirmLockBit) {
return value & ~LastPacketSerialToConfirmLockBit;
}
if (LastPacketSerialToConfirm.compare_exchange_strong(value, value | LastPacketSerialToConfirmLockBit)) {
return value;
}
}
LastPacketSerialToConfirm |= LastPacketSerialToConfirmLockBit;
return GetLastPacketSerialToConfirm();
}

// Clears the lock bit (LastPacketSerialToConfirmLockBit) in LastPacketSerialToConfirm; the
// remaining bits are left unchanged.
void UnlockLastPacketSerialToConfirm() {
LastPacketSerialToConfirm &= ~LastPacketSerialToConfirmLockBit;
}

// Returns LastPacketSerialToConfirm with the lock bit masked off, i.e. only the serial value.
ui64 GetLastPacketSerialToConfirm() {
return LastPacketSerialToConfirm.load() & ~LastPacketSerialToConfirmLockBit;
return LastPacketSerialToConfirm & ~LastPacketSerialToConfirmLockBit;
}
};

Expand Down

0 comments on commit 43363e5

Please sign in to comment.