Add verify for wrong StartOffset and EndOffset. Fix copy of blob. #12049

Merged (4 commits) on Nov 28, 2024
4 changes: 2 additions & 2 deletions ydb/core/persqueue/blob.cpp
@@ -172,7 +172,7 @@ TClientBlob TClientBlob::Deserialize(const char* data, ui32 size)
Y_ABORT_UNLESS(data < end, "size %u SeqNo %" PRIu64 " SourceId %s", size, seqNo, sourceId.c_str());
TString dt(data, end - data);

- return TClientBlob(sourceId, seqNo, dt, std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey);
+ return TClientBlob(sourceId, seqNo, std::move(dt), std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey);
}

void TBatch::SerializeTo(TString& res) const{
@@ -606,7 +606,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
auto it = partData.find(pos[i]);
if (it != partData.end())
pd = it->second;
- (*blobs)[pos[i]] = TClientBlob(sourceIds[currentSID], seqNo[i], dt[i], std::move(pd), wtime[pos[i]], ctime[pos[i]], uncompressedSize[pos[i]],
+ (*blobs)[pos[i]] = TClientBlob(sourceIds[currentSID], seqNo[i], std::move(dt[i]), std::move(pd), wtime[pos[i]], ctime[pos[i]], uncompressedSize[pos[i]],
partitionKey[i], explicitHash[i]);
if (i + 1 == end[currentSID])
++currentSID;
4 changes: 2 additions & 2 deletions ydb/core/persqueue/blob.h
@@ -47,11 +47,11 @@ struct TClientBlob {
, UncompressedSize(0)
{}

- TClientBlob(const TString& sourceId, const ui64 seqNo, const TString& data, TMaybe<TPartData> &&partData, TInstant writeTimestamp, TInstant createTimestamp,
+ TClientBlob(const TString& sourceId, const ui64 seqNo, const TString&& data, TMaybe<TPartData> &&partData, TInstant writeTimestamp, TInstant createTimestamp,
const ui64 uncompressedSize, const TString& partitionKey, const TString& explicitHashKey)
: SourceId(sourceId)
, SeqNo(seqNo)
- , Data(data)
+ , Data(std::move(data))
, PartData(std::move(partData))
, WriteTimestamp(writeTimestamp)
, CreateTimestamp(createTimestamp)
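Taken together, the blob.cpp and blob.h changes above implement the "fix copy of blob" part of the title: the payload string is now moved into TClientBlob instead of being copied at each construction site. Below is a minimal sketch of the same pattern, using std::string and a simplified Blob type as stand-ins rather than the actual TClientBlob/TString API.

#include <cstddef>
#include <string>
#include <utility>

struct Blob {
    std::string SourceId;
    std::string Data;

    // Accepting the payload as an rvalue and moving it into the member
    // avoids one copy whenever the caller no longer needs its buffer.
    Blob(std::string sourceId, std::string&& data)
        : SourceId(std::move(sourceId))
        , Data(std::move(data))
    {}
};

Blob Deserialize(const char* data, std::size_t size) {
    std::string payload(data, size);              // one allocation for the payload
    return Blob("source-1", std::move(payload));  // transferred, not copied, into the blob
}

The call sites changed in this diff (TClientBlob::Deserialize, TBatch::UnpackToType1, ExecRequest and the unit tests) all pass std::move for the same reason.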
67 changes: 42 additions & 25 deletions ydb/core/persqueue/partition_init.cpp
@@ -52,9 +52,7 @@ void TInitializer::Next(const TActorContext& ctx) {
}

void TInitializer::Done(const TActorContext& ctx) {
PQ_LOG_D("Initializing topic '" << Partition->TopicName()
<< "' partition " << Partition->Partition
<< ". Completed.");
PQ_LOG_D("Initializing completed.");
InProgress = false;
Partition->InitComplete(ctx);
}
@@ -74,12 +72,14 @@ void TInitializer::DoNext(const TActorContext& ctx) {
}
}

PQ_LOG_D("Initializing topic '" << Partition->TopicName()
<< "' partition " << Partition->Partition
<< ". Step " << CurrentStep->Get()->Name);
PQ_LOG_D("Start initializing step " << CurrentStep->Get()->Name);
CurrentStep->Get()->Execute(ctx);
}

TString TInitializer::LogPrefix() const {
return TStringBuilder() << "[" << Partition->TopicName() << ":" << Partition->Partition << ":Initializer] ";
}


//
// TInitializerStep
@@ -113,10 +113,19 @@ void TInitializerStep::PoisonPill(const TActorContext& ctx) {
ctx.Send(Partition()->Tablet, new TEvents::TEvPoisonPill());
}

- TString TInitializerStep::TopicName() const {
+ const TString& TInitializerStep::TopicName() const {
return Partition()->TopicName();
}

TInitializionContext& TInitializerStep::GetContext() {
return Initializer->Ctx;
}

TString TInitializerStep::LogPrefix() const {
return TStringBuilder() << "[" << Partition()->TopicName() << ":" << Partition()->Partition << ":" << Name << "] ";
}



//
// TBaseKVStep
@@ -182,7 +191,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
break;

case NKikimrProto::ERROR:
PQ_LOG_ERROR("Partition " << Partition()->Partition << " can't read config");
PQ_LOG_ERROR("can't read config");
PoisonPill(ctx);
return;

@@ -294,7 +303,7 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
Y_ABORT();
} else {
auto& ctx = mbCtx.GetRef();
PQ_LOG_ERROR("read topic '" << TopicName() << "' partition " << PartitionId() << " error");
PQ_LOG_ERROR("read topic error");
PoisonPill(ctx);
}
break;
@@ -314,6 +323,13 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
if (Partition()->StartOffset == Partition()->EndOffset) {
Partition()->NewHead.Offset = Partition()->Head.Offset = Partition()->EndOffset;
}
if (meta.HasStartOffset()) {
GetContext().StartOffset = meta.GetStartOffset();
}
if (meta.HasEndOffset()) {
GetContext().EndOffset = meta.GetEndOffset();
}

Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetEndWriteTimestamp());
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
@@ -384,8 +400,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
const auto& pair = range.GetPair(i);
Y_ABORT_UNLESS(pair.HasStatus());
if (pair.GetStatus() != NKikimrProto::OK) {
PQ_LOG_ERROR("read range error topic '" << TopicName() << "' partition " << PartitionId()
<< " got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? pair.GetKey() : "unknown")
PQ_LOG_ERROR("read range error got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? pair.GetKey() : "unknown")
);

PoisonPill(ctx);
@@ -420,7 +435,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
Done(ctx);
break;
case NKikimrProto::ERROR:
PQ_LOG_ERROR("read topic '" << TopicName() << "' partition " << PartitionId() << " error");
PQ_LOG_ERROR("read topic error");
PoisonPill(ctx);
break;
default:
@@ -467,6 +482,15 @@ void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor
}
FormHeadAndProceed();

if (GetContext().StartOffset && *GetContext().StartOffset != Partition()->StartOffset) {
PQ_LOG_ERROR("StartOffset from meta and blobs are different: " << *GetContext().StartOffset << " != " << Partition()->StartOffset);
return PoisonPill(ctx);
}
if (GetContext().EndOffset && *GetContext().EndOffset != Partition()->EndOffset) {
PQ_LOG_ERROR("EndOffset from meta and blobs are different: " << *GetContext().EndOffset << " != " << Partition()->EndOffset);
return PoisonPill(ctx);
}

Done(ctx);
break;
case NKikimrProto::NODATA:
@@ -509,8 +533,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
if (!k.IsHead()) //head.Size will be filled after read or head blobs
bodySize += pair.GetValueSize();

PQ_LOG_D("Got data topic " << TopicName() << " partition " << k.GetPartition()
<< " offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize()
PQ_LOG_D("Got data offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize()
<< " so " << startOffset << " eo " << endOffset << " " << pair.GetKey()
);
dataKeysBody.push_back({k, pair.GetValueSize(),
@@ -612,8 +635,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
Y_ABORT_UNLESS(dataKeysHead[currentLevel].KeysCount() < AppData(ctx)->PQConfig.GetMaxBlobsPerLevel());
Y_ABORT_UNLESS(!dataKeysHead[currentLevel].NeedCompaction());

PQ_LOG_D("read res partition topic '" << TopicName()
<< "' parititon " << key.GetPartition() << " offset " << offset << " endOffset " << Partition()->EndOffset
PQ_LOG_D("read res partition offset " << offset << " endOffset " << Partition()->EndOffset
<< " key " << key.GetOffset() << "," << key.GetCount() << " valuesize " << read.GetValue().size()
<< " expected " << size
);
@@ -636,9 +658,8 @@
Y_ABORT("NODATA can't be here");
return;
case NKikimrProto::ERROR:
PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName()
<< "' partition " << PartitionId()
<< " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage()
PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit ReadResult "
<< i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage()
<< " \" errorReason: \"" << response.GetErrorReason() << "\""
);
PoisonPill(ctx);
@@ -664,9 +685,7 @@ TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer

void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->HeadKeys.empty() && Partition()->DataKeysBody.empty())) {
PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
<< "' partition " << Partition()->Partition
<< " skiped because already initialized.");
PQ_LOG_I("Initializing EndWriteTimestamp skipped because already initialized.");
return Done(ctx);
}

@@ -682,9 +701,7 @@ void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
}

PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
<< "' partition " << Partition()->Partition
<< " from keys completed. Value " << Partition()->EndWriteTimestamp);
PQ_LOG_I("Initializing EndWriteTimestamp from keys completed. Value " << Partition()->EndWriteTimestamp);

return Done(ctx);
}
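The partition_init.cpp changes above carry the "verify wrong StartOffset and EndOffset" part of the title: TInitMetaStep records the offsets stored in the partition meta, and TInitDataRangeStep later cross-checks them against the offsets recovered from the blob keys. A condensed sketch of that consistency check, in plain C++ with illustrative names rather than the actual step classes:

#include <cstdint>
#include <optional>
#include <string>

struct MetaOffsets {                   // what the meta-reading step extracts
    std::optional<uint64_t> StartOffset;
    std::optional<uint64_t> EndOffset;
};

struct BlobDerivedState {              // what the data-range step derives from blob keys
    uint64_t StartOffset = 0;
    uint64_t EndOffset = 0;
};

// Returns an error description when meta and blob-derived offsets disagree,
// std::nullopt when initialization may proceed.
std::optional<std::string> CheckOffsets(const MetaOffsets& meta, const BlobDerivedState& keys) {
    if (meta.StartOffset && *meta.StartOffset != keys.StartOffset) {
        return "StartOffset from meta and blobs are different: " +
               std::to_string(*meta.StartOffset) + " != " + std::to_string(keys.StartOffset);
    }
    if (meta.EndOffset && *meta.EndOffset != keys.EndOffset) {
        return "EndOffset from meta and blobs are different: " +
               std::to_string(*meta.EndOffset) + " != " + std::to_string(keys.EndOffset);
    }
    return std::nullopt;
}

In the real step a mismatch is fatal: the error is logged and a TEvPoisonPill is sent to the tablet, so the partition restarts instead of serving inconsistent offsets.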
14 changes: 12 additions & 2 deletions ydb/core/persqueue/partition_init.h
@@ -18,6 +18,11 @@ namespace NKikimr::NPQ {
class TInitializerStep;
class TPartition;

struct TInitializionContext {
std::optional<ui64> StartOffset;
std::optional<ui64> EndOffset;
};


/**
* This class executes independent steps of partition actor initialization.
@@ -40,13 +45,15 @@ class TInitializer {
private:
void DoNext(const TActorContext& ctx);

TString LogPrefix() const;

TPartition* Partition;

bool InProgress;

TVector<THolder<TInitializerStep>> Steps;
std::vector<THolder<TInitializerStep>>::iterator CurrentStep;

TInitializionContext Ctx;
};

/**
@@ -63,7 +70,8 @@ class TInitializerStep {

TPartition* Partition() const;
const TPartitionId& PartitionId() const;
- TString TopicName() const;
+ const TString& TopicName() const;
TInitializionContext& GetContext();

const TString Name;
const bool SkipNewPartition;
@@ -72,6 +80,8 @@ class TInitializerStep {
void Done(const TActorContext& ctx);
void PoisonPill(const TActorContext& ctx);

TString LogPrefix() const;

private:
TInitializer* Initializer;
};
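The header adds a TInitializionContext owned by TInitializer and exposed to every step through GetContext(), so a value read by one step (the meta offsets) can be checked by a later one. A simplified sketch of that owner-plus-steps arrangement, with illustrative names rather than the YDB classes:

#include <cstdint>
#include <memory>
#include <optional>
#include <vector>

// Scratch state shared by all initialization steps.
struct InitContext {
    std::optional<uint64_t> StartOffset;
    std::optional<uint64_t> EndOffset;
};

class Initializer;

class Step {
public:
    explicit Step(Initializer* owner) : Owner(owner) {}
    virtual ~Step() = default;
    virtual void Execute() = 0;
protected:
    InitContext& GetContext();       // defined after Initializer is complete
private:
    Initializer* Owner;
};

class Initializer {
    friend class Step;
public:
    void Run() {
        for (auto& step : Steps) {
            step->Execute();         // steps read and write the shared context in order
        }
    }
    std::vector<std::unique_ptr<Step>> Steps;
private:
    InitContext Ctx;
};

InitContext& Step::GetContext() { return Owner->Ctx; }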
3 changes: 1 addition & 2 deletions ydb/core/persqueue/partition_read.cpp
@@ -187,8 +187,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
Y_ABORT_UNLESS(record.HasSender());

auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();

- auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx);
+ auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero(), ctx);

TActorId sender = ActorIdFromProto(record.GetSender());
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
8 changes: 6 additions & 2 deletions ydb/core/persqueue/partition_write.cpp
@@ -1256,13 +1256,18 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
++WriteNewMessagesInternal;
}

// An empty partition may be filled by the mirror actor starting from an offset greater than zero, if the source partition is old and its head was already removed by retention time
if (!Head.GetCount() && !NewHead.GetCount() && DataKeysBody.empty() && HeadKeys.empty() && p.Offset) {
StartOffset = *p.Offset;
}

TMaybe<TPartData> partData;
if (p.Msg.TotalParts > 1) { //this is multi-part message
partData = TPartData(p.Msg.PartNo, p.Msg.TotalParts, p.Msg.TotalSize);
}
WriteTimestamp = ctx.Now();
WriteTimestampEstimate = p.Msg.WriteTimestamp > 0 ? TInstant::MilliSeconds(p.Msg.WriteTimestamp) : WriteTimestamp;
- TClientBlob blob(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.Data, std::move(partData), WriteTimestampEstimate,
+ TClientBlob blob(p.Msg.SourceId, p.Msg.SeqNo, std::move(p.Msg.Data), std::move(partData), WriteTimestampEstimate,
TInstant::MilliSeconds(p.Msg.CreateTimestamp == 0 ? curOffset : p.Msg.CreateTimestamp),
p.Msg.UncompressedSize, p.Msg.PartitionKey, p.Msg.ExplicitHashKey); //remove curOffset when LB will report CTime

@@ -1338,7 +1343,6 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);
}

- TString().swap(p.Msg.Data);
return true;
}

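In ExecRequest the message payload is now moved into the blob, which is why the explicit TString().swap(p.Msg.Data) further down is removed. A small illustration of why that clearing step becomes redundant, shown with std::string (TString has its own implementation, so details differ, but the moved-from object is likewise left in a valid state):

#include <cassert>
#include <string>
#include <utility>

int main() {
    std::string msgData(1024, 'x');
    std::string blobData = std::move(msgData);   // buffer transferred, no copy

    assert(blobData.size() == 1024);
    // msgData is now valid but unspecified (in practice empty), so a separate
    // swap-with-a-temporary to release its buffer is unnecessary; it can still
    // be assigned to, cleared, or destroyed safely.
    msgData.clear();
    return 0;
}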
14 changes: 8 additions & 6 deletions ydb/core/persqueue/ut/internals_ut.cpp
@@ -38,11 +38,11 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead)

THead head;
head.Offset = 100;
- TString value(100_KB, 'a');
head.AddBatch(TBatch(head.Offset, 0));
for (ui32 i = 0; i < 50; ++i) {
+ TString value(100_KB, 'a');
head.AddBlob(TClientBlob(
"sourceId" + TString(1,'a' + rand() % 26), i + 1, value, TMaybe<TPartData>(),
"sourceId" + TString(1,'a' + rand() % 26), i + 1, std::move(value), TMaybe<TPartData>(),
TInstant::MilliSeconds(i + 1), TInstant::MilliSeconds(i + 1), 1, "", ""
));
if (!headCompacted)
@@ -64,8 +64,9 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead)
newHead.Offset = head.GetNextOffset();
newHead.AddBatch(TBatch(newHead.Offset, 0));
for (ui32 i = 0; i < 10; ++i) {
+ TString value(100_KB, 'a');
newHead.AddBlob(TClientBlob(
"sourceId2", i + 1, value, TMaybe<TPartData>(),
"sourceId2", i + 1, std::move(value), TMaybe<TPartData>(),
TInstant::MilliSeconds(i + 1000), TInstant::MilliSeconds(i + 1000), 1, "", ""
));
all.push_back(newHead.GetLastBatch().Blobs.back()); //newHead always glued
@@ -82,8 +83,9 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead)
UNIT_ASSERT(!blob.IsComplete());
UNIT_ASSERT(blob.IsNextPart("sourceId3", 1, i, &error));
TMaybe<TPartData> partData = TPartData(i, parts, value2.size());
+ TString v = value2;
TClientBlob clientBlob(
"soruceId3", 1, value2, std::move(partData),
"soruceId3", 1, std::move(v), std::move(partData),
TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 1, "", ""
);
all.push_back(clientBlob);
@@ -173,11 +175,11 @@ Y_UNIT_TEST(TestPartitionedBigTest) {
}

Y_UNIT_TEST(TestBatchPacking) {
- TString value(10, 'a');
TBatch batch;
for (ui32 i = 0; i < 100; ++i) {
+ TString value(10, 'a');
batch.AddBlob(TClientBlob(
"sourceId1", i + 1, value, TMaybe<TPartData>(),
"sourceId1", i + 1, std::move(value), TMaybe<TPartData>(),
TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 0, "", ""
));
}
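The unit-test edits follow from the same move semantics: a buffer that has been moved into a blob cannot be reused on the next loop iteration, so the tests now construct the string inside the loop (or take a copy first, as with value2). A generic illustration of the pitfall, again with std::string rather than the real types:

#include <string>
#include <utility>
#include <vector>

int main() {
    std::vector<std::string> blobs;

    // Hazard: if 'value' were created once outside the loop and moved from on
    // every iteration, all iterations after the first would push a moved-from
    // (most likely empty) string.
    for (int i = 0; i < 100; ++i) {
        std::string value(10, 'a');          // construct (or copy) per iteration
        blobs.push_back(std::move(value));   // safe: each iteration owns a fresh buffer
    }
    return 0;
}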
10 changes: 6 additions & 4 deletions ydb/core/persqueue/ut/partition_ut.cpp
@@ -239,7 +239,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
void WaitDiskStatusRequest();
void SendDiskStatusResponse(TMaybe<ui64>* cookie = nullptr);
void WaitMetaReadRequest();
- void SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txId, TInstant endWriteTimestamp);
+ void SendMetaReadResponse(ui64 begin, ui64 end, TMaybe<ui64> step, TMaybe<ui64> txId, TInstant endWriteTimestamp);
void WaitBlobReadRequest();
void SendBlobReadResponse(ui64 begin, ui64 end);
void WaitInfoRangeRequest();
@@ -416,7 +416,7 @@ TPartition* TPartitionFixture::CreatePartition(const TCreatePartitionParams& par
SendDiskStatusResponse();

WaitMetaReadRequest();
- SendMetaReadResponse(params.PlanStep, params.TxId, params.EndWriteTimestamp);
+ SendMetaReadResponse(params.Begin, params.End, params.PlanStep, params.TxId, params.EndWriteTimestamp);

WaitInfoRangeRequest();
SendInfoRangeResponse(params.Partition.InternalPartitionId, params.Config.Consumers);
@@ -777,7 +777,7 @@ void TPartitionFixture::WaitMetaReadRequest()
UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 2);
}

- void TPartitionFixture::SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txId, TInstant endWriteTimestamp)
+ void TPartitionFixture::SendMetaReadResponse(ui64 begin, ui64 end, TMaybe<ui64> step, TMaybe<ui64> txId, TInstant endWriteTimestamp)
{
auto event = MakeHolder<TEvKeyValue::TEvResponse>();
event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
@@ -790,6 +790,8 @@ void TPartitionFixture::SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txI
read->SetStatus(NKikimrProto::OK);

NKikimrPQ::TPartitionMeta meta;
meta.SetStartOffset(begin);
meta.SetEndOffset(end);
meta.SetEndWriteTimestamp(endWriteTimestamp.MilliSeconds());

TString out;
@@ -840,7 +842,7 @@ TBatch CreateBatch(size_t count) {
Cerr << ">>>> ADD BLOB " << i << " writeTimestamp=" << (TInstant::Now() - TDuration::MilliSeconds(10)) << Endl << Flush;

TString data = TStringBuilder() << "message-data-" << i;
TClientBlob blob("source-id-1", 13 + i /* seqNo */, data, {} /* partData */, TInstant::Now() - TDuration::MilliSeconds(10) /* writeTimestamp */,
TClientBlob blob("source-id-1", 13 + i /* seqNo */, std::move(data), {} /* partData */, TInstant::Now() - TDuration::MilliSeconds(10) /* writeTimestamp */,
TInstant::Now() - TDuration::MilliSeconds(50) /* createTimestamp */, data.size(), "partitionKey", "explicitHashKey");
batch.AddBlob(blob);
}