Skip to content

Commit

Permalink
Merge d75094c into 45af022
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Apr 3, 2024
2 parents 45af022 + d75094c commit 2497757
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace NKikimr::NPQ {
static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB;
static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB;
static const ui32 MAX_INLINE_SIZE = 1000;
static const TDuration SubDomainQuotaWaitDurationMs = TDuration::Seconds(60);

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;

Expand Down Expand Up @@ -1417,8 +1418,12 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq

void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
PQ_LOG_T("TPartition::SetDeadlinesForWrites.");
if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) {
QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
auto quotaWaitDurationMs = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
if (!quotaWaitDurationMs && SubDomainOutOfSpace) {
quotaWaitDurationMs = SubDomainQuotaWaitDurationMs;
}
if (quotaWaitDurationMs > TDuration::Zero() && QuotaDeadline == TInstant::Zero()) {
QuotaDeadline = ctx.Now() + quotaWaitDurationMs;

ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck());
}
Expand Down Expand Up @@ -1500,7 +1505,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
{
TMessageQueue newRequests;
for (auto& w : requests) {
if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) {
if (!w.IsWrite() || (w.GetWrite().Msg.IgnoreQuotaDeadline && !SubDomainOutOfSpace)) {
newRequests.emplace_back(std::move(w));
continue;
}
Expand All @@ -1512,7 +1517,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
WriteInflightSize -= msg.Data.size();
}

ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
TString errorMsg = SubDomainOutOfSpace ? "database size exceeded" : "quota exceeded";
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, errorMsg);
}
requests = std::move(newRequests);
}
Expand Down

0 comments on commit 2497757

Please sign in to comment.