ActorSystem cpu load log use std atomic (#646)
kitaisreal authored Dec 26, 2023
1 parent 90b5bf3 commit a9a47fa
Showing 1 changed file with 35 additions and 44 deletions.
ydb/library/actors/util/cpu_load_log.h (79 changes: 35 additions & 44 deletions)
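The diff below replaces the deprecated TAtomic helpers (AtomicGet/AtomicSet) with std::atomic<ui64> using explicit acquire loads and release stores, and tidies the constructors with default member initializers. As a minimal, self-contained sketch of that replacement pattern (illustrative only; nothing in this snippet is part of the commit, and the free-standing names are hypothetical):

    #include <atomic>
    #include <cstdint>

    // Before (Arcadia's deprecated helpers), kept as a comment for comparison:
    //     TAtomic LastTimeNs;
    //     ui64 t = AtomicGet(LastTimeNs);
    //     AtomicSet(LastTimeNs, t | 1ull);

    std::atomic<std::uint64_t> LastTimeNs{0};        // std::atomic replaces TAtomic

    std::uint64_t ReadLastTime() {
        // AtomicGet(x)  ->  x.load(std::memory_order_acquire)
        return LastTimeNs.load(std::memory_order_acquire);
    }

    void WriteLastTime(std::uint64_t t) {
        // AtomicSet(x, v)  ->  x.store(v, std::memory_order_release)
        LastTimeNs.store(t, std::memory_order_release);
    }

    int main() {
        WriteLastTime(ReadLastTime() | 1ull);        // odd timestamps mark "busy", as in RegisterBusyPeriod
        return 0;
    }

The commit message does not state why acquire/release ordering was chosen over the std::atomic default of sequential consistency; the comments above only describe what the new code does.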
@@ -1,44 +1,34 @@
 #pragma once
 
 #include "defs.h"
-#include <library/cpp/deprecated/atomic/atomic.h>
 #include <library/cpp/pop_count/popcount.h>
 
 static constexpr ui64 BitDurationNs = 131'072; // A power of 2
 
 template <ui64 DataSize>
 struct TCpuLoadLog {
     static constexpr ui64 BitsSize = DataSize * 64;
-    TAtomic LastTimeNs = 0;
-    ui64 Data[DataSize];
+    std::atomic<ui64> LastTimeNs = 0;
+    std::atomic<ui64> Data[DataSize] {};
 
-    TCpuLoadLog() {
-        LastTimeNs = 0;
-        for (size_t i = 0; i < DataSize; ++i) {
-            Data[i] = 0;
-        }
-    }
+    TCpuLoadLog() = default;
 
-    TCpuLoadLog(ui64 timeNs) {
-        LastTimeNs = timeNs;
-        for (size_t i = 0; i < DataSize; ++i) {
-            Data[i] = 0;
-        }
+    TCpuLoadLog(ui64 timeNs) : LastTimeNs(timeNs) {
     }
 
     void RegisterBusyPeriod(ui64 timeNs) {
-        RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs));
+        RegisterBusyPeriod<true>(timeNs, LastTimeNs.load(std::memory_order_acquire));
     }
 
     template <bool ModifyLastTime>
     void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) {
         timeNs |= 1ull;
         if (timeNs < lastTimeNs) {
             for (ui64 i = 0; i < DataSize; ++i) {
-                AtomicSet(Data[i], ~0ull);
+                Data[i].store(~0ull, std::memory_order_release);
             }
             if (ModifyLastTime) {
-                AtomicSet(LastTimeNs, timeNs);
+                LastTimeNs.store(timeNs, std::memory_order_release);
             }
             return;
         }
@@ -51,60 +41,60 @@ struct TCpuLoadLog {
         if (firstElementIdx == lastElementIdx) {
             ui64 prevValue = 0;
             if (firstBitIdx != 0) {
-                prevValue = AtomicGet(Data[firstElementIdx % DataSize]);
+                prevValue = Data[firstElementIdx % DataSize].load(std::memory_order_acquire);
             }
             const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx));
             const ui64 newValue = prevValue | bits;
-            AtomicSet(Data[firstElementIdx % DataSize], newValue);
+            Data[firstElementIdx % DataSize].store(newValue, std::memory_order_release);
             if (ModifyLastTime) {
-                AtomicSet(LastTimeNs, timeNs);
+                LastTimeNs.store(timeNs, std::memory_order_release);
             }
             return;
         }
         // process the first element
         ui64 prevValue = 0;
         if (firstBitIdx != 0) {
-            prevValue = AtomicGet(Data[firstElementIdx % DataSize]);
+            prevValue = Data[firstElementIdx % DataSize].load(std::memory_order_acquire);
         }
         const ui64 bits = ((~0ull) << firstBitIdx);
         const ui64 newValue = (prevValue | bits);
-        AtomicSet(Data[firstElementIdx % DataSize], newValue);
+        Data[firstElementIdx % DataSize].store(newValue, std::memory_order_release);
         ++firstElementIdx;
         // process the fully filled elements
         const ui64 firstLoop = firstElementIdx / DataSize;
         const ui64 lastLoop = lastElementIdx / DataSize;
         const ui64 lastOffset = lastElementIdx % DataSize;
         if (firstLoop < lastLoop) {
             for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) {
-                AtomicSet(Data[i], ~0ull);
+                Data[i].store(~0ull, std::memory_order_release);
             }
             for (ui64 i = 0; i < lastOffset; ++i) {
-                AtomicSet(Data[i], ~0ull);
+                Data[i].store(~0ull, std::memory_order_release);
             }
         } else {
             for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) {
-                AtomicSet(Data[i], ~0ull);
+                Data[i].store(~0ull, std::memory_order_release);
             }
         }
         // process the last element
         const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx));
-        AtomicSet(Data[lastOffset], newValue2);
+        Data[lastOffset].store(newValue2, std::memory_order_release);
         if (ModifyLastTime) {
-            AtomicSet(LastTimeNs, timeNs);
+            LastTimeNs.store(timeNs, std::memory_order_release);
         }
     }
 
     void RegisterIdlePeriod(ui64 timeNs) {
         timeNs &= ~1ull;
-        ui64 lastTimeNs = AtomicGet(LastTimeNs);
+        ui64 lastTimeNs = LastTimeNs.load(std::memory_order_acquire);
         if (timeNs < lastTimeNs) {
             // Fast check first, slower chec later
             if ((timeNs | 1ull) < lastTimeNs) {
                 // Time goes back, dont panic, just mark the whole array 'busy'
                 for (ui64 i = 0; i < DataSize; ++i) {
-                    AtomicSet(Data[i], ~0ull);
+                    Data[i].store(~0ull, std::memory_order_release);
                 }
-                AtomicSet(LastTimeNs, timeNs);
+                LastTimeNs.store(timeNs, std::memory_order_release);
                 return;
             }
         }
@@ -113,7 +103,7 @@ struct TCpuLoadLog {
         ui64 firstElementIdx = curIdx / 64;
         const ui64 lastElementIdx = lastIdx / 64;
         if (firstElementIdx >= lastElementIdx) {
-            AtomicSet(LastTimeNs, timeNs);
+            LastTimeNs.store(timeNs, std::memory_order_release);
             return;
         }
         // process the first partially filled element
@@ -124,35 +114,37 @@ struct TCpuLoadLog {
         const ui64 lastOffset = lastElementIdx % DataSize;
         if (firstLoop < lastLoop) {
             for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) {
-                AtomicSet(Data[i], 0);
+                Data[i].store(0, std::memory_order_release);
             }
             for (ui64 i = 0; i <= lastOffset; ++i) {
-                AtomicSet(Data[i], 0);
+                Data[i].store(0, std::memory_order_release);
             }
         } else {
             for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) {
-                AtomicSet(Data[i], 0);
+                Data[i].store(0, std::memory_order_release);
             }
         }
-        AtomicSet(LastTimeNs, timeNs);
+        LastTimeNs.store(timeNs, std::memory_order_release);
     }
 };
 
 template <ui64 DataSize>
-struct TMinusOneCpuEstimator {
+class TMinusOneCpuEstimator {
     static constexpr ui64 BitsSize = DataSize * 64;
-    ui64 BeginDelayIdx;
-    ui64 EndDelayIdx;
-    ui64 Idle;
-    ui64 Delay[BitsSize];
+    ui64 BeginDelayIdx = 0;
+    ui64 EndDelayIdx = 0;
+    ui64 Idle = 0;
+    ui64 Delay[BitsSize] {};
+public:
+    TMinusOneCpuEstimator() = default;
 
     ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) {
         Y_ABORT_UNLESS(logCount > 0);
         ui64 endTimeNs = timeNs;
 
         ui64 lastTimeNs = timeNs;
         for (i64 log_idx = 0; log_idx < logCount; ++log_idx) {
-            ui64 x = AtomicGet(logs[log_idx]->LastTimeNs);
+            ui64 x = logs[log_idx]->LastTimeNs.load(std::memory_order_acquire);
             if ((x & 1) == 1) {
                 lastTimeNs = Min(lastTimeNs, x);
             } else {
@@ -173,10 +165,10 @@ struct TMinusOneCpuEstimator {
         ui64 bucket = 0;
         for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) {
             ui64 i = idx % DataSize;
-            ui64 input = AtomicGet(logs[0]->Data[i]);
+            ui64 input = logs[0]->Data[i].load(std::memory_order_acquire);
             ui64 all_busy = ~0ull;
             for (i64 log_idx = 1; log_idx < logCount; ++log_idx) {
-                ui64 x = AtomicGet(logs[log_idx]->Data[i]);
+                ui64 x = logs[log_idx]->Data[i].load(std::memory_order_acquire);
                 all_busy &= x;
             }
             if (!input) {
@@ -224,4 +216,3 @@ struct TMinusOneCpuEstimator {
         return maxDelay * BitDurationNs;
     }
 };
-
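For context on how the two classes touched here fit together, the sketch below shows one plausible usage. It is an assumption-based illustration, not code from the repository: the DataSize value, timestamps, and the single-log setup are made up, and the include path simply mirrors the file path shown above; only the calls themselves (the TCpuLoadLog(ui64) constructor, RegisterBusyPeriod, RegisterIdlePeriod, and MaxLatencyIncreaseWithOneLessCpu) appear in the diff. Each worker records alternating busy/idle periods into its own log, and the estimator combines all logs to bound how much latency could grow with one CPU fewer. With BitDurationNs = 131'072 ns per bit, DataSize = 1024 keeps 1024 * 64 bits, roughly 8.6 seconds of history.

    #include <ydb/library/actors/util/cpu_load_log.h>   // the header changed in this commit

    int main() {
        constexpr ui64 DataSize = 1024;                  // assumed size: 1024 * 64 bits of history
        const ui64 startNs = 1'000'000'000ull;           // made-up monotonic timestamps

        TCpuLoadLog<DataSize> log(startNs);

        // A worker marks the time ranges in which it was busy and then idle.
        log.RegisterBusyPeriod(startNs + 2 * BitDurationNs);   // busy from startNs up to this timestamp
        log.RegisterIdlePeriod(startNs + 5 * BitDurationNs);   // then idle up to this timestamp

        // The estimator aggregates one log per worker (a single log here).
        TCpuLoadLog<DataSize>* logs[] = { &log };
        TMinusOneCpuEstimator<DataSize> estimator;
        const ui64 delayNs = estimator.MaxLatencyIncreaseWithOneLessCpu(
            logs, /*logCount=*/1, /*timeNs=*/startNs + 5 * BitDurationNs, /*periodNs=*/5 * BitDurationNs);

        (void)delayNs;                                   // result unused in this sketch
        return 0;
    }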