Skip to content

Commit

Permalink
Add new UT in PDisk for TEvChunkWrite contract (#12298)
Browse files Browse the repository at this point in the history
  • Loading branch information
va-kuznecov authored Dec 6, 2024
1 parent f3b158a commit 120f8d0
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 60 deletions.
41 changes: 10 additions & 31 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -976,46 +976,20 @@ struct TEvChunkWrite : public TEventLocal<TEvChunkWrite, TEvBlobStorage::EvChunk
class TBufBackedUpParts : public IParts {
public:
TBufBackedUpParts(TTrackableBuffer &&buf)
: Buffers({std::move(buf)})
: Buffer(std::move(buf))
{}

virtual TDataRef operator[] (ui32 i) const override {
Y_DEBUG_ABORT_UNLESS(i < Buffers.size());
return TDataRef(Buffers[i].Data(), Buffers[i].Size());
}

virtual ui32 Size() const override {
return Buffers.size();
}

void AppendBuffer(TTrackableBuffer&& buffer) {
Buffers.push_back(std::move(buffer));
}

private:
TVector<TTrackableBuffer> Buffers;
};

///////////////////// TStrokaBackedUpParts //////////////////////////////
class TStrokaBackedUpParts : public IParts {
public:
TStrokaBackedUpParts(TString &buf)
: Buf()
{
Buf.swap(buf);
}

virtual TDataRef operator[] (ui32 i) const override {
Y_DEBUG_ABORT_UNLESS(i == 0);
return TDataRef(Buf.data(), (ui32)Buf.size());
return TDataRef(Buffer.Data(), Buffer.Size());
}

virtual ui32 Size() const override {
return 1;
}

private:
TString Buf;
TTrackableBuffer Buffer;
};

///////////////////// TAlignedParts //////////////////////////////
Expand All @@ -1024,6 +998,11 @@ struct TEvChunkWrite : public TEventLocal<TEvChunkWrite, TEvBlobStorage::EvChunk
size_t FullSize;

public:
TAlignedParts(TString&& data)
: Data(std::move(data))
, FullSize(Data.size())
{}

TAlignedParts(TString&& data, size_t fullSize)
: Data(std::move(data))
, FullSize(fullSize)
Expand All @@ -1046,7 +1025,7 @@ struct TEvChunkWrite : public TEventLocal<TEvChunkWrite, TEvBlobStorage::EvChunk
}
};

///////////////////// TAlignedParts //////////////////////////////
///////////////////// TRopeAlignedParts //////////////////////////////
class TRopeAlignedParts : public IParts {
TRope Data; // we shall keep the rope here to prevent it from being freed
TVector<TDataRef> Refs;
Expand Down Expand Up @@ -1567,7 +1546,7 @@ struct TEvWriteMetadataResult : TEventLocal<TEvWriteMetadataResult, TEvBlobStora
struct TPDiskCtx {
TActorSystem * const ActorSystem = nullptr;
const ui32 PDiskId = 0;
const TActorId PDiskActor;
const TActorId PDiskActor;
// TPDiskMon * const Mon = nullptr; TODO implement it

TPDiskCtx() = default;
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2864,6 +2864,13 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
delete request;
return false;
}
if (ev.Offset % GetChunkAppendBlockSize() != 0) {
err << Sprintf("Can't write chunkIdx# %" PRIu32 " with not aligned offset# %" PRIu32 " ownerId# %"
PRIu32, ev.ChunkIdx, ev.Offset, (ui32)ev.Owner);
SendChunkWriteError(ev, err.Str(), NKikimrProto::ERROR);
delete request;
return false;
}
if (ev.ChunkIdx > ChunkState.size()) {
err << Sprintf("Can't write chunk: chunkIdx# %" PRIu32
" is too large (total# %" PRIu32 ") ownerId# %" PRIu32,
Expand Down
124 changes: 112 additions & 12 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,9 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
UNIT_ASSERT(vdisk.Chunks[EChunkState::COMMITTED].size() == 1);
const ui32 reservedChunk = *vdisk.Chunks[EChunkState::COMMITTED].begin();

TString chunkWriteData = PrepareData(1024);
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(
new NPDisk::TEvChunkWrite(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
reservedChunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(chunkWriteData), nullptr, false, 0),
reservedChunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(PrepareData(1024)), nullptr, false, 0),
NKikimrProto::OK);

bool printed = false;
Expand Down Expand Up @@ -493,9 +492,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
ui32 errors = 0;
bool printed = false;
for (ui32 i = 0; i < 10'000; ++i) {
TString data = PrepareData(1024);
testCtx.Send(new NPDisk::TEvChunkWrite(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound,
reservedChunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data), nullptr, false, 0));
reservedChunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(PrepareData(1024)), nullptr, false, 0));

const auto res = testCtx.Recv<NPDisk::TEvChunkWriteResult>();
//Ctest << res->ToString() << Endl;
Expand Down Expand Up @@ -796,9 +794,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
UNIT_ASSERT_VALUES_EQUAL(evReadRes->Data.ToString(), data);
};

TString dataCopy = data;
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(dataCopy), nullptr, false, 0),
chunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)), nullptr, false, 0),
NKikimrProto::OK);
mock.CommitReservedChunks();

Expand Down Expand Up @@ -842,9 +839,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
mock.ReserveChunk();
const ui32 chunk = *mock.Chunks[EChunkState::RESERVED].begin();

TString dataCopy = data;
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(dataCopy), nullptr, false, 0),
chunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)), nullptr, false, 0),
NKikimrProto::OK);
mock.CommitReservedChunks();
testCtx.TestResponse<NPDisk::TEvCheckSpaceResult>(
Expand Down Expand Up @@ -954,8 +950,7 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
auto chunk1 = *vdisk1.Chunks[EChunkState::COMMITTED].begin();
auto chunk2 = *vdisk2.Chunks[EChunkState::COMMITTED].begin();

TString data(123, '0');
auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TStrokaBackedUpParts>(data);
auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(TString(123, '0'));

// write to own chunk is OK
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(
Expand Down Expand Up @@ -996,8 +991,7 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
for (ui32 i = 0; i < 100; ++i) {
testCtx.Send(new NPDisk::TEvLog(
vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound, 0, TRcBuf(PrepareData(logBuffSize)), vdisk.GetLsnSeg(), nullptr));
auto data = PrepareData(chunkBuffSize);
auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TStrokaBackedUpParts>(data);
auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(PrepareData(chunkBuffSize));
testCtx.Send(new NPDisk::TEvChunkWrite(
vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
chunk, 0, parts, nullptr, false, 0));
Expand Down Expand Up @@ -1028,6 +1022,112 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
Cerr << "all log writes are received" << Endl;
}
}

NPDisk::TEvChunkWrite::TPartsPtr GenParts(TReallyFastRng32& rng, size_t size) {
static int testCase = 0;
switch(testCase++) {
case 0: {
auto data = PrepareData(size);

auto counter = MakeIntrusive<::NMonitoring::TCounterForPtr>();
TMemoryConsumer consumer(counter);
TTrackableBuffer buffer(std::move(consumer), data.data(), data.size());
return MakeIntrusive<NPDisk::TEvChunkWrite::TBufBackedUpParts>(std::move(buffer));
}
case 1: {
size_t partsCount = rng.Uniform(1, 10);
TRope rope;
size_t createdBytes = 0;
if (size >= partsCount) {
for (size_t i = 0; i < partsCount - 1; ++i) {
TRope x(PrepareData(rng.Uniform(1, size / partsCount)));
createdBytes += x.size();
rope.Insert(rope.End(), std::move(x));
}
}
if (createdBytes < size) {
rope.Insert(rope.End(), TRope(PrepareData(size - createdBytes)));
}
return MakeIntrusive<NPDisk::TEvChunkWrite::TRopeAlignedParts>(std::move(rope), size);
}
case 2: {
testCase = 0;
return MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(PrepareData(size));
}
}
UNIT_ASSERT(false);
return nullptr;
}

TString ConvertIPartsToString(NPDisk::TEvChunkWrite::IParts* parts) {
auto data = TString::Uninitialized(parts->ByteSize());
char *ptr = data.Detach();
for (ui32 i = 0; i < parts->Size(); ++i) {
auto [buf, bufSize] = (*parts)[i];
memcpy(ptr, buf, bufSize);
ptr += bufSize;
}
return data;
}

Y_UNIT_TEST(ChunkWriteDifferentOffsetAndSize) {
TActorTestContext testCtx{{}};

TVDiskMock vdisk(&testCtx);
vdisk.InitFull();

vdisk.ReserveChunk();
vdisk.CommitReservedChunks();
UNIT_ASSERT(vdisk.Chunks[EChunkState::COMMITTED].size() == 1);
const ui32 reservedChunk = *vdisk.Chunks[EChunkState::COMMITTED].begin();

auto seed = TInstant::Now().MicroSeconds();
Cerr << "seed# " << seed << Endl;
TReallyFastRng32 rng(seed);

auto blockSize = vdisk.PDiskParams->AppendBlockSize;
size_t maxSize = 8 * blockSize;
for (ui32 offset = 0; offset <= vdisk.PDiskParams->ChunkSize - maxSize; offset += rng.Uniform(vdisk.PDiskParams->ChunkSize / 100)) {
offset = offset / blockSize * blockSize;
auto size = rng.Uniform(1, maxSize + 1); // + 1 for maxSize to be included in distribution
NPDisk::TEvChunkWrite::TPartsPtr parts = GenParts(rng, size);
Ctest << "offset# " << offset << " size# " << size << Endl;
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(
new NPDisk::TEvChunkWrite(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
reservedChunk, offset, parts, nullptr, false, 0),
NKikimrProto::OK);
auto res = testCtx.TestResponse<NPDisk::TEvChunkReadResult>(
new NPDisk::TEvChunkRead(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
reservedChunk, offset, size, 0, 0),
NKikimrProto::OK);
UNIT_ASSERT(ConvertIPartsToString(parts.Get()) == res->Data.ToString().Slice());
}
}

Y_UNIT_TEST(ChunkWriteBadOffset) {
TActorTestContext testCtx{{}};

TVDiskMock vdisk(&testCtx);
vdisk.InitFull();

vdisk.ReserveChunk();
vdisk.CommitReservedChunks();
UNIT_ASSERT(vdisk.Chunks[EChunkState::COMMITTED].size() == 1);
const ui32 reservedChunk = *vdisk.Chunks[EChunkState::COMMITTED].begin();

auto seed = TInstant::Now().MicroSeconds();
Cerr << "seed# " << seed << Endl;
TReallyFastRng32 rng(seed);

auto blockSize = vdisk.PDiskParams->AppendBlockSize;
for (ui32 offset = 1; offset < blockSize; offset += rng.Uniform(1, blockSize / 20)) {
NPDisk::TEvChunkWrite::TPartsPtr parts = GenParts(rng, 1);
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(
new NPDisk::TEvChunkWrite(vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
reservedChunk, offset, parts, nullptr, false, 0),
NKikimrProto::ERROR);
}
}
}

Y_UNIT_TEST_SUITE(PDiskCompatibilityInfo) {
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,18 @@ void TTestIncorrectRequests::TestFSM(const TActorContext &ctx) {
case 170:
TEST_RESPONSE(EvChunkWriteResult, ERROR);
VERBOSE_COUT(" Sending TEvChunkWrite that actually does the thing");
ctx.Send(Yard, new NPDisk::TEvChunkWrite(Owner, OwnerRound, ChunkIdx0, ChunkWriteData.size(),
ctx.Send(Yard, new NPDisk::TEvChunkWrite(Owner, OwnerRound, ChunkIdx0, 0,
new NPDisk::TEvChunkWrite::TNonOwningParts(ChunkWriteParts.Get(), 1), (void*)42, false, 1));
break;
case 180:
case 180: {
TEST_RESPONSE(EvChunkWriteResult, OK);
ChunkIdx = LastResponse.ChunkIdx;
size_t blockSize = LastResponse.AppendBlockSize;
VERBOSE_COUT(" Sending TEvChunkWrite");
ctx.Send(Yard, new NPDisk::TEvChunkWrite(Owner, OwnerRound, ChunkIdx, ChunkWriteData.size() / 2,
ctx.Send(Yard, new NPDisk::TEvChunkWrite(Owner, OwnerRound, ChunkIdx, ChunkWriteData.size() / 2 / blockSize * blockSize,
new NPDisk::TEvChunkWrite::TNonOwningParts(ChunkWriteParts.Get(), 1), (void*)42, false, 1));
break;
}
case 190:
TEST_RESPONSE(EvChunkWriteResult, OK);
VERBOSE_COUT(" Sending TEvInit for invalid id");
Expand Down Expand Up @@ -3411,7 +3413,7 @@ void TTestChunkDeletionWhileWritingIt::TestFSM(const TActorContext &ctx) {
ChunkWriteData = PrepareData(ChunkSize - 1);
ChunkWriteParts[0].Data = ChunkWriteData.data();
ChunkWriteParts[0].Size = (ui32)ChunkWriteData.size();
ctx.Send(Yard, new NPDisk::TEvChunkWrite(Owner, OwnerRound, ChunkIdx, 1,
ctx.Send(Yard, new NPDisk::TEvChunkWrite(Owner, OwnerRound, ChunkIdx, 0,
new NPDisk::TEvChunkWrite::TNonOwningParts(ChunkWriteParts.Get(), 1), (void*)42, false, 5));

VERBOSE_COUT(" Sending TEvLog to commit");
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ void TestChunkWriteReleaseRun() {
pDisk->ProcessLogWriteQueueAndCommits();

{
TString chunkWriteData = PrepareData(1024);
NPDisk::TEvChunkWrite ev(evInitRes->PDiskParams->Owner, evInitRes->PDiskParams->OwnerRound, reservedChunk,
0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(chunkWriteData), nullptr, false, 0);
0, new NPDisk::TEvChunkWrite::TAlignedParts(PrepareData(1024)), nullptr, false, 0);
NPDisk::TChunkWrite *chunkWrite = new NPDisk::TChunkWrite(ev, testCtx.Sender, {}, {});
bool ok = pDisk->PreprocessRequest(chunkWrite);
UNIT_ASSERT(!ok);
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
auto it = mock.Chunks[EChunkState::COMMITTED].begin();
for (ui32 i = 0; i < inflight; ++i) {
TString dataCopy = data;
testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
*it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(dataCopy), nullptr, false, 0));
*it, 0, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)), nullptr, false, 0));
}
NPDisk::TCommitRecord rec;
rec.DeleteChunks.push_back(*it);
Expand Down Expand Up @@ -112,9 +111,8 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {

auto sendManyWrites = [&](TVDiskMock& mock, TChunkIdx chunk, ui32 number, ui64& cookie) {
for (ui32 i = 0; i < number; ++i) {
TString dataCopy = data;
testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
chunk, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(dataCopy), (void*)(cookie++), false, 0));
chunk, 0, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)), (void*)(cookie++), false, 0));
}
};

Expand All @@ -128,9 +126,8 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
{
auto& chunkIds = mock.Chunks[EChunkState::COMMITTED];
for (auto it = chunkIds.begin(); it != chunkIds.end(); ++it) {
TString dataCopy = data;
testCtx.TestResponse<NPDisk::TEvChunkWriteResult>(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
*it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(dataCopy), (void*)10, false, 0),
*it, 0, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)), (void*)10, false, 0),
NKikimrProto::OK);
}
}
Expand Down Expand Up @@ -216,9 +213,8 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
while (mock.Chunks[EChunkState::COMMITTED].size() > 0) {
auto it = mock.Chunks[EChunkState::COMMITTED].begin();
for (ui32 i = 0; i < inflight; ++i) {
TString dataCopy = data;
testCtx.Send(new NPDisk::TEvChunkWrite(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound,
*it, 0, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(dataCopy), nullptr, false, 0));
*it, 0, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)), nullptr, false, 0));
}
NPDisk::TCommitRecord rec;
rec.DeleteChunks.push_back(*it);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/ut_pdiskfit/lib/basic_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ class TFakeVDisk
void *cookie = reinterpret_cast<void *>(NextWriteCookie++);

SendPDiskRequest(ctx, new NPDisk::TEvChunkWrite(PDiskParams->Owner, PDiskParams->OwnerRound, it->first,
offsetInBlocks * PDiskParams->AppendBlockSize, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(data),
offsetInBlocks * PDiskParams->AppendBlockSize, new NPDisk::TEvChunkWrite::TAlignedParts(TString(data)),
cookie, true, NPriWrite::HullHugeAsyncBlob), [&] {
State.WritesInFlight.push_back(TWriteRecord{it->first, offsetInBlocks, numBlocks, std::move(checksums),
cookie});
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/load_test/pdisk_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,8 @@ class TPDiskReaderLoadTestActor : public TActorBootstrapped<TPDiskReaderLoadTest
TChunkIdx chunkIdx = msg->ChunkIds[i];
Chunks[i].Idx = chunkIdx;
ui64 requestIdx = NewTRequestInfo((ui32)DataBuffer.size(), chunkIdx, TAppData::TimeProvider->Now());
TString tmp = DataBuffer;
SendRequest(ctx, std::make_unique<NPDisk::TEvChunkWrite>(PDiskParams->Owner, PDiskParams->OwnerRound,
chunkIdx, 0u, new NPDisk::TEvChunkWrite::TStrokaBackedUpParts(tmp),
chunkIdx, 0u, new NPDisk::TEvChunkWrite::TAlignedParts(TString(DataBuffer)),
reinterpret_cast<void*>(requestIdx), true, NPriWrite::HullHugeAsyncBlob, Sequential));
++ChunkWrite_RequestsSent;
}
Expand Down

0 comments on commit 120f8d0

Please sign in to comment.