Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce local SyncLog data cutter #4124

Merged
merged 12 commits into from
May 22, 2024
11 changes: 9 additions & 2 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,15 @@ void TNodeWarden::Bootstrap() {
DsProxyPerPoolCounters = new TDsProxyPerPoolCounters(AppData()->Counters);

if (actorSystem && actorSystem->AppData<TAppData>() && actorSystem->AppData<TAppData>()->Icb) {
actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
const TIntrusivePtr<NKikimr::TControlBoard>& icb = actorSystem->AppData<TAppData>()->Icb;

icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
icb->RegisterSharedControl(EnableLocalSyncLogDataCutting, "VDiskControls.EnableLocalSyncLogDataCutting");
icb->RegisterSharedControl(EnableSyncLogChunkCompressionHDD, "VDiskControls.EnableSyncLogChunkCompressionHDD");
icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD");
}

// start replication broker
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ namespace NKikimr::NStorage {
TControlWrapper EnablePutBatching;
TControlWrapper EnableVPatch;

TControlWrapper EnableLocalSyncLogDataCutting;
TControlWrapper EnableSyncLogChunkCompressionHDD;
TControlWrapper EnableSyncLogChunkCompressionSSD;
TControlWrapper MaxSyncLogChunksInFlightHDD;
TControlWrapper MaxSyncLogChunksInFlightSSD;

TReplQuoter::TPtr ReplNodeRequestQuoter;
TReplQuoter::TPtr ReplNodeResponseQuoter;

Expand All @@ -148,6 +154,11 @@ namespace NKikimr::NStorage {
: Cfg(cfg)
, EnablePutBatching(Cfg->FeatureFlags.GetEnablePutBatchingForBlobStorage(), false, true)
, EnableVPatch(Cfg->FeatureFlags.GetEnableVPatch(), false, true)
, EnableLocalSyncLogDataCutting(0, 0, 1)
, EnableSyncLogChunkCompressionHDD(1, 0, 1)
, EnableSyncLogChunkCompressionSSD(0, 0, 1)
, MaxSyncLogChunksInFlightHDD(10, 1, 1024)
, MaxSyncLogChunksInFlightSSD(10, 1, 1024)
{
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
AvailDomainId = 1;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ namespace NKikimr::NStorage {
vdiskConfig->EnableVDiskCooldownTimeout = Cfg->EnableVDiskCooldownTimeout;
vdiskConfig->ReplPausedAtStart = Cfg->VDiskReplPausedAtStart;
vdiskConfig->EnableVPatch = EnableVPatch;

vdiskConfig->EnableLocalSyncLogDataCutting = EnableLocalSyncLogDataCutting;
if (deviceType == NPDisk::EDeviceType::DEVICE_TYPE_ROT) {
vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionHDD;
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightHDD;
} else {
vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionSSD;
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightSSD;
}

vdiskConfig->FeatureFlags = Cfg->FeatureFlags;

if (Cfg->BlobStorageConfig.HasCostMetricsSettings()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct TActorTestContext {
appData->IoContextFactory = IoContext.get();

Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}});
Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}});
Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG);
Expand Down
101 changes: 101 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/sync.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,108 @@
#include <ydb/core/blobstorage/ut_blobstorage/ut_helpers.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h>
#include <util/random/random.h>

Y_UNIT_TEST_SUITE(BlobStorageSync) {

// Scenario shared by the SyncLog cutting tests below. For each compression
// setting it builds a fresh environment with one node per disk of the blob
// subgroup, writes about 16 MB of random blobs with the initial control values,
// then switches the per-node controls (cutting according to a random bit mask,
// compression according to the loop variable) and writes up to 32 MB in total.
// Every TEvPutResult seen by the runtime filter is required to be OK.
void TestCutting(TBlobStorageGroupType groupType) {
    const ui32 groupSize = groupType.BlobSubgroupSize();

    // Enumerating all (1 << groupSize) masks was marked as timing out, so a
    // single random mask is drawn and reused for both compression settings.
    const ui32 mask = RandomNumber(1ull << groupSize);

    for (bool compressChunks : { true, false }) {
        TEnvironmentSetup env{{
            .NodeCount = groupSize,
            .Erasure = groupType,
        }};

        env.CreateBoxAndPool(1, 1);
        const std::vector<ui32> groups = env.GetGroups();
        UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
        const ui32 groupId = groups[0];

        // Fixed blob id components; only the cookie changes between writes.
        const ui64 tabletId = 5000;
        const ui32 channel = 10;
        const ui32 gen = 1;
        const ui32 step = 1;
        ui64 cookie = 1;

        ui64 bytesWritten = 0;

        std::vector<TControlWrapper> cuttingSwitches;
        std::vector<TControlWrapper> compressionSwitches;
        std::vector<TActorId> edges;

        // One pair of controls and one edge actor per node (node ids are 1-based).
        for (ui32 nodeId = 1; nodeId <= groupSize; ++nodeId) {
            TControlWrapper& cutting = cuttingSwitches.emplace_back(0, 0, 1);
            TControlWrapper& compression = compressionSwitches.emplace_back(1, 0, 1);
            TAppData* appData = env.Runtime->GetNode(nodeId)->AppData.get();
            appData->Icb->RegisterSharedControl(cutting, "VDiskControls.EnableLocalSyncLogDataCutting");
            appData->Icb->RegisterSharedControl(compression, "VDiskControls.EnableSyncLogChunkCompressionHDD");
            edges.push_back(env.Runtime->AllocateEdgeActor(nodeId));
        }

        // Send TEvStatus through the proxy from every edge and require an OK reply.
        for (const TActorId& edge : edges) {
            env.Runtime->WrapInActorContext(edge, [&] {
                SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
            });
            auto statusReply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(edge, false);
            UNIT_ASSERT_VALUES_EQUAL(statusReply->Get()->Status, NKikimrProto::OK);
        }

        // Issues one TEvPut of blobSize bytes from the edge actor of the given node.
        auto writeBlob = [&](ui32 nodeId, ui32 blobSize) {
            TLogoBlobID blobId(tabletId, gen, step, channel, blobSize, ++cookie);
            bytesWritten += blobSize;
            TString payload = MakeData(blobSize);

            const TActorId& sender = edges[nodeId - 1];
            env.Runtime->WrapInActorContext(sender, [&] () {
                SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvPut(blobId, std::move(payload), TInstant::Max()));
            });
        };

        // Put results are checked here and filtered out (false is returned for
        // them); every other event gets true.
        env.Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr<IEventHandle>& ev) {
            if (ev->Type != TEvBlobStorage::TEvPutResult::EventType) {
                return true;
            }
            UNIT_ASSERT_VALUES_EQUAL(ev->Get<TEvBlobStorage::TEvPutResult>()->Status, NKikimrProto::OK);
            return false;
        };

        // Keeps writing random blobs from random nodes until the running total
        // reaches the requested size.
        auto writeUntil = [&](ui64 targetSize) {
            while (bytesWritten < targetSize) {
                writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
            }
        };

        // Phase 1: initial control values.
        writeUntil(16_MB);
        env.Sim(TDuration::Minutes(5));

        // Phase 2: flip the controls, node i gets bit i of the mask.
        for (ui32 i = 0; i < groupSize; ++i) {
            cuttingSwitches[i] = ((mask >> i) & 1u) != 0;
            compressionSwitches[i] = compressChunks;
        }

        writeUntil(32_MB);

        env.Sim(TDuration::Minutes(5));
    }
}

// Runs the shared TestCutting scenario with the ErasureMirror3dc group type.
Y_UNIT_TEST(TestSyncLogCuttingMirror3dc) {
TestCutting(TBlobStorageGroupType::ErasureMirror3dc);
}

// Runs the shared TestCutting scenario with the ErasureMirror3of4 group type.
Y_UNIT_TEST(TestSyncLogCuttingMirror3of4) {
TestCutting(TBlobStorageGroupType::ErasureMirror3of4);
}

// Runs the shared TestCutting scenario with the Erasure4Plus2Block group type.
Y_UNIT_TEST(TestSyncLogCuttingBlock4Plus2) {
TestCutting(TBlobStorageGroupType::Erasure4Plus2Block);
}

Y_UNIT_TEST(SyncWhenDiskGetsDown) {
return; // re-enable when protocol issue is resolved

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/ut_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ namespace NKikimr {

TString MakeData(ui32 dataSize);

// Returns min plus a random offset drawn by RandomNumber(max - min).
// NOTE(review): RandomNumber(n) is presumably uniform over [0, n), which makes
// the result fall in [min, max); callers should keep max > min — confirm.
template<typename Int1 = ui32, typename Int2 = ui32>
inline Int1 GenerateRandom(Int1 min, Int2 max) {
    const auto width = max - min;
    return min + RandomNumber(width);
}

class TInflightActor : public TActorBootstrapped<TInflightActor> {
public:
struct TSettings {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ namespace NKikimr {
SyncLogAdvisedIndexedBlockSize = ui32(1) << ui32(20); // 1 MB
SyncLogMaxMemAmount = ui64(64) << ui64(20); // 64 MB

MaxSyncLogChunkSize = ui32(16) << ui32(10); // 16 KB

ReplTimeInterval = TDuration::Seconds(60); // 60 seconds
ReplRequestTimeout = TDuration::Seconds(10); // 10 seconds
ReplPlanQuantum = TDuration::MilliSeconds(100); // 100 ms
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ namespace NKikimr {
ui32 SyncLogAdvisedIndexedBlockSize;
ui64 SyncLogMaxMemAmount;

TControlWrapper EnableLocalSyncLogDataCutting;
TControlWrapper EnableSyncLogChunkCompression;
TControlWrapper MaxSyncLogChunksInFlight;
ui32 MaxSyncLogChunkSize;

///////////// REPL SETTINGS /////////////////////////
TDuration ReplTimeInterval;
TDuration ReplRequestTimeout;
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,14 +513,11 @@ namespace NKikimr {

///////////////// SYNC //////////////////////////////////////////////////////
TLsnSeg THull::AllocateLsnForSyncDataCmd(const TString &data) {
// count number of elements
ui32 counter = 0;
auto count = [&counter] (const void *) { counter++; };
// do job - count all elements
NSyncLog::TFragmentReader(data).ForEach(count, count, count, count);
NSyncLog::TFragmentReader fragment(data);

// allocate LsnSeg; we reserve a range of lsns since we put multiple records
ui64 lsnAdvance = counter;
std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords();
ui64 lsnAdvance = records.size();
Y_ABORT_UNLESS(lsnAdvance > 0);
auto seg = Fields->LsnMngr->AllocLsnForHull(lsnAdvance);

Expand All @@ -536,7 +533,9 @@ namespace NKikimr {
curLsn++;
};
// do job - update blocks cache
NSyncLog::TFragmentReader(data).ForEach(otherHandler, blockHandler, otherHandler, blockHandlerV2);
for (const NSyncLog::TRecordHdr* rec : records) {
NSyncLog::HandleRecordHdr(rec, otherHandler, blockHandler, otherHandler, blockHandlerV2);
}
// check that all records are applied
Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,9 @@ namespace NKikimr {

void ApplySyncDataByRecord(const TActorContext &ctx, ui64 recordLsn) {
// count number of records
ui64 recsNum = 0;
auto count = [&recsNum] (const void *) { recsNum++; };
NSyncLog::TFragmentReader fragment(LocalSyncDataMsg.Data);
fragment.ForEach(count, count, count, count);
std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords();
ui64 recsNum = records.size();

// calculate lsn
Y_DEBUG_ABORT_UNLESS(recordLsn >= recsNum, "recordLsn# %" PRIu64 " recsNum# %" PRIu64,
Expand Down Expand Up @@ -465,7 +464,9 @@ namespace NKikimr {
};

// apply local sync data
fragment.ForEach(blobHandler, blockHandler, barrierHandler, blockHandlerV2);
for (const NSyncLog::TRecordHdr* rec : records) {
NSyncLog::HandleRecordHdr(rec, blobHandler, blockHandler, barrierHandler, blockHandlerV2);
}
}

void PutLogoBlobsBatchToHull(
Expand Down
114 changes: 114 additions & 0 deletions ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "blobstorage_syncer_localwriter.h"
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h>
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h>

namespace NKikimr {

Expand Down Expand Up @@ -186,5 +187,118 @@ namespace NKikimr {
return new TLocalSyncDataExtractorActor(vctx, skeletonId, parentId, std::move(ev));
}

///////////////////////////////////////////////////////////////////////////////////////////////
// TLocalSyncDataCutterActor -- actor extracts data from TEvLocalSyncData, cuts it into
// smaller chunks and sends in multiple messages to Skeleton
///////////////////////////////////////////////////////////////////////////////////////////////
class TLocalSyncDataCutterActor : public TActorBootstrapped<TLocalSyncDataCutterActor> {
    // Configuration snapshot the limits below were read from.
    TIntrusivePtr<TVDiskConfig> VConfig;
    TIntrusivePtr<TVDiskContext> VCtx;
    // Receiver of the per-chunk TEvLocalSyncData messages.
    TActorId SkeletonId;
    // Receiver of the single final TEvLocalSyncDataResult.
    TActorId ParentId;
    // The original (uncut) message; its VDiskID and SyncState are reused for every chunk.
    std::unique_ptr<TEvLocalSyncData> Ev;
    // Serialized chunks that have not been sent yet.
    std::vector<TString> Chunks;

    // Number of chunks sent to Skeleton and not yet acknowledged.
    ui32 ChunksInFlight = 0;
    // True: chunks are written with TLz4FragmentWriter; false: with TNaiveFragmentWriter.
    bool CompressChunks;
    // Upper bound for ChunksInFlight.
    ui32 MaxChunksInFlight;
    // A chunk is closed before a record would push its writer size above this value.
    ui32 MaxChunksSize;

public:
    // Splits Ev->Data into chunks and starts sending them to Skeleton.
    void Bootstrap(const TActorContext&) {
        THPTimer timer;
        std::unique_ptr<NSyncLog::TNaiveFragmentWriter> fragmentWriter;

        if (CompressChunks) {
            fragmentWriter = std::make_unique<NSyncLog::TLz4FragmentWriter>();
        } else {
            fragmentWriter = std::make_unique<NSyncLog::TNaiveFragmentWriter>();
        }

        // Closes the chunk being accumulated (if it holds anything) and resets the writer.
        auto addChunk = [&]() {
            if (fragmentWriter->GetSize()) {
                TString chunk;
                fragmentWriter->Finish(&chunk);
                Chunks.emplace_back(std::move(chunk));
                fragmentWriter->Clear();
            }
        };

        NSyncLog::TFragmentReader fragmentReader(Ev->Data);
        std::vector<const NSyncLog::TRecordHdr*> records = fragmentReader.ListRecords();
        for (const NSyncLog::TRecordHdr* rec : records) {
            // Start a new chunk when this record would overflow the current one.
            // A record larger than MaxChunksSize still gets a chunk of its own,
            // because addChunk() does nothing on an empty writer.
            if (fragmentWriter->GetSize() + rec->GetSize() > MaxChunksSize) {
                addChunk();
            }
            fragmentWriter->Push(rec, rec->GetSize());
        }
        addChunk();

        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, VCtx->VDiskLogPrefix
                << "TLocalSyncDataCutterActor: VDiskId# " << Ev->VDiskID.ToString()
                << " dataSize# " << Ev->Data.size()
                << " duration# " << TDuration::Seconds(timer.Passed()));

        Become(&TThis::StateFunc);

        if (Chunks.empty()) {
            // No records were extracted, so nothing will be sent and no result
            // can ever arrive; reply now instead of leaving the parent waiting.
            Finish(NKikimrProto::OK);
            return;
        }

        SendChunks();
    }

    // Reports the overall status to the parent and terminates the actor.
    void Finish(const NKikimrProto::EReplyStatus& status) {
        Send(ParentId, new TEvLocalSyncDataResult(status, TAppData::TimeProvider->Now(), nullptr, nullptr));
        PassAway();
    }

    // Handles the acknowledgement of one chunk. The first non-OK status ends
    // the whole operation with that status; otherwise more chunks are sent, or
    // OK is reported once nothing is pending and nothing is in flight.
    void Handle(const TEvLocalSyncDataResult::TPtr& ev) {
        if (ev->Get()->Status == NKikimrProto::OK) {
            --ChunksInFlight;
            if (Chunks.empty() && ChunksInFlight == 0) {
                Finish(NKikimrProto::OK);
            } else {
                SendChunks();
            }
        } else {
            Finish(ev->Get()->Status);
        }
    }

    // Sends pending chunks until the in-flight limit is reached or none remain.
    // NOTE(review): chunks are taken from the back of the vector, i.e. in the
    // reverse of the order they were cut — confirm Skeleton does not rely on order.
    void SendChunks() {
        while (ChunksInFlight < MaxChunksInFlight && !Chunks.empty()) {
            Send(SkeletonId, new TEvLocalSyncData(Ev->VDiskID, Ev->SyncState, std::move(Chunks.back())));
            Chunks.pop_back();
            ++ChunksInFlight;
        }
    }

public:
    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
        return NKikimrServices::TActivity::VDISK_LOCALSYNCDATA_CUTTER;
    }

    // vconfig    - source of the compression flag, in-flight limit and chunk size limit
    // vctx       - VDisk context, used for the log prefix
    // skeletonId - actor that receives the chunks
    // parentId   - actor that receives the final result
    // ev         - message whose Data is to be cut
    TLocalSyncDataCutterActor(
            const TIntrusivePtr<TVDiskConfig>& vconfig,
            const TIntrusivePtr<TVDiskContext>& vctx,
            const TActorId& skeletonId,
            const TActorId& parentId,
            std::unique_ptr<TEvLocalSyncData> ev)
        : VConfig(vconfig)
        , VCtx(vctx)
        , SkeletonId(skeletonId)
        , ParentId(parentId)
        , Ev(std::move(ev))
        // The flag must come from the compression control; it used to be read
        // from MaxSyncLogChunksInFlight, which is never zero.
        , CompressChunks(vconfig->EnableSyncLogChunkCompression)
        , MaxChunksInFlight(vconfig->MaxSyncLogChunksInFlight)
        , MaxChunksSize(vconfig->MaxSyncLogChunkSize)
    {}

    STRICT_STFUNC(StateFunc, {
        hFunc(TEvLocalSyncDataResult, Handle);
    })

};

// Factory for TLocalSyncDataCutterActor; all arguments are forwarded to its
// constructor and ownership of ev is transferred to the new actor.
IActor* CreateLocalSyncDataCutter(
        const TIntrusivePtr<TVDiskConfig>& vconfig,
        const TIntrusivePtr<TVDiskContext>& vctx,
        const TActorId& skeletonId,
        const TActorId& parentId,
        std::unique_ptr<TEvLocalSyncData> ev)
{
    IActor* const cutter = new TLocalSyncDataCutterActor(vconfig, vctx, skeletonId, parentId, std::move(ev));
    return cutter;
}


} // NKikimr
Loading
Loading