Skip to content

Commit

Permalink
ydb_topic: schedule SendImpl from OnCompressedImpl instead of calling…
Browse files Browse the repository at this point in the history
… it directly (#8372)
  • Loading branch information
qyryq authored Aug 28, 2024
1 parent 0b3e86d commit 01f56bd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
12 changes: 11 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,17 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
(*Counters->BytesInflightCompressed) += block.Data.size();

PackedMessagesToSend.emplace(std::move(block));
SendImpl();

if (!SendImplScheduled.exchange(true)) {
CompressionExecutor->Post([cbContext = SelfContext]() {
if (auto self = cbContext->LockShared()) {
self->SendImplScheduled = false;
with_lock (self->Lock) {
self->SendImpl();
}
}
});
}
return memoryUsage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
bool Connected = false;
bool Started = false;
std::atomic<bool> SendImplScheduled = false;
TAtomic Aborting = 0;
bool SessionEstablished = false;
ui32 PartitionId = 0;
Expand Down
5 changes: 3 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,15 @@ Y_UNIT_TEST_SUITE(BasicUsage) {

UNIT_ASSERT(!futureWrite.HasValue());
Cerr << ">>>TEST: future write has no value " << Endl;
RunTasks(stepByStepExecutor, {0});
RunTasks(stepByStepExecutor, {0}); // Run compression task.
RunTasks(stepByStepExecutor, {1}); // Run send task.
futureWrite.GetValueSync();
UNIT_ASSERT(futureWrite.HasValue());
Cerr << ">>>TEST: future write has value " << Endl;

UNIT_ASSERT(!futureRead.HasValue());
Cerr << ">>>TEST: future read has no value " << Endl;
RunTasks(stepByStepExecutor, {1});
RunTasks(stepByStepExecutor, {2}); // Run decompression task.
futureRead.GetValueSync();
UNIT_ASSERT(futureRead.HasValue());
Cerr << ">>>TEST: future read has value " << Endl;
Expand Down

0 comments on commit 01f56bd

Please sign in to comment.