Skip to content

Commit

Permalink
split to few files
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jan 10, 2024
1 parent 4ba5c57 commit a4220ad
Show file tree
Hide file tree
Showing 12 changed files with 831 additions and 619 deletions.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TPartition : public TActorBootstrapped<TPartition> {

void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
void ReplyOk(const TActorContext& ctx, const ui64 dst);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, bool registered);

void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct TWriteMsg {
struct TOwnershipMsg {
ui64 Cookie;
TString OwnerCookie;
bool Registered;
};

struct TRegisterMessageGroupMsg {
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ static const ui32 MAX_INLINE_SIZE = 1000;

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;

void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, bool registered) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);

THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = *response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie);
auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig->GetStatus());
r->SetRegistered(registered);

ctx.Send(Tablet, response.Release());
}

Expand Down Expand Up @@ -154,8 +158,9 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
ReservedSize -= it->second.ReservedSize;

it->second.GenerateCookie(owner, ev->PipeClient, ev->Sender, TopicName(), Partition, ctx);//will change OwnerCookie
bool registered = SourceIdStorage.GetInMemorySourceIds().contains(owner);
//cookie is generated. but answer will be sent when all inflight writes will be done - they in the same queue 'Requests'
EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie}, ctx);
EmplaceRequest(TOwnershipMsg{ev->Cookie, it->second.OwnerCookie, registered}, ctx);
TabletCounters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(ReservedSize);
UpdateWriteBufferIsFullState(ctx.Now());
ProcessReserveRequests(ctx);
Expand Down Expand Up @@ -346,10 +351,11 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
++offset;
} else if (response.IsOwnership()) {
const TString& ownerCookie = response.GetOwnership().OwnerCookie;
const auto& r = response.GetOwnership();
const TString& ownerCookie = r.OwnerCookie;
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, r.Registered);
} else {
ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/writer/partition_chooser.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class IPartitionChooser {

virtual const TPartitionInfo* GetPartition(const TString& sourceId) const = 0;
virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0;
virtual const TPartitionInfo* GetRandomPartition() const = 0;
};

std::shared_ptr<IPartitionChooser> CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false);
Expand Down
Loading

0 comments on commit a4220ad

Please sign in to comment.