diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index fa0eae4b3458..e1716161151f 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -172,7 +172,7 @@ TClientBlob TClientBlob::Deserialize(const char* data, ui32 size) Y_ABORT_UNLESS(data < end, "size %u SeqNo %" PRIu64 " SourceId %s", size, seqNo, sourceId.c_str()); TString dt(data, end - data); - return TClientBlob(sourceId, seqNo, dt, std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey); + return TClientBlob(sourceId, seqNo, std::move(dt), std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey); } void TBatch::SerializeTo(TString& res) const{ @@ -606,7 +606,7 @@ void TBatch::UnpackToType1(TVector *blobs) const { auto it = partData.find(pos[i]); if (it != partData.end()) pd = it->second; - (*blobs)[pos[i]] = TClientBlob(sourceIds[currentSID], seqNo[i], dt[i], std::move(pd), wtime[pos[i]], ctime[pos[i]], uncompressedSize[pos[i]], + (*blobs)[pos[i]] = TClientBlob(sourceIds[currentSID], seqNo[i], std::move(dt[i]), std::move(pd), wtime[pos[i]], ctime[pos[i]], uncompressedSize[pos[i]], partitionKey[i], explicitHash[i]); if (i + 1 == end[currentSID]) ++currentSID; diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h index 1869ecbc8d58..b42b39d016f9 100644 --- a/ydb/core/persqueue/blob.h +++ b/ydb/core/persqueue/blob.h @@ -47,11 +47,11 @@ struct TClientBlob { , UncompressedSize(0) {} - TClientBlob(const TString& sourceId, const ui64 seqNo, const TString& data, TMaybe &&partData, TInstant writeTimestamp, TInstant createTimestamp, + TClientBlob(const TString& sourceId, const ui64 seqNo, const TString&& data, TMaybe &&partData, TInstant writeTimestamp, TInstant createTimestamp, const ui64 uncompressedSize, const TString& partitionKey, const TString& explicitHashKey) : SourceId(sourceId) , SeqNo(seqNo) - , Data(data) + , Data(std::move(data)) , PartData(std::move(partData)) , WriteTimestamp(writeTimestamp) , CreateTimestamp(createTimestamp) diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 75b7fb63b0f4..be59c045c77f 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -52,9 +52,7 @@ void TInitializer::Next(const TActorContext& ctx) { } void TInitializer::Done(const TActorContext& ctx) { - PQ_LOG_D("Initializing topic '" << Partition->TopicName() - << "' partition " << Partition->Partition - << ". Completed."); + PQ_LOG_D("Initializing completed."); InProgress = false; Partition->InitComplete(ctx); } @@ -74,12 +72,14 @@ void TInitializer::DoNext(const TActorContext& ctx) { } } - PQ_LOG_D("Initializing topic '" << Partition->TopicName() - << "' partition " << Partition->Partition - << ". Step " << CurrentStep->Get()->Name); + PQ_LOG_D("Start initializing step " << CurrentStep->Get()->Name); CurrentStep->Get()->Execute(ctx); } +TString TInitializer::LogPrefix() const { + return TStringBuilder() << "[" << Partition->TopicName() << ":" << Partition->Partition << ":Initializer] "; +} + // // TInitializerStep @@ -113,10 +113,19 @@ void TInitializerStep::PoisonPill(const TActorContext& ctx) { ctx.Send(Partition()->Tablet, new TEvents::TEvPoisonPill()); } -TString TInitializerStep::TopicName() const { +const TString& TInitializerStep::TopicName() const { return Partition()->TopicName(); } +TInitializionContext& TInitializerStep::GetContext() { + return Initializer->Ctx; +} + +TString TInitializerStep::LogPrefix() const { + return TStringBuilder() << "[" << Partition()->TopicName() << ":" << Partition()->Partition << ":" << Name << "] "; +} + + // // TBaseKVStep @@ -182,7 +191,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon break; case NKikimrProto::ERROR: - PQ_LOG_ERROR("Partition " << Partition()->Partition << " can't read config"); + PQ_LOG_ERROR("can't read config"); PoisonPill(ctx); return; @@ -294,7 +303,7 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T Y_ABORT(); } else { auto& ctx = mbCtx.GetRef(); - PQ_LOG_ERROR("read topic '" << TopicName() << "' partition " << PartitionId() << " error"); + PQ_LOG_ERROR("read topic error"); PoisonPill(ctx); } break; @@ -314,6 +323,13 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T if (Partition()->StartOffset == Partition()->EndOffset) { Partition()->NewHead.Offset = Partition()->Head.Offset = Partition()->EndOffset; } + if (meta.HasStartOffset()) { + GetContext().StartOffset = meta.GetStartOffset(); + } + if (meta.HasEndOffset()) { + GetContext().EndOffset = meta.GetEndOffset(); + } + Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace(); Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetEndWriteTimestamp()); Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp; @@ -384,8 +400,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor const auto& pair = range.GetPair(i); Y_ABORT_UNLESS(pair.HasStatus()); if (pair.GetStatus() != NKikimrProto::OK) { - PQ_LOG_ERROR("read range error topic '" << TopicName() << "' partition " << PartitionId() - << " got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? pair.GetKey() : "unknown") + PQ_LOG_ERROR("read range error got status " << pair.GetStatus() << " for key " << (pair.HasKey() ? pair.GetKey() : "unknown") ); PoisonPill(ctx); @@ -420,7 +435,7 @@ void TInitInfoRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor Done(ctx); break; case NKikimrProto::ERROR: - PQ_LOG_ERROR("read topic '" << TopicName() << "' partition " << PartitionId() << " error"); + PQ_LOG_ERROR("read topic error"); PoisonPill(ctx); break; default: @@ -467,6 +482,15 @@ void TInitDataRangeStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActor } FormHeadAndProceed(); + if (GetContext().StartOffset && *GetContext().StartOffset != Partition()->StartOffset) { + PQ_LOG_ERROR("StartOffset from meta and blobs are different: " << *GetContext().StartOffset << " != " << Partition()->StartOffset); + return PoisonPill(ctx); + } + if (GetContext().EndOffset && *GetContext().EndOffset != Partition()->EndOffset) { + PQ_LOG_ERROR("EndOffset from meta and blobs are different: " << *GetContext().EndOffset << " != " << Partition()->EndOffset); + return PoisonPill(ctx); + } + Done(ctx); break; case NKikimrProto::NODATA: @@ -509,8 +533,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons if (!k.IsHead()) //head.Size will be filled after read or head blobs bodySize += pair.GetValueSize(); - PQ_LOG_D("Got data topic " << TopicName() << " partition " << k.GetPartition() - << " offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize() + PQ_LOG_D("Got data offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize() << " so " << startOffset << " eo " << endOffset << " " << pair.GetKey() ); dataKeysBody.push_back({k, pair.GetValueSize(), @@ -612,8 +635,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte Y_ABORT_UNLESS(dataKeysHead[currentLevel].KeysCount() < AppData(ctx)->PQConfig.GetMaxBlobsPerLevel()); Y_ABORT_UNLESS(!dataKeysHead[currentLevel].NeedCompaction()); - PQ_LOG_D("read res partition topic '" << TopicName() - << "' parititon " << key.GetPartition() << " offset " << offset << " endOffset " << Partition()->EndOffset + PQ_LOG_D("read res partition offset " << offset << " endOffset " << Partition()->EndOffset << " key " << key.GetOffset() << "," << key.GetCount() << " valuesize " << read.GetValue().size() << " expected " << size ); @@ -636,9 +658,8 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte Y_ABORT("NODATA can't be here"); return; case NKikimrProto::ERROR: - PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit topic '" << TopicName() - << "' partition " << PartitionId() - << " ReadResult " << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage() + PQ_LOG_ERROR("tablet " << Partition()->TabletID << " HandleOnInit ReadResult " + << i << " status NKikimrProto::ERROR result message: \"" << read.GetMessage() << " \" errorReason: \"" << response.GetErrorReason() << "\"" ); PoisonPill(ctx); @@ -664,9 +685,7 @@ TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) { if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->HeadKeys.empty() && Partition()->DataKeysBody.empty())) { - PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName() - << "' partition " << Partition()->Partition - << " skiped because already initialized."); + PQ_LOG_I("Initializing EndWriteTimestamp skipped because already initialized."); return Done(ctx); } @@ -682,9 +701,7 @@ void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) { Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp; } - PQ_LOG_I("Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName() - << "' partition " << Partition()->Partition - << " from keys completed. Value " << Partition()->EndWriteTimestamp); + PQ_LOG_I("Initializing EndWriteTimestamp from keys completed. Value " << Partition()->EndWriteTimestamp); return Done(ctx); } diff --git a/ydb/core/persqueue/partition_init.h b/ydb/core/persqueue/partition_init.h index 0da5a1a505cc..625f1ac3a1c4 100644 --- a/ydb/core/persqueue/partition_init.h +++ b/ydb/core/persqueue/partition_init.h @@ -18,6 +18,11 @@ namespace NKikimr::NPQ { class TInitializerStep; class TPartition; +struct TInitializionContext { + std::optional StartOffset; + std::optional EndOffset; +}; + /** * This class execute independent steps of parttition actor initialization. @@ -40,13 +45,15 @@ class TInitializer { private: void DoNext(const TActorContext& ctx); + TString LogPrefix() const; + TPartition* Partition; bool InProgress; TVector> Steps; std::vector>::iterator CurrentStep; - + TInitializionContext Ctx; }; /** @@ -63,7 +70,8 @@ class TInitializerStep { TPartition* Partition() const; const TPartitionId& PartitionId() const; - TString TopicName() const; + const TString& TopicName() const; + TInitializionContext& GetContext(); const TString Name; const bool SkipNewPartition; @@ -72,6 +80,8 @@ class TInitializerStep { void Done(const TActorContext& ctx); void PoisonPill(const TActorContext& ctx); + TString LogPrefix() const; + private: TInitializer* Initializer; }; diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index b850b2614a86..fe36e9acd801 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -187,8 +187,7 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont Y_ABORT_UNLESS(record.HasSender()); auto cookie = record.HasCookie() ? TMaybe(record.GetCookie()) : TMaybe(); - - auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx); + auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero(), ctx); TActorId sender = ActorIdFromProto(record.GetSender()); if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index ac9060fe0c5d..5269e444b048 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1256,13 +1256,18 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey ++WriteNewMessagesInternal; } + // Empty partition may will be filling from offset great than zero from mirror actor if source partition old and was clean by retantion time + if (!Head.GetCount() && !NewHead.GetCount() && DataKeysBody.empty() && HeadKeys.empty() && p.Offset) { + StartOffset = *p.Offset; + } + TMaybe partData; if (p.Msg.TotalParts > 1) { //this is multi-part message partData = TPartData(p.Msg.PartNo, p.Msg.TotalParts, p.Msg.TotalSize); } WriteTimestamp = ctx.Now(); WriteTimestampEstimate = p.Msg.WriteTimestamp > 0 ? TInstant::MilliSeconds(p.Msg.WriteTimestamp) : WriteTimestamp; - TClientBlob blob(p.Msg.SourceId, p.Msg.SeqNo, p.Msg.Data, std::move(partData), WriteTimestampEstimate, + TClientBlob blob(p.Msg.SourceId, p.Msg.SeqNo, std::move(p.Msg.Data), std::move(partData), WriteTimestampEstimate, TInstant::MilliSeconds(p.Msg.CreateTimestamp == 0 ? curOffset : p.Msg.CreateTimestamp), p.Msg.UncompressedSize, p.Msg.PartitionKey, p.Msg.ExplicitHashKey); //remove curOffset when LB will report CTime @@ -1338,7 +1343,6 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize); } - TString().swap(p.Msg.Data); return true; } diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index fd26f8fa6252..0a2477bc3aa0 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -38,11 +38,11 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) THead head; head.Offset = 100; - TString value(100_KB, 'a'); head.AddBatch(TBatch(head.Offset, 0)); for (ui32 i = 0; i < 50; ++i) { + TString value(100_KB, 'a'); head.AddBlob(TClientBlob( - "sourceId" + TString(1,'a' + rand() % 26), i + 1, value, TMaybe(), + "sourceId" + TString(1,'a' + rand() % 26), i + 1, std::move(value), TMaybe(), TInstant::MilliSeconds(i + 1), TInstant::MilliSeconds(i + 1), 1, "", "" )); if (!headCompacted) @@ -64,8 +64,9 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) newHead.Offset = head.GetNextOffset(); newHead.AddBatch(TBatch(newHead.Offset, 0)); for (ui32 i = 0; i < 10; ++i) { + TString value(100_KB, 'a'); newHead.AddBlob(TClientBlob( - "sourceId2", i + 1, value, TMaybe(), + "sourceId2", i + 1, std::move(value), TMaybe(), TInstant::MilliSeconds(i + 1000), TInstant::MilliSeconds(i + 1000), 1, "", "" )); all.push_back(newHead.GetLastBatch().Blobs.back()); //newHead always glued @@ -82,8 +83,9 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) UNIT_ASSERT(!blob.IsComplete()); UNIT_ASSERT(blob.IsNextPart("sourceId3", 1, i, &error)); TMaybe partData = TPartData(i, parts, value2.size()); + TString v = value2; TClientBlob clientBlob( - "soruceId3", 1, value2, std::move(partData), + "soruceId3", 1, std::move(v), std::move(partData), TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 1, "", "" ); all.push_back(clientBlob); @@ -173,11 +175,11 @@ Y_UNIT_TEST(TestPartitionedBigTest) { } Y_UNIT_TEST(TestBatchPacking) { - TString value(10, 'a'); TBatch batch; for (ui32 i = 0; i < 100; ++i) { + TString value(10, 'a'); batch.AddBlob(TClientBlob( - "sourceId1", i + 1, value, TMaybe(), + "sourceId1", i + 1, std::move(value), TMaybe(), TInstant::MilliSeconds(1), TInstant::MilliSeconds(1), 0, "", "" )); } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index d0af6d2ce201..ea678be2529a 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -239,7 +239,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture { void WaitDiskStatusRequest(); void SendDiskStatusResponse(TMaybe* cookie = nullptr); void WaitMetaReadRequest(); - void SendMetaReadResponse(TMaybe step, TMaybe txId, TInstant endWriteTimestamp); + void SendMetaReadResponse(ui64 begin, ui64 end, TMaybe step, TMaybe txId, TInstant endWriteTimestamp); void WaitBlobReadRequest(); void SendBlobReadResponse(ui64 begin, ui64 end); void WaitInfoRangeRequest(); @@ -416,7 +416,7 @@ TPartition* TPartitionFixture::CreatePartition(const TCreatePartitionParams& par SendDiskStatusResponse(); WaitMetaReadRequest(); - SendMetaReadResponse(params.PlanStep, params.TxId, params.EndWriteTimestamp); + SendMetaReadResponse(params.Begin, params.End, params.PlanStep, params.TxId, params.EndWriteTimestamp); WaitInfoRangeRequest(); SendInfoRangeResponse(params.Partition.InternalPartitionId, params.Config.Consumers); @@ -777,7 +777,7 @@ void TPartitionFixture::WaitMetaReadRequest() UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 2); } -void TPartitionFixture::SendMetaReadResponse(TMaybe step, TMaybe txId, TInstant endWriteTimestamp) +void TPartitionFixture::SendMetaReadResponse(ui64 begin, ui64 end, TMaybe step, TMaybe txId, TInstant endWriteTimestamp) { auto event = MakeHolder(); event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); @@ -790,6 +790,8 @@ void TPartitionFixture::SendMetaReadResponse(TMaybe step, TMaybe txI read->SetStatus(NKikimrProto::OK); NKikimrPQ::TPartitionMeta meta; + meta.SetStartOffset(begin); + meta.SetEndOffset(end); meta.SetEndWriteTimestamp(endWriteTimestamp.MilliSeconds()); TString out; @@ -840,7 +842,7 @@ TBatch CreateBatch(size_t count) { Cerr << ">>>> ADD BLOB " << i << " writeTimestamp=" << (TInstant::Now() - TDuration::MilliSeconds(10)) << Endl << Flush; TString data = TStringBuilder() << "message-data-" << i; - TClientBlob blob("source-id-1", 13 + i /* seqNo */, data, {} /* partData */, TInstant::Now() - TDuration::MilliSeconds(10) /* writeTimestamp */, + TClientBlob blob("source-id-1", 13 + i /* seqNo */, std::move(data), {} /* partData */, TInstant::Now() - TDuration::MilliSeconds(10) /* writeTimestamp */, TInstant::Now() - TDuration::MilliSeconds(50) /* createTimestamp */, data.size(), "partitionKey", "explicitHashKey"); batch.AddBlob(blob); }