Skip to content

Commit

Permalink
Reply error on database size exceeded (#12110)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Nov 29, 2024
1 parent 10fc4f2 commit d6d5488
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 57 deletions.
14 changes: 10 additions & 4 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace NKikimr::NPQ {

static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB;
static const ui32 MAX_INLINE_SIZE = 1000;
// Quota wait duration applied while SubDomainOutOfSpace is set: SetDeadlinesForWrites takes
// the smaller of this value and the configured QuotaWaitDurationMs, or this value alone when
// the configured duration is zero.
// NOTE: despite the `Ms` suffix this is a TDuration holding 60 seconds, not a raw millisecond count.
static const TDuration SubDomainQuotaWaitDurationMs = TDuration::Seconds(60);

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;

Expand Down Expand Up @@ -1483,8 +1484,12 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq

void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
PQ_LOG_T("TPartition::SetDeadlinesForWrites.");
if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) {
QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
auto quotaWaitDurationMs = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
if (SubDomainOutOfSpace) {
quotaWaitDurationMs = quotaWaitDurationMs ? std::min(quotaWaitDurationMs, SubDomainQuotaWaitDurationMs) : SubDomainQuotaWaitDurationMs;
}
if (quotaWaitDurationMs > TDuration::Zero() && QuotaDeadline == TInstant::Zero()) {
QuotaDeadline = ctx.Now() + quotaWaitDurationMs;

ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck());
}
Expand Down Expand Up @@ -1513,7 +1518,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
{
TMessageQueue newRequests;
for (auto& w : requests) {
if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) {
if (!w.IsWrite() || (w.GetWrite().Msg.IgnoreQuotaDeadline && !SubDomainOutOfSpace)) {
newRequests.emplace_back(std::move(w));
continue;
}
Expand All @@ -1529,7 +1534,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
WriteInflightSize -= msg.Data.size();
}

ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
TString errorMsg = SubDomainOutOfSpace ? "database size exceeded" : "quota exceeded";
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, errorMsg);
}
requests = std::move(newRequests);
}
Expand Down
112 changes: 59 additions & 53 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
void ShadowPartitionCountersTest(bool isFirstClass);

void TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline);
void TestWriteSubDomainOutOfSpace_DeadlineWork(bool ignoreQuotaDeadline);
void WaitKeyValueRequest(TMaybe<ui64>& cookie);

void CmdChangeOwner(ui64 cookie, const TString& sourceId, TDuration duration, TString& ownerCookie);
Expand Down Expand Up @@ -1206,6 +1207,62 @@ void TPartitionFixture::EmulateKVTablet()
Cerr << "Send disk status response with cookie: " << cookie.GetOrElse(0) << Endl;
}

// Shared scenario for the WriteSubDomainOutOfSpace tests.
//
// Enables the topic disk subdomain quota feature flag, sets the quota wait duration to 300 ms,
// creates a partition, sends a subdomain status of `true`, and then issues two writes:
//   1. the first write is expected to produce a TEvPQ::TEvProxyResponse;
//   2. the second write is expected to produce a TEvPQ::TEvError with error code OVERLOAD.
//
// ignoreQuotaDeadline - forwarded unchanged to SendWrite for both writes; the expectations
//                       above are the same for either value.
void TPartitionFixture::TestWriteSubDomainOutOfSpace_DeadlineWork(bool ignoreQuotaDeadline)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
CreatePartition({
.Partition=TPartitionId{1},
.Begin=0, .End=0,
//
// partition configuration
//
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
},
//
// tablet configuration
//
{.Version=2, .Consumers={{.Consumer="client-1"}}});
TMaybe<ui64> kvCookie;

// NOTE(review): presumably `true` reports the subdomain as out of space — confirm against SendSubDomainStatus.
SendSubDomainStatus(true);

ui64 cookie = 1;
ui64 messageNo = 0;
TString ownerCookie;

CmdChangeOwner(cookie, "owner1", TDuration::Seconds(1), ownerCookie);

TAutoPtr<IEventHandle> handle;
// Predicate for GrabEdgeEventIf below. `cookie` is captured by reference, so the comparison
// uses the value it holds when the predicate runs (i.e. after the latest ++cookie).
std::function<bool(const TEvPQ::TEvError&)> truth = [&](const TEvPQ::TEvError& e) {
return cookie == e.Cookie;
};

TString data = "data for write";

// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline);
messageNo++;

WaitKeyValueRequest(kvCookie); // the partition saves the TEvPQ::TEvWrite event
SendDiskStatusResponse(&kvCookie);

// The first write must be answered with a proxy response.
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
}

// Second message will not be processed because the limit is exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline);
messageNo++;

// The second write must be rejected: an error event carrying this write's cookie and the OVERLOAD code.
{
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvError>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
UNIT_ASSERT_EQUAL(NPersQueue::NErrorCode::OVERLOAD, event->ErrorCode);
}
}

void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
Expand Down Expand Up @@ -2323,58 +2380,7 @@ Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture)

Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
CreatePartition({
.Partition=TPartitionId{1},
.Begin=0, .End=0,
//
// partition configuration
//
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
},
//
// tablet configuration
//
{.Version=2, .Consumers={{.Consumer="client-1"}}});
TMaybe<ui64> kvCookie;

SendSubDomainStatus(true);

ui64 cookie = 1;
ui64 messageNo = 0;
TString ownerCookie;

CmdChangeOwner(cookie, "owner1", TDuration::Seconds(1), ownerCookie);

TAutoPtr<IEventHandle> handle;
std::function<bool(const TEvPQ::TEvError&)> truth = [&](const TEvPQ::TEvError& e) {
return cookie == e.Cookie;
};

TString data = "data for write";

// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
messageNo++;

WaitKeyValueRequest(kvCookie); // the partition saves the TEvPQ::TEvWrite event
SendDiskStatusResponse(&kvCookie);

{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
}

// Second message will not be processed because the limit is exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
messageNo++;

{
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvError>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
UNIT_ASSERT_EQUAL(NPersQueue::NErrorCode::OVERLOAD, event->ErrorCode);
}
TestWriteSubDomainOutOfSpace_DeadlineWork(false);
}

Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture)
Expand All @@ -2384,7 +2390,7 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture)

Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture)
{
TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(300), true);
TestWriteSubDomainOutOfSpace_DeadlineWork(true);
}

Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {
Expand Down

0 comments on commit d6d5488

Please sign in to comment.