Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ActorSystem cpu load log use std atomic #646

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 35 additions & 44 deletions ydb/library/actors/util/cpu_load_log.h
Original file line number Diff line number Diff line change
@@ -1,44 +1,34 @@
#pragma once

#include "defs.h"
#include <library/cpp/deprecated/atomic/atomic.h>
#include <library/cpp/pop_count/popcount.h>

static constexpr ui64 BitDurationNs = 131'072; // A power of 2

template <ui64 DataSize>
struct TCpuLoadLog {
static constexpr ui64 BitsSize = DataSize * 64;
TAtomic LastTimeNs = 0;
ui64 Data[DataSize];
std::atomic<ui64> LastTimeNs = 0;
std::atomic<ui64> Data[DataSize] {};

TCpuLoadLog() {
LastTimeNs = 0;
for (size_t i = 0; i < DataSize; ++i) {
Data[i] = 0;
}
}
TCpuLoadLog() = default;

TCpuLoadLog(ui64 timeNs) {
LastTimeNs = timeNs;
for (size_t i = 0; i < DataSize; ++i) {
Data[i] = 0;
}
TCpuLoadLog(ui64 timeNs) : LastTimeNs(timeNs) {
}

void RegisterBusyPeriod(ui64 timeNs) {
RegisterBusyPeriod<true>(timeNs, AtomicGet(LastTimeNs));
RegisterBusyPeriod<true>(timeNs, LastTimeNs.load(std::memory_order_acquire));
}

template <bool ModifyLastTime>
void RegisterBusyPeriod(ui64 timeNs, ui64 lastTimeNs) {
timeNs |= 1ull;
if (timeNs < lastTimeNs) {
for (ui64 i = 0; i < DataSize; ++i) {
AtomicSet(Data[i], ~0ull);
Data[i].store(~0ull, std::memory_order_release);
}
if (ModifyLastTime) {
AtomicSet(LastTimeNs, timeNs);
LastTimeNs.store(timeNs, std::memory_order_release);
}
return;
}
Expand All @@ -51,60 +41,60 @@ struct TCpuLoadLog {
if (firstElementIdx == lastElementIdx) {
ui64 prevValue = 0;
if (firstBitIdx != 0) {
prevValue = AtomicGet(Data[firstElementIdx % DataSize]);
prevValue = Data[firstElementIdx % DataSize].load(std::memory_order_acquire);
}
const ui64 bits = (((~0ull) << (firstBitIdx + (63-lastBitIdx))) >> (63-lastBitIdx));
const ui64 newValue = prevValue | bits;
AtomicSet(Data[firstElementIdx % DataSize], newValue);
Data[firstElementIdx % DataSize].store(newValue, std::memory_order_release);
if (ModifyLastTime) {
AtomicSet(LastTimeNs, timeNs);
LastTimeNs.store(timeNs, std::memory_order_release);
}
return;
}
// process the first element
ui64 prevValue = 0;
if (firstBitIdx != 0) {
prevValue = AtomicGet(Data[firstElementIdx % DataSize]);
prevValue = Data[firstElementIdx % DataSize].load(std::memory_order_acquire);
}
const ui64 bits = ((~0ull) << firstBitIdx);
const ui64 newValue = (prevValue | bits);
AtomicSet(Data[firstElementIdx % DataSize], newValue);
Data[firstElementIdx % DataSize].store(newValue, std::memory_order_release);
++firstElementIdx;
// process the fully filled elements
const ui64 firstLoop = firstElementIdx / DataSize;
const ui64 lastLoop = lastElementIdx / DataSize;
const ui64 lastOffset = lastElementIdx % DataSize;
if (firstLoop < lastLoop) {
for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) {
AtomicSet(Data[i], ~0ull);
Data[i].store(~0ull, std::memory_order_release);
}
for (ui64 i = 0; i < lastOffset; ++i) {
AtomicSet(Data[i], ~0ull);
Data[i].store(~0ull, std::memory_order_release);
}
} else {
for (ui64 i = firstElementIdx % DataSize; i < lastOffset; ++i) {
AtomicSet(Data[i], ~0ull);
Data[i].store(~0ull, std::memory_order_release);
}
}
// process the last element
const ui64 newValue2 = ((~0ull) >> (63-lastBitIdx));
AtomicSet(Data[lastOffset], newValue2);
Data[lastOffset].store(newValue2, std::memory_order_release);
if (ModifyLastTime) {
AtomicSet(LastTimeNs, timeNs);
LastTimeNs.store(timeNs, std::memory_order_release);
}
}

void RegisterIdlePeriod(ui64 timeNs) {
timeNs &= ~1ull;
ui64 lastTimeNs = AtomicGet(LastTimeNs);
ui64 lastTimeNs = LastTimeNs.load(std::memory_order_acquire);
if (timeNs < lastTimeNs) {
// Fast check first, slower check later
if ((timeNs | 1ull) < lastTimeNs) {
// Time goes back; don't panic, just mark the whole array 'busy'
for (ui64 i = 0; i < DataSize; ++i) {
AtomicSet(Data[i], ~0ull);
Data[i].store(~0ull, std::memory_order_release);
}
AtomicSet(LastTimeNs, timeNs);
LastTimeNs.store(timeNs, std::memory_order_release);
return;
}
}
Expand All @@ -113,7 +103,7 @@ struct TCpuLoadLog {
ui64 firstElementIdx = curIdx / 64;
const ui64 lastElementIdx = lastIdx / 64;
if (firstElementIdx >= lastElementIdx) {
AtomicSet(LastTimeNs, timeNs);
LastTimeNs.store(timeNs, std::memory_order_release);
return;
}
// process the first partially filled element
Expand All @@ -124,35 +114,37 @@ struct TCpuLoadLog {
const ui64 lastOffset = lastElementIdx % DataSize;
if (firstLoop < lastLoop) {
for (ui64 i = firstElementIdx % DataSize; i < DataSize; ++i) {
AtomicSet(Data[i], 0);
Data[i].store(0, std::memory_order_release);
}
for (ui64 i = 0; i <= lastOffset; ++i) {
AtomicSet(Data[i], 0);
Data[i].store(0, std::memory_order_release);
}
} else {
for (ui64 i = firstElementIdx % DataSize; i <= lastOffset; ++i) {
AtomicSet(Data[i], 0);
Data[i].store(0, std::memory_order_release);
}
}
AtomicSet(LastTimeNs, timeNs);
LastTimeNs.store(timeNs, std::memory_order_release);
}
};

template <ui64 DataSize>
struct TMinusOneCpuEstimator {
class TMinusOneCpuEstimator {
static constexpr ui64 BitsSize = DataSize * 64;
ui64 BeginDelayIdx;
ui64 EndDelayIdx;
ui64 Idle;
ui64 Delay[BitsSize];
ui64 BeginDelayIdx = 0;
ui64 EndDelayIdx = 0;
ui64 Idle = 0;
ui64 Delay[BitsSize] {};
public:
TMinusOneCpuEstimator() = default;

ui64 MaxLatencyIncreaseWithOneLessCpu(TCpuLoadLog<DataSize>** logs, i64 logCount, ui64 timeNs, ui64 periodNs) {
Y_ABORT_UNLESS(logCount > 0);
ui64 endTimeNs = timeNs;

ui64 lastTimeNs = timeNs;
for (i64 log_idx = 0; log_idx < logCount; ++log_idx) {
ui64 x = AtomicGet(logs[log_idx]->LastTimeNs);
ui64 x = logs[log_idx]->LastTimeNs.load(std::memory_order_acquire);
if ((x & 1) == 1) {
lastTimeNs = Min(lastTimeNs, x);
} else {
Expand All @@ -173,10 +165,10 @@ struct TMinusOneCpuEstimator {
ui64 bucket = 0;
for (ui64 idx = beginElementIdx; idx <= lastElementIdx; ++idx) {
ui64 i = idx % DataSize;
ui64 input = AtomicGet(logs[0]->Data[i]);
ui64 input = logs[0]->Data[i].load(std::memory_order_acquire);
ui64 all_busy = ~0ull;
for (i64 log_idx = 1; log_idx < logCount; ++log_idx) {
ui64 x = AtomicGet(logs[log_idx]->Data[i]);
ui64 x = logs[log_idx]->Data[i].load(std::memory_order_acquire);
all_busy &= x;
}
if (!input) {
Expand Down Expand Up @@ -224,4 +216,3 @@ struct TMinusOneCpuEstimator {
return maxDelay * BitDurationNs;
}
};

Loading