Skip to content

Commit

Permalink
Rework FormatRead and SysLogRead, move their execution to PDisk to av…
Browse files Browse the repository at this point in the history
…oid race (ydb-platform#10009)
  • Loading branch information
va-kuznecov authored Oct 3, 2024
1 parent 2353272 commit 595af5f
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 86 deletions.
14 changes: 7 additions & 7 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
str << " Config: " << Cfg->ToString();
P_LOG(PRI_CRIT, BPD01, str.Str());
} else {
PDisk->InitiateReadSysLog(SelfId());
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TReadFormat>(SelfId()));
StateErrorReason =
"PDisk is in StateInit, wait for PDisk to read sys log. Did you ckeck EvYardInit result? Marker# BSY09";
Become(&TThis::StateInit);
Expand Down Expand Up @@ -557,8 +557,8 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
// PDisk GUID is OK and format is complete
*PDisk->Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialSysLogRead;
*PDisk->Mon.PDiskDetailedState = TPDiskMon::TPDisk::BootingSysLogRead;
PDisk->Format.InitMagic();
PDisk->ReadSysLog(SelfId());

PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TReadSysLog>(SelfId()));
}
}
}
Expand Down Expand Up @@ -1083,7 +1083,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
}

Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(PCtx->PDiskId, NKikimrProto::EReplyStatus::NOTREADY));

return;
}

Expand All @@ -1096,7 +1096,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
NPDisk::TMainKey newMainKey = ev->Get()->MainKey;

SecureWipeBuffer((ui8*)ev->Get()->MainKey.Keys.data(), sizeof(NPDisk::TKey) * ev->Get()->MainKey.Keys.size());

P_LOG(PRI_NOTICE, BSP01, "Going to restart PDisk since received TEvAskWardenRestartPDiskResult");

const TActorIdentity& thisActorId = SelfId();
Expand All @@ -1109,7 +1109,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
TIntrusivePtr<TPDiskConfig> actorCfg = std::move(Cfg);

auto& newCfg = ev->Get()->Config;

if (newCfg) {
Y_VERIFY_S(newCfg->PDiskId == pdiskId,
"New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << pdiskId);
Expand All @@ -1124,7 +1124,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
TGenericExecutorThread& executorThread = actorCtx.ExecutorThread;

PassAway();

CreatePDiskActor(executorThread, counters, actorCfg, newMainKey, pdiskId, poolId, nodeId);

Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(pdiskId));
Expand Down
71 changes: 0 additions & 71 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,69 +173,6 @@ void WaitForValue(TAtomic *counter, TDuration maxDuration, TAtomicBase expectedV
}
}

void RunTestMultipleRequestsFromCompletionAction() {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
const ui32 dataSize = 4 << 10;
const ui64 generations = 8;
TAtomic counter = 0;


TTempDir tempDir;
TString path = CreateFile(tempDir().c_str(), dataSize);

{
TActorSystemCreator creator;
THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(dataSize, 1, false, {}));
NPDisk::TBuffer::TPtr alignedBuffer(bufferPool->Pop());
memset(alignedBuffer->Data(), 0, dataSize);
THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDevice(path, *mon, 0, 0, 4,
NPDisk::TDeviceMode::LockFile, 2 << generations, nullptr));
device->Initialize(std::make_shared<NPDisk::TPDiskCtx>(creator.GetActorSystem()));

(new TWriter(*device, alignedBuffer.Get(), (i32)generations, &counter))->Exec(nullptr);

TAtomicBase expectedCounter = 0;
for (ui64 i = 0; i <= generations; ++i) {
expectedCounter += 1ull << i;
}
WaitForValue(&counter, TIMEOUT, expectedCounter);

TAtomicBase resultingCounter = AtomicGet(counter);

UNIT_ASSERT_VALUES_EQUAL(
resultingCounter,
expectedCounter
);
}
Ctest << "Done" << Endl;
}

void RunTestDestructionWithMultipleFlushesFromCompletionAction() {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
const ui32 dataSize = 4 << 10;
const i32 generations = 8;
TAtomic counter = 0;

TTempDir tempDir;
TString path = CreateFile(tempDir().c_str(), dataSize);

TActorSystemCreator creator;
THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDevice(path, *mon, 0, 0, 4,
NPDisk::TDeviceMode::LockFile, 2 << generations, nullptr));
device->Initialize(std::make_shared<NPDisk::TPDiskCtx>(creator.GetActorSystem()));

(new TFlusher(*device, generations, &counter))->Exec(nullptr);
device->Stop();
for (int i = 0; i < 10000; ++i) {
(new TFlusher(*device, generations, &counter))->Exec(nullptr);
}
device.Destroy();

Ctest << "Done" << Endl;
}

void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 diskSize, ui32 bufferSize, bool sequential = true) {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
Expand Down Expand Up @@ -264,14 +201,6 @@ void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 disk

Y_UNIT_TEST_SUITE(TBlockDeviceTest) {

// Unit-test entry point: delegates to RunTestMultipleRequestsFromCompletionAction().
Y_UNIT_TEST(TestMultipleRequestsFromCompletionAction) {
RunTestMultipleRequestsFromCompletionAction();
}

// Unit-test entry point: delegates to RunTestDestructionWithMultipleFlushesFromCompletionAction().
Y_UNIT_TEST(TestDestructionWithMultipleFlushesFromCompletionAction) {
RunTestDestructionWithMultipleFlushesFromCompletionAction();
}

Y_UNIT_TEST(TestDeviceWithSubmitGetThread) {
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,11 @@ void TPDisk::ProcessFastOperationsQueue() {
break;
case ERequestType::RequestContinueReadMetadata:
    static_cast<TContinueReadMetadata&>(*req).Execute(PCtx->ActorSystem);
    // Explicit break: without it this case falls through into
    // RequestReadFormat and calls InitiateReadFormat() for every
    // continue-read-metadata request.
    break;
case ERequestType::RequestReadFormat:
    // Start the initial format read on the PDisk side.
    InitiateReadFormat();
    break;
case ERequestType::RequestReadSysLog:
    // Start the system log read on the PDisk side.
    InitiateReadSysLog();
    break;
default:
Y_FAIL_S("Unexpected request type# " << (ui64)req->GetType());
Expand Down Expand Up @@ -3082,6 +3087,8 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
case ERequestType::RequestReadMetadata:
case ERequestType::RequestWriteMetadata:
case ERequestType::RequestContinueReadMetadata:
case ERequestType::RequestReadFormat:
case ERequestType::RequestReadSysLog:
break;
case ERequestType::RequestStopDevice:
BlockDevice->Stop();
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class TPDisk : public IPDisk {
bool IsFormatMagicValid(ui8 *magicData, ui32 magicDataSize, const TMainKey& mainKey); // Called by actor
bool CheckGuid(TString *outReason); // Called by actor
bool CheckFormatComplete(); // Called by actor
void ReadSysLog(const TActorId &pDiskActor); // Called by actor
void InitiateReadSysLog();
bool ProcessChunk0(const TEvReadLogResult &readLogResult, TString& errorReason);
void PrintChunksDebugInfo();
TRcBuf ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn, const TEvReadLogResult &readLogResult);
Expand Down Expand Up @@ -394,9 +394,8 @@ class TPDisk : public IPDisk {
// Internal interface

// Schedules EvReadLogResult event for the system log
void ResetInit();
bool Initialize(); // Called by actor
void InitiateReadSysLog(const TActorId &pDiskActor); // Called by actor
void InitiateReadFormat();
void ProcessReadLogResult(const TEvReadLogResult &evReadLogResult, const TActorId &pDiskActor);

NKikimrProto::EReplyStatus ValidateRequest(TLogWrite *logWrite, TStringStream& outErrorReason);
Expand Down Expand Up @@ -432,4 +431,3 @@ bool ParseSectorOffset(const TDiskFormat& format, TActorSystem *actorSystem, ui3

} // NPDisk
} // NKikimr

10 changes: 6 additions & 4 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,10 @@ void TPDisk::GetStartingPoints(NPDisk::TOwner owner, TMap<TLogSignature, NPDisk:
}
}

void TPDisk::ReadSysLog(const TActorId &pDiskActor) {
TIntrusivePtr<TSysLogReader> sysLogReader(new TSysLogReader(this, PCtx->ActorSystem, pDiskActor,
void TPDisk::InitiateReadSysLog() {
Format.InitMagic();

TIntrusivePtr<TSysLogReader> sysLogReader(new TSysLogReader(this, PCtx->ActorSystem, PCtx->PDiskActor,
TReqId(TReqId::ReadSysLog, 0)));
sysLogReader->Start();
return;
Expand Down Expand Up @@ -1310,15 +1312,15 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
}

// Starts an asynchronous read of the format sectors; a TEvReadFormatResult is passed to TCompletionEventSender for the PDisk actor
void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) {
void TPDisk::InitiateReadFormat() {
Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running");
Y_VERIFY_S(InitPhase == EInitPhase::Uninitialized, "expect InitPhase to be Uninitialized, but InitPhase# "
<< InitPhase);
ui32 formatSectorsSize = FormatSectorSize * ReplicationFactor;
THolder<TEvReadFormatResult> evReadFormatResult(new TEvReadFormatResult(formatSectorsSize, UseHugePages));
ui8 *formatSectors = evReadFormatResult->FormatSectors.Get();
BlockDevice->PreadAsync(formatSectors, formatSectorsSize, 0,
new TCompletionEventSender(this, pDiskActor, evReadFormatResult.Release()), TReqId(TReqId::InitialFormatRead, 0), {});
new TCompletionEventSender(this, PCtx->PDiskActor, evReadFormatResult.Release()), TReqId(TReqId::InitialFormatRead, 0), {});
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialFormatRead;
*Mon.PDiskDetailedState = TPDiskMon::TPDisk::BootingFormatRead;
InitPhase = EInitPhase::ReadingSysLog;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ enum class ERequestType {
RequestWriteMetadataResult,
RequestPushUnformattedMetadataSector,
RequestContinueReadMetadata,
RequestReadFormat,
RequestReadSysLog,
};

inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) {
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ class TRequestBase : public TThrRefBase {
static void AbortDelete(TRequestBase* request, TActorSystem* actorSystem);
};

// Request type tagged ERequestType::RequestReadFormat.
// It carries no payload of its own: only the sender actor id and a request
// index are forwarded to TRequestBase.
// NOTE(review): the two literal zeros passed to TRequestBase are not explained
// by the visible code -- confirm their meaning against TRequestBase's constructor.
class TReadFormat : public TRequestBase {
public:
    TReadFormat(TActorId pdiskActor, TAtomicBase reqIdx)
        : TRequestBase(
              pdiskActor,
              TReqId(TReqId::ReadFormatInfo, reqIdx),
              0,
              0,
              NPriInternal::Other)
    {
    }

    // Identifies this request when it is dispatched by type.
    ERequestType GetType() const override {
        return ERequestType::RequestReadFormat;
    }
};

// Request type tagged ERequestType::RequestReadSysLog.
// It carries no payload of its own: only the sender actor id and a request
// index are forwarded to TRequestBase.
// NOTE(review): the two literal zeros passed to TRequestBase are not explained
// by the visible code -- confirm their meaning against TRequestBase's constructor.
class TReadSysLog : public TRequestBase {
public:
    TReadSysLog(TActorId pdiskActor, TAtomicBase reqIdx)
        : TRequestBase(
              pdiskActor,
              TReqId(TReqId::ReadSysLog, reqIdx),
              0,
              0,
              NPriInternal::Other)
    {
    }

    // Identifies this request when it is dispatched by type.
    ERequestType GetType() const override {
        return ERequestType::RequestReadSysLog;
    }
};

//
// TYardInit
//
Expand Down

0 comments on commit 595af5f

Please sign in to comment.